Google Cloud Functions: Building Production-Ready Serverless Applications
Google Cloud Functions enables you to build event-driven applications without managing servers. In this comprehensive guide, I'll share production-tested patterns for building scalable, secure, and cost-effective serverless applications on GCP.
Why Cloud Functions?
Cloud Functions offers compelling advantages:
- Zero Server Management: Focus on code, not infrastructure
- Automatic Scaling: From zero to thousands of instances
- Event-Driven: Respond to Cloud Storage, Pub/Sub, HTTP, and more
- Pay-Per-Use: Billed in 100ms increments
- Integrated Ecosystem: Native integration with GCP services
- Multi-Language Support: Node.js, Python, Go, Java, .NET, Ruby, PHP
Function Types and Triggers
HTTP Functions
# main.py - HTTP triggered function
import functions_framework
from flask import jsonify, Request
import json
from datetime import datetime  # used by hello_http's timestamp field; missing in the original
@functions_framework.http
def hello_http(request: Request):
    """HTTP Cloud Function returning a JSON greeting.

    The greeting name is read from the JSON body ('name'), then the
    query string (?name=), defaulting to 'World'.

    Args:
        request: The incoming Flask request.

    Returns:
        A (body, status, headers) tuple as understood by Flask /
        functions-framework; CORS preflight (OPTIONS) gets an empty 204.
    """
    # Local import so the handler works even if the module header lacks
    # it; module-level `from datetime import datetime` is the proper home.
    from datetime import datetime

    # CORS preflight: advertise allowed methods/headers, no body.
    # '*' origin is fine for a demo; pin it to known origins in production.
    if request.method == 'OPTIONS':
        headers = {
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'GET, POST',
            'Access-Control-Allow-Headers': 'Content-Type',
            'Access-Control-Max-Age': '3600'
        }
        return ('', 204, headers)

    # CORS header for the main request.
    headers = {
        'Access-Control-Allow-Origin': '*'
    }

    # Prefer the JSON body, fall back to query args, then a default.
    request_json = request.get_json(silent=True)
    request_args = request.args
    if request_json and 'name' in request_json:
        name = request_json['name']
    elif request_args and 'name' in request_args:
        name = request_args['name']
    else:
        name = 'World'

    response = {
        'message': f'Hello {name}!',
        'timestamp': datetime.utcnow().isoformat()
    }
    return (jsonify(response), 200, headers)
Cloud Storage Triggers
# storage_trigger.py
import functions_framework
from google.cloud import storage, vision
import os

# Clients are created at module scope so warm instances reuse the same
# connections across invocations instead of paying setup cost per event.
storage_client = storage.Client()
vision_client = vision.ImageAnnotatorClient()
@functions_framework.cloud_event
def process_image(cloud_event):
    """Label-scan and moderate an image uploaded to Cloud Storage.

    Triggered by object finalize events. Writes Vision API labels and
    safe-search verdicts into the blob's custom metadata, and moves the
    object to the quarantine bucket (env QUARANTINE_BUCKET) when adult
    or violent content is rated at least LIKELY.

    Args:
        cloud_event: CloudEvent whose .data carries the GCS event
            payload ('bucket', 'name').
    """
    data = cloud_event.data
    bucket_name = data['bucket']
    file_name = data['name']
    print(f'Processing file: gs://{bucket_name}/{file_name}')

    # Only image extensions we expect; everything else is ignored so
    # the trigger can stay attached to a mixed-content bucket.
    if not file_name.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):
        print(f'Skipping non-image file: {file_name}')
        return

    # Point Vision at the object in place -- no download needed.
    image = vision.Image()
    image.source.image_uri = f'gs://{bucket_name}/{file_name}'

    # Detect labels.
    response = vision_client.label_detection(image=image)
    if response.error.message:
        # Surface API-level failures (bad image, permissions) instead of
        # silently recording empty metadata; raising lets the trigger's
        # retry policy have another go.
        raise RuntimeError(f'Vision API error: {response.error.message}')
    labels = [label.description for label in response.label_annotations]

    # Detect safe search.
    safe_search = vision_client.safe_search_detection(image=image)
    annotation = safe_search.safe_search_annotation

    # Store results on the object itself as custom metadata.
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    metadata = {
        'labels': ','.join(labels),
        'adult': annotation.adult.name,
        'violence': annotation.violence.name
    }
    blob.metadata = metadata
    blob.patch()
    print(f'Image processed: {labels}')

    # Quarantine LIKELY or VERY_LIKELY content. The original compared
    # against the magic number 4, which is vision.Likelihood.LIKELY.
    if (annotation.adult >= vision.Likelihood.LIKELY or
            annotation.violence >= vision.Likelihood.LIKELY):
        quarantine_bucket = storage_client.bucket(os.environ['QUARANTINE_BUCKET'])
        bucket.copy_blob(blob, quarantine_bucket, file_name)
        blob.delete()
        print(f'Image quarantined: {file_name}')
