

Top Azure Data Factory Real-Time Scenario Interview Questions (2026) | JavaInuse

Top 20 Azure Data Factory Real-Time Scenario Interview Questions


  1. How do you handle incremental data loading from a SQL database?
  2. How do you handle slowly changing dimensions (SCD) Type 2?
  3. How do you process files that arrive in batches?
  4. How do you handle data quality checks in ADF?
  5. How do you implement a data lake architecture using ADF?
  6. How do you handle API pagination while extracting data?
  7. How do you implement dynamic pipelines based on metadata?
  8. How do you handle large file processing?
  9. How do you implement error handling and retry logic?
  10. How do you synchronize data between multiple databases?
  11. How do you handle schema drift in source data?
  12. How do you implement data archival strategy?
  13. How do you handle time zone conversions in data pipelines?
  14. How do you implement CDC (Change Data Capture)?
  15. How do you handle hierarchical/nested JSON data?
  16. How do you implement parallel processing for multiple sources?
  17. How do you handle PII data masking?
  18. How do you implement data reconciliation?
  19. How do you migrate on-premises SSIS packages to ADF?
  20. How do you optimize ADF pipeline performance?

Microsoft Azure Interview Questions

Comprehensive interview questions for Azure cloud services and data engineering roles.

1. How do you handle incremental data loading from a SQL database?

Scenario: You need to load only new or modified records from a SQL Server database daily.

Solution - Watermark Pattern:
-- Watermark Table
CREATE TABLE WatermarkTable (
    TableName VARCHAR(100),
    WatermarkColumn VARCHAR(100),
    WatermarkValue DATETIME
);

-- Source Query (dynamic)
SELECT * FROM Orders 
WHERE ModifiedDate > '@{activity('Lookup1').output.firstRow.WatermarkValue}'
  AND ModifiedDate <= '@{pipeline().parameters.NewWatermarkValue}'

Pipeline Structure:
Pipeline: IncrementalLoad
├── Lookup (Get Old Watermark)
│   └── Query: SELECT WatermarkValue FROM WatermarkTable WHERE TableName='Orders'
├── Lookup (Get New Watermark)
│   └── Query: SELECT MAX(ModifiedDate) as NewWatermark FROM Orders
├── Copy Activity (Copy Delta Data)
│   └── Source Query: SELECT * FROM Orders WHERE ModifiedDate > oldWatermark AND ModifiedDate <= newWatermark
│   └── Sink: ADLS/Destination
└── Stored Procedure (Update Watermark)
    └── UPDATE WatermarkTable SET WatermarkValue = newWatermark WHERE TableName='Orders'

For SCD Type 1 (overwrite in place, no history):
Use a Data Flow with an AlterRow transformation to implement the upsert logic.
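The watermark pattern above can be sketched in Python, with an in-memory SQLite database standing in for SQL Server; each step in the function maps to one of the four pipeline activities (the function and sample data are illustrative, not ADF code):

```python
import sqlite3

# In-memory stand-in for the source database plus the watermark table.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE Orders (OrderID INTEGER, ModifiedDate TEXT);
CREATE TABLE WatermarkTable (TableName TEXT, WatermarkValue TEXT);
INSERT INTO Orders VALUES (1, '2024-01-01'), (2, '2024-01-05'), (3, '2024-01-10');
INSERT INTO WatermarkTable VALUES ('Orders', '2024-01-03');
""")

def incremental_load(conn, table):
    # Step 1: Lookup the old watermark.
    old_wm = conn.execute(
        "SELECT WatermarkValue FROM WatermarkTable WHERE TableName = ?", (table,)
    ).fetchone()[0]
    # Step 2: Lookup the new watermark from the source.
    new_wm = conn.execute(f"SELECT MAX(ModifiedDate) FROM {table}").fetchone()[0]
    # Step 3: Copy activity - select only the delta between the two watermarks.
    delta = conn.execute(
        f"SELECT * FROM {table} WHERE ModifiedDate > ? AND ModifiedDate <= ?",
        (old_wm, new_wm),
    ).fetchall()
    # Step 4: Stored Procedure - persist the new watermark for the next run.
    conn.execute(
        "UPDATE WatermarkTable SET WatermarkValue = ? WHERE TableName = ?",
        (new_wm, table),
    )
    return delta

rows = incremental_load(conn, "Orders")  # only orders modified after 2024-01-03
```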

2. How do you handle slowly changing dimensions (SCD) Type 2?

Scenario: Track historical changes to customer data with effective dates.

Solution - Data Flow for SCD Type 2:
Data Flow: SCDType2_Customer
├── Source (New Customer Data)
├── Source (Existing Dimension - Active Records)
│   └── Filter: IsActive = 1
├── Lookup (Match on Business Key)
│   └── Left: New Data, Right: Existing
├── Conditional Split
│   ├── NewRecords: isNull(ExistingKey)
│   ├── ChangedRecords: hash(NewData) != hash(ExistingData)
│   └── UnchangedRecords: Default
├── Derived Column (For New Records)
│   ├── SurrogateKey: autoIncrement()
│   ├── EffectiveStartDate: currentDate()
│   ├── EffectiveEndDate: toDate('9999-12-31')
│   └── IsActive: 1
├── Union (New + Changed Records)
├── Derived Column (Expire Old Records)
│   ├── EffectiveEndDate: currentDate()
│   └── IsActive: 0
└── Sink (Dimension Table - Insert/Update)

-- Dimension Table Structure
CREATE TABLE DimCustomer (
    SurrogateKey INT IDENTITY PRIMARY KEY,
    CustomerID INT,  -- Business Key
    CustomerName VARCHAR(100),
    Address VARCHAR(200),
    EffectiveStartDate DATE,
    EffectiveEndDate DATE,
    IsActive BIT
);

AlterRow Expression:
-- In AlterRow transformation
Insert if: isNull(ExistingSurrogateKey)  -- New records
Update if: IsActive == 0  -- Expire old active record
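The expire-and-insert logic of the data flow can be sketched in plain Python; the column names match the dimension table above, while the function itself is an illustrative stand-in for the Lookup / Conditional Split / AlterRow chain:

```python
from datetime import date

HIGH_DATE = date(9999, 12, 31)  # EffectiveEndDate sentinel for active records

def apply_scd2(dimension, incoming, business_key, tracked_cols, today):
    """Expire changed active rows and append new versions (SCD Type 2)."""
    active = {r[business_key]: r for r in dimension if r["IsActive"] == 1}
    next_sk = max((r["SurrogateKey"] for r in dimension), default=0) + 1
    for row in incoming:
        existing = active.get(row[business_key])
        changed = existing is not None and any(
            existing[c] != row[c] for c in tracked_cols)
        if existing is not None and not changed:
            continue  # unchanged record: nothing to do
        if changed:
            existing["EffectiveEndDate"] = today  # expire the old version
            existing["IsActive"] = 0
        dimension.append({                        # insert the new version
            "SurrogateKey": next_sk,
            business_key: row[business_key],
            **{c: row[c] for c in tracked_cols},
            "EffectiveStartDate": today,
            "EffectiveEndDate": HIGH_DATE,
            "IsActive": 1,
        })
        next_sk += 1
    return dimension

dim = [{"SurrogateKey": 1, "CustomerID": "C001", "Address": "NYC",
        "EffectiveStartDate": date(2023, 1, 1), "EffectiveEndDate": HIGH_DATE,
        "IsActive": 1}]
out = apply_scd2(dim, [{"CustomerID": "C001", "Address": "LA"}],
                 "CustomerID", ["Address"], date(2024, 6, 1))
```

After the run the dimension holds two rows for C001: the expired NYC version and the active LA version.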

3. How do you process files that arrive in batches?

Scenario: Multiple CSV files arrive in a folder; process all files and move to archive.

Pipeline: ProcessBatchFiles
├── Get Metadata (List Files)
│   └── Dataset: Source Folder
│   └── Field List: childItems
├── Filter (Only CSV Files)
│   └── Condition: @endsWith(item().name, '.csv')
├── ForEach (Process Each File)
│   └── Items: @activity('Filter1').output.Value
│   └── Sequential: false (parallel)
│   └── Activities:
│       ├── Copy Activity
│       │   └── Source: @item().name
│       │   └── Sink: Destination
│       ├── Copy Activity (Move to Archive)
│       │   └── Source: @item().name
│       │   └── Sink: Archive/@item().name
│       └── Delete Activity
│           └── Delete source file after archive
└── Send Email (Notification)
    └── Summary of processed files

-- Dynamic File Path Expression
@concat('raw/', item().name)

-- Archive Path with Timestamp
@concat('archive/', formatDateTime(utcNow(), 'yyyy/MM/dd'), '/', item().name)

Error Handling for Individual Files:
ForEach Settings:
├── Batch Count: 20 (process 20 files in parallel)
├── Sequential: false
└── Activities:
    ├── Try (Execute Pipeline - Process Single File)
    │   └── On Success: Archive file
    │   └── On Failure: Move to Error folder, Log error
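A minimal Python sketch of the same flow: list the CSVs, process each, archive successes under a dated path, and quarantine failures (directory layout and the `load` callback are hypothetical):

```python
from datetime import datetime, timezone
from pathlib import Path
import tempfile

def process_batch(source_dir, archive_root, error_root, process):
    """List *.csv files, process each; archive successes, quarantine failures."""
    stamp = datetime.now(timezone.utc).strftime("%Y/%m/%d")  # dated archive path
    processed, failed = [], []
    for f in sorted(Path(source_dir).glob("*.csv")):  # Get Metadata + Filter
        try:
            process(f)                                 # per-file Copy activity
            dest = Path(archive_root) / stamp / f.name
            processed.append(f.name)
        except Exception:
            dest = Path(error_root) / f.name           # On Failure: Error folder
            failed.append(f.name)
        dest.parent.mkdir(parents=True, exist_ok=True)
        f.rename(dest)                                 # move = copy + delete
    return processed, failed

# Demo on a throwaway directory: one good file, one bad file, one non-CSV.
tmp = Path(tempfile.mkdtemp())
src, arch, err = tmp / "in", tmp / "archive", tmp / "errors"
src.mkdir()
(src / "a.csv").write_text("ok")
(src / "b.csv").write_text("boom")
(src / "note.txt").write_text("ignored")

def load(f):
    if f.read_text() == "boom":
        raise ValueError("bad file")

processed, failed = process_batch(src, arch, err, load)
```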

4. How do you handle data quality checks in ADF?

Scenario: Validate data before loading to destination; reject bad records.

Data Flow: DataQualityChecks
├── Source (Raw Data)
├── Derived Column (Quality Flags)
│   ├── IsEmailValid: regexMatch(Email, '^[A-Za-z0-9+_.-]+@(.+)$')
│   ├── IsDateValid: !isNull(toDate(DateString, 'yyyy-MM-dd'))
│   ├── IsAmountValid: Amount > 0 && Amount < 1000000
│   └── HasRequiredFields: !isNull(CustomerID) && !isNull(ProductID)
├── Derived Column (Quality Score)
│   └── QualityScore: iif(IsEmailValid, 1, 0) + iif(IsDateValid, 1, 0) + ...
├── Conditional Split
│   ├── ValidRecords: QualityScore == 4 (all checks passed)
│   ├── PartiallyValid: QualityScore >= 2
│   └── InvalidRecords: Default
├── Sink (Valid → Production Table)
├── Sink (PartiallyValid → Review Queue)
└── Sink (Invalid → Error Log Table)

-- Error Log Entry
ErrorTable: 
├── RecordID
├── SourceFile
├── ErrorType
├── ErrorDetails
├── OriginalData (JSON)
└── ProcessedDate

Data Validation Rules Example:
-- Conditional Split expressions
Valid: 
  !isNull(CustomerID) && 
  length(CustomerID) == 10 &&
  !isNull(Email) && 
  regexMatch(Email, '^[A-Za-z0-9+_.-]+@(.+)$') &&
  Amount > 0

DuplicateCheck (using Window function):
  rowNumber = row_number() over(partition by CustomerID order by ModifiedDate desc)
  Keep only rowNumber == 1
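The flag-score-split logic translates directly to Python; the check list mirrors the Derived Column flags above (field names as used in the data flow, thresholds illustrative):

```python
import re

# One entry per quality flag in the Derived Column step.
CHECKS = [
    lambda r: bool(re.match(r"^[A-Za-z0-9+_.-]+@(.+)$", r.get("Email") or "")),
    lambda r: bool(re.match(r"^\d{4}-\d{2}-\d{2}$", r.get("DateString") or "")),
    lambda r: 0 < (r.get("Amount") or 0) < 1_000_000,
    lambda r: r.get("CustomerID") is not None and r.get("ProductID") is not None,
]

def route(record):
    """Conditional Split: all checks pass -> valid, >= 2 -> review, else invalid."""
    score = sum(check(record) for check in CHECKS)
    if score == len(CHECKS):
        return "valid"
    return "review" if score >= 2 else "invalid"
```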

5. How do you implement a data lake architecture using ADF?

Scenario: Implement medallion architecture (Bronze/Silver/Gold) for analytics.

Data Lake Structure:
├── Bronze (Raw Layer)
│   └── /bronze/{source}/{table}/{year}/{month}/{day}/
│   └── Format: Raw JSON/CSV/Parquet as-is
├── Silver (Cleansed Layer)
│   └── /silver/{domain}/{entity}/
│   └── Format: Delta/Parquet, deduplicated, typed
└── Gold (Curated Layer)
    └── /gold/{subject_area}/{table}/
    └── Format: Delta/Parquet, aggregated, business logic

Pipeline: MedallionArchitecture
├── Pipeline: Bronze_Ingestion
│   ├── Copy Raw Data (as-is)
│   └── Add metadata columns (source, ingestionTime)
├── Pipeline: Silver_Processing
│   ├── Data Flow: Cleanse & Transform
│   │   ├── Remove duplicates
│   │   ├── Apply data types
│   │   ├── Handle nulls
│   │   └── Standardize formats
│   └── Write Delta format (for time travel)
└── Pipeline: Gold_Aggregation
    ├── Data Flow: Business Logic
    │   ├── Joins across entities
    │   ├── Aggregations
    │   └── KPI calculations
    └── Write to Gold layer

-- File naming convention
@concat(
    'bronze/sales/orders/',
    formatDateTime(utcNow(),'yyyy'), '/',
    formatDateTime(utcNow(),'MM'), '/',
    formatDateTime(utcNow(),'dd'), '/',
    'orders_',
    formatDateTime(utcNow(),'yyyyMMddHHmmss'),
    '.parquet'
)
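The file-naming expression above can be mirrored in Python to make the layout concrete (a sketch; the `bronze/{source}/{table}` convention is the one defined in the structure diagram):

```python
from datetime import datetime, timezone

def bronze_path(source, table, now=None):
    """Build the dated bronze-layer file path used by the ADF expression."""
    now = now or datetime.now(timezone.utc)
    return (f"bronze/{source}/{table}/{now:%Y}/{now:%m}/{now:%d}/"
            f"{table}_{now:%Y%m%d%H%M%S}.parquet")
```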




6. How do you handle API pagination while extracting data?

Scenario: Extract data from REST API that returns paginated results.

-- Method 1: Copy Activity with Pagination Rules
Copy Activity Settings:
├── Source: REST
│   └── Pagination Rules:
│       ├── AbsoluteUrl: $.nextLink (for next page URL in response)
│       OR
│       ├── QueryParameters.page: RANGE:1:100:1 (page 1 to 100)
│       OR
│       ├── QueryParameters.offset: RANGE:0:10000:100 (offset pagination)

-- Method 2: Until Loop for Complex Pagination
Pipeline: API_Pagination
├── Set Variable (hasMoreData = true, pageNumber = 1)
├── Until (hasMoreData == false)
│   └── Activities:
│       ├── Web Activity (Call API)
│       │   └── URL: @concat(baseUrl, '?page=', variables('pageNumber'))
│       ├── If Condition (Check for more data)
│       │   └── Condition: @greater(length(activity('Web1').output.data), 0)
│       │   └── True:
│       │       ├── Copy Data to Storage
│       │       └── Set Variable (pageNumber = pageNumber + 1)
│       │   └── False:
│       │       └── Set Variable (hasMoreData = false)

-- API Response handling
{
  "data": [...],
  "pagination": {
    "currentPage": 1,
    "totalPages": 50,
    "nextPageUrl": "https://api.example.com/data?page=2"
  }
}

-- Expression to check more pages
@less(
    int(activity('CallAPI').output.pagination.currentPage),
    int(activity('CallAPI').output.pagination.totalPages)
)
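The Until-loop pattern (Method 2) boils down to: call, append, check `currentPage` against `totalPages`, repeat. A sketch with a stub in place of the Web activity (the stub's response shape matches the sample payload above):

```python
def fetch_all(fetch_page):
    """Until-loop pagination: keep requesting while the API reports more pages."""
    page, rows = 1, []
    while True:
        body = fetch_page(page)          # Web activity call
        rows.extend(body["data"])
        pg = body["pagination"]
        if pg["currentPage"] >= pg["totalPages"]:  # the @less(...) check
            break
        page += 1
    return rows

# Stub API returning 3 pages of 2 records each (hypothetical data).
def fake_api(page):
    return {"data": [f"rec{page}a", f"rec{page}b"],
            "pagination": {"currentPage": page, "totalPages": 3}}

records = fetch_all(fake_api)
```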

7. How do you implement dynamic pipelines based on metadata?

Scenario: Single pipeline to load multiple tables based on configuration metadata.

-- Metadata Table
CREATE TABLE PipelineMetadata (
    SourceSchema VARCHAR(50),
    SourceTable VARCHAR(100),
    TargetSchema VARCHAR(50),
    TargetTable VARCHAR(100),
    LoadType VARCHAR(20),  -- 'Full' or 'Incremental'
    WatermarkColumn VARCHAR(100),
    IsActive BIT,
    LastLoadDate DATETIME
);

Pipeline: MetadataDrivenLoad
├── Lookup (Get Active Tables)
│   └── Query: SELECT * FROM PipelineMetadata WHERE IsActive = 1
├── ForEach (Process Each Table)
│   └── Items: @activity('Lookup1').output.value
│   └── Activities:
│       └── Execute Pipeline (GenericCopyPipeline)
│           └── Parameters:
│               ├── SourceSchema: @item().SourceSchema
│               ├── SourceTable: @item().SourceTable
│               ├── TargetTable: @item().TargetTable
│               ├── LoadType: @item().LoadType
│               └── WatermarkColumn: @item().WatermarkColumn

Pipeline: GenericCopyPipeline
├── Parameters: SourceSchema, SourceTable, LoadType, WatermarkColumn
├── If Condition (LoadType == 'Incremental')
│   └── True: Execute Incremental Load
│   └── False: Execute Full Load
├── Copy Activity
│   └── Source Query (Dynamic):
│       @if(equals(pipeline().parameters.LoadType, 'Full'),
│           concat('SELECT * FROM ', pipeline().parameters.SourceSchema, '.', pipeline().parameters.SourceTable),
│           concat('SELECT * FROM ', pipeline().parameters.SourceSchema, '.', pipeline().parameters.SourceTable, 
│                  ' WHERE ', pipeline().parameters.WatermarkColumn, ' > ''', variables('LastWatermark'), '''')
│       )
└── Update Metadata (LastLoadDate)
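The dynamic source query that GenericCopyPipeline builds can be sketched as a plain function over one metadata row (column names from the PipelineMetadata table above; string interpolation shown for clarity, not as a SQL-injection-safe pattern):

```python
def build_source_query(meta, last_watermark=None):
    """Build the Copy activity source query from one PipelineMetadata row."""
    base = f"SELECT * FROM {meta['SourceSchema']}.{meta['SourceTable']}"
    if meta["LoadType"] == "Full":
        return base  # full load: no filter
    # Incremental load: filter on the configured watermark column.
    return f"{base} WHERE {meta['WatermarkColumn']} > '{last_watermark}'"
```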

8. How do you handle large file processing?

Scenario: Process a 50GB file efficiently without memory issues.

-- Strategy 1: Chunked Processing
# Split large file into chunks using Azure Function or Databricks
# Then process each chunk in parallel

Pipeline: LargeFileProcessing
├── Azure Function (Split File)
│   └── Split 50GB into 500MB chunks
│   └── Return list of chunk file paths
├── ForEach (Process Chunks in Parallel)
│   └── Batch Count: 10
│   └── Copy Activity for each chunk
└── Azure Function (Merge Results if needed)

-- Strategy 2: Data Flow Partitioning
Data Flow Settings:
├── Source: Large File
│   └── Enable Staging (for large files)
├── Optimize Tab:
│   └── Partitioning: Hash or Round Robin
│   └── Number of Partitions: 200
└── Sink:
    └── Single file per partition: false
    └── File name option: Pattern
    └── Pattern: output_part{n}.parquet

-- Strategy 3: Copy Activity Settings
Copy Activity:
├── Enable Staging: true
├── Staging Storage: Azure Blob
├── Parallel Copies: 32
├── Data Integration Units (DIU): 256 (max)
└── Sink:
    └── Copy behavior: PreserveHierarchy
    └── Max concurrent connections: 50

-- For very large files, use PolyBase or COPY command
Copy Activity Sink (Synapse):
├── Copy method: PolyBase
├── PolyBase Settings:
│   └── Allow PolyBase: true
│   └── Reject Type: value
│   └── Reject Value: 10
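The chunked-processing idea from Strategy 1 can be sketched as a generator that streams a file in fixed-size pieces rather than loading it whole; this is the splitting an Azure Function (or Databricks job) would do, shown here on a tiny file:

```python
import os
import tempfile

def iter_chunks(path, chunk_bytes=500 * 1024 * 1024):
    """Yield a file in fixed-size chunks so each can be handled independently."""
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_bytes)
            if not chunk:
                break
            yield chunk

# Demo with a 12-byte file and a 5-byte chunk size.
fd, path = tempfile.mkstemp()
with os.fdopen(fd, "wb") as f:
    f.write(b"0123456789ab")
chunks = list(iter_chunks(path, chunk_bytes=5))
```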

9. How do you implement error handling and retry logic?

Scenario: Handle transient failures with retry and log all errors for investigation.

Pipeline: RobustETLPipeline
├── Activity Settings (on each activity):
│   └── Retry: 3
│   └── Retry Interval: 30 seconds
│   └── Timeout: 01:00:00
│   └── Secure Output: false (to see errors)
│
├── Copy Activity
│   └── On Success → Continue to next step
│   └── On Failure → Execute "LogError" activity
│
├── Stored Procedure (LogError)
│   └── Parameters:
│       ├── PipelineName: @pipeline().Pipeline
│       ├── RunId: @pipeline().RunId
│       ├── ActivityName: 'CopyData'
│       ├── ErrorMessage: @activity('CopyData').error.message
│       ├── ErrorCode: @activity('CopyData').error.errorCode
│       └── Timestamp: @utcNow()

-- Error Logging Table
CREATE TABLE PipelineErrorLog (
    LogId INT IDENTITY PRIMARY KEY,
    PipelineName VARCHAR(200),
    RunId VARCHAR(100),
    ActivityName VARCHAR(200),
    ErrorMessage NVARCHAR(MAX),
    ErrorCode VARCHAR(50),
    Timestamp DATETIME,
    Status VARCHAR(20)
);

-- Web Activity for Alert
Web Activity (Send Alert):
├── URL: Logic App HTTP trigger URL
├── Method: POST
├── Body: 
{
    "pipeline": "@{pipeline().Pipeline}",
    "error": "@{activity('CopyData').error.message}",
    "runId": "@{pipeline().RunId}"
}

-- Conditional execution
Upon Failure path:
├── Log Error
├── Send Alert
└── Set Pipeline Variable (HasError = true)

At End of Pipeline:
├── If HasError == true
│   └── Fail Pipeline with custom message
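The retry-with-interval behavior of the activity settings is easy to model in code; this sketch re-raises after the final attempt so a surrounding failure path can still log and alert (the `flaky` function simulates a transient source failure):

```python
import time

def with_retry(fn, retries=3, interval=30.0):
    """Retry a callable, sleeping `interval` seconds between attempts;
    re-raise the last error once `retries` extra attempts are exhausted."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            attempt += 1
            if attempt > retries:
                raise  # hand off to the On Failure path
            time.sleep(interval)

calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = with_retry(flaky, retries=3, interval=0)  # succeeds on the 3rd call
```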

10. How do you synchronize data between multiple databases?

Scenario: Keep data in sync between SQL Server and Azure SQL Database.

-- Bi-directional Sync Pattern
Pipeline: BidirectionalSync
├── Lookup (Changes from Source A - Last 24 hours)
│   └── Query: SELECT * FROM ChangeTracking WHERE ModifiedDate > @lastSync
├── Lookup (Changes from Source B - Last 24 hours)
├── Data Flow (Conflict Resolution)
│   └── Source: Changes from A
│   └── Source: Changes from B
│   └── Join on Business Key
│   └── Conditional Split:
│       ├── OnlyInA: Apply to B
│       ├── OnlyInB: Apply to A
│       └── InBoth: Use latest ModifiedDate (conflict resolution)
├── Copy Activity (A → B changes)
├── Copy Activity (B → A changes)
└── Update Sync Timestamp

-- Conflict Resolution Logic in Data Flow
Derived Column:
├── WinningSource: iif(ModifiedDateA > ModifiedDateB, 'A', 'B')
├── FinalValue: iif(WinningSource == 'A', ValueA, ValueB)

-- Using Change Tracking (SQL Server)
DECLARE @last_sync_version bigint = (SELECT LastSyncVersion FROM SyncMetadata);
DECLARE @current_version bigint = CHANGE_TRACKING_CURRENT_VERSION();

SELECT t.*, ct.SYS_CHANGE_OPERATION
FROM MyTable t
RIGHT JOIN CHANGETABLE(CHANGES MyTable, @last_sync_version) ct
ON t.PrimaryKey = ct.PrimaryKey;

-- Update sync version after successful sync
UPDATE SyncMetadata SET LastSyncVersion = @current_version;
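The latest-ModifiedDate-wins conflict resolution from the data flow can be sketched as a merge over both change sets (record shape is illustrative; `Key` stands for the business key):

```python
def resolve_conflicts(changes_a, changes_b):
    """Merge change sets from both systems; the newer ModifiedDate wins."""
    merged = {}
    for source, changes in (("A", changes_a), ("B", changes_b)):
        for row in changes:
            current = merged.get(row["Key"])
            if current is None or row["ModifiedDate"] > current["ModifiedDate"]:
                merged[row["Key"]] = {**row, "WinningSource": source}
    return merged

a = [{"Key": 1, "ModifiedDate": "2024-01-02", "Value": "A1"}]
b = [{"Key": 1, "ModifiedDate": "2024-01-05", "Value": "B1"},
     {"Key": 2, "ModifiedDate": "2024-01-01", "Value": "B2"}]
merged = resolve_conflicts(a, b)  # key 1: B wins (newer); key 2: only in B
```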

11. How do you handle schema drift in source data?

Scenario: Source adds new columns without notice; pipeline should handle gracefully.

-- Enable Schema Drift in Data Flow
Data Flow Settings:
├── Source:
│   └── Allow schema drift: true
│   └── Infer drifted column types: true
│   └── Validate schema: false
├── Transformations:
│   └── Use byName() to reference columns
│   └── Select: Map drifted columns
└── Sink:
    └── Schema drift: Allow
    └── Auto-map drifted columns: true

-- Handle specific drifted columns
Select Transformation:
├── Fixed Mapping:
│   └── CustomerID → CustomerID
│   └── Name → Name
└── Rule-based Mapping:
    └── Match: name matches 'Custom.*'
    └── Name: 'drifted_' + $$
    └── Type: string

-- Expression for unknown columns
Derived Column:
├── AllDriftedAsJson: 
    toJSON(
        mapIf(
            columnNames(),
            !in(['CustomerID','Name','Email'], #item),
            byName(#item)
        )
    )

-- Sink to flexible schema (JSON/Parquet)
Data Flow Sink:
├── Dataset: Parquet with no schema
├── Settings:
│   └── Output to single file: false
│   └── Auto-schema: true

-- Alert on schema changes
Pipeline:
├── Get Metadata (Get current schema)
├── Compare with stored schema
├── If different → Log change and alert
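The schema-comparison step at the end can be sketched as a simple set difference between the stored column list and what Get Metadata returned:

```python
def schema_diff(stored_cols, current_cols):
    """Report columns added to or removed from the source since last run."""
    stored, current = set(stored_cols), set(current_cols)
    return {"added": sorted(current - stored),
            "removed": sorted(stored - current)}
```

A non-empty `added` or `removed` list would drive the "log change and alert" branch.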

12. How do you implement data archival strategy?

Scenario: Archive data older than 2 years to cold storage, maintain hot data for queries.

Pipeline: DataArchival
├── Lookup (Get Tables to Archive)
│   └── Query: SELECT TableName, RetentionDays FROM ArchivalConfig
├── ForEach Table
│   └── Activities:
│       ├── Copy Activity (Archive Old Data)
│       │   └── Source Query:
│       │       SELECT * FROM @{item().TableName}
│       │       WHERE CreatedDate < DATEADD(day, -@{item().RetentionDays}, GETDATE())
│       │   └── Sink: Archive Storage (Cool/Archive tier)
│       │       └── Path: archive/{table}/{year}/{month}/
│       ├── Stored Procedure (Verify Archive)
│       │   └── Compare row counts
│       └── Stored Procedure (Delete Archived Data)
│           └── DELETE FROM table WHERE CreatedDate < cutoff

-- Archive File Organization
archive/
├── sales/
│   ├── 2022/
│   │   ├── 01/orders_202201.parquet
│   │   └── 02/orders_202202.parquet
│   └── 2023/
└── customers/
    └── ...

-- Lifecycle Management Policy (Azure Storage)
{
  "rules": [
    {
      "name": "moveToArchive",
      "type": "Lifecycle",
      "definition": {
        "filters": {
          "blobTypes": ["blockBlob"],
          "prefixMatch": ["archive/"]
        },
        "actions": {
          "baseBlob": {
            "tierToCool": { "daysAfterModificationGreaterThan": 30 },
            "tierToArchive": { "daysAfterModificationGreaterThan": 90 }
          }
        }
      }
    }
  ]
}

-- Partitioned Table Strategy (Synapse)
-- Drop old partitions instead of delete
ALTER TABLE Orders SWITCH PARTITION 1 TO Orders_Archive PARTITION 1;

13. How do you handle time zone conversions in data pipelines?

Scenario: Data comes from multiple regions; standardize all timestamps to UTC.

-- Data Flow: Time Zone Standardization
Derived Column:
├── UTCTimestamp:
    convertTimeZone(
        LocalTimestamp,
        SourceTimeZone,  -- 'Eastern Standard Time'
        'UTC'
    )

-- Multiple source regions
Derived Column:
├── UTCTimestamp:
    case(
        SourceRegion == 'US-East', convertTimeZone(LocalTimestamp, 'Eastern Standard Time', 'UTC'),
        SourceRegion == 'US-West', convertTimeZone(LocalTimestamp, 'Pacific Standard Time', 'UTC'),
        SourceRegion == 'EU', convertTimeZone(LocalTimestamp, 'Central European Standard Time', 'UTC'),
        SourceRegion == 'Asia', convertTimeZone(LocalTimestamp, 'Singapore Standard Time', 'UTC'),
        LocalTimestamp  -- default, assume UTC
    )

-- ADF Expression (in Copy Activity)
@convertTimeZone(
    activity('Lookup1').output.firstRow.LocalTime,
    'Eastern Standard Time',
    'UTC'
)

-- Handle DST (Daylight Saving Time)
-- Use IANA time zone names for automatic DST handling
Derived Column:
├── UTCTimestamp:
    convertTimeZone(
        LocalTimestamp,
        'America/New_York',  -- IANA format handles DST
        'UTC'
    )

-- Store both local and UTC
Output Columns:
├── OriginalTimestamp (as received)
├── SourceTimeZone
├── UTCTimestamp (standardized)
└── ProcessedAt (pipeline execution time in UTC)
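The IANA-zone conversion the data flow performs maps directly onto Python's `zoneinfo`, which also handles DST automatically (a sketch of the standardization step, not ADF code):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

def to_utc(local_ts, iana_zone):
    """Attach the source zone (IANA names handle DST) and convert to UTC."""
    return local_ts.replace(tzinfo=ZoneInfo(iana_zone)).astimezone(ZoneInfo("UTC"))
```

Noon in New York converts to 16:00 UTC in July (EDT) but 17:00 UTC in January (EST), which is exactly the DST behavior that fixed-offset conversions get wrong.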

14. How do you implement CDC (Change Data Capture)?

Scenario: Capture real-time changes from SQL database and stream to data lake.

-- Method 1: Native ADF CDC Connector
Copy Activity with CDC:
├── Source: SQL Server CDC
│   └── Enable CDC: true
│   └── Net changes: true
│   └── Start from: Last checkpoint
├── Sink: Delta Lake
│   └── Update method: Upsert
│   └── Key columns: [PrimaryKey]

-- Method 2: SQL Server Change Tracking
-- Enable on database
ALTER DATABASE MyDB SET CHANGE_TRACKING = ON;
ALTER TABLE Orders ENABLE CHANGE_TRACKING;

-- Query changes
DECLARE @last_sync bigint = @{variables('LastSyncVersion')};
SELECT 
    ct.SYS_CHANGE_OPERATION,
    ct.SYS_CHANGE_VERSION,
    o.*
FROM CHANGETABLE(CHANGES Orders, @last_sync) ct
LEFT JOIN Orders o ON ct.OrderID = o.OrderID;

-- Data Flow CDC Pattern
Data Flow:
├── Source (CDC Query)
├── Conditional Split
│   ├── Inserts: SYS_CHANGE_OPERATION == 'I'
│   ├── Updates: SYS_CHANGE_OPERATION == 'U'
│   └── Deletes: SYS_CHANGE_OPERATION == 'D'
├── AlterRow
│   └── Insert if: SYS_CHANGE_OPERATION == 'I'
│   └── Update if: SYS_CHANGE_OPERATION == 'U'
│   └── Delete if: SYS_CHANGE_OPERATION == 'D'
└── Sink (Delta Lake)
    └── Enable merge (upsert)

-- Method 3: Event-driven with Event Grid
Trigger: Storage Event (file created)
├── When new CDC file arrives in landing zone
└── Pipeline processes and merges changes
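Replaying the change-tracking operations against a keyed target is the essence of the AlterRow step; a sketch using a dict as the target store (change-record shape is illustrative, operation codes as in SYS_CHANGE_OPERATION):

```python
def apply_cdc(target, changes):
    """Apply I/U/D change rows to a keyed target (upsert + delete)."""
    for change in changes:
        op, key = change["SYS_CHANGE_OPERATION"], change["Key"]
        if op in ("I", "U"):
            target[key] = change["Row"]   # insert or update = upsert
        elif op == "D":
            target.pop(key, None)         # delete if present
    return target

target = {1: {"OrderID": 1, "Amount": 10}}
changes = [
    {"SYS_CHANGE_OPERATION": "U", "Key": 1, "Row": {"OrderID": 1, "Amount": 99}},
    {"SYS_CHANGE_OPERATION": "I", "Key": 2, "Row": {"OrderID": 2, "Amount": 5}},
    {"SYS_CHANGE_OPERATION": "D", "Key": 1, "Row": None},
]
out = apply_cdc(target, changes)  # order 1 updated then deleted; order 2 inserted
```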

15. How do you handle hierarchical/nested JSON data?

Scenario: Flatten complex nested JSON from API into relational tables.

-- Source JSON
{
  "orderId": "ORD001",
  "customer": {
    "id": "C001",
    "name": "John Doe",
    "addresses": [
      {"type": "billing", "city": "NYC"},
      {"type": "shipping", "city": "LA"}
    ]
  },
  "items": [
    {"productId": "P001", "qty": 2, "price": 100},
    {"productId": "P002", "qty": 1, "price": 200}
  ]
}

-- Data Flow: Flatten JSON
Source (JSON file)
├── Flatten (Customer Addresses)
│   └── Unroll: customer.addresses
│   └── Output: orderId, customer.id, addresses.type, addresses.city
├── Flatten (Order Items)  
│   └── Unroll: items
│   └── Output: orderId, items.productId, items.qty, items.price
├── Select (Order Header)
│   └── orderId, customer.id, customer.name

-- Create multiple outputs
Conditional Split:
├── OrderHeader → Sink (Orders table)
├── OrderItems → Sink (OrderItems table)
└── CustomerAddresses → Sink (Addresses table)

-- Flatten Transformation Settings
Flatten:
├── Unroll by: items[]
├── Unroll root: 
├── Input columns: orderId, customer.id
└── Output: orderId, customerId, productId, qty, price

-- Handle deeply nested
Parse Transformation:
├── Column: nestedJson
├── Expression: @json
└── Document form: Single document

-- Extract specific paths
Derived Column:
├── customerId: customer.id
├── customerName: customer.name
├── billingCity: customer.addresses[?(@.type=='billing')].city
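The Flatten/Select steps applied to the sample payload above can be sketched directly in Python, producing the three relational row sets:

```python
def flatten_order(order):
    """Split the nested order into header, item, and address row sets."""
    header = {"orderId": order["orderId"],
              "customerId": order["customer"]["id"],
              "customerName": order["customer"]["name"]}
    # Unroll items: one row per line item, keyed back to the order.
    items = [{"orderId": order["orderId"], **item} for item in order["items"]]
    # Unroll customer.addresses: one row per address, keyed to the customer.
    addresses = [{"customerId": order["customer"]["id"], **addr}
                 for addr in order["customer"]["addresses"]]
    return header, items, addresses

sample = {
    "orderId": "ORD001",
    "customer": {"id": "C001", "name": "John Doe",
                 "addresses": [{"type": "billing", "city": "NYC"},
                               {"type": "shipping", "city": "LA"}]},
    "items": [{"productId": "P001", "qty": 2, "price": 100},
              {"productId": "P002", "qty": 1, "price": 200}],
}
header, items, addresses = flatten_order(sample)
```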




16. How do you implement parallel processing for multiple sources?

Scenario: Load data from 50 different source tables simultaneously with optimal performance.

Pipeline: ParallelMultiSourceLoad
├── Lookup (Get Table List - 50 tables)
├── ForEach (Parallel Processing)
│   └── Settings:
│       ├── Sequential: false
│       ├── Batch Count: 20  -- Process 20 tables at a time
│   └── Activities:
│       └── Execute Pipeline (LoadSingleTable)

-- Optimize with Batching
Pipeline: BatchParallelLoad
├── Set Variable (Create Batches)
│   └── Expression: 
│       @chunk(activity('Lookup').output.value, 10)  -- Batches of 10
├── ForEach (Process Batches Sequentially)
│   └── Sequential: true  -- One batch at a time
│   └── Activities:
│       └── ForEach (Process Tables in Batch Parallel)
│           └── Sequential: false
│           └── Batch Count: 10

-- Data Flow with Multiple Sources
Data Flow: ParallelSources
├── Source1 (Table A) ─┐
├── Source2 (Table B) ─┼─→ Union → Transform → Sink
├── Source3 (Table C) ─┘

-- Integration Runtime Scaling
Self-hosted IR:
├── Create IR with 4 nodes
├── Each node handles different connections
└── Auto load balancing

Azure IR:
├── Time to live: 10 minutes
├── Core count: 16 (General purpose)
└── Reserved for heavy workloads

-- Concurrent Pipeline Runs
Pipeline Settings:
├── Max concurrent runs: 10
└── Activities Settings:
    ├── Copy Activity:
    │   └── Parallel copies: 32
    │   └── DIU: 256
    └── Data Flow:
        └── Core count: 16
        └── Compute type: Memory optimized
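The ForEach-with-batch-count semantics (up to N loads in flight at once) map onto a bounded thread pool; a sketch with a trivial stand-in for the per-table load pipeline:

```python
from concurrent.futures import ThreadPoolExecutor

def load_tables(tables, load_one, batch_count=20):
    """Run load_one for every table, at most batch_count concurrently
    (Sequential: false with Batch Count = batch_count)."""
    with ThreadPoolExecutor(max_workers=batch_count) as pool:
        return dict(zip(tables, pool.map(load_one, tables)))

results = load_tables([f"table_{i:02d}" for i in range(50)],
                      lambda t: f"loaded {t}", batch_count=20)
```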

17. How do you handle PII data masking?

Scenario: Mask sensitive data (SSN, email, credit card) before loading to analytics.

-- Data Flow: PII Masking
Derived Column (Masking Rules):
├── SSN_Masked: 
    concat('XXX-XX-', right(SSN, 4))
    -- 123-45-6789 → XXX-XX-6789

├── Email_Masked:
    concat(
        left(Email, 2),
        '****@',
        split(Email, '@')[2]
    )
    -- john.doe@email.com → jo****@email.com

├── CreditCard_Masked:
    concat('****-****-****-', right(CreditCard, 4))
    -- 1234-5678-9012-3456 → ****-****-****-3456

├── Phone_Masked:
    concat('(***) ***-', right(replace(Phone, '-', ''), 4))

├── Name_Masked:
    concat(left(FirstName, 1), '****')

-- Hash for consistent anonymization (lookup possible)
├── CustomerID_Hashed:
    sha2(256, concat(CustomerID, 'salt_value'))

-- Conditional Masking based on environment
├── Email_Output:
    iif(
        '@{pipeline().parameters.Environment}' == 'Production',
        Email,  -- Keep original in prod
        Email_Masked  -- Mask in non-prod
    )

-- Dynamic Masking with Metadata
Lookup: MaskingRules
├── ColumnName: 'SSN', MaskType: 'SSN'
├── ColumnName: 'Email', MaskType: 'Email'

Join with column list:
├── Apply mask function based on MaskType
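The masking expressions above translate one-to-one into Python; the salted hash shows the consistent-anonymization idea (same input always hashes the same, so joins still work):

```python
import hashlib

def mask_ssn(ssn):
    return "XXX-XX-" + ssn[-4:]            # 123-45-6789 -> XXX-XX-6789

def mask_email(email):
    local, domain = email.split("@", 1)
    return local[:2] + "****@" + domain    # john.doe@email.com -> jo****@email.com

def mask_card(card):
    return "****-****-****-" + card[-4:]   # keep only the last 4 digits

def hash_id(value, salt="salt_value"):
    """Salted hash: irreversible but deterministic, so lookups remain possible."""
    return hashlib.sha256((value + salt).encode()).hexdigest()
```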

18. How do you implement data reconciliation?

Scenario: Verify data integrity between source and target after ETL load.

Pipeline: DataReconciliation
├── Lookup (Source Count)
│   └── Query: SELECT COUNT(*) as SourceCount FROM SourceTable WHERE Date = @loadDate
├── Lookup (Target Count)
│   └── Query: SELECT COUNT(*) as TargetCount FROM TargetTable WHERE LoadDate = @loadDate
├── Lookup (Source Checksum)
│   └── Query: SELECT SUM(CAST(Amount AS DECIMAL(18,2))) as SourceSum FROM SourceTable
├── Lookup (Target Checksum)
│   └── Query: SELECT SUM(CAST(Amount AS DECIMAL(18,2))) as TargetSum FROM TargetTable
├── If Condition (Validate Counts)
│   └── Condition: 
│       @equals(
│           activity('SourceCount').output.firstRow.SourceCount,
│           activity('TargetCount').output.firstRow.TargetCount
│       )
│   └── False → Fail pipeline with details
├── If Condition (Validate Sums)
│   └── Condition: 
│       @equals(
│           activity('SourceSum').output.firstRow.SourceSum,
│           activity('TargetSum').output.firstRow.TargetSum
│       )
└── Stored Procedure (Log Reconciliation)
    └── INSERT INTO ReconciliationLog (
            LoadDate, SourceCount, TargetCount, 
            SourceSum, TargetSum, Status, Variance
        )

-- Detailed Reconciliation Report
Data Flow: DetailedReconciliation
├── Source (Source System)
├── Source (Target System)
├── Join (Full Outer on Business Key)
├── Conditional Split:
│   ├── MissingInTarget: isNull(TargetKey)
│   ├── MissingInSource: isNull(SourceKey)
│   ├── ValueMismatch: SourceAmount != TargetAmount
│   └── Matched: Default
├── Sink (Discrepancy Report)

-- Expression for variance percentage
Derived Column:
├── VariancePercent:
    abs(SourceCount - TargetCount) * 100.0 / SourceCount
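The count/sum comparison and variance calculation can be condensed into one function (row shape and the `Amount` column name are illustrative):

```python
def reconcile(source_rows, target_rows, amount_key="Amount"):
    """Compare row counts and amount sums; report status and count variance %."""
    src_count, tgt_count = len(source_rows), len(target_rows)
    src_sum = sum(r[amount_key] for r in source_rows)
    tgt_sum = sum(r[amount_key] for r in target_rows)
    variance = (abs(src_count - tgt_count) * 100.0 / src_count
                if src_count else 0.0)
    status = "OK" if (src_count == tgt_count and src_sum == tgt_sum) else "MISMATCH"
    return {"Status": status, "SourceCount": src_count, "TargetCount": tgt_count,
            "SourceSum": src_sum, "TargetSum": tgt_sum,
            "VariancePercent": variance}
```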

19. How do you migrate on-premises SSIS packages to ADF?

Scenario: Convert existing SSIS packages to ADF pipelines.

-- Option 1: Lift and Shift (Azure-SSIS IR)
-- Run existing packages unchanged
Azure-SSIS Integration Runtime:
├── Deploy SSIS packages to SSISDB
├── Execute using Execute SSIS Package activity
├── Benefits: Minimal changes, familiar tooling
└── Limitations: Still SSIS, not cloud-native

Pipeline Activity:
├── Execute SSIS Package
│   └── SSIS package path: /SSISDB/Folder/Project/Package.dtsx
│   └── Connection managers: Override with Azure connections
│   └── Parameters: Pass pipeline parameters

-- Option 2: Convert to ADF Native
SSIS Component → ADF Equivalent:
├── Data Flow Task → Copy Activity / Data Flow
├── Execute SQL → Stored Procedure Activity
├── For Loop → ForEach Activity
├── Sequence Container → Pipeline with dependencies
├── Script Task → Azure Function / Databricks
├── Lookup → Lookup Activity
├── Derived Column → Data Flow Derived Column
├── Aggregate → Data Flow Aggregate
├── Sort → Data Flow Sort
├── Merge Join → Data Flow Join

-- Migration Assessment Tool
# Use SSIS Migration Assessment Tool
# Identifies compatibility issues
# Generates migration report

-- Conversion Example
SSIS Package:
├── Execute SQL (Get Max Date)
├── Data Flow (Load Incremental)
│   ├── OLE DB Source
│   ├── Derived Column
│   └── OLE DB Destination
└── Execute SQL (Update Watermark)

Converted ADF Pipeline:
├── Lookup (Get Max Date)
├── Data Flow
│   ├── Source
│   ├── Derived Column
│   └── Sink
└── Stored Procedure (Update Watermark)

20. How do you optimize ADF pipeline performance?

Best Practices for Optimization:

-- 1. Copy Activity Optimization
Copy Activity Settings:
├── DIU (Data Integration Units): Start with Auto, increase for large data
├── Parallel copies: 32 (default), increase for many small files
├── Enable staging when loading Synapse (enables PolyBase/COPY loads)
├── Use binary copy when no transformation needed

-- 2. Data Flow Optimization
Data Flow Settings:
├── Core count: 16-256 based on data size
├── TTL (Time to Live): 10+ minutes for iterative development
├── Compute type: Memory Optimized for complex transformations
├── Enable staging for Synapse sink

Partitioning Strategy:
├── Hash: For large skewed data
├── Round Robin: For even distribution
├── Key: Preserve source partitioning

-- 3. Pipeline Design
Optimization:
├── Use ForEach with parallel execution (Sequential: false)
├── Batch activities to reduce overhead
├── Use Execute Pipeline for modularity and parallel runs
├── Avoid unnecessary lookups (cache results)

-- 4. Source Optimization
For SQL Sources:
├── Use indexed columns in WHERE clause
├── Avoid SELECT * (specify columns)
├── Use partitioning hint for large tables

Source Query:
├── Query: SELECT col1, col2 FROM Table 
│          WHERE ModifiedDate > ? 
│          OPTION (MAXDOP 8)
├── Partition option: Physical partitions of table

-- 5. Sink Optimization
For SQL Sink:
├── Pre-copy script: TRUNCATE TABLE target (if full load)
├── Batch size: 10000
├── Bulk insert table lock: true
├── Use staging with PolyBase for Synapse

For ADLS Sink:
├── Block size: 100MB
├── Max concurrent connections: 50

-- 6. Integration Runtime
Self-hosted IR:
├── Use multiple nodes (up to 4) for HA and load balancing
├── Separate IR for different workloads
├── Place IR close to data source

Azure IR:
├── Use region close to data
├── Consider managed VNET for security

-- 7. Monitoring and Alerting
Monitor:
├── Track DIU utilization
├── Monitor queue time vs execution time
├── Set up alerts for long-running pipelines
├── Use Log Analytics for detailed analysis
