GuardDuty phát hiện mối đe doạ rất tốt, nhưng nếu không có hành động tự động đi kèm thì finding chỉ nằm đó chờ người xem — và kẻ tấn công không chờ. Bài này mô tả cách mình triển khai pipeline auto-remediation trong production: từ lúc GuardDuty phát ra finding đến lúc EC2 instance bị cô lập, EBS được snapshot để forensic, IAM credentials bị thu hồi — tất cả trong vòng dưới 60 giây, không cần con người can thiệp. Mình cũng chia sẻ cách mở rộng pattern này ra multi-account với Organizations và Security Hub aggregation.
TL;DR
- Pipeline auto-remediation GuardDuty → EventBridge → Lambda cô lập EC2 và revoke IAM credentials trong dưới 60 giây, không cần con người can thiệp.
- Chỉ auto-remediate severity ≥ 7 (HIGH/CRITICAL) cho finding types có false-positive rate thấp (
CryptoCurrency:EC2/*,UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration); severity 4-6.9 chỉ notify.- Quarantine Security Group outbound chỉ allow HTTPS tới SSM VPC Endpoint — forensic team vẫn SSM Session Manager vào instance mà không có network khác.
- Lưu original SGs vào tag
OriginalSecurityGroupscho rollback nhanh khi false positive; không terminate instance, chỉ swap SG.- Revoke credentials:
AKIA(long-term) → disable key + attach deny-all policy;ASIA(temporary STS) → put role policy vớiaws:TokenIssueTimeDateLessThan để revoke all sessions.- Multi-account qua Organizations: delegated admin Security Account assume
SOAR-RemediationRole(deploy qua StackSet) vào member accounts; dùngExternalIdchống confused deputy.- GuardDuty Tester (CDK) tạo real findings để battle-test pipeline end-to-end trước production — đừng đợi incident thật mới run lần đầu.
Tại sao cần auto-remediation?
Trong một môi trường production điển hình mình vận hành:
- GuardDuty bật ở tất cả account và region, trung bình mỗi tuần có 50–200 findings (phần lớn severity thấp).
- Findings severity HIGH (≥7) xuất hiện 2–5 lần/tháng — đây là những thứ cần phản ứng ngay: crypto mining, credential exfiltration, C2 communication.
- Thời gian trung bình từ lúc finding xuất hiện đến lúc analyst nhìn thấy: 15–45 phút (nếu không có on-call 24/7 thì có thể là vài giờ).
- Trong 15 phút đó, attacker đã có thể lateral move, exfiltrate data, hoặc deploy ransomware.
Auto-remediation giải quyết khoảng trống thời gian này. Nguyên tắc của mình:
- Severity ≥ 7 (HIGH/CRITICAL): auto-remediate ngay, notify sau.
- Severity 4–6.9 (MEDIUM): notify, chờ analyst quyết định.
- Severity < 4 (LOW): log, aggregate, review hàng tuần.
Với nguyên tắc này, mình chỉ auto-remediate những finding mà false positive rate rất thấp và hậu quả của việc không hành động lớn hơn hậu quả của false positive.
Tổng quan kiến trúc
┌─────────────────────────────────────────────────────────────────────┐
│ Security Account │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ GuardDuty ──→ EventBridge Rule ──→ Lambda Function │
│ (Delegated (filter HIGH │ │
│ Admin) severity) ├──→ Isolate EC2 (change SG) │
│ ├──→ Snapshot EBS volumes │
│ ├──→ Tag instance │
│ ├──→ Revoke IAM credentials │
│ └──→ SNS notification │
│ │
│ Cross-account: Lambda AssumeRole ──→ Member Account │
│ └── SOAR-RemediationRole │
└─────────────────────────────────────────────────────────────────────┘
Flow chi tiết:
- GuardDuty phát hiện anomaly (VPC Flow Logs, CloudTrail, DNS logs, Runtime Monitoring) → tạo finding.
- EventBridge nhận event với source
aws.guardduty, filter theo severity ≥ 7 và resource type. - Lambda được invoke, parse finding detail, xác định loại resource bị ảnh hưởng.
- Remediation actions tuỳ theo finding type:
- EC2 findings → cô lập instance (swap Security Group), snapshot EBS, tag.
- IAM findings → disable access key, attach deny-all policy.
- SNS gửi notification cho security team với context đầy đủ.
Mình chọn pattern Direct Lambda (không qua Step Functions) cho pipeline này vì:
- Đơn giản, dễ debug.
- Latency thấp hơn (~2–3 giây vs ~5–8 giây với Step Functions).
- Với 3–4 actions tuần tự trong cùng một Lambda, retry logic viết bằng code đủ dùng.
Nếu bạn cần orchestration phức tạp hơn (human approval, parallel branches, wait states), Step Functions là lựa chọn tốt hơn — mình sẽ note ở cuối bài.
EventBridge Rule: filter đúng finding
Rule cho EC2 findings (HIGH severity)
{
"source": ["aws.guardduty"],
"detail-type": ["GuardDuty Finding"],
"detail": {
"severity": [{ "numeric": [">=", 7] }],
"type": [
{ "prefix": "UnauthorizedAccess:EC2/" },
{ "prefix": "Recon:EC2/" },
{ "prefix": "Trojan:EC2/" },
{ "prefix": "Backdoor:EC2/" },
{ "prefix": "Impact:EC2/" },
{ "prefix": "CryptoCurrency:EC2/" }
]
}
}
Giải thích:
severity >= 7: chỉ HIGH và CRITICAL findings.prefixmatching: cover tất cả sub-types trong mỗi category. Ví dụUnauthorizedAccess:EC2/sẽ match cảUnauthorizedAccess:EC2/RDPBruteForce,UnauthorizedAccess:EC2/SSHBruteForce, v.v.- Không dùng
resource.resourceTypeở đây vì prefix đã đủ specific cho EC2.
Rule cho IAM findings
{
"source": ["aws.guardduty"],
"detail-type": ["GuardDuty Finding"],
"detail": {
"severity": [{ "numeric": [">=", 7] }],
"type": [
{ "prefix": "UnauthorizedAccess:IAMUser/" },
{ "prefix": "Discovery:IAMUser/" },
{ "prefix": "PenTest:IAMUser/" }
]
}
}
Mình tách thành 2 rule riêng biệt (EC2 và IAM) để mỗi rule trigger một Lambda function khác nhau. Lý do: logic remediation khác nhau hoàn toàn, và khi có incident bạn muốn xem CloudWatch Logs của từng function riêng.
Lambda Function: cô lập EC2 instance
Đây là Lambda function chính xử lý EC2 findings. Mình viết bằng Python 3.12, runtime nhẹ, cold start nhanh.
Quarantine Security Group
Trước tiên, mỗi VPC cần có sẵn một Quarantine Security Group với rule:
- Inbound: không có rule nào (block tất cả traffic vào).
- Outbound: chỉ cho phép HTTPS (443) tới VPC endpoint của SSM — để sau này forensic team vẫn có thể SSM Session Manager vào instance mà không cần mở network.
{
"SecurityGroup": "sg-quarantine",
"InboundRules": [],
"OutboundRules": [
{
"Protocol": "tcp",
"Port": 443,
"Destination": "com.amazonaws.ap-southeast-1.ssm (VPC Endpoint prefix list)"
}
]
}
Code Lambda
import boto3
import json
import os
from datetime import datetime, timezone
ec2 = boto3.client('ec2')
sns = boto3.client('sns')
QUARANTINE_SG_ID = os.environ['QUARANTINE_SG_ID']
SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN']
def lambda_handler(event, context):
"""Handle GuardDuty EC2 finding - isolate instance."""
detail = event['detail']
finding_id = detail['id']
finding_type = detail['type']
severity = detail['severity']
instance_id = detail['resource']['instanceDetails']['instanceId']
account_id = detail['accountId']
region = detail['region']
print(f"Processing finding {finding_id}: {finding_type} "
f"(severity={severity}) for instance {instance_id}")
# Step 1: Cô lập instance - swap Security Group
original_sgs = isolate_instance(instance_id)
# Step 2: Snapshot tất cả EBS volumes để forensic
snapshot_ids = create_forensic_snapshots(instance_id, finding_id)
# Step 3: Tag instance với metadata incident
tag_instance(instance_id, finding_id, finding_type, original_sgs)
# Step 4: Notify security team
notify(finding_id, finding_type, instance_id, account_id,
region, severity, original_sgs, snapshot_ids)
return {
'statusCode': 200,
'findingId': finding_id,
'instanceId': instance_id,
'action': 'isolated',
'snapshots': snapshot_ids
}
def isolate_instance(instance_id):
"""Replace all SGs with quarantine SG. Return original SGs for rollback."""
# Lấy SG hiện tại trước khi thay đổi (để lưu vào tag cho rollback)
response = ec2.describe_instances(InstanceIds=[instance_id])
original_sgs = [
sg['GroupId']
for sg in response['Reservations'][0]['Instances'][0]['SecurityGroups']
]
# Swap sang quarantine SG
ec2.modify_instance_attribute(
InstanceId=instance_id,
Groups=[QUARANTINE_SG_ID]
)
print(f"Isolated {instance_id}: {original_sgs} -> [{QUARANTINE_SG_ID}]")
return original_sgs
def create_forensic_snapshots(instance_id, finding_id):
"""Snapshot all EBS volumes attached to the instance."""
volumes = ec2.describe_volumes(
Filters=[{'Name': 'attachment.instance-id', 'Values': [instance_id]}]
)
snapshot_ids = []
for vol in volumes['Volumes']:
snap = ec2.create_snapshot(
VolumeId=vol['VolumeId'],
Description=f"Forensic snapshot - GuardDuty finding {finding_id}",
TagSpecifications=[{
'ResourceType': 'snapshot',
'Tags': [
{'Key': 'Purpose', 'Value': 'Forensic'},
{'Key': 'IncidentId', 'Value': finding_id},
{'Key': 'SourceInstance', 'Value': instance_id},
{'Key': 'CreatedBy', 'Value': 'GuardDuty-AutoRemediation'}
]
}]
)
snapshot_ids.append(snap['SnapshotId'])
print(f"Created {len(snapshot_ids)} forensic snapshots: {snapshot_ids}")
return snapshot_ids
def tag_instance(instance_id, finding_id, finding_type, original_sgs):
"""Tag instance with incident metadata for tracking and rollback."""
now = datetime.now(timezone.utc).isoformat()
ec2.create_tags(
Resources=[instance_id],
Tags=[
{'Key': 'SecurityStatus', 'Value': 'Quarantined'},
{'Key': 'QuarantinedAt', 'Value': now},
{'Key': 'GuardDutyFindingId', 'Value': finding_id},
{'Key': 'GuardDutyFindingType', 'Value': finding_type},
{'Key': 'OriginalSecurityGroups', 'Value': ','.join(original_sgs)},
]
)
def notify(finding_id, finding_type, instance_id, account_id,
region, severity, original_sgs, snapshot_ids):
"""Send SNS notification to security team."""
message = {
'summary': f"🚨 EC2 Instance Isolated - {finding_type}",
'finding_id': finding_id,
'instance_id': instance_id,
'account_id': account_id,
'region': region,
'severity': severity,
'action_taken': 'Instance isolated (SG swapped to quarantine)',
'original_security_groups': original_sgs,
'forensic_snapshots': snapshot_ids,
'next_steps': [
'Review finding in GuardDuty console',
'Use SSM Session Manager to investigate instance',
'Analyze forensic snapshots',
'If false positive: restore original SGs from tag'
]
}
sns.publish(
TopicArn=SNS_TOPIC_ARN,
Subject=f"[SECURITY] EC2 Isolated: {instance_id} ({finding_type})",
Message=json.dumps(message, indent=2)
)
Điểm quan trọng trong code
- Lưu original SGs vào tag: khi analyst xác nhận false positive, họ có thể rollback bằng cách đọc tag
OriginalSecurityGroupsvà restore lại. Không cần tìm trong CloudTrail. - Snapshot trước khi bất kỳ thay đổi nào khác: đảm bảo có bản sao nguyên vẹn của disk tại thời điểm phát hiện.
- Không terminate instance: cô lập ≠ xoá. Instance vẫn chạy để forensic team phân tích memory, process, network connections qua SSM.
- Idempotent: nếu Lambda bị invoke lại (EventBridge retry), việc swap SG và tag lại không gây side effect. Snapshot sẽ tạo thêm bản mới nhưng không ảnh hưởng gì.
Auto-revoke IAM credentials bị xâm phạm
Pattern thứ hai mình triển khai song song: khi GuardDuty phát hiện IAM credential bị compromise (ví dụ: UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration.OutsideAWS), Lambda sẽ tự động vô hiệu hoá credential đó.
Phân biệt loại credential
GuardDuty finding cho IAM có 2 dạng chính:
| Access Key bắt đầu bằng | Loại | Hành động |
|---|---|---|
AKIA | Long-term (IAM User) | Disable key + attach deny-all policy |
ASIA | Temporary (STS/Role) | Revoke sessions + update trust policy |
Lambda code cho IAM remediation
import boto3
import json
import os
from datetime import datetime, timezone
iam = boto3.client('iam')
sns = boto3.client('sns')
SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN']
def lambda_handler(event, context):
"""Handle GuardDuty IAM finding - revoke compromised credentials."""
detail = event['detail']
finding_id = detail['id']
finding_type = detail['type']
# Extract IAM details from finding
resource = detail['resource']
access_key_details = resource.get('accessKeyDetails', {})
access_key_id = access_key_details.get('accessKeyId', '')
principal_id = access_key_details.get('principalId', '')
user_type = access_key_details.get('userType', '')
username = access_key_details.get('userName', '')
print(f"Processing IAM finding: {finding_type}, "
f"key={access_key_id}, user={username}, type={user_type}")
if access_key_id.startswith('AKIA'):
# Long-term credential - IAM User
revoke_long_term_credential(username, access_key_id, finding_id)
elif access_key_id.startswith('ASIA'):
# Temporary credential - Role/STS
revoke_temporary_credential(username, principal_id, finding_id)
else:
print(f"Unknown credential type: {access_key_id}")
notify_iam_revocation(finding_id, finding_type, username,
access_key_id, user_type)
return {'statusCode': 200, 'action': 'credentials_revoked'}
def revoke_long_term_credential(username, access_key_id, finding_id):
"""Disable access key and attach deny-all policy to IAM user."""
# Step 1: Disable the compromised access key
iam.update_access_key(
UserName=username,
AccessKeyId=access_key_id,
Status='Inactive'
)
print(f"Disabled access key {access_key_id} for user {username}")
# Step 2: Attach explicit deny-all policy
# Deny policy đảm bảo ngay cả khi có key khác còn active,
# user cũng không thể làm gì cho đến khi security team review
deny_policy = {
"Version": "2012-10-17",
"Statement": [{
"Sid": "DenyAllAfterCompromise",
"Effect": "Deny",
"Action": "*",
"Resource": "*",
"Condition": {
"DateGreaterThan": {
"aws:TokenIssueTime": datetime.now(timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
}
}
}]
}
iam.put_user_policy(
UserName=username,
PolicyName=f'GuardDuty-Deny-{finding_id[:8]}',
PolicyDocument=json.dumps(deny_policy)
)
print(f"Attached deny-all policy to user {username}")
def revoke_temporary_credential(role_name, principal_id, finding_id):
"""Revoke all active sessions for a role."""
# Với temporary credentials (ASIA*), ta không thể "disable" key
# mà phải revoke tất cả sessions bằng cách thêm inline policy
# với condition DateLessThan = now
revoke_policy = {
"Version": "2012-10-17",
"Statement": [{
"Sid": "RevokeOlderSessions",
"Effect": "Deny",
"Action": "*",
"Resource": "*",
"Condition": {
"DateLessThan": {
"aws:TokenIssueTime": datetime.now(timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
}
}
}]
}
try:
iam.put_role_policy(
RoleName=role_name,
PolicyName=f'RevokeOlderSessions-{finding_id[:8]}',
PolicyDocument=json.dumps(revoke_policy)
)
print(f"Revoked all sessions for role {role_name}")
except iam.exceptions.NoSuchEntityException:
print(f"Role {role_name} not found - may be in another account")
def notify_iam_revocation(finding_id, finding_type, username,
access_key_id, user_type):
"""Notify security team about credential revocation."""
sns.publish(
TopicArn=SNS_TOPIC_ARN,
Subject=f"[SECURITY] IAM Credentials Revoked: {username}",
Message=json.dumps({
'summary': f"🔑 IAM Credentials Revoked - {finding_type}",
'finding_id': finding_id,
'username': username,
'access_key_id': access_key_id,
'user_type': user_type,
'action_taken': 'Key disabled + deny-all policy attached',
'next_steps': [
'Investigate CloudTrail for actions taken with this credential',
'Check for persistence mechanisms (new users, roles, keys)',
'Rotate any secrets this credential had access to',
'Remove deny policy after investigation complete'
]
}, indent=2)
)
Tại sao dùng deny policy thay vì chỉ disable key?
Disable access key chỉ chặn key đó. Nhưng nếu attacker đã:
- Tạo thêm access key khác cho cùng user
- Assume role khác bằng credential đã compromise
- Tạo console password
Thì deny-all inline policy sẽ override tất cả permission, bất kể credential nào được dùng. Đây là defense-in-depth: disable key (chặn vector cụ thể) + deny policy (chặn toàn bộ user).
Lưu ý quan trọng: deny policy dùng Condition với aws:TokenIssueTime để chỉ block sessions được issue sau thời điểm compromise được phát hiện. Sessions cũ hơn (nếu có) cũng bị block bởi DateLessThan condition trong revoke policy cho role.
IAM Policies cho Lambda functions
Execution Role cho EC2 Remediation Lambda
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "EC2Isolation",
"Effect": "Allow",
"Action": [
"ec2:ModifyInstanceAttribute",
"ec2:DescribeInstances",
"ec2:CreateTags"
],
"Resource": "*",
"Condition": {
"StringEquals": {
"aws:RequestedRegion": ["ap-southeast-1", "us-east-1"]
}
}
},
{
"Sid": "EBSForensicSnapshot",
"Effect": "Allow",
"Action": [
"ec2:DescribeVolumes",
"ec2:CreateSnapshot",
"ec2:CreateTags"
],
"Resource": "*"
},
{
"Sid": "Notification",
"Effect": "Allow",
"Action": "sns:Publish",
"Resource": "arn:aws:sns:ap-southeast-1:111122223333:SecOps-Alerts"
},
{
"Sid": "Logging",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:111122223333:*"
},
{
"Sid": "CrossAccountAssume",
"Effect": "Allow",
"Action": "sts:AssumeRole",
"Resource": "arn:aws:iam::*:role/SOAR-RemediationRole"
}
]
}
Execution Role cho IAM Remediation Lambda
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "IAMRemediation",
"Effect": "Allow",
"Action": [
"iam:UpdateAccessKey",
"iam:PutUserPolicy",
"iam:PutRolePolicy"
],
"Resource": [
"arn:aws:iam::111122223333:user/*",
"arn:aws:iam::111122223333:role/*"
]
},
{
"Sid": "Notification",
"Effect": "Allow",
"Action": "sns:Publish",
"Resource": "arn:aws:sns:ap-southeast-1:111122223333:SecOps-Alerts"
},
{
"Sid": "Logging",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:111122223333:*"
},
{
"Sid": "CrossAccountAssume",
"Effect": "Allow",
"Action": "sts:AssumeRole",
"Resource": "arn:aws:iam::*:role/SOAR-RemediationRole"
}
]
}
Trust Policy cho Lambda Execution Role
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}]
}
Nguyên tắc least privilege mình áp dụng:
- Giới hạn region bằng
aws:RequestedRegioncondition — Lambda chỉ có thể thao tác trên instance ở region mình cho phép. - SNS publish chỉ cho topic cụ thể, không phải
*. - Cross-account assume chỉ cho role name cố định (
SOAR-RemediationRole), không phải bất kỳ role nào. - IAM actions giới hạn ở
UpdateAccessKey,PutUserPolicy,PutRolePolicy— không cóDeleteUser,CreateUser, hayAttachUserPolicy(managed policy).
Multi-account setup với Organizations + Security Hub
Trong thực tế, không ai chỉ có 1 AWS account. Mình vận hành 30+ accounts trong một Organization, và pattern auto-remediation cần hoạt động xuyên suốt tất cả accounts.
Kiến trúc Delegated Administrator
┌─────────────────────────────────────────────────────────────────────┐
│ AWS Organization │
├─────────────────────────────────────────────────────────────────────┤
│ Management Account (root) │
│ └── Chỉ làm 1 việc: designate delegated admin │
│ • guardduty:EnableOrganizationAdminAccount │
│ • securityhub:EnableOrganizationAdminAccount │
├─────────────────────────────────────────────────────────────────────┤
│ Security Account (Delegated Admin) ← TẤT CẢ LOGIC Ở ĐÂY │
│ ├── GuardDuty Administrator (aggregates findings từ mọi account) │
│ ├── Security Hub Administrator (single pane of glass) │
│ ├── EventBridge Rules (filter + route findings) │
│ ├── Lambda Functions (execute remediation) │
│ ├── Step Functions (complex playbooks) │
│ └── SNS Topics (notifications) │
├─────────────────────────────────────────────────────────────────────┤
│ Member Account A (Production) │
│ ├── GuardDuty enabled (auto-enrolled by delegated admin) │
│ ├── Security Hub enabled (auto-enrolled) │
│ └── IAM Role: SOAR-RemediationRole (trust Security Account) │
├─────────────────────────────────────────────────────────────────────┤
│ Member Account B (Staging) │
│ ├── GuardDuty enabled (auto-enrolled) │
│ ├── Security Hub enabled (auto-enrolled) │
│ └── IAM Role: SOAR-RemediationRole (trust Security Account) │
└─────────────────────────────────────────────────────────────────────┘
Bước 1: Designate Delegated Admin (chạy từ Management Account)
# GuardDuty delegated admin
aws guardduty enable-organization-admin-account \
--admin-account-id 222222222222
# Security Hub delegated admin
aws securityhub enable-organization-admin-account \
--admin-account-id 222222222222
Bước 2: Auto-enable cho tất cả member accounts (chạy từ Security Account)
# GuardDuty: auto-enable cho mọi account mới join Organization
DETECTOR_ID=$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text)
aws guardduty update-organization-configuration \
--detector-id $DETECTOR_ID \
--auto-enable-organization-members ALL \
--features '[
{"Name": "S3_DATA_EVENTS", "AutoEnable": "ALL"},
{"Name": "EKS_AUDIT_LOGS", "AutoEnable": "ALL"},
{"Name": "RUNTIME_MONITORING", "AutoEnable": "ALL",
"AdditionalConfiguration": [
{"Name": "EKS_ADDON_MANAGEMENT", "AutoEnable": "ALL"},
{"Name": "EC2_AGENT_MANAGEMENT", "AutoEnable": "ALL"}
]}
]'
# Security Hub: auto-enable
aws securityhub update-organization-configuration \
--auto-enable
Bước 3: Deploy SOAR-RemediationRole qua StackSets
Mỗi member account cần một IAM role mà Lambda ở Security Account có thể assume vào:
AWSTemplateFormatVersion: '2010-09-09'
Description: Cross-account role for GuardDuty auto-remediation
Parameters:
SecurityAccountId:
Type: String
Description: Account ID of the Security/Audit account
Resources:
SOARRemediationRole:
Type: AWS::IAM::Role
Properties:
RoleName: SOAR-RemediationRole
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
AWS: !Sub 'arn:aws:iam::${SecurityAccountId}:role/GuardDuty-EC2-Remediation-Role'
Action: sts:AssumeRole
Condition:
StringEquals:
sts:ExternalId: !Ref 'AWS::AccountId'
Policies:
- PolicyName: RemediationPermissions
PolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: EC2Isolation
Effect: Allow
Action:
- ec2:ModifyInstanceAttribute
- ec2:DescribeInstances
- ec2:DescribeVolumes
- ec2:CreateSnapshot
- ec2:CreateTags
- ec2:DescribeSecurityGroups
Resource: '*'
- Sid: IAMRemediation
Effect: Allow
Action:
- iam:UpdateAccessKey
- iam:PutUserPolicy
- iam:PutRolePolicy
- iam:ListAccessKeys
Resource:
- !Sub 'arn:aws:iam::${AWS::AccountId}:user/*'
- !Sub 'arn:aws:iam::${AWS::AccountId}:role/*'
Deploy bằng StackSet:
aws cloudformation create-stack-set \
--stack-set-name soar-remediation-role \
--template-body file://soar-remediation-role.yaml \
--permission-model SERVICE_MANAGED \
--auto-deployment Enabled=true,RetainStacksOnAccountRemoval=false \
--capabilities CAPABILITY_NAMED_IAM \
--parameters ParameterKey=SecurityAccountId,ParameterValue=222222222222
aws cloudformation create-stack-instances \
--stack-set-name soar-remediation-role \
--deployment-targets OrganizationalUnitIds=ou-xxxx-yyyyyyyy \
--regions ap-southeast-1
Bước 4: Cross-account Lambda logic
Khi GuardDuty finding đến từ member account, Lambda cần assume role vào account đó:
import boto3
def get_cross_account_client(service, account_id, region='ap-southeast-1'):
"""Assume SOAR-RemediationRole in member account."""
sts = boto3.client('sts')
assumed = sts.assume_role(
RoleArn=f'arn:aws:iam::{account_id}:role/SOAR-RemediationRole',
RoleSessionName='GuardDuty-AutoRemediation',
ExternalId=account_id # thêm ExternalId để chống confused deputy
)
return boto3.client(
service,
region_name=region,
aws_access_key_id=assumed['Credentials']['AccessKeyId'],
aws_secret_access_key=assumed['Credentials']['SecretAccessKey'],
aws_session_token=assumed['Credentials']['SessionToken']
)
def lambda_handler(event, context):
detail = event['detail']
account_id = detail['accountId']
region = detail['region']
instance_id = detail['resource']['instanceDetails']['instanceId']
# Nếu finding từ chính security account → dùng client local
# Nếu từ member account → assume role
if account_id == context.invoked_function_arn.split(':')[4]:
ec2 = boto3.client('ec2', region_name=region)
else:
ec2 = get_cross_account_client('ec2', account_id, region)
# Tiếp tục logic remediation như trước...
ec2.modify_instance_attribute(
InstanceId=instance_id,
Groups=[get_quarantine_sg(account_id, region)]
)
Security Hub Aggregation: single pane of glass
Khi GuardDuty là delegated admin, tất cả findings tự động xuất hiện trong Security Hub của Security Account. Điều này cho phép:
- Unified view: analyst nhìn findings từ mọi account ở một chỗ.
- Cross-account correlation: Security Hub correlate findings từ nhiều account để phát hiện lateral movement.
- Custom Actions: analyst có thể trigger manual remediation từ Security Hub console cho findings mà auto-remediation không cover.
- Compliance tracking: Security Hub standards (CIS, AWS Foundational) chạy song song, cho context thêm về security posture.
Mình cũng cấu hình Security Hub finding aggregation để aggregate findings từ tất cả regions về một region chính:
aws securityhub create-finding-aggregator \
--region ap-southeast-1 \
--region-linking-mode ALL_REGIONS
Vận hành: testing, monitoring, và xử lý false positives
Test với GuardDuty sample findings
GuardDuty có API tạo sample findings — đây là cách mình validate pipeline trước khi đưa vào production:
# Tạo sample findings cho tất cả finding types
DETECTOR_ID=$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text)
aws guardduty create-sample-findings \
--detector-id $DETECTOR_ID \
--finding-types \
"UnauthorizedAccess:EC2/MaliciousIPCaller.Custom" \
"CryptoCurrency:EC2/BitcoinTool.B!DNS" \
"UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration.OutsideAWS"
Sample findings có sample: true trong event detail, nên mình thêm check trong Lambda:
def lambda_handler(event, context):
detail = event['detail']
# Skip sample findings trong production
if detail.get('service', {}).get('additionalInfo', {}).get('sample') is True:
print("Sample finding detected - skipping remediation")
# Vẫn log để confirm pipeline hoạt động
return {'statusCode': 200, 'action': 'skipped_sample'}
# ... logic remediation thật
Trong môi trường staging/test, mình bỏ check này để sample findings trigger full pipeline — validate end-to-end.
GuardDuty Tester (open-source)
AWS cung cấp GuardDuty Tester — tool dùng CDK deploy một môi trường test với EC2 instances thực sự thực hiện các hành vi suspicious (port scan, DNS query tới known-bad domain, crypto mining simulation). Tool này tạo real findings (không phải sample), nên pipeline sẽ trigger y như production.
# Clone và deploy
git clone https://github.com/awslabs/amazon-guardduty-tester.git
cd amazon-guardduty-tester
npm install
cdk deploy
# Chạy test scenarios
aws ssm send-command \
--document-name "GuardDutyTesterDocument" \
--targets "Key=tag:Purpose,Values=GuardDutyTester"
Monitoring false positives
False positive là rủi ro lớn nhất của auto-remediation. Một instance production bị isolate nhầm = downtime. Mình giảm thiểu bằng:
1. Suppression Rules trong GuardDuty:
aws guardduty create-filter \
--detector-id $DETECTOR_ID \
--name "SuppressNATGatewayPortScan" \
--action ARCHIVE \
--finding-criteria '{
"Criterion": {
"type": {
"Eq": ["Recon:EC2/Portscan"]
},
"resource.instanceDetails.tags.value": {
"Eq": ["nat-gateway"]
}
}
}'
Ví dụ: NAT Gateway instance thường bị flag là port scan vì nó forward traffic từ nhiều source. Suppression rule archive finding này trước khi nó đến EventBridge.
2. Trusted IP Lists:
# Tạo trusted IP list cho office IP, VPN, known scanners
aws guardduty create-ip-set \
--detector-id $DETECTOR_ID \
--name "TrustedIPs" \
--format TXT \
--location s3://my-security-bucket/trusted-ips.txt \
--activate
File trusted-ips.txt:
# Office IPs
203.0.113.0/24
# VPN endpoints
198.51.100.0/24
# Authorized pen-test provider
192.0.2.50/32
3. Tag-based exclusion trong Lambda:
def should_remediate(instance_id, ec2_client):
"""Check if instance should be auto-remediated based on tags."""
response = ec2_client.describe_instances(InstanceIds=[instance_id])
instance = response['Reservations'][0]['Instances'][0]
tags = {t['Key']: t['Value'] for t in instance.get('Tags', [])}
# Skip instances tagged as exempt
if tags.get('AutoRemediation') == 'disabled':
print(f"Instance {instance_id} is exempt from auto-remediation")
return False
# Skip instances already quarantined (avoid double-processing)
if tags.get('SecurityStatus') == 'Quarantined':
print(f"Instance {instance_id} already quarantined")
return False
return True
4. CloudWatch Metrics và Alarms:
Mình publish custom metrics từ Lambda để track:
import boto3
cloudwatch = boto3.client('cloudwatch')
def publish_metrics(action, finding_type, is_false_positive=False):
cloudwatch.put_metric_data(
Namespace='GuardDuty/AutoRemediation',
MetricData=[
{
'MetricName': 'RemediationActions',
'Value': 1,
'Unit': 'Count',
'Dimensions': [
{'Name': 'Action', 'Value': action},
{'Name': 'FindingType', 'Value': finding_type}
]
},
{
'MetricName': 'FalsePositives',
'Value': 1 if is_false_positive else 0,
'Unit': 'Count',
'Dimensions': [
{'Name': 'FindingType', 'Value': finding_type}
]
}
]
)
Alarm khi false positive rate > 10% trong 7 ngày → tự động disable auto-remediation cho finding type đó và notify team review suppression rules.
5. Rollback procedure:
Khi analyst xác nhận false positive, rollback đơn giản:
# Đọc original SGs từ tag
ORIGINAL_SGS=$(aws ec2 describe-tags \
--filters "Name=resource-id,Values=i-0123456789" \
"Name=key,Values=OriginalSecurityGroups" \
--query 'Tags[0].Value' --output text)
# Restore original security groups
aws ec2 modify-instance-attribute \
--instance-id i-0123456789 \
--groups $(echo $ORIGINAL_SGS | tr ',' ' ')
# Update tags
aws ec2 create-tags \
--resources i-0123456789 \
--tags Key=SecurityStatus,Value=Restored \
Key=RestoredAt,Value=$(date -u +%Y-%m-%dT%H:%M:%SZ)
Operational checklist
Trước khi bật auto-remediation trong production, mình luôn đi qua checklist này:
| # | Item | Status |
|---|---|---|
| 1 | Quarantine SG tồn tại ở mọi VPC, mọi account | ☐ |
| 2 | SOAR-RemediationRole deployed ở mọi member account | ☐ |
| 3 | Lambda tested với sample findings (end-to-end) | ☐ |
| 4 | Suppression rules cho known false positives đã cấu hình | ☐ |
| 5 | Trusted IP list đã upload và activate | ☐ |
| 6 | SNS topic có subscriber (email/Slack/PagerDuty) | ☐ |
| 7 | CloudWatch alarms cho Lambda errors và false positive rate | ☐ |
| 8 | Runbook cho rollback procedure đã document | ☐ |
| 9 | DryRun mode tested trong 1 tuần không có unexpected trigger | ☐ |
| 10 | Stakeholders (DevOps, App teams) đã được thông báo | ☐ |
Kết luận
Auto-remediation không phải “bật lên và quên đi”. Nó là một hệ thống sống cần tuning liên tục:
- Tuần đầu: chạy DryRun, chỉ log + notify, không hành động.
- Tuần 2–4: bật remediation cho 1–2 finding types có confidence cao nhất (
CryptoCurrency:EC2/*,UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration). - Tháng 2+: mở rộng dần sang các finding types khác, thêm suppression rules khi phát hiện false positives.
Pattern mình mô tả ở đây — GuardDuty → EventBridge → Lambda — là nền tảng. Từ đây bạn có thể mở rộng:
- Thêm Step Functions cho playbooks phức tạp (human approval trước khi terminate instance).
- Tích hợp Amazon Detective để tự động enrich finding với context (related findings, resource history).
- Kết nối Slack/Teams qua chatbot để analyst approve/reject remediation từ mobile.
- Dùng DynamoDB track blocked IPs với TTL để tự động unblock sau 24h.
Quan trọng nhất: test, test, test. Dùng sample findings, dùng GuardDuty Tester, chạy game day với team. Khi incident thật xảy ra, bạn muốn pipeline đã được battle-tested, không phải lần đầu chạy.