

Top AWS Data Engineering Scenario Questions (2026) | JavaInUse

Top 20 AWS Data Engineering Scenario Questions


  1. Design a real-time clickstream analytics pipeline
  2. Build a data lake with raw, curated, and enriched zones
  3. Design a CDC pipeline from RDS to data warehouse
  4. Build a real-time fraud detection system
  5. Design a multi-tenant data platform
  6. Build an event-driven ETL pipeline
  7. Design a data quality monitoring framework
  8. Build a cost-optimized data archive solution
  9. Design a cross-region data replication strategy
  10. Build a real-time recommendation engine pipeline
  11. Design a data governance framework
  12. Build a streaming ML inference pipeline
  13. Design a disaster recovery data strategy
  14. Build a data catalog and discovery platform
  15. Design a PII data handling pipeline
  16. Build a real-time aggregation dashboard
  17. Design a schema evolution strategy
  18. Build a data pipeline testing framework
  19. Design a data lineage tracking system
  20. Build a serverless data processing architecture

1. Design a real-time clickstream analytics pipeline

Scenario: E-commerce company needs real-time user behavior analytics.

Architecture:
┌─────────────────────────────────────────────────────────────┐
│  Web/Mobile Apps                                             │
│       │                                                      │
│       ▼                                                      │
│  API Gateway ──▶ Kinesis Data Streams                       │
│                      │                                       │
│       ┌──────────────┼──────────────┐                       │
│       │              │              │                       │
│       ▼              ▼              ▼                       │
│  Lambda          Kinesis        Kinesis                     │
│  (Real-time)     Analytics      Firehose                    │
│       │              │              │                       │
│       ▼              ▼              ▼                       │
│  DynamoDB        OpenSearch     S3 (Parquet)                │
│  (Counters)      (Dashboard)    (Data Lake)                 │
│                                     │                       │
│                                     ▼                       │
│                               Athena/Redshift               │
└─────────────────────────────────────────────────────────────┘

# Kinesis Stream for clickstream
import base64, json
import boto3

kinesis = boto3.client('kinesis')
kinesis.create_stream(StreamName='clickstream', ShardCount=10)

# Lambda for real-time processing
def handler(event, context):
    for record in event['Records']:
        data = json.loads(base64.b64decode(record['kinesis']['data']))
        
        # Update real-time counters in DynamoDB
        dynamodb.update_item(
            TableName='page_views',
            Key={'page_id': {'S': data['page_id']}, 'hour': {'S': data['hour']}},
            UpdateExpression='ADD view_count :inc',
            ExpressionAttributeValues={':inc': {'N': '1'}}
        )
        
        # Check for anomalies
        if data['events_per_minute'] > 1000:
            sns.publish(TopicArn=ALERT_TOPIC, Message=f"Traffic spike: {data}")

# Kinesis Data Analytics (Flink SQL) for windowed aggregations
CREATE STREAM pageview_aggregates AS
SELECT STREAM
    page_id,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
    COUNT(*) as views,
    COUNT(DISTINCT user_id) as unique_users
FROM clickstream
GROUP BY page_id, TUMBLE(event_time, INTERVAL '1' MINUTE);

# Firehose for S3 landing
- Format conversion to Parquet
- Partitioning: year/month/day/hour
- Compression: Snappy
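The partitioning scheme above implies a time-based S3 prefix per event. A minimal sketch of that key layout (an illustrative helper, not the Firehose API itself):

```python
from datetime import datetime, timezone

def s3_prefix(event_ts: str) -> str:
    """Build the year/month/day/hour partition prefix the Firehose
    delivery stream would write under (illustrative helper)."""
    ts = datetime.fromisoformat(event_ts).astimezone(timezone.utc)
    return (f"year={ts.year}/month={ts.month:02d}/"
            f"day={ts.day:02d}/hour={ts.hour:02d}/")

print(s3_prefix("2026-01-15T09:30:00+00:00"))
# year=2026/month=01/day=15/hour=09/
```

Hive-style `key=value` prefixes let Athena prune partitions instead of scanning the whole lake.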

2. Build a data lake with raw, curated, and enriched zones

Scenario: Build a multi-zone data lake with proper governance.

Architecture:
┌─────────────────────────────────────────────────────────────┐
│                    AWS Lake Formation                        │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │   RAW ZONE   │  │ CURATED ZONE │  │ENRICHED ZONE │      │
│  │    (Bronze)  │  │   (Silver)   │  │    (Gold)    │      │
│  │              │  │              │  │              │      │
│  │ s3://lake/   │  │ s3://lake/   │  │ s3://lake/   │      │
│  │   raw/       │  │   curated/   │  │   enriched/  │      │
│  │              │  │              │  │              │      │
│  │ - JSON       │  │ - Parquet    │  │ - Parquet    │      │
│  │ - CSV        │  │ - Deduped    │  │ - Aggregated │      │
│  │ - As-is data │  │ - Validated  │  │ - ML features│      │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘      │
│         │                 │                 │               │
│         ▼                 ▼                 ▼               │
│  ┌──────────────────────────────────────────────────┐      │
│  │              AWS Glue Data Catalog               │      │
│  └──────────────────────────────────────────────────┘      │
└─────────────────────────────────────────────────────────────┘

# Glue ETL: Raw to Curated
raw_df = glueContext.create_dynamic_frame.from_catalog(
    database="raw_db",
    table_name="events"
)

# Data quality checks
from awsgluedq.transforms import EvaluateDataQuality
quality_results = EvaluateDataQuality.apply(
    frame=raw_df,
    ruleset="""
        Rules = [
            ColumnExists "user_id",
            IsComplete "event_timestamp",
            ColumnValues "event_type" in ["click", "view", "purchase"]
        ]
    """
)

# Transform and deduplicate
curated_df = raw_df.toDF() \
    .dropDuplicates(["event_id"]) \
    .withColumn("processed_date", current_date())

# Write to curated zone
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(curated_df, glueContext, "curated"),
    connection_type="s3",
    connection_options={"path": "s3://lake/curated/events/"},
    format="parquet",
    format_options={"compression": "snappy"}
)

# Lake Formation permissions
lakeformation.grant_permissions(
    Principal={'DataLakePrincipalIdentifier': 'arn:aws:iam::...:role/AnalystRole'},
    Resource={'Table': {'DatabaseName': 'curated_db', 'Name': 'events'}},
    Permissions=['SELECT']
)
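The dedup semantics of `dropDuplicates(["event_id"])` in the Glue job above can be sketched in plain Python, outside Spark, to show what "keep one record per event_id" means:

```python
def deduplicate(events):
    """Keep the first record seen per event_id, mirroring
    dropDuplicates(["event_id"]) in the Glue job above."""
    seen, out = set(), []
    for e in events:
        if e["event_id"] not in seen:
            seen.add(e["event_id"])
            out.append(e)
    return out

raw = [{"event_id": "a", "v": 1}, {"event_id": "a", "v": 2}, {"event_id": "b", "v": 3}]
print(deduplicate(raw))  # two records: event_id "a" (first copy) and "b"
```

Note that, like Spark's `dropDuplicates`, which copy survives is arbitrary unless you sort first; dedup by a business timestamp needs an explicit ordering step.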

3. Design a CDC pipeline from RDS to data warehouse

Scenario: Real-time replication from operational RDS to Redshift analytics.

Architecture:
┌─────────────────────────────────────────────────────────────┐
│                                                              │
│  RDS (PostgreSQL)                                            │
│       │                                                      │
│       │ (Binary Log / Logical Replication)                  │
│       ▼                                                      │
│  AWS DMS (CDC Task)                                          │
│       │                                                      │
│       ▼                                                      │
│  Kinesis Data Streams                                        │
│       │                                                      │
│       ├──────────────┬──────────────┐                       │
│       │              │              │                       │
│       ▼              ▼              ▼                       │
│  Lambda          Firehose       S3 (Archive)                │
│  (Transform)     (to Redshift)                              │
│       │              │                                       │
│       ▼              ▼                                       │
│  DynamoDB        Redshift                                    │
│  (Latest State)  (Analytics)                                 │
│                                                              │
└─────────────────────────────────────────────────────────────┘

# DMS Task configuration
dms.create_replication_task(
    ReplicationTaskIdentifier='rds-to-kinesis-cdc',
    SourceEndpointArn=source_endpoint_arn,
    TargetEndpointArn=kinesis_endpoint_arn,
    ReplicationInstanceArn=replication_instance_arn,
    MigrationType='cdc',  # Change data capture only
    TableMappings=json.dumps({
        "rules": [{
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {
                "schema-name": "public",
                "table-name": "%"
            },
            "rule-action": "include"
        }]
    }),
    ReplicationTaskSettings=json.dumps({
        "TargetMetadata": {
            "ParallelLoadThreads": 4,
            "ParallelLoadBufferSize": 500
        },
        "FullLoadSettings": {
            "TargetTablePrepMode": "DO_NOTHING"
        },
        "Logging": {"EnableLogging": True}
    })
)

# Lambda to handle CDC events
def handler(event, context):
    for record in event['Records']:
        cdc_event = json.loads(base64.b64decode(record['kinesis']['data']))
        
        operation = cdc_event['metadata']['operation'].upper()  # insert / update / delete
        table = cdc_event['metadata']['table-name']
        data = cdc_event['data']
        
        if operation in ('INSERT', 'UPDATE'):
            # Upsert keeps the latest row state in DynamoDB
            dynamodb.put_item(TableName=f'{table}_current', Item=data)
        elif operation == 'DELETE':
            dynamodb.delete_item(TableName=f'{table}_current', Key={'id': data['id']})
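The "latest state" logic the Lambda applies to DynamoDB can be checked in isolation by replaying DMS-style events onto a plain dict (a sketch, assuming each row carries an `id` key):

```python
def apply_cdc(state: dict, events: list) -> dict:
    """Replay DMS-style CDC events onto an in-memory latest-state
    table: insert/update upsert the row, delete removes it."""
    for e in events:
        op = e["metadata"]["operation"].upper()
        if op in ("INSERT", "UPDATE"):
            state[e["data"]["id"]] = e["data"]
        elif op == "DELETE":
            state.pop(e["data"]["id"], None)
    return state

events = [
    {"metadata": {"operation": "insert"}, "data": {"id": 1, "name": "a"}},
    {"metadata": {"operation": "update"}, "data": {"id": 1, "name": "b"}},
    {"metadata": {"operation": "delete"}, "data": {"id": 1}},
]
print(apply_cdc({}, events))  # {}
```

Ordering matters: shard by primary key (DMS's Kinesis partition-key-by-table/PK setting) so updates for the same row are never reordered across shards.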

4. Build a real-time fraud detection system

Scenario: Detect fraudulent transactions in real-time with ML.

Architecture:
┌─────────────────────────────────────────────────────────────┐
│  Transaction API                                             │
│       │                                                      │
│       ▼                                                      │
│  Kinesis Data Streams (transactions)                        │
│       │                                                      │
│       ├──────────────┬──────────────┐                       │
│       │              │              │                       │
│       ▼              ▼              ▼                       │
│  Lambda          Lambda          Firehose                   │
│  (Feature Eng)   (Rule Engine)   (Archive)                  │
│       │              │                                       │
│       ▼              ▼                                       │
│  SageMaker       DynamoDB                                    │
│  (ML Inference)  (Rules/History)                            │
│       │                                                      │
│       └──────────────┬──────────────┐                       │
│                      │              │                       │
│                      ▼              ▼                       │
│                  Approve        SNS Alert                   │
│                  Transaction    (Fraud Team)                │
└─────────────────────────────────────────────────────────────┘

# Feature engineering Lambda
def handler(event, context):
    decisions = []
    for record in event['Records']:
        txn = json.loads(base64.b64decode(record['kinesis']['data']))
        
        # Get historical features from DynamoDB (Item is absent for new users)
        user_history = dynamodb.get_item(
            TableName='user_transactions',
            Key={'user_id': {'S': txn['user_id']}}
        ).get('Item', {})
        
        # Calculate real-time features
        features = {
            'amount': txn['amount'],
            'merchant_category': txn['merchant_category'],
            'hour_of_day': datetime.fromisoformat(txn['timestamp']).hour,
            'avg_transaction_amount': user_history.get('avg_amount', 0),
            'transaction_count_24h': user_history.get('count_24h', 0),
            'distance_from_last_txn': calculate_distance(txn, user_history),
            'time_since_last_txn': calculate_time_delta(txn, user_history)
        }
        
        # Call SageMaker endpoint
        response = sagemaker_runtime.invoke_endpoint(
            EndpointName='fraud-detection-model',
            ContentType='application/json',
            Body=json.dumps(features)
        )
        
        prediction = json.loads(response['Body'].read())
        fraud_score = prediction['fraud_probability']
        
        if fraud_score > 0.8:
            # Block transaction and alert the fraud team
            sns.publish(TopicArn=FRAUD_ALERT_TOPIC, Message=json.dumps(txn))
            decisions.append({'action': 'BLOCK', 'reason': 'High fraud score'})
        elif fraud_score > 0.5:
            # Request additional verification
            decisions.append({'action': 'VERIFY', 'reason': 'Suspicious activity'})
        else:
            decisions.append({'action': 'APPROVE'})
    
    # Returning from inside the loop would silently drop the rest of the batch
    return decisions
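The `transaction_count_24h` feature above is a classic velocity feature; a sketch of how it would be computed from a user's recent timestamps (names are illustrative):

```python
from datetime import datetime, timedelta

def txn_count_24h(timestamps, now):
    """Count transactions in the trailing 24-hour window - the
    'transaction_count_24h' feature fed to the model above."""
    cutoff = now - timedelta(hours=24)
    return sum(1 for t in timestamps if t > cutoff)

now = datetime(2026, 1, 15, 12, 0)
history = [now - timedelta(hours=h) for h in (1, 5, 23, 30)]
print(txn_count_24h(history, now))  # 3
```

In production this usually becomes a DynamoDB item with a rolling counter or a TTL-expired list, so the Lambda never scans full history per transaction.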

5. Design a multi-tenant data platform

Scenario: SaaS platform with isolated data for multiple tenants.

Architecture Options:
┌─────────────────────────────────────────────────────────────┐
│  Option 1: Siloed (Separate resources per tenant)           │
│                                                              │
│  Tenant A          Tenant B          Tenant C               │
│  ┌───────────┐    ┌───────────┐    ┌───────────┐            │
│  │S3/tenant-a│    │S3/tenant-b│    │S3/tenant-c│            │
│  │Redshift-A │    │Redshift-B │    │Redshift-C │            │
│  │Glue Jobs A│    │Glue Jobs B│    │Glue Jobs C│            │
│  └───────────┘    └───────────┘    └───────────┘            │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│  Option 2: Pooled (Shared resources with isolation)         │
│                                                              │
│  ┌────────────────────────────────────────────────────┐    │
│  │                   Shared S3                         │    │
│  │  s3://data-lake/tenant_id=A/                       │    │
│  │  s3://data-lake/tenant_id=B/                       │    │
│  │  s3://data-lake/tenant_id=C/                       │    │
│  └────────────────────────────────────────────────────┘    │
│                         │                                   │
│  ┌────────────────────────────────────────────────────┐    │
│  │            Lake Formation (Row/Column Security)    │    │
│  │  Policy: tenant_id = ${session:tenant_id}         │    │
│  └────────────────────────────────────────────────────┘    │
│                         │                                   │
│  ┌────────────────────────────────────────────────────┐    │
│  │               Shared Redshift                       │    │
│  │  RLS: WHERE tenant_id = current_setting('tenant')  │    │
│  └────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘

# Lake Formation data cell filtering
lakeformation.create_data_cells_filter(
    TableData={
        'TableCatalogId': account_id,
        'DatabaseName': 'multi_tenant_db',
        'TableName': 'customer_data',
        'Name': 'tenant_filter',
        # RowFilter nests inside TableData; FilterExpression and
        # AllRowsWildcard are mutually exclusive (typically one
        # named filter per tenant)
        'RowFilter': {'FilterExpression': "tenant_id = 'tenant_a'"},
        'ColumnWildcard': {}
    }
)

# Grant with session tags
lakeformation.grant_permissions(
    Principal={'DataLakePrincipalIdentifier': f'arn:aws:iam::{account_id}:role/TenantRole'},
    Resource={
        'DataCellsFilter': {
            'TableCatalogId': account_id,
            'DatabaseName': 'multi_tenant_db',
            'TableName': 'customer_data',
            'Name': 'tenant_filter'
        }
    },
    Permissions=['SELECT'],
    PermissionsWithGrantOption=[]
)

# Redshift Row-Level Security
CREATE RLS POLICY tenant_isolation
WITH (tenant_id VARCHAR(64))
USING (tenant_id = current_setting('app.tenant_id'));

ATTACH RLS POLICY tenant_isolation ON customer_data TO ROLE analyst_role;
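In the pooled model, every object key and every query is scoped by tenant. A sketch of that convention as code (illustrative helpers, not an AWS API; real enforcement is Lake Formation filters and Redshift RLS as shown above):

```python
def tenant_key(tenant_id: str, object_key: str) -> str:
    """Build a pooled-model S3 key under the tenant's partition."""
    return f"tenant_id={tenant_id}/{object_key}"

def authorize(tenant_id: str, key: str) -> bool:
    """Coarse guard mirroring the row-level policy: a tenant may
    only touch keys under its own prefix."""
    return key.startswith(f"tenant_id={tenant_id}/")

k = tenant_key("A", "orders/2026/01/part-0.parquet")
print(k)                  # tenant_id=A/orders/2026/01/part-0.parquet
print(authorize("B", k))  # False
```

Keeping `tenant_id` as the leading partition also makes per-tenant deletes (e.g. offboarding, GDPR erasure) a prefix operation.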




6. Build an event-driven ETL pipeline

Scenario: Trigger ETL automatically when new data arrives.

Architecture:
┌─────────────────────────────────────────────────────────────┐
│  S3 Bucket (Raw Data Landing)                               │
│       │                                                      │
│       │ (S3 Event Notification)                             │
│       ▼                                                      │
│  EventBridge                                                 │
│       │                                                      │
│       ├──────────────┬──────────────┐                       │
│       │              │              │                       │
│       ▼              ▼              ▼                       │
│  Step Functions   Lambda          SNS                       │
│  (Orchestration)  (Validation)    (Notification)            │
│       │                                                      │
│       ├──────────────┬──────────────┐                       │
│       │              │              │                       │
│       ▼              ▼              ▼                       │
│  Glue Crawler    Glue Job        Quality Check              │
│  (Schema)        (Transform)     (DQ Rules)                 │
│       │              │              │                       │
│       └──────────────┼──────────────┘                       │
│                      ▼                                       │
│                  Curated Zone                                │
└─────────────────────────────────────────────────────────────┘

# EventBridge Rule
{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {"name": ["raw-data-landing"]},
    "object": {"key": [{"prefix": "incoming/"}]}
  }
}

# Step Functions Definition
{
  "StartAt": "ValidateFile",
  "States": {
    "ValidateFile": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:...:function:validate-file",
      "Next": "FileValid?"
    },
    "FileValid?": {
      "Type": "Choice",
      "Choices": [{
        "Variable": "$.validation.status",
        "StringEquals": "VALID",
        "Next": "UpdateCatalog"
      }],
      "Default": "HandleInvalidFile"
    },
    "UpdateCatalog": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:glue:startCrawler",
      "Parameters": {"Name": "raw-data-crawler"},
      "Next": "RunETL"
    },
    "RunETL": {
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "raw-to-curated",
        "Arguments": {"--source_path.$": "$.s3.object.key"}
      },
      "Next": "QualityCheck"
    },
    "QualityCheck": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:glue:startDataQualityRulesetEvaluationRun",
      "Parameters": {
        "DataSource": {"GlueTable": {"DatabaseName": "curated_db", "TableName": "events"}},
        "Role": "GlueDataQualityRole",
        "RulesetNames": ["curated-quality-rules"]
      },
      "Next": "NotifySuccess"
    },
    "NotifySuccess": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {"TopicArn": "...", "Message": "ETL completed"},
      "End": true
    },
    "HandleInvalidFile": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:...:function:handle-invalid",
      "End": true
    }
  }
}
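The `ValidateFile` task above is an ordinary Lambda; a minimal sketch of its logic, returning the `{'validation': {'status': ...}}` shape the Choice state branches on (allowed formats and the empty-file check are illustrative assumptions):

```python
ALLOWED_SUFFIXES = (".csv", ".json", ".parquet")  # assumed accepted formats

def validate_file(key: str, size_bytes: int) -> dict:
    """Return the validation payload consumed by the 'FileValid?'
    Choice state in the state machine above."""
    if size_bytes == 0:
        return {"validation": {"status": "INVALID", "reason": "empty file"}}
    if not key.lower().endswith(ALLOWED_SUFFIXES):
        return {"validation": {"status": "INVALID", "reason": "unsupported format"}}
    return {"validation": {"status": "VALID"}}

print(validate_file("incoming/orders.csv", 1024))   # status VALID
print(validate_file("incoming/orders.xlsx", 1024))  # status INVALID
```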

7. Design a data quality monitoring framework

Scenario: Continuous data quality monitoring with alerting.

Framework Components:
┌─────────────────────────────────────────────────────────────┐
│                Data Quality Framework                        │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                 DQ Rule Repository                   │   │
│  │  - Completeness (null checks)                       │   │
│  │  - Uniqueness (duplicate checks)                    │   │
│  │  - Validity (format, range checks)                  │   │
│  │  - Consistency (referential integrity)              │   │
│  │  - Timeliness (freshness checks)                    │   │
│  └─────────────────────────────────────────────────────┘   │
│                          │                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              Glue Data Quality                       │   │
│  │  ruleset = """                                      │   │
│  │    Rules = [                                        │   │
│  │      Completeness "email" > 0.99,                   │   │
│  │      Uniqueness "customer_id" = 1.0,                │   │
│  │      ColumnValues "status" in ["A","I","P"],        │   │
│  │      RowCount between 1000 and 1000000              │   │
│  │    ]                                                │   │
│  │  """                                                │   │
│  └─────────────────────────────────────────────────────┘   │
│                          │                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │            Quality Metrics Dashboard                │   │
│  │  CloudWatch → QuickSight                           │   │
│  └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

# Glue Data Quality Rules
glue.create_data_quality_ruleset(
    Name='customer_data_quality',
    Ruleset="""
        Rules = [
            Completeness "email" >= 0.99,
            Completeness "customer_id" = 1.0,
            Uniqueness "customer_id" = 1.0,
            ColumnValues "country_code" in ["US", "CA", "UK", "DE", "FR"],
            ColumnLength "phone" between 10 and 15,
            ColumnValues "created_date" matches "\\d{4}-\\d{2}-\\d{2}",
            CustomSql "SELECT COUNT(*) FROM primary_table t1 
                       LEFT JOIN reference_table t2 ON t1.ref_id = t2.id 
                       WHERE t2.id IS NULL" = 0,
            RowCount > 0,
            DataFreshness "updated_at" <= 24 hours
        ]
    """,
    TargetTable={
        'TableName': 'customers',
        'DatabaseName': 'production_db'
    }
)

# Schedule quality checks
glue.start_data_quality_ruleset_evaluation_run(
    DataSource={
        'GlueTable': {'DatabaseName': 'production_db', 'TableName': 'customers'}
    },
    Role='GlueDataQualityRole',
    RulesetNames=['customer_data_quality']
)

# CloudWatch metrics for alerting
cloudwatch.put_metric_data(
    Namespace='DataQuality',
    MetricData=[{
        'MetricName': 'CompletionRate',
        'Dimensions': [{'Name': 'Table', 'Value': 'customers'}],
        'Value': 95.0,
        'Unit': 'Percent'
    }]
)
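The completeness figure behind the `Completeness` DQDL rule and the CloudWatch metric above is just the non-null fraction of a column; a sketch:

```python
def completeness(records, column):
    """Fraction of rows with a non-null value in `column` - the
    figure behind the Completeness rule and the metric above."""
    if not records:
        return 0.0
    present = sum(1 for r in records if r.get(column) is not None)
    return present / len(records)

rows = [{"email": "a@x.com"}, {"email": None}, {"email": "b@x.com"}, {}]
print(completeness(rows, "email"))  # 0.5
```

Emitting this per table per run makes threshold alarms (e.g. completeness < 0.99) a plain CloudWatch alarm rather than custom pipeline logic.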

8. Build a cost-optimized data archive solution

Scenario: Archive historical data with minimal cost while maintaining accessibility.

Architecture:
┌─────────────────────────────────────────────────────────────┐
│                  Data Lifecycle                              │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Hot Data (0-30 days)           │ S3 Standard              │
│  ├── High access frequency       │ Cost: $$$                │
│  └── Low latency required        │                          │
│                                  │                          │
│  Warm Data (30-90 days)         │ S3 Standard-IA           │
│  ├── Medium access frequency     │ Cost: $$                 │
│  └── Acceptable latency          │                          │
│                                  │                          │
│  Cold Data (90-365 days)        │ S3 Glacier Instant       │
│  ├── Rare access                 │ Cost: $                  │
│  └── Minutes retrieval           │                          │
│                                  │                          │
│  Archive (>365 days)            │ S3 Glacier Deep Archive  │
│  ├── Very rare access            │ Cost: ¢                  │
│  └── Hours retrieval             │                          │
└─────────────────────────────────────────────────────────────┘

# S3 Lifecycle Policy
s3.put_bucket_lifecycle_configuration(
    Bucket='data-lake',
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'archive-historical-data',
            'Filter': {'Prefix': 'processed/'},
            'Status': 'Enabled',
            'Transitions': [
                {'Days': 30, 'StorageClass': 'STANDARD_IA'},
                {'Days': 90, 'StorageClass': 'GLACIER_IR'},
                {'Days': 365, 'StorageClass': 'DEEP_ARCHIVE'}
            ],
            'NoncurrentVersionTransitions': [
                {'NoncurrentDays': 30, 'StorageClass': 'GLACIER'}
            ],
            'Expiration': {'Days': 2555}  # 7 years for compliance
        }]
    }
)
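The lifecycle rules above define a deterministic mapping from object age to storage class; the same mapping as a checkable function (the `EXPIRED` label is illustrative, standing in for the Expiration action):

```python
def storage_class(age_days: int) -> str:
    """Storage class an object lands in under the lifecycle rules
    above (30/90/365-day transitions, 7-year expiry)."""
    if age_days >= 2555:
        return "EXPIRED"
    if age_days >= 365:
        return "DEEP_ARCHIVE"
    if age_days >= 90:
        return "GLACIER_IR"
    if age_days >= 30:
        return "STANDARD_IA"
    return "STANDARD"

for age in (5, 45, 120, 400):
    print(age, storage_class(age))
# 5 STANDARD / 45 STANDARD_IA / 120 GLACIER_IR / 400 DEEP_ARCHIVE
```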

# Intelligent Tiering for unpredictable access
s3.put_bucket_intelligent_tiering_configuration(
    Bucket='data-lake',
    Id='auto-archive',
    IntelligentTieringConfiguration={
        'Id': 'auto-archive',
        'Status': 'Enabled',
        'Tierings': [
            {'Days': 90, 'AccessTier': 'ARCHIVE_ACCESS'},
            {'Days': 180, 'AccessTier': 'DEEP_ARCHIVE_ACCESS'}
        ]
    }
)

# Query archived data with Athena
-- Glacier Instant Retrieval objects are queryable directly; Glacier
-- Flexible Retrieval / Deep Archive objects must be restored first
CREATE EXTERNAL TABLE archived_data (...)
STORED AS PARQUET
LOCATION 's3://data-lake/archive/'
TBLPROPERTIES ('read_restored_glacier_objects'='true');

-- Athena does not trigger restores; restore via S3 first, e.g.
--   aws s3api restore-object --bucket data-lake --key archive/... \
--     --restore-request 'Days=7,GlacierJobParameters={Tier=Bulk}'
SELECT * FROM archived_data WHERE year = 2020;

9. Design a cross-region data replication strategy

Scenario: Global company needs data available in multiple regions.

Architecture:
┌─────────────────────────────────────────────────────────────┐
│                   US-EAST-1 (Primary)                        │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  S3 Bucket ──── Cross-Region Replication ────┐     │   │
│  │  Redshift ────── Datashare ─────────────────────┐ │   │
│  │  DynamoDB ──── Global Tables ────────────────────┼─┘  │
│  └─────────────────────────────────────────────────────┘   │
│                          │                                  │
└─────────────────────────────────────────────────────────────┘
                           │
┌─────────────────────────────────────────────────────────────┐
│                   EU-WEST-1 (Secondary)                      │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  S3 Bucket (Replica)                                │   │
│  │  Redshift (Consumer Cluster)                        │   │
│  │  DynamoDB (Global Table Replica)                    │   │
│  └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

# S3 Cross-Region Replication
s3.put_bucket_replication(
    Bucket='primary-bucket',
    ReplicationConfiguration={
        'Role': 'arn:aws:iam::...:role/S3ReplicationRole',
        'Rules': [{
            'ID': 'replicate-all',
            'Status': 'Enabled',
            'Priority': 1,
            'Filter': {'Prefix': ''},
            'Destination': {
                'Bucket': 'arn:aws:s3:::replica-bucket-eu',
                'ReplicationTime': {'Status': 'Enabled', 'Time': {'Minutes': 15}},
                'Metrics': {'Status': 'Enabled', 'EventThreshold': {'Minutes': 15}}
            },
            'DeleteMarkerReplication': {'Status': 'Enabled'}
        }]
    }
)

# Redshift Data Sharing
-- Primary cluster (producer)
CREATE DATASHARE sales_share;
ALTER DATASHARE sales_share ADD SCHEMA public;
ALTER DATASHARE sales_share ADD TABLE public.sales;
GRANT USAGE ON DATASHARE sales_share TO ACCOUNT '123456789012';  -- consumer then associates it in eu-west-1

-- Secondary cluster (consumer)
CREATE DATABASE sales_db FROM DATASHARE sales_share
OF ACCOUNT '987654321098' NAMESPACE 'ns-xxx';

# DynamoDB Global Tables (create_global_table is the legacy
# 2017.11.29 API; newer tables add replicas via update_table)
dynamodb.create_global_table(
    GlobalTableName='global-users',
    ReplicationGroup=[
        {'RegionName': 'us-east-1'},
        {'RegionName': 'eu-west-1'},
        {'RegionName': 'ap-southeast-1'}
    ]
)

10. Build a real-time recommendation engine pipeline

Scenario: E-commerce real-time product recommendations.

Architecture:
┌─────────────────────────────────────────────────────────────┐
│  User Activity                                               │
│       │                                                      │
│       ▼                                                      │
│  Kinesis Streams (user_events)                              │
│       │                                                      │
│       ├──────────────┬──────────────┐                       │
│       │              │              │                       │
│       ▼              ▼              ▼                       │
│  Lambda          Firehose       Personalize                 │
│  (Feature Store) (Archive)      (Training)                  │
│       │                              │                       │
│       ▼                              │                       │
│  SageMaker ◄─────────────────────────┘                      │
│  Feature Store                                               │
│       │                                                      │
│       ▼                                                      │
│  ┌─────────────────────────────────────────────────────┐   │
│  │            Recommendation Flow                       │   │
│  │  API Gateway → Lambda → Personalize/SageMaker       │   │
│  │                      → ElastiCache (cached recs)    │   │
│  └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

# Real-time feature update
def update_user_features(event, context):
    for record in event['Records']:
        activity = json.loads(base64.b64decode(record['kinesis']['data']))
        
        # Update real-time features in Feature Store
        sagemaker_featurestore.put_record(
            FeatureGroupName='user-features',
            Record=[
                {'FeatureName': 'user_id', 'ValueAsString': activity['user_id']},
                {'FeatureName': 'last_viewed_category', 'ValueAsString': activity['category']},
                {'FeatureName': 'session_view_count', 'ValueAsString': str(activity['view_count'])},
                {'FeatureName': 'last_activity_time', 'ValueAsString': activity['timestamp']}
            ]
        )
        
        # Update interaction dataset for Personalize
        personalize_events.put_events(
            trackingId='tracking-id',
            userId=activity['user_id'],
            sessionId=activity['session_id'],
            eventList=[{
                'eventType': activity['event_type'],
                'itemId': activity['product_id'],
                'sentAt': datetime.now().timestamp()
            }]
        )

# Get recommendations API
def get_recommendations(event, context):
    user_id = event['pathParameters']['user_id']
    
    # Check cache first ('elasticache' here is a Redis data-plane
    # client such as redis.Redis, not the boto3 control-plane client)
    cached = elasticache.get(f'recs:{user_id}')
    if cached:
        return {'statusCode': 200, 'body': cached}
    
    # Get real-time features
    features = sagemaker_featurestore.get_record(
        FeatureGroupName='user-features',
        RecordIdentifierValueAsString=user_id
    )
    
    # Get recommendations
    response = personalize_runtime.get_recommendations(
        campaignArn='arn:aws:personalize:...:campaign/product-recs',
        userId=user_id,
        numResults=10,
        context={'DEVICE': event.get('device', 'web')}
    )
    
    # Cache and return
    elasticache.setex(f'recs:{user_id}', 300, json.dumps(response))
    return {'statusCode': 200, 'body': json.dumps(response)}
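The cache-aside pattern in `get_recommendations` generalizes to a small helper. A dict-backed sketch (a real deployment uses Redis with the same `get`/`setex` calls):

```python
import time

class TTLCache:
    """Minimal dict-backed stand-in for the ElastiCache/Redis
    get/setex calls used above."""
    def __init__(self):
        self._store = {}
    def get(self, key):
        value, expires = self._store.get(key, (None, 0.0))
        return value if time.time() < expires else None
    def setex(self, key, ttl_seconds, value):
        self._store[key] = (value, time.time() + ttl_seconds)

def cached(cache, key, ttl, compute):
    """Cache-aside: return the cached value, else compute, store, return."""
    hit = cache.get(key)
    if hit is not None:
        return hit
    value = compute()
    cache.setex(key, ttl, value)
    return value

cache = TTLCache()
print(cached(cache, "recs:u1", 300, lambda: ["p1", "p2"]))  # computed
print(cached(cache, "recs:u1", 300, lambda: ["stale"]))     # served from cache
```

A 300-second TTL, as above, bounds staleness while absorbing repeat requests within a session.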

11. Design a data governance framework

Scenario: Enterprise data governance with access controls, auditing, and compliance.

Framework:
┌─────────────────────────────────────────────────────────────┐
│               Data Governance Framework                      │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────────────────────────────────────────────────┐  │
│  │               AWS Lake Formation                      │  │
│  │  ┌─────────────────────────────────────────────────┐ │  │
│  │  │ Data Catalog │ Permissions │ Column/Row Security│ │  │
│  │  └─────────────────────────────────────────────────┘ │  │
│  └──────────────────────────────────────────────────────┘  │
│                          │                                  │
│  ┌──────────────────────────────────────────────────────┐  │
│  │             Data Classification                       │  │
│  │  ┌──────────┐ ┌──────────┐ ┌────────────┐           │  │
│  │  │ PUBLIC   │ │ INTERNAL │ │CONFIDENTIAL│           │  │
│  │  │          │ │          │ │ (PII/PHI)  │           │  │
│  │  └──────────┘ └──────────┘ └────────────┘           │  │
│  └──────────────────────────────────────────────────────┘  │
│                          │                                  │
│  ┌──────────────────────────────────────────────────────┐  │
│  │               Macie (PII Detection)                   │  │
│  └──────────────────────────────────────────────────────┘  │
│                          │                                  │
│  ┌──────────────────────────────────────────────────────┐  │
│  │           CloudTrail (Audit Logging)                  │  │
│  └──────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘

# Tag-Based Access Control
lakeformation.add_lf_tags_to_resource(
    Resource={'Table': {'DatabaseName': 'hr_db', 'Name': 'employees'}},
    LFTags=[
        {'TagKey': 'classification', 'TagValues': ['confidential']},
        {'TagKey': 'pii', 'TagValues': ['true']}
    ]
)

# Grant permissions based on tags
lakeformation.grant_permissions(
    Principal={'DataLakePrincipalIdentifier': 'arn:aws:iam::...:role/HRAnalyst'},
    Resource={
        'LFTagPolicy': {
            'ResourceType': 'TABLE',
            'Expression': [
                {'TagKey': 'classification', 'TagValues': ['confidential']},
                {'TagKey': 'department', 'TagValues': ['hr']}
            ]
        }
    },
    Permissions=['SELECT']
)

# Enable Macie for PII detection
macie.create_classification_job(
    name='pii-scan-data-lake',
    s3JobDefinition={
        'bucketDefinitions': [{'accountId': account_id, 'buckets': ['data-lake']}],
        'scoping': {
            'includes': {'and': [{'simpleScopeTerm': {'comparator': 'EQ', 'key': 'OBJECT_EXTENSION', 'values': ['parquet', 'csv']}}]}
        }
    },
    jobType='SCHEDULED',
    scheduleFrequency={'dailySchedule': {}}
)
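CloudTrail closes the audit loop in the framework above. One way to use it is to query the trail's logs with Athena; the sketch below assumes CloudTrail S3 data events are delivered to S3 and exposed in Athena as a table named `cloudtrail_logs` (both the table and bucket names are assumptions).

```python
# Hypothetical audit query: which principals are reading data-lake objects?
AUDIT_QUERY = """
SELECT useridentity.arn AS principal, eventname, COUNT(*) AS calls
FROM cloudtrail_logs
WHERE eventsource = 's3.amazonaws.com'
  AND json_extract_scalar(requestparameters, '$.bucketName') = 'data-lake'
GROUP BY useridentity.arn, eventname
ORDER BY calls DESC
"""

def start_audit_query(output_location='s3://athena-results/audit/'):
    # Requires AWS credentials at call time; imported here to keep the
    # module importable without boto3 configured
    import boto3
    athena = boto3.client('athena')
    return athena.start_query_execution(
        QueryString=AUDIT_QUERY,
        WorkGroup='primary',
        ResultConfiguration={'OutputLocation': output_location}
    )['QueryExecutionId']
```

The query execution ID can then be polled with `get_query_execution` and the results joined back to the Lake Formation tags for reporting.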

12. Build a streaming ML inference pipeline

Scenario: Real-time ML predictions on streaming data.

Architecture:
┌─────────────────────────────────────────────────────────────┐
│  IoT Sensors / Applications                                  │
│       │                                                      │
│       ▼                                                      │
│  Kinesis Data Streams                                        │
│       │                                                      │
│       ├──────────────────┬──────────────┐                   │
│       │                  │              │                   │
│       ▼                  ▼              ▼                   │
│  Kinesis Analytics   Lambda          Firehose               │
│  (Flink + ML)        (SageMaker)     (Archive)              │
│       │                  │                                   │
│       ▼                  ▼                                   │
│  Output Stream       DynamoDB                                │
│       │              (Results)                               │
│       ▼                                                      │
│  Lambda → SNS (Alerts)                                       │
└─────────────────────────────────────────────────────────────┘

# Lambda with SageMaker inference
def handler(event, context):
    predictions = []
    
    for record in event['Records']:
        data = json.loads(base64.b64decode(record['kinesis']['data']))
        
        # Prepare features
        features = prepare_features(data)
        
        # Invoke the endpoint (for efficiency, batch several instances per call)
        response = sagemaker_runtime.invoke_endpoint(
            EndpointName='anomaly-detector',
            ContentType='application/json',
            Body=json.dumps({'instances': [features]})
        )
        
        prediction = json.loads(response['Body'].read())
        
        if prediction['anomaly_score'] > 0.9:
            # Send alert
            sns.publish(
                TopicArn=ALERT_TOPIC,
                Message=json.dumps({'device_id': data['device_id'], 'anomaly': prediction})
            )
        
        predictions.append({
            'device_id': data['device_id'],
            'timestamp': data['timestamp'],
            'prediction': prediction
        })
    
    # Store predictions (DynamoDB rejects floats; convert via decimal.Decimal)
    with dynamodb.Table('predictions').batch_writer() as batch:
        for pred in predictions:
            batch.put_item(Item=json.loads(json.dumps(pred), parse_float=Decimal))
    
    return {'processed': len(predictions)}
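
The `prepare_features` call above is left undefined; a minimal sketch might look like the following, where the sensor field names and scaling constants are illustrative assumptions, not taken from a real model.

```python
# Hypothetical feature preparation for the anomaly-detector endpoint:
# scale raw sensor readings to roughly unit range before inference.
def prepare_features(data):
    return [
        data['temperature'] / 100.0,   # e.g. degrees C, scaled
        data['pressure'] / 1000.0,     # e.g. kPa, scaled
        data['vibration'] / 10.0,      # e.g. mm/s, scaled
    ]
```

In practice the feature order and scaling must match exactly what the model was trained with, which is why feature stores (as in scenario 10) are often used to share this logic.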

# Kinesis Analytics with Flink ML
# Amazon Kinesis Data Analytics for Apache Flink (now Amazon Managed Service
# for Apache Flink): ML_PREDICT is available in newer Flink SQL, or can be a
# UDF wrapping a SageMaker endpoint call

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql("""
    CREATE TABLE predictions AS
    SELECT
        device_id,
        event_time,
        ML_PREDICT('anomaly-model', temperature, pressure, vibration) AS anomaly_score
    FROM sensor_data
    WHERE ML_PREDICT('anomaly-model', temperature, pressure, vibration) > 0.5
""")

13. Design a disaster recovery data strategy

Scenario: Design DR strategy with RPO < 1 hour and RTO < 4 hours.

DR Tiers:
┌─────────────────────────────────────────────────────────────┐
│  Tier 1: Pilot Light (Low cost, Higher RTO)                 │
│  - Core infrastructure pre-provisioned                       │
│  - Data replicated continuously                              │
│  - Compute scaled up on failover                            │
│  RTO: 4-8 hours | RPO: ~1 hour                              │
├─────────────────────────────────────────────────────────────┤
│  Tier 2: Warm Standby (Medium cost, Medium RTO)             │
│  - Scaled-down version running                               │
│  - Data replicated in near real-time                        │
│  - Quick scale-up on failover                               │
│  RTO: 1-4 hours | RPO: ~15 minutes                          │
├─────────────────────────────────────────────────────────────┤
│  Tier 3: Multi-Site Active (High cost, Lowest RTO)          │
│  - Full production in multiple regions                       │
│  - Real-time data sync                                       │
│  - Automatic failover                                        │
│  RTO: minutes | RPO: ~0                                      │
└─────────────────────────────────────────────────────────────┘

Implementation:
┌─────────────────────────────────────────────────────────────┐
│  Primary (us-east-1)              DR (us-west-2)            │
│  ┌────────────────┐              ┌────────────────┐        │
│  │ S3 ────────────┼── CRR ──────►│ S3 (Replica)   │        │
│  │ RDS ───────────┼── Read Rep ─►│ RDS (Standby)  │        │
│  │ Redshift ──────┼── Snapshot ─►│ Redshift       │        │
│  │ DynamoDB ──────┼── Global ───►│ DynamoDB       │        │
│  │ OpenSearch ────┼── CCR ──────►│ OpenSearch     │        │
│  └────────────────┘              └────────────────┘        │
│                                                             │
│  Route 53 (Health checks + Failover routing)               │
└─────────────────────────────────────────────────────────────┘

# Automated Redshift snapshot copy to DR region (the grant is created in the destination region)
redshift.create_snapshot_copy_grant(
    SnapshotCopyGrantName='dr-grant',
    KmsKeyId='arn:aws:kms:us-west-2:...:key/...'
)

redshift.enable_snapshot_copy(
    ClusterIdentifier='production-cluster',
    DestinationRegion='us-west-2',
    RetentionPeriod=7,
    SnapshotCopyGrantName='dr-grant'
)

# RDS Cross-Region Read Replica (call against the DR region's endpoint)
rds_west = boto3.client('rds', region_name='us-west-2')
rds_west.create_db_instance_read_replica(
    DBInstanceIdentifier='dr-replica',
    SourceDBInstanceIdentifier='arn:aws:rds:us-east-1:...:db:production',
    DBInstanceClass='db.r5.xlarge',
    AvailabilityZone='us-west-2a',
    SourceRegion='us-east-1'
)

# Failover automation with Lambda
def failover_handler(event, context):
    # 1. Promote RDS replica
    rds.promote_read_replica(DBInstanceIdentifier='dr-replica')
    
    # 2. Update Route 53
    route53.change_resource_record_sets(
        HostedZoneId='...',
        ChangeBatch={'Changes': [{'Action': 'UPSERT', 'ResourceRecordSet': {...}}]}
    )
    
    # 3. Scale up DR Redshift
    redshift.resize_cluster(
        ClusterIdentifier='dr-cluster',
        ClusterType='multi-node',
        NumberOfNodes=4
    )
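
The DynamoDB Global row in the diagram can be enabled with `UpdateTable` and a `ReplicaUpdates` block (global tables version 2019.11.21); the table and region names below are assumptions.

```python
def replica_update(region):
    # ReplicaUpdates payload for UpdateTable (global tables v2019.11.21)
    return {'ReplicaUpdates': [{'Create': {'RegionName': region}}]}

def add_dr_replica(table_name='orders', region='us-west-2'):
    # The table must have DynamoDB Streams (NEW_AND_OLD_IMAGES) enabled;
    # boto3 imported here so the module loads without AWS credentials
    import boto3
    dynamodb = boto3.client('dynamodb', region_name='us-east-1')
    return dynamodb.update_table(TableName=table_name, **replica_update(region))
```

Once the replica is active, writes in either region replicate automatically, which is what gives the near-zero RPO of the multi-site tier.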

14. Build a data catalog and discovery platform

Scenario: Enterprise data catalog for data discovery and collaboration.

Architecture:
┌─────────────────────────────────────────────────────────────┐
│                 Data Catalog Platform                        │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────────────────────────────────────────────────┐  │
│  │              AWS Glue Data Catalog                    │  │
│  │  ┌─────────────────────────────────────────────────┐ │  │
│  │  │ Databases │ Tables │ Schemas │ Partitions      │ │  │
│  │  └─────────────────────────────────────────────────┘ │  │
│  └──────────────────────────────────────────────────────┘  │
│                          │                                  │
│  ┌──────────────────────────────────────────────────────┐  │
│  │                Glue Crawlers                          │  │
│  │  S3 → Crawler → Catalog (auto-schema detection)      │  │
│  │  RDS → Crawler → Catalog (JDBC)                      │  │
│  │  Redshift → Crawler → Catalog                        │  │
│  └──────────────────────────────────────────────────────┘  │
│                          │                                  │
│  ┌──────────────────────────────────────────────────────┐  │
│  │              Data Quality & Profiling                 │  │
│  │  Glue Data Quality + Custom profiling jobs           │  │
│  └──────────────────────────────────────────────────────┘  │
│                          │                                  │
│  ┌──────────────────────────────────────────────────────┐  │
│  │                Search & Discovery                     │  │
│  │  OpenSearch (catalog search) + QuickSight (viz)      │  │
│  └──────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘

# Enhanced table metadata
glue.update_table(
    DatabaseName='analytics',
    TableInput={
        'Name': 'customer_orders',
        'Description': 'Customer orders from e-commerce platform',
        'Parameters': {
            'classification': 'parquet',
            'data_owner': 'analytics-team',
            'data_steward': 'john.doe@company.com',
            'refresh_frequency': 'daily',
            'pii_columns': 'email,phone,address',
            'business_glossary_term': 'Customer Transaction',
            'quality_score': '95'
        },
        'StorageDescriptor': {
            'Columns': [
                {'Name': 'customer_id', 'Type': 'string', 'Comment': 'Unique customer identifier'},
                {'Name': 'email', 'Type': 'string', 'Comment': 'Customer email (PII)'},
                {'Name': 'order_total', 'Type': 'decimal(10,2)', 'Comment': 'Total order value'}
            ],
            'Location': 's3://data-lake/orders/',
            'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
            'SerdeInfo': {'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'}
        }
    }
)

# Index catalog in OpenSearch for search
def index_catalog_to_opensearch():
    # get_tables is paginated; iterate all pages
    paginator = glue.get_paginator('get_tables')
    for page in paginator.paginate(DatabaseName='analytics'):
        for table in page['TableList']:
            params = table.get('Parameters', {})
            doc = {
                'name': table['Name'],
                'database': table['DatabaseName'],
                'description': table.get('Description', ''),
                'columns': [col['Name'] for col in table['StorageDescriptor']['Columns']],
                'owner': params.get('data_owner', ''),
                'tags': params.get('classification', ''),
                'updated': table['UpdateTime'].isoformat()
            }
            opensearch.index(index='data-catalog', body=doc, id=f"{table['DatabaseName']}.{table['Name']}")
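With the catalog indexed, discovery becomes a full-text search; a sketch of the query side follows (the `data-catalog` index matches the indexing code above, while the field boost values are assumptions).

```python
def build_catalog_query(term, size=20):
    # Multi-field search over the indexed metadata; table names rank highest
    return {
        'size': size,
        'query': {
            'multi_match': {
                'query': term,
                'fields': ['name^3', 'description^2', 'columns', 'tags']
            }
        }
    }

def search_catalog(opensearch_client, term):
    hits = opensearch_client.search(index='data-catalog', body=build_catalog_query(term))
    return [h['_source'] for h in hits['hits']['hits']]
```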

15. Design a PII data handling pipeline

Scenario: Handle PII data with encryption, masking, and access controls.

Architecture:
┌─────────────────────────────────────────────────────────────┐
│                  PII Data Pipeline                           │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Raw Data (PII) → Macie (Detection) → Classification        │
│       │                                                      │
│       ▼                                                      │
│  ┌──────────────────────────────────────────────────────┐  │
│  │              Glue ETL (Transformation)                │  │
│  │  - Tokenization (replace with tokens)                 │  │
│  │  - Encryption (KMS)                                   │  │
│  │  - Masking (partial visibility)                       │  │
│  │  - Hashing (one-way for matching)                     │  │
│  └──────────────────────────────────────────────────────┘  │
│       │                                                      │
│       ├──────────────┬──────────────┐                       │
│       │              │              │                       │
│       ▼              ▼              ▼                       │
│  Tokenized      Encrypted       Masked                      │
│  (Analytics)    (Secure Store)  (Reporting)                 │
│                                                              │
│  Lake Formation (Column-level security on PII)              │
└─────────────────────────────────────────────────────────────┘

# PII Detection and Masking in Glue
from pyspark.sql.functions import sha2, regexp_replace, col
from awsglue.transforms import *

# Read raw data
raw_df = glueContext.create_dynamic_frame.from_catalog(
    database="raw_db", table_name="customers"
).toDF()

# Tokenize email (reversible via lookup table)
def tokenize_email(email):
    token = str(uuid.uuid4())
    dynamodb.Table('pii_tokens').put_item(Item={'token': token, 'value': email})
    return token

# Hash for matching (irreversible)
hashed_df = raw_df.withColumn('email_hash', sha2(col('email'), 256))

# Mask SSN (show last 4 only)
masked_df = hashed_df.withColumn(
    'ssn_masked',
    regexp_replace(col('ssn'), r'^\d{3}-\d{2}', 'XXX-XX')
)

# Encrypt sensitive columns with KMS envelope encryption
from cryptography.fernet import Fernet

def encrypt_column(value, kms_key):
    # Generate a data key; persist data_key['CiphertextBlob'] alongside the
    # ciphertext so the value can later be decrypted via kms.decrypt
    data_key = kms.generate_data_key(KeyId=kms_key, KeySpec='AES_256')
    cipher = Fernet(base64.urlsafe_b64encode(data_key['Plaintext']))
    return cipher.encrypt(value.encode())

# encrypt_udf: a Spark UDF wrapping encrypt_column
encrypted_df = masked_df.withColumn(
    'address_encrypted',
    encrypt_udf(col('address'))
)

# Lake Formation column-level security
# (ColumnNames and ColumnWildcard are mutually exclusive; list allowed columns)
lakeformation.grant_permissions(
    Principal={'DataLakePrincipalIdentifier': 'arn:aws:iam::...:role/AnalystRole'},
    Resource={
        'TableWithColumns': {
            'DatabaseName': 'customers_db',
            'Name': 'customers',
            'ColumnNames': ['customer_id', 'name', 'city']  # PII columns excluded
        }
    },
    Permissions=['SELECT']
)
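
For the reverse path, only a tightly-scoped role should be able to resolve tokens back to values. A sketch follows: the `pii_tokens` table matches the tokenization step above, and the mask helper shows the same last-4 convention used for SSNs.

```python
def mask_ssn(ssn):
    # Same convention as the Glue masking step: keep only the last 4 digits
    return 'XXX-XX-' + ssn[-4:]

def detokenize(token, table_name='pii_tokens'):
    # Lookup should be restricted by IAM to a dedicated detokenization role;
    # boto3 imported here so the module loads without AWS credentials
    import boto3
    table = boto3.resource('dynamodb').Table(table_name)
    item = table.get_item(Key={'token': token}).get('Item')
    return item['value'] if item else None
```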

16. Build a real-time aggregation dashboard

Scenario: Real-time business metrics dashboard with sub-second updates.

Architecture:
┌─────────────────────────────────────────────────────────────┐
│  Events (Sales, Clicks, etc.)                                │
│       │                                                      │
│       ▼                                                      │
│  Kinesis Data Streams                                        │
│       │                                                      │
│       ├──────────────┬──────────────┐                       │
│       │              │              │                       │
│       ▼              ▼              ▼                       │
│  Lambda          Analytics      Firehose                     │
│  (Pre-agg)       (Windows)      (Archive)                    │
│       │              │                                       │
│       ▼              ▼                                       │
│  ElastiCache     TimeStream                                  │
│  (Real-time)     (Time series)                              │
│       │              │                                       │
│       └──────────────┼──────────────┐                       │
│                      │              │                       │
│                      ▼              ▼                       │
│               API Gateway      QuickSight                   │
│                    │           (Dashboard)                   │
│                    ▼                                         │
│               WebSocket                                      │
│               (Live updates)                                 │
└─────────────────────────────────────────────────────────────┘

# Lambda for real-time aggregation
def current_minute():
    return datetime.utcnow().strftime('%Y-%m-%dT%H:%M')

def handler(event, context):
    aggregates = defaultdict(lambda: {'count': 0, 'sum': 0})
    
    for record in event['Records']:
        data = json.loads(base64.b64decode(record['kinesis']['data']))
        
        # Aggregate by dimension
        key = f"{data['product_category']}:{data['region']}"
        aggregates[key]['count'] += 1
        aggregates[key]['sum'] += data['amount']
    
    # Update Redis with atomic operations
    minute_key = f'metrics:{current_minute()}'
    pipe = redis.pipeline()
    for key, agg in aggregates.items():
        pipe.hincrby(minute_key, f'{key}:count', agg['count'])
        pipe.hincrbyfloat(minute_key, f'{key}:sum', agg['sum'])
    pipe.expire(minute_key, 3600)
    pipe.execute()
    
    # Push to every open WebSocket connection (IDs tracked in a connections table)
    for connection_id in get_active_connections():
        api_gateway.post_to_connection(
            ConnectionId=connection_id,
            Data=json.dumps({'type': 'update', 'aggregates': dict(aggregates)})
        )

# Timestream for historical time series
timestream_write.write_records(
    DatabaseName='metrics_db',
    TableName='sales_metrics',
    Records=[{
        'Dimensions': [
            {'Name': 'category', 'Value': data['category']},
            {'Name': 'region', 'Value': data['region']}
        ],
        'MeasureName': 'sales',
        'MeasureValue': str(data['amount']),
        'MeasureValueType': 'DOUBLE',
        'Time': str(int(time.time() * 1000)),
        'TimeUnit': 'MILLISECONDS'
    }]
)

# Query Timestream
SELECT 
    bin(time, 1m) as minute,
    category,
    SUM(measure_value::double) as total_sales
FROM "metrics_db"."sales_metrics"
WHERE time > ago(1h)
GROUP BY bin(time, 1m), category
ORDER BY minute DESC
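
On the read side, the flat Redis hash written by the aggregation Lambda has to be reshaped for the API response. A minimal sketch, where the field layout follows the `hincrby`/`hincrbyfloat` keys above:

```python
def parse_metrics(raw_hash):
    """Turn {'electronics:us:count': '3', 'electronics:us:sum': '99.5'}
    into {'electronics:us': {'count': 3, 'sum': 99.5}}."""
    out = {}
    for field, value in raw_hash.items():
        dimension, metric = field.rsplit(':', 1)
        out.setdefault(dimension, {})[metric] = (
            float(value) if metric == 'sum' else int(value)
        )
    return out
```

The API Lambda can run this over `redis.hgetall(f'metrics:{current_minute()}')` and return the nested dict to the dashboard.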

17. Design a schema evolution strategy

Scenario: Handle schema changes without breaking existing pipelines.

Strategies:
┌─────────────────────────────────────────────────────────────┐
│                Schema Evolution Strategies                   │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  1. Backward Compatible (readers can read old data)         │
│     - Add optional fields with defaults                      │
│     - Don't remove required fields                           │
│                                                              │
│  2. Forward Compatible (old readers can read new data)      │
│     - Don't add required fields                              │
│     - Old code ignores new fields                            │
│                                                              │
│  3. Full Compatible (both directions)                       │
│     - Only add optional fields                               │
│     - Only remove optional fields                            │
└─────────────────────────────────────────────────────────────┘
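
Backward compatibility can be seen concretely in how an Avro reader resolves old records: fields missing from the record are filled from the reader schema's defaults. A simplified, library-free sketch of that resolution rule:

```python
def resolve_record(record, reader_fields):
    # New reader, old record: absent fields fall back to schema defaults;
    # an absent field with no default means the schemas are incompatible.
    resolved = {}
    for field in reader_fields:
        if field['name'] in record:
            resolved[field['name']] = record[field['name']]
        elif 'default' in field:
            resolved[field['name']] = field['default']
        else:
            raise ValueError(f"incompatible: no value or default for {field['name']}")
    return resolved
```

An old record without `email` still resolves cleanly under the new schema below, because `email` is optional with a default of null.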

# Glue Schema Registry
glue.create_schema(
    RegistryId={'RegistryName': 'data-schemas'},
    SchemaName='customer_events',
    DataFormat='AVRO',
    Compatibility='BACKWARD',  # BACKWARD, FORWARD, FULL, NONE
    SchemaDefinition=json.dumps({
        "type": "record",
        "name": "CustomerEvent",
        "fields": [
            {"name": "customer_id", "type": "string"},
            {"name": "event_type", "type": "string"},
            {"name": "timestamp", "type": "long"},
            {"name": "email", "type": ["null", "string"], "default": null}  # Optional
        ]
    })
)

# Register new schema version
glue.register_schema_version(
    SchemaId={'RegistryName': 'data-schemas', 'SchemaName': 'customer_events'},
    SchemaDefinition=json.dumps({
        "type": "record",
        "name": "CustomerEvent",
        "fields": [
            {"name": "customer_id", "type": "string"},
            {"name": "event_type", "type": "string"},
            {"name": "timestamp", "type": "long"},
            {"name": "email", "type": ["null", "string"], "default": null},
            {"name": "phone", "type": ["null", "string"], "default": null}  # New field
        ]
    })
)

# Kinesis producer with schema registry
# (illustrative: a Glue Schema Registry Avro serializer, e.g. from the
# community aws-glue-schema-registry package; class names vary by library)
serializer = GlueSchemaRegistryAvroSerializer(registry_name='data-schemas')

data = {'customer_id': '123', 'event_type': 'click', 'timestamp': int(time.time())}
encoded = serializer.encode('customer_events', data)

kinesis.put_record(StreamName='events', Data=encoded, PartitionKey='123')

# Glue ETL handling schema evolution
df = glueContext.create_dynamic_frame.from_catalog(database="db", table_name="events")
df_resolved = df.resolveChoice(choice="make_struct")  # Handle schema conflicts

18. Build a data pipeline testing framework

Scenario: Comprehensive testing for data pipelines.

Testing Framework:
┌─────────────────────────────────────────────────────────────┐
│                Data Pipeline Testing                         │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────────────────────────────────────────────────┐  │
│  │                 Unit Tests                            │  │
│  │  - Transformation logic                               │  │
│  │  - Data validation functions                          │  │
│  │  - Schema mapping                                     │  │
│  └──────────────────────────────────────────────────────┘  │
│                          │                                  │
│  ┌──────────────────────────────────────────────────────┐  │
│  │              Integration Tests                        │  │
│  │  - End-to-end pipeline execution                      │  │
│  │  - AWS service interactions                           │  │
│  │  - Data flow validation                               │  │
│  └──────────────────────────────────────────────────────┘  │
│                          │                                  │
│  ┌──────────────────────────────────────────────────────┐  │
│  │               Data Quality Tests                      │  │
│  │  - Row counts                                         │  │
│  │  - Schema validation                                  │  │
│  │  - Business rule validation                           │  │
│  │  - Referential integrity                              │  │
│  └──────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘

# Unit test for Glue transformation
import pytest
from pyspark.sql import SparkSession
from my_glue_job import transform_data

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local[*]").getOrCreate()

def test_transform_removes_nulls(spark):
    input_data = [
        {"id": 1, "name": "Alice", "email": "alice@test.com"},
        {"id": 2, "name": None, "email": "bob@test.com"},
        {"id": 3, "name": "Charlie", "email": None}
    ]
    input_df = spark.createDataFrame(input_data)
    
    result_df = transform_data(input_df)
    
    assert result_df.filter("name IS NULL").count() == 0
    assert result_df.count() == 2

def test_transform_standardizes_email(spark):
    input_data = [{"id": 1, "email": "ALICE@TEST.COM"}]
    input_df = spark.createDataFrame(input_data)
    
    result_df = transform_data(input_df)
    
    assert result_df.first()["email"] == "alice@test.com"

# Integration test with moto (AWS mocking)
import boto3
import moto

@moto.mock_aws  # moto >= 5; use @moto.mock_s3 on older versions
def test_pipeline_writes_to_s3():
    s3 = boto3.client('s3', region_name='us-east-1')
    s3.create_bucket(Bucket='test-bucket')
    
    # Run pipeline
    run_pipeline(source='test_data.csv', destination='s3://test-bucket/output/')
    
    # Verify output
    objects = s3.list_objects_v2(Bucket='test-bucket', Prefix='output/')
    assert objects['KeyCount'] > 0

# Data quality test
def test_data_quality_after_pipeline():
    run = glue.start_data_quality_ruleset_evaluation_run(
        DataSource={'GlueTable': {'DatabaseName': 'test_db', 'TableName': 'output'}},
        Role='GlueRole',
        RulesetNames=['quality_rules']
    )
    
    # Poll until finished, then fetch rule-level results by ID
    status = glue.get_data_quality_ruleset_evaluation_run(RunId=run['RunId'])
    assert status['Status'] == 'SUCCEEDED'
    for result_id in status['ResultIds']:
        result = glue.get_data_quality_result(ResultId=result_id)
        assert all(r['Result'] == 'PASS' for r in result['RuleResults'])

19. Design a data lineage tracking system

Scenario: Track data lineage from source to consumption.

Architecture:
┌─────────────────────────────────────────────────────────────┐
│                 Data Lineage System                          │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────────────────────────────────────────────────┐  │
│  │                Lineage Sources                        │  │
│  │  - Glue Job metadata                                  │  │
│  │  - Step Functions execution                           │  │
│  │  - CloudTrail S3 events                               │  │
│  │  - Custom instrumentation                             │  │
│  └──────────────────────────────────────────────────────┘  │
│                          │                                  │
│  ┌──────────────────────────────────────────────────────┐  │
│  │             Lineage Collection                        │  │
│  │  EventBridge → Lambda → Neptune (Graph DB)           │  │
│  └──────────────────────────────────────────────────────┘  │
│                          │                                  │
│  ┌──────────────────────────────────────────────────────┐  │
│  │               Lineage Graph                           │  │
│  │                                                       │  │
│  │   [Source A] ──► [ETL Job 1] ──► [Table X]          │  │
│  │        │              │              │               │  │
│  │        │              ▼              ▼               │  │
│  │        └───► [ETL Job 2] ──► [Table Y] ──► [Report] │  │
│  │                                                       │  │
│  └──────────────────────────────────────────────────────┘  │
│                          │                                  │
│  ┌──────────────────────────────────────────────────────┐  │
│  │              Lineage Visualization                    │  │
│  │  QuickSight / Custom UI with Neptune queries         │  │
│  └──────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘

# Capture Glue job lineage
def capture_glue_lineage(job_run):
    lineage_event = {
        'job_name': job_run['JobName'],
        'run_id': job_run['JobRunId'],
        'inputs': extract_inputs(job_run),
        'outputs': extract_outputs(job_run),
        'timestamp': job_run['CompletedOn'].isoformat(),
        'execution_time': job_run['ExecutionTime']
    }
    
    # Store in Neptune (Gremlin traversal g): one vertex per job run,
    # edges to the tables it reads and writes
    job_vertex = g.addV('ETLJob') \
        .property('name', lineage_event['job_name']) \
        .property('run_id', lineage_event['run_id']).next()
    
    for input_table in lineage_event['inputs']:
        table_vertex = g.V().has('Table', 'name', input_table).next()
        g.addE('reads_from').from_(job_vertex).to(table_vertex).iterate()
    
    for output_table in lineage_event['outputs']:
        table_vertex = g.V().has('Table', 'name', output_table).next()
        g.addE('writes_to').from_(job_vertex).to(table_vertex).iterate()

# Query lineage graph
def get_upstream_lineage(table_name, depth=5):
    query = f"""
    g.V().has('Table', 'name', '{table_name}')
      .repeat(__.in('writes_to').out('reads_from'))
      .times({depth})
      .path()
      .by('name')
    """
    return gremlin.submit(query)

def get_downstream_lineage(table_name, depth=5):
    query = f"""
    g.V().has('Table', 'name', '{table_name}')
      .repeat(__.out('reads_from').in('writes_to'))
      .times({depth})
      .path()
      .by('name')
    """
    return gremlin.submit(query)

# Impact analysis (assumes the path query is projected to {'name', 'type'} dicts)
def get_impacted_assets(source_change):
    downstream = get_downstream_lineage(source_change, depth=10)
    nodes = [node for path in downstream for node in path]
    return {
        'jobs': [n for n in nodes if n['type'] == 'ETLJob'],
        'tables': [n for n in nodes if n['type'] == 'Table'],
        'reports': [n for n in nodes if n['type'] == 'Report']
    }
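
The collection layer in the diagram (EventBridge → Lambda → Neptune) needs a rule that fires when a Glue job finishes. A sketch follows; the rule name is a placeholder and the target Lambda ARN is supplied by the caller.

```python
import json

# Matches the "Glue Job State Change" events EventBridge emits for Glue jobs
GLUE_JOB_SUCCEEDED = {
    'source': ['aws.glue'],
    'detail-type': ['Glue Job State Change'],
    'detail': {'state': ['SUCCEEDED']}
}

def create_lineage_rule(target_lambda_arn, rule_name='glue-lineage-capture'):
    # boto3 imported here so the module loads without AWS credentials
    import boto3
    events = boto3.client('events')
    events.put_rule(Name=rule_name, EventPattern=json.dumps(GLUE_JOB_SUCCEEDED))
    events.put_targets(Rule=rule_name,
                       Targets=[{'Id': 'lineage-lambda', 'Arn': target_lambda_arn}])
```

The target Lambda receives the job name and run ID in the event detail and can call `get_job_run` to feed `capture_glue_lineage`.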

20. Build a serverless data processing architecture

Scenario: Fully serverless data platform with minimal operational overhead.

Architecture:
┌─────────────────────────────────────────────────────────────┐
│             Serverless Data Platform                         │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Ingestion:                                                  │
│  ┌──────────────────────────────────────────────────────┐  │
│  │  API Gateway → Lambda → Kinesis Data Streams         │  │
│  │  S3 Events → Lambda → SQS → Lambda                   │  │
│  │  EventBridge → Step Functions                        │  │
│  └──────────────────────────────────────────────────────┘  │
│                          │                                  │
│  Processing:                                                 │
│  ┌──────────────────────────────────────────────────────┐  │
│  │  Lambda (real-time)                                   │  │
│  │  Glue (Serverless Spark)                             │  │
│  │  Athena (ad-hoc queries)                             │  │
│  │  Step Functions (orchestration)                       │  │
│  └──────────────────────────────────────────────────────┘  │
│                          │                                  │
│  Storage:                                                    │
│  ┌──────────────────────────────────────────────────────┐  │
│  │  S3 (data lake)                                       │  │
│  │  DynamoDB (operational)                               │  │
│  │  Timestream (time series)                             │  │
│  │  Redshift Serverless (analytics)                      │  │
│  └──────────────────────────────────────────────────────┘  │
│                          │                                  │
│  Serving:                                                    │
│  ┌──────────────────────────────────────────────────────┐  │
│  │  API Gateway → Lambda → DynamoDB/Athena              │  │
│  │  AppSync (GraphQL)                                    │  │
│  │  QuickSight (BI)                                      │  │
│  └──────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
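
The `S3 Events → Lambda → SQS → Lambda` ingestion row above can be sketched as a small Lambda that fans object notifications into a queue; the queue URL is a placeholder.

```python
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/ingest-queue'  # placeholder

def extract_objects(s3_event):
    # Pull (bucket, key) pairs out of an S3 event notification payload
    return [(r['s3']['bucket']['name'], r['s3']['object']['key'])
            for r in s3_event.get('Records', [])]

def handler(event, context):
    # boto3 imported here so the module loads without AWS credentials
    import boto3, json
    sqs = boto3.client('sqs')
    for bucket, key in extract_objects(event):
        sqs.send_message(QueueUrl=QUEUE_URL,
                         MessageBody=json.dumps({'bucket': bucket, 'key': key}))
```

Decoupling through SQS lets the downstream processing Lambda retry and scale independently of the notification rate.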

# Serverless ETL with Step Functions + Athena
{
  "StartAt": "RunAthenaQuery",
  "States": {
    "RunAthenaQuery": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "INSERT INTO curated.orders SELECT * FROM raw.orders WHERE order_date = ''",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://athena-results/"
        }
      },
      "Next": "UpdateGlueCatalog"
    },
    "UpdateGlueCatalog": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:glue:updatePartition",
      "Parameters": {
        "DatabaseName": "curated",
        "TableName": "orders",
        "PartitionValueList.$": "States.Array($.date)"
      },
      "Next": "NotifyCompletion"
    },
    "NotifyCompletion": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:...:etl-notifications",
        "Message.$": "States.Format('ETL completed for {}', $.date)"
      },
      "End": true
    }
  }
}

# Redshift Serverless for on-demand analytics
redshift_serverless.create_workgroup(
    workgroupName='analytics',
    namespaceName='analytics-ns',
    baseCapacity=32,  # base RPUs; compute is billed only while queries run
    maxCapacity=128,
    configParameters=[
        {'parameterKey': 'auto_mv', 'parameterValue': 'true'},
        {'parameterKey': 'enable_case_sensitive_identifier', 'parameterValue': 'true'}
    ]
)

# Cost optimization
- Lambda: Pay per invocation + duration
- Glue: Pay per DPU-hour
- Athena: Pay per TB scanned
- Redshift Serverless: Pay per RPU-hour (no compute charge when idle)
- S3: Pay per GB stored + requests