Pub/Sub Triggers
# pubsub_trigger.py
import functions_framework
import base64
import json
import os  # process_message reads PROJECT_ID from the environment; missing in the original
from google.cloud import bigquery, firestore

# Module-scope clients are reused by warm instances across invocations.
bq_client = bigquery.Client()
db = firestore.Client()
@functions_framework.cloud_event
def process_message(cloud_event):
    """Persist a Pub/Sub user event to BigQuery and Firestore.

    The message payload is base64-encoded JSON carrying event_type,
    user_id, timestamp and optional properties. The event is appended
    to the analytics.user_events table, then the user's Firestore
    profile gets its activity stamp and event counter updated.
    """
    # Pub/Sub delivers the payload base64-encoded inside the envelope.
    raw_payload = base64.b64decode(cloud_event.data['message']['data']).decode()
    message_data = json.loads(raw_payload)
    print(f'Processing message: {message_data}')

    event_type = message_data.get('event_type')
    user_id = message_data.get('user_id')
    timestamp = message_data.get('timestamp')

    # Append one analytics row; insert_rows_json returns a list of
    # per-row errors (empty on success).
    table_id = f"{os.environ['PROJECT_ID']}.analytics.user_events"
    row = {
        'event_type': event_type,
        'user_id': user_id,
        'timestamp': timestamp,
        'properties': json.dumps(message_data.get('properties', {}))
    }
    errors = bq_client.insert_rows_json(table_id, [row])
    if errors:
        print(f'BigQuery insert errors: {errors}')
        # Raising makes the delivery fail so Pub/Sub can redeliver.
        raise Exception('Failed to insert into BigQuery')

    # Keep the user's profile counters in sync with the event stream.
    db.collection('users').document(user_id).update({
        'last_activity': timestamp,
        'event_count': firestore.Increment(1)
    })
    print(f'Event processed for user {user_id}')
Firestore Triggers
# firestore_trigger.py
import functions_framework
from google.cloud import tasks_v2, pubsub_v1
import json
import os
tasks_client = tasks_v2.CloudTasksClient()
publisher = pubsub_v1.PublisherClient()
@functions_framework.cloud_event
def on_user_create(cloud_event):
    """React to a new user document appearing in Firestore.

    Queues a welcome email via Cloud Tasks and publishes a
    'user_created' event to the user-events Pub/Sub topic.
    """
    payload = cloud_event.data
    # Only document-write events carry a 'value' envelope.
    if 'value' not in payload:
        return

    # Firestore event payloads wrap each field in a typed value object.
    fields = payload['value']['fields']
    user_id = payload['value']['name'].split('/')[-1]
    email = fields.get('email', {}).get('stringValue')
    name = fields.get('name', {}).get('stringValue')
    print(f'New user created: {user_id} - {email}')

    # Send welcome email (via Cloud Tasks).
    send_welcome_email(user_id, email, name)

    # Fan the event out to downstream consumers.
    topic_path = publisher.topic_path(os.environ['PROJECT_ID'], 'user-events')
    event = {
        'event_type': 'user_created',
        'user_id': user_id,
        'email': email
    }
    publisher.publish(topic_path, json.dumps(event).encode('utf-8'))
    print(f'Welcome email queued for {email}')
