GuardDuty auto-remediation: isolate EC2 and revoke IAM

An auto-remediation pipeline for GuardDuty using EventBridge and Lambda: isolate instances, snapshot for forensics, revoke credentials, and scale it across an Organization.

GuardDuty is good at detecting threats, but without automated action a finding just sits there waiting for somebody to look at it — and attackers don’t wait. This post walks through how I run an auto-remediation pipeline in production: from the moment GuardDuty emits a finding to the EC2 instance being isolated, EBS volumes snapshotted for forensics, and IAM credentials revoked — all in under 60 seconds, no humans in the loop. I’ll also share how to extend the pattern across many accounts using Organizations and Security Hub aggregation.

TL;DR

  • The GuardDuty → EventBridge → Lambda auto-remediation pipeline isolates an EC2 instance and revokes IAM credentials in under 60 seconds, with no human in the loop.
  • Auto-remediate only severity ≥ 7 (HIGH/CRITICAL) for finding types with a low false-positive rate (CryptoCurrency:EC2/*, UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration); severity 4–6.9 only notifies.
  • The Quarantine Security Group only allows outbound HTTPS to the SSM VPC Endpoint — the forensic team can still reach the instance via SSM Session Manager, with no other network exposure.
  • Save original SGs into a tag OriginalSecurityGroups for fast rollback on false positives; don’t terminate the instance, just swap SGs.
  • Revoke credentials: AKIA (long-term) → disable the key and attach a deny-all policy; ASIA (temporary STS) → put a role policy with aws:TokenIssueTime DateLessThan to revoke all sessions.
  • Multi-account via Organizations: the delegated-admin Security Account assumes SOAR-RemediationRole (deployed via StackSet) into member accounts; use ExternalId to prevent confused-deputy.
  • GuardDuty Tester (CDK) generates real findings to battle-test the pipeline end-to-end before production — don’t wait for a real incident to find out it works.

Why auto-remediation?

In a typical production environment I run:

  • GuardDuty is on across every account and region; on average there are 50–200 findings per week (most of them low severity).
  • HIGH (≥7) severity findings show up 2–5 times per month — these are the ones that need immediate action: crypto mining, credential exfiltration, C2 communication.
  • Mean time from finding to analyst-on-screen: 15–45 minutes (longer without 24/7 on-call — possibly hours).
  • Even in the best case of 15 minutes, the attacker can already move laterally, exfiltrate data, or deploy ransomware.

Auto-remediation closes that time gap. My rules of thumb:

  1. Severity ≥ 7 (HIGH/CRITICAL): auto-remediate now, notify after.
  2. Severity 4–6.9 (MEDIUM): notify, wait for analyst judgement.
  3. Severity < 4 (LOW): log, aggregate, weekly review.

With that rule, the only findings auto-remediated are the ones where false-positive rate is very low and the cost of inaction exceeds the cost of a false positive.
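
As a minimal sketch of the three tiers above (the function name and the returned labels are hypothetical, chosen for illustration; they are not part of the pipeline code later in this post):

```python
def triage(severity: float) -> str:
    """Map a GuardDuty severity score to the handling tier described above."""
    if severity >= 7:
        return "auto_remediate"   # HIGH/CRITICAL: act first, notify after
    if severity >= 4:
        return "notify"           # MEDIUM: wait for analyst judgement
    return "log"                  # LOW: aggregate for the weekly review
```

In the real pipeline the first tier is enforced by the EventBridge rule rather than in code, so a function like this is mostly useful for routing the notify-only and log-only tiers.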

Architecture overview

┌─────────────────────────────────────────────────────────────────────┐
│                        Security Account                              │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  GuardDuty ──→ EventBridge Rule ──→ Lambda Function                 │
│  (Delegated      (filter HIGH         │                             │
│   Admin)          severity)            ├──→ Isolate EC2 (change SG) │
│                                        ├──→ Snapshot EBS volumes    │
│                                        ├──→ Tag instance            │
│                                        ├──→ Revoke IAM credentials  │
│                                        └──→ SNS notification        │
│                                                                      │
│  Cross-account: Lambda AssumeRole ──→ Member Account                │
│                                        └── SOAR-RemediationRole     │
└─────────────────────────────────────────────────────────────────────┘

Flow in detail:

  1. GuardDuty detects an anomaly (VPC Flow Logs, CloudTrail, DNS logs, Runtime Monitoring) → creates a finding.
  2. EventBridge receives the event from aws.guardduty, filters by severity ≥ 7 and resource type.
  3. Lambda is invoked, parses the finding detail, and identifies the affected resource type.
  4. Remediation actions depend on the finding type:
    • EC2 findings → isolate the instance (swap Security Group), snapshot EBS, tag.
    • IAM findings → disable access key, attach a deny-all policy.
  5. SNS notifies the security team with full context.

I picked the Direct Lambda pattern (no Step Functions) for this pipeline because:

  • Simple, easy to debug.
  • Lower latency (~2–3 seconds vs ~5–8 seconds with Step Functions).
  • For 3–4 sequential actions in a single Lambda, retry logic in code is enough.

If you need more complex orchestration (human approval, parallel branches, wait states), Step Functions is a better fit — I’ll note this at the end.
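
The "retry logic in code" mentioned above could look roughly like the following. This is a sketch, not the helper I run in production: with_retries and its parameters are hypothetical names, and real code should catch botocore's ClientError rather than every exception.

```python
import time


def with_retries(action, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call action() until it succeeds or attempts run out.

    Waits base_delay, then 2x, 4x ... between tries and re-raises the
    last exception so the Lambda invocation still fails visibly.
    """
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception as exc:  # narrow this to ClientError in real code
            if attempt == attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay}s")
            sleep(delay)
```

Each remediation step can then be wrapped individually, for example with_retries(lambda: isolate_instance(instance_id)), so a throttled snapshot call does not force the SG swap to run again.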

EventBridge Rule: filter to the right findings

Rule for EC2 findings (HIGH severity)

{
  "source": ["aws.guardduty"],
  "detail-type": ["GuardDuty Finding"],
  "detail": {
    "severity": [{ "numeric": [">=", 7] }],
    "type": [
      { "prefix": "UnauthorizedAccess:EC2/" },
      { "prefix": "Recon:EC2/" },
      { "prefix": "Trojan:EC2/" },
      { "prefix": "Backdoor:EC2/" },
      { "prefix": "Impact:EC2/" },
      { "prefix": "CryptoCurrency:EC2/" }
    ]
  }
}

A few notes:

  • severity >= 7: HIGH and CRITICAL only.
  • prefix matching: covers every sub-type in each category. For example UnauthorizedAccess:EC2/ matches both UnauthorizedAccess:EC2/RDPBruteForce and UnauthorizedAccess:EC2/SSHBruteForce etc.
  • No resource.resourceType here — the prefix is already specific enough to EC2.
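
To sanity-check a rule like this without deploying it, a small local matcher helps. The sketch below only understands the two operators used above (numeric >= and prefix); it is not a reimplementation of EventBridge's pattern language, and matches_ec2_rule is a hypothetical helper name.

```python
EC2_PREFIXES = (
    "UnauthorizedAccess:EC2/", "Recon:EC2/", "Trojan:EC2/",
    "Backdoor:EC2/", "Impact:EC2/", "CryptoCurrency:EC2/",
)


def matches_ec2_rule(event: dict) -> bool:
    """Approximate the EC2 rule above: source, detail-type, severity, type prefix."""
    detail = event.get("detail", {})
    return (
        event.get("source") == "aws.guardduty"
        and event.get("detail-type") == "GuardDuty Finding"
        and isinstance(detail.get("severity"), (int, float))
        and detail["severity"] >= 7
        # str.startswith accepts a tuple, which mirrors the list of prefixes
        and str(detail.get("type", "")).startswith(EC2_PREFIXES)
    )
```

Running a handful of recorded finding events through a function like this is a cheap way to confirm that a MEDIUM finding or an IAM finding never reaches the EC2 Lambda.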

Rule for IAM findings

{
  "source": ["aws.guardduty"],
  "detail-type": ["GuardDuty Finding"],
  "detail": {
    "severity": [{ "numeric": [">=", 7] }],
    "type": [
      { "prefix": "UnauthorizedAccess:IAMUser/" },
      { "prefix": "Discovery:IAMUser/" },
      { "prefix": "PenTest:IAMUser/" }
    ]
  }
}

I split this into two separate rules (EC2 and IAM) so each rule triggers a different Lambda. Why: remediation logic differs significantly, and during an incident you want CloudWatch Logs from each function on its own.

Lambda function: isolating an EC2 instance

This is the main Lambda handling EC2 findings. Written in Python 3.12 — lightweight runtime, fast cold start.

Quarantine Security Group

First, every VPC needs a pre-provisioned Quarantine Security Group with these rules:

  • Inbound: no rules at all (blocks all incoming traffic).
  • Outbound: HTTPS (443) only, to the SSM interface VPC endpoints (ssm, ssmmessages, ec2messages), so the forensic team can still open an SSM Session Manager session on the instance without opening any other network path.

{
  "SecurityGroup": "sg-quarantine",
  "InboundRules": [],
  "OutboundRules": [
    {
      "Protocol": "tcp",
      "Port": 443,
      "Destination": "security group attached to the SSM interface VPC endpoints in ap-southeast-1 (interface endpoints have no prefix list)"
    }
  ]
}

Lambda code

import boto3
import json
import os
from datetime import datetime, timezone

ec2 = boto3.client('ec2')
sns = boto3.client('sns')

QUARANTINE_SG_ID = os.environ['QUARANTINE_SG_ID']
SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN']


def lambda_handler(event, context):
    """Handle GuardDuty EC2 finding - isolate instance."""
    detail = event['detail']
    finding_id = detail['id']
    finding_type = detail['type']
    severity = detail['severity']
    instance_id = detail['resource']['instanceDetails']['instanceId']
    account_id = detail['accountId']
    region = detail['region']

    print(f"Processing finding {finding_id}: {finding_type} "
          f"(severity={severity}) for instance {instance_id}")

    # Step 1: Isolate the instance - swap Security Group
    original_sgs = isolate_instance(instance_id)

    # Step 2: Snapshot every EBS volume for forensics
    snapshot_ids = create_forensic_snapshots(instance_id, finding_id)

    # Step 3: Tag the instance with incident metadata
    tag_instance(instance_id, finding_id, finding_type, original_sgs)

    # Step 4: Notify the security team
    notify(finding_id, finding_type, instance_id, account_id,
           region, severity, original_sgs, snapshot_ids)

    return {
        'statusCode': 200,
        'findingId': finding_id,
        'instanceId': instance_id,
        'action': 'isolated',
        'snapshots': snapshot_ids
    }


def isolate_instance(instance_id):
    """Replace all SGs with quarantine SG. Return original SGs for rollback."""
    # Capture the current SGs before changing (saved into a tag for rollback)
    response = ec2.describe_instances(InstanceIds=[instance_id])
    original_sgs = [
        sg['GroupId']
        for sg in response['Reservations'][0]['Instances'][0]['SecurityGroups']
    ]

    # Swap to the quarantine SG
    ec2.modify_instance_attribute(
        InstanceId=instance_id,
        Groups=[QUARANTINE_SG_ID]
    )
    print(f"Isolated {instance_id}: {original_sgs} -> [{QUARANTINE_SG_ID}]")
    return original_sgs


def create_forensic_snapshots(instance_id, finding_id):
    """Snapshot all EBS volumes attached to the instance."""
    volumes = ec2.describe_volumes(
        Filters=[{'Name': 'attachment.instance-id', 'Values': [instance_id]}]
    )
    snapshot_ids = []
    for vol in volumes['Volumes']:
        snap = ec2.create_snapshot(
            VolumeId=vol['VolumeId'],
            Description=f"Forensic snapshot - GuardDuty finding {finding_id}",
            TagSpecifications=[{
                'ResourceType': 'snapshot',
                'Tags': [
                    {'Key': 'Purpose', 'Value': 'Forensic'},
                    {'Key': 'IncidentId', 'Value': finding_id},
                    {'Key': 'SourceInstance', 'Value': instance_id},
                    {'Key': 'CreatedBy', 'Value': 'GuardDuty-AutoRemediation'}
                ]
            }]
        )
        snapshot_ids.append(snap['SnapshotId'])
    print(f"Created {len(snapshot_ids)} forensic snapshots: {snapshot_ids}")
    return snapshot_ids


def tag_instance(instance_id, finding_id, finding_type, original_sgs):
    """Tag instance with incident metadata for tracking and rollback."""
    now = datetime.now(timezone.utc).isoformat()
    ec2.create_tags(
        Resources=[instance_id],
        Tags=[
            {'Key': 'SecurityStatus', 'Value': 'Quarantined'},
            {'Key': 'QuarantinedAt', 'Value': now},
            {'Key': 'GuardDutyFindingId', 'Value': finding_id},
            {'Key': 'GuardDutyFindingType', 'Value': finding_type},
            {'Key': 'OriginalSecurityGroups', 'Value': ','.join(original_sgs)},
        ]
    )


def notify(finding_id, finding_type, instance_id, account_id,
           region, severity, original_sgs, snapshot_ids):
    """Send SNS notification to security team."""
    message = {
        'summary': f"🚨 EC2 Instance Isolated - {finding_type}",
        'finding_id': finding_id,
        'instance_id': instance_id,
        'account_id': account_id,
        'region': region,
        'severity': severity,
        'action_taken': 'Instance isolated (SG swapped to quarantine)',
        'original_security_groups': original_sgs,
        'forensic_snapshots': snapshot_ids,
        'next_steps': [
            'Review finding in GuardDuty console',
            'Use SSM Session Manager to investigate instance',
            'Analyze forensic snapshots',
            'If false positive: restore original SGs from tag'
        ]
    }
    sns.publish(
        TopicArn=SNS_TOPIC_ARN,
        Subject=f"[SECURITY] EC2 Isolated: {instance_id} ({finding_type})",
        Message=json.dumps(message, indent=2)
    )

What matters in this code

  1. Save the original SGs into a tag: when an analyst confirms a false positive, they roll back by reading the OriginalSecurityGroups tag and restoring. No CloudTrail digging required.
  2. Snapshot immediately after isolation: the SG swap cuts the attacker off first and does not touch the disks, so the snapshots still capture the volumes as they were at detection time.
  3. Never terminate the instance: isolating ≠ deleting. The instance keeps running so the forensic team can analyse memory, processes, and network connections via SSM.
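  4. Mostly idempotent, with one caveat: if Lambda is reinvoked (EventBridge retry), re-swapping the SG is harmless and an extra snapshot does not break anything. But describe_instances now returns the quarantine SG, so re-tagging would overwrite OriginalSecurityGroups with it. Skip instances already tagged SecurityStatus=Quarantined (the tag-based exclusion check later in this post does this).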
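
For the false-positive path, a rollback helper could read the tag back and restore the groups. This is a sketch under assumptions: restore_instance is a hypothetical name, ec2_client is a boto3 EC2 client passed in by the caller, the 'Restored' status value is made up, and the tag keys are the ones written by tag_instance above.

```python
def restore_instance(ec2_client, instance_id):
    """Restore the SGs saved in the OriginalSecurityGroups tag. Returns them."""
    response = ec2_client.describe_instances(InstanceIds=[instance_id])
    instance = response['Reservations'][0]['Instances'][0]
    tags = {t['Key']: t['Value'] for t in instance.get('Tags', [])}

    saved = tags.get('OriginalSecurityGroups', '')
    original_sgs = [sg for sg in saved.split(',') if sg]
    if not original_sgs:
        raise ValueError(f"No OriginalSecurityGroups tag on {instance_id}")

    # Put the original SGs back, then mark the instance as released
    ec2_client.modify_instance_attribute(
        InstanceId=instance_id, Groups=original_sgs
    )
    ec2_client.create_tags(
        Resources=[instance_id],
        Tags=[{'Key': 'SecurityStatus', 'Value': 'Restored'}]
    )
    return original_sgs
```

Passing the client in keeps the helper usable for both the local account and a member account reached through an assumed role.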

Auto-revoking compromised IAM credentials

The second pattern I run in parallel: when GuardDuty detects a compromised IAM credential (e.g. UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration.OutsideAWS), the Lambda automatically neutralises the credential.

Distinguishing credential types

GuardDuty IAM findings come in two main flavours:

Access Key prefix   Type                    Action
AKIA                Long-term (IAM User)    Disable key + attach deny-all policy
ASIA                Temporary (STS/Role)    Revoke sessions with an inline deny policy on the role (aws:TokenIssueTime)

Lambda code for IAM remediation

import boto3
import json
import os
from datetime import datetime, timezone

iam = boto3.client('iam')
sns = boto3.client('sns')

SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN']


def lambda_handler(event, context):
    """Handle GuardDuty IAM finding - revoke compromised credentials."""
    detail = event['detail']
    finding_id = detail['id']
    finding_type = detail['type']

    # Extract IAM details from finding
    resource = detail['resource']
    access_key_details = resource.get('accessKeyDetails', {})
    access_key_id = access_key_details.get('accessKeyId', '')
    principal_id = access_key_details.get('principalId', '')
    user_type = access_key_details.get('userType', '')
    username = access_key_details.get('userName', '')

    print(f"Processing IAM finding: {finding_type}, "
          f"key={access_key_id}, user={username}, type={user_type}")

    if access_key_id.startswith('AKIA'):
        # Long-term credential - IAM User
        revoke_long_term_credential(username, access_key_id, finding_id)
    elif access_key_id.startswith('ASIA'):
        # Temporary credential - Role/STS
        revoke_temporary_credential(username, principal_id, finding_id)
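    else:
        # Nothing was revoked, so return before the revocation notice is sent
        print(f"Unknown credential type: {access_key_id}")
        return {'statusCode': 200, 'action': 'skipped_unknown_credential'}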

    notify_iam_revocation(finding_id, finding_type, username,
                          access_key_id, user_type)

    return {'statusCode': 200, 'action': 'credentials_revoked'}


def revoke_long_term_credential(username, access_key_id, finding_id):
    """Disable access key and attach deny-all policy to IAM user."""
    # Step 1: Disable the compromised access key
    iam.update_access_key(
        UserName=username,
        AccessKeyId=access_key_id,
        Status='Inactive'
    )
    print(f"Disabled access key {access_key_id} for user {username}")

    # Step 2: Attach explicit deny-all policy
    # The deny policy ensures that even if another key is still active,
    # the user can do nothing until the security team reviews.
    # No Condition block: aws:TokenIssueTime is only present on requests
    # made with temporary credentials, so a deny conditioned on it would
    # never match a request signed with a long-term AKIA key.
    deny_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Sid": "DenyAllAfterCompromise",
            "Effect": "Deny",
            "Action": "*",
            "Resource": "*"
        }]
    }
    iam.put_user_policy(
        UserName=username,
        PolicyName=f'GuardDuty-Deny-{finding_id[:8]}',
        PolicyDocument=json.dumps(deny_policy)
    )
    print(f"Attached deny-all policy to user {username}")


def revoke_temporary_credential(role_name, principal_id, finding_id):
    """Revoke all active sessions for a role."""
    # With temporary credentials (ASIA*) you can't "disable" the key —
    # you revoke all sessions by attaching an inline policy with a
    # DateLessThan = now condition
    revoke_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Sid": "RevokeOlderSessions",
            "Effect": "Deny",
            "Action": "*",
            "Resource": "*",
            "Condition": {
                "DateLessThan": {
                    "aws:TokenIssueTime": datetime.now(timezone.utc).strftime(
                        "%Y-%m-%dT%H:%M:%SZ"
                    )
                }
            }
        }]
    }
    try:
        iam.put_role_policy(
            RoleName=role_name,
            PolicyName=f'RevokeOlderSessions-{finding_id[:8]}',
            PolicyDocument=json.dumps(revoke_policy)
        )
        print(f"Revoked all sessions for role {role_name}")
    except iam.exceptions.NoSuchEntityException:
        print(f"Role {role_name} not found - may be in another account")


def notify_iam_revocation(finding_id, finding_type, username,
                          access_key_id, user_type):
    """Notify security team about credential revocation."""
    sns.publish(
        TopicArn=SNS_TOPIC_ARN,
        Subject=f"[SECURITY] IAM Credentials Revoked: {username}",
        Message=json.dumps({
            'summary': f"🔑 IAM Credentials Revoked - {finding_type}",
            'finding_id': finding_id,
            'username': username,
            'access_key_id': access_key_id,
            'user_type': user_type,
            'action_taken': 'Key disabled + deny-all policy attached',
            'next_steps': [
                'Investigate CloudTrail for actions taken with this credential',
                'Check for persistence mechanisms (new users, roles, keys)',
                'Rotate any secrets this credential had access to',
                'Remove deny policy after investigation complete'
            ]
        }, indent=2)
    )

Why use a deny policy instead of just disabling the key?

Disabling the access key only blocks that specific key. But if an attacker has already:

  • Created another access key on the same user
  • Assumed a different role with the compromised credential
  • Created a console password

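…then a deny-all inline policy on the user overrides every permission the user has, regardless of which key or password is used. A role session the attacker has already assumed runs under that role's permissions, not the user's, so it needs the role-level session revocation shown above. This is defense-in-depth: disable the key (block the specific vector) plus the deny policy (block the entire user).

Important note: aws:TokenIssueTime is present only on requests made with temporary credentials, so a Deny conditioned on it never matches a request signed with a long-term AKIA key. For an IAM user the deny-all statement has to be unconditional. The DateLessThan condition belongs in the role's revoke policy, where it blocks every session issued before the revocation time while sessions issued afterwards keep working.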
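
To make the difference between the two deny policies concrete, here is a minimal sketch of builders for both documents. The function names are hypothetical; the one fact the sketch relies on is that aws:TokenIssueTime is only present on requests made with temporary credentials.

```python
from datetime import datetime, timezone


def build_user_deny_all():
    """Unconditional deny for an IAM user: applies to every key and the console."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Sid": "DenyAllAfterCompromise",
            "Effect": "Deny", "Action": "*", "Resource": "*",
        }],
    }


def build_role_session_revoke(cutoff=None):
    """Deny for a role: only sessions issued before `cutoff` are blocked."""
    cutoff = cutoff or datetime.now(timezone.utc)
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Sid": "RevokeOlderSessions",
            "Effect": "Deny", "Action": "*", "Resource": "*",
            "Condition": {"DateLessThan": {
                "aws:TokenIssueTime": cutoff.strftime("%Y-%m-%dT%H:%M:%SZ"),
            }},
        }],
    }
```

The user document has no Condition at all, while the role document keeps one so that workloads which re-assume the role after the cutoff get fresh, working sessions.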

IAM policies for the Lambda functions

Execution role for the EC2 remediation Lambda

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "EC2Isolation",
      "Effect": "Allow",
      "Action": [
        "ec2:ModifyInstanceAttribute",
        "ec2:DescribeInstances",
        "ec2:CreateTags"
      ],
      "Resource": "*",
      "Condition": {
        "StringEquals": {
          "aws:RequestedRegion": ["ap-southeast-1", "us-east-1"]
        }
      }
    },
    {
      "Sid": "EBSForensicSnapshot",
      "Effect": "Allow",
      "Action": [
        "ec2:DescribeVolumes",
        "ec2:CreateSnapshot",
        "ec2:CreateTags"
      ],
      "Resource": "*"
    },
    {
      "Sid": "Notification",
      "Effect": "Allow",
      "Action": "sns:Publish",
      "Resource": "arn:aws:sns:ap-southeast-1:111122223333:SecOps-Alerts"
    },
    {
      "Sid": "Logging",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:111122223333:*"
    },
    {
      "Sid": "CrossAccountAssume",
      "Effect": "Allow",
      "Action": "sts:AssumeRole",
      "Resource": "arn:aws:iam::*:role/SOAR-RemediationRole"
    }
  ]
}

Execution role for the IAM remediation Lambda

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "IAMRemediation",
      "Effect": "Allow",
      "Action": [
        "iam:UpdateAccessKey",
        "iam:PutUserPolicy",
        "iam:PutRolePolicy"
      ],
      "Resource": [
        "arn:aws:iam::111122223333:user/*",
        "arn:aws:iam::111122223333:role/*"
      ]
    },
    {
      "Sid": "Notification",
      "Effect": "Allow",
      "Action": "sns:Publish",
      "Resource": "arn:aws:sns:ap-southeast-1:111122223333:SecOps-Alerts"
    },
    {
      "Sid": "Logging",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:111122223333:*"
    },
    {
      "Sid": "CrossAccountAssume",
      "Effect": "Allow",
      "Action": "sts:AssumeRole",
      "Resource": "arn:aws:iam::*:role/SOAR-RemediationRole"
    }
  ]
}

Trust policy for the Lambda execution role

{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": {
      "Service": "lambda.amazonaws.com"
    },
    "Action": "sts:AssumeRole"
  }]
}

Least-privilege principles applied here:

  • Region scope via aws:RequestedRegion — the Lambda can only act on instances in the regions I explicitly allow.
  • SNS publish is scoped to one topic, not *.
  • Cross-account assume is locked to a fixed role name (SOAR-RemediationRole), not any role.
  • IAM actions are limited to UpdateAccessKey, PutUserPolicy, PutRolePolicy — no DeleteUser, CreateUser, or AttachUserPolicy (managed policy).

Multi-account setup with Organizations + Security Hub

In practice, nobody operates a single AWS account. I run 30+ accounts in an Organization, and the auto-remediation pattern has to work across all of them.

Delegated Administrator architecture

┌─────────────────────────────────────────────────────────────────────┐
│                      AWS Organization                                │
├─────────────────────────────────────────────────────────────────────┤
│  Management Account (root)                                          │
│  └── Does exactly one thing: designate delegated admin              │
│      • guardduty:EnableOrganizationAdminAccount                     │
│      • securityhub:EnableOrganizationAdminAccount                   │
├─────────────────────────────────────────────────────────────────────┤
│  Security Account (Delegated Admin) ← ALL LOGIC LIVES HERE         │
│  ├── GuardDuty Administrator (aggregates findings from every acct) │
│  ├── Security Hub Administrator (single pane of glass)              │
│  ├── EventBridge Rules (filter + route findings)                    │
│  ├── Lambda Functions (execute remediation)                         │
│  ├── Step Functions (complex playbooks)                             │
│  └── SNS Topics (notifications)                                    │
├─────────────────────────────────────────────────────────────────────┤
│  Member Account A (Production)                                      │
│  ├── GuardDuty enabled (auto-enrolled by delegated admin)          │
│  ├── Security Hub enabled (auto-enrolled)                          │
│  └── IAM Role: SOAR-RemediationRole (trust Security Account)       │
├─────────────────────────────────────────────────────────────────────┤
│  Member Account B (Staging)                                         │
│  ├── GuardDuty enabled (auto-enrolled)                             │
│  ├── Security Hub enabled (auto-enrolled)                          │
│  └── IAM Role: SOAR-RemediationRole (trust Security Account)       │
└─────────────────────────────────────────────────────────────────────┘

Step 1: Designate the delegated admin (run from the Management Account)

# GuardDuty delegated admin
aws guardduty enable-organization-admin-account \
  --admin-account-id 222222222222

# Security Hub delegated admin
aws securityhub enable-organization-admin-account \
  --admin-account-id 222222222222

Step 2: Auto-enable for every member account (run from the Security Account)

# GuardDuty: auto-enable for every account joining the Organization
DETECTOR_ID=$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text)

aws guardduty update-organization-configuration \
  --detector-id $DETECTOR_ID \
  --auto-enable-organization-members ALL \
  --features '[
    {"Name": "S3_DATA_EVENTS", "AutoEnable": "ALL"},
    {"Name": "EKS_AUDIT_LOGS", "AutoEnable": "ALL"},
    {"Name": "RUNTIME_MONITORING", "AutoEnable": "ALL",
     "AdditionalConfiguration": [
       {"Name": "EKS_ADDON_MANAGEMENT", "AutoEnable": "ALL"},
       {"Name": "EC2_AGENT_MANAGEMENT", "AutoEnable": "ALL"}
     ]}
  ]'

# Security Hub: auto-enable
aws securityhub update-organization-configuration \
  --auto-enable

Step 3: Deploy SOAR-RemediationRole via StackSets

Every member account needs an IAM role that the Lambda in the Security Account can assume into:

AWSTemplateFormatVersion: '2010-09-09'
Description: Cross-account role for GuardDuty auto-remediation

Parameters:
  SecurityAccountId:
    Type: String
    Description: Account ID of the Security/Audit account

Resources:
  SOARRemediationRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: SOAR-RemediationRole
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              AWS: !Sub 'arn:aws:iam::${SecurityAccountId}:role/GuardDuty-EC2-Remediation-Role'
            Action: sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId: !Ref 'AWS::AccountId'
      Policies:
        - PolicyName: RemediationPermissions
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Sid: EC2Isolation
                Effect: Allow
                Action:
                  - ec2:ModifyInstanceAttribute
                  - ec2:DescribeInstances
                  - ec2:DescribeVolumes
                  - ec2:CreateSnapshot
                  - ec2:CreateTags
                  - ec2:DescribeSecurityGroups
                Resource: '*'
              - Sid: IAMRemediation
                Effect: Allow
                Action:
                  - iam:UpdateAccessKey
                  - iam:PutUserPolicy
                  - iam:PutRolePolicy
                  - iam:ListAccessKeys
                Resource:
                  - !Sub 'arn:aws:iam::${AWS::AccountId}:user/*'
                  - !Sub 'arn:aws:iam::${AWS::AccountId}:role/*'

Deploy via StackSet:

aws cloudformation create-stack-set \
  --stack-set-name soar-remediation-role \
  --template-body file://soar-remediation-role.yaml \
  --permission-model SERVICE_MANAGED \
  --auto-deployment Enabled=true,RetainStacksOnAccountRemoval=false \
  --capabilities CAPABILITY_NAMED_IAM \
  --parameters ParameterKey=SecurityAccountId,ParameterValue=222222222222

aws cloudformation create-stack-instances \
  --stack-set-name soar-remediation-role \
  --deployment-targets OrganizationalUnitIds=ou-xxxx-yyyyyyyy \
  --regions ap-southeast-1

Step 4: Cross-account Lambda logic

When a GuardDuty finding originates from a member account, the Lambda needs to assume a role in that account:

import boto3

def get_cross_account_client(service, account_id, region='ap-southeast-1'):
    """Assume SOAR-RemediationRole in member account."""
    sts = boto3.client('sts')
    assumed = sts.assume_role(
        RoleArn=f'arn:aws:iam::{account_id}:role/SOAR-RemediationRole',
        RoleSessionName='GuardDuty-AutoRemediation',
        ExternalId=account_id  # ExternalId to defeat confused-deputy
    )
    return boto3.client(
        service,
        region_name=region,
        aws_access_key_id=assumed['Credentials']['AccessKeyId'],
        aws_secret_access_key=assumed['Credentials']['SecretAccessKey'],
        aws_session_token=assumed['Credentials']['SessionToken']
    )


def lambda_handler(event, context):
    detail = event['detail']
    account_id = detail['accountId']
    region = detail['region']
    instance_id = detail['resource']['instanceDetails']['instanceId']

    # If the finding is from the security account itself → use a local client
    # If it's from a member account → assume role
    if account_id == context.invoked_function_arn.split(':')[4]:
        ec2 = boto3.client('ec2', region_name=region)
    else:
        ec2 = get_cross_account_client('ec2', account_id, region)

    # Continue the remediation logic as before...
    ec2.modify_instance_attribute(
        InstanceId=instance_id,
        Groups=[get_quarantine_sg(account_id, region)]
    )
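
get_quarantine_sg is not defined in the snippet above. One hypothetical way to implement it is a lookup table passed in through an environment variable; the variable name QUARANTINE_SG_MAP and its JSON shape are assumptions for illustration only. Security groups are per VPC, so with more than one VPC per account and region the key would also need the VPC ID.

```python
import json
import os


def get_quarantine_sg(account_id, region, sg_map=None):
    """Look up the pre-provisioned quarantine SG for an account and region.

    Expects a JSON object such as {"111122223333/ap-southeast-1": "sg-0abc"}
    in the (hypothetical) QUARANTINE_SG_MAP environment variable.
    """
    if sg_map is None:
        sg_map = json.loads(os.environ.get('QUARANTINE_SG_MAP', '{}'))
    key = f"{account_id}/{region}"
    try:
        return sg_map[key]
    except KeyError:
        # Fail loudly rather than fall back to an SG from another VPC
        raise LookupError(f"No quarantine SG configured for {key}") from None
```

An alternative would be to find the group by tag with describe_security_groups through the assumed-role client, which the SOAR-RemediationRole template already permits.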

Security Hub aggregation: single pane of glass

When the Security Account is the delegated admin for both GuardDuty and Security Hub, every finding shows up in Security Hub in the Security Account automatically. That enables:

  1. Unified view: analysts see findings across every account in one place.
  2. Cross-account correlation: with findings from every account in one place, analysts can correlate activity across accounts and spot lateral movement.
  3. Custom Actions: analysts can trigger manual remediation from the Security Hub console for findings auto-remediation doesn’t cover.
  4. Compliance tracking: Security Hub standards (CIS, AWS Foundational) run in parallel, providing more context on the security posture.

I also configure Security Hub finding aggregation to aggregate findings from every region into a single primary region:

aws securityhub create-finding-aggregator \
  --region ap-southeast-1 \
  --region-linking-mode ALL_REGIONS

Operations: testing, monitoring, and false positives

Test with GuardDuty sample findings

GuardDuty has an API for generating sample findings — this is how I validate the pipeline before turning it on in production:

# Generate sample findings across multiple finding types
DETECTOR_ID=$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text)

aws guardduty create-sample-findings \
  --detector-id $DETECTOR_ID \
  --finding-types \
    "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom" \
    "CryptoCurrency:EC2/BitcoinTool.B!DNS" \
    "UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration.OutsideAWS"

Sample findings carry sample: true in the event detail, so I add a check inside the Lambda:

def lambda_handler(event, context):
    detail = event['detail']

    # Skip sample findings in production
    if detail.get('service', {}).get('additionalInfo', {}).get('sample') is True:
        print("Sample finding detected - skipping remediation")
        # Still log it so we can confirm the pipeline works
        return {'statusCode': 200, 'action': 'skipped_sample'}

    # ... the real remediation logic

In staging/test environments, I drop this check so sample findings trigger the full pipeline — useful for end-to-end validation.
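
Rather than deleting the check per environment, one option is to drive it from configuration. The sketch below is hypothetical: REMEDIATE_SAMPLES is an environment variable name made up for this example, and the fallback that parses a JSON string under additionalInfo.value is an assumption about how some events encode the flag, so verify it against a real event before relying on it.

```python
import json
import os


def is_sample(detail):
    """Return True if the finding is flagged as a sample."""
    info = detail.get('service', {}).get('additionalInfo', {}) or {}
    if info.get('sample') is True:
        return True
    # Assumed alternative shape: {"value": "{\"sample\":true}", "type": "default"}
    value = info.get('value')
    if isinstance(value, str):
        try:
            parsed = json.loads(value)
        except ValueError:
            return False
        return isinstance(parsed, dict) and parsed.get('sample') is True
    return False


def should_skip(detail, env=os.environ):
    """Skip sample findings unless REMEDIATE_SAMPLES=true (e.g. in staging)."""
    allow_samples = env.get('REMEDIATE_SAMPLES', 'false').lower() == 'true'
    return is_sample(detail) and not allow_samples
```

With this shape the same deployment package runs everywhere, and only the environment variable differs between staging and production.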

GuardDuty Tester (open-source)

AWS publishes the GuardDuty Tester — a CDK-based tool that deploys a test environment with EC2 instances actually performing suspicious behaviour (port scans, DNS queries to known-bad domains, crypto-mining simulation). The tool produces real findings (not samples), so the pipeline behaves exactly like it would in production.

# Clone and deploy
git clone https://github.com/awslabs/amazon-guardduty-tester.git
cd amazon-guardduty-tester
npm install
cdk deploy

# Run test scenarios
aws ssm send-command \
  --document-name "GuardDutyTesterDocument" \
  --targets "Key=tag:Purpose,Values=GuardDutyTester"

Monitoring false positives

False positives are the single biggest risk of auto-remediation. One production instance isolated incorrectly = downtime. I mitigate via:

1. Suppression rules in GuardDuty:

aws guardduty create-filter \
  --detector-id $DETECTOR_ID \
  --name "SuppressNATGatewayPortScan" \
  --action ARCHIVE \
  --finding-criteria '{
    "Criterion": {
      "type": {
        "Eq": ["Recon:EC2/Portscan"]
      },
      "resource.instanceDetails.tags.value": {
        "Eq": ["nat-gateway"]
      }
    }
  }'

For example: NAT instances are often flagged as port scanners because they forward traffic from many sources. The suppression rule archives the finding before it reaches EventBridge.

2. Trusted IP lists:

# Create a trusted IP list for office IPs, VPN endpoints, known scanners
aws guardduty create-ip-set \
  --detector-id $DETECTOR_ID \
  --name "TrustedIPs" \
  --format TXT \
  --location s3://my-security-bucket/trusted-ips.txt \
  --activate

trusted-ips.txt:

# Office IPs
203.0.113.0/24
# VPN endpoints
198.51.100.0/24
# Authorized pen-test provider
192.0.2.50/32
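
A typo in this file is easy to miss, so it can help to check the list before uploading it. The sketch below is one possible pre-upload check using Python's standard ipaddress module. The function name is hypothetical, and it assumes the file layout shown above (one address or CIDR block per line, with # lines treated as comments).

```python
import ipaddress


def find_invalid_entries(text):
    """Return (line_number, entry) pairs that are not valid IPs or CIDR blocks."""
    invalid = []
    for line_number, raw in enumerate(text.splitlines(), start=1):
        entry = raw.strip()
        # Blank lines and comment lines carry no address to validate
        if not entry or entry.startswith('#'):
            continue
        try:
            # strict=True rejects CIDR blocks with host bits set,
            # e.g. 203.0.113.1/24 instead of 203.0.113.0/24
            ipaddress.ip_network(entry, strict=True)
        except ValueError:
            invalid.append((line_number, entry))
    return invalid
```

Running it in CI before the S3 upload means a malformed entry fails the build rather than surfacing later.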

3. Tag-based exclusion inside the Lambda:

def should_remediate(instance_id, ec2_client):
    """Check if instance should be auto-remediated based on tags."""
    response = ec2_client.describe_instances(InstanceIds=[instance_id])
    instance = response['Reservations'][0]['Instances'][0]
    tags = {t['Key']: t['Value'] for t in instance.get('Tags', [])}

    # Skip instances tagged as exempt
    if tags.get('AutoRemediation') == 'disabled':
        print(f"Instance {instance_id} is exempt from auto-remediation")
        return False

    # Skip instances already quarantined (avoid double-processing)
    if tags.get('SecurityStatus') == 'Quarantined':
        print(f"Instance {instance_id} already quarantined")
        return False

    return True

4. CloudWatch metrics and alarms:

I publish custom metrics from the Lambda to track remediation actions and false positives per finding type:

import boto3

cloudwatch = boto3.client('cloudwatch')

def publish_metrics(action, finding_type, is_false_positive=False):
    cloudwatch.put_metric_data(
        Namespace='GuardDuty/AutoRemediation',
        MetricData=[
            {
                'MetricName': 'RemediationActions',
                'Value': 1,
                'Unit': 'Count',
                'Dimensions': [
                    {'Name': 'Action', 'Value': action},
                    {'Name': 'FindingType', 'Value': finding_type}
                ]
            },
            {
                'MetricName': 'FalsePositives',
                'Value': 1 if is_false_positive else 0,
                'Unit': 'Count',
                'Dimensions': [
                    {'Name': 'FindingType', 'Value': finding_type}
                ]
            }
        ]
    )

When the false-positive rate for a finding type exceeds 10% over 7 days, the pipeline disables auto-remediation for that finding type and notifies the team to review the suppression rules.
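
The threshold decision itself is simple arithmetic. Here is a sketch of it, assuming the two counts are the 7-day sums of the RemediationActions and FalsePositives metrics for one finding type. The function names and the min_samples guard are my own assumptions: without a minimum sample size, one false positive out of two remediations would already read as 50%.

```python
def false_positive_rate(remediations, false_positives):
    """Fraction of remediations later confirmed as false positives."""
    if remediations <= 0:
        return 0.0
    return false_positives / remediations


def should_disable(remediations, false_positives,
                   threshold=0.10, min_samples=10):
    """Decide whether to switch off auto-remediation for a finding type.

    threshold is the 10% limit from the text; min_samples is a hypothetical
    guard so that a handful of events cannot trip the switch.
    """
    if remediations < min_samples:
        return False
    return false_positive_rate(remediations, false_positives) > threshold
```

With 20 remediations in the window, 3 false positives (15%) would disable the finding type, while 2 (exactly 10%) would not, because the comparison is strictly greater than the threshold.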

5. Rollback procedure:

When an analyst confirms a false positive, rollback is straightforward:

INSTANCE_ID=i-0123456789

# Read the original SGs from the tag
ORIGINAL_SGS=$(aws ec2 describe-tags \
  --filters "Name=resource-id,Values=$INSTANCE_ID" \
            "Name=key,Values=OriginalSecurityGroups" \
  --query 'Tags[0].Value' --output text)

# Restore original security groups (the tag holds a comma-separated list)
aws ec2 modify-instance-attribute \
  --instance-id "$INSTANCE_ID" \
  --groups $(echo "$ORIGINAL_SGS" | tr ',' ' ')

# Update tags
aws ec2 create-tags \
  --resources "$INSTANCE_ID" \
  --tags Key=SecurityStatus,Value=Restored \
         Key=RestoredAt,Value=$(date -u +%Y-%m-%dT%H:%M:%SZ)
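
If the rollback is triggered from a Lambda or a chat command instead of a terminal, the same three steps could be sketched in Python. This is an illustration under assumptions, not the pipeline's actual code: restore_security_groups is a hypothetical name, and ec2_client is expected to be a boto3 EC2 client (or a stub with the same three methods). Unlike the shell version, it fails loudly when the tag is missing.

```python
from datetime import datetime, timezone


def restore_security_groups(ec2_client, instance_id):
    """Re-attach the security groups saved in the OriginalSecurityGroups tag."""
    tags = ec2_client.describe_tags(Filters=[
        {'Name': 'resource-id', 'Values': [instance_id]},
        {'Name': 'key', 'Values': ['OriginalSecurityGroups']},
    ])['Tags']
    if not tags:
        # Nothing to restore from: stop before touching the instance
        raise ValueError(f"{instance_id} has no OriginalSecurityGroups tag")

    # The tag value is a comma-separated list of security group IDs
    group_ids = [g.strip() for g in tags[0]['Value'].split(',') if g.strip()]

    ec2_client.modify_instance_attribute(InstanceId=instance_id,
                                         Groups=group_ids)
    restored_at = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    ec2_client.create_tags(Resources=[instance_id], Tags=[
        {'Key': 'SecurityStatus', 'Value': 'Restored'},
        {'Key': 'RestoredAt', 'Value': restored_at},
    ])
    return group_ids
```

Passing the client in as an argument keeps the function testable with a stub and lets the multi-account setup hand it a client built from assumed-role credentials.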

Operational checklist

Before flipping auto-remediation on in production, I always walk through this checklist:

| # | Item | Status |
|---|------|--------|
| 1 | Quarantine SG exists in every VPC across every account | |
| 2 | SOAR-RemediationRole deployed in every member account | |
| 3 | Lambda tested with sample findings (end-to-end) | |
| 4 | Suppression rules configured for known false positives | |
| 5 | Trusted IP list uploaded and activated | |
| 6 | SNS topic has a subscriber (email/Slack/PagerDuty) | |
| 7 | CloudWatch alarms for Lambda errors and false-positive rate | |
| 8 | Rollback runbook documented | |
| 9 | DryRun mode tested for a week with no unexpected triggers | |
| 10 | Stakeholders (DevOps, App teams) informed | |

Conclusion

Auto-remediation isn’t “set and forget”. It’s a living system that needs continuous tuning:

  • Week 1: DryRun, log + notify only, no action.
  • Weeks 2–4: enable remediation for 1–2 finding types with the highest confidence (CryptoCurrency:EC2/*, UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration).
  • Month 2+: expand gradually to other finding types, adding suppression rules as false positives surface.
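
The staged rollout above can be expressed as a small piece of configuration that the Lambda consults before acting. The sketch below is one way to do it, assuming a rollout dictionary with dry_run and enforced_types keys (both names are hypothetical). It never remediates below severity 7, and it treats any finding type outside the allow-list as notify-only.

```python
from fnmatch import fnmatchcase

# Hypothetical rollout settings for each phase
WEEK_1 = {
    'dry_run': True,
    'enforced_types': ['CryptoCurrency:EC2/*'],
}
WEEKS_2_TO_4 = {
    'dry_run': False,
    'enforced_types': [
        'CryptoCurrency:EC2/*',
        'UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration*',
    ],
}


def decide_action(finding_type, severity, rollout):
    """Return 'remediate', 'dry_run', or 'notify' for a finding."""
    # Only HIGH/CRITICAL findings (severity >= 7) are remediation candidates
    if severity < 7:
        return 'notify'
    # The finding type must match one of the enabled wildcard patterns
    enabled = any(fnmatchcase(finding_type, pattern)
                  for pattern in rollout['enforced_types'])
    if not enabled:
        return 'notify'
    # In DryRun the pipeline logs what it would have done and takes no action
    return 'dry_run' if rollout['dry_run'] else 'remediate'
```

Moving from one phase to the next is then a configuration change (flip dry_run, append a pattern) rather than a code change, which keeps each expansion easy to review and to revert.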

The pattern I described — GuardDuty → EventBridge → Lambda — is the foundation. From here you can extend it:

  • Add Step Functions for complex playbooks (human approval before terminating an instance).
  • Integrate Amazon Detective to automatically enrich a finding with context (related findings, resource history).
  • Connect Slack/Teams via a chatbot so analysts can approve/reject remediation from mobile.
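  • Use DynamoDB to track blocked IPs with a TTL attribute, so that a stream-triggered Lambda can unblock them once the item expires after roughly 24 hours (TTL deletion is not instantaneous).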

Most importantly: test, test, test. Use sample findings, use the GuardDuty Tester, run game days with your team. When a real incident hits, you want the pipeline already battle-tested, not on its first run.