Integrated Risk Management (IRM) Services¶
This document describes the IRM services implemented in Sovereign GRC for comprehensive risk management, incident response, and business continuity.
Overview¶
Sovereign GRC provides six core IRM services:
- Risk Engine - Dynamic risk calculation and monitoring
- KRI Automation - Key Risk Indicator monitoring and alerting
- A2A Attestation - Agent-to-agent vendor attestation protocol
- Incident Workflow - NIST 800-61 compliant incident response
- OPA Playbooks - Automated incident response actions
- BCM Discovery - Business continuity dependency mapping
Risk Engine Service¶
Location: src/backend/services/risk_engine.py
Features¶
- 5x5 Risk Matrix: Calculates inherent risk scores (likelihood × impact)
- Residual Risk Calculation: Factors in control effectiveness (max 80% reduction)
- Risk Appetite Monitoring: Tracks status against organizational thresholds
- Event-Driven Updates: Recalculates when control findings change
Risk Calculation Formula¶
```
Inherent Risk = Likelihood (1-5) × Impact (1-5) × 5
Residual Risk = Inherent Risk × (1 - Control Effectiveness × 0.8)
```
Risk Appetite Thresholds¶
| Status | Residual Risk Score |
|---|---|
| Within Appetite | < 25 |
| Approaching Limit | 25-49 |
| Exceeds Appetite | 50-74 |
| Critical | ≥ 75 |
Usage¶
```python
from src.backend.services import get_risk_engine

async with get_async_session() as db:
    engine = get_risk_engine(db)

    # Calculate inherent risk
    inherent = engine.calculate_inherent_risk(likelihood=4, impact=5)

    # Calculate control effectiveness
    effectiveness = await engine.calculate_control_effectiveness(org_id)

    # Recalculate all risks for an organization
    updated = await engine.recalculate_all_risks(org_id)

    # Get risk heatmap data
    heatmap = await engine.get_risk_heatmap_data(org_id)
```
KRI Automation Service¶
Location: src/backend/services/kri_automation.py
Features¶
- Automated Data Collection: Execute Steampipe queries on schedule
- Threshold Monitoring: Green/Amber/Red status based on configurable thresholds
- Trend Analysis: Track improving/stable/worsening trends
- Alert Generation: Notify when thresholds are breached
KRI Thresholds¶
KRIs use a traffic light system:

- Green: Value ≤ green threshold (healthy)
- Amber: Value > green threshold, ≤ amber threshold (warning)
- Red: Value > amber threshold (critical)
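The classification logic can be sketched as follows (a hypothetical helper, not the service's API; the example thresholds are invented):

```python
def kri_status(value: float, green_threshold: float, amber_threshold: float) -> str:
    """Classify a KRI value: green at or below the green threshold,
    amber up to the amber threshold, red above it."""
    if value <= green_threshold:
        return "green"
    if value <= amber_threshold:
        return "amber"
    return "red"

# e.g. a "failed logins per hour" KRI with green=10, amber=25
kri_status(8, 10, 25)     # "green"
kri_status(15.5, 10, 25)  # "amber"
kri_status(30, 10, 25)    # "red"
```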
Scheduler Integration¶
Add a KRI collection job:
```python
from src.backend.scheduler import get_scheduler
from src.backend.scheduler.models import ScheduledJobCreate, JobType

scheduler = get_scheduler()
scheduler.add_job(ScheduledJobCreate(
    name="KRI Collection - Daily",
    job_type=JobType.KRI_COLLECTION,
    schedule={"type": "cron", "expression": "0 6 * * *"},  # 6 AM daily
    config={"organization_id": str(org_id)},
))
```
Usage¶
```python
from src.backend.services import get_kri_automation_service

async with get_async_session() as db:
    service = get_kri_automation_service(db)

    # Update a KRI value manually
    kri, alert = await service.update_kri_value(kri_id, new_value=15.5)

    # Collect all automated KRIs for an org
    summary = await service.collect_all_kris(org_id, steampipe_executor)

    # Get KRI dashboard data
    dashboard = await service.get_kri_dashboard(org_id)

    # Forecast KRI trend
    forecast = await service.forecast_kri_trend(kri_id, periods=3)
```
A2A Attestation Service¶
Location: src/backend/services/a2a_attestation.py
Features¶
- Machine-to-Machine Protocol: Automated attestation requests/responses
- Cryptographic Verification: HMAC-SHA256 signing and verification
- Report Ingestion: SOC 2 Type II and ISO 27001 certificate parsing
- Vendor Scoring: Automated risk rating from attestation data
Protocol Flow¶
1. Create attestation request with required frameworks
2. Send signed request to vendor's A2A endpoint
3. Receive and validate signed response
4. Map attestation to internal control framework
5. Calculate vendor security score
6. Update vendor risk rating
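The HMAC-SHA256 signing and verification in steps 2-3 can be illustrated with Python's standard `hmac` module; the payload canonicalization shown here is an assumption, not necessarily what the service uses:

```python
import hashlib
import hmac
import json

def sign_request(payload: dict, shared_secret: bytes) -> str:
    """Sign a canonical JSON payload with HMAC-SHA256, returning a hex digest."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
    return hmac.new(shared_secret, canonical, hashlib.sha256).hexdigest()

def verify_signature(payload: dict, signature: str, shared_secret: bytes) -> bool:
    """Recompute the signature and compare in constant time."""
    expected = sign_request(payload, shared_secret)
    return hmac.compare_digest(expected, signature)

secret = b"vendor-shared-secret"  # illustrative; key exchange is out of scope here
request = {"frameworks": ["SOC2_TYPE2", "ISO27001"], "requester": "Acme Corp"}
sig = sign_request(request, secret)
verify_signature(request, sig, secret)  # True
```

Sorting keys before serializing ensures both parties sign byte-identical payloads regardless of dict ordering.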
Supported Frameworks¶
- SOC 2 Type I/II
- ISO 27001
- ISO 27017/27018
- HIPAA
- PCI-DSS
- NIST CSF
- CIS Controls
- CAIQ
Usage¶
```python
from datetime import datetime

from src.backend.services import get_a2a_attestation_service

async with get_async_session() as db:
    service = get_a2a_attestation_service(db)

    # Create and send attestation request
    assessment, request = await service.create_attestation_request(
        vendor_id=vendor_id,
        requester_org_id=org_id,
        requester_org_name="Acme Corp",
        frameworks=["SOC2_TYPE2", "ISO27001"],
    )
    response = await service.send_attestation_request(vendor_id, request)

    # Process response
    if response:
        assessment = await service.process_attestation_response(
            assessment_id=assessment.id,
            response=response,
        )

    # Ingest SOC 2 report manually
    assessment = await service.ingest_soc2_report(
        vendor_id=vendor_id,
        report_data=parsed_report,
        report_period_end=datetime.now(),
    )
```
Incident Workflow Engine¶
Location: src/backend/services/incident_workflow.py
Features¶
- NIST 800-61 Phases: Detection → Analysis → Containment → Eradication → Recovery → Post-Incident
- SLA Tracking: Response and resolution time targets by severity
- Escalation Levels: Warning (75%), Critical (90%), Breached (100%)
- Regulatory Notifications: GDPR (72h), HIPAA (60d), PCI-DSS (24h), etc.
SLA Configuration¶
| Severity | Response Time | Resolution Time |
|---|---|---|
| Critical | 15 minutes | 4 hours |
| High | 1 hour | 24 hours |
| Medium | 4 hours | 72 hours |
| Low | 24 hours | 1 week |
| Informational | 72 hours | 30 days |
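The escalation levels (Warning at 75%, Critical at 90%, Breached at 100% of the target) can be sketched against the response SLA table above; the level names and dict layout here are illustrative:

```python
from datetime import timedelta

# Response-time targets from the SLA table above
RESPONSE_SLA = {
    "critical": timedelta(minutes=15),
    "high": timedelta(hours=1),
    "medium": timedelta(hours=4),
    "low": timedelta(hours=24),
    "informational": timedelta(hours=72),
}

def escalation_level(elapsed: timedelta, target: timedelta) -> str:
    """Map elapsed time as a fraction of the SLA target onto escalation levels."""
    fraction = elapsed / target
    if fraction >= 1.0:
        return "breached"
    if fraction >= 0.9:
        return "critical"
    if fraction >= 0.75:
        return "warning"
    return "ok"

# 12 minutes into a Critical incident's 15-minute response window (80%)
escalation_level(timedelta(minutes=12), RESPONSE_SLA["critical"])  # "warning"
```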
Phase Tasks¶
Each phase automatically generates standard tasks:
- Detection: Triage, identify affected systems, notify commander
- Analysis: Collect evidence, determine root cause, assess regulatory impact
- Containment: Isolate systems, block IPs, disable accounts
- Eradication: Remove artifacts, patch systems, reset credentials
- Recovery: Restore from backup, verify functionality, monitor
- Post-Incident: Review meeting, document lessons, update playbooks
Usage¶
```python
from src.backend.services import get_incident_workflow_engine

async with get_async_session() as db:
    engine = get_incident_workflow_engine(db)

    # Create incident
    incident = await engine.create_incident(
        org_id=org_id,
        title="Ransomware Detection",
        description="Malware detected on workstation WS-042",
        category=IncidentCategory.MALICIOUS_CODE,
        severity=IncidentSeverity.CRITICAL,
    )

    # Transition to next phase
    incident = await engine.transition_phase(
        incident_id=incident.id,
        new_phase=IncidentPhase.ANALYSIS,
        actor_id=user_id,
    )

    # Check SLA status
    sla_status = await engine.check_sla_status(incident.id)

    # Assess breach notification requirements
    incident = await engine.assess_breach_notification(
        incident_id=incident.id,
        is_reportable=True,
        affected_records=5000,
        regulatory_frameworks=["GDPR", "CCPA"],
    )

    # Get incident metrics
    metrics = await engine.get_incident_metrics(org_id, days=30)
```
OPA Playbooks Service¶
Location: src/backend/services/opa_playbooks.py
Features¶
- OPA Integration: Evaluate Rego policies for automated actions
- Built-in Playbooks: Pre-defined for common incident types
- Action Handlers: Pluggable automated action execution
- Fallback Actions: Default actions when OPA is unavailable
Built-in Playbooks¶
| Playbook | Incident Category | Actions |
|---|---|---|
| ransomware | Malicious Code | Isolate, disable accounts, snapshot, notify |
| data_breach | Data Breach | Isolate, revoke tokens, enhanced logging |
| unauthorized_access | Unauthorized Access | Disable accounts, revoke tokens, block IPs |
| phishing | Phishing | Disable accounts, revoke tokens, notify |
| denial_of_service | DoS | Block IPs, enhanced logging, notify |
| insider_threat | Insider Threat | Disable (with approval), revoke, snapshot |
Action Handlers¶
Built-in automated actions:
- `notify_incident_commander` - Send notification
- `isolate_affected_systems` - Network isolation
- `block_malicious_ips` - Firewall rules
- `disable_compromised_accounts` - IdP integration
- `create_forensic_snapshot` - VM/cloud snapshots
- `revoke_access_tokens` - Session termination
- `enable_enhanced_logging` - SIEM configuration
- `initiate_backup_restore` - DR system integration
Registering Custom Actions¶
```python
from src.backend.services.opa_playbooks import register_action

@register_action("quarantine_endpoint")
async def quarantine_endpoint(incident, action, db):
    """Quarantine endpoint via EDR."""
    endpoint_id = action.params.get("endpoint_id")
    # Call EDR API
    return {"status": "success", "endpoint_id": endpoint_id}
```
Usage¶
```python
from src.backend.services import get_opa_playbook_service

async with get_async_session() as db:
    service = get_opa_playbook_service(db)

    # Execute playbook for incident
    execution = await service.execute_playbook(
        incident_id=incident_id,
        playbook_id="ransomware",  # Auto-selects if None
        execute_automated=True,
        create_tasks=True,
    )

    # Test playbook with mock context
    result = await service.test_playbook(
        playbook_id="data_breach",
        test_context={
            "severity": "critical",
            "category": "data_breach",
            "affected_systems": ["db-prod-01"],
        },
    )
```
BCM Discovery Service¶
Location: src/backend/services/bcm_discovery.py
Features¶
- Auto-Discovery: Find cloud resources via Steampipe tags
- Dependency Graphs: Visual representation of process dependencies
- SPOF Detection: Identify single points of failure
- Impact Analysis: Calculate cascading effects of failures
- Recovery Sequencing: Optimal restoration order
Tagging Convention¶
Tag cloud resources with Sovereign BCM tags:
| Tag | Description | Example |
|---|---|---|
| `sovereign:process` | Process identifier | `order-fulfillment` |
| `sovereign:criticality` | Criticality tier | `business_critical` |
| `sovereign:rto` | RTO in hours | `4` |
| `sovereign:rpo` | RPO in hours | `1` |
| `sovereign:environment` | Environment | `production` |
Supported Resource Types¶
- AWS: EC2, RDS, Lambda, ECS, EKS, ElastiCache, S3, DynamoDB, SQS, SNS, ELB/ALB
- Azure: VMs, SQL Database, Storage Accounts
- GCP: Compute, Cloud SQL
Usage¶
```python
from src.backend.services import get_bcm_discovery_service

async with get_async_session() as db:
    service = get_bcm_discovery_service(db)

    # Discover dependencies from cloud metadata
    summary = await service.discover_dependencies(org_id, steampipe_executor)

    # Build dependency graph
    graph = await service.build_dependency_graph(org_id)

    # Analyze impact of asset failure
    impact = await service.analyze_asset_impact(org_id, asset_id="arn:aws:...")

    # Get single points of failure
    spofs = await service.get_spof_analysis(org_id)

    # Get recovery sequence
    sequence = await service.get_recovery_sequence(org_id)

    # Get BCM dashboard
    dashboard = await service.get_bcm_dashboard(org_id)
```
Cascading RTO Calculation¶
The service calculates effective RTO by considering:
- Declared RTO of the business process
- Recovery time of all dependencies
- Dependency chain ordering
If effective RTO exceeds MTD (Maximum Tolerable Downtime), the process is flagged as at-risk.
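The cascading calculation can be sketched as a walk over the dependency graph. This is a simplified model that assumes dependencies recover in parallel (a sequential model would sum along each chain); the service's actual algorithm may differ:

```python
def effective_rto(process: str,
                  declared_rto: dict[str, float],
                  deps: dict[str, list[str]]) -> float:
    """Effective RTO = the node's declared RTO, pushed up by the slowest
    dependency chain beneath it (parallel-recovery assumption)."""
    chain = max(
        (effective_rto(d, declared_rto, deps) for d in deps.get(process, [])),
        default=0.0,
    )
    return max(declared_rto.get(process, 0.0), chain)

# Illustrative data: order-fulfillment declares a 4h RTO but depends on
# payments-db, which takes 6h to recover
declared = {"order-fulfillment": 4.0, "payments-db": 6.0, "auth-service": 2.0}
deps = {"order-fulfillment": ["payments-db", "auth-service"]}

rto = effective_rto("order-fulfillment", declared, deps)  # 6.0, driven by payments-db

MTD = 5.0
at_risk = rto > MTD  # True: flag the process as at-risk
```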
Database Models¶
Risk Models (src/backend/db/models/risk.py)¶
- `RiskScenario` - Library of potential threats
- `RiskRegisterEntry` - Instantiated risks with scores
- `RiskControlMapping` - Links risks to mitigating controls
- `KeyRiskIndicator` - KRIs with thresholds and automation
Vendor Models (src/backend/db/models/vendor.py)¶
- `Vendor` - Third-party entities
- `VendorContract` - Legal agreements
- `VendorAssessment` - Due diligence evaluations
Incident Models (src/backend/db/models/incident.py)¶
- `Incident` - Security/operational incidents
- `IncidentTask` - Response tasks
- `IncidentTimeline` - Audit trail
BCM Models (src/backend/db/models/bcm.py)¶
- `BusinessProcess` - Critical functions with RTO/RPO
- `ProcessDependency` - Links to assets
- `RecoveryPlan` - DR documentation