Top GCP Dataproc & Dataflow Interview Questions (2026) | JavaInUse

Top 20 GCP Dataproc & Dataflow Interview Questions


  1. What is Dataproc?
  2. What is Dataflow?
  3. What are the differences between Dataproc and Dataflow?
  4. What is Apache Beam?
  5. How do you create a Dataproc cluster?
  6. What is Dataproc Serverless?
  7. How do you write a Dataflow pipeline?
  8. What are Dataflow templates?
  9. How do you handle streaming in Dataflow?
  10. What are Dataproc autoscaling policies?
  11. How do you optimize Spark jobs on Dataproc?
  12. What are Dataflow windowing strategies?
  13. How do you handle late data in Dataflow?
  14. What is Dataproc Hub?
  15. How do you connect to BigQuery from Dataproc?
  16. What are Dataflow side inputs?
  17. How do you monitor Dataproc jobs?
  18. What are Dataflow Flex Templates?
  19. How do you optimize Dataflow pipelines?
  20. What are best practices for data processing?


1. What is Dataproc?

Dataproc is a fully managed service for running Apache Spark, Hadoop, and other open-source data processing frameworks.

Dataproc Features:
+-- Managed Spark/Hadoop clusters
+-- Fast cluster provisioning (~90 seconds)
+-- Autoscaling
+-- Per-second billing
+-- Integrated with GCP services
+-- Optional components (Jupyter, Presto, etc.)
+-- Dataproc Serverless for Spark

Dataproc Architecture:
+-------------------------------------------------------------+
|                     Dataproc Cluster                         |
+-------------------------------------------------------------+
|  +-----------------------------------------------------+   |
|  |              Master Node(s)                          |   |
|  |  +---------+  +---------+  +---------+             |   |
|  |  | YARN RM |  |  HDFS   |  |  Spark  |             |   |
|  |  |         |  | NameNode|  | History |             |   |
|  |  +---------+  +---------+  +---------+             |   |
|  +-----------------------------------------------------+   |
|                          |                                  |
|  +-----------------------------------------------------+   |
|  |              Worker Nodes                            |   |
|  |  +---------+  +---------+  +---------+             |   |
|  |  | Worker 1|  | Worker 2|  | Worker N|             |   |
|  |  | NodeMgr |  | NodeMgr |  | NodeMgr |             |   |
|  |  | DataNode|  | DataNode|  | DataNode|             |   |
|  |  +---------+  +---------+  +---------+             |   |
|  +-----------------------------------------------------+   |
|                                                              |
|  Optional:                                                   |
|  +-- Secondary workers (preemptible)                        |
|  +-- High availability (3 masters)                          |
|  +-- Custom machine types                                   |
+-------------------------------------------------------------+

# Create simple cluster
gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --num-workers=2 \
    --worker-machine-type=n1-standard-4 \
    --image-version=2.1-debian11
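Per-second billing is easy to reason about with a little arithmetic. The sketch below estimates only the Dataproc service fee. It assumes a list price of $0.010 per vCPU-hour (check the current pricing page) and a one-minute minimum. Compute Engine VMs and disks for the same cluster are billed separately. `dataproc_fee_usd` is a hypothetical helper.

```python
# Hypothetical helper: estimates only the Dataproc service fee.
# Assumed list price: $0.010 per vCPU-hour (check current pricing).
# Compute Engine VMs and disks are billed separately.
MINIMUM_BILLED_SECONDS = 60  # one-minute minimum, then per-second billing


def dataproc_fee_usd(total_vcpus: int, runtime_seconds: float,
                     rate_per_vcpu_hour: float = 0.010) -> float:
    billed_seconds = max(runtime_seconds, MINIMUM_BILLED_SECONDS)
    return total_vcpus * rate_per_vcpu_hour * billed_seconds / 3600


# Cluster above: 1 master + 2 workers, n1-standard-4 (4 vCPUs each) = 12 vCPUs
print(f"{dataproc_fee_usd(12, 30 * 60):.4f}")  # 30-minute job
```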

2. What is Dataflow?

Dataflow is a fully managed service for executing Apache Beam pipelines for batch and stream processing.

Dataflow Features:
+-- Serverless (no cluster management)
+-- Unified batch and stream processing
+-- Auto-scaling
+-- Apache Beam SDK
+-- Exactly-once processing
+-- Integrated with GCP services

Dataflow Architecture:
+-------------------------------------------------------------+
|                     Dataflow Service                         |
+-------------------------------------------------------------+
|                                                              |
|  +-----------------------------------------------------+   |
|  |                   Pipeline                           |   |
|  |                                                      |   |
|  |  Source -> Transform -> Transform -> Sink           |   |
|  |    |          |            |          |             |   |
|  |  Read      ParDo        GroupBy     Write           |   |
|  |  Pub/Sub   Filter       Window      BigQuery        |   |
|  |  GCS       Map          Aggregate   GCS             |   |
|  +-----------------------------------------------------+   |
|                          |                                  |
|                          v                                  |
|  +-----------------------------------------------------+   |
|  |              Worker Pool (Auto-managed)              |   |
|  |  +---------+  +---------+  +---------+             |   |
|  |  | Worker 1|  | Worker 2|  | Worker N|             |   |
|  |  |         |  |         |  |         |             |   |
|  |  +---------+  +---------+  +---------+             |   |
|  |         Auto-scales based on workload               |   |
|  +-----------------------------------------------------+   |
+-------------------------------------------------------------+

# Simple Dataflow pipeline (Python)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--project=my-project',
    '--region=us-central1',
    '--runner=DataflowRunner',
    '--temp_location=gs://bucket/temp'
])

with beam.Pipeline(options=options) as p:
    (p
     | 'Read' >> beam.io.ReadFromText('gs://bucket/input.txt')
     | 'Transform' >> beam.Map(lambda x: x.upper())
     | 'Write' >> beam.io.WriteToText('gs://bucket/output'))

3. What are the differences between Dataproc and Dataflow?

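+--------------------+----------------------------------+----------------------------+
| Aspect             | Dataproc                         | Dataflow                   |
+--------------------+----------------------------------+----------------------------+
| Infrastructure     | Managed clusters                 | Serverless                 |
| Frameworks         | Spark, Hadoop, Flink, Presto     | Apache Beam only           |
| Scaling            | Autoscaling (configurable)       | Automatic                  |
| Use case           | Complex/existing Spark workloads | New pipelines, streaming   |
| State management   | Manual (checkpointing)           | Automatic                  |
| Pricing            | VMs + Dataproc fee, per second   | vCPU-hour + memory GB-hour |
| Lift and shift     | Easier for existing code         | Requires Beam rewrite      |
| Cluster management | Required                         | None                       |
+--------------------+----------------------------------+----------------------------+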

When to use Dataproc:
+-- Existing Spark/Hadoop jobs
+-- Complex ML with Spark MLlib
+-- Interactive analysis (Jupyter)
+-- Need fine-grained control
+-- Multiple frameworks in one cluster
+-- Presto/Trino for SQL queries

When to use Dataflow:
+-- New data pipelines
+-- Streaming data processing
+-- Unified batch/stream logic
+-- No infrastructure management desired
+-- Need exactly-once semantics
+-- Auto-scaling without configuration

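# Same word count in both:

# Dataproc (PySpark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split

spark = SparkSession.builder.appName("wordcount").getOrCreate()
df = spark.read.text("gs://bucket/input.txt")
# One row per word; drop empty strings produced by leading or blank whitespace
words = df.select(explode(split(col("value"), r"\s+")).alias("word")) \
    .filter(col("word") != "")
counts = words.groupBy("word").count()
counts.write.format("json").save("gs://bucket/output")

# Dataflow (Apache Beam)
import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | beam.io.ReadFromText('gs://bucket/input.txt')
     | beam.FlatMap(str.split)  # One element per word
     | beam.combiners.Count.PerElement()
     | beam.MapTuple(lambda word, count: f'{word}: {count}')
     | beam.io.WriteToText('gs://bucket/output'))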
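To check either job on a small sample, a plain-Python reference that applies the same whitespace splitting can be handy. This is a local testing aid only, not something that runs on Dataproc or Dataflow. `word_count` is a hypothetical name.

```python
from collections import Counter
from typing import Dict, Iterable


def word_count(lines: Iterable[str]) -> Dict[str, int]:
    """Count whitespace-separated words across all lines (plain-Python reference)."""
    counts: Counter = Counter()
    for line in lines:
        counts.update(line.split())  # str.split() drops empty tokens
    return dict(counts)


sample = ["to be or", "not to be"]
print(word_count(sample))
```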

4. What is Apache Beam?

Apache Beam is a unified programming model for batch and streaming data processing.

Apache Beam Concepts:

Pipeline:
+-- Complete data processing workflow
+-- Contains PCollections and PTransforms
+-- Runs on a runner (Dataflow, Spark, etc.)

PCollection:
+-- Distributed dataset
+-- Immutable
+-- Can be bounded (batch) or unbounded (stream)
+-- Elements can be any type

PTransform:
+-- Data transformation operation
+-- Takes PCollection, outputs PCollection
+-- Built-in: ParDo, Map, Filter, GroupByKey, etc.
+-- Composite transforms

# Beam Pipeline Example
import apache_beam as beam

class ParseEvent(beam.DoFn):
    def process(self, element):
        import json
        event = json.loads(element)
        yield {
            'user_id': event['user_id'],
            'event_type': event['event_type'],
            'timestamp': event['timestamp']
        }

class FilterClicks(beam.DoFn):
    def process(self, element):
        if element['event_type'] == 'click':
            yield element

with beam.Pipeline() as p:
    clicks_per_user = (
        p
        | 'Read' >> beam.io.ReadFromText('gs://bucket/events.json')
        | 'Parse' >> beam.ParDo(ParseEvent())
        | 'Filter' >> beam.ParDo(FilterClicks())
        # Count.PerKey needs (key, value) pairs, so key each click by user
        | 'Key by user' >> beam.Map(lambda e: (e['user_id'], e))
        | 'Count' >> beam.combiners.Count.PerKey()
        | 'Format' >> beam.Map(lambda x: f'{x[0]}: {x[1]}')
        | 'Write' >> beam.io.WriteToText('gs://bucket/output')
    )

Beam Runners:
+-- DirectRunner - Local testing
+-- DataflowRunner - Google Cloud Dataflow
+-- SparkRunner - Apache Spark
+-- FlinkRunner - Apache Flink
+-- SamzaRunner - Apache Samza

5. How do you create a Dataproc cluster?

Cluster Creation Methods:

1. gcloud CLI
gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --master-machine-type=n1-standard-4 \
    --master-boot-disk-size=500GB \
    --num-workers=2 \
    --worker-machine-type=n1-standard-4 \
    --worker-boot-disk-size=500GB \
    --num-secondary-workers=2 \
    --secondary-worker-type=preemptible \
    --image-version=2.1-debian11 \
    --optional-components=JUPYTER,PRESTO \
    --enable-component-gateway \
    --initialization-actions=gs://bucket/init.sh \
    --metadata='PIP_PACKAGES=pandas numpy' \
    --properties=spark:spark.executor.memory=4g \
    --scopes=cloud-platform \
    --max-idle=1h

2. Terraform
resource "google_dataproc_cluster" "cluster" {
  name   = "my-cluster"
  region = "us-central1"

  cluster_config {
    master_config {
      num_instances = 1
      machine_type  = "n1-standard-4"
      disk_config {
        boot_disk_size_gb = 500
      }
    }

    worker_config {
      num_instances = 2
      machine_type  = "n1-standard-4"
      disk_config {
        boot_disk_size_gb = 500
      }
    }

    preemptible_worker_config {
      num_instances = 2
    }

    software_config {
      image_version = "2.1-debian11"
      optional_components = ["JUPYTER", "PRESTO"]
      override_properties = {
        "spark:spark.executor.memory" = "4g"
      }
    }

    gce_cluster_config {
      subnetwork = "default"
      service_account_scopes = ["cloud-platform"]
    }

    # Assumes a google_dataproc_autoscaling_policy resource named "asp" is defined elsewhere
    autoscaling_config {
      policy_uri = google_dataproc_autoscaling_policy.asp.name
    }
  }
}

3. Submit a job to the cluster
gcloud dataproc jobs submit pyspark my_job.py \
    --cluster=my-cluster \
    --region=us-central1 \
    --jars=gs://spark-lib/bigquery/spark-3.3-bigquery-0.32.2.jar

6. What is Dataproc Serverless?

Dataproc Serverless lets you run Spark workloads without managing clusters.

Dataproc Serverless Features:
+-- No cluster provisioning
+-- Auto-scaling
+-- Pay only for resources used
+-- Supports batch workloads and interactive sessions
+-- Integrated with BigQuery, GCS
+-- Spark 3.x support

# Submit Serverless Spark batch job
gcloud dataproc batches submit pyspark \
    gs://bucket/my_job.py \
    --region=us-central1 \
    --subnet=default \
    --deps-bucket=gs://bucket/deps \
    --jars=gs://spark-lib/bigquery-connector.jar \
    --properties=spark.executor.instances=10 \
    -- arg1 arg2

# Serverless Spark Interactive (Notebooks)
# Use Vertex AI Workbench with Serverless Spark kernel

# Create session for interactive queries
gcloud dataproc sessions create spark my-session \
    --region=us-central1 \
    --property=spark.executor.instances=5

Serverless vs Cluster:
+--------------------+--------------------+--------------------+
| Aspect             | Serverless         | Cluster            |
+--------------------+--------------------+--------------------+
| Provisioning       | Automatic          | Manual             |
| Scaling            | Automatic          | Configurable       |
| Idle cost          | None               | Billed while idle  |
| Startup time       | ~2 minutes         | ~90 seconds        |
| Customization      | Limited            | Full control       |
| Components         | Spark only         | Spark, Hadoop, etc.|
| Pricing            | Higher per-resource| Lower per-resource |
+--------------------+--------------------+--------------------+

# PySpark job for Serverless
# my_job.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("serverless-job").getOrCreate()

# Read from BigQuery
df = spark.read.format("bigquery") \
    .option("table", "project.dataset.table") \
    .load()

# Process
result = df.groupBy("category").count()

# Write to GCS
result.write.format("parquet").save("gs://bucket/output")

spark.stop()

7. How do you write a Dataflow pipeline?

Dataflow Pipeline Structure:

# Complete batch pipeline
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery

class ParseCSV(beam.DoFn):
    def process(self, line):
        fields = line.split(',')
        yield {
            'user_id': fields[0],
            'product_id': fields[1],
            'quantity': int(fields[2]),
            'price': float(fields[3]),
            'timestamp': fields[4]
        }

class CalculateTotal(beam.DoFn):
    def process(self, element):
        # Emit a new dict instead of mutating the input element
        yield {**element, 'total': element['quantity'] * element['price']}

def run(argv=None):
    # Defaults come first; command-line flags such as --runner=DirectRunner
    # come later in the list and override them. If --runner is omitted,
    # Beam falls back to its default local runner.
    options = PipelineOptions([
        '--project=my-project',
        '--region=us-central1',
        '--temp_location=gs://bucket/temp',
        '--staging_location=gs://bucket/staging',
        '--job_name=sales-pipeline',
        '--max_num_workers=10',
        '--machine_type=n1-standard-4'
    ] + (sys.argv[1:] if argv is None else argv))
    
    schema = 'user_id:STRING,product_id:STRING,quantity:INTEGER,price:FLOAT,total:FLOAT,timestamp:TIMESTAMP'
    
    with beam.Pipeline(options=options) as p:
        (p
         | 'Read CSV' >> beam.io.ReadFromText('gs://bucket/sales/*.csv', 
                                               skip_header_lines=1)
         | 'Parse' >> beam.ParDo(ParseCSV())
         | 'Calculate' >> beam.ParDo(CalculateTotal())
         | 'Write BQ' >> WriteToBigQuery(
             'project:dataset.sales_data',
             schema=schema,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
         ))

if __name__ == '__main__':
    run()

# Run locally
python pipeline.py --runner=DirectRunner

# Run on Dataflow
python pipeline.py --runner=DataflowRunner
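One way to make the parsing logic testable without any runner is to keep it in plain functions and wrap them with `beam.Map` (for example `beam.Map(parse_sales_line)`) in place of the DoFns. The function names are hypothetical, and unlike `ParseCSV` this sketch requires exactly five fields per line.

```python
from typing import Any, Dict


def parse_sales_line(line: str) -> Dict[str, Any]:
    """Parse 'user_id,product_id,quantity,price,timestamp' into a row dict."""
    user_id, product_id, quantity, price, timestamp = line.split(',')
    return {
        'user_id': user_id,
        'product_id': product_id,
        'quantity': int(quantity),
        'price': float(price),
        'timestamp': timestamp,
    }


def add_total(row: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new row with 'total' added; the input row is left unchanged."""
    return {**row, 'total': row['quantity'] * row['price']}


row = parse_sales_line('u1,p9,3,2.5,2024-01-15 10:00:00')
print(add_total(row)['total'])
```

Returning a new dict follows Beam's guidance not to modify input elements in place.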

8. What are Dataflow templates?

Dataflow Template Types:

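1. Classic Templates (older approach)
# Create template (runtime parameters such as input/output must be declared
# with add_value_provider_argument in the pipeline options)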
python pipeline.py \
    --runner=DataflowRunner \
    --template_location=gs://bucket/templates/my-template \
    --staging_location=gs://bucket/staging

# Run template
gcloud dataflow jobs run my-job \
    --gcs-location=gs://bucket/templates/my-template \
    --region=us-central1 \
    --parameters=input=gs://bucket/input,output=gs://bucket/output

2. Flex Templates (recommended)
# Pipeline with launch-time parameters. Flex Templates build the graph at
# launch, so regular argparse arguments work; ValueProvider arguments are
# only needed for classic templates.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--input_path',
            help='Input file path')
        parser.add_argument(
            '--output_table',
            help='Output BigQuery table')

def run():
    options = MyOptions()  # Parses the flags the launcher passes from --parameters

    with beam.Pipeline(options=options) as p:
        (p
         | beam.io.ReadFromText(options.input_path)
         | beam.Map(lambda x: {'data': x})
         | beam.io.WriteToBigQuery(
             options.output_table,
             schema='data:STRING'))  # Lets CREATE_IF_NEEDED create the table

if __name__ == '__main__':
    run()

# Create Flex Template
# metadata.json
{
  "name": "My Flex Template",
  "parameters": [
    {"name": "input_path", "label": "Input Path", "helpText": "GCS path of the input files"},
    {"name": "output_table", "label": "Output Table", "helpText": "BigQuery table as project:dataset.table"}
  ]
}

# Build and upload (assumes the pipeline code above is saved as pipeline.py)
gcloud dataflow flex-template build gs://bucket/templates/my-flex-template.json \
    --image-gcr-path=gcr.io/my-project/dataflow/my-pipeline:latest \
    --sdk-language=PYTHON \
    --flex-template-base-image=PYTHON3 \
    --metadata-file=metadata.json \
    --py-path=. \
    --env=FLEX_TEMPLATE_PYTHON_PY_FILE=pipeline.py

# Run Flex Template
gcloud dataflow flex-template run my-job \
    --template-file-gcs-location=gs://bucket/templates/my-flex-template.json \
    --region=us-central1 \
    --parameters=input_path=gs://bucket/input,output_table=project:dataset.table

9. How do you handle streaming in Dataflow?

Streaming Pipeline:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows

class ParseEvent(beam.DoFn):
    def process(self, element):
        import json
        event = json.loads(element.decode('utf-8'))
        yield event

def run():
    options = PipelineOptions([
        '--project=my-project',
        '--region=us-central1',
        '--runner=DataflowRunner',
        '--streaming',
        '--temp_location=gs://bucket/temp'
    ])
    options.view_as(StandardOptions).streaming = True
    
    with beam.Pipeline(options=options) as p:
        # Read from Pub/Sub
        events = (
            p
            | 'Read Pub/Sub' >> beam.io.ReadFromPubSub(
                subscription='projects/my-project/subscriptions/events-sub')
            | 'Parse' >> beam.ParDo(ParseEvent())
            # Assumes each event's 'timestamp' is Unix epoch seconds
            | 'Add Timestamp' >> beam.Map(
                lambda x: beam.window.TimestampedValue(x, x['timestamp']))
        )
        
        # Fixed window aggregation
        windowed = (
            events
            | 'Window' >> beam.WindowInto(FixedWindows(60))  # 1-minute windows
            | 'Extract Key' >> beam.Map(lambda x: (x['user_id'], 1))
            | 'Count' >> beam.CombinePerKey(sum)
            | 'Format' >> beam.Map(lambda x: {'user_id': x[0], 'count': x[1]})
        )
        
        # Write to BigQuery (streaming inserts)
        windowed | 'Write BQ' >> beam.io.WriteToBigQuery(
            'project:dataset.user_counts',
            schema='user_id:STRING,count:INTEGER',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS
        )

Streaming Concepts:
+-- Unbounded PCollection
+-- Windowing (Fixed, Sliding, Session)
+-- Watermarks (event time progress)
+-- Triggers (when to emit results)
+-- Allowed lateness (late data handling)
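Stripped of streaming machinery, the pipeline above computes a count per user per one-minute window. The sketch below reproduces that result for a finite list of events. It assumes epoch-second `timestamp` fields as the pipeline does, and it ignores watermarks, triggers, and late data entirely. `count_per_user_per_window` is a hypothetical helper.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple


def count_per_user_per_window(events: Iterable[dict], window_size: int = 60
                              ) -> Dict[Tuple[int, str], int]:
    """Batch approximation of FixedWindows(window_size) + CombinePerKey(sum).

    Keys are (window_start, user_id); timestamps are assumed to be epoch seconds.
    """
    counts: Counter = Counter()
    for event in events:
        ts = int(event['timestamp'])
        window_start = ts - ts % window_size  # start of the fixed window holding ts
        counts[(window_start, event['user_id'])] += 1
    return dict(counts)


events = [
    {'user_id': 'alice', 'timestamp': 5},
    {'user_id': 'alice', 'timestamp': 59},
    {'user_id': 'alice', 'timestamp': 61},
    {'user_id': 'bob', 'timestamp': 30},
]
print(count_per_user_per_window(events))
```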

10. What are Dataproc autoscaling policies?

Autoscaling Policy:

# Autoscaling policies are created (or updated) by importing a YAML
# definition with `gcloud dataproc autoscaling-policies import`

# Policy YAML
# my-policy.yaml
workerConfig:
  minInstances: 2
  maxInstances: 2  # Primary workers usually fixed

secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 50
  weight: 1

basicAlgorithm:
  cooldownPeriod: 2m
  yarnConfig:
    scaleUpFactor: 1.0
    scaleDownFactor: 1.0
    scaleUpMinWorkerFraction: 0.0
    scaleDownMinWorkerFraction: 0.0
    gracefulDecommissionTimeout: 1h

# Import policy
gcloud dataproc autoscaling-policies import my-policy \
    --source=my-policy.yaml \
    --region=us-central1

# Attach to cluster
gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --autoscaling-policy=my-policy \
    --num-workers=2 \
    --num-secondary-workers=5

Autoscaling Metrics and Parameters:
+-- YARN pending memory
+-- YARN available memory
+-- Scale up factor (0.0 to 1.0)
+-- Scale down factor (0.0 to 1.0)
+-- Cooldown period (min time between scaling)

Scaling Behavior:
+-------------------------------------------------------------+
|  YARN Pending Resources High                                 |
|  +-- Scale UP (add secondary workers)                       |
|                                                              |
|  YARN Pending Resources Low                                  |
|  +-- Scale DOWN (remove secondary workers)                  |
|                                                              |
|  Graceful Decommission:                                      |
|  +-- Wait for tasks to complete before removing nodes       |
+-------------------------------------------------------------+
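Dataproc's documentation describes the basic algorithm roughly as follows. The exact worker delta is (pending memory - available memory) / memory per worker. That delta is multiplied by scaleUpFactor (rounded up) or scaleDownFactor, and it is ignored when it is smaller than the matching min-worker fraction of the cluster. The sketch below is a simplified model under those assumptions. It uses one memory snapshot instead of averages over the cooldown period, rounds scale-down toward zero, and clamps to the policy's min/max instances. `autoscale_delta` is a hypothetical name.

```python
import math


def autoscale_delta(pending_mb: float, available_mb: float,
                    mem_per_worker_mb: float, current_workers: int,
                    min_workers: int, max_workers: int,
                    scale_up_factor: float = 1.0, scale_down_factor: float = 1.0,
                    scale_up_min_fraction: float = 0.0,
                    scale_down_min_fraction: float = 0.0) -> int:
    """Workers to add (positive) or remove (negative); simplified model."""
    exact = (pending_mb - available_mb) / mem_per_worker_mb
    if exact > 0:
        delta = math.ceil(exact * scale_up_factor)
        threshold = scale_up_min_fraction * current_workers
    elif exact < 0:
        delta = int(exact * scale_down_factor)  # int() rounds toward zero
        threshold = scale_down_min_fraction * current_workers
    else:
        return 0
    if abs(delta) < threshold:
        return 0  # change too small relative to cluster size
    target = min(max(current_workers + delta, min_workers), max_workers)
    return target - current_workers


# 16 GiB pending, nothing free, 8 GiB per worker
print(autoscale_delta(16384, 0, 8192, current_workers=10,
                      min_workers=0, max_workers=50))
```

With scaleUpFactor and scaleDownFactor at 1.0, as in the YAML above, the model adds or removes exactly enough workers to close the memory gap.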

11. How do you optimize Spark jobs on Dataproc?

Spark Optimization Techniques:

1. Right-size executors
# Cluster properties
gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --properties="\
spark:spark.executor.memory=8g,\
spark:spark.executor.cores=4,\
spark:spark.driver.memory=4g,\
spark:spark.dynamicAllocation.enabled=true,\
spark:spark.dynamicAllocation.minExecutors=2,\
spark:spark.dynamicAllocation.maxExecutors=20"

2. Optimize partitioning
# PySpark
df = spark.read.parquet("gs://bucket/data")

# Repartition for parallelism
df = df.repartition(200)

# Coalesce to reduce partitions (no shuffle)
df = df.coalesce(50)

# Partition by column for writes
df.write.partitionBy("date").parquet("gs://bucket/output")

3. Use efficient file formats
# Prefer Parquet over CSV
df.write.format("parquet") \
    .option("compression", "snappy") \
    .save("gs://bucket/output")

4. Broadcast small datasets
from pyspark.sql.functions import broadcast

# Small lookup table
lookup_df = spark.read.parquet("gs://bucket/lookup")  # < 10MB

# Broadcast join
result = large_df.join(broadcast(lookup_df), "key")

5. Cache intermediate results
# Cache frequently accessed data
df.cache()  # or df.persist()
df.count()  # Trigger cache

# Unpersist when done
df.unpersist()

6. Use BigQuery connector efficiently
df = (spark.read.format("bigquery")
      .option("table", "project.dataset.table")
      .option("filter", "date = '2024-01-15'")  # Pushed down to BigQuery
      .option("materializationDataset", "temp_dataset")  # Used when reading views or queries
      .load())

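7. Cloud Storage connector settings
# Already preconfigured on Dataproc; needed mainly on self-managed Spark.
# Set spark.hadoop.* values when building the session, not via spark.conf.set later.
spark = SparkSession.builder \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .getOrCreate()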
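Right-sizing executors (step 1) is mostly arithmetic. Each executor needs its cores plus its heap and memory overhead, and the tighter of the two limits decides how many fit on a worker. The sketch assumes Spark's default overhead of max(384 MiB, 10% of executor memory). It ignores YARN allocation rounding and PySpark worker memory. The node figures are hypothetical, not Dataproc machine defaults, and `executors_per_node` is a hypothetical helper.

```python
def executors_per_node(node_vcores: int, node_yarn_memory_mb: int,
                       executor_cores: int, executor_memory_mb: int,
                       overhead_factor: float = 0.10,
                       min_overhead_mb: int = 384) -> int:
    """How many executors of the given size fit on one worker (simplified)."""
    # Default overhead: max(384 MiB, 10% of executor memory)
    overhead_mb = max(min_overhead_mb, int(executor_memory_mb * overhead_factor))
    fit_by_cores = node_vcores // executor_cores
    fit_by_memory = node_yarn_memory_mb // (executor_memory_mb + overhead_mb)
    return min(fit_by_cores, fit_by_memory)


# Hypothetical worker: 16 vCPUs and 48 GiB of YARN memory; executors sized as in
# step 1 (spark.executor.cores=4, spark.executor.memory=8g)
print(executors_per_node(16, 48 * 1024, 4, 8 * 1024))
```

Here cores are the binding limit. Halving YARN memory to 24 GiB would make memory the limit instead.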

12. What are Dataflow windowing strategies?

Windowing Types:

1. Fixed Windows
from apache_beam.transforms.window import FixedWindows

# 5-minute windows
events | beam.WindowInto(FixedWindows(300))

+-----+-----+-----+-----+-----+
| 0-5 | 5-10|10-15|15-20|20-25|  minutes
+-----+-----+-----+-----+-----+

2. Sliding Windows
from apache_beam.transforms.window import SlidingWindows

# 10-minute window, sliding every 5 minutes
events | beam.WindowInto(SlidingWindows(600, 300))

+-------------------+
|     Window 1      |  0-10 min
+-------------------+
      +-------------------+
      |     Window 2      |  5-15 min
      +-------------------+
            +-------------------+
            |     Window 3      |  10-20 min
            +-------------------+

3. Session Windows
from apache_beam.transforms.window import Sessions

# 10-minute gap
events | beam.WindowInto(Sessions(600))

User A: ----●●●----------------●●●●--------
            +--Session 1--+    +-Session 2-+
                 (gap > 10min)

4. Global Window (the default until WindowInto is applied)
from apache_beam.transforms.window import GlobalWindows

events | beam.WindowInto(GlobalWindows())

# Complete windowing example
(events
 | 'Add Timestamp' >> beam.Map(
     lambda x: beam.window.TimestampedValue(x, x['event_time']))
 | 'Window' >> beam.WindowInto(
     FixedWindows(60),
     trigger=beam.trigger.AfterWatermark(
         early=beam.trigger.AfterProcessingTime(10),
         late=beam.trigger.AfterCount(1)
     ),
     accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING,
     allowed_lateness=3600
 )
 | 'Key by user' >> beam.Map(lambda x: (x['user_id'], 1))  # assumes a user_id field
 | 'Count' >> beam.CombinePerKey(sum))
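The window shapes above follow simple arithmetic on event timestamps. The sketch assigns integer-second timestamps to fixed, sliding, and session windows the way the diagrams describe, with half-open [start, end) windows. Beam itself works in microseconds, supports window offsets, and merges sessions per key; all of that is omitted here. The function names are hypothetical.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # half-open [start, end) in seconds


def fixed_window(ts: int, size: int) -> Window:
    """FixedWindows(size): each timestamp falls in exactly one window."""
    start = ts - ts % size
    return (start, start + size)


def sliding_windows(ts: int, size: int, period: int) -> List[Window]:
    """SlidingWindows(size, period): size / period windows when size is a multiple of period."""
    windows = []
    start = ts - ts % period  # latest window start that still contains ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


def session_windows(timestamps: List[int], gap: int) -> List[Window]:
    """Sessions(gap) for one key: each event opens [ts, ts + gap); overlaps merge."""
    sessions: List[Window] = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            sessions[-1] = (sessions[-1][0], max(sessions[-1][1], ts + gap))
        else:
            sessions.append((ts, ts + gap))
    return sessions


print(fixed_window(420, 300))                     # 5-minute windows
print(sliding_windows(420, 600, 300))             # 10-minute windows every 5 minutes
print(session_windows([0, 120, 300, 1500], 600))  # 10-minute gap
```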

13. How do you handle late data in Dataflow?

Late Data Handling:

# Watermarks and allowed lateness
from apache_beam.transforms.window import FixedWindows
from apache_beam import trigger

(events
 | beam.WindowInto(
     FixedWindows(60),
     # Allow late data up to 1 hour
     allowed_lateness=3600,
     # Trigger configuration
     trigger=trigger.AfterWatermark(
         # Emit early results every 30 seconds
         early=trigger.AfterProcessingTime(30),
         # Emit updates for each late element
         late=trigger.AfterCount(1)
     ),
     # ACCUMULATING: include all data in updates
     # DISCARDING: only new data in updates
     accumulation_mode=trigger.AccumulationMode.ACCUMULATING
 ))

Late Data Flow:
+-------------------------------------------------------------+
|  Event Time:     |-------- Window --------|                 |
|                                                              |
|  Watermark:      ---------------->                          |
|                           |                                  |
|  On-time data:   ●  ●  ●  |                                 |
|                           |                                  |
|  Late data:               |  ●  ●  (within allowed_lateness)|
|                           |                                  |
|  Dropped:                 |        ●  (past allowed_lateness)|
+-------------------------------------------------------------+

# Custom timestamp extractor
class ExtractTimestamp(beam.DoFn):
    def process(self, element):
        import datetime
        # Parse the ISO 8601 event timestamp; values without an offset are assumed UTC
        event_ts = datetime.datetime.fromisoformat(element['timestamp'])
        if event_ts.tzinfo is None:
            event_ts = event_ts.replace(tzinfo=datetime.timezone.utc)
        yield beam.window.TimestampedValue(element, event_ts.timestamp())

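# Side output for late panes
# An element's timestamp always falls inside its window, so comparing the two
# cannot detect lateness. After a windowed aggregation, panes fired by the
# late trigger report PaneInfo timing LATE instead.
class SplitByPaneTiming(beam.DoFn):
    LATE_TAG = 'late_data'

    def process(self, element, pane_info=beam.DoFn.PaneInfoParam):
        from apache_beam.utils.windowed_value import PaneInfoTiming
        if pane_info.timing == PaneInfoTiming.LATE:
            yield beam.pvalue.TaggedOutput(self.LATE_TAG, element)
        else:
            yield element

# counts: hypothetical output of a windowed aggregation (e.g. CombinePerKey)
results = counts | beam.ParDo(SplitByPaneTiming()).with_outputs(
    SplitByPaneTiming.LATE_TAG, main='on_time')
on_time = results.on_time
late = results[SplitByPaneTiming.LATE_TAG]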
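The Late Data Flow diagram reduces to two comparisons against the watermark. The sketch classifies an arrival for fixed windows the same way. It is simplified in two respects: Beam's exact boundary uses the window's maximum timestamp (end minus 1 ms), and the watermark is supplied here rather than tracked. `classify_arrival` is a hypothetical helper.

```python
def classify_arrival(event_ts: float, watermark: float,
                     window_size: float, allowed_lateness: float) -> str:
    """Simplified fixed-window lateness check: 'on_time', 'late', or 'dropped'."""
    window_end = event_ts - event_ts % window_size + window_size
    if watermark < window_end:
        return 'on_time'   # watermark has not passed the end of the window
    if watermark < window_end + allowed_lateness:
        return 'late'      # accepted; can fire a late pane
    return 'dropped'       # beyond allowed lateness


# FixedWindows(60) with allowed_lateness=3600, event at t=30s (window [0, 60))
for wm in (50, 100, 3700):
    print(wm, classify_arrival(30, wm, 60, 3600))
```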

14. What is Dataproc Hub?

Dataproc Hub provides managed Jupyter notebooks with Dataproc cluster integration.

Dataproc Hub Features:
+-- JupyterLab environment
+-- Pre-configured Spark kernels
+-- Automatic cluster attachment
+-- Git integration
+-- GPU support
+-- Collaborative notebooks

Note: Dataproc Hub is being replaced by Vertex AI Workbench

# Alternative: Component Gateway
gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --optional-components=JUPYTER \
    --enable-component-gateway

# Access Jupyter
gcloud dataproc clusters describe my-cluster \
    --region=us-central1 \
    --format='value(config.endpointConfig.httpPorts.Jupyter)'

# Alternative: Vertex AI Workbench with Dataproc
# Create a Workbench instance
gcloud workbench instances create my-notebook \
    --location=us-central1-a \
    --machine-type=n1-standard-4

# Then, in the JupyterLab kernel picker, choose a kernel that runs on:
# - a Dataproc cluster
# - Dataproc Serverless

Sample Notebook Cell (PySpark):
# %load_ext google.cloud.bigquery

from pyspark.sql import SparkSession

# Spark session auto-configured
spark = SparkSession.builder.getOrCreate()

# Read from BigQuery
df = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.samples.shakespeare") \
    .load()

df.groupBy("corpus").count().show()

15. How do you connect to BigQuery from Dataproc?

BigQuery Spark Connector:

# Option 1: Cluster creation with connector
gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --optional-components=JUPYTER \
    --metadata=spark-bigquery-connector-version=0.32.2 \
    --initialization-actions=gs://goog-dataproc-initialization-actions-us-central1/connectors/connectors.sh

# Option 2: Specify JAR at job submission
gcloud dataproc jobs submit pyspark my_job.py \
    --cluster=my-cluster \
    --region=us-central1 \
    --jars=gs://spark-lib/bigquery/spark-3.3-bigquery-0.32.2.jar

# PySpark with BigQuery
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BigQuery Integration") \
    .config("spark.jars", "gs://spark-lib/bigquery/spark-3.3-bigquery-0.32.2.jar") \
    .getOrCreate()

# Read from BigQuery
df = spark.read.format("bigquery") \
    .option("table", "project.dataset.table") \
    .option("filter", "date >= '2024-01-01'") \
    .load()

# Process
result = df.groupBy("category").agg({"amount": "sum"})

# Write to BigQuery
result.write.format("bigquery") \
    .option("table", "project.dataset.output_table") \
    .option("temporaryGcsBucket", "my-temp-bucket") \
    .mode("overwrite") \
    .save()

# Direct write (uses Storage Write API)
result.write.format("bigquery") \
    .option("table", "project.dataset.output_table") \
    .option("writeMethod", "direct") \
    .mode("append") \
    .save()

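# Query with SQL (query results are materialized into a temporary table,
# so views must be enabled and a materialization dataset provided)
df = spark.read.format("bigquery") \
    .option("viewsEnabled", "true") \
    .option("materializationDataset", "temp_dataset") \
    .option("query", """
        SELECT category, SUM(amount) as total
        FROM project.dataset.sales
        WHERE date >= '2024-01-01'
        GROUP BY category
    """) \
    .load()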

16. What are Dataflow side inputs?

Side Inputs:
+-- Additional inputs to transforms
+-- Broadcast data to all workers
+-- Used for lookups, configs, etc.
+-- Can be PCollection or static value

# Side input as singleton
import json

import apache_beam as beam

class EnrichWithConfig(beam.DoFn):
    def process(self, element, config):
        yield {**element, 'threshold': config['threshold']}

# Pipeline
with beam.Pipeline() as p:
    # AsSingleton needs exactly one element, so config.json is assumed to be one JSON line
    config = (p
              | 'Read Config' >> beam.io.ReadFromText('gs://bucket/config.json')
              | 'Parse Config' >> beam.Map(json.loads))
    config_view = beam.pvalue.AsSingleton(config)

    # Events are assumed to be newline-delimited JSON
    events = (p
              | 'Read Events' >> beam.io.ReadFromText('gs://bucket/events.json')
              | 'Parse Events' >> beam.Map(json.loads))

    enriched = events | beam.ParDo(EnrichWithConfig(), config=config_view)

# Side input as dictionary (map)
import json

class EnrichWithLookup(beam.DoFn):
    def process(self, element, lookup_dict):
        user_id = element['user_id']
        if user_id in lookup_dict:
            element = {**element, 'user_name': lookup_dict[user_id]}
        yield element

with beam.Pipeline() as p:
    # Create lookup PCollection of (user_id, user_name) pairs
    lookup = (
        p
        | 'Read Users' >> beam.io.ReadFromText('gs://bucket/users.csv')
        | beam.Map(lambda line: (line.split(',')[0], line.split(',')[1]))
    )
    lookup_view = beam.pvalue.AsDict(lookup)

    events = (p
              | 'Read' >> beam.io.ReadFromText('gs://bucket/events.json')
              | 'Parse' >> beam.Map(json.loads))  # newline-delimited JSON assumed
    enriched = events | beam.ParDo(EnrichWithLookup(), lookup_dict=lookup_view)

# Side input as list
# (top_users_pcoll and events are PCollections assumed to be defined earlier)
class FilterTopUsers(beam.DoFn):
    def process(self, element, top_users):
        if element['user_id'] in top_users:
            yield element

top_users_view = beam.pvalue.AsList(top_users_pcoll)
filtered = events | beam.ParDo(FilterTopUsers(), top_users=top_users_view)

# Windowed side input
class JoinWithWindow(beam.DoFn):
    def process(self, element, reference, window=beam.DoFn.WindowParam):
        # reference contains data from same window
        ...

17. How do you monitor Dataproc jobs?

Monitoring Options:

1. Job Output and Logs
# View job output
gcloud dataproc jobs wait my-job --region=us-central1

# Set driver log levels at submission time
gcloud dataproc jobs submit pyspark my_job.py \
    --cluster=my-cluster \
    --region=us-central1 \
    --driver-log-levels=root=INFO

# View in Cloud Logging
gcloud logging read 'resource.type="cloud_dataproc_job"' --limit=50

2. Cloud Monitoring Metrics
Metric areas to watch (exact metric types are listed in the Dataproc metrics reference):
+-- YARN running and pending applications
+-- YARN available, allocated, and pending memory
+-- YARN allocated containers
+-- Job counts by state (running, failed)
+-- HDFS capacity and usage
+-- Spark executor and job details (Spark UI / History Server)

3. Web Interfaces (Component Gateway)
gcloud dataproc clusters describe my-cluster \
    --region=us-central1 \
    --format='json(config.endpointConfig.httpPorts)'

# Returns URLs for:
+-- YARN ResourceManager
+-- Spark History Server
+-- HDFS NameNode
+-- Jupyter (if enabled)
+-- Presto (if enabled)

4. Job Status via API
from google.cloud import dataproc_v1

client = dataproc_v1.JobControllerClient(
    client_options={'api_endpoint': 'us-central1-dataproc.googleapis.com:443'})

job = client.get_job(
    project_id='my-project',
    region='us-central1',
    job_id='my-job'
)

print(f'Status: {job.status.state.name}')
print(f'Started: {job.status.state_start_time}')
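A single `get_job` call only gives a snapshot; scripts usually poll until the job finishes. A minimal sketch, assuming `DONE`, `ERROR` and `CANCELLED` are the terminal states you care about; `wait_for_job` is a hypothetical helper that takes any state-returning callable, so it can be tested without calling the API.

```python
import time
from typing import Callable

# Terminal values of the Dataproc job state enum handled here.
TERMINAL_STATES = {"DONE", "ERROR", "CANCELLED"}


def wait_for_job(get_state: Callable[[], str],
                 poll_seconds: float = 10.0,
                 timeout_seconds: float = 3600.0,
                 sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_state() until the job reaches a terminal state.

    Raises TimeoutError if the job is still not finished after timeout_seconds.
    """
    waited = 0.0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_seconds:
            raise TimeoutError(f"job still {state} after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds
```

With the client above, the callable could be `lambda: client.get_job(project_id='my-project', region='us-central1', job_id='my-job').status.state.name`.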

5. Alerting
# Create an alert for failed jobs (alert policies live in the alpha command group)
gcloud alpha monitoring policies create \
    --policy-from-file=job-failure-alert.json

# job-failure-alert.json
{
  "displayName": "Dataproc Job Failures",
  "combiner": "OR",
  "conditions": [{
    "displayName": "Job failed",
    "conditionThreshold": {
      "filter": "resource.type=\"cloud_dataproc_cluster\" AND metric.type=\"dataproc.googleapis.com/cluster/job/failed_count\"",
      "aggregations": [{
        "alignmentPeriod": "300s",
        "perSeriesAligner": "ALIGN_DELTA"
      }],
      "comparison": "COMPARISON_GT",
      "thresholdValue": 0,
      "duration": "0s"
    }
  }]
}

18. What are Dataflow Flex Templates?

Flex Templates Benefits:
+-- Pipeline packaged as a Docker image
+-- Runtime parameters without ValueProvider wrappers
+-- Custom dependencies baked into the image
+-- Pipeline graph built at launch time, so it can depend on parameters
+-- Versioned through image tags
+-- Reusable across projects

# Project Structure
my-flex-template/
+-- Dockerfile
+-- requirements.txt
+-- metadata.json
+-- main.py
+-- transforms/
    +-- custom.py

# Dockerfile
FROM gcr.io/dataflow-templates-base/python311-template-launcher-base

ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/main.py"
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"

COPY requirements.txt /template/
COPY main.py /template/
COPY transforms/ /template/transforms/

RUN pip install -r /template/requirements.txt

# metadata.json
{
  "name": "My ETL Pipeline",
  "description": "Processes data from GCS to BigQuery",
  "parameters": [
    {
      "name": "input_path",
      "label": "Input GCS path",
      "helpText": "Path to input files",
      "regexes": ["^gs://.*"]
    },
    {
      "name": "output_table",
      "label": "Output BigQuery table",
      "helpText": "project:dataset.table format"
    },
    {
      "name": "window_size",
      "label": "Window size in seconds",
      "isOptional": true,
      "paramType": "NUMBER"
    }
  ]
}

# Build template
gcloud builds submit --tag gcr.io/my-project/my-template:v1

gcloud dataflow flex-template build \
    gs://my-bucket/templates/my-template.json \
    --image=gcr.io/my-project/my-template:v1 \
    --sdk-language=PYTHON \
    --metadata-file=metadata.json

# Run template
gcloud dataflow flex-template run my-job \
    --template-file-gcs-location=gs://my-bucket/templates/my-template.json \
    --region=us-central1 \
    --parameters=input_path=gs://bucket/input,output_table=project:dataset.table
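The launcher passes each parameter from metadata.json to main.py as a command-line flag such as `--input_path=...`. A common pattern is to parse the template's own flags with `argparse.parse_known_args` and hand the rest to Beam's `PipelineOptions`. The sketch below shows only that parsing step; `parse_template_args` is a hypothetical name and the `window_size` default of 60 is an assumption.

```python
import argparse
from typing import List, Optional, Tuple


def parse_template_args(
        argv: Optional[List[str]] = None) -> Tuple[argparse.Namespace, List[str]]:
    """Split template parameters from the remaining pipeline options."""
    parser = argparse.ArgumentParser(allow_abbrev=False)
    # Names match the "name" fields in metadata.json.
    parser.add_argument("--input_path", required=True)
    parser.add_argument("--output_table", required=True)
    # Optional in metadata.json, so give it a default (60 is an assumption).
    parser.add_argument("--window_size", type=int, default=60)
    # parse_known_args leaves flags like --runner/--project/--region untouched.
    return parser.parse_known_args(argv)


def main(argv: Optional[List[str]] = None) -> None:
    known, pipeline_args = parse_template_args(argv)
    # In the real main.py you would now build the pipeline, e.g.
    #   with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p: ...
    print(known.input_path, known.output_table, known.window_size, pipeline_args)


if __name__ == "__main__":
    main()
```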

19. How do you optimize Dataflow pipelines?

Optimization Techniques:

1. Fusion optimization
# Beam automatically fuses adjacent transforms into one stage
# Insert a Reshuffle to break fusion (e.g. after a high fan-out step)
# so downstream work is redistributed across workers
(pcoll
 | 'Transform1' >> beam.Map(func1)
 | 'Reshuffle' >> beam.Reshuffle()  # Break fusion
 | 'Transform2' >> beam.Map(func2))

2. Combiner lifting
# Use combiners for associative/commutative operations
(pcoll
 | beam.CombinePerKey(sum)  # Partial combining at workers
)

# Custom combiner
class MeanCombiner(beam.CombineFn):
    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add_input(self, accumulator, element):
        sum_, count = accumulator
        return sum_ + element, count + 1

    def merge_accumulators(self, accumulators):
        # Iterate instead of zip(*accumulators), which fails on empty input
        total, count = 0, 0
        for s, c in accumulators:
            total += s
            count += c
        return total, count

    def extract_output(self, accumulator):
        sum_, count = accumulator
        return sum_ / count if count else 0

# Usage: pcoll | beam.CombineGlobally(MeanCombiner())
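Combiner lifting works because a CombineFn's four methods let the runner pre-combine each bundle locally and ship only the small accumulators through the shuffle. The plain-Python emulation below shows that flow without Beam; `MeanCombinerSketch` mirrors the combiner above and `combine_lifted` is a hypothetical stand-in for what the runner does, not Beam's actual execution code.

```python
from functools import reduce


class MeanCombinerSketch:
    """Same four methods as MeanCombiner above, without the Beam base class."""

    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add_input(self, accumulator, element):
        total, count = accumulator
        return total + element, count + 1

    def merge_accumulators(self, accumulators):
        total, count = 0, 0
        for s, c in accumulators:
            total += s
            count += c
        return total, count

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else 0


def combine_lifted(fn, bundles):
    """Emulate combiner lifting: pre-combine each bundle (as a worker would
    before the shuffle), then merge only the small accumulators."""
    partials = [reduce(fn.add_input, bundle, fn.create_accumulator())
                for bundle in bundles]
    return fn.extract_output(fn.merge_accumulators(partials))
```

Here `combine_lifted(MeanCombinerSketch(), [[1, 2, 3], [4, 5]])` moves two `(sum, count)` pairs instead of five raw values before producing the mean.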

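3. Batch elements
# Simplest option: the built-in transform
batches = pcoll | beam.BatchElements(min_batch_size=10, max_batch_size=100)

# Manual version (assumes global windowing, e.g. a batch pipeline)
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.windowed_value import WindowedValue

class BatchDoFn(beam.DoFn):
    def __init__(self, batch_size=100):
        self.batch_size = batch_size

    def start_bundle(self):
        self.batch = []

    def process(self, element):
        self.batch.append(element)
        if len(self.batch) >= self.batch_size:
            yield self.batch
            self.batch = []

    def finish_bundle(self):
        # finish_bundle has no input element, so the leftover batch must
        # be wrapped in an explicit WindowedValue
        if self.batch:
            yield WindowedValue(self.batch, 0, [GlobalWindow()])
            self.batch = []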

4. Worker configuration
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--max_num_workers=20',
    '--autoscaling_algorithm=THROUGHPUT_BASED',
    '--machine_type=n1-highmem-4',  # More memory
    '--disk_size_gb=100',
    '--number_of_worker_harness_threads=4'
])

5. Avoid hot keys
# Use combiners with hot key fanout
pcoll | beam.CombinePerKey(sum).with_hot_key_fanout(lambda key: 16)

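# Or salt the key manually. Reshuffle alone does not help here:
# GroupByKey still sends every value of a hot key to one worker.
import random
NUM_SHARDS = 16  # fan-out factor; x['value'] is a hypothetical field
(pcoll
 | 'Salt' >> beam.Map(lambda x: ((x['hot_key'], random.randrange(NUM_SHARDS)), x['value']))
 | 'PartialSum' >> beam.CombinePerKey(sum)
 | 'Unsalt' >> beam.Map(lambda kv: (kv[0][0], kv[1]))
 | 'FinalSum' >> beam.CombinePerKey(sum))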
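Key salting spreads one hot key over several intermediate keys, so no single worker sees all of its records, and a second combine restores the per-key result. The plain-Python simulation below, with a hypothetical `salted_sum` helper and made-up sample records, checks both properties: totals are unchanged and the largest salted group is far smaller than the hot key.

```python
import random
from collections import Counter, defaultdict


def salted_sum(records, num_shards=16, seed=0):
    """Two-stage sum: combine per (key, shard), then per key.

    Returns (totals_per_key, records_per_salted_key) so the load
    spread across shards can be inspected.
    """
    rng = random.Random(seed)
    partial = defaultdict(int)
    load = Counter()
    for key, value in records:
        salted = (key, rng.randrange(num_shards))  # stage 1 key
        partial[salted] += value
        load[salted] += 1
    totals = defaultdict(int)
    for (key, _shard), subtotal in partial.items():  # stage 2: drop the salt
        totals[key] += subtotal
    return dict(totals), load
```

The trade-off is an extra combine stage; it pays off only when a few keys dominate the data.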

20. What are best practices for data processing?

Best Practices:

1. Choose the right service
+--------------------+----------------------------------------+
| Use Case           | Recommended Service                    |
+--------------------+----------------------------------------+
| New pipelines      | Dataflow                               |
| Existing Spark     | Dataproc                               |
| Interactive SQL    | Dataproc (Presto/Trino)                |
| Simple ETL         | BigQuery + Dataform                    |
| ML training        | Dataproc (Spark MLlib) or Vertex AI    |
| One-time Spark job | Dataproc Serverless                    |
| Streaming          | Dataflow                               |
+--------------------+----------------------------------------+

2. File format selection
+-- Parquet - Best for analytics (columnar)
+-- Avro - Good for streaming, schema evolution
+-- ORC - Alternative columnar format
+-- JSON - Human readable, larger size
+-- CSV - Simple but inefficient

3. Partitioning strategy
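# Time-based partitioning (writing one day's output to an explicit path)
output_path = f"gs://bucket/data/year={year}/month={month}/day={day}/"

# Hive-style for query engines: partitionBy creates the year=/month=/day=
# directories from the DataFrame columns, so pass only the base path
df.write.partitionBy("year", "month", "day").parquet("gs://bucket/data/")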
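Manually written paths and Spark-written paths should use the same layout, or query engines will see two sets of partitions. A minimal sketch of helpers that build and parse Hive-style paths; `hive_partition_path` and `parse_partition_path` are hypothetical names, and the unpadded values assume integer partition columns, which is how Spark's `partitionBy` writes them.

```python
from datetime import date


def hive_partition_path(base: str, day: date) -> str:
    """Build a Hive-style year=/month=/day= path under base.

    Values are unpadded (month=3), matching Spark's output for int columns.
    """
    return f"{base.rstrip('/')}/year={day.year}/month={day.month}/day={day.day}/"


def parse_partition_path(path: str) -> dict:
    """Recover partition column values (as strings) from a Hive-style path."""
    parts = {}
    for segment in path.strip("/").split("/"):
        if "=" in segment:
            name, value = segment.split("=", 1)
            parts[name] = value
    return parts
```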

4. Error handling
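class SafeProcess(beam.DoFn):
    ERROR_TAG = 'errors'

    def process(self, element):
        try:
            result = process_element(element)  # your per-element logic
            yield result
        except Exception as e:
            # Route bad records to a side output instead of failing the bundle
            yield beam.pvalue.TaggedOutput(
                self.ERROR_TAG,
                {'element': element, 'error': str(e)}
            )

# Split main and dead-letter outputs
results = pcoll | beam.ParDo(SafeProcess()).with_outputs(
    SafeProcess.ERROR_TAG, main='processed')
processed = results.processed
errors = results[SafeProcess.ERROR_TAG]  # e.g. write to a dead-letter table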

5. Testing
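import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

# Unit test transforms (MyTransform here is an illustrative uppercaser)
class MyTransform(beam.PTransform):
    def expand(self, pcoll):
        return pcoll | beam.Map(str.upper)

def test_transform():
    with TestPipeline() as p:
        input_data = p | beam.Create(['a', 'b', 'c'])
        output = input_data | MyTransform()
        assert_that(output, equal_to(['A', 'B', 'C']))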

6. Cost management
+-- Use preemptible/spot VMs for batch
+-- Right-size clusters/workers
+-- Set max workers limit
+-- Use autoscaling
+-- Delete clusters when done
+-- Monitor costs with billing alerts
