Suyog Maid
Article · 2026-01-21

Building Scalable Serverless Applications with AWS Lambda and API Gateway

#aws #lambda #serverless #api-gateway #dynamodb #devops #cloud-architecture

Serverless architecture has revolutionized how we build and deploy applications. AWS Lambda, combined with API Gateway and other managed services, enables developers to focus on code while AWS handles infrastructure management. In this comprehensive guide, I'll share proven patterns for building production-grade serverless applications.

Why Serverless?

Serverless computing offers compelling advantages:

  • No Server Management: AWS handles provisioning, scaling, and maintenance
  • Automatic Scaling: From zero to thousands of concurrent executions
  • Pay-Per-Use: Only pay for actual compute time (billed in 1 ms increments)
  • Built-in High Availability: Multi-AZ deployment by default
  • Faster Time to Market: Focus on business logic, not infrastructure
  • Reduced Operational Overhead: No patching, no capacity planning

Architecture Overview

Typical Serverless Application Stack

┌─────────────────┐
│   CloudFront    │  ← CDN for static assets
└────────┬────────┘
         │
┌────────▼────────┐
│   API Gateway   │  ← RESTful API endpoint
└────────┬────────┘
         │
    ┌────▼────┐
    │ Lambda  │  ← Business logic
    └────┬────┘
         │
    ┌────▼────────────┐
    │   DynamoDB      │  ← NoSQL database
    │   S3            │  ← Object storage
    │   SQS/SNS       │  ← Messaging
    │   EventBridge   │  ← Event routing
    └─────────────────┘

Lambda Function Best Practices

Function Structure

# handler.py
import json
import boto3
import os
from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()
tracer = Tracer()
metrics = Metrics()

# Initialize AWS clients outside handler for reuse
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE_NAME'])

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """
    Main Lambda handler function
    """
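    try:
        # Parse input; API Gateway sends a null body when the request has none
        body = json.loads(event.get('body') or '{}')
        user_id = body.get('userId')

        # Validate input
        if not user_id:
            return create_response(400, {'error': 'userId is required'})

        # Business logic
        result = process_user_data(user_id)

        # Add custom metrics
        metrics.add_metric(name="SuccessfulProcessing", unit="Count", value=1)

        return create_response(200, result)

    except json.JSONDecodeError:
        # Malformed JSON is a client error, not a server error
        return create_response(400, {'error': 'Request body must be valid JSON'})
    except ValueError as e:
        # process_user_data raises ValueError when the user does not exist
        return create_response(404, {'error': str(e)})
    except Exception:
        logger.exception("Error processing request")
        metrics.add_metric(name="ProcessingError", unit="Count", value=1)
        return create_response(500, {'error': 'Internal server error'})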

@tracer.capture_method
def process_user_data(user_id: str) -> dict:
    """
    Process user data with DynamoDB
    """
    response = table.get_item(Key={'userId': user_id})
    
    if 'Item' not in response:
        raise ValueError(f"User {user_id} not found")
    
    return response['Item']

def create_response(status_code: int, body: dict) -> dict:
    """
    Create standardized API response
    """
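    return {
        'statusCode': status_code,
        'headers': {
            'Content-Type': 'application/json',
            # Browsers reject credentialed requests when the origin is '*';
            # set a specific origin if you need Access-Control-Allow-Credentials
            'Access-Control-Allow-Origin': '*'
        },
        # default=str keeps json.dumps from failing on the Decimal values
        # that the DynamoDB resource API returns for numbers
        'body': json.dumps(body, default=str)
    }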
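
One detail worth guarding against in the response helper: the boto3 DynamoDB resource returns numeric attributes as decimal.Decimal, which json.dumps cannot serialize on its own. The sketch below shows one way to handle that; decimal_default and to_json are hypothetical helper names, not part of boto3 or Powertools.

```python
import json
from decimal import Decimal


def decimal_default(value):
    """Fallback for json.dumps: convert Decimal to int or float."""
    if isinstance(value, Decimal):
        # Whole numbers become int so 42 is not rendered as 42.0
        if value == value.to_integral_value():
            return int(value)
        return float(value)
    # Anything else is still an error, as with plain json.dumps
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def to_json(body: dict) -> str:
    """Serialize a response body that may contain DynamoDB Decimal values."""
    return json.dumps(body, default=decimal_default)
```

A helper like this could be swapped in wherever the response body is serialized. Converting to float loses precision for very large or very precise values, so money-like fields may be better returned as strings.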

Environment Configuration

# config.py
import os

class Config:
    """Application configuration"""
    
    # DynamoDB
    TABLE_NAME = os.environ.get('TABLE_NAME', 'users-table')
    
    # S3
    BUCKET_NAME = os.environ.get('BUCKET_NAME', 'user-uploads')
    
    # API Settings
    API_TIMEOUT = int(os.environ.get('API_TIMEOUT', '30'))
    MAX_RETRIES = int(os.environ.get('MAX_RETRIES', '3'))
    
    # Feature Flags
    ENABLE_CACHING = os.environ.get('ENABLE_CACHING', 'true').lower() == 'true'
    
    @classmethod
    def validate(cls) -> None:
        """Validate required environment variables"""
        required_vars = ['TABLE_NAME', 'BUCKET_NAME']
        missing = [var for var in required_vars if not os.environ.get(var)]
        
        if missing:
            raise ValueError(f"Missing required environment variables: {missing}")

API Gateway Configuration

REST API with Lambda Integration

# serverless.yml
service: user-service

provider:
  name: aws
  runtime: python3.11
  region: us-east-1
  stage: ${opt:stage, 'dev'}
  
  environment:
    TABLE_NAME: ${self:custom.tableName}
    BUCKET_NAME: ${self:custom.bucketName}
    STAGE: ${self:provider.stage}
  
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            - dynamodb:GetItem
            - dynamodb:PutItem
            - dynamodb:UpdateItem
            - dynamodb:Query
            - dynamodb:Scan
          Resource:
            - !GetAtt UsersTable.Arn
            - !Sub "${UsersTable.Arn}/index/*"
        
        - Effect: Allow
          Action:
            - s3:GetObject
            - s3:PutObject
          Resource:
            - !Sub "${UserUploadsBucket.Arn}/*"

functions:
  getUser:
    handler: handlers/users.get_user
    events:
      - http:
          path: /users/{userId}
          method: GET
          cors: true
          authorizer:
            type: COGNITO_USER_POOLS
            authorizerId: !Ref ApiGatewayAuthorizer
          request:
            parameters:
              paths:
                userId: true
    timeout: 30
    memorySize: 512
    reservedConcurrency: 100
  
  createUser:
    handler: handlers/users.create_user
    events:
      - http:
          path: /users
          method: POST
          cors: true
          authorizer:
            type: COGNITO_USER_POOLS
            authorizerId: !Ref ApiGatewayAuthorizer
    timeout: 30
    memorySize: 512
  
  updateUser:
    handler: handlers/users.update_user
    events:
      - http:
          path: /users/{userId}
          method: PUT
          cors: true
          authorizer:
            type: COGNITO_USER_POOLS
            authorizerId: !Ref ApiGatewayAuthorizer

resources:
  Resources:
    UsersTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:custom.tableName}
        BillingMode: PAY_PER_REQUEST
        AttributeDefinitions:
          - AttributeName: userId
            AttributeType: S
          - AttributeName: email
            AttributeType: S
        KeySchema:
          - AttributeName: userId
            KeyType: HASH
        GlobalSecondaryIndexes:
          - IndexName: EmailIndex
            KeySchema:
              - AttributeName: email
                KeyType: HASH
            Projection:
              ProjectionType: ALL
        StreamSpecification:
          StreamViewType: NEW_AND_OLD_IMAGES
        PointInTimeRecoverySpecification:
          PointInTimeRecoveryEnabled: true
        SSESpecification:
          SSEEnabled: true
    
    UserUploadsBucket:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: ${self:custom.bucketName}
        BucketEncryption:
          ServerSideEncryptionConfiguration:
            - ServerSideEncryptionByDefault:
                SSEAlgorithm: AES256
        PublicAccessBlockConfiguration:
          BlockPublicAcls: true
          BlockPublicPolicy: true
          IgnorePublicAcls: true
          RestrictPublicBuckets: true
        LifecycleConfiguration:
          Rules:
            - Id: DeleteOldFiles
              Status: Enabled
              ExpirationInDays: 90

custom:
  tableName: users-${self:provider.stage}
  bucketName: user-uploads-${self:provider.stage}-${aws:accountId}

Performance Optimization

Cold Start Mitigation

# Provisioned Concurrency Configuration
import json
import boto3

lambda_client = boto3.client('lambda')

def configure_provisioned_concurrency(function_name: str, alias: str, concurrency: int):
    """
    Configure provisioned concurrency to reduce cold starts.
    The qualifier must be a published version or alias, not $LATEST.
    """
    response = lambda_client.put_provisioned_concurrency_config(
        FunctionName=function_name,
        Qualifier=alias,
        ProvisionedConcurrentExecutions=concurrency
    )
    return response

# Warm-up function
def keep_warm_handler(event, context):
    """
    Scheduled function to keep Lambda warm
    """
    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'Warm-up successful'})
    }
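
If the warm-up ping targets the same function that serves real traffic, the handler needs to recognize the ping and return early. A minimal sketch, assuming the ping comes from an EventBridge scheduled rule (those events carry "source": "aws.events" and "detail-type": "Scheduled Event"); is_warmup_event and handler are illustrative names:

```python
import json


def is_warmup_event(event) -> bool:
    """Return True for EventBridge scheduled events used as keep-warm pings."""
    return (
        isinstance(event, dict)
        and event.get("source") == "aws.events"
        and event.get("detail-type") == "Scheduled Event"
    )


def handler(event, context):
    if is_warmup_event(event):
        # Return early so warm-up pings skip business logic and downstream calls
        return {"statusCode": 200, "body": json.dumps({"message": "Warm-up successful"})}

    # Placeholder for the real request path
    return {"statusCode": 200, "body": json.dumps({"message": "Handled real request"})}
```

Scheduled pings keep only as many execution environments warm as there are concurrent pings, so provisioned concurrency is usually the more predictable option for latency-sensitive paths.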

Lambda Layers for Dependencies

# layers.yml
layers:
  pythonDependencies:
    path: layers/python-dependencies
    name: ${self:service}-python-deps-${self:provider.stage}
    description: Python dependencies layer
    compatibleRuntimes:
      - python3.11
    retain: false

functions:
  myFunction:
    handler: handler.main
    layers:
      - !Ref PythonDependenciesLambdaLayer

Connection Pooling

# database.py
import os

import pymysql
from pymysql.cursors import DictCursor

# Reuse database connection across invocations
db_connection = None

def get_db_connection():
    """
    Get or create a database connection, reusing it across warm invocations
    """
    global db_connection
    
    if db_connection is None or not db_connection.open:
        db_connection = pymysql.connect(
            host=os.environ['DB_HOST'],
            user=os.environ['DB_USER'],
            password=os.environ['DB_PASSWORD'],
            database=os.environ['DB_NAME'],
            cursorclass=DictCursor,
            connect_timeout=5,
            read_timeout=10,
            write_timeout=10
        )
    
    return db_connection
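
The pattern above is connection reuse within one execution environment rather than a true pool. The same idea can be sketched without a database driver so that it is easy to unit test; ReusableConnection is a hypothetical helper, and the connect and is_alive callables are assumptions you would supply (for pymysql, is_alive could be lambda c: c.open).

```python
from typing import Any, Callable, Optional


class ReusableConnection:
    """Lazily create one connection and recreate it when it is no longer alive."""

    def __init__(self, connect: Callable[[], Any], is_alive: Callable[[Any], bool]):
        self._connect = connect
        self._is_alive = is_alive
        self._conn: Optional[Any] = None

    def get(self) -> Any:
        # Connect on first use, or reconnect if the cached connection was closed
        if self._conn is None or not self._is_alive(self._conn):
            self._conn = self._connect()
        return self._conn
```

Because each concurrent execution environment holds its own connection, a traffic spike can still exhaust database connections; a managed proxy such as RDS Proxy is one common way to pool across environments.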

Security Best Practices

IAM Least Privilege

# Specific IAM permissions
iamRoleStatements:
  - Effect: Allow
    Action:
      - dynamodb:GetItem
      - dynamodb:PutItem
    Resource:
      - !GetAtt UsersTable.Arn
  
  - Effect: Allow
    Action:
      - s3:GetObject
    Resource:
      - !Sub "${DataBucket.Arn}/public/*"
  
  - Effect: Allow
    Action:
      - kms:Decrypt
    Resource:
      - !GetAtt EncryptionKey.Arn
    Condition:
      StringEquals:
        kms:ViaService:
          - !Sub "dynamodb.${AWS::Region}.amazonaws.com"

Input Validation

# validation.py
import json
from typing import Dict, Any

from aws_lambda_powertools.utilities.validation import validate

# JSON Schema for request validation
CREATE_USER_SCHEMA = {
    "type": "object",
    "required": ["email", "name"],
    "properties": {
        "email": {
            "type": "string",
            "format": "email",
            "maxLength": 255
        },
        "name": {
            "type": "string",
            "minLength": 1,
            "maxLength": 100
        },
        "age": {
            "type": "integer",
            "minimum": 0,
            "maximum": 150
        }
    },
    "additionalProperties": False
}

def create_user_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Handler that validates the request body against the schema
    """
    body = json.loads(event.get('body') or '{}')
    # validate() is a plain function, not a decorator; it raises
    # SchemaValidationError when the body does not match the schema
    validate(event=body, schema=CREATE_USER_SCHEMA)
    return process_user_creation(body)

Secrets Management

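# secrets.py
import boto3
import json
from functools import lru_cache

from aws_lambda_powertools import Logger

logger = Logger()
secrets_client = boto3.client('secretsmanager')

# Cached for the lifetime of the execution environment, so a rotated
# secret is only picked up after a cold start
@lru_cache(maxsize=1)
def get_secret(secret_name: str) -> dict:
    """
    Retrieve secret from AWS Secrets Manager with caching
    """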
    try:
        response = secrets_client.get_secret_value(SecretId=secret_name)
        return json.loads(response['SecretString'])
    except Exception as e:
        logger.error(f"Error retrieving secret: {e}")
        raise

# Usage
db_credentials = get_secret('prod/database/credentials')
db_password = db_credentials['password']
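
An lru_cache entry never expires, so a long-lived execution environment can keep serving a secret after it has been rotated. One possible alternative is a small time-based cache. The sketch below is illustrative: TTLSecretCache is a hypothetical class, the fetch callable stands in for the Secrets Manager call, and the 300-second default is an arbitrary assumption.

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLSecretCache:
    """Cache fetched values per name and refetch once they are older than the TTL."""

    def __init__(
        self,
        fetch: Callable[[str], Any],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def get(self, name: str) -> Any:
        now = self._clock()
        cached = self._entries.get(name)
        if cached is not None and now - cached[0] < self._ttl:
            return cached[1]
        # Missing or expired: fetch again and remember when we did
        value = self._fetch(name)
        self._entries[name] = (now, value)
        return value
```

In a handler module this could wrap a function that calls get_secret_value and parses SecretString, created once at import time so the cache survives across warm invocations.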

Event-Driven Architecture

DynamoDB Streams Processing

# stream_processor.py
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent, event_source
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
    DynamoDBRecordEventName,
)

logger = Logger()

@event_source(data_class=DynamoDBStreamEvent)
def dynamodb_stream_handler(event: DynamoDBStreamEvent, context):
    """
    Process DynamoDB stream events
    """
    for record in event.records:
        # event_name is an enum, so compare against its members, not strings
        if record.event_name == DynamoDBRecordEventName.INSERT:
            new_image = record.dynamodb.new_image
            logger.info(f"New user created: {new_image}")
            send_welcome_email(new_image['email'])

        elif record.event_name == DynamoDBRecordEventName.MODIFY:
            old_image = record.dynamodb.old_image
            new_image = record.dynamodb.new_image
            logger.info(f"User updated: {new_image}")
            track_user_changes(old_image, new_image)

        elif record.event_name == DynamoDBRecordEventName.REMOVE:
            old_image = record.dynamodb.old_image
            logger.info(f"User deleted: {old_image}")
            cleanup_user_data(old_image['userId'])
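
In the raw stream payload, the images arrive in DynamoDB's typed JSON form, for example {"email": {"S": "a@example.com"}}. Depending on the library and version in use, that form may or may not be converted for you, so it helps to know what the conversion looks like. The following is a minimal sketch covering the common type tags only; deserialize_attribute and deserialize_image are hypothetical names, and boto3 ships a fuller implementation in boto3.dynamodb.types.TypeDeserializer.

```python
from decimal import Decimal
from typing import Any, Dict


def deserialize_attribute(attr: Dict[str, Any]) -> Any:
    """Convert one DynamoDB-typed value such as {"S": "x"} to a Python value."""
    (type_tag, value), = attr.items()  # each attribute has exactly one type tag
    if type_tag == "S":
        return value
    if type_tag == "N":
        return Decimal(value)  # numbers are transmitted as strings
    if type_tag == "BOOL":
        return bool(value)
    if type_tag == "NULL":
        return None
    if type_tag == "M":
        return {k: deserialize_attribute(v) for k, v in value.items()}
    if type_tag == "L":
        return [deserialize_attribute(v) for v in value]
    if type_tag == "SS":
        return set(value)
    if type_tag == "NS":
        return {Decimal(v) for v in value}
    raise ValueError(f"Unsupported DynamoDB type tag: {type_tag}")


def deserialize_image(image: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Convert a whole NewImage or OldImage mapping."""
    return {name: deserialize_attribute(attr) for name, attr in image.items()}
```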

SQS Queue Processing

# queue_processor.py
import json

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    process_partial_response,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

logger = Logger()
processor = BatchProcessor(event_type=EventType.SQS)

def sqs_handler(event: dict, context):
    """
    Process SQS messages and report per-message failures
    (requires ReportBatchItemFailures on the event source mapping)
    """
    return process_partial_response(
        event=event,
        record_handler=process_record,
        processor=processor,
        context=context,
    )

def process_record(record: SQSRecord):
    """
    Process individual SQS record; raising marks only this message as failed
    """
    payload = json.loads(record.body)
    logger.info(f"Processing message: {payload}")

    # Your business logic here
    result = process_order(payload)

    return result
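
Under the hood, partial batch handling comes down to the response shape Lambda expects when ReportBatchItemFailures is enabled on the event source mapping: a batchItemFailures list holding the messageId of each failed record. A dependency-free sketch of that contract, with handle_batch as a hypothetical name:

```python
from typing import Any, Callable, Dict, List


def handle_batch(
    event: Dict[str, Any],
    record_handler: Callable[[Dict[str, Any]], Any],
) -> Dict[str, List[Dict[str, str]]]:
    """Run record_handler on every SQS record and report only the failures."""
    failures: List[Dict[str, str]] = []
    for record in event.get("Records", []):
        try:
            record_handler(record)
        except Exception:
            # Only this message is returned to the queue for retry
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

An empty batchItemFailures list tells Lambda the whole batch succeeded. Without this response type enabled, a single raised exception causes every message in the batch to be retried.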

Monitoring and Observability

CloudWatch Metrics and Alarms

# monitoring.yml
resources:
  Resources:
    LambdaErrorAlarm:
      Type: AWS::CloudWatch::Alarm
      Properties:
        AlarmName: ${self:service}-${self:provider.stage}-errors
        AlarmDescription: Alert on Lambda function errors
        MetricName: Errors
        Namespace: AWS/Lambda
        Statistic: Sum
        Period: 300
        EvaluationPeriods: 1
        Threshold: 5
        ComparisonOperator: GreaterThanThreshold
        Dimensions:
          - Name: FunctionName
            Value: !Ref GetUserLambdaFunction
        AlarmActions:
          - !Ref AlertTopic
    
    LambdaDurationAlarm:
      Type: AWS::CloudWatch::Alarm
      Properties:
        AlarmName: ${self:service}-${self:provider.stage}-duration
        AlarmDescription: Alert on high Lambda duration
        MetricName: Duration
        Namespace: AWS/Lambda
        Statistic: Average
        Period: 300
        EvaluationPeriods: 2
        Threshold: 5000
        ComparisonOperator: GreaterThanThreshold
        Dimensions:
          - Name: FunctionName
            Value: !Ref GetUserLambdaFunction
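
Alarms like these can also target custom metrics. From Lambda, one low-overhead way to publish them is the CloudWatch Embedded Metric Format (EMF): a structured JSON log line that CloudWatch turns into a metric. The sketch below builds such a record by hand to show the shape; build_emf_record is a hypothetical helper, and the namespace and dimension names in the usage are made up for illustration.

```python
import json
import time
from typing import Dict, Optional, Union


def build_emf_record(
    namespace: str,
    metric_name: str,
    value: Union[int, float],
    unit: str = "Count",
    dimensions: Optional[Dict[str, str]] = None,
    timestamp_ms: Optional[int] = None,
) -> dict:
    """Build a single-metric record in CloudWatch Embedded Metric Format."""
    dimensions = dimensions or {}
    record = {
        "_aws": {
            "Timestamp": timestamp_ms if timestamp_ms is not None else int(time.time() * 1000),
            "CloudWatchMetrics": [
                {
                    "Namespace": namespace,
                    # One dimension set listing the dimension keys present below
                    "Dimensions": [list(dimensions.keys())],
                    "Metrics": [{"Name": metric_name, "Unit": unit}],
                }
            ],
        },
        # The metric value sits at the top level under its own name
        metric_name: value,
    }
    record.update(dimensions)  # dimension values also sit at the top level
    return record


def emit_metric(**kwargs) -> None:
    """Print the record as one JSON line; in Lambda, stdout goes to CloudWatch Logs."""
    print(json.dumps(build_emf_record(**kwargs)))
```

For example, emit_metric(namespace="UserService", metric_name="ProcessingError", value=1, dimensions={"service": "users"}) would log one data point. An alarm on that metric would then use the custom namespace instead of AWS/Lambda.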

X-Ray Tracing

# Enable X-Ray tracing
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

# Patch all supported libraries
patch_all()

@xray_recorder.capture('process_user_data')
def process_user_data(user_id: str):
    """
    Function with X-Ray tracing
    """
    # Add custom metadata
    xray_recorder.put_metadata('userId', user_id)
    xray_recorder.put_annotation('userType', 'premium')
    
    # Your logic here
    result = fetch_user_from_db(user_id)
    
    return result

Cost Optimization

Right-Sizing Memory

# Use AWS Lambda Power Tuning to find optimal memory
# https://github.com/alexcasalboni/aws-lambda-power-tuning

# Example results (illustrative, at $0.0000166667 per GB-second for x86):
# 128MB: $0.0000031250 per invocation, 1500ms duration
# 512MB: $0.0000033333 per invocation, 400ms duration  ← Best balance: ~7% more cost, ~3.7x faster
# 1024MB: $0.0000041667 per invocation, 250ms duration
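
To sanity-check tuning results, the compute cost of one invocation can be estimated as memory in GB × billed duration in seconds × the price per GB-second. A rough sketch follows; the two price constants are assumptions based on published x86 on-demand pricing in us-east-1 and should be checked against the current pricing page, and invocation_cost is a hypothetical helper.

```python
# Assumed prices; verify against current AWS Lambda pricing for your region
PRICE_PER_GB_SECOND = 0.0000166667
PRICE_PER_REQUEST = 0.0000002  # $0.20 per million requests


def invocation_cost(memory_mb: int, duration_ms: float, include_request_fee: bool = False) -> float:
    """Estimate the cost in USD of a single invocation."""
    gb_seconds = (memory_mb / 1024) * (duration_ms / 1000)
    cost = gb_seconds * PRICE_PER_GB_SECOND
    if include_request_fee:
        cost += PRICE_PER_REQUEST
    return cost


if __name__ == "__main__":
    # Compare a few memory settings using measured durations (example values)
    for memory_mb, duration_ms in [(128, 1500), (512, 400), (1024, 250)]:
        print(f"{memory_mb}MB: ${invocation_cost(memory_mb, duration_ms):.10f} at {duration_ms}ms")
```

More memory also means more CPU, so duration often drops enough that a larger setting costs about the same while responding much faster. That trade-off, not the lowest price alone, is usually what decides the setting.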

Reserved Concurrency

functions:
  criticalFunction:
    handler: handler.critical
    reservedConcurrency: 50  # Reserve capacity
  
  batchProcessor:
    handler: handler.batch
    reservedConcurrency: 5   # Limit concurrent executions

Key Takeaways

  1. Optimize Cold Starts: Use provisioned concurrency for latency-sensitive functions
  2. Implement Proper Error Handling: Use dead letter queues and retry logic
  3. Monitor Everything: CloudWatch, X-Ray, and custom metrics
  4. Security First: Least privilege IAM, input validation, secrets management
  5. Cost Awareness: Right-size memory, use reserved concurrency wisely
  6. Event-Driven Design: Leverage SQS, SNS, EventBridge for decoupling
  7. Testing: Unit tests, integration tests, and load testing
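
The retry logic mentioned in the second takeaway is commonly implemented as exponential backoff with jitter. A minimal sketch, assuming the wrapped operation is safe to repeat; retry_with_backoff and its default values are illustrative, not taken from any AWS SDK:

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.1,
    max_delay: float = 2.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation, retrying on failure with capped exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Full jitter: wait a random time up to the capped exponential delay
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, ceiling))
    raise AssertionError("unreachable")
```

Keep the total retry time well under the function timeout, and let messages that still fail land in a dead letter queue rather than retrying indefinitely.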

Conclusion

AWS Lambda and serverless architecture enable building highly scalable, cost-effective applications. By following these best practices, you'll create production-ready serverless systems that are secure, performant, and maintainable.

The serverless paradigm shift allows teams to focus on delivering business value rather than managing infrastructure. Start small, iterate quickly, and scale confidently with AWS Lambda.


Ready to go serverless? Check out my other posts on AWS architecture patterns and Terraform for infrastructure as code!
