Home / Documentation / Security & Compliance / Implement Compliance Controls

Implement Compliance Controls

13 min read
Updated Jun 19, 2025

Implement Compliance Controls

Step-by-step guide to implementing technical and administrative controls for regulatory compliance across your infrastructure.

Compliance Framework Overview

Implementing compliance controls requires a systematic approach that addresses people, processes, and technology to meet regulatory requirements.

Key Components

  • Risk Assessment: Identify and evaluate compliance risks
  • Control Implementation: Deploy technical and administrative controls
  • Monitoring: Continuous compliance verification
  • Documentation: Maintain evidence for audits
  • Improvement: Regular updates and enhancements

Technical Controls Implementation

Access Control Implementation

# RBAC implementation with AWS IAM
# Create compliance-specific roles
aws iam create-role --role-name ComplianceAuditor \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Principal": {"Service": "ec2.amazonaws.com"},
      "Action": "sts:AssumeRole"
    }]
  }'

# Attach read-only policies for audit access
aws iam attach-role-policy --role-name ComplianceAuditor \
  --policy-arn arn:aws:iam::aws:policy/SecurityAudit

# Create custom policy for compliance tools
aws iam create-policy --policy-name ComplianceMonitoring \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Action": [
        "logs:Describe*",
        "logs:Get*",
        "logs:List*",
        "cloudtrail:LookupEvents",
        "config:Describe*"
      ],
      "Resource": "*"
    }]
  }'

Encryption Controls

# Implement encryption at rest
# PostgreSQL transparent data encryption
ALTER SYSTEM SET ssl = on;
ALTER SYSTEM SET ssl_cert_file = '/etc/postgresql/server.crt';
ALTER SYSTEM SET ssl_key_file = '/etc/postgresql/server.key';

# Application-level encryption
from cryptography.fernet import Fernet
import os

class ComplianceEncryption:
    def __init__(self):
        self.key = os.environ.get('ENCRYPTION_KEY', Fernet.generate_key())
        self.cipher = Fernet(self.key)
    
    def encrypt_pii(self, data):
        """Encrypt personally identifiable information"""
        return self.cipher.encrypt(data.encode()).decode()
    
    def decrypt_pii(self, encrypted_data):
        """Decrypt PII for authorized access"""
        return self.cipher.decrypt(encrypted_data.encode()).decode()

Audit Logging Implementation

# Comprehensive audit logging setup
# Application audit logger
import logging
import json
from datetime import datetime

class ComplianceLogger:
    def __init__(self, log_file='/var/log/compliance/audit.log'):
        self.logger = logging.getLogger('compliance')
        handler = logging.FileHandler(log_file)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
    
    def log_access(self, user, resource, action, result):
        """Log all access attempts for compliance"""
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'user': user,
            'resource': resource,
            'action': action,
            'result': result,
            'ip_address': self.get_client_ip(),
            'session_id': self.get_session_id()
        }
        self.logger.info(json.dumps(log_entry))

Data Protection Controls

Data Classification

Classification Examples Controls Required
Public Marketing materials Basic access controls
Internal Employee directories Authentication required
Confidential Financial reports Encryption, audit logging
Restricted PII, PHI, PCI data Full encryption, MFA, monitoring

Data Loss Prevention (DLP)

# DLP policy implementation
class DLPEngine:
    def __init__(self):
        self.patterns = {
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
        }
    
    def scan_content(self, content):
        """Scan content for sensitive data patterns"""
        findings = []
        for data_type, pattern in self.patterns.items():
            if re.search(pattern, content):
                findings.append({
                    'type': data_type,
                    'severity': 'high',
                    'action': 'block'
                })
        return findings
    
    def enforce_policy(self, content, destination):
        """Enforce DLP policies on data movement"""
        findings = self.scan_content(content)
        if findings:
            self.alert_security_team(findings)
            if destination.is_external():
                raise DLPViolation("Sensitive data transfer blocked")

Compliance Monitoring

Continuous Compliance Monitoring

