Serverless Observability with AWS Lambda Powertools and CloudWatch
7 min read
Your Lambda function failed at 3 AM. Production is down. You have 347 CloudWatch log streams to search through. The error message says "An error occurred." That's it. That's the entire error message.
Sound familiar? Welcome to distributed systems debugging, where traditional logging falls apart and you need observability, not just logs.
The Three Pillars of Observability
Logs: What happened
Metrics: How much/how many
Traces: Where and why
CloudWatch gives you the infrastructure. Powertools gives you the patterns.
AWS Lambda Powertools: The Swiss Army Knife
Powertools is an open-source library (available in Python, TypeScript, Java, .NET) that implements serverless observability best practices out of the box.
Key features:
Structured JSON logging with correlation IDs
Custom metrics via CloudWatch EMF (Embedded Metric Format)
X-Ray tracing with subsegments
Event validation and parsing
Idempotency support
Parameter and secrets handling
With Lambda handling some 300 billion invocations per week (per AWS), Powertools is battle-tested at that scale.
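Take idempotency as an example: the Powertools utility persists each result keyed by a hash of the payload (DynamoDB-backed in real use), so a retried event returns the stored result instead of re-charging a card. Conceptually it reduces to this in-memory sketch — the names here are illustrative, not the Powertools API:

```python
import hashlib
import json
from functools import wraps

# Illustrative in-memory store; Powertools uses DynamoDB with expiry
_results: dict = {}

def idempotent_sketch(func):
    """Return the cached result for an already-seen payload instead of re-running."""
    @wraps(func)
    def wrapper(event, context=None):
        key = hashlib.sha256(json.dumps(event, sort_keys=True).encode()).hexdigest()
        if key in _results:
            return _results[key]
        result = func(event, context)
        _results[key] = result
        return result
    return wrapper

calls = 0

@idempotent_sketch
def handler(event, context=None):
    global calls
    calls += 1  # count how often the real work actually runs
    return {"orderId": event["orderId"], "status": "confirmed"}

# The same payload only triggers real processing once
first = handler({"orderId": "ORD-1"})
second = handler({"orderId": "ORD-1"})
```

The real utility adds expiry, in-progress locking, and response serialization on top of this idea.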
Installation & Setup
Python
# Install Powertools
pip install "aws-lambda-powertools[all]"
# Or use Lambda Layer
# Layer ARN: arn:aws:lambda:REGION:017000801446:layer:AWSLambdaPowertoolsPythonV3-python312-arm64:1
CDK setup:
import * as lambda from 'aws-cdk-lib/aws-lambda';
const powertoolsLayer = lambda.LayerVersion.fromLayerVersionArn(
this,
'PowertoolsLayer',
`arn:aws:lambda:${this.region}:017000801446:layer:AWSLambdaPowertoolsPythonV3-python312-arm64:1`
);
const func = new lambda.Function(this, 'Function', {
runtime: lambda.Runtime.PYTHON_3_12,
handler: 'index.handler',
code: lambda.Code.fromAsset('lambda'),
layers: [powertoolsLayer],
environment: {
POWERTOOLS_SERVICE_NAME: 'order-service',
POWERTOOLS_METRICS_NAMESPACE: 'MyApp',
LOG_LEVEL: 'INFO',
},
});
Pillar 1: Structured Logging
Before (Traditional Logging)
import logging
import json
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
logger.info("Processing order")
order_id = event['orderId']
logger.info(f"Order ID: {order_id}")
try:
process_order(order_id)
logger.info("Order processed successfully")
except Exception as e:
logger.error(f"Error: {str(e)}")
raise
# Output (hard to query):
# [INFO] Processing order
# [INFO] Order ID: ORD-12345
# [INFO] Order processed successfully
After (Powertools Structured Logging)
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext
logger = Logger(service="order-service")
@logger.inject_lambda_context(correlation_id_path="headers.x-correlation-id")
def lambda_handler(event: dict, context: LambdaContext) -> dict:
logger.info("Processing order", extra={
"order_id": event['orderId'],
"customer_id": event['customerId'],
"amount": event['amount']
})
try:
result = process_order(event)
logger.info("Order processed", extra={"result": result})
return result
except ValidationError as e:
logger.error("Validation failed", extra={"error": str(e)})
raise
except Exception as e:
logger.exception("Processing failed")
raise
# Output (structured JSON):
# {
# "level": "INFO",
# "location": "lambda_handler:12",
# "message": "Processing order",
# "timestamp": "2025-11-14T10:30:00.123Z",
# "service": "order-service",
# "cold_start": true,
# "function_name": "order-processor",
# "function_arn": "arn:aws:lambda:us-east-1:123:function:order-processor",
# "function_request_id": "abc-123",
# "correlation_id": "xyz-789",
# "order_id": "ORD-12345",
# "customer_id": "CUST-456",
# "amount": 99.99
# }
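Under the hood, structured logging is just a JSON formatter on top of stdlib logging. A minimal sketch of what Powertools automates — field names chosen to mirror its output, not taken from its internals:

```python
import io
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON object, merging extra fields."""
    def format(self, record):
        entry = {
            "level": record.levelname,
            "message": record.getMessage(),
            "service": "order-service",
        }
        # Keys passed via `extra=` become attributes on the record
        entry.update(getattr(record, "fields", {}))
        return json.dumps(entry)

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("sketch")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Processing order", extra={"fields": {"order_id": "ORD-12345"}})
entry = json.loads(stream.getvalue())
```

One JSON object per line is exactly what CloudWatch Logs Insights needs to make every field queryable.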
Advanced Logging Patterns
Append persistent keys:
logger = Logger(service="payment-service")
# Add keys that appear in ALL subsequent logs
logger.append_keys(environment="production", region="us-east-1")
@logger.inject_lambda_context
def lambda_handler(event, context):
# These will automatically include environment and region
logger.info("Processing payment")
Log sampling for high-volume functions:
logger = Logger(service="high-traffic-service", sample_rate=0.1)  # Emit DEBUG logs for ~10% of invocations
@logger.inject_lambda_context
def lambda_handler(event, context):
logger.debug("Detailed debug info") # Only logged 10% of the time
logger.info("Always logged info") # Always logged
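The sampling decision itself is simple: draw a random number once and compare it to the rate. A sketch of the mechanism behind sample_rate (illustrative names, not the Powertools internals):

```python
import random

def should_enable_debug(sample_rate: float, rng=random.random) -> bool:
    """Decide whether this execution environment emits DEBUG-level logs."""
    return rng() <= sample_rate

# A rate of 1.0 always samples; 0.0 never does
always = should_enable_debug(1.0)
never = should_enable_debug(0.0, rng=lambda: 0.5)
```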
Child loggers for modules:
# main.py
from aws_lambda_powertools import Logger
logger = Logger(service="order-service")
# payment_processor.py
from aws_lambda_powertools import Logger
# Inherits parent configuration
logger = Logger(service="order-service", child=True)
def process_payment(order):
logger.info("Processing payment", extra={"order_id": order['id']})
Pillar 2: Custom Metrics with EMF
CloudWatch EMF (Embedded Metric Format) lets you create custom metrics without separate API calls: the metrics are embedded in your log output and extracted asynchronously by CloudWatch.
Business Metrics
from aws_lambda_powertools import Metrics
from aws_lambda_powertools.metrics import MetricUnit
metrics = Metrics(namespace="ECommerce", service="order-service")
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event, context):
# Track business metrics
metrics.add_metric(name="OrderPlaced", unit=MetricUnit.Count, value=1)
    metrics.add_metric(name="OrderValue", unit=MetricUnit.NoUnit, value=event['amount'])  # CloudWatch has no currency unit
# Add dimensions for filtering
metrics.add_dimension(name="Environment", value="production")
metrics.add_dimension(name="PaymentMethod", value=event['paymentMethod'])
# Metrics are automatically flushed at function end
return process_order(event)
CloudWatch automatically extracts metrics from logs:
{
"_aws": {
"Timestamp": 1699977600000,
"CloudWatchMetrics": [{
"Namespace": "ECommerce",
"Dimensions": [["Environment", "PaymentMethod", "service"]],
"Metrics": [
{"Name": "OrderPlaced", "Unit": "Count"},
{"Name": "OrderValue", "Unit": "None"}
]
}]
},
"Environment": "production",
"PaymentMethod": "credit_card",
"service": "order-service",
"OrderPlaced": 1,
"OrderValue": 99.99
}
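If you're curious what the library serializes, here's a hand-built minimal EMF document matching the shape above — in practice Powertools assembles and flushes this for you:

```python
import json
import time

def build_emf(namespace: str, metric_name: str, unit: str,
              value: float, dimensions: dict) -> str:
    """Assemble a minimal Embedded Metric Format log line by hand."""
    doc = {
        "_aws": {
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [{
                "Namespace": namespace,
                "Dimensions": [list(dimensions.keys())],
                "Metrics": [{"Name": metric_name, "Unit": unit}],
            }],
        },
        metric_name: value,   # metric values live at the top level
        **dimensions,         # so do dimension values
    }
    return json.dumps(doc)

line = build_emf("ECommerce", "OrderPlaced", "Count", 1,
                 {"Environment": "production"})
parsed = json.loads(line)
```

Printing such a line to stdout inside Lambda is all it takes; CloudWatch does the extraction.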
High-Resolution Metrics
import time

@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event, context):
start_time = time.time()
result = process_order(event)
# Track processing latency
duration_ms = (time.time() - start_time) * 1000
metrics.add_metric(
name="ProcessingDuration",
unit=MetricUnit.Milliseconds,
value=duration_ms,
resolution=1 # High-resolution (1-second granularity)
)
return result
Creating CloudWatch Alarms from Metrics
import * as cdk from 'aws-cdk-lib';
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';
import * as cw_actions from 'aws-cdk-lib/aws-cloudwatch-actions';
import * as sns from 'aws-cdk-lib/aws-sns';
// SNS topic for alerts
const alertTopic = new sns.Topic(this, 'AlertTopic', {
displayName: 'Production Alerts',
});
// Alarm on error rate
const errorAlarm = new cloudwatch.Alarm(this, 'HighErrorRate', {
metric: new cloudwatch.Metric({
namespace: 'ECommerce',
metricName: 'OrderProcessingErrors',
statistic: 'Sum',
period: cdk.Duration.minutes(5),
}),
threshold: 10,
evaluationPeriods: 2,
datapointsToAlarm: 2,
comparisonOperator: cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD,
treatMissingData: cloudwatch.TreatMissingData.NOT_BREACHING,
});
errorAlarm.addAlarmAction(new cw_actions.SnsAction(alertTopic));
// Alarm on latency
const latencyAlarm = new cloudwatch.Alarm(this, 'HighLatency', {
metric: new cloudwatch.Metric({
namespace: 'ECommerce',
metricName: 'ProcessingDuration',
statistic: 'p99',
period: cdk.Duration.minutes(5),
}),
threshold: 3000, // 3 seconds
evaluationPeriods: 3,
});
latencyAlarm.addAlarmAction(new cw_actions.SnsAction(alertTopic));
Pillar 3: Distributed Tracing with X-Ray
Tracing shows the complete request path across services.
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
tracer = Tracer(service="order-service")
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> dict:
result = process_order(event)
return result
@tracer.capture_method
def process_order(order: dict) -> dict:
# Validate order
validated = validate_order(order)
# Process payment (traced as subsegment)
payment = process_payment(validated)
# Update inventory
inventory = update_inventory(validated)
return {"payment": payment, "inventory": inventory}
@tracer.capture_method
def validate_order(order: dict) -> dict:
# Add annotation for filtering in X-Ray console
tracer.put_annotation(key="order_id", value=order['id'])
tracer.put_annotation(key="customer_type", value=order.get('customerType', 'standard'))
# Add metadata for debugging
tracer.put_metadata(key="order_details", value=order)
# Validation logic
if order['amount'] < 0:
raise ValidationError("Invalid amount")
return order
@tracer.capture_method
def process_payment(order: dict) -> dict:
# External API call (automatically traced)
import requests
response = requests.post(
'https://payment-api.example.com/charge',
json={'amount': order['amount'], 'orderId': order['id']}
)
return response.json()
X-Ray Service Map
The X-Ray service map shows:
Request flow across services
Latency at each hop
Error rates per service
External API dependencies
API Gateway (200ms avg)
↓
Order Service Lambda (500ms avg)
├→ DynamoDB (50ms avg) ✓
├→ Payment API (300ms avg) ✓
└→ Inventory Service (100ms avg) ⚠️ 5% errors
Complete Observability Example
Combining all three pillars:
import json

from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent
logger = Logger(service="order-service")
tracer = Tracer(service="order-service")
metrics = Metrics(namespace="ECommerce", service="order-service")
@logger.inject_lambda_context(correlation_id_path="requestContext.requestId")
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
# Parse API Gateway event
api_event = APIGatewayProxyEvent(event)
logger.info("Order received", extra={
"path": api_event.path,
"method": api_event.http_method,
"source_ip": api_event.request_context.identity.source_ip
})
# Add trace annotations
tracer.put_annotation("customer_id", api_event.headers.get("x-customer-id"))
# Add metric dimensions
metrics.add_dimension("Region", api_event.headers.get("x-region", "unknown"))
try:
order = json.loads(api_event.body)
# Business logic with observability
result = process_order_with_observability(order)
# Track successful order
metrics.add_metric(name="OrderSuccess", unit=MetricUnit.Count, value=1)
        metrics.add_metric(name="OrderValue", unit=MetricUnit.NoUnit, value=order['amount'])  # no currency unit in CloudWatch
logger.info("Order processed successfully", extra={"order_id": result['orderId']})
return {
'statusCode': 200,
'body': json.dumps(result)
}
except ValidationError as e:
logger.error("Validation failed", extra={"error": str(e)})
metrics.add_metric(name="ValidationError", unit=MetricUnit.Count, value=1)
return {
'statusCode': 400,
'body': json.dumps({'error': 'Invalid order'})
}
except Exception as e:
logger.exception("Order processing failed")
metrics.add_metric(name="ProcessingError", unit=MetricUnit.Count, value=1)
return {
'statusCode': 500,
'body': json.dumps({'error': 'Internal server error'})
}
@tracer.capture_method
def process_order_with_observability(order: dict) -> dict:
with tracer.provider.in_subsegment("validate_inventory") as subsegment:
inventory_available = check_inventory(order['items'])
subsegment.put_metadata("inventory_check", inventory_available)
if not inventory_available:
raise OutOfStockError("Items unavailable")
with tracer.provider.in_subsegment("charge_payment") as subsegment:
payment_result = charge_payment(order)
subsegment.put_annotation("payment_method", payment_result['method'])
return {
'orderId': order['id'],
'status': 'confirmed',
'payment': payment_result
}
CloudWatch Logs Insights Queries
Structured logs enable powerful queries:
Find all errors for a specific customer
fields @timestamp, message, error, order_id
| filter customer_id = "CUST-12345"
| filter level = "ERROR"
| sort @timestamp desc
| limit 100
Calculate p99 latency by function
fields @duration
| stats pct(@duration, 99) as p99_duration by function_name
| sort p99_duration desc
Track cold starts
fields @timestamp, cold_start, function_name
| filter cold_start = true
| stats count() by function_name, bin(5m)
Correlation ID tracing
fields @timestamp, message, correlation_id
| filter correlation_id = "abc-123-xyz"
| sort @timestamp asc
Dashboard as Code
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';
const dashboard = new cloudwatch.Dashboard(this, 'OrderServiceDashboard', {
dashboardName: 'order-service-production',
});
// Lambda metrics
dashboard.addWidgets(
new cloudwatch.GraphWidget({
title: 'Function Invocations',
left: [func.metricInvocations()],
right: [func.metricErrors(), func.metricThrottles()],
}),
new cloudwatch.GraphWidget({
title: 'Latency',
left: [
func.metricDuration({ statistic: 'p50' }),
func.metricDuration({ statistic: 'p99' }),
],
})
);
// Custom business metrics
dashboard.addWidgets(
new cloudwatch.GraphWidget({
title: 'Order Metrics',
left: [
new cloudwatch.Metric({
namespace: 'ECommerce',
metricName: 'OrderPlaced',
statistic: 'Sum',
}),
],
right: [
new cloudwatch.Metric({
namespace: 'ECommerce',
metricName: 'OrderValue',
statistic: 'Sum',
}),
],
})
);
Cost-Effective Observability
Log Sampling for High-Volume Functions
import os

# Enable DEBUG logging for ~10% of invocations in production
logger = Logger(
service="high-volume-service",
sample_rate=0.1 if os.getenv('ENV') == 'production' else 1.0
)
Set Log Retention
import * as logs from 'aws-cdk-lib/aws-logs';
const logGroup = new logs.LogGroup(this, 'FunctionLogs', {
logGroupName: `/aws/lambda/${func.functionName}`,
retention: logs.RetentionDays.ONE_WEEK, // Not forever!
});
X-Ray Sampling Rules
// Trace 1 request/second, plus 5% of any additional traffic
const samplingRule = {
priority: 1000,
version: 1,
serviceName: 'order-service',
httpMethod: '*',
urlPath: '*',
reservoirSize: 1, // Always trace 1 req/sec
fixedRate: 0.05, // Plus 5% of others
};
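To sanity-check a rule, the traced volume is roughly reservoirSize plus fixedRate times the remaining traffic. A quick back-of-envelope helper (illustrative arithmetic, not an X-Ray API):

```python
def traced_per_second(requests_per_second: float,
                      reservoir: int, fixed_rate: float) -> float:
    """Approximate traces/sec: reservoir first, then a fixed fraction of the rest."""
    if requests_per_second <= reservoir:
        return requests_per_second
    return reservoir + (requests_per_second - reservoir) * fixed_rate

# At 100 req/s, the rule above keeps roughly 1 + 0.05 * 99 = 5.95 traces/sec
estimate = traced_per_second(100, reservoir=1, fixed_rate=0.05)
```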
Conclusion
Observability isn't optional; it's survival. Powertools and CloudWatch together give you production-grade observability patterns with minimal code.
Structure your logs. Emit custom metrics. Trace distributed requests. Your 3 AM self will thank you.
How do you implement observability in serverless? Share your patterns!