Suyog Maid
Suyog Maid
📄
Article · 2026-01-17

Google Cloud Functions: Building Production-Ready Serverless Applications

#gcp #cloud-functions #serverless #firebase #event-driven #nodejs #python

Google Cloud Functions: Building Production-Ready Serverless Applications

Google Cloud Functions enables you to build event-driven applications without managing servers. In this comprehensive guide, I'll share production-tested patterns for building scalable, secure, and cost-effective serverless applications on GCP.

Why Cloud Functions?

Cloud Functions offers compelling advantages:

  • Zero Server Management: Focus on code, not infrastructure
  • Automatic Scaling: From zero to thousands of instances
  • Event-Driven: Respond to Cloud Storage, Pub/Sub, HTTP, and more
  • Pay-Per-Use: Billed in 100ms increments
  • Integrated Ecosystem: Native integration with GCP services
  • Multi-Language Support: Node.js, Python, Go, Java, .NET, Ruby, PHP

Function Types and Triggers

HTTP Functions

# main.py - HTTP triggered function
import functions_framework
from flask import jsonify, Request
import json

@functions_framework.http
def hello_http(request: Request):
    """
    HTTP Cloud Function that greets the caller and answers CORS preflights.

    The name is taken from the JSON body first, then from the query string,
    and falls back to ``'World'``.

    Args:
        request: The incoming Flask request.

    Returns:
        A ``(body, status, headers)`` tuple: an empty 204 response for
        ``OPTIONS`` preflight requests, otherwise a 200 JSON payload with
        ``message`` and ``timestamp`` keys.
    """
    # Imported locally because this module's import block does not provide
    # ``datetime``; without it every non-preflight request raises NameError.
    from datetime import datetime, timezone

    # Handle CORS preflight
    if request.method == 'OPTIONS':
        preflight_headers = {
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'GET, POST',
            'Access-Control-Allow-Headers': 'Content-Type',
            'Access-Control-Max-Age': '3600',
        }
        return ('', 204, preflight_headers)

    # Set CORS headers for main request
    headers = {
        'Access-Control-Allow-Origin': '*',
    }

    # Parse request
    request_json = request.get_json(silent=True)
    request_args = request.args

    # Only index into the body when it is a JSON object: a JSON array such as
    # ["name"] passes an ``in`` check but fails on ``request_json['name']``.
    if isinstance(request_json, dict) and 'name' in request_json:
        name = request_json['name']
    elif request_args and 'name' in request_args:
        name = request_args['name']
    else:
        name = 'World'

    response = {
        'message': f'Hello {name}!',
        # Timezone-aware UTC timestamp, so the ISO string carries its offset.
        'timestamp': datetime.now(timezone.utc).isoformat(),
    }

    return (jsonify(response), 200, headers)

Cloud Storage Triggers

# storage_trigger.py
import functions_framework
from google.cloud import storage, vision
import os

# Created at import time, outside the handler, so process_image() reuses the
# same clients on every invocation instead of constructing them per event.
storage_client = storage.Client()
vision_client = vision.ImageAnnotatorClient()

@functions_framework.cloud_event
def process_image(cloud_event):
    """
    Triggered by Cloud Storage object creation.

    Labels the uploaded image with the Vision API, stores the results as
    object metadata, and moves the object to the quarantine bucket when adult
    or violent content is rated LIKELY or higher.

    Args:
        cloud_event: CloudEvent whose ``data`` carries the ``bucket`` and
            ``name`` of the uploaded object.

    Raises:
        RuntimeError: If the Vision API reports an error for the image.
        KeyError: If ``QUARANTINE_BUCKET`` is unset when an image has to be
            quarantined.
    """
    data = cloud_event.data

    bucket_name = data['bucket']
    file_name = data['name']
    image_uri = f'gs://{bucket_name}/{file_name}'

    print(f'Processing file: {image_uri}')

    # Skip if not an image
    if not file_name.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):
        print(f'Skipping non-image file: {file_name}')
        return

    # Analyze image with Vision API
    image = vision.Image()
    image.source.image_uri = image_uri

    # Vision reports per-image failures in ``response.error`` rather than by
    # raising, so check it explicitly; otherwise a failed call would be
    # stored as "no labels" / "unknown likelihood".
    label_response = vision_client.label_detection(image=image)
    if label_response.error.message:
        raise RuntimeError(
            f'Vision label detection failed: {label_response.error.message}'
        )
    labels = [label.description for label in label_response.label_annotations]

    safe_search_response = vision_client.safe_search_detection(image=image)
    if safe_search_response.error.message:
        raise RuntimeError(
            f'Vision safe search failed: {safe_search_response.error.message}'
        )
    annotation = safe_search_response.safe_search_annotation

    # Store metadata
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)

    blob.metadata = {
        'labels': ','.join(labels),
        'adult': annotation.adult.name,
        'violence': annotation.violence.name,
    }
    blob.patch()

    print(f'Image processed: {labels}')

    # If inappropriate content detected, move to quarantine.
    # Likelihood.LIKELY is the named form of the threshold value 4.
    threshold = vision.Likelihood.LIKELY
    if annotation.adult >= threshold or annotation.violence >= threshold:
        quarantine_bucket = storage_client.bucket(os.environ['QUARANTINE_BUCKET'])
        # Copy first, delete second: a failure in between leaves a duplicate
        # rather than losing the object.
        bucket.copy_blob(blob, quarantine_bucket, file_name)
        blob.delete()
        print(f'Image quarantined: {file_name}')

