Suyog Maid
Article · 2026-01-18

Building Scalable Data Analytics Pipelines with Google BigQuery and Dataflow

#gcp #bigquery #dataflow #data-analytics #pubsub #data-engineering #apache-beam

Google Cloud Platform offers powerful tools for data analytics and processing. BigQuery provides serverless data warehousing, while Dataflow enables both batch and streaming data processing. This guide explores building production-ready data pipelines leveraging these services.

Architecture Overview

Modern Data Pipeline Architecture

Data Sources
    ├── Application Logs
    ├── User Events
    ├── IoT Sensors
    └── External APIs
         │
         ▼
    Cloud Pub/Sub (Ingestion)
         │
         ▼
    Cloud Dataflow (Processing)
         │
         ├──► BigQuery (Analytics)
         ├──► Cloud Storage (Archive)
         └──► Cloud Bigtable (Real-time)
         │
         ▼
    Visualization
         ├── Looker Studio
         ├── Tableau
         └── Custom Dashboards
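
Events enter this architecture through Cloud Pub/Sub. As a minimal sketch of the producer side (the topic name, field set, and helper names are assumptions for illustration, not part of a specific API), here is how an event might be built and serialized before publishing:

```python
import json
import uuid
from datetime import datetime, timezone

def make_event(user_id, event_type, event_data=None):
    """Build an event payload matching the events table schema used below."""
    return {
        'event_id': str(uuid.uuid4()),
        'user_id': user_id,
        'event_type': event_type,
        'event_timestamp': datetime.now(timezone.utc).isoformat(),
        'event_data': event_data or {},
    }

def encode_event(event):
    """Pub/Sub payloads are bytes, so serialize the dict to UTF-8 JSON."""
    return json.dumps(event).encode('utf-8')

# Publishing (requires google-cloud-pubsub and GCP credentials; shown for context):
# from google.cloud import pubsub_v1
# publisher = pubsub_v1.PublisherClient()
# topic_path = publisher.topic_path('PROJECT_ID', 'events-topic')
# publisher.publish(topic_path, encode_event(make_event('u123', 'page_view')))
```

Keeping the payload shape identical to the BigQuery schema means the Dataflow pipeline later in this post can write events through with minimal transformation.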

BigQuery Fundamentals

Table Design and Partitioning

-- Create partitioned and clustered table
CREATE TABLE `project.dataset.events`
(
  event_id STRING NOT NULL,
  user_id STRING NOT NULL,
  event_type STRING NOT NULL,
  event_timestamp TIMESTAMP NOT NULL,
  event_data JSON,
  user_country STRING,
  user_device STRING,
  session_id STRING
)
PARTITION BY DATE(event_timestamp)
CLUSTER BY user_country, event_type
OPTIONS(
  description="User events table with partitioning and clustering",
  require_partition_filter=true,
  partition_expiration_days=365
);

-- Create materialized view for common queries
CREATE MATERIALIZED VIEW `project.dataset.daily_user_stats`
PARTITION BY event_date
CLUSTER BY user_country
AS
SELECT
  DATE(event_timestamp) as event_date,
  user_country,
  APPROX_COUNT_DISTINCT(user_id) as unique_users, -- exact COUNT(DISTINCT) is not supported in materialized views
  COUNT(*) as total_events,
  COUNTIF(event_type = 'purchase') as purchases,
  SUM(SAFE_CAST(JSON_VALUE(event_data, '$.amount') AS FLOAT64)) as revenue
FROM `project.dataset.events`
GROUP BY event_date, user_country;

Advanced SQL Queries

-- Window functions for user journey analysis
WITH user_sessions AS (
  SELECT
    user_id,
    session_id,
    event_timestamp,
    event_type,
    LAG(event_type) OVER (
      PARTITION BY user_id, session_id 
      ORDER BY event_timestamp
    ) as previous_event,
    LEAD(event_type) OVER (
      PARTITION BY user_id, session_id 
      ORDER BY event_timestamp
    ) as next_event,
    TIMESTAMP_DIFF(
      event_timestamp,
      LAG(event_timestamp) OVER (
        PARTITION BY user_id, session_id 
        ORDER BY event_timestamp
      ),
      SECOND
    ) as seconds_since_last_event
  FROM `project.dataset.events`
  WHERE DATE(event_timestamp) = CURRENT_DATE()
)
SELECT
  event_type,
  previous_event,
  next_event,
  COUNT(*) as occurrence_count,
  AVG(seconds_since_last_event) as avg_time_between_events
FROM user_sessions
WHERE previous_event IS NOT NULL
GROUP BY event_type, previous_event, next_event
ORDER BY occurrence_count DESC;

-- Cohort analysis
WITH user_cohorts AS (
  SELECT
    user_id,
    DATE_TRUNC(MIN(DATE(event_timestamp)), MONTH) as cohort_month
  FROM `project.dataset.events`
  GROUP BY user_id
),
cohort_activity AS (
  SELECT
    c.cohort_month,
    DATE_TRUNC(DATE(e.event_timestamp), MONTH) as activity_month,
    COUNT(DISTINCT e.user_id) as active_users
  FROM user_cohorts c
  JOIN `project.dataset.events` e ON c.user_id = e.user_id
  GROUP BY cohort_month, activity_month
)
SELECT
  cohort_month,
  activity_month,
  active_users,
  DATE_DIFF(activity_month, cohort_month, MONTH) as months_since_cohort,
  active_users / FIRST_VALUE(active_users) OVER (
    PARTITION BY cohort_month 
    ORDER BY activity_month
  ) as retention_rate
FROM cohort_activity
ORDER BY cohort_month, activity_month;
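
The retention_rate logic above is easy to sanity-check off-line before trusting it on real data. A stdlib-only sketch (the row shape mirrors cohort_activity; the function name is mine):

```python
from collections import defaultdict

def retention_rates(rows):
    """Compute retention per cohort from (cohort_month, activity_month, active_users) rows.

    Mirrors the SQL: each cohort's earliest activity month is the denominator,
    i.e. FIRST_VALUE(active_users) ordered by activity_month.
    """
    by_cohort = defaultdict(list)
    for cohort_month, activity_month, active_users in rows:
        by_cohort[cohort_month].append((activity_month, active_users))

    result = {}
    for cohort, months in by_cohort.items():
        months.sort()          # ORDER BY activity_month
        base = months[0][1]    # the cohort's first-month active users
        result[cohort] = {m: users / base for m, users in months}
    return result
```

For the January cohort with 200 / 100 / 50 active users over three months, this yields retention of 1.0, 0.5, and 0.25, matching what the SQL would return.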

Cloud Dataflow Pipelines

Apache Beam Pipeline for Stream Processing

# dataflow_pipeline.py
import json
import logging
from datetime import datetime, timezone

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
from apache_beam.transforms import window

class ParsePubSubMessage(beam.DoFn):
    """Parse Pub/Sub message and extract event data"""
    
    def process(self, element):
        try:
            # Decode the raw bytes payload into JSON
            message_data = json.loads(element.decode('utf-8'))
            
            # Add processing timestamp
            message_data['processed_at'] = datetime.now(timezone.utc).isoformat()
            
            yield message_data
        except (ValueError, UnicodeDecodeError) as e:
            # Log malformed messages and drop them rather than failing the bundle
            logging.error("Error parsing message: %s", e)

class EnrichEventData(beam.DoFn):
    """Enrich event data with additional context"""
    
    def __init__(self):
        self.geo_lookup = None
    
    def setup(self):
        # Initialize lookup tables (could be from Cloud Storage or external API)
        self.geo_lookup = self._load_geo_data()
    
    def _load_geo_data(self):
        # Load geolocation data
        return {
            '192.168.1.1': {'country': 'US', 'city': 'New York'},
            # ... more mappings
        }
    
    def process(self, element):
        # Add geolocation data
        ip_address = element.get('ip_address')
        if ip_address and ip_address in self.geo_lookup:
            element['geo_data'] = self.geo_lookup[ip_address]
        
        # Add derived fields; normalize a trailing 'Z', which
        # datetime.fromisoformat only accepts natively on Python 3.11+
        element['event_hour'] = datetime.fromisoformat(
            element['event_timestamp'].replace('Z', '+00:00')
        ).hour
        
        yield element

class CalculateMetrics(beam.DoFn):
    """Aggregate grouped events into per-window metrics"""
    
    def process(self, element, window=beam.DoFn.WindowParam):
        # element is (event_type, iterable of events); the window is
        # obtained via DoFn.WindowParam, not from the element itself
        event_type, grouped = element
        events = list(grouped)
        yield {
            'window_start': window.start.to_utc_datetime().isoformat(),
            'window_end': window.end.to_utc_datetime().isoformat(),
            'event_type': event_type,
            'count': len(events),
            'unique_users': len({e['user_id'] for e in events}),
            'total_revenue': sum(
                float(e.get('event_data', {}).get('amount', 0))
                for e in events
                if e.get('event_type') == 'purchase'
            )
        }

def run_pipeline(argv=None):
    """Main pipeline execution"""
    
    # Pipeline options
    options = PipelineOptions(argv)
    options.view_as(StandardOptions).streaming = True
    
    # BigQuery table schema (includes the fields added during enrichment)
    table_schema = {
        'fields': [
            {'name': 'event_id', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'user_id', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'event_type', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'event_timestamp', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
            {'name': 'event_data', 'type': 'JSON', 'mode': 'NULLABLE'},
            {'name': 'geo_data', 'type': 'JSON', 'mode': 'NULLABLE'},
            {'name': 'event_hour', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            {'name': 'processed_at', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'}
        ]
    }
    
    with beam.Pipeline(options=options) as pipeline:
        # Read from Pub/Sub
        events = (
            pipeline
            | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
                subscription='projects/PROJECT_ID/subscriptions/events-sub'
            )
            | 'Parse Messages' >> beam.ParDo(ParsePubSubMessage())
            | 'Enrich Data' >> beam.ParDo(EnrichEventData())
        )
        
        # Write raw events to BigQuery
        events | 'Write to BigQuery' >> WriteToBigQuery(
            table='PROJECT_ID:dataset.events',
            schema=table_schema,
            write_disposition=BigQueryDisposition.WRITE_APPEND,
            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED
        )
        
        # Calculate real-time metrics per 1-minute window
        (
            events
            | 'Window into 1-minute intervals' >> beam.WindowInto(
                window.FixedWindows(60)
            )
            | 'Key by event type' >> beam.Map(lambda e: (e['event_type'], e))
            | 'Group by event type' >> beam.GroupByKey()
            | 'Calculate window metrics' >> beam.ParDo(CalculateMetrics())
            | 'Write metrics to BigQuery' >> WriteToBigQuery(
                table='PROJECT_ID:dataset.realtime_metrics',
                schema=('window_start:TIMESTAMP,window_end:TIMESTAMP,'
                        'event_type:STRING,count:INTEGER,'
                        'unique_users:INTEGER,total_revenue:FLOAT'),
                write_disposition=BigQueryDisposition.WRITE_APPEND,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED
            )
        )

if __name__ == '__main__':
    run_pipeline()
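
The parsing step is the most failure-prone part of the stream, and its logic can be unit-tested without spinning up a Beam runner. A stdlib-only mirror of the ParsePubSubMessage logic (the standalone function name is mine, for testing purposes):

```python
import json
from datetime import datetime, timezone

def parse_message(raw_bytes):
    """Mirror of ParsePubSubMessage.process: parsed event dict, or None if malformed."""
    try:
        message_data = json.loads(raw_bytes.decode('utf-8'))
        message_data['processed_at'] = datetime.now(timezone.utc).isoformat()
        return message_data
    except (ValueError, UnicodeDecodeError):
        # Malformed payloads are dropped, matching the DoFn's behavior
        return None
```

Testing the pure function against valid JSON, invalid JSON, and non-UTF-8 bytes covers the three failure modes you will actually see on a production subscription.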

Batch Processing Pipeline

# batch_pipeline.py
import hashlib

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import (
    ReadFromBigQuery,
    WriteToBigQuery,
    BigQueryDisposition,
)

class TransformData(beam.DoFn):
    """Transform and clean data"""
    
    def process(self, element):
        # Keep only records with the required fields
        if element.get('user_id') and element.get('event_timestamp'):
            # Remove PII using a stable hash -- Python's built-in hash()
            # is randomized per process, so it differs across workers
            element['user_id_hash'] = hashlib.sha256(
                element['user_id'].encode('utf-8')
            ).hexdigest()
            del element['user_id']
            
            # Standardize formats
            element['event_type'] = element['event_type'].lower().strip()
            
            yield element

def run_batch_pipeline(argv=None):
    """Batch processing pipeline"""
    
    options = PipelineOptions(argv)
    
    with beam.Pipeline(options=options) as pipeline:
        # Read from BigQuery
        raw_data = (
            pipeline
            | 'Read from BigQuery' >> ReadFromBigQuery(
                query='''
                    SELECT *
                    FROM `project.dataset.events`
                    WHERE DATE(event_timestamp) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
                ''',
                use_standard_sql=True
            )
        )
        
        # Transform data
        transformed = (
            raw_data
            | 'Transform' >> beam.ParDo(TransformData())
        )
        
        # Write to different destinations based on event type
        (
            transformed
            | 'Filter purchases' >> beam.Filter(
                lambda x: x['event_type'] == 'purchase'
            )
            | 'Write purchases' >> WriteToBigQuery(
                table='project:dataset.purchases',
                write_disposition=BigQueryDisposition.WRITE_TRUNCATE
            )
        )
        
        (
            transformed
            | 'Filter page views' >> beam.Filter(
                lambda x: x['event_type'] == 'page_view'
            )
            | 'Write page views' >> WriteToBigQuery(
                table='project:dataset.page_views',
                write_disposition=BigQueryDisposition.WRITE_TRUNCATE
            )
        )

if __name__ == '__main__':
    run_batch_pipeline()
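
Pseudonymizing user_id deserves one extra caveat: an unsalted hash can be reversed by hashing candidate IDs. A salted SHA-256 helper is a safer sketch (stdlib only; the salt handling here is illustrative -- in production you would load the salt from Secret Manager or similar, never hard-code it):

```python
import hashlib

def hash_user_id(user_id, salt='REPLACE_WITH_SECRET_SALT'):
    """Deterministically pseudonymize a user ID with salted SHA-256.

    Unlike Python's built-in hash(), the result is stable across
    processes and workers, so joins on the hashed ID still work.
    """
    digest = hashlib.sha256((salt + user_id).encode('utf-8')).hexdigest()
    # Truncate for storage; keep the full digest if collision risk matters
    return digest[:16]
```

Because the output is deterministic, downstream tables hashed on different days (or by different Dataflow workers) remain joinable on the pseudonymous key.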

Infrastructure as Code with Terraform

# bigquery.tf
resource "google_bigquery_dataset" "analytics" {
  dataset_id  = "analytics"
  project     = var.project_id
  location    = "US"
  description = "Analytics dataset for event data"
  
  default_table_expiration_ms = 31536000000 # 365 days
  
  access {
    role          = "OWNER"
    user_by_email = var.owner_email
  }
  
  access {
    role          = "READER"
    group_by_email = var.analyst_group_email
  }
  
  labels = {
    environment = var.environment
    team        = "data-engineering"
  }
}

resource "google_bigquery_table" "events" {
  dataset_id = google_bigquery_dataset.analytics.dataset_id
  table_id   = "events"
  project    = var.project_id
  
  time_partitioning {
    type                     = "DAY"
    field                    = "event_timestamp"
    require_partition_filter = true
    expiration_ms            = 31536000000
  }
  
  clustering = ["user_country", "event_type"]
  
  schema = jsonencode([
    {
      name = "event_id"
      type = "STRING"
      mode = "REQUIRED"
    },
    {
      name = "user_id"
      type = "STRING"
      mode = "REQUIRED"
    },
    {
      name = "event_type"
      type = "STRING"
      mode = "REQUIRED"
    },
    {
      name = "event_timestamp"
      type = "TIMESTAMP"
      mode = "REQUIRED"
    },
    {
      name = "event_data"
      type = "JSON"
      mode = "NULLABLE"
    }
  ])
  
  labels = {
    environment = var.environment
  }
}

# Pub/Sub topic and subscription
resource "google_pubsub_topic" "events" {
  name    = "events-topic"
  project = var.project_id
  
  message_retention_duration = "86400s" # 24 hours
  
  labels = {
    environment = var.environment
  }
}

resource "google_pubsub_subscription" "events" {
  name    = "events-subscription"
  topic   = google_pubsub_topic.events.name
  project = var.project_id
  
  ack_deadline_seconds = 20
  
  message_retention_duration = "86400s"
  retain_acked_messages      = false
  
  expiration_policy {
    ttl = "" # Never expire
  }
  
  retry_policy {
    minimum_backoff = "10s"
    maximum_backoff = "600s"
  }
  
  dead_letter_policy {
    # Assumes a google_pubsub_topic.dead_letter resource is defined elsewhere
    dead_letter_topic     = google_pubsub_topic.dead_letter.id
    max_delivery_attempts = 5
  }
}

# Dataflow job
resource "google_dataflow_job" "streaming_pipeline" {
  name              = "streaming-events-pipeline"
  project           = var.project_id
  region            = var.region
  temp_gcs_location = "gs://${google_storage_bucket.dataflow_temp.name}/temp"
  
  template_gcs_path = "gs://dataflow-templates/latest/PubSub_to_BigQuery"
  
  parameters = {
    inputTopic      = google_pubsub_topic.events.id
    outputTableSpec = "${var.project_id}:${google_bigquery_dataset.analytics.dataset_id}.${google_bigquery_table.events.table_id}"
  }
  
  on_delete = "cancel"
  
  labels = {
    environment = var.environment
  }
}

# Cloud Storage for Dataflow temp files
resource "google_storage_bucket" "dataflow_temp" {
  name     = "${var.project_id}-dataflow-temp"
  location = "US"
  project  = var.project_id
  
  uniform_bucket_level_access = true
  
  lifecycle_rule {
    condition {
      age = 1
    }
    action {
      type = "Delete"
    }
  }
}

Monitoring and Optimization

Query Performance Monitoring

-- Analyze query performance
SELECT
  user_email,
  job_id,
  creation_time,
  total_bytes_processed / POW(10, 9) as gb_processed,
  total_slot_ms / 1000 as slot_seconds,
  TIMESTAMP_DIFF(end_time, start_time, SECOND) as duration_seconds,
  total_bytes_billed / POW(10, 9) as gb_billed,
  (total_bytes_billed / POW(10, 12)) * 5 as estimated_cost_usd -- assumes $5/TB on-demand; adjust to your region's current rate
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
  AND state = 'DONE'
  AND job_type = 'QUERY'
ORDER BY total_bytes_processed DESC
LIMIT 100;

-- Identify expensive queries
SELECT
  query,
  COUNT(*) as execution_count,
  AVG(total_bytes_processed) / POW(10, 9) as avg_gb_processed,
  SUM(total_bytes_billed) / POW(10, 12) * 5 as total_cost_usd -- same $5/TB assumption as above
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
  AND job_type = 'QUERY'
  AND state = 'DONE'
GROUP BY query
HAVING total_cost_usd > 10
ORDER BY total_cost_usd DESC;

Best Practices

1. Partitioning and Clustering

  • Always use partitioning for time-series data
  • Cluster by frequently filtered columns
  • Require partition filters to prevent full table scans
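
A dry run is the cheapest way to confirm a query actually prunes partitions before you run it. A sketch using the google-cloud-bigquery client (dry_run is a real QueryJobConfig flag; the cost helper and the $5/TB rate are assumptions -- check current on-demand pricing for your region):

```python
def dry_run_bytes(client, sql):
    """Return bytes the query would process, without running it or incurring cost."""
    from google.cloud import bigquery  # requires google-cloud-bigquery
    config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    return client.query(sql, job_config=config).total_bytes_processed

def estimate_cost_usd(bytes_processed, usd_per_tb=5.0):
    """On-demand cost estimate; the per-TB rate is an assumption, not a constant."""
    return bytes_processed / 10**12 * usd_per_tb

# Usage (requires credentials):
# client = bigquery.Client()
# n = dry_run_bytes(client, "SELECT ... WHERE DATE(event_timestamp) = CURRENT_DATE()")
# print(f"{n / 1e9:.2f} GB -> ${estimate_cost_usd(n):.2f}")
```

Comparing dry-run bytes with and without the partition filter makes the savings from partitioning and clustering concrete before any money is spent.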

2. Cost Optimization

-- Use partition filters and clustering to reduce bytes scanned
-- Bad: no partition filter (errors when require_partition_filter is set;
-- otherwise scans every partition)
SELECT * FROM `project.dataset.events`
WHERE user_country = 'US';

-- Good: Partition filter + clustering
SELECT * FROM `project.dataset.events`
WHERE DATE(event_timestamp) = CURRENT_DATE()
  AND user_country = 'US';

-- Use approximate aggregation for large datasets
SELECT
  event_type,
  APPROX_COUNT_DISTINCT(user_id) as unique_users
FROM `project.dataset.events`
WHERE DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY event_type;

3. Data Quality

# data_quality_checks.py
from google.cloud import bigquery

def run_data_quality_checks():
    client = bigquery.Client()
    
    checks = [
        {
            'name': 'Null Check',
            'query': '''
                SELECT COUNT(*) as null_count
                FROM `project.dataset.events`
                WHERE DATE(event_timestamp) = CURRENT_DATE()
                  AND (user_id IS NULL OR event_type IS NULL)
            ''',
            'threshold': 0
        },
        {
            'name': 'Duplicate Check',
            'query': '''
                SELECT COUNT(*) - COUNT(DISTINCT event_id) as duplicate_count
                FROM `project.dataset.events`
                WHERE DATE(event_timestamp) = CURRENT_DATE()
            ''',
            'threshold': 0
        },
        {
            'name': 'Future Timestamp Check',
            'query': '''
                SELECT COUNT(*) as future_count
                FROM `project.dataset.events`
                WHERE event_timestamp > CURRENT_TIMESTAMP()
            ''',
            'threshold': 0
        }
    ]
    
    for check in checks:
        result = client.query(check['query']).result()
        value = list(result)[0][0]
        
        if value > check['threshold']:
            print(f"❌ {check['name']} failed: {value}")
        else:
            print(f"✅ {check['name']} passed")

if __name__ == '__main__':
    run_data_quality_checks()

Key Takeaways

  1. Partitioning is Essential: Always partition time-series data
  2. Use Clustering: Reduce query costs with proper clustering
  3. Stream Processing: Leverage Dataflow for real-time analytics
  4. Monitor Costs: Track query performance and optimize expensive queries
  5. Data Quality: Implement automated quality checks
  6. Materialized Views: Pre-compute common aggregations

Conclusion

Google BigQuery and Dataflow provide a powerful, scalable platform for data analytics. By following best practices for partitioning, clustering, and cost optimization, you can build efficient data pipelines that scale to petabytes while controlling costs.


Ready to build your data pipeline? Check out my other posts on GCP architecture and data engineering best practices!
