Serverless Security Best Practices: Protecting Your AWS Lambda Functions
8 min read
"We don't have to think about security - it's serverless!" Said no good engineer ever.
AWS handles the infrastructure, but security at the application level is still your responsibility. Serverless systems also introduce new attack vectors: function permissions, API gateways, event sources, secrets management, and more. And remember: "infinitely scalable" systems can mean infinitely expensive security events.
Let's work through how to secure them.
A Serverless Security Model
Traditional servers: One big castle with a moat. Serverless: Hundreds of tiny castles, each with its own moat.
The good news: the blast radius of any single compromise is smaller. The bad news: there is far more surface area to secure, since every function has its own permissions, triggers, and dependencies.
Shared Responsibility
AWS manages:
Physical infrastructure
Hypervisor isolation
Runtime patching (for managed runtimes)
Network infrastructure
You manage:
Application code vulnerabilities
IAM permissions
Secrets and credentials
Dependency vulnerabilities
Data encryption
API authentication
Principle #1: Least Privilege IAM
The problem: it's far too easy to attach broad managed policies, or even wildcard (*) permissions, to a Lambda execution role.
The solution: Granular, function-specific permissions.
❌ Dangerous (Overprivileged)
Resources:
  UserServiceFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: index.handler
      Runtime: python3.12
      Policies:
        - AmazonDynamoDBFullAccess  # TOO BROAD!
        - AmazonS3FullAccess        # DANGER!
✅ Secure (Least Privilege)
Resources:
  UserServiceFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: index.handler
      Runtime: python3.12
      Policies:
        - Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
                - dynamodb:GetItem
                - dynamodb:Query
              Resource:
                - !GetAtt UsersTable.Arn
                - !Sub '${UsersTable.Arn}/index/*'
            - Effect: Allow
              Action:
                - s3:GetObject
              Resource: !Sub 'arn:aws:s3:::${UserAvatarsBucket}/avatars/*'
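The difference between the two policies above can also be caught mechanically. Here is a minimal, illustrative lint (not an official AWS tool) that flags Allow statements granting wildcard actions or resources; the function name and finding labels are made up for this sketch:

```python
def find_overbroad_statements(policy: dict) -> list:
    """Return (finding, statement) pairs for Allow statements that
    grant wildcard actions or wildcard resources."""
    findings = []
    for stmt in policy.get('Statement', []):
        if stmt.get('Effect') != 'Allow':
            continue
        actions = stmt.get('Action', [])
        resources = stmt.get('Resource', [])
        # IAM allows either a single string or a list in both fields
        if isinstance(actions, str):
            actions = [actions]
        if isinstance(resources, str):
            resources = [resources]
        if any(a == '*' or a.endswith(':*') for a in actions):
            findings.append(('wildcard-action', stmt))
        if any(r == '*' for r in resources):
            findings.append(('wildcard-resource', stmt))
    return findings
```

Running a check like this in CI against rendered policy documents catches overprivileged roles before they ever reach an account.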
CDK with Principle of Least Privilege
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as iam from 'aws-cdk-lib/aws-iam';

export class SecureLambdaStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string) {
    super(scope, id);

    const usersTable = new dynamodb.Table(this, 'UsersTable', {
      partitionKey: { name: 'userId', type: dynamodb.AttributeType.STRING },
    });

    const userFunction = new lambda.Function(this, 'UserFunction', {
      runtime: lambda.Runtime.PYTHON_3_12,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lambda'),
    });

    // Grant ONLY read access to the specific table
    usersTable.grantReadData(userFunction);

    // Add specific S3 permissions
    userFunction.addToRolePolicy(new iam.PolicyStatement({
      effect: iam.Effect.ALLOW,
      actions: ['s3:GetObject'],
      resources: ['arn:aws:s3:::user-avatars/avatars/*'],
    }));
  }
}
Principle #2: Secrets Management
Never hardcode credentials. Store them in AWS Secrets Manager or SSM Parameter Store instead.
❌ Insecure
import psycopg2

# NEVER DO THIS!
DB_PASSWORD = "MyS3cr3tP@ssw0rd"

conn = psycopg2.connect(
    host="db.example.com",
    user="admin",
    password=DB_PASSWORD
)
✅ Secure with Secrets Manager
import json

import boto3
import psycopg2
from functools import lru_cache
from aws_lambda_powertools import Logger

logger = Logger()
secrets_client = boto3.client('secretsmanager')

@lru_cache(maxsize=1)
def get_db_credentials():
    """
    Fetch credentials from Secrets Manager.
    Cache the result to avoid repeated API calls on warm invocations.
    """
    try:
        response = secrets_client.get_secret_value(
            SecretId='production/database/credentials'
        )
        return json.loads(response['SecretString'])
    except Exception:
        logger.exception("Failed to retrieve secret")
        raise

def lambda_handler(event, context):
    creds = get_db_credentials()
    conn = psycopg2.connect(
        host=creds['host'],
        user=creds['username'],
        password=creds['password'],
        dbname=creds['database']
    )
    # Use connection...
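If you use Parameter Store instead, the same caching idea applies. This is a hedged sketch, not prescribed AWS tooling: the parameter name and the injectable fetch hook are assumptions for illustration, and boto3 is imported lazily so the caching logic can be exercised offline:

```python
import time

class CachedParameter:
    """Fetches an SSM parameter once and caches it with a TTL,
    so warm invocations skip the network round trip."""

    def __init__(self, name, ttl_seconds=300, fetch=None):
        self.name = name
        self.ttl = ttl_seconds
        self._value = None
        self._expires = 0.0
        # fetch is injectable for testing; defaults to a real SSM call
        self._fetch = fetch or self._fetch_from_ssm

    def _fetch_from_ssm(self):
        import boto3  # imported lazily so the class works offline in tests
        ssm = boto3.client('ssm')
        resp = ssm.get_parameter(Name=self.name, WithDecryption=True)
        return resp['Parameter']['Value']

    def get(self):
        now = time.time()
        if self._value is None or now >= self._expires:
            self._value = self._fetch()
            self._expires = now + self.ttl
        return self._value
```

Unlike lru_cache, the TTL means a rotated secret is picked up within a bounded window instead of living for the entire life of the execution environment.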
Automatic Secret Rotation
import * as cdk from 'aws-cdk-lib';
import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager';

const dbSecret = new secretsmanager.Secret(this, 'DBSecret', {
  secretName: 'production/database/credentials',
  generateSecretString: {
    secretStringTemplate: JSON.stringify({ username: 'admin' }),
    generateStringKey: 'password',
    excludePunctuation: true,
    passwordLength: 32,
  },
});

// Automatically rotate every 30 days.
// rotationFunction is a Lambda that implements the rotation steps;
// for RDS, prefer the hosted rotation functions AWS provides.
dbSecret.addRotationSchedule('RotationSchedule', {
  automaticallyAfter: cdk.Duration.days(30),
  rotationLambda: rotationFunction,
});
Principle #3: API Gateway Security
Your Lambda function can be perfectly secure, but it won't matter if the API Gateway in front of it is wide open.
Authentication: Multiple Layers
1. API Keys (Basic - NOT for production authentication)
Resources:
  UsagePlan:
    Type: AWS::ApiGateway::UsagePlan
    Properties:
      ApiStages:
        - ApiId: !Ref RestApi
          Stage: !Ref Stage
      Throttle:
        BurstLimit: 200
        RateLimit: 100
2. IAM Authorization (AWS services & internal)
const api = new apigw.RestApi(this, 'PrivateAPI', {
  restApiName: 'internal-service-api',
});

const resource = api.root.addResource('users');
resource.addMethod('GET', new apigw.LambdaIntegration(getUserFunction), {
  authorizationType: apigw.AuthorizationType.IAM,
});
3. AWS Cognito User Pools (User Authentication)
const userPool = new cognito.UserPool(this, 'UserPool', {
  userPoolName: 'my-app-users',
  selfSignUpEnabled: true,
  signInAliases: { email: true },
  autoVerify: { email: true },
  passwordPolicy: {
    minLength: 12,
    requireLowercase: true,
    requireUppercase: true,
    requireDigits: true,
    requireSymbols: true,
  },
});

const auth = new apigw.CognitoUserPoolsAuthorizer(this, 'Authorizer', {
  cognitoUserPools: [userPool],
});

resource.addMethod('POST', integration, {
  authorizer: auth,
  authorizationType: apigw.AuthorizationType.COGNITO,
});
4. Lambda Authorizer (Custom auth logic)
# lambda_authorizer.py
import jwt
from aws_lambda_powertools import Logger

logger = Logger()

def lambda_handler(event, context):
    """
    Custom JWT token validation
    """
    token = event['authorizationToken'].replace('Bearer ', '')
    method_arn = event['methodArn']
    try:
        # Validate JWT token
        decoded = jwt.decode(
            token,
            'your-secret-key',  # placeholder - load from Secrets Manager in production
            algorithms=['HS256']
        )
        principal_id = decoded['sub']
        # Generate IAM policy
        return generate_policy(principal_id, 'Allow', method_arn, decoded)
    except jwt.ExpiredSignatureError:
        logger.error("Token expired")
        raise Exception('Unauthorized')
    except Exception:
        logger.exception("Auth failed")
        raise Exception('Unauthorized')

def generate_policy(principal_id, effect, resource, context):
    return {
        'principalId': principal_id,
        'policyDocument': {
            'Version': '2012-10-17',
            'Statement': [{
                'Action': 'execute-api:Invoke',
                'Effect': effect,
                'Resource': resource
            }]
        },
        'context': context  # Pass to Lambda function
    }
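Under the hood, HS256 verification is just an HMAC over the encoded header and payload. The stdlib-only sketch below shows what jwt.decode is checking; it is illustrative only and not a replacement for a maintained JWT library, since it skips claim validation (exp, aud, iss) entirely:

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    # JWT uses base64url without padding
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode()

def sign_hs256(payload: dict, secret: bytes) -> str:
    header = b64url(json.dumps({'alg': 'HS256', 'typ': 'JWT'}).encode())
    body = b64url(json.dumps(payload).encode())
    signing_input = f'{header}.{body}'.encode()
    sig = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    return f'{header}.{body}.{sig}'

def verify_hs256(token: str, secret: bytes) -> dict:
    header, body, sig = token.split('.')
    signing_input = f'{header}.{body}'.encode()
    expected = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    # Constant-time comparison prevents timing side channels
    if not hmac.compare_digest(sig, expected):
        raise ValueError('Unauthorized')
    padded = body + '=' * (-len(body) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))
```

The key property: any change to the header or payload invalidates the signature, so the authorizer can trust the claims it extracts.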
Rate Limiting and Throttling
const api = new apigw.RestApi(this, 'ThrottledAPI', {
  deployOptions: {
    throttlingBurstLimit: 100, // Spike capacity (token bucket size)
    throttlingRateLimit: 50,   // Steady-state requests per second
    // Method-specific throttling
    methodOptions: {
      '/users/POST': {
        throttlingBurstLimit: 20,
        throttlingRateLimit: 10,
      },
    },
  },
});
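Burst and rate limits behave like a token bucket: the bucket holds up to the burst limit in tokens and refills at the rate limit per second. The sketch below is a conceptual model of that behavior, not API Gateway's actual implementation:

```python
class TokenBucket:
    """Conceptual model of API Gateway throttling:
    rate = steady-state requests/second, burst = bucket capacity."""

    def __init__(self, rate: float, burst: int):
        self.rate = rate
        self.burst = burst
        self.tokens = float(burst)  # bucket starts full
        self.last = 0.0

    def allow(self, now: float) -> bool:
        # Refill proportionally to elapsed time, capped at burst
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False  # request is throttled (HTTP 429)
```

This is why a burst of traffic up to the burst limit succeeds immediately, while sustained traffic is held to the rate limit.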
Request Validation
const requestValidator = new apigw.RequestValidator(this, 'RequestValidator', {
  restApi: api,
  requestValidatorName: 'validate-body',
  validateRequestBody: true,
  validateRequestParameters: true,
});

const model = new apigw.Model(this, 'UserModel', {
  restApi: api,
  contentType: 'application/json',
  schema: {
    type: apigw.JsonSchemaType.OBJECT,
    required: ['email', 'name'],
    properties: {
      email: { type: apigw.JsonSchemaType.STRING, format: 'email' },
      name: { type: apigw.JsonSchemaType.STRING, minLength: 1, maxLength: 100 },
      age: { type: apigw.JsonSchemaType.INTEGER, minimum: 0, maximum: 150 },
    },
  },
});

resource.addMethod('POST', integration, {
  requestValidator,
  requestModels: {
    'application/json': model,
  },
});
Principle #4: Dependency Scanning
Third-party libraries are part of your attack surface; scan them on every build.
Scan Dependencies Using CI/CD
# .github/workflows/security-scan.yml
name: Security Scan
on: [push, pull_request]
jobs:
  scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.12'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      - name: Run Bandit (SAST)
        run: |
          # the sarif extra enables Bandit's SARIF output formatter
          pip install bandit[sarif]
          bandit -r src/ -f sarif -o bandit-report.sarif
      - name: Run Safety (dependency check)
        run: |
          pip install safety
          safety check --json > safety-report.json
      - name: Upload scan results
        uses: github/codeql-action/upload-sarif@v2
        with:
          sarif_file: bandit-report.sarif
Runtime Protection with Lambda Layers
# Lambda function with dependency validation
import importlib.metadata

class SecurityError(Exception):
    """Raised when a known-vulnerable package version is detected."""

def validate_dependencies():
    """
    Check for known vulnerable package versions
    """
    vulnerable_packages = {
        'requests': ['2.25.0', '2.25.1'],  # Example
        'urllib3': ['1.26.4'],
    }
    for package, bad_versions in vulnerable_packages.items():
        try:
            version = importlib.metadata.version(package)
            if version in bad_versions:
                raise SecurityError(f"{package} {version} has known vulnerabilities")
        except importlib.metadata.PackageNotFoundError:
            pass

# Run on cold start
validate_dependencies()
Principle #5: Secure Environment Variables
Environment variables are visible in plaintext to anyone with read access to the function's configuration. Encrypt sensitive values with a customer-managed KMS key.
import * as kms from 'aws-cdk-lib/aws-kms';

const encryptionKey = new kms.Key(this, 'EnvVarKey', {
  enableKeyRotation: true,
  description: 'Encryption key for Lambda environment variables',
});

const secureFunction = new lambda.Function(this, 'SecureFunction', {
  runtime: lambda.Runtime.PYTHON_3_12,
  handler: 'index.handler',
  code: lambda.Code.fromAsset('lambda'),
  environment: {
    DB_HOST: 'db.example.com', // Non-sensitive - plaintext OK
    REGION: 'us-east-1',
  },
  environmentEncryption: encryptionKey, // Encrypt env vars at rest
});

// Grant decrypt permission
encryptionKey.grantDecrypt(secureFunction);
Note that with environmentEncryption, Lambda decrypts the variables transparently at runtime. Manual decryption is only needed for values you encrypt yourself before deployment (the console's encryption helpers):

import os
import base64

import boto3

kms = boto3.client('kms')

def decrypt_env_var(encrypted_value):
    """
    Decrypt a KMS-encrypted environment variable
    """
    decrypted = kms.decrypt(
        CiphertextBlob=base64.b64decode(encrypted_value)
    )
    return decrypted['Plaintext'].decode('utf-8')

# e.g. db_password = decrypt_env_var(os.environ['DB_PASSWORD'])
Principle #6: VPC Isolation
For functions that access private resources (databases, internal services), isolate them in a VPC:
import * as ec2 from 'aws-cdk-lib/aws-ec2';

const vpc = new ec2.Vpc(this, 'PrivateVPC', {
  maxAzs: 2,
  subnetConfiguration: [
    {
      name: 'private',
      subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS,
    },
  ],
});

const securityGroup = new ec2.SecurityGroup(this, 'LambdaSG', {
  vpc,
  description: 'Security group for Lambda functions',
  allowAllOutbound: false, // Explicit egress rules only
});

// Allow only database access
securityGroup.addEgressRule(
  ec2.Peer.ipv4('10.0.0.0/8'),
  ec2.Port.tcp(5432),
  'PostgreSQL access'
);

const vpcFunction = new lambda.Function(this, 'VPCFunction', {
  runtime: lambda.Runtime.PYTHON_3_12,
  handler: 'index.handler',
  code: lambda.Code.fromAsset('lambda'),
  vpc,
  vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS },
  securityGroups: [securityGroup],
});
Principle #7: Input Validation
Never trust user input.
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.parser import event_parser
from pydantic import BaseModel, EmailStr, Field

logger = Logger()

class UserInput(BaseModel):
    email: EmailStr
    name: str = Field(..., min_length=1, max_length=100)
    age: int = Field(..., ge=0, le=150)

@event_parser(model=UserInput)
def lambda_handler(event: UserInput, context):
    """
    Powertools parses the event into the Pydantic model and
    raises automatically if validation fails
    """
    logger.info("Valid user data", extra={
        "email": event.email,
        "name": event.name,
    })
    # Process validated data...
    return {'statusCode': 200}
Monitoring & Alerting
Without monitoring, security is just security theater.
from aws_lambda_powertools.metrics import MetricUnit, Metrics

metrics = Metrics()

@metrics.log_metrics
def lambda_handler(event, context):
    try:
        # Your logic
        metrics.add_metric(name="SuccessfulAuth", unit=MetricUnit.Count, value=1)
    except AuthenticationError:  # your application's auth exception
        # Track failed auth attempts
        metrics.add_metric(name="FailedAuth", unit=MetricUnit.Count, value=1)
        # Alert on suspicious activity (get_failed_attempts and
        # send_security_alert are application-specific helpers)
        if get_failed_attempts(event['sourceIp']) > 10:
            send_security_alert(event)
        raise
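The get_failed_attempts check above needs state that outlives a single invocation. Here is a sketch of the idea as an in-memory sliding-window counter; the class name and parameters are illustrative, and a real deployment would back the state with DynamoDB or ElastiCache, since Lambda execution environments don't share memory:

```python
import time
from collections import defaultdict, deque

class FailedAttemptTracker:
    """Sliding-window counter of failed auth attempts per source IP."""

    def __init__(self, window_seconds=300, threshold=10):
        self.window = window_seconds
        self.threshold = threshold
        self._attempts = defaultdict(deque)  # ip -> timestamps

    def _prune(self, q, now):
        # Drop timestamps that fell out of the window
        while q and q[0] <= now - self.window:
            q.popleft()

    def record_failure(self, source_ip, now=None):
        now = time.time() if now is None else now
        q = self._attempts[source_ip]
        q.append(now)
        self._prune(q, now)

    def is_suspicious(self, source_ip, now=None):
        now = time.time() if now is None else now
        q = self._attempts[source_ip]
        self._prune(q, now)
        return len(q) > self.threshold
```

Swapping the deque for a DynamoDB item with a TTL attribute gives the same semantics across all concurrent execution environments.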
Security Checklist
[ ] IAM roles follow least privilege principle
[ ] Secrets stored in Secrets Manager/Parameter Store
[ ] API Gateway has authentication enabled
[ ] Rate limiting is enabled
[ ] Input validation added
[ ] Dependencies scanned for vulnerabilities
[ ] Environment variables are encrypted
[ ] VPC isolation for private resources
[ ] CloudTrail logging enabled
[ ] Security monitoring and alerting configured
[ ] Penetration testing conducted
[ ] Incident response plan documented
Conclusion
Serverless security isn't magic; it's architectural. These patterns give you defense in depth: every layer contributes protection, and no single point of failure compromises the whole system.
Start with least-privilege IAM and proper secrets management, add API authentication and input validation, then layer on monitoring and alerting. Each step improves your security posture, and your audit trail will show it.
Just remember: AWS secures the cloud; securing your application is your job.
What serverless security practices have you implemented? Share your experiences!