def send_welcome_email(user_id, email, name):
    """Enqueue a Cloud Tasks HTTP task targeting the send-email function.

    Args:
        user_id: Firestore document id of the new user.
        email: Recipient address.
        name: Display name interpolated into the welcome template.
    """
    project = os.environ['PROJECT_ID']
    location = os.environ['REGION']
    parent = tasks_client.queue_path(project, location, 'email-queue')

    body = json.dumps({
        'to': email,
        'template': 'welcome',
        'data': {
            'name': name,
            'user_id': user_id
        }
    }).encode()

    # The task POSTs the rendered payload to the send-email function
    # hosted in the same project/region.
    tasks_client.create_task(request={
        'parent': parent,
        'task': {
            'http_request': {
                'http_method': tasks_v2.HttpMethod.POST,
                'url': f'https://{location}-{project}.cloudfunctions.net/send-email',
                'headers': {
                    'Content-Type': 'application/json'
                },
                'body': body
            }
        }
    })
Deployment with Terraform
Cloud Function Resource
# cloud_function.tf
# Source bundle bucket: zipped function code lives here; objects older
# than 30 days are pruned by the lifecycle rule.
resource "google_storage_bucket" "function_bucket" {
  name                        = "${var.project_id}-cloud-functions"
  location                    = var.region
  uniform_bucket_level_access = true

  lifecycle_rule {
    condition {
      age = 30
    }
    action {
      type = "Delete"
    }
  }
}

# The zipped source; the MD5 in the object name forces a redeploy
# whenever the archive content changes.
resource "google_storage_bucket_object" "function_source" {
  name   = "function-source-${data.archive_file.function_zip.output_md5}.zip"
  bucket = google_storage_bucket.function_bucket.name
  source = data.archive_file.function_zip.output_path
}

data "archive_file" "function_zip" {
  type        = "zip"
  source_dir  = "${path.module}/function"
  output_path = "${path.module}/function.zip"
}

# Gen2 HTTP function. min_instance_count = 1 keeps one instance warm to
# dampen cold starts (billed while idle).
resource "google_cloudfunctions2_function" "api_function" {
  name     = "api-function"
  location = var.region

  build_config {
    runtime     = "python311"
    entry_point = "hello_http"
    source {
      storage_source {
        bucket = google_storage_bucket.function_bucket.name
        object = google_storage_bucket_object.function_source.name
      }
    }
  }

  service_config {
    max_instance_count = 100
    min_instance_count = 1
    available_memory   = "256M"
    timeout_seconds    = 60
    environment_variables = {
      PROJECT_ID  = var.project_id
      ENVIRONMENT = var.environment
    }
    # API key injected from Secret Manager rather than stored as a
    # plain environment variable.
    secret_environment_variables {
      key        = "API_KEY"
      project_id = var.project_id
      secret     = google_secret_manager_secret.api_key.secret_id
      version    = "latest"
    }
    ingress_settings               = "ALLOW_ALL"
    all_traffic_on_latest_revision = true
    service_account_email          = google_service_account.function_sa.email
  }

  labels = {
    environment = var.environment
    managed_by  = "terraform"
  }
}

# Dedicated identity for the function (least privilege).
resource "google_service_account" "function_sa" {
  account_id   = "cloud-function-sa"
  display_name = "Cloud Function Service Account"
}

resource "google_project_iam_member" "function_permissions" {
  for_each = toset([
    "roles/datastore.user",
    "roles/pubsub.publisher",
    "roles/bigquery.dataEditor",
    "roles/storage.objectViewer"
  ])
  project = var.project_id
  role    = each.value
  member  = "serviceAccount:${google_service_account.function_sa.email}"
}

