Integrated Risk Management (IRM) Services

This document describes the IRM services implemented in Sovereign GRC for comprehensive risk management, incident response, and business continuity.

Overview

Sovereign GRC provides six core IRM services:

  1. Risk Engine - Dynamic risk calculation and monitoring
  2. KRI Automation - Key Risk Indicator monitoring and alerting
  3. A2A Attestation - Agent-to-agent vendor attestation protocol
  4. Incident Workflow - NIST 800-61 compliant incident response
  5. OPA Playbooks - Automated incident response actions
  6. BCM Discovery - Business continuity dependency mapping

Risk Engine Service

Location: src/backend/services/risk_engine.py

Features

  • 5x5 Risk Matrix: Calculates inherent risk scores (likelihood × impact)
  • Residual Risk Calculation: Factors in control effectiveness (max 80% reduction)
  • Risk Appetite Monitoring: Tracks status against organizational thresholds
  • Event-Driven Updates: Recalculates when control findings change

Risk Calculation Formula

Inherent Risk = Likelihood (1-5) × Impact (1-5) × 5
Residual Risk = Inherent Risk × (1 - Control Effectiveness × 0.8)
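
As a worked example, the two formulas can be sketched in plain Python. This is a minimal illustration, not the code in risk_engine.py: the function names are hypothetical, and it assumes control effectiveness is expressed as a fraction between 0.0 and 1.0.

```python
def inherent_risk(likelihood: int, impact: int) -> int:
    """Likelihood (1-5) x Impact (1-5) x 5, as in the formula above."""
    for name, value in (("likelihood", likelihood), ("impact", impact)):
        if not 1 <= value <= 5:
            raise ValueError(f"{name} must be between 1 and 5, got {value}")
    return likelihood * impact * 5


def residual_risk(inherent: float, control_effectiveness: float) -> float:
    """Reduce inherent risk by at most 80% (reached at effectiveness 1.0).

    Assumption: control_effectiveness is a fraction in [0.0, 1.0].
    """
    if not 0.0 <= control_effectiveness <= 1.0:
        raise ValueError("control_effectiveness must be between 0.0 and 1.0")
    return inherent * (1 - control_effectiveness * 0.8)


inherent = inherent_risk(likelihood=4, impact=5)               # 4 * 5 * 5
residual = residual_risk(inherent, control_effectiveness=0.5)  # 40% reduction
```

With fully effective controls the residual score is still 20% of the inherent score, which is what the 80% cap expresses.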

Risk Appetite Thresholds

| Status | Residual Risk Score |
|---|---|
| Within Appetite | < 25 |
| Approaching Limit | 25-49 |
| Exceeds Appetite | 50-74 |
| Critical | ≥ 75 |
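
The threshold table could be applied with a small lookup like the one below. The function name is hypothetical, and the handling of fractional scores (for example 49.5 falling into "Approaching Limit") is an assumption, since the table only lists integer bands.

```python
def appetite_status(residual_score: float) -> str:
    """Map a residual risk score to the appetite status from the table."""
    if residual_score < 25:
        return "Within Appetite"
    if residual_score < 50:   # assumption: 49.5 still counts as "Approaching Limit"
        return "Approaching Limit"
    if residual_score < 75:
        return "Exceeds Appetite"
    return "Critical"


statuses = [appetite_status(score) for score in (10, 25, 60, 75)]
```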

Usage

from src.backend.services import get_risk_engine

async with get_async_session() as db:
    engine = get_risk_engine(db)

    # Calculate inherent risk
    inherent = engine.calculate_inherent_risk(likelihood=4, impact=5)

    # Calculate control effectiveness
    effectiveness = await engine.calculate_control_effectiveness(org_id)

    # Recalculate all risks for an organization
    updated = await engine.recalculate_all_risks(org_id)

    # Get risk heatmap data
    heatmap = await engine.get_risk_heatmap_data(org_id)

KRI Automation Service

Location: src/backend/services/kri_automation.py

Features

  • Automated Data Collection: Execute Steampipe queries on schedule
  • Threshold Monitoring: Green/Amber/Red status based on configurable thresholds
  • Trend Analysis: Track improving/stable/worsening trends
  • Alert Generation: Notify when thresholds are breached

KRI Thresholds

KRIs use a traffic light system:

  • Green: Value ≤ green threshold (healthy)
  • Amber: Value > green threshold and ≤ amber threshold (warning)
  • Red: Value > amber threshold (critical)
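
A minimal sketch of the traffic light rule follows. The function name is hypothetical, and it assumes a KRI where higher values are worse, which is the only direction the rule above describes.

```python
def kri_status(value: float, green_threshold: float, amber_threshold: float) -> str:
    """Classify a KRI value as green, amber, or red.

    Assumption: higher values are worse, so green_threshold <= amber_threshold.
    """
    if green_threshold > amber_threshold:
        raise ValueError("green_threshold must not exceed amber_threshold")
    if value <= green_threshold:
        return "green"
    if value <= amber_threshold:
        return "amber"
    return "red"


# Example: a KRI with a green threshold of 10 and an amber threshold of 20.
status = kri_status(15.5, green_threshold=10, amber_threshold=20)
```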

Scheduler Integration

Add a KRI collection job:

from src.backend.scheduler import get_scheduler
from src.backend.scheduler.models import ScheduledJobCreate, JobType

scheduler = get_scheduler()
scheduler.add_job(ScheduledJobCreate(
    name="KRI Collection - Daily",
    job_type=JobType.KRI_COLLECTION,
    schedule={"type": "cron", "expression": "0 6 * * *"},  # 6 AM daily
    config={"organization_id": str(org_id)},
))

Usage

from src.backend.services import get_kri_automation_service

async with get_async_session() as db:
    service = get_kri_automation_service(db)

    # Update a KRI value manually
    kri, alert = await service.update_kri_value(kri_id, new_value=15.5)

    # Collect all automated KRIs for an org
    summary = await service.collect_all_kris(org_id, steampipe_executor)

    # Get KRI dashboard data
    dashboard = await service.get_kri_dashboard(org_id)

    # Forecast KRI trend
    forecast = await service.forecast_kri_trend(kri_id, periods=3)

A2A Attestation Service

Location: src/backend/services/a2a_attestation.py

Features

  • Machine-to-Machine Protocol: Automated attestation requests/responses
  • Cryptographic Verification: HMAC-SHA256 signing and verification
  • Report Ingestion: SOC 2 Type II and ISO 27001 certificate parsing
  • Vendor Scoring: Automated risk rating from attestation data

Protocol Flow

1. Create attestation request with required frameworks
2. Send signed request to vendor's A2A endpoint
3. Receive and validate signed response
4. Map attestation to internal control framework
5. Calculate vendor security score
6. Update vendor risk rating
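
Steps 2 and 3 rely on HMAC-SHA256 signatures. The sketch below shows one way to sign and verify a request body with the Python standard library. The payload fields, the canonical JSON encoding, and the shared-secret handling are assumptions for illustration; the wire format used by a2a_attestation.py may differ.

```python
import hashlib
import hmac
import json


def sign_payload(payload: dict, shared_secret: bytes) -> str:
    """Return a hex HMAC-SHA256 signature over a canonical JSON encoding."""
    # Sorting keys and fixing separators makes the byte string deterministic,
    # so both parties sign exactly the same bytes.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hmac.new(shared_secret, canonical, hashlib.sha256).hexdigest()


def verify_payload(payload: dict, signature: str, shared_secret: bytes) -> bool:
    """Recompute the signature and compare in constant time."""
    expected = sign_payload(payload, shared_secret)
    return hmac.compare_digest(expected, signature)


# Hypothetical request body and secret, for illustration only.
secret = b"example-shared-secret"
request = {"vendor_id": "vendor-123", "frameworks": ["SOC2_TYPE2", "ISO27001"]}
signature = sign_payload(request, secret)

is_valid = verify_payload(request, signature, secret)
is_tampered_valid = verify_payload({**request, "frameworks": ["CAIQ"]}, signature, secret)
```

Using `hmac.compare_digest` rather than `==` avoids leaking timing information during verification.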

Supported Frameworks

  • SOC 2 Type I/II
  • ISO 27001
  • ISO 27017/27018
  • HIPAA
  • PCI-DSS
  • NIST CSF
  • CIS Controls
  • CAIQ

Usage

from datetime import datetime

from src.backend.services import get_a2a_attestation_service

async with get_async_session() as db:
    service = get_a2a_attestation_service(db)

    # Create and send attestation request
    assessment, request = await service.create_attestation_request(
        vendor_id=vendor_id,
        requester_org_id=org_id,
        requester_org_name="Acme Corp",
        frameworks=["SOC2_TYPE2", "ISO27001"],
    )

    response = await service.send_attestation_request(vendor_id, request)

    # Process response
    if response:
        assessment = await service.process_attestation_response(
            assessment_id=assessment.id,
            response=response,
        )

    # Ingest SOC 2 report manually
    assessment = await service.ingest_soc2_report(
        vendor_id=vendor_id,
        report_data=parsed_report,
        report_period_end=datetime.now(),
    )

Incident Workflow Engine

Location: src/backend/services/incident_workflow.py

Features

  • NIST 800-61 Phases: Detection → Analysis → Containment → Eradication → Recovery → Post-Incident
  • SLA Tracking: Response and resolution time targets by severity
  • Escalation Levels: Warning (75%), Critical (90%), Breached (100%)
  • Regulatory Notifications: GDPR (72h), HIPAA (60d), PCI-DSS (24h), etc.
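
The notification windows listed above can be turned into concrete deadlines with a small helper. This is a sketch under stated assumptions: the names are hypothetical, the clock is assumed to start when the incident is discovered, and only the three windows given in this document are included.

```python
from datetime import datetime, timedelta, timezone

# Windows taken from the feature list above; other frameworks are omitted
# because this document does not state their deadlines.
NOTIFICATION_WINDOWS = {
    "GDPR": timedelta(hours=72),
    "HIPAA": timedelta(days=60),
    "PCI-DSS": timedelta(hours=24),
}


def notification_deadlines(discovered_at: datetime, frameworks: list) -> dict:
    """Return a deadline per framework that has a known window."""
    deadlines = {}
    for framework in frameworks:
        window = NOTIFICATION_WINDOWS.get(framework)
        if window is not None:
            deadlines[framework] = discovered_at + window
    return deadlines


discovered = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
deadlines = notification_deadlines(discovered, ["GDPR", "PCI-DSS", "CCPA"])
```

Frameworks without a known window (CCPA in this example) are skipped rather than guessed.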

SLA Configuration

| Severity | Response Time | Resolution Time |
|---|---|---|
| Critical | 15 minutes | 4 hours |
| High | 1 hour | 24 hours |
| Medium | 4 hours | 72 hours |
| Low | 24 hours | 1 week |
| Informational | 72 hours | 30 days |
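
To illustrate how the escalation levels could relate to these targets, the sketch below compares elapsed time against the resolution target. It assumes the 75%, 90%, and 100% figures refer to the share of the SLA window that has elapsed; the names and the "on_track" label are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Resolution targets from the SLA table.
RESOLUTION_SLA = {
    "critical": timedelta(hours=4),
    "high": timedelta(hours=24),
    "medium": timedelta(hours=72),
    "low": timedelta(weeks=1),
    "informational": timedelta(days=30),
}


def escalation_level(severity: str, opened_at: datetime, now: datetime) -> str:
    """Return the escalation level based on the share of the SLA consumed."""
    consumed = (now - opened_at) / RESOLUTION_SLA[severity]  # timedelta ratio -> float
    if consumed >= 1.0:
        return "breached"
    if consumed >= 0.90:
        return "critical"
    if consumed >= 0.75:
        return "warning"
    return "on_track"  # hypothetical label for "no escalation yet"


opened = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
level = escalation_level("critical", opened, opened + timedelta(hours=3))
```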

Phase Tasks

Each phase automatically generates standard tasks:

  • Detection: Triage, identify affected systems, notify commander
  • Analysis: Collect evidence, determine root cause, assess regulatory impact
  • Containment: Isolate systems, block IPs, disable accounts
  • Eradication: Remove artifacts, patch systems, reset credentials
  • Recovery: Restore from backup, verify functionality, monitor
  • Post-Incident: Review meeting, document lessons, update playbooks
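
The phase-to-task mapping above can be pictured as a simple lookup table. The sketch below is illustrative only: the phase keys and helper names are hypothetical, and the real engine may allow transitions other than a strict forward step.

```python
from typing import Optional

# Standard tasks per phase, transcribed from the list above.
# Dict insertion order doubles as the phase order.
PHASE_TASKS = {
    "detection": ["Triage", "Identify affected systems", "Notify commander"],
    "analysis": ["Collect evidence", "Determine root cause", "Assess regulatory impact"],
    "containment": ["Isolate systems", "Block IPs", "Disable accounts"],
    "eradication": ["Remove artifacts", "Patch systems", "Reset credentials"],
    "recovery": ["Restore from backup", "Verify functionality", "Monitor"],
    "post_incident": ["Review meeting", "Document lessons", "Update playbooks"],
}
PHASE_ORDER = list(PHASE_TASKS)


def next_phase(current: str) -> Optional[str]:
    """Return the phase that follows `current`, or None after the last one."""
    index = PHASE_ORDER.index(current)
    return PHASE_ORDER[index + 1] if index + 1 < len(PHASE_ORDER) else None


def tasks_for_transition(current: str) -> list:
    """Return the standard tasks to create when leaving `current`."""
    upcoming = next_phase(current)
    return list(PHASE_TASKS[upcoming]) if upcoming else []


containment_tasks = tasks_for_transition("analysis")
```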

Usage

from src.backend.services import get_incident_workflow_engine

async with get_async_session() as db:
    engine = get_incident_workflow_engine(db)

    # Create incident
    incident = await engine.create_incident(
        org_id=org_id,
        title="Ransomware Detection",
        description="Malware detected on workstation WS-042",
        category=IncidentCategory.MALICIOUS_CODE,
        severity=IncidentSeverity.CRITICAL,
    )

    # Transition to next phase
    incident = await engine.transition_phase(
        incident_id=incident.id,
        new_phase=IncidentPhase.ANALYSIS,
        actor_id=user_id,
    )

    # Check SLA status
    sla_status = await engine.check_sla_status(incident.id)

    # Assess breach notification requirements
    incident = await engine.assess_breach_notification(
        incident_id=incident.id,
        is_reportable=True,
        affected_records=5000,
        regulatory_frameworks=["GDPR", "CCPA"],
    )

    # Get incident metrics
    metrics = await engine.get_incident_metrics(org_id, days=30)

OPA Playbooks Service

Location: src/backend/services/opa_playbooks.py

Features

  • OPA Integration: Evaluate Rego policies for automated actions
  • Built-in Playbooks: Pre-defined for common incident types
  • Action Handlers: Pluggable automated action execution
  • Fallback Actions: Default actions when OPA is unavailable

Built-in Playbooks

| Playbook | Incident Category | Actions |
|---|---|---|
| ransomware | Malicious Code | Isolate, disable accounts, snapshot, notify |
| data_breach | Data Breach | Isolate, revoke tokens, enhanced logging |
| unauthorized_access | Unauthorized Access | Disable accounts, revoke tokens, block IPs |
| phishing | Phishing | Disable accounts, revoke tokens, notify |
| denial_of_service | DoS | Block IPs, enhanced logging, notify |
| insider_threat | Insider Threat | Disable (with approval), revoke, snapshot |

Action Handlers

Built-in automated actions:

  • notify_incident_commander - Send notification
  • isolate_affected_systems - Network isolation
  • block_malicious_ips - Firewall rules
  • disable_compromised_accounts - IdP integration
  • create_forensic_snapshot - VM/cloud snapshots
  • revoke_access_tokens - Session termination
  • enable_enhanced_logging - SIEM configuration
  • initiate_backup_restore - DR system integration
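
Combining the playbook table with the handler names above, a fallback plan for when OPA is unavailable might look like the sketch below. The category keys, the mapping of table entries such as "Isolate" to handler names, and the `requires_approval` flag are assumptions for illustration, not the service's actual data structures.

```python
# Hypothetical category keys mapped to the built-in playbook names.
PLAYBOOK_BY_CATEGORY = {
    "malicious_code": "ransomware",
    "data_breach": "data_breach",
    "unauthorized_access": "unauthorized_access",
    "phishing": "phishing",
    "denial_of_service": "denial_of_service",
    "insider_threat": "insider_threat",
}

# Table actions expressed with the built-in handler names (assumed mapping).
FALLBACK_ACTIONS = {
    "ransomware": ["isolate_affected_systems", "disable_compromised_accounts",
                   "create_forensic_snapshot", "notify_incident_commander"],
    "data_breach": ["isolate_affected_systems", "revoke_access_tokens",
                    "enable_enhanced_logging"],
    "unauthorized_access": ["disable_compromised_accounts", "revoke_access_tokens",
                            "block_malicious_ips"],
    "phishing": ["disable_compromised_accounts", "revoke_access_tokens",
                 "notify_incident_commander"],
    "denial_of_service": ["block_malicious_ips", "enable_enhanced_logging",
                          "notify_incident_commander"],
    "insider_threat": ["disable_compromised_accounts", "revoke_access_tokens",
                       "create_forensic_snapshot"],
}

# The table marks account disabling for insider threats as "with approval".
NEEDS_APPROVAL = {("insider_threat", "disable_compromised_accounts")}


def fallback_plan(category: str) -> list:
    """Return the ordered fallback actions for an incident category."""
    playbook = PLAYBOOK_BY_CATEGORY.get(category)
    if playbook is None:
        return []  # unknown category: nothing to automate
    return [
        {"action": name, "requires_approval": (playbook, name) in NEEDS_APPROVAL}
        for name in FALLBACK_ACTIONS[playbook]
    ]


plan = fallback_plan("insider_threat")
```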

Registering Custom Actions

from src.backend.services.opa_playbooks import register_action

@register_action("quarantine_endpoint")
async def quarantine_endpoint(incident, action, db):
    """Quarantine endpoint via EDR."""
    endpoint_id = action.params.get("endpoint_id")
    # Call EDR API
    return {"status": "success", "endpoint_id": endpoint_id}

Usage

from src.backend.services import get_opa_playbook_service

async with get_async_session() as db:
    service = get_opa_playbook_service(db)

    # Execute playbook for incident
    execution = await service.execute_playbook(
        incident_id=incident_id,
        playbook_id="ransomware",  # Auto-selects if None
        execute_automated=True,
        create_tasks=True,
    )

    # Test playbook with mock context
    result = await service.test_playbook(
        playbook_id="data_breach",
        test_context={
            "severity": "critical",
            "category": "data_breach",
            "affected_systems": ["db-prod-01"],
        },
    )

BCM Discovery Service

Location: src/backend/services/bcm_discovery.py

Features

  • Auto-Discovery: Find cloud resources via Steampipe tags
  • Dependency Graphs: Visual representation of process dependencies
  • SPOF Detection: Identify single points of failure
  • Impact Analysis: Calculate cascading effects of failures
  • Recovery Sequencing: Optimal restoration order
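
Impact analysis and recovery sequencing are both graph problems. The sketch below uses the standard library's `graphlib.TopologicalSorter` to order restoration so that dependencies come first, and a simple traversal to find everything affected by a failed asset. The node names and the dictionary shape are hypothetical; the service's own graph model may differ.

```python
from graphlib import TopologicalSorter

# Each key depends on the nodes in its value set (hypothetical example data).
DEPENDENCIES = {
    "order-fulfillment": {"payment-api", "orders-db"},
    "payment-api": {"orders-db"},
}


def recovery_sequence(dependencies: dict) -> list:
    """Return a restoration order in which dependencies precede dependents."""
    # TopologicalSorter treats the values as predecessors of each key and
    # raises graphlib.CycleError if the graph contains a cycle.
    return list(TopologicalSorter(dependencies).static_order())


def impacted_by(dependencies: dict, failed: str) -> set:
    """Return every node that depends, directly or transitively, on `failed`."""
    impacted = set()
    frontier = [failed]
    while frontier:
        current = frontier.pop()
        for node, deps in dependencies.items():
            if current in deps and node not in impacted:
                impacted.add(node)
                frontier.append(node)
    return impacted


sequence = recovery_sequence(DEPENDENCIES)
impact = impacted_by(DEPENDENCIES, "orders-db")
```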

Tagging Convention

Tag cloud resources with Sovereign BCM tags:

| Tag | Description | Example |
|---|---|---|
| sovereign:process | Process identifier | order-fulfillment |
| sovereign:criticality | Criticality tier | business_critical |
| sovereign:rto | RTO in hours | 4 |
| sovereign:rpo | RPO in hours | 1 |
| sovereign:environment | Environment | production |
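
As an illustration of how these tags might be read after discovery, the sketch below parses a resource's tag dictionary into a typed record. The `BcmTags` class and `parse_bcm_tags` function are hypothetical, and treating `sovereign:process` as the only required tag is an assumption.

```python
from dataclasses import dataclass
from typing import Optional

TAG_PREFIX = "sovereign:"


@dataclass
class BcmTags:
    process: str
    criticality: Optional[str] = None
    rto_hours: Optional[float] = None
    rpo_hours: Optional[float] = None
    environment: Optional[str] = None


def parse_bcm_tags(tags: dict) -> Optional[BcmTags]:
    """Extract sovereign:* tags; return None if the resource is not tagged."""
    scoped = {
        key[len(TAG_PREFIX):]: value
        for key, value in tags.items()
        if key.startswith(TAG_PREFIX)
    }
    if "process" not in scoped:  # assumption: process is the only required tag
        return None
    return BcmTags(
        process=scoped["process"],
        criticality=scoped.get("criticality"),
        rto_hours=float(scoped["rto"]) if "rto" in scoped else None,
        rpo_hours=float(scoped["rpo"]) if "rpo" in scoped else None,
        environment=scoped.get("environment"),
    )


parsed = parse_bcm_tags({
    "Name": "orders-db",
    "sovereign:process": "order-fulfillment",
    "sovereign:criticality": "business_critical",
    "sovereign:rto": "4",
    "sovereign:rpo": "1",
    "sovereign:environment": "production",
})
```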

Supported Resource Types

  • AWS: EC2, RDS, Lambda, ECS, EKS, ElastiCache, S3, DynamoDB, SQS, SNS, ELB/ALB
  • Azure: VMs, SQL Database, Storage Accounts
  • GCP: Compute, Cloud SQL

Usage

from src.backend.services import get_bcm_discovery_service

async with get_async_session() as db:
    service = get_bcm_discovery_service(db)

    # Discover dependencies from cloud metadata
    summary = await service.discover_dependencies(org_id, steampipe_executor)

    # Build dependency graph
    graph = await service.build_dependency_graph(org_id)

    # Analyze impact of asset failure
    impact = await service.analyze_asset_impact(org_id, asset_id="arn:aws:...")

    # Get single points of failure
    spofs = await service.get_spof_analysis(org_id)

    # Get recovery sequence
    sequence = await service.get_recovery_sequence(org_id)

    # Get BCM dashboard
    dashboard = await service.get_bcm_dashboard(org_id)

Cascading RTO Calculation

The service calculates effective RTO by considering:

  1. Declared RTO of the business process
  2. Recovery time of all dependencies
  3. Dependency chain ordering

Effective RTO = Declared RTO + Max(Dependency Recovery Times)

If effective RTO exceeds MTD (Maximum Tolerable Downtime), the process is flagged as at-risk.
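
A worked example of the formula and the MTD check, as a minimal sketch. The function names are hypothetical, all values are assumed to be in hours, and a process with no dependencies is assumed to keep its declared RTO.

```python
def effective_rto(declared_rto_hours: float, dependency_recovery_hours: list) -> float:
    """Declared RTO plus the slowest dependency recovery time."""
    # default=0.0 covers processes without dependencies (assumption).
    return declared_rto_hours + max(dependency_recovery_hours, default=0.0)


def is_at_risk(effective_rto_hours: float, mtd_hours: float) -> bool:
    """Flag the process when effective RTO exceeds Maximum Tolerable Downtime."""
    return effective_rto_hours > mtd_hours


# Declared RTO of 4 hours; dependencies recover in 1, 2.5, and 0.5 hours.
rto = effective_rto(4, [1, 2.5, 0.5])    # 4 + 2.5
at_risk = is_at_risk(rto, mtd_hours=6)   # 6.5 > 6
```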

Database Models

Risk Models (src/backend/db/models/risk.py)

  • RiskScenario - Library of potential threats
  • RiskRegisterEntry - Instantiated risks with scores
  • RiskControlMapping - Links risks to mitigating controls
  • KeyRiskIndicator - KRIs with thresholds and automation

Vendor Models (src/backend/db/models/vendor.py)

  • Vendor - Third-party entities
  • VendorContract - Legal agreements
  • VendorAssessment - Due diligence evaluations

Incident Models (src/backend/db/models/incident.py)

  • Incident - Security/operational incidents
  • IncidentTask - Response tasks
  • IncidentTimeline - Audit trail

BCM Models (src/backend/db/models/bcm.py)

  • BusinessProcess - Critical functions with RTO/RPO
  • ProcessDependency - Links to assets
  • RecoveryPlan - DR documentation