Top 20 AWS Kinesis Interview Questions


  1. What is Amazon Kinesis?
  2. What are Kinesis Data Streams?
  3. What is a Kinesis shard?
  4. What is the Kinesis Client Library (KCL)?
  5. What is Kinesis Data Firehose?
  6. What is Kinesis Data Analytics?
  7. How do you produce data to Kinesis?
  8. How do you consume data from Kinesis?
  9. What is enhanced fan-out?
  10. How do you handle failures in Kinesis?
  11. What is data retention in Kinesis?
  12. How do you scale Kinesis streams?
  13. What are partition keys?
  14. What is Kinesis Video Streams?
  15. How do you transform data in Firehose?
  16. What are Kinesis Analytics windowing functions?
  17. How do you integrate Kinesis with Lambda?
  18. What are Kinesis security best practices?
  19. How do you monitor Kinesis?
  20. What are common Kinesis patterns?

1. What is Amazon Kinesis?

Amazon Kinesis is a platform for collecting, processing, and analyzing real-time streaming data at scale.

Kinesis Services:
├── Kinesis Data Streams: Real-time data streaming
├── Kinesis Data Firehose: Load streaming data to destinations
├── Kinesis Data Analytics: SQL/Flink for stream processing
└── Kinesis Video Streams: Video streaming

Use Cases:
├── Real-time analytics
├── Log and event data collection
├── IoT data ingestion
├── Clickstream analysis
├── Social media feeds
└── Gaming data processing

Architecture:
┌─────────────────────────────────────────────────────┐
│                  Producers                           │
│  (Applications, IoT, Logs, Events)                  │
└──────────────────────┬──────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────┐
│            Kinesis Data Streams                      │
│  ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐       │
│  │Shard 1 │ │Shard 2 │ │Shard 3 │ │Shard N │       │
│  └────────┘ └────────┘ └────────┘ └────────┘       │
└──────────────────────┬──────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────┐
│                  Consumers                           │
│  (Lambda, KCL Apps, Analytics, Firehose)            │
└─────────────────────────────────────────────────────┘

2. What are Kinesis Data Streams?

Kinesis Data Streams Features:
├── Real-time data streaming
├── Configurable retention (1-365 days)
├── Multiple consumers per stream
├── Replay capability
├── Ordering within shard
└── Encryption at rest

# Create Data Stream
import boto3
kinesis = boto3.client('kinesis')

kinesis.create_stream(
    StreamName='my-stream',
    ShardCount=4,
    StreamModeDetails={
        'StreamMode': 'PROVISIONED'  # or 'ON_DEMAND'
    }
)

# On-Demand mode
kinesis.create_stream(
    StreamName='my-stream-on-demand',
    StreamModeDetails={
        'StreamMode': 'ON_DEMAND'
    }
)

Data Stream Modes:
├── Provisioned: Specify shard count
│   ├── 1 MB/sec write per shard
│   ├── 2 MB/sec read per shard
│   └── 1,000 records/sec write per shard
│
└── On-Demand: Auto-scales
    ├── Up to 200 MB/sec write
    ├── Up to 400 MB/sec read
    └── Pay per GB ingested/retrieved
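
To check which mode a stream is using and how many shards are currently open, DescribeStreamSummary is a lightweight call (a short sketch reusing the 'my-stream' stream created above):

# Inspect stream mode, open shard count, and retention
summary = kinesis.describe_stream_summary(StreamName='my-stream')
desc = summary['StreamDescriptionSummary']
print(desc['StreamModeDetails']['StreamMode'])  # PROVISIONED or ON_DEMAND
print(desc['OpenShardCount'])                   # currently open shards
print(desc['RetentionPeriodHours'])             # retention in hours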

3. What is a Kinesis shard?

A shard is the base throughput unit of a Kinesis data stream.

Shard Characteristics:
├── 1 MB/sec input (1,000 records/sec)
├── 2 MB/sec output (max 5 GetRecords calls/sec)
├── Data ordered within shard
├── 24-hour default retention
└── Unique sequence number per record

Shard Structure:
┌─────────────────────────────────────────────────────┐
│                     Stream                           │
├─────────────────────────────────────────────────────┤
│  Shard 1                                             │
│  Hash Range: 0 - 85070591730234615865843651857942052863
│  ┌────┬────┬────┬────┬────┬────┐                    │
│  │ R1 │ R2 │ R3 │ R4 │ R5 │... │  (Ordered)        │
│  └────┴────┴────┴────┴────┴────┘                    │
├─────────────────────────────────────────────────────┤
│  Shard 2                                             │
│  Hash Range: 85070591730234615865843651857942052864 - ...
│  ┌────┬────┬────┬────┬────┬────┐                    │
│  │ R1 │ R2 │ R3 │ R4 │ R5 │... │  (Ordered)        │
│  └────┴────┴────┴────┴────┴────┘                    │
└─────────────────────────────────────────────────────┘

# Describe stream (get shard info)
response = kinesis.describe_stream(StreamName='my-stream')
for shard in response['StreamDescription']['Shards']:
    print(f"Shard ID: {shard['ShardId']}")
    print(f"Hash Key Range: {shard['HashKeyRange']}")
    print(f"Sequence Number Range: {shard['SequenceNumberRange']}")

4. What is the Kinesis Client Library (KCL)?

KCL simplifies building consumer applications with automatic load balancing and checkpointing.

KCL Features:
├── Automatic shard assignment
├── Load balancing across workers
├── Checkpointing (DynamoDB)
├── Failure handling
├── Lease management
└── Enhanced fan-out support

KCL Architecture:
┌─────────────────────────────────────────────────────┐
│              KCL Application                         │
├─────────────────────────────────────────────────────┤
│  Worker 1          Worker 2          Worker 3       │
│  ┌─────────┐      ┌─────────┐      ┌─────────┐     │
│  │Shard 1  │      │Shard 2  │      │Shard 3  │     │
│  │Processor│      │Processor│      │Processor│     │
│  └─────────┘      └─────────┘      └─────────┘     │
│       │                │                │           │
│       └────────────────┼────────────────┘           │
│                        │                            │
│              ┌─────────▼─────────┐                  │
│              │   DynamoDB        │                  │
│              │   (Checkpoints)   │                  │
│              └───────────────────┘                  │
└─────────────────────────────────────────────────────┘

# KCL Python implementation (simplified sketch of the amazon_kclpy record-processor interface)
from amazon_kclpy import kcl

class RecordProcessor:
    def initialize(self, initialization_input):
        self.shard_id = initialization_input.shard_id
        
    def process_records(self, process_records_input):
        for record in process_records_input.records:
            data = record.data.decode('utf-8')
            # Process record
            print(f"Processing: {data}")
        
        # Checkpoint after processing
        process_records_input.checkpointer.checkpoint()
    
    def shutdown(self, shutdown_input):
        if shutdown_input.reason == 'TERMINATE':
            shutdown_input.checkpointer.checkpoint()

# Run with MultiLangDaemon
# java -cp kcl-2.x.jar software.amazon.kinesis.multilang.MultiLangDaemon --properties-file kcl.properties

5. What is Kinesis Data Firehose?

Kinesis Data Firehose is a fully managed service to load streaming data into data stores.

Firehose Destinations:
├── Amazon S3
├── Amazon Redshift (via S3)
├── Amazon OpenSearch
├── Splunk
├── HTTP Endpoints
└── Third-party services (Datadog, MongoDB, etc.)

Features:
├── Automatic scaling
├── Data transformation (Lambda)
├── Format conversion (Parquet, ORC)
├── Data compression (GZIP, Snappy)
├── Encryption
└── Backup to S3

# Create Firehose Delivery Stream
import json, time
firehose = boto3.client('firehose')

firehose.create_delivery_stream(
    DeliveryStreamName='my-firehose',
    DeliveryStreamType='DirectPut',  # or 'KinesisStreamAsSource'
    S3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/FirehoseRole',
        'BucketARN': 'arn:aws:s3:::my-bucket',
        'Prefix': 'data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/',
        'ErrorOutputPrefix': 'errors/',
        'BufferingHints': {
            'SizeInMBs': 128,
            'IntervalInSeconds': 300
        },
        'CompressionFormat': 'GZIP',
        'EncryptionConfiguration': {
            'KMSEncryptionConfig': {
                'AWSKMSKeyARN': 'arn:aws:kms:...'
            }
        }
    }
)

# Put record to Firehose
firehose.put_record(
    DeliveryStreamName='my-firehose',
    Record={'Data': json.dumps({'event': 'click', 'timestamp': time.time()})}
)




6. What is Kinesis Data Analytics?

Kinesis Data Analytics processes and analyzes streaming data in real-time using SQL or Apache Flink.

Analytics Options:
├── SQL-based: Simple SQL queries on streams
└── Apache Flink: Complex stream processing

# SQL-based Analytics
CREATE OR REPLACE STREAM "DESTINATION_STREAM" (
    event_time TIMESTAMP,
    event_count INTEGER,
    total_amount DOUBLE
);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_STREAM"
SELECT STREAM
    STEP("SOURCE_STREAM".ROWTIME BY INTERVAL '1' MINUTE) AS event_time,
    COUNT(*) AS event_count,
    SUM(amount) AS total_amount
FROM "SOURCE_STREAM"
GROUP BY STEP("SOURCE_STREAM".ROWTIME BY INTERVAL '1' MINUTE);

# Apache Flink Application (Python)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Create Kinesis source
t_env.execute_sql("""
    CREATE TABLE kinesis_source (
        event_id STRING,
        event_type STRING,
        amount DOUBLE,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'my-stream',
        'aws.region' = 'us-east-1',
        'format' = 'json'
    )
""")

# Tumbling window aggregation
t_env.execute_sql("""
    SELECT 
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
        event_type,
        COUNT(*) as event_count,
        SUM(amount) as total_amount
    FROM kinesis_source
    GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE), event_type
""")

7. How do you produce data to Kinesis?

Producer Options:

1. AWS SDK (PutRecord/PutRecords)
# Single record
kinesis.put_record(
    StreamName='my-stream',
    Data=json.dumps({'event': 'click', 'user': 'user123'}),
    PartitionKey='user123'  # Determines shard
)

# Batch records (up to 500)
records = [
    {'Data': json.dumps({'event': f'event_{i}'}), 'PartitionKey': f'key_{i % 4}'}
    for i in range(500)
]
response = kinesis.put_records(StreamName='my-stream', Records=records)

# Check for failures
if response['FailedRecordCount'] > 0:
    for i, record in enumerate(response['Records']):
        if 'ErrorCode' in record:
            print(f"Failed record {i}: {record['ErrorCode']}")

2. Kinesis Producer Library (KPL)
# High throughput via record aggregation and buffering.
# The KPL itself is a Java/C++ library; the Python wrapper import below is illustrative.
from amazon_kpl import KinesisProducer

producer = KinesisProducer(
    stream_name='my-stream',
    region='us-east-1',
    aggregation_enabled=True,
    aggregation_max_count=100,
    record_max_buffered_time=1000
)

for i in range(10000):
    producer.put_record(
        data=json.dumps({'event_id': i}),
        partition_key=str(i % 10)
    )

producer.flush_sync()

3. Kinesis Agent
# /etc/aws-kinesis/agent.json
{
  "flows": [{
    "filePattern": "/var/log/app/*.log",
    "kinesisStream": "my-stream",
    "partitionKeyOption": "RANDOM"
  }]
}

8. How do you consume data from Kinesis?

Consumer Options:

1. GetRecords API (polling)
# Get shard iterator
response = kinesis.get_shard_iterator(
    StreamName='my-stream',
    ShardId='shardId-000000000000',
    ShardIteratorType='TRIM_HORIZON'  # LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER
)
shard_iterator = response['ShardIterator']

# Read records
while True:
    response = kinesis.get_records(
        ShardIterator=shard_iterator,
        Limit=100
    )
    
    for record in response['Records']:
        data = json.loads(record['Data'])
        print(f"Sequence: {record['SequenceNumber']}, Data: {data}")
    
    shard_iterator = response['NextShardIterator']
    time.sleep(0.2)  # Avoid throttling

2. Enhanced Fan-Out (push-based)
# Register consumer
response = kinesis.register_stream_consumer(
    StreamARN='arn:aws:kinesis:...:stream/my-stream',
    ConsumerName='my-consumer'
)
consumer_arn = response['Consumer']['ConsumerARN']

# Subscribe to shard
for event in kinesis.subscribe_to_shard(
    ConsumerARN=consumer_arn,
    ShardId='shardId-000000000000',
    StartingPosition={'Type': 'LATEST'}
)['EventStream']:
    if 'SubscribeToShardEvent' in event:
        for record in event['SubscribeToShardEvent']['Records']:
            print(json.loads(record['Data']))

3. Lambda Event Source
# Configure Lambda trigger
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:...:stream/my-stream',
    FunctionName='my-function',
    BatchSize=100,
    StartingPosition='LATEST',
    ParallelizationFactor=10
)

9. What is enhanced fan-out?

Enhanced fan-out provides dedicated throughput per consumer with push-based delivery.

Standard vs Enhanced Fan-Out:

Standard (Shared):
├── 2 MB/sec per shard (shared among consumers)
├── 5 GetRecords calls/sec per shard
├── Polling model (pull)
└── 200ms+ latency

Enhanced Fan-Out:
├── 2 MB/sec per consumer per shard (dedicated)
├── Push-based (SubscribeToShard)
├── ~70ms latency
└── Higher cost

┌─────────────────────────────────────────────────────┐
│                 Standard Fan-Out                     │
│                                                      │
│  Stream (Shard) ─────┬─────── 2 MB/sec total        │
│       │              │                               │
│       ├──────────────┼──────────────┐               │
│       │              │              │               │
│  Consumer A    Consumer B    Consumer C             │
│  (share 2 MB/sec throughput)                        │
└─────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐
│               Enhanced Fan-Out                       │
│                                                      │
│  Stream (Shard) ─────────────────                   │
│       │                                              │
│       ├─────────── 2 MB/sec ──── Consumer A         │
│       ├─────────── 2 MB/sec ──── Consumer B         │
│       └─────────── 2 MB/sec ──── Consumer C         │
│  (each gets dedicated 2 MB/sec)                     │
└─────────────────────────────────────────────────────┘

# Register enhanced fan-out consumer
kinesis.register_stream_consumer(
    StreamARN='arn:aws:kinesis:us-east-1:123456789012:stream/my-stream',
    ConsumerName='my-enhanced-consumer'
)
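
A registered consumer must reach ACTIVE status before SubscribeToShard will succeed. A small polling sketch, reusing the stream ARN and consumer name from above:

# Wait for the enhanced fan-out consumer to become ACTIVE
import time
while True:
    desc = kinesis.describe_stream_consumer(
        StreamARN='arn:aws:kinesis:us-east-1:123456789012:stream/my-stream',
        ConsumerName='my-enhanced-consumer'
    )
    if desc['ConsumerDescription']['ConsumerStatus'] == 'ACTIVE':
        break
    time.sleep(1)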

10. How do you handle failures in Kinesis?

Failure Handling Strategies:

1. Producer Retries
def put_record_with_retry(stream, data, partition_key, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = kinesis.put_record(
                StreamName=stream,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except kinesis.exceptions.ProvisionedThroughputExceededException:
            time.sleep(2 ** attempt)  # Exponential backoff
    raise Exception("Max retries exceeded")

# Handle partial failures in PutRecords
def put_records_with_retry(stream, records, max_retries=5):
    remaining = records
    for attempt in range(max_retries):
        response = kinesis.put_records(StreamName=stream, Records=remaining)
        if response['FailedRecordCount'] == 0:
            return
        # Keep only the records that failed in this batch and retry them
        remaining = [
            remaining[i] for i, r in enumerate(response['Records'])
            if 'ErrorCode' in r
        ]
        time.sleep(2 ** attempt)  # Exponential backoff
    raise Exception("Max retries exceeded for PutRecords")

2. Consumer Checkpointing
# KCL automatic checkpointing
def process_records(self, process_records_input):
    try:
        for record in process_records_input.records:
            self.process_single_record(record)
        process_records_input.checkpointer.checkpoint()
    except Exception as e:
        # Don't checkpoint - will reprocess on restart
        log.error(f"Error processing: {e}")

3. Dead Letter Queue
# Lambda DLQ configuration
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:...',
    FunctionName='my-function',
    DestinationConfig={
        'OnFailure': {
            'Destination': 'arn:aws:sqs:...:dlq'
        }
    },
    MaximumRetryAttempts=3,
    BisectBatchOnFunctionError=True
)

11. What is data retention in Kinesis?

Retention Settings:
├── Default: 24 hours
├── Maximum: 365 days (8760 hours)
├── Extended retention: Additional cost
└── Allows replay from any point

# Increase retention
kinesis.increase_stream_retention_period(
    StreamName='my-stream',
    RetentionPeriodHours=168  # 7 days
)

# Decrease retention
kinesis.decrease_stream_retention_period(
    StreamName='my-stream',
    RetentionPeriodHours=24
)

# Start reading from specific timestamp
kinesis.get_shard_iterator(
    StreamName='my-stream',
    ShardId='shardId-000000000000',
    ShardIteratorType='AT_TIMESTAMP',
    Timestamp=datetime(2024, 1, 15, 10, 0, 0)
)

# Start from sequence number
kinesis.get_shard_iterator(
    StreamName='my-stream',
    ShardId='shardId-000000000000',
    ShardIteratorType='AT_SEQUENCE_NUMBER',
    StartingSequenceNumber='49637287329048...'
)

Use Cases for Extended Retention:
├── Replay for debugging
├── Re-process with updated logic
├── Backfill new consumers
├── Compliance requirements
└── Disaster recovery

12. How do you scale Kinesis streams?

Scaling Options:

1. On-Demand Mode (auto-scaling)
kinesis.update_stream_mode(
    StreamARN='arn:aws:kinesis:...:stream/my-stream',
    StreamModeDetails={'StreamMode': 'ON_DEMAND'}
)

2. Provisioned Mode (manual scaling)
# Split shard (increase capacity)
kinesis.split_shard(
    StreamName='my-stream',
    ShardToSplit='shardId-000000000000',
    NewStartingHashKey='170141183460469231731687303715884105728'
)

# Merge shards (decrease capacity)
kinesis.merge_shards(
    StreamName='my-stream',
    ShardToMerge='shardId-000000000001',
    AdjacentShardToMerge='shardId-000000000002'
)

3. Update Shard Count
kinesis.update_shard_count(
    StreamName='my-stream',
    TargetShardCount=8,
    ScalingType='UNIFORM_SCALING'
)

Scaling Considerations:
├── Scaling takes time (resharding)
├── UpdateShardCount limited to 10 operations per rolling 24-hour period per stream
├── Parent shards remain until data expires
├── Consumers must handle shard changes (see the sketch below)
└── Consider On-Demand for variable workloads
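
When a stream is resharded, parent shards close and consumers must move on to the child shards. A simplified sketch of how a GetRecords consumer can detect a closed shard and locate its children (it assumes the shard_iterator and shard ID from the polling example in question 8):

# Detect a closed parent shard and find its child shards after resharding
response = kinesis.get_records(ShardIterator=shard_iterator, Limit=100)
if response.get('NextShardIterator') is None:
    # Shard is closed and fully read; look up its children
    shards = kinesis.list_shards(StreamName='my-stream')['Shards']
    children = [s for s in shards if s.get('ParentShardId') == 'shardId-000000000000']
    for child in children:
        child_iterator = kinesis.get_shard_iterator(
            StreamName='my-stream',
            ShardId=child['ShardId'],
            ShardIteratorType='TRIM_HORIZON'  # read the child from the beginning
        )['ShardIterator']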

Capacity Planning:
├── Write: 1 MB/sec or 1000 records/sec per shard
├── Read: 2 MB/sec per shard (shared)
├── Read: 2 MB/sec per consumer (enhanced fan-out)
└── Shards needed = MAX(write_mb/1, write_records/1000, read_mb/2)
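
A quick worked example of the formula above, with illustrative numbers:

# Shards needed for 6 MB/s writes, 4,500 records/s, and 10 MB/s shared reads
import math

def shards_needed(write_mb, write_records, read_mb):
    return max(
        math.ceil(write_mb / 1),
        math.ceil(write_records / 1000),
        math.ceil(read_mb / 2)
    )

print(shards_needed(6, 4500, 10))  # -> 6 shards (write bandwidth is the bottleneck)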

13. What are partition keys?

Partition keys determine which shard receives a record, enabling ordered processing.

Partition Key Concepts:
├── MD5 hash maps key to shard
├── Same key → same shard → ordering preserved
├── Random keys → even distribution
└── Hot partitions from uneven keys

# Good: Distributed partition keys
for order in orders:
    kinesis.put_record(
        StreamName='orders-stream',
        Data=json.dumps(order),
        PartitionKey=order['order_id']  # Random UUIDs
    )

# Good: Per-user ordering
for event in user_events:
    kinesis.put_record(
        StreamName='events-stream',
        Data=json.dumps(event),
        PartitionKey=event['user_id']  # Same user → same shard
    )

# Bad: Hot partition
for log in logs:
    kinesis.put_record(
        StreamName='logs-stream',
        Data=json.dumps(log),
        PartitionKey='logs'  # All records to one shard!
    )

# Explicit Hash Key (bypass MD5)
kinesis.put_record(
    StreamName='my-stream',
    Data=data,
    PartitionKey='ignored',
    ExplicitHashKey='0'  # Directly specify shard hash
)

Best Practices:
├── Use high-cardinality keys for even distribution
├── Use entity IDs for ordering requirements
├── Monitor for hot shards (CloudWatch)
├── Consider a random suffix for hot keys (see the sketch below)
└── Use explicit hash keys for precise control
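
A minimal sketch of the random-suffix (key salting) idea for a known hot key; the tenant ID and bucket count are illustrative, and note that per-key ordering is lost across the salted sub-keys:

# Spread a hot logical key across several shards with a bounded random suffix
import random, json

SALT_BUCKETS = 8  # number of sub-keys used for the hot key

def salted_key(base_key):
    return f"{base_key}#{random.randint(0, SALT_BUCKETS - 1)}"

kinesis.put_record(
    StreamName='events-stream',
    Data=json.dumps({'tenant': 'tenant-42', 'event': 'click'}),
    PartitionKey=salted_key('tenant-42')  # e.g. 'tenant-42#3'
)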

14. What is Kinesis Video Streams?

Kinesis Video Streams captures, processes, and stores video streams for analytics and ML.

Video Streams Features:
├── Secure video ingestion
├── Durable storage
├── Playback capabilities
├── Integration with ML services
├── WebRTC for real-time streaming
└── Edge agent for IoT devices

Use Cases:
├── Security camera footage
├── Smart home devices
├── Industrial monitoring
├── Video analytics
└── Computer vision ML

# Create Video Stream
kvs = boto3.client('kinesisvideo')

kvs.create_stream(
    StreamName='my-video-stream',
    DataRetentionInHours=24,
    MediaType='video/h264',
    Tags={'Application': 'SecurityCameras'}
)

# Get data endpoint
endpoint = kvs.get_data_endpoint(
    StreamName='my-video-stream',
    APIName='GET_MEDIA'
)

# Integration with Rekognition Video
rekognition = boto3.client('rekognition')
rekognition.create_stream_processor(
    Input={
        'KinesisVideoStream': {
            'Arn': 'arn:aws:kinesisvideo:...:stream/my-video-stream/...'
        }
    },
    Output={
        'KinesisDataStream': {
            'Arn': 'arn:aws:kinesis:...:stream/analysis-results'
        }
    },
    Name='face-detection-processor',
    Settings={
        'FaceSearch': {
            'CollectionId': 'my-face-collection',
            'FaceMatchThreshold': 90
        }
    },
    RoleArn='arn:aws:iam::...:role/RekognitionRole'
)

15. How do you transform data in Firehose?

Transformation Options:

1. Lambda Transformation
firehose.create_delivery_stream(
    DeliveryStreamName='transformed-firehose',
    ExtendedS3DestinationConfiguration={
        'RoleARN': role_arn,
        'BucketARN': bucket_arn,
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [{
                'Type': 'Lambda',
                'Parameters': [{
                    'ParameterName': 'LambdaArn',
                    'ParameterValue': 'arn:aws:lambda:...:function:transform'
                }, {
                    'ParameterName': 'BufferSizeInMBs',
                    'ParameterValue': '3'
                }, {
                    'ParameterName': 'BufferIntervalInSeconds',
                    'ParameterValue': '60'
                }]
            }]
        }
    }
)

# Lambda transformation function
def handler(event, context):
    output = []
    for record in event['records']:
        # Decode data
        data = base64.b64decode(record['data']).decode('utf-8')
        payload = json.loads(data)
        
        # Transform
        payload['processed_at'] = datetime.now().isoformat()
        payload['source'] = 'firehose'
        
        # Encode result
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',  # Ok, Dropped, ProcessingFailed
            'data': base64.b64encode(
                json.dumps(payload).encode('utf-8')
            ).decode('utf-8')
        })
    
    return {'records': output}

2. Format Conversion (Parquet/ORC)
'DataFormatConversionConfiguration': {
    'Enabled': True,
    'SchemaConfiguration': {
        'RoleARN': role_arn,
        'DatabaseName': 'my_database',
        'TableName': 'my_table',
        'Region': 'us-east-1'
    },
    'InputFormatConfiguration': {
        'Deserializer': {'OpenXJsonSerDe': {}}
    },
    'OutputFormatConfiguration': {
        'Serializer': {'ParquetSerDe': {'Compression': 'SNAPPY'}}
    }
}

16. What are Kinesis Analytics windowing functions?

Window Types:

1. Tumbling Window (fixed, non-overlapping)
SELECT STREAM
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
    COUNT(*) as event_count,
    SUM(amount) as total_amount
FROM SOURCE_STREAM
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);

Time: |----1min----|----1min----|----1min----|
      [  Window 1  ][  Window 2  ][  Window 3  ]

2. Sliding Window (overlapping)
SELECT STREAM
    COUNT(*) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        RANGE INTERVAL '5' MINUTE PRECEDING
    ) as events_last_5min
FROM SOURCE_STREAM;

3. Stagger Window (opens a window per key as data arrives; suited to late or bursty data)
SELECT STREAM
    user_id,
    FLOOR(event_time TO MINUTE) as window_start,
    COUNT(*) as event_count
FROM SOURCE_STREAM
WINDOWED BY STAGGER (
    PARTITION BY FLOOR(event_time TO MINUTE), user_id RANGE INTERVAL '1' MINUTE);

# Flink Windowing
t_env.execute_sql("""
    SELECT 
        user_id,
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
        COUNT(*) as event_count
    FROM events
    GROUP BY 
        user_id,
        TUMBLE(event_time, INTERVAL '1' MINUTE)
""")

# Session Window (Flink)
t_env.execute_sql("""
    SELECT 
        user_id,
        SESSION_START(event_time, INTERVAL '30' MINUTE) as session_start,
        COUNT(*) as events_in_session
    FROM events
    GROUP BY 
        user_id,
        SESSION(event_time, INTERVAL '30' MINUTE)
""")




17. How do you integrate Kinesis with Lambda?

Lambda Event Source Mapping:
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:us-east-1:123456789012:stream/my-stream',
    FunctionName='my-processor',
    BatchSize=100,
    MaximumBatchingWindowInSeconds=5,
    StartingPosition='LATEST',
    ParallelizationFactor=10,  # Up to 10 concurrent per shard
    MaximumRecordAgeInSeconds=60,
    BisectBatchOnFunctionError=True,
    MaximumRetryAttempts=3,
    DestinationConfig={
        'OnFailure': {'Destination': 'arn:aws:sqs:...:dlq'}
    },
    TumblingWindowInSeconds=60,  # For aggregation
    FunctionResponseTypes=['ReportBatchItemFailures']
)

# Lambda Handler
def handler(event, context):
    failed_records = []
    
    for record in event['Records']:
        try:
            # Decode record
            data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            payload = json.loads(data)
            
            # Process
            process_record(payload)
            
        except Exception as e:
            print(f"Error processing record: {e}")
            failed_records.append({
                'itemIdentifier': record['kinesis']['sequenceNumber']
            })
    
    # Return failed records for retry
    return {'batchItemFailures': failed_records}

# Tumbling Window Handler (aggregation)
def handler(event, context):
    # event['window'] contains start and end times
    # event['state'] contains previous window state
    
    state = event.get('state', {'count': 0, 'sum': 0})
    
    for record in event['Records']:
        data = json.loads(base64.b64decode(record['kinesis']['data']))
        state['count'] += 1
        state['sum'] += data.get('amount', 0)
    
    if event.get('isFinalInvokeForWindow'):
        # Final invocation - emit result
        save_aggregation(state)
        return {'state': {}}
    
    return {'state': state}

18. What are Kinesis security best practices?

Security Best Practices:

1. Encryption
# Server-side encryption
kinesis.start_stream_encryption(
    StreamName='my-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'  # or custom KMS key
)

# Client-side encryption (application level)
from cryptography.fernet import Fernet
key = Fernet.generate_key()
cipher = Fernet(key)
encrypted_data = cipher.encrypt(data.encode())

2. IAM Policies
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": "arn:aws:kinesis:*:*:stream/my-stream"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:*:stream/my-stream"
        }
    ]
}

3. VPC Endpoints
# Private access without internet
ec2.create_vpc_endpoint(
    VpcId='vpc-xxx',
    ServiceName='com.amazonaws.us-east-1.kinesis-streams',
    VpcEndpointType='Interface',
    SubnetIds=['subnet-xxx'],
    SecurityGroupIds=['sg-xxx']
)

4. Resource Policies
# Cross-account access control
kinesis.put_resource_policy(
    ResourceARN='arn:aws:kinesis:...:stream/my-stream',
    Policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"AWS": "arn:aws:iam::OTHER_ACCOUNT:root"},
            "Action": ["kinesis:GetRecords", "kinesis:GetShardIterator"],
            "Resource": "*"
        }]
    })
)

5. CloudTrail Auditing
# Enable a CloudTrail trail to capture Kinesis control-plane API calls
# (CreateStream, DescribeStream, StartStreamEncryption, etc.) for audit and compliance

19. How do you monitor Kinesis?

CloudWatch Metrics:

Stream-level:
├── IncomingBytes / IncomingRecords
├── GetRecords.Bytes / Records / IteratorAgeMilliseconds
├── WriteProvisionedThroughputExceeded
├── ReadProvisionedThroughputExceeded
└── PutRecord.Success / PutRecords.Success

Enhanced Shard-level:
├── IncomingBytes per shard
├── OutgoingBytes per shard
├── IteratorAgeMilliseconds per shard
└── ReadProvisionedThroughputExceeded per shard

# Enable enhanced monitoring
kinesis.enable_enhanced_monitoring(
    StreamName='my-stream',
    ShardLevelMetrics=['IncomingBytes', 'OutgoingBytes', 'IteratorAgeMilliseconds']
)

# CloudWatch Alarm
cloudwatch.put_metric_alarm(
    AlarmName='KinesisIteratorAge',
    MetricName='GetRecords.IteratorAgeMilliseconds',
    Namespace='AWS/Kinesis',
    Statistic='Maximum',
    Period=60,
    EvaluationPeriods=5,
    Threshold=60000,  # 1 minute behind
    ComparisonOperator='GreaterThanThreshold',
    Dimensions=[
        {'Name': 'StreamName', 'Value': 'my-stream'}
    ],
    AlarmActions=['arn:aws:sns:...:alerts']
)

Key Metrics to Monitor:
├── IteratorAgeMilliseconds: Consumer lag
├── WriteProvisionedThroughputExceeded: Producer throttling
├── ReadProvisionedThroughputExceeded: Consumer throttling
└── GetRecords.Success: Read success rate

20. What are common Kinesis patterns?

1. Fan-Out Pattern
Source → Kinesis Stream → Multiple consumers
                       ├→ Lambda (real-time alerts)
                       ├→ Firehose (S3 archive)
                       ├→ Analytics (aggregations)
                       └→ KCL App (custom processing)
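
The Firehose branch of the fan-out can be wired up by creating a delivery stream that reads from the Kinesis stream as its source (a sketch; the ARNs are placeholders):

# Firehose consumer in the fan-out: archive the stream to S3
firehose = boto3.client('firehose')
firehose.create_delivery_stream(
    DeliveryStreamName='stream-archive',
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': 'arn:aws:kinesis:us-east-1:123456789012:stream/my-stream',
        'RoleARN': 'arn:aws:iam::123456789012:role/FirehoseRole'
    },
    S3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/FirehoseRole',
        'BucketARN': 'arn:aws:s3:::my-archive-bucket'
    }
)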

2. Event Sourcing
# All state changes as immutable events
kinesis.put_record(
    StreamName='events',
    Data=json.dumps({
        'eventType': 'OrderCreated',
        'aggregateId': 'order-123',
        'data': {'items': [...], 'total': 100},
        'timestamp': time.time()
    }),
    PartitionKey='order-123'
)

3. Lambda-Kinesis-Lambda
# Chain processing
Lambda 1 → Kinesis Stream A → Lambda 2 → Kinesis Stream B → Lambda 3

4. Real-time Dashboard
Producers → Kinesis → Lambda → DynamoDB ← API Gateway ← Dashboard
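
A sketch of the Lambda step in this pattern, folding each Kinesis batch into DynamoDB counters that the dashboard reads via API Gateway (the table name and key schema are assumptions):

# Lambda: aggregate a Kinesis batch into per-event counters in DynamoDB
import base64, json, boto3

table = boto3.resource('dynamodb').Table('dashboard-metrics')  # hypothetical table keyed on 'metric'

def handler(event, context):
    counts = {}
    for record in event['Records']:
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        counts[payload['event']] = counts.get(payload['event'], 0) + 1
    for metric, count in counts.items():
        table.update_item(
            Key={'metric': metric},
            UpdateExpression='ADD event_count :c',
            ExpressionAttributeValues={':c': count}
        )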

5. Log Aggregation
App Servers → Kinesis Agent → Firehose → S3 → Athena

6. Clickstream Analytics
Web/Mobile → API Gateway → Kinesis → Analytics (SQL) → Dashboard
                                    → Firehose → S3 (Parquet)

7. IoT Data Pipeline
IoT Devices → IoT Core → Kinesis → Lambda → DynamoDB
                                 → Firehose → S3
                                 → Analytics → CloudWatch

8. ML Feature Pipeline
Raw Events → Kinesis → Lambda (feature extraction) → SageMaker Feature Store