# AWS Config Rules for continuous compliance
# PCI DSS compliance rule
aws configservice put-config-rule --config-rule '{
  "ConfigRuleName": "encrypted-volumes",
  "Description": "Checks whether EBS volumes are encrypted",
  "Source": {
    "Owner": "AWS",
    "SourceIdentifier": "ENCRYPTED_VOLUMES"
  },
  "Scope": {
    "ComplianceResourceTypes": [
      "AWS::EC2::Volume"
    ]
  }
}'

# Custom compliance rule
aws configservice put-config-rule --config-rule '{
  "ConfigRuleName": "approved-amis-by-tag",
  "Description": "Checks whether instances are launched from approved AMIs",
  "Source": {
    "Owner": "AWS",
    "SourceIdentifier": "APPROVED_AMIS_BY_TAG"
  },
  "InputParameters": "{\"tag1Key\":\"Approved\",\"tag1Value\":\"true\"}"
}'

Compliance Dashboard

# Compliance metrics collection
class ComplianceDashboard:
    def __init__(self):
        self.metrics = {
            'patch_compliance': 0,
            'encryption_compliance': 0,
            'access_review_completion': 0,
            'training_completion': 0
        }
    
    def calculate_patch_compliance(self):
        """Calculate percentage of systems with current patches"""
        total_systems = self.get_total_systems()
        patched_systems = self.get_patched_systems()
        return (patched_systems / total_systems) * 100
    
    def generate_compliance_report(self):
        """Generate executive compliance report"""
        report = {
            'generated_at': datetime.utcnow().isoformat(),
            'overall_compliance': self.calculate_overall_score(),
            'metrics': self.metrics,
            'findings': self.get_findings(),
            'recommendations': self.get_recommendations()
        }
        return report

Vulnerability Management

Automated Vulnerability Scanning

#!/bin/bash
# Automated vulnerability scanning script

# Install and configure OpenVAS
apt-get update && apt-get install -y openvas
gvm-setup
gvm-feed-update

# Create scan configuration
cat > /etc/gvm/compliance-scan.xml << EOF

  Compliance Scan
  PCI DSS quarterly scan
  daba56c8-73ec-11df-a475-002264764cea

EOF

# Schedule weekly scans
crontab -l > mycron
echo "0 2 * * 0 /usr/bin/gvm-cli --gmp-username admin --gmp-password $GVM_PASS socket --xml ''" >> mycron
crontab mycron

Patch Management

# Automated patch management
class PatchManagement:
    def __init__(self):
        self.critical_patch_sla = 30  # days
        self.standard_patch_sla = 90  # days
    
    def assess_patches(self):
        """Assess available patches by criticality"""
        patches = self.get_available_patches()
        
        categorized = {
            'critical': [],
            'high': [],
            'medium': [],
            'low': []
        }
        
        for patch in patches:
            severity = self.get_patch_severity(patch)
            categorized[severity].append(patch)
        
        return categorized
    
    def schedule_patching(self, patches):
        """Schedule patches based on criticality and maintenance windows"""
        for severity, patch_list in patches.items():
            if severity == 'critical':
                self.schedule_emergency_patching(patch_list)
            else:
                self.schedule_maintenance_window(patch_list, severity)

Identity and Access Management

Access Review Process

# Automated access review system
class AccessReview:
    def __init__(self):
        self.review_frequency = 90  # days
        
    def generate_access_report(self):
        """Generate access report for review"""
        users = self.get_all_users()
        report = []
        
        for user in users:
            user_access = {
                'username': user.username,
                'full_name': user.full_name,
                'department': user.department,
                'manager': user.manager,
                'last_login': user.last_login,
                'permissions': self.get_user_permissions(user),
                'privileged_access': self.has_privileged_access(user),
                'review_required': self.needs_review(user)
            }
            report.append(user_access)
        
        return report
    
    def process_review_decisions(self, decisions):
        """Process manager decisions on access reviews"""
        for decision in decisions:
            if decision['action'] == 'revoke':
                self.revoke_access(decision['user'], decision['permission'])
            elif decision['action'] == 'modify':
                self.modify_access(decision['user'], decision['changes'])
            
            self.log_review_decision(decision)

Privileged Access Management

