DynamoDB Streams and Lambda: Building Real-Time Data Pipelines

Your application just hit 100,000 users. Congratulations! Now product wants real-time analytics, cross-region replication, and event-driven notifications. Oh, and they need it yesterday.

Welcome to the world of change data capture (CDC). Traditional databases require polling, custom triggers, or expensive third-party tools. DynamoDB Streams? Built-in, serverless, and pairs perfectly with Lambda.

What Are DynamoDB Streams?

DynamoDB Streams capture every modification to your table (inserts, updates, and deletes) in near real time. Think of it as a time-ordered log of changes, automatically maintained by AWS.

Key features:

  • Ordered per item (changes to the same key appear in modification order)

  • Each change appears exactly once in the stream (no duplicate records)

  • 24-hour retention (replay capability)

  • Near real-time (typically <1 second latency)

  • No impact on table RCU/WCU
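
Before wiring anything up, it helps to see what a single stream record looks like by the time Lambda receives it. A minimal, abridged example (field values are illustrative):

# A single stream record from the Lambda event payload (abridged, values illustrative)
sample_record = {
    "eventID": "2f4b...",
    "eventName": "MODIFY",  # INSERT | MODIFY | REMOVE
    "eventSource": "aws:dynamodb",
    "awsRegion": "us-east-1",
    "dynamodb": {
        "ApproximateCreationDateTime": 1700000000.0,
        "Keys": {"userId": {"S": "u-123"}, "timestamp": {"N": "1700000000000"}},
        "NewImage": {"userId": {"S": "u-123"}, "activityType": {"S": "page_view"}, "status": {"S": "completed"}},
        "OldImage": {"userId": {"S": "u-123"}, "activityType": {"S": "page_view"}, "status": {"S": "pending"}},
        "SequenceNumber": "111000000000000000000",
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
}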

Architecture Overview

The shape of the pipeline is simple: writes to a DynamoDB table land on its stream, a Lambda event source mapping polls the stream in batches, and the function fans each change out to an aggregates table, a replica table in another region, or an analytics pipeline.

Real-World Example: User Activity Aggregation

Let's build a system that tracks user activity in real-time:

Step 1: Enable DynamoDB Streams

// lib/user-activity-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';
import { Construct } from 'constructs';

export class UserActivityStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Activities table with stream enabled
    const activitiesTable = new dynamodb.Table(this, 'UserActivities', {
      tableName: 'user-activities',
      partitionKey: { name: 'userId', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
      stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,  // Include before/after
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      pointInTimeRecovery: true,
    });

    // Aggregation table
    const aggregatesTable = new dynamodb.Table(this, 'UserAggregates', {
      tableName: 'user-aggregates',
      partitionKey: { name: 'userId', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    });

    // Stream processor Lambda
    const streamProcessor = new lambda.Function(this, 'StreamProcessor', {
      runtime: lambda.Runtime.PYTHON_3_12,
      handler: 'stream_processor.handler',
      code: lambda.Code.fromAsset('lambda'),
      environment: {
        AGGREGATES_TABLE: aggregatesTable.tableName,
      },
      timeout: cdk.Duration.seconds(30),
    });

    // Grant permissions
    aggregatesTable.grantReadWriteData(streamProcessor);

    // Add stream as event source
    streamProcessor.addEventSource(
      new lambdaEventSources.DynamoEventSource(activitiesTable, {
        startingPosition: lambda.StartingPosition.LATEST,
        batchSize: 100,  // Process up to 100 records at once
        maxBatchingWindow: cdk.Duration.seconds(5),  // Wait up to 5s to fill batch
        bisectBatchOnError: true,  // Isolate failing records
        retryAttempts: 3,
        parallelizationFactor: 10,  // Process up to 10 batches per shard concurrently
        reportBatchItemFailures: true,  // Enable partial batch responses
      })
    );
  }
}

Using AWS SAM:

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Resources:
  UserActivitiesTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: user-activities
      AttributeDefinitions:
        - AttributeName: userId
          AttributeType: S
        - AttributeName: timestamp
          AttributeType: N
      KeySchema:
        - AttributeName: userId
          KeyType: HASH
        - AttributeName: timestamp
          KeyType: RANGE
      BillingMode: PAY_PER_REQUEST
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES

  UserAggregatesTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: user-aggregates
      AttributeDefinitions:
        - AttributeName: userId
          AttributeType: S
      KeySchema:
        - AttributeName: userId
          KeyType: HASH
      BillingMode: PAY_PER_REQUEST

  StreamProcessorFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: lambda/
      Handler: stream_processor.handler
      Runtime: python3.12
      Timeout: 30
      Environment:
        Variables:
          AGGREGATES_TABLE: !Ref UserAggregatesTable
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref UserAggregatesTable
      Events:
        DynamoDBStream:
          Type: DynamoDB
          Properties:
            Stream: !GetAtt UserActivitiesTable.StreamArn
            StartingPosition: LATEST
            BatchSize: 100
            MaximumBatchingWindowInSeconds: 5
            BisectBatchOnFunctionError: true
            MaximumRetryAttempts: 3
            ParallelizationFactor: 10
            FunctionResponseTypes:
              - ReportBatchItemFailures

Step 2: Process Stream Events

# lambda/stream_processor.py
import os

import boto3
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()
tracer = Tracer()
dynamodb = boto3.resource('dynamodb')
aggregates_table = dynamodb.Table(os.environ['AGGREGATES_TABLE'])

@event_source(data_class=DynamoDBStreamEvent)
@tracer.capture_lambda_handler
def handler(event: DynamoDBStreamEvent, context: LambdaContext):
    """
    Process DynamoDB stream events and update aggregates
    """
    batch_item_failures = []

    for record in event.records:
        try:
            process_record(record)
        except Exception as e:
            logger.exception(f"Failed to process record {record.dynamodb.sequence_number}")
            # Report failure for retry
            batch_item_failures.append({
                "itemIdentifier": record.dynamodb.sequence_number
            })

    # Return failed items for partial batch response
    return {"batchItemFailures": batch_item_failures}

def process_record(record):
    """Process individual DynamoDB stream record"""
    event_name = record.event_name  # INSERT, MODIFY, or REMOVE

    logger.info(f"Processing {event_name} event", extra={
        "event_id": record.event_id,
        "sequence_number": record.dynamodb.sequence_number
    })

    if event_name == "INSERT":
        handle_insert(record.dynamodb.new_image)
    elif event_name == "MODIFY":
        handle_modify(record.dynamodb.new_image, record.dynamodb.old_image)
    elif event_name == "REMOVE":
        handle_remove(record.dynamodb.old_image)

def handle_insert(new_image):
    """Handle new activity insertion"""
    user_id = new_image['userId']
    activity_type = new_image['activityType']

    # Update aggregate counters (note: the nested activityCounts path assumes the
    # activityCounts map already exists; SET fails if the parent map is missing)
    aggregates_table.update_item(
        Key={'userId': user_id},
        UpdateExpression="""
            SET totalActivities = if_not_exists(totalActivities, :zero) + :one,
                lastActivityTime = :timestamp,
                activityCounts.#activityType = 
                    if_not_exists(activityCounts.#activityType, :zero) + :one
        """,
        ExpressionAttributeNames={
            '#activityType': activity_type
        },
        ExpressionAttributeValues={
            ':zero': 0,
            ':one': 1,
            ':timestamp': new_image['timestamp']
        }
    )

    logger.info("Aggregate updated", extra={
        "user_id": user_id,
        "activity_type": activity_type
    })

def handle_modify(new_image, old_image):
    """Handle activity modification"""
    # Compare old vs new, update aggregates if needed
    if new_image['status'] != old_image['status']:
        user_id = new_image['userId']

        aggregates_table.update_item(
            Key={'userId': user_id},
            UpdateExpression="SET lastModified = :timestamp",
            ExpressionAttributeValues={
                ':timestamp': new_image['timestamp']
            }
        )

def handle_remove(old_image):
    """Handle activity deletion"""
    user_id = old_image['userId']
    activity_type = old_image['activityType']

    # Decrement counters
    aggregates_table.update_item(
        Key={'userId': user_id},
        UpdateExpression="""
            SET totalActivities = totalActivities - :one,
                activityCounts.#activityType = activityCounts.#activityType - :one
        """,
        ExpressionAttributeNames={
            '#activityType': activity_type
        },
        ExpressionAttributeValues={
            ':one': 1
        }
    )
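
To feed this processor, the application only has to write activity items to the source table; each put becomes an INSERT stream record. A minimal producer sketch (attribute values are illustrative, following the schema used above):

# write_activity.py - produces INSERT events for the stream processor above
import time

import boto3

activities_table = boto3.resource('dynamodb').Table('user-activities')

activities_table.put_item(Item={
    'userId': 'u-123',
    'timestamp': int(time.time() * 1000),  # sort key; epoch milliseconds here
    'activityType': 'page_view',
    'status': 'completed',
})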

Advanced Pattern: Cross-Region Replication

Build an active-active multi-region architecture:

// Replicate to secondary region
const replicationFunction = new lambda.Function(this, 'ReplicationFunction', {
  runtime: lambda.Runtime.PYTHON_3_12,
  handler: 'replicator.handler',
  code: lambda.Code.fromAsset('lambda'),
  environment: {
    TARGET_TABLE_NAME: 'user-activities',
    TARGET_REGION: 'eu-west-1',
  },
  timeout: cdk.Duration.seconds(30),
});

replicationFunction.addEventSource(
  new lambdaEventSources.DynamoEventSource(activitiesTable, {
    startingPosition: lambda.StartingPosition.LATEST,
    batchSize: 100,
    reportBatchItemFailures: true,  // The handler below returns partial batch failures
    filters: [
      // Only replicate INSERT events explicitly flagged for replication
      lambda.FilterCriteria.filter({
        eventName: lambda.FilterRule.isEqual('INSERT'),
        dynamodb: {
          NewImage: {
            replicationEnabled: {
              BOOL: lambda.FilterRule.isEqual(true)
            }
          }
        }
      })
    ]
  })
);
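
For reference, the CDK filter above should compile to an event source mapping filter pattern shaped roughly like the following (shown here as a Python dict; event filtering matches against the DynamoDB-JSON form of the record):

# Approximate filter pattern generated by the FilterCriteria above
replication_filter = {
    "eventName": ["INSERT"],
    "dynamodb": {
        "NewImage": {
            "replicationEnabled": {"BOOL": [True]}
        }
    },
}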

Replication handler:

# lambda/replicator.py
import os
import time

import boto3
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent, event_source

logger = Logger()

# DynamoDB client for target region
target_region = os.environ['TARGET_REGION']
target_table_name = os.environ['TARGET_TABLE_NAME']
dynamodb = boto3.resource('dynamodb', region_name=target_region)
target_table = dynamodb.Table(target_table_name)

@event_source(data_class=DynamoDBStreamEvent)
def handler(event: DynamoDBStreamEvent, context):
    """
    Replicate DynamoDB items to another region
    """
    batch_failures = []

    with target_table.batch_writer() as batch:
        for record in event.records:
            try:
                if record.event_name in ["INSERT", "MODIFY"]:
                    item = record.dynamodb.new_image

                    # Add replication metadata
                    item['replicatedFrom'] = os.environ['AWS_REGION']
                    item['replicatedAt'] = int(time.time())

                    # Write to target table
                    batch.put_item(Item=item)

                    logger.info("Item replicated", extra={
                        "source_region": os.environ['AWS_REGION'],
                        "target_region": target_region,
                        "item_id": item.get('userId')
                    })

            except Exception as e:
                logger.exception("Replication failed")
                batch_failures.append({
                    "itemIdentifier": record.dynamodb.sequence_number
                })

    return {"batchItemFailures": batch_failures}

Analytics Pipeline with Kinesis Firehose

Stream changes to S3 for analytics:

import * as iam from 'aws-cdk-lib/aws-iam';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as kinesisfirehose from 'aws-cdk-lib/aws-kinesisfirehose';
import * as s3 from 'aws-cdk-lib/aws-s3';

// S3 bucket for analytics
const analyticsBucket = new s3.Bucket(this, 'AnalyticsBucket', {
  bucketName: 'user-activity-analytics',
  lifecycleRules: [{
    transitions: [{
      storageClass: s3.StorageClass.GLACIER,
      transitionAfter: cdk.Duration.days(90),
    }],
  }],
});

// Kinesis stream
const analyticsStream = new kinesis.Stream(this, 'AnalyticsStream', {
  streamName: 'user-activity-analytics',
  shardCount: 2,
});

// IAM role that lets Firehose read from the Kinesis stream and write to S3
const firehoseRole = new iam.Role(this, 'FirehoseRole', {
  assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
});
analyticsStream.grantRead(firehoseRole);
analyticsBucket.grantReadWrite(firehoseRole);

// Firehose delivery stream
const deliveryStream = new kinesisfirehose.CfnDeliveryStream(this, 'DeliveryStream', {
  deliveryStreamType: 'KinesisStreamAsSource',
  kinesisStreamSourceConfiguration: {
    kinesisStreamArn: analyticsStream.streamArn,
    roleArn: firehoseRole.roleArn,
  },
  s3DestinationConfiguration: {
    bucketArn: analyticsBucket.bucketArn,
    prefix: 'activities/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/',
    bufferingHints: {
      sizeInMBs: 5,
      intervalInSeconds: 300,
    },
    compressionFormat: 'GZIP',
  },
});

// Lambda to push stream events to Kinesis
const analyticsForwarder = new lambda.Function(this, 'AnalyticsForwarder', {
  runtime: lambda.Runtime.PYTHON_3_12,
  handler: 'analytics_forwarder.handler',
  code: lambda.Code.fromAsset('lambda'),
  environment: {
    KINESIS_STREAM_NAME: analyticsStream.streamName,
  },
});

analyticsStream.grantWrite(analyticsForwarder);

analyticsForwarder.addEventSource(
  new lambdaEventSources.DynamoEventSource(activitiesTable, {
    startingPosition: lambda.StartingPosition.LATEST,
    batchSize: 500,  // Higher batch for analytics
  })
);
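
The forwarder handler isn't shown above. A minimal sketch, assuming it simply forwards each raw stream record to Kinesis as newline-delimited JSON (the file name and error handling are illustrative):

# lambda/analytics_forwarder.py (illustrative sketch)
import json
import os

import boto3

kinesis = boto3.client('kinesis')
STREAM_NAME = os.environ['KINESIS_STREAM_NAME']

def handler(event, context):
    """Forward raw DynamoDB stream records to Kinesis for Firehose delivery."""
    records = [
        {
            'Data': (json.dumps(record) + '\n').encode('utf-8'),  # newline-delimited JSON for S3/Athena
            'PartitionKey': record['dynamodb']['Keys']['userId']['S'],
        }
        for record in event['Records']
    ]

    # put_records accepts up to 500 records per call, matching the batch size above
    response = kinesis.put_records(StreamName=STREAM_NAME, Records=records)

    if response.get('FailedRecordCount', 0):
        # Raising lets Lambda retry the batch
        raise RuntimeError(f"{response['FailedRecordCount']} records failed to forward")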

Handling Idempotency

The stream itself contains no duplicate records, but Lambda's at-least-once invocation model and retries mean your function can still process the same record more than once:

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    idempotent_function
)

# Idempotency table (stores a fingerprint and result for each processed record)
persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency-table")

# idempotent_function (rather than idempotent) is used because process_record is
# not the Lambda handler; callers must pass the record by keyword:
#   process_record(record=record)
@idempotent_function(data_keyword_argument="record", persistence_store=persistence_layer)
def process_record(record):
    """
    This function is idempotent - safe to retry.
    The same record will only be processed once.
    """
    # Process record...
    update_aggregates(record)
    send_notification(record)

Monitoring and Debugging

Key CloudWatch Metrics

from aws_lambda_powertools.metrics import Metrics, MetricUnit

metrics = Metrics(namespace="UserActivityPipeline")  # namespace is illustrative; can also be set via POWERTOOLS_METRICS_NAMESPACE

@metrics.log_metrics
def handler(event, context):
    records_processed = len(event['Records'])

    metrics.add_metric(
        name="StreamRecordsProcessed",
        unit=MetricUnit.Count,
        value=records_processed
    )

    # Track processing lag
    oldest_record_age = calculate_oldest_record_age(event['Records'])
    metrics.add_metric(
        name="StreamProcessingLag",
        unit=MetricUnit.Seconds,
        value=oldest_record_age
    )
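
calculate_oldest_record_age is referenced but not defined above; one way to implement it, assuming the raw event shape where ApproximateCreationDateTime is an epoch timestamp in seconds:

import time

def calculate_oldest_record_age(records) -> float:
    """Return the age in seconds of the oldest record in the batch (a rough lag estimate)."""
    creation_times = [
        record['dynamodb']['ApproximateCreationDateTime']
        for record in records
        if 'ApproximateCreationDateTime' in record['dynamodb']
    ]
    if not creation_times:
        return 0.0
    return time.time() - min(creation_times)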

Debug with Local DynamoDB Streams

# Run DynamoDB Local with streams
docker run -p 8000:8000 amazon/dynamodb-local -jar DynamoDBLocal.jar -inMemory -sharedDb

# Create table with stream
aws dynamodb create-table \
  --table-name user-activities \
  --attribute-definitions AttributeName=userId,AttributeType=S AttributeName=timestamp,AttributeType=N \
  --key-schema AttributeName=userId,KeyType=HASH AttributeName=timestamp,KeyType=RANGE \
  --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES \
  --endpoint-url http://localhost:8000
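
Lambda event source mappings don't run against DynamoDB Local, but you can verify that records are being emitted by polling the local stream with boto3 (a rough sketch; dummy credentials and region are fine for DynamoDB Local):

# poll_local_stream.py - inspect records on the local stream
import boto3

ENDPOINT = 'http://localhost:8000'
ddb = boto3.client('dynamodb', endpoint_url=ENDPOINT, region_name='us-east-1')
streams = boto3.client('dynamodbstreams', endpoint_url=ENDPOINT, region_name='us-east-1')

stream_arn = ddb.describe_table(TableName='user-activities')['Table']['LatestStreamArn']

for shard in streams.describe_stream(StreamArn=stream_arn)['StreamDescription']['Shards']:
    iterator = streams.get_shard_iterator(
        StreamArn=stream_arn,
        ShardId=shard['ShardId'],
        ShardIteratorType='TRIM_HORIZON',  # read from the start of the retained window
    )['ShardIterator']

    for record in streams.get_records(ShardIterator=iterator)['Records']:
        print(record['eventName'], record['dynamodb'].get('NewImage'))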

Best Practices

  1. Enable partial batch responses - Prevent full batch retry on single failure

  2. Use bisect on error - Isolate problematic records

  3. Implement idempotency - Handle duplicate processing gracefully

  4. Monitor iterator age - Alert on processing lag (see the alarm sketch after this list)

  5. Set appropriate batch size - Balance throughput vs latency

  6. Use parallelization factor - Process multiple batches concurrently

  7. Filter events early - Reduce unnecessary Lambda invocations

  8. Handle schema evolution - Support old and new record formats
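
For best practice #4, the number to watch is the IteratorAge metric emitted by the stream-processing function. A minimal alarm sketch with boto3 (the function name and thresholds are illustrative):

import boto3

cloudwatch = boto3.client('cloudwatch')

# Alarm when the stream processor falls more than 5 minutes behind
cloudwatch.put_metric_alarm(
    AlarmName='user-activities-stream-lag',
    Namespace='AWS/Lambda',
    MetricName='IteratorAge',
    Dimensions=[{'Name': 'FunctionName', 'Value': 'stream-processor'}],  # your function's name
    Statistic='Maximum',
    Period=60,
    EvaluationPeriods=5,
    Threshold=300_000,  # IteratorAge is reported in milliseconds
    ComparisonOperator='GreaterThanThreshold',
    TreatMissingData='notBreaching',
)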

Conclusion

DynamoDB Streams + Lambda unlocks powerful real-time data pipelines: aggregations, replication, analytics, and event-driven workflows - all without managing infrastructure.

Start with a simple aggregation, add monitoring, then scale to cross-region replication and analytics. Your data is always moving; make it work for you.


Building with DynamoDB Streams? Share your use cases in the comments!