Serverless Observability with AWS Lambda Powertools and CloudWatch

Your Lambda function failed at 3 AM. Production is down. You have 347 CloudWatch log streams to search through. The error message says "An error occurred." That's it. That's the entire error message.

Sound familiar? Welcome to distributed systems debugging, where traditional logging falls apart and you need observability, not just logs.

The Three Pillars of Observability

  1. Logs: What happened

  2. Metrics: How much/how many

  3. Traces: Where and why

CloudWatch gives you the infrastructure. Powertools gives you the patterns.

AWS Lambda Powertools: The Swiss Army Knife

Powertools is an open-source library (available in Python, TypeScript, Java, .NET) that implements serverless observability best practices out of the box.

Key features:

  • Structured JSON logging with correlation IDs

  • Custom metrics via CloudWatch EMF (Embedded Metric Format)

  • X-Ray tracing with subsegments

  • Event validation and parsing

  • Idempotency support

  • Parameter and secrets handling

Maintained by AWS and deployed inside Lambda workloads at enormous scale, Powertools is battle-tested in production.

Installation & Setup

Python

# Install Powertools
pip install "aws-lambda-powertools[all]"

# Or use Lambda Layer
# Layer ARN: arn:aws:lambda:REGION:017000801446:layer:AWSLambdaPowertoolsPythonV3-python312-arm64:1

CDK setup:

import * as lambda from 'aws-cdk-lib/aws-lambda';

const powertoolsLayer = lambda.LayerVersion.fromLayerVersionArn(
  this,
  'PowertoolsLayer',
  `arn:aws:lambda:${this.region}:017000801446:layer:AWSLambdaPowertoolsPythonV3-python312-arm64:1`
);

const func = new lambda.Function(this, 'Function', {
  runtime: lambda.Runtime.PYTHON_3_12,
  handler: 'index.handler',
  code: lambda.Code.fromAsset('lambda'),
  layers: [powertoolsLayer],
  environment: {
    POWERTOOLS_SERVICE_NAME: 'order-service',
    POWERTOOLS_METRICS_NAMESPACE: 'MyApp',
    LOG_LEVEL: 'INFO',
  },
});

Pillar 1: Structured Logging

Before (Traditional Logging)

import logging
import json

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    logger.info("Processing order")
    order_id = event['orderId']
    logger.info(f"Order ID: {order_id}")

    try:
        process_order(order_id)
        logger.info("Order processed successfully")
    except Exception as e:
        logger.error(f"Error: {str(e)}")
        raise

# Output (hard to query):
# [INFO] Processing order
# [INFO] Order ID: ORD-12345
# [INFO] Order processed successfully

After (Powertools Structured Logging)

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger(service="order-service")

@logger.inject_lambda_context(correlation_id_path="headers.x-correlation-id")
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    logger.info("Processing order", extra={
        "order_id": event['orderId'],
        "customer_id": event['customerId'],
        "amount": event['amount']
    })

    try:
        result = process_order(event)
        logger.info("Order processed", extra={"result": result})
        return result
    except ValidationError as e:
        logger.error("Validation failed", extra={"error": str(e)})
        raise
    except Exception as e:
        logger.exception("Processing failed")
        raise

# Output (structured JSON):
# {
#   "level": "INFO",
#   "location": "lambda_handler:12",
#   "message": "Processing order",
#   "timestamp": "2025-11-14T10:30:00.123Z",
#   "service": "order-service",
#   "cold_start": true,
#   "function_name": "order-processor",
#   "function_arn": "arn:aws:lambda:us-east-1:123:function:order-processor",
#   "function_request_id": "abc-123",
#   "correlation_id": "xyz-789",
#   "order_id": "ORD-12345",
#   "customer_id": "CUST-456",
#   "amount": 99.99
# }
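Under the hood, structured logging is just a formatter that emits JSON instead of interpolated strings. A minimal stdlib sketch of the idea (the `JsonFormatter` class here is made up for illustration, not the Powertools implementation):

```python
import io
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each record as one JSON line, merging fields passed via extra=."""
    def format(self, record):
        entry = {
            "level": record.levelname,
            "message": record.getMessage(),
            "service": "order-service",
        }
        # Fields passed via extra= land as attributes on the record
        for key in ("order_id", "customer_id", "amount"):
            if hasattr(record, key):
                entry[key] = getattr(record, key)
        return json.dumps(entry)

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("sketch")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Processing order", extra={"order_id": "ORD-12345", "amount": 99.99})
line = json.loads(stream.getvalue())
```

Powertools adds the Lambda context fields (cold start, function ARN, request ID) and correlation ID plumbing on top of this pattern.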

Advanced Logging Patterns

Append persistent keys:

logger = Logger(service="payment-service")

# Add keys that appear in ALL subsequent logs
logger.append_keys(environment="production", region="us-east-1")

@logger.inject_lambda_context
def lambda_handler(event, context):
    # These will automatically include environment and region
    logger.info("Processing payment")

Log sampling for high-volume functions:

logger = Logger(service="high-traffic-service", sample_rate=0.1)  # Enable DEBUG for ~10% of invocations

@logger.inject_lambda_context
def lambda_handler(event, context):
    logger.debug("Detailed debug info")  # Emitted only for sampled invocations (~10%)
    logger.info("Always logged info")    # Always logged
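Mechanically, sampling is a per-invocation coin flip: if the draw lands under `sample_rate`, the log level drops to DEBUG for that invocation. A stripped-down sketch of the decision:

```python
import random

def effective_level(sample_rate: float, base_level: str = "INFO") -> str:
    """Decide the log level for one invocation, the way sampling does."""
    return "DEBUG" if random.random() < sample_rate else base_level

# At the extremes the behavior is deterministic:
always = effective_level(1.0)  # every invocation gets DEBUG logs
never = effective_level(0.0)   # DEBUG logs are always suppressed
```

Because the flip happens per invocation, you get complete debug traces for the sampled requests rather than a random 10% of individual lines.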

Child loggers for modules:

# main.py
from aws_lambda_powertools import Logger

logger = Logger(service="order-service")

# payment_processor.py
from aws_lambda_powertools import Logger

# Inherits parent configuration
logger = Logger(service="order-service", child=True)

def process_payment(order):
    logger.info("Processing payment", extra={"order_id": order['id']})

Pillar 2: Custom Metrics with EMF

CloudWatch EMF (Embedded Metric Format) lets you create custom metrics without separate API calls: the metrics are embedded in log output and extracted asynchronously by CloudWatch.

Business Metrics

from aws_lambda_powertools import Metrics
from aws_lambda_powertools.metrics import MetricUnit

metrics = Metrics(namespace="ECommerce", service="order-service")

@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event, context):
    # Track business metrics
    metrics.add_metric(name="OrderPlaced", unit=MetricUnit.Count, value=1)
    metrics.add_metric(name="OrderValue", unit=MetricUnit.NoUnit, value=event['amount'])  # CloudWatch has no currency unit

    # Add dimensions for filtering
    metrics.add_dimension(name="Environment", value="production")
    metrics.add_dimension(name="PaymentMethod", value=event['paymentMethod'])

    # Metrics are automatically flushed at function end
    return process_order(event)

CloudWatch automatically extracts metrics from logs:

{
  "_aws": {
    "Timestamp": 1699977600000,
    "CloudWatchMetrics": [{
      "Namespace": "ECommerce",
      "Dimensions": [["Environment", "PaymentMethod", "service"]],
      "Metrics": [
        {"Name": "OrderPlaced", "Unit": "Count"},
        {"Name": "OrderValue", "Unit": "None"}
      ]
    }]
  },
  "Environment": "production",
  "PaymentMethod": "credit_card",
  "service": "order-service",
  "OrderPlaced": 1,
  "OrderValue": 99.99
}
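There is no magic in the flush step: Powertools serializes exactly this structure and prints it to stdout. A hand-rolled sketch of the same EMF blob, following the documented `_aws` envelope:

```python
import json
import time

def emf_blob(namespace: str, dimensions: dict, metrics: dict) -> str:
    """Serialize metrics into CloudWatch Embedded Metric Format."""
    payload = {
        "_aws": {
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [{
                "Namespace": namespace,
                "Dimensions": [list(dimensions)],
                "Metrics": [{"Name": n, "Unit": u} for n, (u, _) in metrics.items()],
            }],
        },
        **dimensions,                                         # dimension values at top level
        **{name: value for name, (_, value) in metrics.items()},  # metric values at top level
    }
    return json.dumps(payload)

blob = emf_blob(
    "ECommerce",
    {"Environment": "production", "service": "order-service"},
    {"OrderPlaced": ("Count", 1), "OrderValue": ("None", 99.99)},
)
```

Printing that one line is all EMF requires; CloudWatch does the extraction out-of-band, which is why there is no latency or throttling cost in the request path.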

High-Resolution Metrics

import time

@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event, context):
    start_time = time.time()

    result = process_order(event)

    # Track processing latency
    duration_ms = (time.time() - start_time) * 1000
    metrics.add_metric(
        name="ProcessingDuration",
        unit=MetricUnit.Milliseconds,
        value=duration_ms,
        resolution=1  # High-resolution (1-second granularity)
    )

    return result

Creating CloudWatch Alarms from Metrics

import * as cdk from 'aws-cdk-lib';
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';
import * as cw_actions from 'aws-cdk-lib/aws-cloudwatch-actions';
import * as sns from 'aws-cdk-lib/aws-sns';

// SNS topic for alerts
const alertTopic = new sns.Topic(this, 'AlertTopic', {
  displayName: 'Production Alerts',
});

// Alarm on error rate
const errorAlarm = new cloudwatch.Alarm(this, 'HighErrorRate', {
  metric: new cloudwatch.Metric({
    namespace: 'ECommerce',
    metricName: 'OrderProcessingErrors',
    statistic: 'Sum',
    period: cdk.Duration.minutes(5),
  }),
  threshold: 10,
  evaluationPeriods: 2,
  datapointsToAlarm: 2,
  comparisonOperator: cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD,
  treatMissingData: cloudwatch.TreatMissingData.NOT_BREACHING,
});

errorAlarm.addAlarmAction(new cw_actions.SnsAction(alertTopic));

// Alarm on latency
const latencyAlarm = new cloudwatch.Alarm(this, 'HighLatency', {
  metric: new cloudwatch.Metric({
    namespace: 'ECommerce',
    metricName: 'ProcessingDuration',
    statistic: 'p99',
    period: cdk.Duration.minutes(5),
  }),
  threshold: 3000,  // 3 seconds
  evaluationPeriods: 3,
});

latencyAlarm.addAlarmAction(new cw_actions.SnsAction(alertTopic));

Pillar 3: Distributed Tracing with X-Ray

Tracing shows the complete request path across services.

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer(service="order-service")

@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    result = process_order(event)
    return result

@tracer.capture_method
def process_order(order: dict) -> dict:
    # Validate order
    validated = validate_order(order)

    # Process payment (traced as subsegment)
    payment = process_payment(validated)

    # Update inventory
    inventory = update_inventory(validated)

    return {"payment": payment, "inventory": inventory}

@tracer.capture_method
def validate_order(order: dict) -> dict:
    # Add annotation for filtering in X-Ray console
    tracer.put_annotation(key="order_id", value=order['id'])
    tracer.put_annotation(key="customer_type", value=order.get('customerType', 'standard'))

    # Add metadata for debugging
    tracer.put_metadata(key="order_details", value=order)

    # Validation logic
    if order['amount'] < 0:
        raise ValidationError("Invalid amount")

    return order

@tracer.capture_method
def process_payment(order: dict) -> dict:
    import requests  # auto-traced: Tracer patches supported libraries on init

    response = requests.post(
        'https://payment-api.example.com/charge',
        json={'amount': order['amount'], 'orderId': order['id']},
        timeout=10,
    )
    response.raise_for_status()
    return response.json()

X-Ray Service Map

The X-Ray service map shows:

  • Request flow across services

  • Latency at each hop

  • Error rates per service

  • External API dependencies

API Gateway (200ms avg)
    ↓
Order Service Lambda (500ms avg)
    ├→ DynamoDB (50ms avg) ✓
    ├→ Payment API (300ms avg) ✓
    └→ Inventory Service (100ms avg) ⚠️ 5% errors
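Subsegments are what make this breakdown possible: each traced call records its name and wall-clock duration under its parent. A toy model of the mechanics (illustrative only, not the X-Ray SDK):

```python
import time
from contextlib import contextmanager

recorded = []  # (name, duration_ms) pairs, in completion order

@contextmanager
def subsegment(name: str):
    """Record how long the wrapped block took, like an X-Ray subsegment."""
    start = time.perf_counter()
    try:
        yield
    finally:
        recorded.append((name, (time.perf_counter() - start) * 1000))

# Nested timings: children close before their parent
with subsegment("process_order"):
    with subsegment("validate_order"):
        time.sleep(0.01)
    with subsegment("charge_payment"):
        time.sleep(0.01)
```

The real SDK additionally ships these timings to the X-Ray daemon with trace and parent IDs, which is how the service map stitches hops from different services together.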

Complete Observability Example

Combining all three pillars:

import json

from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent

logger = Logger(service="order-service")
tracer = Tracer(service="order-service")
metrics = Metrics(namespace="ECommerce", service="order-service")

@logger.inject_lambda_context(correlation_id_path="requestContext.requestId")
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Parse API Gateway event
    api_event = APIGatewayProxyEvent(event)

    logger.info("Order received", extra={
        "path": api_event.path,
        "method": api_event.http_method,
        "source_ip": api_event.request_context.identity.source_ip
    })

    # Add trace annotations
    tracer.put_annotation("customer_id", api_event.headers.get("x-customer-id", "unknown"))

    # Add metric dimensions
    metrics.add_dimension("Region", api_event.headers.get("x-region", "unknown"))

    try:
        order = json.loads(api_event.body)

        # Business logic with observability
        result = process_order_with_observability(order)

        # Track successful order
        metrics.add_metric(name="OrderSuccess", unit=MetricUnit.Count, value=1)
        metrics.add_metric(name="OrderValue", unit=MetricUnit.NoUnit, value=order['amount'])

        logger.info("Order processed successfully", extra={"order_id": result['orderId']})

        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }

    except ValidationError as e:
        logger.error("Validation failed", extra={"error": str(e)})
        metrics.add_metric(name="ValidationError", unit=MetricUnit.Count, value=1)

        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Invalid order'})
        }

    except Exception:
        logger.exception("Order processing failed")
        metrics.add_metric(name="ProcessingError", unit=MetricUnit.Count, value=1)

        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Internal server error'})
        }

@tracer.capture_method
def process_order_with_observability(order: dict) -> dict:
    with tracer.provider.in_subsegment("validate_inventory") as subsegment:
        inventory_available = check_inventory(order['items'])
        subsegment.put_metadata("inventory_check", inventory_available)

    if not inventory_available:
        raise OutOfStockError("Items unavailable")

    with tracer.provider.in_subsegment("charge_payment") as subsegment:
        payment_result = charge_payment(order)
        subsegment.put_annotation("payment_method", payment_result['method'])

    return {
        'orderId': order['id'],
        'status': 'confirmed',
        'payment': payment_result
    }

CloudWatch Logs Insights Queries

Structured logs enable powerful queries:

Find all errors for a specific customer

fields @timestamp, message, error, order_id
| filter customer_id = "CUST-12345"
| filter level = "ERROR"
| sort @timestamp desc
| limit 100

Calculate p99 latency by function

filter @type = "REPORT"
| stats pct(@duration, 99) as p99_duration by @log
| sort p99_duration desc

(`@duration` lives on Lambda's REPORT lines, not in your JSON logs; when the query spans multiple log groups, `@log` identifies the function.)

Track cold starts

fields @timestamp, cold_start, function_name
| filter cold_start = true
| stats count() by function_name, bin(5m)

Correlation ID tracing

fields @timestamp, message, correlation_id
| filter correlation_id = "abc-123-xyz"
| sort @timestamp asc
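This query works because the correlation ID is stamped on every log line, no matter which module emitted it. The propagation pattern looks like a context variable; a stdlib sketch of what `inject_lambda_context` sets up for you:

```python
from contextvars import ContextVar

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="unset")
log_lines = []

def log(message: str) -> None:
    """Every log line picks up the ambient correlation ID."""
    log_lines.append({"message": message, "correlation_id": correlation_id.get()})

def process_payment() -> None:
    # Deep inside a module, no ID is passed explicitly...
    log("charging card")

def handler(event: dict) -> None:
    # ...because the handler sets it once per invocation
    correlation_id.set(event["headers"]["x-correlation-id"])
    log("order received")
    process_payment()

handler({"headers": {"x-correlation-id": "abc-123-xyz"}})
```

The payoff is exactly the query above: one filter on `correlation_id` reconstructs a request's full story across modules and retries.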

Dashboard as Code

import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';

const dashboard = new cloudwatch.Dashboard(this, 'OrderServiceDashboard', {
  dashboardName: 'order-service-production',
});

// Lambda metrics
dashboard.addWidgets(
  new cloudwatch.GraphWidget({
    title: 'Function Invocations',
    left: [func.metricInvocations()],
    right: [func.metricErrors(), func.metricThrottles()],
  }),
  new cloudwatch.GraphWidget({
    title: 'Latency',
    left: [
      func.metricDuration({ statistic: 'p50' }),
      func.metricDuration({ statistic: 'p99' }),
    ],
  })
);

// Custom business metrics
dashboard.addWidgets(
  new cloudwatch.GraphWidget({
    title: 'Order Metrics',
    left: [
      new cloudwatch.Metric({
        namespace: 'ECommerce',
        metricName: 'OrderPlaced',
        statistic: 'Sum',
      }),
    ],
    right: [
      new cloudwatch.Metric({
        namespace: 'ECommerce',
        metricName: 'OrderValue',
        statistic: 'Sum',
      }),
    ],
  })
);

Cost-Effective Observability

Log Sampling for High-Volume Functions

import os

# Sample 10% of requests in production, everything elsewhere
logger = Logger(
    service="high-volume-service",
    sample_rate=0.1 if os.getenv('ENV') == 'production' else 1.0
)

Set Log Retention

import * as logs from 'aws-cdk-lib/aws-logs';

const logGroup = new logs.LogGroup(this, 'FunctionLogs', {
  logGroupName: `/aws/lambda/${func.functionName}`,
  retention: logs.RetentionDays.ONE_WEEK,  // Not forever!
});

X-Ray Sampling Rules

// Shape of an X-Ray sampling rule (deployed via xray.CfnSamplingRule or the CLI).
// Note: sampling is decided when a request starts, so errors in
// unsampled requests are not traced.
const samplingRule = {
  priority: 1000,
  version: 1,
  serviceName: 'order-service',
  httpMethod: '*',
  urlPath: '*',
  reservoirSize: 1,  // Always trace 1 req/sec
  fixedRate: 0.05,   // Plus 5% of the rest
};
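The reservoir-plus-rate semantics can be modeled in a few lines (an illustrative simulation, not the X-Ray SDK's implementation):

```python
import random

class SamplingRule:
    """Model of X-Ray sampling: a per-second reservoir plus a fixed rate."""
    def __init__(self, reservoir_size: int, fixed_rate: float):
        self.reservoir_size = reservoir_size
        self.fixed_rate = fixed_rate
        self._second = None
        self._taken = 0

    def should_sample(self, now: float) -> bool:
        second = int(now)
        if second != self._second:
            self._second, self._taken = second, 0  # refill each second
        if self._taken < self.reservoir_size:
            self._taken += 1                        # guaranteed traces/sec
            return True
        return random.random() < self.fixed_rate   # fixed rate on the overflow

rule = SamplingRule(reservoir_size=1, fixed_rate=0.0)
first = rule.should_sample(100.0)   # consumes the reservoir
second = rule.should_sample(100.5)  # same second, rate 0, dropped
third = rule.should_sample(101.0)   # new second, reservoir refilled
```

The reservoir guarantees you never go blind on a low-traffic service, while the fixed rate keeps tracing costs proportional at high volume.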

Conclusion

Observability isn't optional; it's survival. Together, Powertools and CloudWatch give you production-grade observability patterns with minimal code.

Structure your logs. Emit custom metrics. Trace distributed requests. Your 3 AM self will thank you.


How do you implement observability in serverless? Share your patterns!