# Public invocation. NOTE(review): for gen2 functions, unauthenticated
# access is typically granted via roles/run.invoker on the underlying
# Cloud Run service -- confirm this binding actually opens the URL.
resource "google_cloudfunctions2_function_iam_member" "invoker" {
  project        = google_cloudfunctions2_function.api_function.project
  location       = google_cloudfunctions2_function.api_function.location
  cloud_function = google_cloudfunctions2_function.api_function.name
  role           = "roles/cloudfunctions.invoker"
  member         = "allUsers"
}

output "function_url" {
  value = google_cloudfunctions2_function.api_function.service_config[0].uri
}
Event-Driven Function
# event_function.tf
# Gen2 function triggered by object-finalize events on the uploads
# bucket; flagged content ends up in the quarantine bucket.
resource "google_cloudfunctions2_function" "storage_function" {
  name     = "process-uploads"
  location = var.region

  build_config {
    runtime     = "python311"
    entry_point = "process_image"
    source {
      storage_source {
        bucket = google_storage_bucket.function_bucket.name
        object = google_storage_bucket_object.function_source.name
      }
    }
  }

  service_config {
    max_instance_count = 50
    available_memory   = "512M"
    # Vision API calls can be slow; allow up to 5 minutes per event.
    timeout_seconds = 300
    environment_variables = {
      PROJECT_ID        = var.project_id
      QUARANTINE_BUCKET = google_storage_bucket.quarantine.name
    }
    service_account_email = google_service_account.function_sa.email
  }

  event_trigger {
    trigger_region = var.region
    event_type     = "google.cloud.storage.object.v1.finalized"
    event_filters {
      attribute = "bucket"
      value     = google_storage_bucket.uploads.name
    }
    # Redeliver events when the function errors (at-least-once).
    retry_policy = "RETRY_POLICY_RETRY"
  }
}

# Bucket receiving end-user uploads; CORS restricted to the app origin.
resource "google_storage_bucket" "uploads" {
  name                        = "${var.project_id}-uploads"
  location                    = var.region
  uniform_bucket_level_access = true

  cors {
    origin          = ["https://example.com"]
    method          = ["GET", "POST", "PUT"]
    response_header = ["*"]
    max_age_seconds = 3600
  }
}

resource "google_storage_bucket" "quarantine" {
  name                        = "${var.project_id}-quarantine"
  location                    = var.region
  uniform_bucket_level_access = true
}
Performance Optimization
Cold Start Mitigation
# optimized_function.py
import functions_framework
from google.cloud import firestore
import os

# Initialize clients globally (outside function) so warm instances
# reuse them and the cold start pays the cost only once.
db = firestore.Client()

# Process-lifetime config cache; populated lazily by get_config().
CONFIG_CACHE = {}
def get_config():
    """Return the app config document, fetching it at most once.

    The Firestore read happens only on the first call in a given
    instance; afterwards the module-level CONFIG_CACHE is served.
    Note the cache never expires for the life of the instance.
    """
    if CONFIG_CACHE:
        return CONFIG_CACHE
    snapshot = db.collection('config').document('app').get()
    CONFIG_CACHE.update(snapshot.to_dict())
    return CONFIG_CACHE
@functions_framework.http
def optimized_handler(request):
    """HTTP handler demonstrating minimal cold-start overhead.

    All heavy setup (client creation, config fetch) lives at module
    scope or behind the get_config() cache, so each request performs
    only its own work.
    """
    config = get_config()
    # Your business logic here
    return {'status': 'success'}
Concurrency Configuration
# function.yaml
# NOTE(review): this is a Knative Service spec (the Cloud Run shape
# backing gen2 functions); confirm it is applied via Cloud Run/Knative
# tooling -- classic `gcloud functions deploy` does not consume it.
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: my-function
spec:
  template:
    metadata:
      annotations:
        # Keep at least one instance warm; cap scale-out at 100.
        autoscaling.knative.dev/minScale: "1"
        autoscaling.knative.dev/maxScale: "100"
        # Full CPU outside request handling (background work).
        run.googleapis.com/cpu-throttling: "false"
    spec:
      # Up to 80 concurrent requests share one instance.
      containerConcurrency: 80
      timeoutSeconds: 300
      containers:
      - image: gcr.io/project/function:latest
        resources:
          limits:
            memory: 512Mi
            cpu: "1"
