6. What are Cloud Functions generations?
Generation Comparison:
1st Generation (Legacy):
+-- Single concurrency per instance
+-- 9-minute timeout
+-- Limited triggers
+-- Simpler deployment
+-- Lower cold start times
2nd Generation (Recommended):
+-- Concurrent requests per instance
+-- 60-minute timeout
+-- Eventarc triggers (any GCP event)
+-- Cloud Run backend
+-- Longer cold starts
+-- Traffic splitting
+-- Better VPC connectivity
# 1st Gen deployment
gcloud functions deploy my-func \
--runtime=python311 \
--trigger-http
# 2nd Gen deployment (recommended)
gcloud functions deploy my-func \
--gen2 \
--runtime=python311 \
--trigger-http
2nd Gen Concurrency:
# Allow multiple requests per instance
gcloud functions deploy my-func \
--gen2 \
--runtime=python311 \
--trigger-http \
--concurrency=80
# Your code must be thread-safe!
import threading
from flask import Flask
app = Flask(__name__)
lock = threading.Lock()
counter = 0
@app.route('/')
def handler():
global counter
with lock:
counter += 1
return f'Request {counter}'
7. How do you handle cold starts?
Cold Start Mitigation:
1. Minimum Instances (keep warm)
gcloud functions deploy my-func \
--gen2 \
--min-instances=1 \
--trigger-http
2. Optimize initialization
# Bad - heavy imports in global scope
import pandas as pd
import tensorflow as tf
def handler(request):
# Uses pre-loaded libraries
...
# Good - lazy loading
_model = None
def get_model():
global _model
if _model is None:
import tensorflow as tf
_model = tf.keras.models.load_model('model.h5')
return _model
def handler(request):
model = get_model()
...
3. Connection pooling
from google.cloud import bigquery
# Global client (reused across invocations)
client = bigquery.Client()
def handler(request):
# Reuses existing client
results = client.query('SELECT * FROM table')
return list(results)
4. Use smaller runtimes
# Prefer:
# - Python (fast cold start)
# - Go (fast cold start)
# - Node.js (moderate)
#
# Avoid for latency-sensitive:
# - Java (slower cold start)
# - .NET (slower cold start)
5. Reduce package size
# requirements.txt - only needed packages
functions-framework==3.*
google-cloud-bigquery==3.*
# Avoid: pandas, numpy unless needed
8. How do you manage secrets in Cloud Functions?
Secret Management Options:
1. Secret Manager (Recommended)
# Create secret
echo -n "my-api-key" | gcloud secrets create api-key --data-file=-
# Grant access to function service account
gcloud secrets add-iam-policy-binding api-key \
--member="serviceAccount:my-project@appspot.gserviceaccount.com" \
--role="roles/secretmanager.secretAccessor"
# Mount as environment variable
gcloud functions deploy my-func \
--gen2 \
--set-secrets="API_KEY=api-key:latest"
# Or mount as file
gcloud functions deploy my-func \
--gen2 \
--set-secrets="/secrets/api-key=api-key:latest"
# Access in code
import os
api_key = os.environ['API_KEY']
# Or with Secret Manager client
from google.cloud import secretmanager
def get_secret(secret_id):
client = secretmanager.SecretManagerServiceClient()
name = f"projects/my-project/secrets/{secret_id}/versions/latest"
response = client.access_secret_version(name=name)
return response.payload.data.decode('UTF-8')
2. Environment Variables (Less secure)
gcloud functions deploy my-func \
--set-env-vars="API_KEY=my-key"
3. Encrypted at deployment
# Encrypt with KMS before deployment
# Decrypt in function code
9. What is Cloud Functions concurrency?
Concurrency Models:
1st Generation:
+-- 1 request per instance
+-- New instance for each concurrent request
+-- Simple but more instances needed
2nd Generation:
+-- Multiple requests per instance (up to 1000)
+-- Configurable concurrency
+-- Better resource utilization
+-- Code must be thread-safe
Concurrency Configuration:
gcloud functions deploy my-func \
--gen2 \
--concurrency=80 \ # Requests per instance
--max-instances=100 \ # Max instances
--min-instances=1 # Keep warm
Concurrency Calculation:
Total concurrent requests = concurrency × instances
Example: 80 × 100 = 8,000 concurrent requests
Thread-Safe Code Example:
import threading
from flask import Flask, g
app = Flask(__name__)
# Thread-local storage
local = threading.local()
def get_db():
if not hasattr(local, 'db'):
local.db = create_connection()
return local.db
@app.route('/query')
def query():
db = get_db() # Thread-safe
result = db.execute('SELECT ...')
return result
# Avoid global mutable state
# Bad:
results = [] # Shared across requests!
# Good:
def handler(request):
results = [] # Local to request
# ...
10. How do you test Cloud Functions locally?
Local Testing Methods:
1. Functions Framework
# Install
pip install functions-framework
# Run locally
functions-framework --target=hello_http --debug
# Test with curl
curl http://localhost:8080
2. Unit Testing
import unittest
from main import hello_http
from unittest.mock import Mock
class TestFunction(unittest.TestCase):
def test_hello_http(self):
# Create mock request
request = Mock()
request.get_json.return_value = {'name': 'Test'}
# Call function
response = hello_http(request)
# Assert
self.assertEqual(response, 'Hello Test!')
def test_event_function(self):
from cloudevents.http import CloudEvent
event = CloudEvent({
'type': 'google.cloud.storage.object.v1.finalized',
'source': '//storage.googleapis.com/my-bucket'
}, {'bucket': 'my-bucket', 'name': 'test.txt'})
# Test event function
result = process_file(event)
self.assertIsNone(result)
3. Integration Testing
# Use emulators
gcloud beta emulators pubsub start
gcloud beta emulators firestore start
# Set emulator env vars
export PUBSUB_EMULATOR_HOST=localhost:8085
export FIRESTORE_EMULATOR_HOST=localhost:8086
# Run tests against emulators
pytest tests/integration/
4. Docker Testing
# Build and run locally
docker build -t my-function .
docker run -p 8080:8080 -e PORT=8080 my-function
curl http://localhost:8080
11. How do you implement error handling?
Error Handling Patterns:
1. HTTP Functions
import functions_framework
from flask import jsonify
import traceback
@functions_framework.http
def handler(request):
try:
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
result = process_data(data)
return jsonify(result), 200
except ValueError as e:
return jsonify({'error': str(e)}), 400
except Exception as e:
print(f'Error: {traceback.format_exc()}')
return jsonify({'error': 'Internal server error'}), 500
2. Event Functions
@functions_framework.cloud_event
def event_handler(cloud_event):
try:
data = cloud_event.data
process(data)
except TransientError as e:
# Raise to retry
print(f'Transient error, will retry: {e}')
raise
except PermanentError as e:
# Log and don't retry
print(f'Permanent error, not retrying: {e}')
# Optionally send to dead letter topic
publish_to_dlq(cloud_event, str(e))
3. Retry Configuration
gcloud functions deploy my-func \
--gen2 \
--trigger-topic=my-topic \
--retry # Enable retries for event functions
4. Dead Letter Queue
# Create DLQ topic
gcloud pubsub topics create my-topic-dlq
# Configure subscription with DLQ
gcloud pubsub subscriptions create my-sub \
--topic=my-topic \
--dead-letter-topic=my-topic-dlq \
--max-delivery-attempts=5
5. Structured Logging
import json
from google.cloud import logging
def log_error(message, error, context=None):
entry = {
'severity': 'ERROR',
'message': message,
'error': str(error),
'context': context
}
print(json.dumps(entry))
12. What are Cloud Run services?
Cloud Run Service Components:
Service --> Revisions --> Instances
Service:
+-- Unique HTTPS endpoint
+-- Traffic management
+-- IAM policies
+-- Configuration
Revision:
+-- Immutable snapshot
+-- Container image
+-- Environment config
+-- Resource limits
# Deploy service
gcloud run deploy my-service \
--image=gcr.io/my-project/my-image \
--region=us-central1 \
--platform=managed \
--memory=512Mi \
--cpu=1 \
--concurrency=80 \
--max-instances=10 \
--min-instances=1 \
--port=8080 \
--set-env-vars=KEY=value
# Traffic splitting
gcloud run services update-traffic my-service \
--to-revisions=my-service-00001=90,my-service-00002=10
# Gradual rollout
gcloud run services update-traffic my-service \
--to-latest
# Service YAML
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: my-service
spec:
template:
metadata:
annotations:
autoscaling.knative.dev/maxScale: "10"
spec:
containers:
- image: gcr.io/my-project/my-image
ports:
- containerPort: 8080
resources:
limits:
memory: 512Mi
cpu: "1"
env:
- name: KEY
value: value
13. How do you configure autoscaling?
Autoscaling Configuration:
Cloud Functions:
gcloud functions deploy my-func \
--gen2 \
--min-instances=1 \ # Keep warm
--max-instances=100 \ # Max scale
--concurrency=80 # Requests per instance
Cloud Run:
gcloud run deploy my-service \
--min-instances=1 \
--max-instances=100 \
--concurrency=80 \
--cpu-throttling # Reduce CPU when idle
Scaling Behavior:
+-----------------------------------------------------+
| Autoscaling |
+-----------------------------------------------------+
| Requests Instances |
| | | |
| 0 ----------+-- min-instances (keep warm) |
| | | |
| 100 ----------+-- scaled instances |
| | | |
| 1000 ----------+-- max-instances (cap) |
| |
| Scale-to-zero (if min-instances=0): |
| - No requests --> 0 instances |
| - Cold start on next request |
+-----------------------------------------------------+
CPU Allocation:
# Always allocated (lower latency)
gcloud run deploy my-service \
--no-cpu-throttling \
--min-instances=1
# Throttled when idle (lower cost)
gcloud run deploy my-service \
--cpu-throttling \
--min-instances=0
Startup Probe:
# Ensure instance ready before receiving traffic
gcloud run deploy my-service \
--startup-cpu-boost \ # Extra CPU during startup
--image=gcr.io/my-project/my-image
14. How do you connect to databases?
Database Connection Patterns:
1. Cloud SQL (via Unix socket)
import os
import sqlalchemy
def get_connection():
connection_name = os.environ['CLOUD_SQL_CONNECTION_NAME']
db_user = os.environ['DB_USER']
db_pass = os.environ['DB_PASS']
db_name = os.environ['DB_NAME']
# For Cloud Functions/Run, use Unix socket
unix_socket = f'/cloudsql/{connection_name}'
engine = sqlalchemy.create_engine(
f'postgresql+pg8000://{db_user}:{db_pass}@/{db_name}',
connect_args={'unix_sock': f'{unix_socket}/.s.PGSQL.5432'}
)
return engine
# Deploy with Cloud SQL connection
gcloud functions deploy my-func \
--gen2 \
--set-env-vars="CLOUD_SQL_CONNECTION_NAME=project:region:instance" \
--vpc-connector=my-connector
2. Firestore
from google.cloud import firestore
db = firestore.Client()
def handler(request):
doc_ref = db.collection('users').document('user1')
doc_ref.set({'name': 'Alice', 'age': 30})
doc = doc_ref.get()
return doc.to_dict()
3. BigQuery
from google.cloud import bigquery
client = bigquery.Client()
def handler(request):
query = 'SELECT * FROM `project.dataset.table` LIMIT 10'
results = client.query(query).result()
return [dict(row) for row in results]
4. Redis (Memorystore)
import redis
import os
redis_host = os.environ.get('REDIS_HOST', 'localhost')
redis_client = redis.Redis(host=redis_host, port=6379)
def handler(request):
redis_client.set('key', 'value')
return redis_client.get('key')
15. What are Cloud Functions best practices?
Best Practices:
1. Idempotency
# Make functions idempotent for retries
from google.cloud import firestore
def process_order(cloud_event):
order_id = cloud_event.data['order_id']
db = firestore.Client()
doc_ref = db.collection('processed').document(order_id)
# Check if already processed
if doc_ref.get().exists:
print(f'Order {order_id} already processed')
return
# Process and mark as done atomically
process(cloud_event.data)
doc_ref.set({'processed_at': firestore.SERVER_TIMESTAMP})
2. Minimize Dependencies
# requirements.txt - only what's needed
functions-framework==3.*
google-cloud-storage==2.*
# Avoid unnecessary packages
3. Reuse Connections
# Global initialization (reused across invocations)
from google.cloud import storage
client = storage.Client() # Initialized once
def handler(request):
# Reuses client
bucket = client.bucket('my-bucket')
...
4. Proper Timeout Handling
import signal
def timeout_handler(signum, frame):
raise TimeoutError('Function timeout approaching')
def handler(request):
# Set alarm for 5 seconds before timeout
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(55) # For 60s timeout
try:
# Long running work
result = process()
return result
finally:
signal.alarm(0)
5. Structured Logging
import json
def log(severity, message, **kwargs):
entry = {
'severity': severity,
'message': message,
**kwargs
}
print(json.dumps(entry))
def handler(request):
log('INFO', 'Processing request', request_id=request.headers.get('X-Request-ID'))
16. How do you implement authentication?
Authentication Options:
1. IAM-based (service-to-service)
# Require authentication
gcloud functions deploy my-func \
--gen2 \
--no-allow-unauthenticated
# Grant invoker role
gcloud functions add-iam-policy-binding my-func \
--member="serviceAccount:caller@project.iam.gserviceaccount.com" \
--role="roles/cloudfunctions.invoker"
# Call with identity token
import google.auth.transport.requests
import google.oauth2.id_token
def call_function():
url = 'https://region-project.cloudfunctions.net/my-func'
auth_req = google.auth.transport.requests.Request()
token = google.oauth2.id_token.fetch_id_token(auth_req, url)
response = requests.get(
url,
headers={'Authorization': f'Bearer {token}'}
)
return response.json()
2. Firebase Auth / Identity Platform
from firebase_admin import auth, initialize_app
initialize_app()
def handler(request):
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return {'error': 'Unauthorized'}, 401
token = auth_header.split('Bearer ')[1]
try:
decoded = auth.verify_id_token(token)
user_id = decoded['uid']
return {'message': f'Hello user {user_id}'}
except Exception as e:
return {'error': 'Invalid token'}, 401
3. API Gateway
# Deploy with API Gateway for API key auth
gcloud api-gateway gateways create my-gateway \
--api=my-api \
--api-config=my-config \
--location=us-central1
4. Custom JWT Validation
import jwt
def validate_token(request):
token = request.headers.get('Authorization', '').replace('Bearer ', '')
try:
payload = jwt.decode(
token,
options={'verify_signature': True},
algorithms=['RS256'],
audience='my-api'
)
return payload
except jwt.InvalidTokenError:
return None
17. How do you monitor Cloud Functions?
Monitoring Options:
1. Cloud Logging
# Structured logs automatically collected
import json
def handler(request):
# Logs appear in Cloud Logging
print(json.dumps({
'severity': 'INFO',
'message': 'Processing request',
'httpRequest': {
'requestMethod': request.method,
'requestUrl': request.url
}
}))
# Query logs
gcloud logging read 'resource.type="cloud_function"' --limit=50
2. Cloud Monitoring Metrics
# Built-in metrics:
+-- function/execution_count
+-- function/execution_times
+-- function/user_memory_bytes
+-- function/active_instances
+-- function/network_egress
# Create alert
gcloud monitoring policies create \
--display-name="Function Error Rate" \
--condition-display-name="Error rate > 5%" \
--condition-filter='resource.type="cloud_function" AND metric.type="cloudfunctions.googleapis.com/function/execution_count" AND metric.labels.status!="ok"'
3. Custom Metrics
from google.cloud import monitoring_v3
import time
def write_metric(value):
client = monitoring_v3.MetricServiceClient()
project_name = f"projects/my-project"
series = monitoring_v3.TimeSeries()
series.metric.type = "custom.googleapis.com/function/items_processed"
series.resource.type = "cloud_function"
series.resource.labels["function_name"] = "my-function"
point = monitoring_v3.Point()
point.value.int64_value = value
point.interval.end_time.seconds = int(time.time())
series.points = [point]
client.create_time_series(name=project_name, time_series=[series])
4. Error Reporting
from google.cloud import error_reporting
client = error_reporting.Client()
def handler(request):
try:
process()
except Exception as e:
client.report_exception()
raise
18. What is Cloud Run Jobs?
Cloud Run Jobs are for running containers to completion, unlike services which handle requests.
Cloud Run Jobs vs Services:
+-- Services: Long-running, request-driven
+-- Jobs: Run to completion, scheduled/triggered
Job Use Cases:
+-- Batch processing
+-- Database migrations
+-- ML training
+-- Data pipelines
+-- Scheduled tasks
# Create job
gcloud run jobs create my-job \
--image=gcr.io/my-project/my-batch \
--region=us-central1 \
--task-timeout=3600 \
--max-retries=3 \
--parallelism=10 \
--tasks=100
# Execute job
gcloud run jobs execute my-job
# Schedule job
gcloud scheduler jobs create http my-scheduled-job \
--location=us-central1 \
--schedule="0 */6 * * *" \
--uri="https://us-central1-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/my-project/jobs/my-job:run" \
--http-method=POST \
--oauth-service-account-email=my-sa@my-project.iam.gserviceaccount.com
# Job with array tasks
# main.py
import os
def main():
task_index = int(os.environ.get('CLOUD_RUN_TASK_INDEX', 0))
task_count = int(os.environ.get('CLOUD_RUN_TASK_COUNT', 1))
print(f'Processing task {task_index} of {task_count}')
# Process partition of work
process_partition(task_index, task_count)
if __name__ == '__main__':
main()