Implement Compliance Controls
Implement Compliance Controls
Step-by-step guide to implementing technical and administrative controls for regulatory compliance across your infrastructure.
Compliance Framework Overview
Implementing compliance controls requires a systematic approach that addresses people, processes, and technology to meet regulatory requirements.
Key Components
- Risk Assessment: Identify and evaluate compliance risks
- Control Implementation: Deploy technical and administrative controls
- Monitoring: Continuous compliance verification
- Documentation: Maintain evidence for audits
- Improvement: Regular updates and enhancements
Technical Controls Implementation
Access Control Implementation
# RBAC implementation with AWS IAM
# Create compliance-specific roles
aws iam create-role --role-name ComplianceAuditor \
--assume-role-policy-document '{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"Service": "ec2.amazonaws.com"},
"Action": "sts:AssumeRole"
}]
}'
# Attach read-only policies for audit access
aws iam attach-role-policy --role-name ComplianceAuditor \
--policy-arn arn:aws:iam::aws:policy/SecurityAudit
# Create custom policy for compliance tools
aws iam create-policy --policy-name ComplianceMonitoring \
--policy-document '{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": [
"logs:Describe*",
"logs:Get*",
"logs:List*",
"cloudtrail:LookupEvents",
"config:Describe*"
],
"Resource": "*"
}]
}'
Encryption Controls
# Implement encryption at rest
# PostgreSQL transparent data encryption
ALTER SYSTEM SET ssl = on;
ALTER SYSTEM SET ssl_cert_file = '/etc/postgresql/server.crt';
ALTER SYSTEM SET ssl_key_file = '/etc/postgresql/server.key';
# Application-level encryption
from cryptography.fernet import Fernet
import os
class ComplianceEncryption:
def __init__(self):
self.key = os.environ.get('ENCRYPTION_KEY', Fernet.generate_key())
self.cipher = Fernet(self.key)
def encrypt_pii(self, data):
"""Encrypt personally identifiable information"""
return self.cipher.encrypt(data.encode()).decode()
def decrypt_pii(self, encrypted_data):
"""Decrypt PII for authorized access"""
return self.cipher.decrypt(encrypted_data.encode()).decode()
Audit Logging Implementation
# Comprehensive audit logging setup
# Application audit logger
import logging
import json
from datetime import datetime
class ComplianceLogger:
def __init__(self, log_file='/var/log/compliance/audit.log'):
self.logger = logging.getLogger('compliance')
handler = logging.FileHandler(log_file)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_access(self, user, resource, action, result):
"""Log all access attempts for compliance"""
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'user': user,
'resource': resource,
'action': action,
'result': result,
'ip_address': self.get_client_ip(),
'session_id': self.get_session_id()
}
self.logger.info(json.dumps(log_entry))
Data Protection Controls
Data Classification
Classification | Examples | Controls Required |
---|---|---|
Public | Marketing materials | Basic access controls |
Internal | Employee directories | Authentication required |
Confidential | Financial reports | Encryption, audit logging |
Restricted | PII, PHI, PCI data | Full encryption, MFA, monitoring |
Data Loss Prevention (DLP)
# DLP policy implementation
class DLPEngine:
def __init__(self):
self.patterns = {
'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
'credit_card': r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
}
def scan_content(self, content):
"""Scan content for sensitive data patterns"""
findings = []
for data_type, pattern in self.patterns.items():
if re.search(pattern, content):
findings.append({
'type': data_type,
'severity': 'high',
'action': 'block'
})
return findings
def enforce_policy(self, content, destination):
"""Enforce DLP policies on data movement"""
findings = self.scan_content(content)
if findings:
self.alert_security_team(findings)
if destination.is_external():
raise DLPViolation("Sensitive data transfer blocked")
Compliance Monitoring
Continuous Compliance Monitoring
# AWS Config Rules for continuous compliance
# PCI DSS compliance rule
aws configservice put-config-rule --config-rule '{
"ConfigRuleName": "encrypted-volumes",
"Description": "Checks whether EBS volumes are encrypted",
"Source": {
"Owner": "AWS",
"SourceIdentifier": "ENCRYPTED_VOLUMES"
},
"Scope": {
"ComplianceResourceTypes": [
"AWS::EC2::Volume"
]
}
}'
# Custom compliance rule
aws configservice put-config-rule --config-rule '{
"ConfigRuleName": "approved-amis-by-tag",
"Description": "Checks whether instances are launched from approved AMIs",
"Source": {
"Owner": "AWS",
"SourceIdentifier": "APPROVED_AMIS_BY_TAG"
},
"InputParameters": "{\"tag1Key\":\"Approved\",\"tag1Value\":\"true\"}"
}'
Compliance Dashboard
# Compliance metrics collection
class ComplianceDashboard:
def __init__(self):
self.metrics = {
'patch_compliance': 0,
'encryption_compliance': 0,
'access_review_completion': 0,
'training_completion': 0
}
def calculate_patch_compliance(self):
"""Calculate percentage of systems with current patches"""
total_systems = self.get_total_systems()
patched_systems = self.get_patched_systems()
return (patched_systems / total_systems) * 100
def generate_compliance_report(self):
"""Generate executive compliance report"""
report = {
'generated_at': datetime.utcnow().isoformat(),
'overall_compliance': self.calculate_overall_score(),
'metrics': self.metrics,
'findings': self.get_findings(),
'recommendations': self.get_recommendations()
}
return report
Vulnerability Management
Automated Vulnerability Scanning
#!/bin/bash
# Automated vulnerability scanning script
# Install and configure OpenVAS
apt-get update && apt-get install -y openvas
gvm-setup
gvm-feed-update
# Create scan configuration
cat > /etc/gvm/compliance-scan.xml << EOF
Compliance Scan
PCI DSS quarterly scan
daba56c8-73ec-11df-a475-002264764cea
EOF
# Schedule weekly scans
crontab -l > mycron
echo "0 2 * * 0 /usr/bin/gvm-cli --gmp-username admin --gmp-password $GVM_PASS socket --xml ''" >> mycron
crontab mycron
Patch Management
# Automated patch management
class PatchManagement:
def __init__(self):
self.critical_patch_sla = 30 # days
self.standard_patch_sla = 90 # days
def assess_patches(self):
"""Assess available patches by criticality"""
patches = self.get_available_patches()
categorized = {
'critical': [],
'high': [],
'medium': [],
'low': []
}
for patch in patches:
severity = self.get_patch_severity(patch)
categorized[severity].append(patch)
return categorized
def schedule_patching(self, patches):
"""Schedule patches based on criticality and maintenance windows"""
for severity, patch_list in patches.items():
if severity == 'critical':
self.schedule_emergency_patching(patch_list)
else:
self.schedule_maintenance_window(patch_list, severity)
Identity and Access Management
Access Review Process
# Automated access review system
class AccessReview:
def __init__(self):
self.review_frequency = 90 # days
def generate_access_report(self):
"""Generate access report for review"""
users = self.get_all_users()
report = []
for user in users:
user_access = {
'username': user.username,
'full_name': user.full_name,
'department': user.department,
'manager': user.manager,
'last_login': user.last_login,
'permissions': self.get_user_permissions(user),
'privileged_access': self.has_privileged_access(user),
'review_required': self.needs_review(user)
}
report.append(user_access)
return report
def process_review_decisions(self, decisions):
"""Process manager decisions on access reviews"""
for decision in decisions:
if decision['action'] == 'revoke':
self.revoke_access(decision['user'], decision['permission'])
elif decision['action'] == 'modify':
self.modify_access(decision['user'], decision['changes'])
self.log_review_decision(decision)
Privileged Access Management
# Just-in-time privileged access
class PrivilegedAccessManager:
def __init__(self):
self.max_elevation_time = 4 # hours
def request_elevation(self, user, role, justification):
"""Handle privileged access requests"""
request = {
'id': self.generate_request_id(),
'user': user,
'requested_role': role,
'justification': justification,
'requested_at': datetime.utcnow(),
'expires_at': datetime.utcnow() + timedelta(hours=self.max_elevation_time)
}
# Auto-approve based on policy or queue for approval
if self.auto_approve_eligible(user, role):
return self.grant_temporary_access(request)
else:
return self.queue_for_approval(request)
def grant_temporary_access(self, request):
"""Grant time-limited privileged access"""
# Create temporary role binding
self.create_temp_role(request['user'], request['requested_role'])
# Schedule automatic revocation
self.schedule_revocation(request['id'], request['expires_at'])
# Log for audit
self.audit_log.record_privilege_grant(request)
return request['id']
Incident Response
Automated Incident Response
# Security incident response automation
class IncidentResponse:
def __init__(self):
self.severity_levels = ['critical', 'high', 'medium', 'low']
self.response_teams = self.load_response_teams()
def detect_incident(self, event):
"""Analyze event for security incidents"""
incident_indicators = [
self.check_failed_logins(event),
self.check_data_exfiltration(event),
self.check_privilege_escalation(event),
self.check_malware_indicators(event)
]
if any(incident_indicators):
return self.create_incident(event, incident_indicators)
def respond_to_incident(self, incident):
"""Execute incident response playbook"""
severity = self.assess_severity(incident)
# Immediate containment
if severity in ['critical', 'high']:
self.contain_threat(incident)
# Notify response team
self.notify_team(incident, severity)
# Collect forensics
self.collect_evidence(incident)
# Create incident ticket
ticket_id = self.create_incident_ticket(incident)
return ticket_id
Compliance Reporting
Automated Compliance Reports
# Compliance report generator
class ComplianceReporter:
def __init__(self):
self.report_templates = {
'pci_dss': 'templates/pci_dss_report.html',
'hipaa': 'templates/hipaa_report.html',
'gdpr': 'templates/gdpr_report.html',
'soc2': 'templates/soc2_report.html'
}
def generate_report(self, framework, period):
"""Generate compliance report for specific framework"""
# Collect metrics
metrics = {
'control_effectiveness': self.measure_control_effectiveness(framework),
'vulnerabilities': self.get_vulnerability_metrics(period),
'incidents': self.get_incident_metrics(period),
'access_reviews': self.get_access_review_status(),
'training': self.get_training_completion(),
'patches': self.get_patch_compliance()
}
# Generate evidence
evidence = self.collect_evidence(framework, period)
# Create report
report = self.render_report(framework, metrics, evidence)
return report
Third-Party Risk Management
Vendor Assessment
- Security questionnaires
- SOC 2 report review
- Penetration test results
- Insurance verification
- Compliance certifications
Ongoing Monitoring
- Continuous security ratings
- Breach notification requirements
- Annual reassessment
- Contract compliance verification
Implementation Timeline
Phase | Duration | Activities |
---|---|---|
Assessment | 2-4 weeks | Gap analysis, risk assessment |
Planning | 2-3 weeks | Control selection, resource allocation |
Implementation | 3-6 months | Deploy controls, configure systems |
Testing | 2-4 weeks | Validate controls, remediate gaps |
Certification | 2-4 weeks | External audit, certification |
Best Practices
- Automate: Use tools to enforce and monitor compliance
- Document: Maintain comprehensive evidence
- Train: Regular compliance awareness training
- Test: Regular control effectiveness testing
- Improve: Continuous enhancement based on findings
Related Resources
Note: This documentation is provided for reference purposes only. It reflects general best practices and industry-aligned guidelines, and any examples, claims, or recommendations are intended as illustrative—not definitive or binding.