Pub/Sub Triggers

# pubsub_trigger.py
import functions_framework
import base64
import json
from google.cloud import bigquery, firestore

# Created at import time, outside the handler, so process_message() reuses
# the same BigQuery and Firestore clients on every invocation.
bq_client = bigquery.Client()
db = firestore.Client()

@functions_framework.cloud_event
def process_message(cloud_event):
    """
    Triggered by Pub/Sub message.

    Decodes the base64 JSON payload, appends it to the
    ``analytics.user_events`` BigQuery table, and updates the user's
    Firestore profile.

    Args:
        cloud_event: CloudEvent whose ``data['message']['data']`` holds the
            base64-encoded JSON message.

    Raises:
        ValueError: If the message has no ``user_id``.
        RuntimeError: If BigQuery rejects the row.
        KeyError: If ``PROJECT_ID`` is not set in the environment.
    """
    # Imported locally because this module's import block does not include
    # ``os``; without it the table lookup below raises NameError.
    import os

    # Decode message
    pubsub_message = base64.b64decode(cloud_event.data['message']['data']).decode()
    message_data = json.loads(pubsub_message)

    print(f'Processing message: {message_data}')

    # Extract data
    event_type = message_data.get('event_type')
    user_id = message_data.get('user_id')
    timestamp = message_data.get('timestamp')

    # Validate before any write: the Firestore update needs a document ID, and
    # failing here avoids inserting a BigQuery row for a message that can
    # never be fully processed.
    if not user_id:
        raise ValueError("Pub/Sub message is missing 'user_id'")

    # Store in BigQuery for analytics
    table_id = f"{os.environ['PROJECT_ID']}.analytics.user_events"

    rows_to_insert = [{
        'event_type': event_type,
        'user_id': user_id,
        'timestamp': timestamp,
        'properties': json.dumps(message_data.get('properties', {})),
    }]

    errors = bq_client.insert_rows_json(table_id, rows_to_insert)

    if errors:
        print(f'BigQuery insert errors: {errors}')
        # RuntimeError is still an Exception, so existing handlers keep working.
        raise RuntimeError('Failed to insert into BigQuery')

    # Update user profile in Firestore.
    # NOTE(review): update() fails when the user document does not exist —
    # confirm profiles are always created before events are published.
    user_ref = db.collection('users').document(user_id)
    user_ref.update({
        'last_activity': timestamp,
        'event_count': firestore.Increment(1),
    })

    print(f'Event processed for user {user_id}')

Firestore Triggers

# firestore_trigger.py
import functions_framework
from google.cloud import tasks_v2, pubsub_v1
import json
import os

# Created at import time, outside the handlers, so on_user_create() and
# send_welcome_email() reuse the same clients on every invocation.
tasks_client = tasks_v2.CloudTasksClient()
publisher = pubsub_v1.PublisherClient()