# Just-in-time privileged access
class PrivilegedAccessManager:
    def __init__(self):
        self.max_elevation_time = 4  # hours
        
    def request_elevation(self, user, role, justification):
        """Handle privileged access requests"""
        request = {
            'id': self.generate_request_id(),
            'user': user,
            'requested_role': role,
            'justification': justification,
            'requested_at': datetime.utcnow(),
            'expires_at': datetime.utcnow() + timedelta(hours=self.max_elevation_time)
        }
        
        # Auto-approve based on policy or queue for approval
        if self.auto_approve_eligible(user, role):
            return self.grant_temporary_access(request)
        else:
            return self.queue_for_approval(request)
    
    def grant_temporary_access(self, request):
        """Grant time-limited privileged access"""
        # Create temporary role binding
        self.create_temp_role(request['user'], request['requested_role'])
        
        # Schedule automatic revocation
        self.schedule_revocation(request['id'], request['expires_at'])
        
        # Log for audit
        self.audit_log.record_privilege_grant(request)
        
        return request['id']

Incident Response

Automated Incident Response

# Security incident response automation
class IncidentResponse:
    def __init__(self):
        self.severity_levels = ['critical', 'high', 'medium', 'low']
        self.response_teams = self.load_response_teams()
    
    def detect_incident(self, event):
        """Analyze event for security incidents"""
        incident_indicators = [
            self.check_failed_logins(event),
            self.check_data_exfiltration(event),
            self.check_privilege_escalation(event),
            self.check_malware_indicators(event)
        ]
        
        if any(incident_indicators):
            return self.create_incident(event, incident_indicators)
    
    def respond_to_incident(self, incident):
        """Execute incident response playbook"""
        severity = self.assess_severity(incident)
        
        # Immediate containment
        if severity in ['critical', 'high']:
            self.contain_threat(incident)
        
        # Notify response team
        self.notify_team(incident, severity)
        
        # Collect forensics
        self.collect_evidence(incident)
        
        # Create incident ticket
        ticket_id = self.create_incident_ticket(incident)
        
        return ticket_id

Compliance Reporting

Automated Compliance Reports

# Compliance report generator
class ComplianceReporter:
    def __init__(self):
        self.report_templates = {
            'pci_dss': 'templates/pci_dss_report.html',
            'hipaa': 'templates/hipaa_report.html',
            'gdpr': 'templates/gdpr_report.html',
            'soc2': 'templates/soc2_report.html'
        }
    
    def generate_report(self, framework, period):
        """Generate compliance report for specific framework"""
        # Collect metrics
        metrics = {
            'control_effectiveness': self.measure_control_effectiveness(framework),
            'vulnerabilities': self.get_vulnerability_metrics(period),
            'incidents': self.get_incident_metrics(period),
            'access_reviews': self.get_access_review_status(),
            'training': self.get_training_completion(),
            'patches': self.get_patch_compliance()
        }
        
        # Generate evidence
        evidence = self.collect_evidence(framework, period)
        
        # Create report
        report = self.render_report(framework, metrics, evidence)
        
        return report

Third-Party Risk Management

Vendor Assessment

  • Security questionnaires
  • SOC 2 report review
  • Penetration test results
  • Insurance verification
  • Compliance certifications

Ongoing Monitoring

  • Continuous security ratings
  • Breach notification requirements
  • Annual reassessment
  • Contract compliance verification

Implementation Timeline

Phase Duration Activities
Assessment 2-4 weeks Gap analysis, risk assessment
Planning 2-3 weeks Control selection, resource allocation
Implementation 3-6 months Deploy controls, configure systems
Testing 2-4 weeks Validate controls, remediate gaps
Certification 2-4 weeks External audit, certification

Best Practices

  • Automate: Use tools to enforce and monitor compliance
  • Document: Maintain comprehensive evidence
  • Train: Regular compliance awareness training
  • Test: Regular control effectiveness testing
  • Improve: Continuous enhancement based on findings
Note: This documentation is provided for reference purposes only. It reflects general best practices and industry-aligned guidelines, and any examples, claims, or recommendations are intended as illustrative—not definitive or binding.