Security Best Practices
Secret Management
# secrets.py
from google.cloud import secretmanager
from functools import lru_cache
import os

# One client per process; reused by every get_secret() call.
client = secretmanager.SecretManagerServiceClient()
@lru_cache(maxsize=10)
def get_secret(secret_id, version='latest'):
    """Fetch a secret value from Secret Manager, memoized per process.

    Args:
        secret_id: Name of the secret in this project.
        version: Secret version to read (default 'latest'). Note the
            'latest' result is cached too, so a rotated secret is not
            picked up until the instance recycles.

    Returns:
        The secret payload decoded as UTF-8 text.
    """
    resource = (
        f"projects/{os.environ['PROJECT_ID']}"
        f"/secrets/{secret_id}/versions/{version}"
    )
    response = client.access_secret_version(request={"name": resource})
    return response.payload.data.decode('UTF-8')
# Usage
# NOTE(review): these calls run at import time, i.e. during cold start
# and before any request context exists; consider fetching inside the
# handler instead if startup failures on missing secrets are a concern.
api_key = get_secret('api-key')
db_password = get_secret('database-password')
Authentication and Authorization
# auth.py
import functions_framework
from google.auth.transport import requests
from google.oauth2 import id_token
import os
@functions_framework.http
def authenticated_function(request):
    """HTTP function gated by a Google/Firebase ID token.

    Expects 'Authorization: Bearer <id_token>'. Returns 401 for a
    missing or invalid token, 403 when the caller lacks the admin
    role, otherwise delegates to process_admin_request.
    """
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return {'error': 'Missing authorization'}, 401

    token = auth_header.split('Bearer ')[1]
    try:
        # verify_oauth2_token checks signature, expiry and audience
        # (our OAuth client id) in one call; it raises ValueError on
        # any verification failure.
        claims = id_token.verify_oauth2_token(
            token,
            requests.Request(),
            os.environ['GOOGLE_CLIENT_ID']
        )
        user_id = claims['sub']
        email = claims.get('email')
        # Authenticated, but not necessarily authorized.
        if not has_permission(user_id, 'admin'):
            return {'error': 'Insufficient permissions'}, 403
        return process_admin_request(request, user_id)
    except ValueError:
        return {'error': 'Invalid token'}, 401
def has_permission(user_id, role):
    """Placeholder authorization check — always grants access.

    Replace with a real lookup (e.g. a Firestore roles collection)
    before production use.
    """
    # Implement your permission logic
    return True
def process_admin_request(request, user_id):
    """Handle a request that already passed authn/authz checks."""
    result = {'user_id': user_id, 'status': 'success'}
    return result
Monitoring and Logging
Structured Logging
# logging_example.py
import functions_framework
import json
import sys
import traceback  # used in logged_function's error path; missing in the original
from datetime import datetime
def log_structured(severity, message, **kwargs):
    """Emit one JSON log line for Cloud Logging to parse.

    ERROR entries go to stderr, everything else to stdout; extra
    keyword arguments become top-level fields of the entry.
    """
    stream = sys.stderr if severity == 'ERROR' else sys.stdout
    record = {
        'severity': severity,
        'message': message,
        'timestamp': datetime.utcnow().isoformat(),
        **kwargs
    }
    print(json.dumps(record), file=stream)
@functions_framework.http
def logged_function(request):
    """HTTP handler that logs entry, success and failure as JSON.

    Re-raises whatever process_request raises, after logging the error
    together with its traceback so it surfaces in Error Reporting.
    """
    log_structured('INFO', 'Function invoked',
                   method=request.method,
                   path=request.path)
    try:
        result = process_request(request)
        log_structured('INFO', 'Request processed successfully',
                       result=result)
        return result
    except Exception as e:
        # Local import keeps this block self-contained; module-level
        # `import traceback` is the conventional home for it. The
        # original referenced traceback without importing it, turning
        # every handled error into a NameError.
        import traceback
        log_structured('ERROR', 'Request processing failed',
                       error=str(e),
                       traceback=traceback.format_exc())
        raise