@functions_framework.cloud_event
def on_user_create(cloud_event):
    """
    Triggered when new user document is created in Firestore.

    Queues a welcome email through Cloud Tasks and publishes a
    ``user_created`` event to the ``user-events`` Pub/Sub topic.

    Args:
        cloud_event: CloudEvent whose ``data['value']`` describes the new
            document (``name`` path and typed ``fields``).

    Raises:
        KeyError: If ``PROJECT_ID`` (or ``REGION``, read by
            ``send_welcome_email``) is not set in the environment.
    """
    # Get the document data
    data = cloud_event.data

    if 'value' not in data:
        return

    document = data['value']
    # A document created without fields may omit the key entirely.
    user_data = document.get('fields', {})
    # The document ID is the last segment of the resource name.
    user_id = document['name'].split('/')[-1]

    email = user_data.get('email', {}).get('stringValue')
    name = user_data.get('name', {}).get('stringValue')

    print(f'New user created: {user_id} - {email}')

    # Send welcome email (via Cloud Tasks)
    send_welcome_email(user_id, email, name)

    # Publish event to Pub/Sub
    topic_path = publisher.topic_path(
        os.environ['PROJECT_ID'],
        'user-events'
    )

    message_data = {
        'event_type': 'user_created',
        'user_id': user_id,
        'email': email
    }

    publish_future = publisher.publish(
        topic_path,
        json.dumps(message_data).encode('utf-8')
    )
    # publish() only queues the message in a background batch. Wait for the
    # result so the message is sent (and any publish error surfaces) before
    # the function returns and the instance may be suspended.
    publish_future.result()

    print(f'Welcome email queued for {email}')

def send_welcome_email(user_id, email, name):
    """Enqueue a Cloud Tasks HTTP task that asks ``send-email`` for a welcome mail.

    Args:
        user_id: ID of the new user, forwarded in the template data.
        email: Recipient address.
        name: Display name, forwarded in the template data.

    Raises:
        KeyError: If ``PROJECT_ID`` or ``REGION`` is not set in the environment.
    """
    project_id = os.environ['PROJECT_ID']
    region = os.environ['REGION']
    queue_path = tasks_client.queue_path(project_id, region, 'email-queue')

    # JSON body delivered to the target function.
    payload = json.dumps({
        'to': email,
        'template': 'welcome',
        'data': {
            'name': name,
            'user_id': user_id
        }
    }).encode()

    http_request = {
        'http_method': tasks_v2.HttpMethod.POST,
        'url': f'https://{region}-{project_id}.cloudfunctions.net/send-email',
        'headers': {
            'Content-Type': 'application/json'
        },
        'body': payload
    }

    tasks_client.create_task(
        request={'parent': queue_path, 'task': {'http_request': http_request}}
    )

Deployment with Terraform

Cloud Function Resource

# cloud_function.tf
# Bucket that holds the zipped function source uploaded by Terraform.
resource "google_storage_bucket" "function_bucket" {
  name     = "${var.project_id}-cloud-functions"
  location = var.region
  
  uniform_bucket_level_access = true
  
  # Delete every object once it is 30 days old.
  # NOTE(review): the rule has no prefix filter, so it presumably also removes
  # the archive of a function that is still deployed — confirm this is intended.
  lifecycle_rule {
    condition {
      age = 30
    }
    action {
      type = "Delete"
    }
  }
}

# Uploads the source archive. The MD5 of the zip is part of the object name,
# so a change in the source produces a new object name.
resource "google_storage_bucket_object" "function_source" {
  name   = "function-source-${data.archive_file.function_zip.output_md5}.zip"
  bucket = google_storage_bucket.function_bucket.name
  source = data.archive_file.function_zip.output_path
}

# Zips the ./function directory next to this module into ./function.zip.
data "archive_file" "function_zip" {
  type        = "zip"
  source_dir  = "${path.module}/function"
  output_path = "${path.module}/function.zip"
}

# 2nd-gen HTTP function that serves the hello_http entry point.
resource "google_cloudfunctions2_function" "api_function" {
  name     = "api-function"
  location = var.region
  
  build_config {
    runtime     = "python311"
    entry_point = "hello_http"
    
    # Build from the archive uploaded by google_storage_bucket_object.function_source.
    source {
      storage_source {
        bucket = google_storage_bucket.function_bucket.name
        object = google_storage_bucket_object.function_source.name
      }
    }
  }
  
  service_config {
    max_instance_count = 100
    # One instance is kept running at all times.
    min_instance_count = 1
    available_memory   = "256M"
    timeout_seconds    = 60
    
    environment_variables = {
      PROJECT_ID = var.project_id
      ENVIRONMENT = var.environment
    }
    
    # Exposes the Secret Manager secret as the API_KEY environment variable.
    # NOTE(review): google_secret_manager_secret.api_key is not defined in this
    # snippet, and the service account presumably needs
    # roles/secretmanager.secretAccessor on it — confirm both exist elsewhere.
    secret_environment_variables {
      key        = "API_KEY"
      project_id = var.project_id
      secret     = google_secret_manager_secret.api_key.secret_id
      version    = "latest"
    }
    
    # Accept traffic from any source, routed to the newest revision.
    ingress_settings               = "ALLOW_ALL"
    all_traffic_on_latest_revision = true
    
    service_account_email = google_service_account.function_sa.email
  }
  
  labels = {
    environment = var.environment
    managed_by  = "terraform"
  }
}

