

Top GCP Cloud Functions Interview Questions (2026) | JavaInUse

Top 20 GCP Cloud Functions Interview Questions


  1. What is Cloud Functions?
  2. What is Cloud Run?
  3. What are the differences between Cloud Functions and Cloud Run?
  4. What are Cloud Functions triggers?
  5. How do you deploy Cloud Functions?
  6. What are Cloud Functions generations?
  7. How do you handle cold starts?
  8. How do you manage secrets in Cloud Functions?
  9. What is Cloud Functions concurrency?
  10. How do you test Cloud Functions locally?
  11. How do you implement error handling?
  12. What are Cloud Run services?
  13. How do you configure autoscaling?
  14. How do you connect to databases?
  15. What are Cloud Functions best practices?
  16. How do you implement authentication?
  17. How do you monitor Cloud Functions?
  18. What is Cloud Run Jobs?
  19. How do you optimize performance?
  20. How do you implement event-driven architectures?

Google Cloud Interview Questions

1. What is Cloud Functions?

Cloud Functions is a serverless execution environment for building and connecting cloud services with event-driven functions.

Cloud Functions Features:
+-- Serverless - No infrastructure management
+-- Event-driven - Responds to cloud events
+-- Auto-scaling - Scales based on load
+-- Pay-per-use - Billed by invocations, compute time, and network egress
+-- Multiple runtimes - Python, Node.js, Go, Java, etc.
+-- Integrated - Native GCP service integration

Function Types:
+-- HTTP Functions - Triggered by HTTP requests
+-- Event-driven Functions - Triggered by cloud events

# Python HTTP Function
import functions_framework

@functions_framework.http
def hello_http(request):
    """HTTP Cloud Function."""
    request_json = request.get_json(silent=True)
    
    if request_json and 'name' in request_json:
        name = request_json['name']
    else:
        name = 'World'
    
    return f'Hello {name}!'

# Python Event Function (Cloud Storage trigger)
@functions_framework.cloud_event
def process_file(cloud_event):
    """Triggered by Cloud Storage event."""
    data = cloud_event.data
    
    bucket = data['bucket']
    name = data['name']
    
    print(f'Processing file: gs://{bucket}/{name}')
    # Process the file...
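
One way to keep handlers like the two above easy to test is to move the payload logic into plain functions that need no framework. A minimal sketch, assuming hypothetical helper names `greeting` and `gcs_uri` that are not part of the original examples:

```python
def greeting(payload):
    """Return the greeting for a parsed JSON body (None when no JSON was sent)."""
    if isinstance(payload, dict) and "name" in payload:
        return f"Hello {payload['name']}!"
    return "Hello World!"


def gcs_uri(event_data):
    """Build a gs:// URI from a Cloud Storage event payload."""
    return f"gs://{event_data['bucket']}/{event_data['name']}"


print(greeting({"name": "Ada"}))
print(greeting(None))
print(gcs_uri({"bucket": "my-bucket", "name": "reports/q1.csv"}))
```

The decorated handlers would then only parse the request or event and delegate to these helpers.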

2. What is Cloud Run?

Cloud Run is a managed compute platform for running stateless containers that are invocable via HTTP requests.

Cloud Run Features:
+-- Container-based - Deploy any container
+-- Knative-compatible - Implements the Knative Serving API
+-- Request-based scaling - Including to zero
+-- HTTPS endpoints - Automatic SSL
+-- Custom domains - Bring your own domain
+-- VPC connectivity - Connect to private resources

# Dockerfile for Cloud Run
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Cloud Run injects PORT at runtime; 8080 is the default for local runs
ENV PORT=8080
EXPOSE 8080

CMD ["python", "app.py"]

# Flask app for Cloud Run
from flask import Flask, request
import os

app = Flask(__name__)

@app.route('/')
def hello():
    return 'Hello from Cloud Run!'

@app.route('/process', methods=['POST'])
def process():
    data = request.get_json()
    # Process data...
    return {'status': 'processed', 'data': data}

if __name__ == '__main__':
    port = int(os.environ.get('PORT', 8080))
    app.run(host='0.0.0.0', port=port)

# Deploy to Cloud Run
gcloud run deploy my-service \
    --source . \
    --region us-central1 \
    --allow-unauthenticated

3. What are the differences between Cloud Functions and Cloud Run?

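Aspect         | Cloud Functions                                  | Cloud Run
---------------+--------------------------------------------------+-------------------------------
Unit           | Single function                                  | Container (multiple endpoints)
Runtime        | Limited runtimes                                 | Any language/runtime
Timeout        | 9 min (1st gen); up to 60 min for HTTP (2nd gen) | Up to 60 minutes per request
Memory         | Up to 8 GB (1st gen), 32 GB (2nd gen)            | Up to 32 GB
Concurrency    | 1 per instance (1st gen), configurable (2nd gen) | Configurable (up to 1000)
Event triggers | Native support                                   | Via Eventarc
Deployment     | Source code                                      | Container image or source
Use case       | Simple event handlers                            | Complex applications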

When to use Cloud Functions:
+-- Simple event-driven processing
+-- Glue code between services
+-- Lightweight APIs
+-- Quick prototyping
+-- Event handlers (Pub/Sub, GCS, etc.)

When to use Cloud Run:
+-- Containerized applications
+-- Multiple endpoints/routes
+-- Custom runtimes/dependencies
+-- Longer running processes
+-- gRPC services
+-- Migrating existing applications

4. What are Cloud Functions triggers?

Trigger Types:

1. HTTP Trigger
@functions_framework.http
def http_function(request):
    return 'Hello!'

# Deploy
gcloud functions deploy http-func \
    --gen2 \
    --runtime=python311 \
    --trigger-http \
    --allow-unauthenticated

2. Pub/Sub Trigger
@functions_framework.cloud_event
def pubsub_function(cloud_event):
    import base64
    data = base64.b64decode(cloud_event.data['message']['data']).decode()
    print(f'Received: {data}')

# Deploy
gcloud functions deploy pubsub-func \
    --gen2 \
    --runtime=python311 \
    --trigger-topic=my-topic
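
The Pub/Sub handler above decodes a base64 field nested inside the event payload. The sketch below reproduces that decoding against a hand-built envelope so the shape is visible without deploying anything. The helper name `decode_pubsub_data` and the sample values are assumptions made for illustration.

```python
import base64
import json


def decode_pubsub_data(event_data):
    """Return the decoded message body from a Pub/Sub event payload."""
    encoded = event_data["message"].get("data", "")
    return base64.b64decode(encoded).decode("utf-8")


# Hand-built envelope mimicking what the trigger delivers in cloud_event.data.
sample = {
    "message": {
        "data": base64.b64encode(json.dumps({"order_id": "A-1"}).encode()).decode(),
        "attributes": {"source": "checkout"},
    },
    "subscription": "projects/my-project/subscriptions/my-sub",
}

body = json.loads(decode_pubsub_data(sample))
print(body["order_id"])
```

Using `.get("data", "")` keeps the helper from failing on attribute-only messages, which carry no data field.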

3. Cloud Storage Trigger
@functions_framework.cloud_event
def gcs_function(cloud_event):
    data = cloud_event.data
    print(f'File: {data["name"]} in bucket: {data["bucket"]}')

# Deploy
gcloud functions deploy gcs-func \
    --gen2 \
    --runtime=python311 \
    --trigger-event-filters="type=google.cloud.storage.object.v1.finalized" \
    --trigger-event-filters="bucket=my-bucket"

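4. Firestore Trigger
# 2nd gen delivers Firestore events as protobuf bytes (requires the google-events package)
from google.events.cloud import firestore as firestore_events

@functions_framework.cloud_event
def firestore_function(cloud_event):
    payload = firestore_events.DocumentEventData()
    payload._pb.ParseFromString(cloud_event.data)
    old_value = payload.old_value  # Document before the change
    new_value = payload.value      # Document after the change
    print(f'Document changed: {new_value}')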

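5. Eventarc Triggers (2nd gen)
# Any Eventarc-supported source; Cloud Audit Logs filters need type, serviceName, and methodName
gcloud functions deploy eventarc-func \
    --gen2 \
    --runtime=python311 \
    --trigger-event-filters="type=google.cloud.audit.log.v1.written" \
    --trigger-event-filters="serviceName=bigquery.googleapis.com" \
    --trigger-event-filters="methodName=google.cloud.bigquery.v2.JobService.InsertJob"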

5. How do you deploy Cloud Functions?

Deployment Methods:

1. gcloud CLI
# Deploy from source
gcloud functions deploy my-function \
    --gen2 \
    --runtime=python311 \
    --region=us-central1 \
    --source=. \
    --entry-point=main \
    --trigger-http \
    --memory=256MB \
    --timeout=60s \
    --max-instances=10 \
    --min-instances=1 \
    --set-env-vars=KEY=value

2. Cloud Console UI
# Upload source code or connect to repository

3. Terraform
resource "google_cloudfunctions2_function" "function" {
  name     = "my-function"
  location = "us-central1"

  build_config {
    runtime     = "python311"
    entry_point = "main"
    source {
      storage_source {
        bucket = google_storage_bucket.source.name
        object = google_storage_bucket_object.source.name
      }
    }
  }

  service_config {
    max_instance_count = 10
    min_instance_count = 1
    available_memory   = "256M"
    timeout_seconds    = 60
    environment_variables = {
      KEY = "value"
    }
  }
}

4. CI/CD with Cloud Build
# cloudbuild.yaml
steps:
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    args:
      - gcloud
      - functions
      - deploy
      - my-function
      - --gen2
      - --runtime=python311
      - --trigger-http
      - --region=us-central1





6. What are Cloud Functions generations?

Generation Comparison:

1st Generation (Legacy):
+-- Single concurrency per instance
+-- 9-minute timeout
+-- Limited triggers
+-- Simpler deployment
+-- Lower cold start times

2nd Generation (Recommended):
+-- Concurrent requests per instance
+-- 60-minute timeout for HTTP functions (9 minutes for event-driven functions)
+-- Eventarc triggers (any GCP event)
+-- Cloud Run backend
+-- Longer cold starts
+-- Traffic splitting
+-- Better VPC connectivity

# 1st Gen deployment (--no-gen2 makes the generation explicit)
gcloud functions deploy my-func \
    --no-gen2 \
    --runtime=python311 \
    --trigger-http

# 2nd Gen deployment (recommended)
gcloud functions deploy my-func \
    --gen2 \
    --runtime=python311 \
    --trigger-http

2nd Gen Concurrency:
# Allow multiple requests per instance
gcloud functions deploy my-func \
    --gen2 \
    --runtime=python311 \
    --trigger-http \
    --concurrency=80

# Your code must be thread-safe!
import threading
import functions_framework

lock = threading.Lock()
counter = 0

@functions_framework.http
def handler(request):
    global counter
    with lock:  # Serialize updates to shared state
        counter += 1
        return f'Request {counter}'
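
The thread-safety requirement can be exercised locally. The sketch below simulates one instance serving many requests on a thread pool. The pool size and request count are arbitrary assumptions, and `handle_request` is a hypothetical stand-in for a handler body.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
counter = 0


def handle_request():
    """Increment the shared counter under a lock and return the new value."""
    global counter
    with lock:
        counter += 1
        return counter


# Simulate 1,000 requests served by 8 worker threads inside one instance.
with ThreadPoolExecutor(max_workers=8) as pool:
    seen = list(pool.map(lambda _: handle_request(), range(1000)))

print(counter)
print(len(set(seen)))
```

Because the read-modify-write happens while the lock is held, every request observes a distinct counter value. Without the lock, two threads could read the same value and one update would be lost.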

7. How do you handle cold starts?

Cold Start Mitigation:

1. Minimum Instances (keep warm)
gcloud functions deploy my-func \
    --gen2 \
    --min-instances=1 \
    --trigger-http

2. Optimize initialization
# Bad - heavy imports in global scope
import pandas as pd
import tensorflow as tf

def handler(request):
    # Uses pre-loaded libraries
    ...

# Good - lazy loading
_model = None

def get_model():
    global _model
    if _model is None:
        import tensorflow as tf
        _model = tf.keras.models.load_model('model.h5')
    return _model

def handler(request):
    model = get_model()
    ...
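
With concurrency above 1, two requests can reach a lazy loader at the same moment and both trigger the expensive load. A minimal sketch of one way to guard against that, assuming a hypothetical `load_model` stand-in instead of a real TensorFlow model:

```python
import threading

_model = None
_model_lock = threading.Lock()
load_count = 0


def load_model():
    """Hypothetical stand-in for an expensive load such as reading model weights."""
    global load_count
    load_count += 1
    return {"name": "demo-model"}


def get_model():
    """Load on first use; the lock stops concurrent requests from loading twice."""
    global _model
    if _model is None:
        with _model_lock:
            if _model is None:  # Re-check once the lock is held
                _model = load_model()
    return _model


def handler(request):
    return get_model()["name"]


threads = [threading.Thread(target=handler, args=(None,)) for _ in range(16)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(load_count)
```

The trade-off of lazy loading remains: the first request on each new instance pays the load cost instead of the instance startup.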

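3. Connection pooling
from google.cloud import bigquery

# Global client (reused across invocations on a warm instance)
client = bigquery.Client()

def handler(request):
    # Reuses the existing client; rows are converted to dicts for the JSON response
    rows = client.query('SELECT * FROM `project.dataset.table` LIMIT 10').result()
    return [dict(row) for row in rows]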

4. Use smaller runtimes
# Prefer:
# - Python (fast cold start)
# - Go (fast cold start)
# - Node.js (moderate)
# 
# Avoid for latency-sensitive:
# - Java (slower cold start)
# - .NET (slower cold start)

5. Reduce package size
# requirements.txt - only needed packages
functions-framework==3.*
google-cloud-bigquery==3.*
# Avoid: pandas, numpy unless needed

8. How do you manage secrets in Cloud Functions?

Secret Management Options:

1. Secret Manager (Recommended)
# Create secret
echo -n "my-api-key" | gcloud secrets create api-key --data-file=-

# Grant access to the function's runtime service account
# (2nd gen defaults to the Compute Engine default service account)
gcloud secrets add-iam-policy-binding api-key \
    --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role="roles/secretmanager.secretAccessor"

# Mount as environment variable
gcloud functions deploy my-func \
    --gen2 \
    --set-secrets="API_KEY=api-key:latest"

# Or mount as file
gcloud functions deploy my-func \
    --gen2 \
    --set-secrets="/secrets/api-key=api-key:latest"

# Access in code
import os
api_key = os.environ['API_KEY']

# Or with Secret Manager client
from google.cloud import secretmanager

def get_secret(secret_id):
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/my-project/secrets/{secret_id}/versions/latest"
    response = client.access_secret_version(name=name)
    return response.payload.data.decode('UTF-8')

2. Environment Variables (Less secure)
gcloud functions deploy my-func \
    --set-env-vars="API_KEY=my-key"

3. Encrypted at deployment
# Encrypt with KMS before deployment
# Decrypt in function code

9. What is Cloud Functions concurrency?

Concurrency Models:

1st Generation:
+-- 1 request per instance
+-- New instance for each concurrent request
+-- Simple but more instances needed

2nd Generation:
+-- Multiple requests per instance (up to 1000)
+-- Configurable concurrency
+-- Better resource utilization
+-- Code must be thread-safe

Concurrency Configuration:
# --concurrency:   requests per instance
# --max-instances: upper bound on instances
# --min-instances: instances kept warm
gcloud functions deploy my-func \
    --gen2 \
    --concurrency=80 \
    --max-instances=100 \
    --min-instances=1

Concurrency Calculation:
Total concurrent requests = concurrency × instances
Example: 80 × 100 = 8,000 concurrent requests
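
The same arithmetic can be turned around to estimate how many instances a given load needs. The sketch below uses Little's law (in-flight requests = arrival rate x average latency). The traffic figures are assumptions chosen for illustration, not measurements.

```python
import math


def max_concurrent_requests(concurrency, max_instances):
    """Upper bound on simultaneous requests across all instances."""
    return concurrency * max_instances


def instances_needed(requests_per_second, avg_latency_seconds, concurrency):
    """Estimate instances via Little's law: in-flight = rate x latency."""
    in_flight = requests_per_second * avg_latency_seconds
    return max(1, math.ceil(in_flight / concurrency))


print(max_concurrent_requests(80, 100))
print(instances_needed(2000, 0.25, 80))
```

With 2,000 requests per second at 250 ms each, roughly 500 requests are in flight at once, so 7 instances at concurrency 80 would cover the load under these assumptions.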

Thread-Safe Code Example:
import threading
import functions_framework

# Thread-local storage: each worker thread gets its own connection
local = threading.local()

def get_db():
    if not hasattr(local, 'db'):
        local.db = create_connection()  # Placeholder for your driver's connect call
    return local.db

@functions_framework.http
def query(request):
    db = get_db()  # Never shared between threads
    result = db.execute('SELECT ...')
    return result

# Avoid global mutable state
# Bad:
results = []  # Shared across requests!

# Good:
def handler(request):
    results = []  # Local to request
    # ...

10. How do you test Cloud Functions locally?

Local Testing Methods:

1. Functions Framework
# Install
pip install functions-framework

# Run locally
functions-framework --target=hello_http --debug

# Test with curl
curl http://localhost:8080

2. Unit Testing
import unittest
from main import hello_http, process_file
from unittest.mock import Mock

class TestFunction(unittest.TestCase):
    def test_hello_http(self):
        # Create mock request
        request = Mock()
        request.get_json.return_value = {'name': 'Test'}
        
        # Call function
        response = hello_http(request)
        
        # Assert
        self.assertEqual(response, 'Hello Test!')
    
    def test_event_function(self):
        from cloudevents.http import CloudEvent
        
        event = CloudEvent({
            'type': 'google.cloud.storage.object.v1.finalized',
            'source': '//storage.googleapis.com/my-bucket'
        }, {'bucket': 'my-bucket', 'name': 'test.txt'})
        
        # Test event function
        result = process_file(event)
        self.assertIsNone(result)

3. Integration Testing
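# Use emulators (each command runs in the foreground; use separate terminals)
gcloud beta emulators pubsub start --host-port=localhost:8085
gcloud beta emulators firestore start --host-port=localhost:8086

# Set emulator env vars
export PUBSUB_EMULATOR_HOST=localhost:8085
export FIRESTORE_EMULATOR_HOST=localhost:8086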

# Run tests against emulators
pytest tests/integration/

4. Docker Testing
# Build and run locally
docker build -t my-function .
docker run -p 8080:8080 -e PORT=8080 my-function

curl http://localhost:8080

11. How do you implement error handling?

Error Handling Patterns:

1. HTTP Functions
import functions_framework
from flask import jsonify
import traceback

@functions_framework.http
def handler(request):
    try:
        data = request.get_json(silent=True)  # None instead of an exception on invalid JSON
        if not data:
            return jsonify({'error': 'No data provided'}), 400
        
        result = process_data(data)  # Placeholder for your business logic
        return jsonify(result), 200
        
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        print(f'Error: {traceback.format_exc()}')
        return jsonify({'error': 'Internal server error'}), 500

2. Event Functions
# TransientError, PermanentError, process, and publish_to_dlq are application-defined placeholders
@functions_framework.cloud_event
def event_handler(cloud_event):
    try:
        data = cloud_event.data
        process(data)
    except TransientError as e:
        # Re-raise so the platform retries (only if retries are enabled)
        print(f'Transient error, will retry: {e}')
        raise
    except PermanentError as e:
        # Log and return normally so the event is not retried
        print(f'Permanent error, not retrying: {e}')
        # Optionally send to dead letter topic
        publish_to_dlq(cloud_event, str(e))
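
The retry decision in the event handler pattern comes down to whether the handler raises or returns. A self-contained sketch of that control flow, assuming hypothetical `TransientError` and `PermanentError` classes and an in-memory list standing in for a dead letter topic:

```python
class TransientError(Exception):
    """Failure that may succeed on a later attempt (hypothetical)."""


class PermanentError(Exception):
    """Failure that will not succeed on retry (hypothetical)."""


dead_letters = []  # Stand-in for a dead letter topic


def handle_event(data, process):
    """Run process(data); swallow permanent errors, re-raise transient ones."""
    try:
        process(data)
        return "ok"
    except TransientError:
        raise  # An unhandled exception signals the platform to retry
    except PermanentError as exc:
        dead_letters.append({"data": data, "error": str(exc)})
        return "dead-lettered"


def bad_payload(data):
    raise PermanentError("missing field: id")


def flaky(data):
    raise TransientError("upstream timeout")


print(handle_event({"id": 1}, lambda d: None))
print(handle_event({}, bad_payload))
try:
    handle_event({"id": 2}, flaky)
except TransientError as exc:
    print(f"retry requested: {exc}")
```

Returning normally after a permanent failure matters because a retried event that can never succeed would otherwise be redelivered until the retry window expires.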

3. Retry Configuration
gcloud functions deploy my-func \
    --gen2 \
    --trigger-topic=my-topic \
    --retry  # Enable retries for event functions

4. Dead Letter Queue
# Create DLQ topic
gcloud pubsub topics create my-topic-dlq

# Configure subscription with DLQ
gcloud pubsub subscriptions create my-sub \
    --topic=my-topic \
    --dead-letter-topic=my-topic-dlq \
    --max-delivery-attempts=5

5. Structured Logging
import json

def log_error(message, error, context=None):
    entry = {
        'severity': 'ERROR',
        'message': message,
        'error': str(error),
        'context': context
    }
    print(json.dumps(entry))
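
A structured log entry is more useful when it also carries the stack trace of the failure. The sketch below extends the idea with `traceback.format_exc()` and captures stdout so the emitted line can be inspected. The field name `stack_trace` and the helper name `log_exception` are assumptions for illustration.

```python
import io
import json
import traceback
from contextlib import redirect_stdout


def log_exception(message, **context):
    """Print one JSON log line for the exception currently being handled."""
    entry = {
        "severity": "ERROR",
        "message": message,
        "stack_trace": traceback.format_exc(),
        **context,
    }
    print(json.dumps(entry))


# Capture stdout so the log line can be parsed back and checked.
buffer = io.StringIO()
with redirect_stdout(buffer):
    try:
        int("not-a-number")
    except ValueError:
        log_exception("Could not parse quantity", order_id="A-1")

record = json.loads(buffer.getvalue())
print(record["severity"], record["order_id"])
```

Emitting the whole entry as a single JSON line keeps multi-line tracebacks from being split into separate log records.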

12. What are Cloud Run services?

Cloud Run Service Components:

Service --> Revisions --> Instances

Service:
+-- Unique HTTPS endpoint
+-- Traffic management
+-- IAM policies
+-- Configuration

Revision:
+-- Immutable snapshot
+-- Container image
+-- Environment config
+-- Resource limits

# Deploy service
gcloud run deploy my-service \
    --image=gcr.io/my-project/my-image \
    --region=us-central1 \
    --platform=managed \
    --memory=512Mi \
    --cpu=1 \
    --concurrency=80 \
    --max-instances=10 \
    --min-instances=1 \
    --port=8080 \
    --set-env-vars=KEY=value

# Traffic splitting
gcloud run services update-traffic my-service \
    --to-revisions=my-service-00001=90,my-service-00002=10

# Send all traffic to the latest ready revision
gcloud run services update-traffic my-service \
    --to-latest

# Service YAML
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: my-service
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/maxScale: "10"
    spec:
      containers:
        - image: gcr.io/my-project/my-image
          ports:
            - containerPort: 8080
          resources:
            limits:
              memory: 512Mi
              cpu: "1"
          env:
            - name: KEY
              value: value

13. How do you configure autoscaling?

Autoscaling Configuration:

Cloud Functions:
# --min-instances keeps instances warm, --max-instances caps scale-out,
# --concurrency sets requests per instance
gcloud functions deploy my-func \
    --gen2 \
    --min-instances=1 \
    --max-instances=100 \
    --concurrency=80

Cloud Run:
# --cpu-throttling allocates CPU only while a request is being processed
gcloud run deploy my-service \
    --min-instances=1 \
    --max-instances=100 \
    --concurrency=80 \
    --cpu-throttling

Scaling Behavior:
+-----------------------------------------------------+
|                     Autoscaling                     |
+-----------------------------------------------------+
|  Requests    Instances                              |
|      |           |                                  |
|      0 ----------+-- min-instances (keep warm)      |
|      |           |                                  |
|    100 ----------+-- scaled instances               |
|      |           |                                  |
|   1000 ----------+-- max-instances (cap)            |
|                                                     |
|  Scale-to-zero (if min-instances=0):                |
|  - No requests --> 0 instances                      |
|  - Cold start on next request                       |
+-----------------------------------------------------+
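
The clamping behavior in the diagram can be written as a small function. This is a simplified model that looks only at in-flight requests. The real autoscaler also weighs signals such as CPU utilization, so treat the numbers as an approximation under that assumption.

```python
import math


def desired_instances(in_flight, concurrency, min_instances=0, max_instances=100):
    """Instances needed for the load, clamped to the configured bounds."""
    needed = math.ceil(in_flight / concurrency)
    return max(min_instances, min(needed, max_instances))


for load in (0, 100, 1000, 20000):
    print(load, desired_instances(load, concurrency=80, min_instances=1, max_instances=100))
```

With `min_instances=1` the idle case keeps one warm instance, and the 20,000-request case is capped at 100 instances, at which point excess requests would queue or be rejected.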

CPU Allocation:
# Always allocated (lower latency)
gcloud run deploy my-service \
    --no-cpu-throttling \
    --min-instances=1

# Throttled when idle (lower cost)
gcloud run deploy my-service \
    --cpu-throttling \
    --min-instances=0

Startup CPU Boost:
# Allocate extra CPU during instance startup to shorten cold starts
gcloud run deploy my-service \
    --image=gcr.io/my-project/my-image \
    --cpu-boost

14. How do you connect to databases?

Database Connection Patterns:

1. Cloud SQL (via Unix socket)
import os
import sqlalchemy

def get_connection():
    connection_name = os.environ['CLOUD_SQL_CONNECTION_NAME']
    db_user = os.environ['DB_USER']
    db_pass = os.environ['DB_PASS']
    db_name = os.environ['DB_NAME']
    
    # For Cloud Functions/Run, use Unix socket
    unix_socket = f'/cloudsql/{connection_name}'
    
    engine = sqlalchemy.create_engine(
        f'postgresql+pg8000://{db_user}:{db_pass}@/{db_name}',
        connect_args={'unix_sock': f'{unix_socket}/.s.PGSQL.5432'}
    )
    return engine

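# Deploy, then attach the Cloud SQL instance so the /cloudsql socket is mounted
# (2nd gen functions are configured through the underlying Cloud Run service;
#  a VPC connector is only needed when connecting over private IP)
gcloud functions deploy my-func \
    --gen2 \
    --set-env-vars="CLOUD_SQL_CONNECTION_NAME=project:region:instance"
gcloud run services update my-func \
    --add-cloudsql-instances=project:region:instance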

2. Firestore
from google.cloud import firestore

db = firestore.Client()

def handler(request):
    doc_ref = db.collection('users').document('user1')
    doc_ref.set({'name': 'Alice', 'age': 30})
    
    doc = doc_ref.get()
    return doc.to_dict()

3. BigQuery
from google.cloud import bigquery

client = bigquery.Client()

def handler(request):
    query = 'SELECT * FROM `project.dataset.table` LIMIT 10'
    results = client.query(query).result()
    return [dict(row) for row in results]

4. Redis (Memorystore)
import redis
import os

redis_host = os.environ.get('REDIS_HOST', 'localhost')
redis_client = redis.Redis(host=redis_host, port=6379)

def handler(request):
    redis_client.set('key', 'value')
    return redis_client.get('key')

15. What are Cloud Functions best practices?

Best Practices:

1. Idempotency
# Make functions idempotent for retries
from google.cloud import firestore

def process_order(cloud_event):
    order_id = cloud_event.data['order_id']
    
    db = firestore.Client()
    doc_ref = db.collection('processed').document(order_id)
    
    # Check if already processed
    if doc_ref.get().exists:
        print(f'Order {order_id} already processed')
        return
    
    # Process, then record completion (check-then-set is not atomic:
    # two concurrent deliveries can both pass the check above)
    process(cloud_event.data)
    doc_ref.set({'processed_at': firestore.SERVER_TIMESTAMP})
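
One way to make the duplicate check safe under concurrent deliveries is to claim the ID atomically before doing the work. The sketch below uses an in-memory store with a lock as a stand-in for a database operation that fails when the record already exists. `ProcessedStore` and its `claim` method are hypothetical names.

```python
import threading


class ProcessedStore:
    """In-memory stand-in for a store with an atomic create-if-absent call."""

    def __init__(self):
        self._lock = threading.Lock()
        self._seen = set()

    def claim(self, key):
        """Return True only for the first caller that claims this key."""
        with self._lock:
            if key in self._seen:
                return False
            self._seen.add(key)
            return True


store = ProcessedStore()
charges = []


def process_order(order):
    if not store.claim(order["order_id"]):
        return "duplicate"
    charges.append(order["order_id"])  # The side effect runs once per order
    return "processed"


print(process_order({"order_id": "A-1"}))
print(process_order({"order_id": "A-1"}))  # Redelivery of the same event
print(charges)
```

Claiming first has its own trade-off: if the function crashes after the claim but before the work finishes, the order is marked as handled without being processed, so a real implementation would also need a way to detect and release stale claims.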

2. Minimize Dependencies
# requirements.txt - only what's needed
functions-framework==3.*
google-cloud-storage==2.*
# Avoid unnecessary packages

3. Reuse Connections
# Global initialization (reused across invocations)
from google.cloud import storage

client = storage.Client()  # Initialized once

def handler(request):
    # Reuses client
    bucket = client.bucket('my-bucket')
    ...

4. Proper Timeout Handling
import signal

def timeout_handler(signum, frame):
    raise TimeoutError('Function timeout approaching')

def handler(request):
    # Raise TimeoutError 5 seconds before a 60s timeout.
    # Note: signal handlers can only be installed from the main thread (Unix only)
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(55)
    
    try:
        # Long running work
        result = process()
        return result
    finally:
        signal.alarm(0)
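
Where signals are not available, for example when requests run on worker threads, a cooperative deadline check is an alternative. A minimal sketch, assuming the work can be split into items and that the hypothetical `process_with_deadline` helper is called with a budget somewhat below the function timeout:

```python
import time


def process_with_deadline(items, work, budget_seconds, clock=time.monotonic):
    """Process items until the time budget is spent; return (done, remaining)."""
    deadline = clock() + budget_seconds
    done = []
    for index, item in enumerate(items):
        if clock() >= deadline:
            return done, items[index:]
        done.append(work(item))
    return done, []


# Fake clock that advances 20 "seconds" per reading, so the demo is deterministic.
ticks = iter(range(0, 1000, 20))
done, remaining = process_with_deadline(
    [1, 2, 3, 4, 5], lambda x: x * 10, budget_seconds=55, clock=lambda: next(ticks)
)
print(done, remaining)
```

The remaining items can be republished or returned to the caller instead of being lost when the platform terminates the invocation.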

5. Structured Logging
import json

def log(severity, message, **kwargs):
    entry = {
        'severity': severity,
        'message': message,
        **kwargs
    }
    print(json.dumps(entry))

def handler(request):
    log('INFO', 'Processing request', request_id=request.headers.get('X-Request-ID'))

16. How do you implement authentication?

Authentication Options:

1. IAM-based (service-to-service)
# Require authentication
gcloud functions deploy my-func \
    --gen2 \
    --no-allow-unauthenticated

# Grant invoker role (2nd gen functions are invoked through their Cloud Run service)
gcloud run services add-iam-policy-binding my-func \
    --region=us-central1 \
    --member="serviceAccount:caller@project.iam.gserviceaccount.com" \
    --role="roles/run.invoker"

# Call with identity token
import requests
import google.auth.transport.requests
import google.oauth2.id_token

def call_function():
    url = 'https://region-project.cloudfunctions.net/my-func'
    
    auth_req = google.auth.transport.requests.Request()
    token = google.oauth2.id_token.fetch_id_token(auth_req, url)
    
    response = requests.get(
        url,
        headers={'Authorization': f'Bearer {token}'}
    )
    return response.json()

2. Firebase Auth / Identity Platform
from firebase_admin import auth, initialize_app

initialize_app()

def handler(request):
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return {'error': 'Unauthorized'}, 401
    
    token = auth_header.split('Bearer ')[1]
    
    try:
        decoded = auth.verify_id_token(token)
        user_id = decoded['uid']
        return {'message': f'Hello user {user_id}'}
    except Exception as e:
        return {'error': 'Invalid token'}, 401

3. API Gateway
# Deploy with API Gateway for API key auth
gcloud api-gateway gateways create my-gateway \
    --api=my-api \
    --api-config=my-config \
    --location=us-central1

4. Custom JWT Validation
import jwt  # PyJWT

def validate_token(request, public_key):
    """Return the verified claims, or None. public_key is the issuer's PEM-encoded RSA key."""
    token = request.headers.get('Authorization', '').removeprefix('Bearer ')
    
    try:
        payload = jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            audience='my-api'
        )
        return payload
    except jwt.InvalidTokenError:
        return None
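
Both the Firebase and the custom JWT examples start by pulling the token out of the `Authorization` header. A stricter parser than string replacement is sketched below. The helper name `extract_bearer_token` is an assumption, and the sample header values are made up.

```python
def extract_bearer_token(authorization_header):
    """Return the token from 'Bearer <token>', or None if the header is malformed."""
    if not authorization_header:
        return None
    parts = authorization_header.split()
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return None
    return parts[1]


print(extract_bearer_token("Bearer abc.def.ghi"))
print(extract_bearer_token("Basic dXNlcjpwYXNz"))
print(extract_bearer_token("Bearer"))
```

Rejecting malformed headers early lets the handler return 401 before any signature verification is attempted.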

17. How do you monitor Cloud Functions?

Monitoring Options:

1. Cloud Logging
# Structured logs automatically collected
import json

def handler(request):
    # Logs appear in Cloud Logging
    print(json.dumps({
        'severity': 'INFO',
        'message': 'Processing request',
        'httpRequest': {
            'requestMethod': request.method,
            'requestUrl': request.url
        }
    }))

# Query logs (2nd gen functions log under resource.type="cloud_run_revision")
gcloud logging read 'resource.type="cloud_function"' --limit=50

2. Cloud Monitoring Metrics
# Built-in metrics:
+-- function/execution_count
+-- function/execution_times
+-- function/user_memory_bytes
+-- function/active_instances
+-- function/network_egress

# Create alert (fires when failed executions stay above 5 for 5 minutes)
gcloud monitoring policies create \
    --display-name="Function Error Count" \
    --condition-display-name="Failed executions > 5" \
    --condition-filter='resource.type="cloud_function" AND metric.type="cloudfunctions.googleapis.com/function/execution_count" AND metric.labels.status!="ok"' \
    --if="> 5" \
    --duration=300s

3. Custom Metrics
from google.cloud import monitoring_v3
import time

def write_metric(value):
    client = monitoring_v3.MetricServiceClient()
    project_name = "projects/my-project"
    
    series = monitoring_v3.TimeSeries()
    series.metric.type = "custom.googleapis.com/function/items_processed"
    series.resource.type = "cloud_function"
    series.resource.labels["function_name"] = "my-function"
    series.resource.labels["region"] = "us-central1"
    
    # Build the point from plain dicts; a gauge point needs only an end time
    interval = monitoring_v3.TimeInterval({"end_time": {"seconds": int(time.time())}})
    point = monitoring_v3.Point({"interval": interval, "value": {"int64_value": value}})
    series.points = [point]
    
    client.create_time_series(name=project_name, time_series=[series])

4. Error Reporting
from google.cloud import error_reporting

client = error_reporting.Client()

def handler(request):
    try:
        process()
    except Exception as e:
        client.report_exception()
        raise

18. What is Cloud Run Jobs?

Cloud Run Jobs are for running containers to completion, unlike services which handle requests.

Cloud Run Jobs vs Services:
+-- Services: Long-running, request-driven
+-- Jobs: Run to completion, scheduled/triggered

Job Use Cases:
+-- Batch processing
+-- Database migrations
+-- ML training
+-- Data pipelines
+-- Scheduled tasks

# Create job
gcloud run jobs create my-job \
    --image=gcr.io/my-project/my-batch \
    --region=us-central1 \
    --task-timeout=3600 \
    --max-retries=3 \
    --parallelism=10 \
    --tasks=100

# Execute job
gcloud run jobs execute my-job

# Schedule job
gcloud scheduler jobs create http my-scheduled-job \
    --location=us-central1 \
    --schedule="0 */6 * * *" \
    --uri="https://us-central1-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/my-project/jobs/my-job:run" \
    --http-method=POST \
    --oauth-service-account-email=my-sa@my-project.iam.gserviceaccount.com

# Job with array tasks
# main.py
import os

def main():
    task_index = int(os.environ.get('CLOUD_RUN_TASK_INDEX', 0))
    task_count = int(os.environ.get('CLOUD_RUN_TASK_COUNT', 1))
    
    print(f'Processing task {task_index} of {task_count}')
    
    # Process partition of work
    process_partition(task_index, task_count)

if __name__ == '__main__':
    main()
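
The array-task example leaves the partitioning step open. One simple scheme is round-robin slicing, sketched below with a hypothetical `partition` helper and made-up file names. Each task would call it with its own `CLOUD_RUN_TASK_INDEX` and `CLOUD_RUN_TASK_COUNT` values.

```python
def partition(items, task_index, task_count):
    """Return the slice of items owned by this task (round-robin assignment)."""
    return items[task_index::task_count]


files = [f"file-{n}.csv" for n in range(10)]
shards = [partition(files, i, 3) for i in range(3)]
for i, shard in enumerate(shards):
    print(i, shard)
```

Every item lands in exactly one shard as long as all tasks see the same input list in the same order, so the list should come from a stable source such as a sorted object listing.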





19. How do you optimize performance?

Performance Optimization:

1. Memory and CPU
# More memory = more CPU
gcloud functions deploy my-func \
    --memory=1024MB  # Also gets more CPU

# Cloud Run explicit CPU
gcloud run deploy my-service \
    --memory=1Gi \
    --cpu=2

2. Connection Pooling
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    connection_string,
    poolclass=QueuePool,
    pool_size=5,
    max_overflow=2,
    pool_pre_ping=True
)

3. Async Processing
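import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()
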
async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

def handler(request):
    urls = request.get_json()['urls']
    results = asyncio.run(fetch_all(urls))
    return {'results': results}

4. Caching
from cachetools import TTLCache

# In-memory cache (per instance)
cache = TTLCache(maxsize=1000, ttl=300)

def get_data(key):
    if key in cache:
        return cache[key]
    
    data = fetch_from_database(key)
    cache[key] = data
    return data
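
To show what a TTL cache does internally, here is a standard-library sketch of the same idea. `SimpleTTLCache` is a hypothetical class written for illustration, not the `cachetools` implementation, and the injectable clock exists only to make the expiry visible without waiting.

```python
import time


class SimpleTTLCache:
    """Per-instance cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}  # key -> (expires_at, value)

    def get_or_load(self, key, loader):
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]  # Fresh hit
        value = loader(key)  # Miss or expired: reload and store
        self._entries[key] = (now + self._ttl, value)
        return value


now = [0.0]  # Manually advanced fake clock
calls = []


def load(key):
    calls.append(key)
    return key.upper()


cache = SimpleTTLCache(ttl_seconds=300, clock=lambda: now[0])
cache.get_or_load("user", load)  # Miss: loader runs
cache.get_or_load("user", load)  # Hit: served from memory
now[0] = 301.0
cache.get_or_load("user", load)  # Expired: loader runs again
print(len(calls))
```

Because the cache lives in instance memory, each instance warms its own copy and entries vanish when the instance is recycled.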

5. Lazy Loading
_heavy_module = None

def get_heavy_module():
    global _heavy_module
    if _heavy_module is None:
        import heavy_module
        _heavy_module = heavy_module
    return _heavy_module

6. Reduce Package Size
# Use a slim base image with a multi-stage build:
# build wheels in the full image, then install them into the slim one
FROM python:3.11 AS builder
COPY requirements.txt .
RUN pip wheel --no-cache-dir -w /wheels -r requirements.txt

FROM python:3.11-slim
COPY --from=builder /wheels /wheels
RUN pip install --no-cache-dir /wheels/*

20. How do you implement event-driven architectures?

Event-Driven Architecture on GCP:

+--------------------------------------------------------------+
|                  Event-Driven Architecture                   |
+--------------------------------------------------------------+
|                                                              |
|  +---------+    +----------+    +-------------------------+  |
|  |  Event  |--->|  Pub/Sub |--->|     Cloud Functions     |  |
|  | Source  |    |          |    |     (Event Handlers)    |  |
|  +---------+    +----------+    +-------------------------+  |
|                      |                      |                |
|                      |                      v                |
|                      |          +-------------------------+  |
|                      |          |        Cloud Run        |  |
|                      |          |  (Complex Processing)   |  |
|                      |          +-------------------------+  |
|                      |                      |                |
|                      v                      v                |
|            +-----------------+   +---------------------+     |
|            |   Eventarc      |   |    Workflows        |     |
|            | (Event Router)  |   |  (Orchestration)    |     |
|            +-----------------+   +---------------------+     |
+--------------------------------------------------------------+

Event Sources:
+-- Cloud Storage (object events)
+-- Pub/Sub (messages)
+-- Firestore (document changes)
+-- Cloud Scheduler (cron)
+-- Eventarc (Cloud Audit Logs and other supported event providers)
+-- Custom (via Pub/Sub)

# Event routing with Eventarc
gcloud eventarc triggers create storage-trigger \
    --destination-run-service=my-service \
    --destination-run-region=us-central1 \
    --event-filters="type=google.cloud.storage.object.v1.finalized" \
    --event-filters="bucket=my-bucket" \
    --service-account=my-sa@project.iam.gserviceaccount.com

# Saga pattern with Workflows
main:
  params: [order]
  steps:
    - createOrder:
        call: http.post
        args:
          url: https://order-service.run.app/orders
          body: ${order}
        result: orderResult
    
    - reserveInventory:
        try:
          call: http.post
          args:
            url: https://inventory-service.run.app/reserve
            body: ${orderResult.body.items}
        except:
          steps:
            - cancelOrder:
                call: http.delete
                args:
                  url: ${"https://order-service.run.app/orders/" + orderResult.body.id}
            - raiseError:
                raise: "Inventory reservation failed"
    
    - processPayment:
        call: http.post
        args:
          url: https://payment-service.run.app/charge
          body:
            orderId: ${orderResult.body.id}
            amount: ${orderResult.body.total}

# Fan-out pattern
import json
from google.cloud import pubsub_v1

def publish_events(events):
    publisher = pubsub_v1.PublisherClient()
    topic = publisher.topic_path('my-project', 'events-topic')
    
    futures = []
    for event in events:
        future = publisher.publish(topic, json.dumps(event).encode())
        futures.append(future)
    
    # Wait for all publishes
    for future in futures:
        future.result()