def process_request(request):
    """Stand-in business logic; always reports success."""
    status = 'success'
    return {'status': status}
Custom Metrics
# metrics.py
import functions_framework  # needed by the @functions_framework.http handler; missing in the original
from google.cloud import monitoring_v3
import time
import os

# One monitoring client per process, shared across invocations.
client = monitoring_v3.MetricServiceClient()
# NOTE: reading PROJECT_ID at import time makes it a hard startup
# requirement; the function crashes on cold start if it is unset.
project_name = f"projects/{os.environ['PROJECT_ID']}"
def write_custom_metric(metric_type, value, labels=None):
    """
    Write custom metric to Cloud Monitoring.

    Builds a single-point time series under custom.googleapis.com/ and
    sends it synchronously via create_time_series — one API call per
    invocation, so consider batching on hot paths.

    Args:
        metric_type: Suffix appended to "custom.googleapis.com/".
        value: Numeric sample, recorded as a double value.
        labels: Optional dict of metric label key/value strings.
    """
    series = monitoring_v3.TimeSeries()
    series.metric.type = f"custom.googleapis.com/{metric_type}"
    if labels:
        for key, val in labels.items():
            series.metric.labels[key] = val
    # Attach the point to the cloud_function monitored resource so it
    # appears alongside the built-in function metrics.
    series.resource.type = "cloud_function"
    series.resource.labels["function_name"] = os.environ.get('FUNCTION_NAME', 'unknown')
    series.resource.labels["region"] = os.environ.get('FUNCTION_REGION', 'us-central1')
    # Split the wall-clock timestamp into whole seconds + nanoseconds,
    # the TimeInterval proto representation.
    now = time.time()
    seconds = int(now)
    nanos = int((now - seconds) * 10 ** 9)
    interval = monitoring_v3.TimeInterval(
        {"end_time": {"seconds": seconds, "nanos": nanos}}
    )
    point = monitoring_v3.Point(
        {"interval": interval, "value": {"double_value": value}}
    )
    series.points = [point]
    client.create_time_series(name=project_name, time_series=[series])
# Usage
import functions_framework  # required by the decorator; the snippet's header omitted it

@functions_framework.http
def monitored_function(request):
    """HTTP handler instrumented with success/error/duration metrics.

    Re-raises any failure after recording it; duration is recorded in
    a finally block so it covers both outcomes. NOTE(review): each
    metric write is a synchronous Monitoring API call, which adds
    latency to every request. process_request is assumed to be defined
    elsewhere in the deployed module -- confirm before use.
    """
    start_time = time.time()
    try:
        result = process_request(request)
        # Record success metric.
        write_custom_metric('function/success_count', 1)
        return result
    except Exception as e:
        # Record error metric, tagged with the exception class name.
        write_custom_metric('function/error_count', 1,
                            labels={'error_type': type(e).__name__})
        raise
    finally:
        # Record duration of the whole invocation.
        duration = time.time() - start_time
        write_custom_metric('function/duration', duration)
Key Takeaways
- Initialize Outside Handler: Create clients globally to reduce cold starts
- Use Secrets Manager: Never hardcode credentials
- Implement Retry Logic: Handle transient failures gracefully
- Structure Logs: Use JSON for better querying in Cloud Logging
- Set Appropriate Timeouts: Match timeout to workload requirements
- Monitor Everything: Use Cloud Monitoring for metrics and alerts
- Optimize Memory: Right-size memory allocation for cost efficiency
- Use Service Accounts: Follow principle of least privilege
Conclusion
Google Cloud Functions enables rapid development of event-driven applications without infrastructure management. By following these best practices, you'll build production-ready serverless applications that are secure, performant, and cost-effective.
Start simple, monitor closely, and iterate based on real-world usage patterns. The serverless paradigm allows you to focus on business logic while GCP handles the infrastructure.
Building on GCP? Check out my posts on BigQuery for analytics and Cloud Run for containerized workloads!