# Dedicated runtime identity shared by the functions in this configuration.
resource "google_service_account" "function_sa" {
  account_id   = "cloud-function-sa"
  display_name = "Cloud Function Service Account"
}

# Project-level roles for the shared function service account.
resource "google_project_iam_member" "function_permissions" {
  for_each = toset([
    "roles/datastore.user",
    "roles/pubsub.publisher",
    "roles/bigquery.dataEditor",
    "roles/storage.objectViewer"
  ])
  
  project = var.project_id
  role    = each.value
  member  = "serviceAccount:${google_service_account.function_sa.email}"
}

# process_image patches object metadata, copies flagged objects to the
# quarantine bucket and deletes the original. roles/storage.objectViewer is
# read-only, so grant object write access, scoped to the two buckets the
# function touches rather than the whole project.
resource "google_storage_bucket_iam_member" "function_uploads_object_admin" {
  bucket = google_storage_bucket.uploads.name
  role   = "roles/storage.objectAdmin"
  member = "serviceAccount:${google_service_account.function_sa.email}"
}

resource "google_storage_bucket_iam_member" "function_quarantine_object_admin" {
  bucket = google_storage_bucket.quarantine.name
  role   = "roles/storage.objectAdmin"
  member = "serviceAccount:${google_service_account.function_sa.email}"
}

# Function-level invoker binding (kept so existing references stay valid).
resource "google_cloudfunctions2_function_iam_member" "invoker" {
  project        = google_cloudfunctions2_function.api_function.project
  location       = google_cloudfunctions2_function.api_function.location
  cloud_function = google_cloudfunctions2_function.api_function.name
  
  role   = "roles/cloudfunctions.invoker"
  member = "allUsers"
}

# 2nd-gen functions are served by an underlying Cloud Run service of the same
# name, and HTTP requests are authorized against that service. Public access
# therefore needs roles/run.invoker on the Cloud Run service.
resource "google_cloud_run_service_iam_member" "invoker_run" {
  project  = google_cloudfunctions2_function.api_function.project
  location = google_cloudfunctions2_function.api_function.location
  service  = google_cloudfunctions2_function.api_function.name

  role   = "roles/run.invoker"
  member = "allUsers"
}

# HTTPS endpoint of the deployed api_function.
output "function_url" {
  value = google_cloudfunctions2_function.api_function.service_config[0].uri
}

Event-Driven Function

# event_function.tf
# 2nd-gen function that runs process_image for objects finalized in the
# uploads bucket.
resource "google_cloudfunctions2_function" "storage_function" {
  name     = "process-uploads"
  location = var.region
  
  build_config {
    runtime     = "python311"
    entry_point = "process_image"
    
    # Same source archive as api_function; only the entry point differs.
    source {
      storage_source {
        bucket = google_storage_bucket.function_bucket.name
        object = google_storage_bucket_object.function_source.name
      }
    }
  }
  
  service_config {
    max_instance_count = 50
    available_memory   = "512M"
    timeout_seconds    = 300
    
    environment_variables = {
      PROJECT_ID        = var.project_id
      # Read by process_image when it moves flagged objects.
      QUARANTINE_BUCKET = google_storage_bucket.quarantine.name
    }
    
    service_account_email = google_service_account.function_sa.email
  }
  
  event_trigger {
    trigger_region = var.region
    event_type     = "google.cloud.storage.object.v1.finalized"
    
    # Only events from the uploads bucket reach the function.
    event_filters {
      attribute = "bucket"
      value     = google_storage_bucket.uploads.name
    }
    
    # Failed invocations are retried.
    # NOTE(review): a permanently failing event will presumably be redelivered
    # repeatedly — confirm the handler tolerates duplicate deliveries.
    retry_policy = "RETRY_POLICY_RETRY"
  }
}

# Bucket that receives user uploads and triggers storage_function.
resource "google_storage_bucket" "uploads" {
  name     = "${var.project_id}-uploads"
  location = var.region
  
  uniform_bucket_level_access = true
  
  # Allow browser requests from https://example.com only.
  cors {
    origin          = ["https://example.com"]
    method          = ["GET", "POST", "PUT"]
    response_header = ["*"]
    max_age_seconds = 3600
  }
}

# Destination bucket for objects that process_image flags as inappropriate.
resource "google_storage_bucket" "quarantine" {
  name     = "${var.project_id}-quarantine"
  location = var.region
  
  uniform_bucket_level_access = true
}

Performance Optimization

Cold Start Mitigation

# optimized_function.py
import functions_framework
from google.cloud import firestore
import os

# Initialize clients globally (outside function) so they are created at
# import time and reused by every invocation.
db = firestore.Client()

# Module-level cache filled by get_config() on first use.
CONFIG_CACHE = {}

def get_config():
    """Return the application configuration, loading it from Firestore on demand.

    The ``config/app`` document is read while ``CONFIG_CACHE`` is empty and
    its fields are copied into the cache; later calls return the cached dict.

    Returns:
        The module-level ``CONFIG_CACHE`` dict (empty when the document is
        missing or has no fields).
    """
    if not CONFIG_CACHE:
        snapshot = db.collection('config').document('app').get()
        # to_dict() returns None for a missing document, and dict.update(None)
        # raises TypeError, so fall back to an empty mapping.
        CONFIG_CACHE.update(snapshot.to_dict() or {})
    return CONFIG_CACHE

@functions_framework.http
def optimized_handler(request):
    """
    HTTP handler that relies on module-level state to keep invocations cheap.

    The configuration comes from ``get_config()``, which reuses the cached
    copy once it has been loaded.
    """
    app_config = get_config()

    # Your business logic here
    response = {'status': 'success'}
    return response

Concurrency Configuration

# function.yaml
# Knative Service definition for the function's container.
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: my-function
spec:
  template:
    metadata:
      annotations:
        # Scale between 1 and 100 instances.
        autoscaling.knative.dev/minScale: "1"
        autoscaling.knative.dev/maxScale: "100"
        # Disable CPU throttling.
        run.googleapis.com/cpu-throttling: "false"
    spec:
      # Each instance handles up to 80 requests at once.
      containerConcurrency: 80
      timeoutSeconds: 300
      containers:
      - image: gcr.io/project/function:latest
        resources:
          limits:
            memory: 512Mi
            cpu: "1"

Security Best Practices

Secret Management

# secrets.py
from google.cloud import secretmanager
from functools import lru_cache
import os

# Created at import time so every get_secret() call shares one client.
client = secretmanager.SecretManagerServiceClient()

@lru_cache(maxsize=10)
def get_secret(secret_id, version='latest'):
    """
    Fetch a secret value from Secret Manager and memoize it.

    Results are cached per ``(secret_id, version)`` pair by ``lru_cache``, so
    a repeated lookup does not call Secret Manager again.

    Args:
        secret_id: Name of the secret inside the project.
        version: Secret version to read; defaults to ``'latest'``.

    Returns:
        The secret payload decoded as UTF-8 text.

    Raises:
        KeyError: If ``PROJECT_ID`` is not set in the environment.
    """
    resource_name = "projects/{}/secrets/{}/versions/{}".format(
        os.environ['PROJECT_ID'], secret_id, version
    )
    secret_version = client.access_secret_version(request={"name": resource_name})
    payload_bytes = secret_version.payload.data
    return payload_bytes.decode('UTF-8')

# Usage
# These lookups run at import time; repeating a call with the same arguments
# is served from get_secret's lru_cache.
api_key = get_secret('api-key')
db_password = get_secret('database-password')

Authentication and Authorization

# auth.py
import functions_framework
from google.auth.transport import requests
from google.oauth2 import id_token
import os

@functions_framework.http
def authenticated_function(request):
    """
    Function with authentication.

    Expects an ``Authorization: Bearer <Google ID token>`` header, verifies
    the token against ``GOOGLE_CLIENT_ID`` and requires the ``admin``
    permission before handling the request.

    Args:
        request: The incoming Flask request.

    Returns:
        The result of ``process_admin_request`` on success, otherwise an
        ``(error_body, status)`` tuple with status 401 (missing or invalid
        token) or 403 (insufficient permissions).

    Raises:
        KeyError: If ``GOOGLE_CLIENT_ID`` is not set in the environment.
    """
    # Imported locally: the module's import block does not bring this in.
    from google.auth import exceptions as auth_exceptions

    bearer_prefix = 'Bearer '
    auth_header = request.headers.get('Authorization', '')

    if not auth_header.startswith(bearer_prefix):
        return {'error': 'Missing authorization'}, 401

    # Slice off the prefix instead of splitting, so the token is taken
    # verbatim even if it happens to contain the prefix text again.
    id_token_value = auth_header[len(bearer_prefix):].strip()
    if not id_token_value:
        return {'error': 'Missing authorization'}, 401

    audience = os.environ['GOOGLE_CLIENT_ID']

    # Keep the try body to the verification call only, so a ValueError raised
    # by the business logic is not misreported as an invalid token.
    try:
        claims = id_token.verify_oauth2_token(
            id_token_value,
            requests.Request(),
            audience
        )
    except (ValueError, auth_exceptions.GoogleAuthError):
        # ValueError: malformed/expired token or wrong audience.
        # GoogleAuthError: e.g. a token from an unexpected issuer.
        return {'error': 'Invalid token'}, 401

    user_id = claims['sub']

    # Check permissions
    if not has_permission(user_id, 'admin'):
        return {'error': 'Insufficient permissions'}, 403

    # Process request
    return process_admin_request(request, user_id)

def has_permission(user_id, role):
    """Report whether ``user_id`` holds ``role``.

    This is a stub: it currently grants every role to every user and must be
    replaced with a real permission lookup.
    """
    # Implement your permission logic
    granted = True
    return granted

def process_admin_request(request, user_id):
    """Handle an already-authenticated request and echo the caller's ID.

    Args:
        request: The incoming request (not inspected here).
        user_id: ID of the verified caller.

    Returns:
        A dict with ``status`` set to ``'success'`` and the caller's ``user_id``.
    """
    return dict(status='success', user_id=user_id)

Monitoring and Logging

Structured Logging

# logging_example.py
import functions_framework
import json
import sys
from datetime import datetime

def log_structured(severity, message, **kwargs):
    """
    Write structured log entry as one JSON line.

    ``ERROR`` entries go to stderr; every other severity goes to stdout.

    Args:
        severity: Log level name, e.g. ``'INFO'`` or ``'ERROR'``.
        message: Human-readable log message.
        **kwargs: Extra fields merged into the entry. Values that JSON cannot
            represent are written as their ``str()`` form.
    """
    # Imported locally: the module's import block only brings in ``datetime``.
    from datetime import timezone

    entry = {
        'severity': severity,
        'message': message,
        # Timezone-aware replacement for the deprecated datetime.utcnow(); the
        # ISO string now carries an explicit +00:00 offset.
        'timestamp': datetime.now(timezone.utc).isoformat(),
        **kwargs
    }
    stream = sys.stderr if severity == 'ERROR' else sys.stdout
    # default=str is deliberate: logging an arbitrary result or exception
    # object must not raise TypeError inside the logger itself.
    print(json.dumps(entry, default=str), file=stream)

@functions_framework.http
def logged_function(request):
    """
    Function with structured logging.

    Logs the invocation, the successful result, and any failure with its
    traceback before re-raising.

    Args:
        request: The incoming Flask request.

    Returns:
        Whatever ``process_request`` returns.

    Raises:
        Exception: Any exception from ``process_request`` is logged and
            re-raised unchanged.
    """
    # Imported locally because the module's import block does not include
    # ``traceback``; without it the except path raises NameError and hides
    # the original error.
    import traceback

    log_structured('INFO', 'Function invoked',
                   method=request.method,
                   path=request.path)

    try:
        result = process_request(request)

        log_structured('INFO', 'Request processed successfully',
                       result=result)

        return result

    except Exception as e:
        log_structured('ERROR', 'Request processing failed',
                       error=str(e),
                       traceback=traceback.format_exc())
        raise

def process_request(request):
    """Stand-in for the business logic; always reports success.

    Args:
        request: The incoming request (not inspected here).

    Returns:
        A dict with ``status`` set to ``'success'``.
    """
    outcome = {'status': 'success'}
    return outcome

Custom Metrics

# metrics.py
import os
import time

import functions_framework
from google.cloud import monitoring_v3

# Created at import time so every metric write shares one client.
client = monitoring_v3.MetricServiceClient()
# Resource name of the project that receives the time series; PROJECT_ID must
# be set, otherwise this lookup raises KeyError at import.
project_name = f"projects/{os.environ['PROJECT_ID']}"

def write_custom_metric(metric_type, value, labels=None):
    """
    Write custom metric to Cloud Monitoring.

    Args:
        metric_type: Metric path appended to ``custom.googleapis.com/``.
        value: Numeric value recorded as a double.
        labels: Optional mapping of metric labels; values are stored as strings.
    """
    series = monitoring_v3.TimeSeries()
    series.metric.type = f"custom.googleapis.com/{metric_type}"

    if labels:
        for key, val in labels.items():
            # Metric labels are string-valued; coerce so callers may pass
            # ints or other simple values.
            series.metric.labels[key] = str(val)

    # NOTE(review): confirm Cloud Monitoring accepts custom metrics on the
    # "cloud_function" resource type; "generic_task" or "global" may be needed.
    series.resource.type = "cloud_function"
    # Current runtimes expose the function name as K_SERVICE; FUNCTION_NAME is
    # kept as a fallback for runtimes that still set it.
    series.resource.labels["function_name"] = (
        os.environ.get('K_SERVICE')
        or os.environ.get('FUNCTION_NAME', 'unknown')
    )
    series.resource.labels["region"] = os.environ.get('FUNCTION_REGION', 'us-central1')

    # Split the current time into whole seconds and nanoseconds.
    now = time.time()
    seconds = int(now)
    nanos = int((now - seconds) * 10 ** 9)

    interval = monitoring_v3.TimeInterval(
        {"end_time": {"seconds": seconds, "nanos": nanos}}
    )

    point = monitoring_v3.Point(
        {"interval": interval, "value": {"double_value": value}}
    )

    series.points = [point]

    client.create_time_series(name=project_name, time_series=[series])

# Usage
@functions_framework.http
def monitored_function(request):
    """Handle a request and record success, error and duration metrics.

    Metric writes are best-effort: a Cloud Monitoring failure is logged and
    never changes the outcome of the request.

    Args:
        request: The incoming Flask request.

    Returns:
        Whatever ``process_request`` returns.

    Raises:
        Exception: Any exception from ``process_request`` is re-raised after
            the error metric has been recorded.
    """
    start_time = time.time()

    try:
        result = process_request(request)
    except Exception as e:
        # Record error metric
        _record_metric('function/error_count', 1,
                       labels={'error_type': type(e).__name__})
        raise
    else:
        # Record success metric outside the try body, so a failed metric
        # write is not counted as a request error.
        _record_metric('function/success_count', 1)
        return result
    finally:
        # Record duration
        _record_metric('function/duration', time.time() - start_time)


def _record_metric(metric_type, value, labels=None):
    """Write a metric, logging instead of raising if the write fails."""
    try:
        write_custom_metric(metric_type, value, labels=labels)
    except Exception as exc:
        # Telemetry boundary: report the failure but keep serving the request.
        print(f'Failed to write metric {metric_type}: {exc}')

Key Takeaways

  1. Initialize Outside Handler: Create clients globally to reduce cold starts
  2. Use Secret Manager: Never hardcode credentials
  3. Implement Retry Logic: Handle transient failures gracefully
  4. Structure Logs: Use JSON for better querying in Cloud Logging
  5. Set Appropriate Timeouts: Match timeout to workload requirements
  6. Monitor Everything: Use Cloud Monitoring for metrics and alerts
  7. Optimize Memory: Right-size memory allocation for cost efficiency
  8. Use Service Accounts: Follow principle of least privilege

Conclusion

Google Cloud Functions enables rapid development of event-driven applications without infrastructure management. By following these best practices, you'll build production-ready serverless applications that are secure, performant, and cost-effective.

Start simple, monitor closely, and iterate based on real-world usage patterns. The serverless paradigm allows you to focus on business logic while GCP handles the infrastructure.


Building on GCP? Check out my posts on BigQuery for analytics and Cloud Run for containerized workloads!

Share this insight