Databricks / DB DE Professional / 57 concepts
DB DE Professional Cheat Sheet
Quick reference for the Databricks Certified Data Engineer Professional exam.
Advanced Delta Lake Operations
- ALTER TABLE my_table SET TBLPROPERTIES ( 'delta.enableChangeDataFeed' = 'true' )
- Enable Change Data Feed — must be set BEFORE making changes. Does not retroactively capture past changes.
- SELECT * FROM table_changes('my_table', 2) WHERE _change_type = 'update_postimage'
- Read changes from CDF starting at version 2. _change_type values: insert, update_preimage, update_postimage, delete
- spark.read.option('readChangeFeed', 'true') .option('startingVersion', 2) .option('endingVersion', 5) .table('my_table')
- Read CDF changes between versions using PySpark — returns DataFrame with _change_type, _commit_version, _commit_timestamp
- CREATE TABLE test_clone SHALLOW CLONE prod_table
- Zero-copy clone — references source files. Fast to create, minimal storage. Breaks if source runs VACUUM.
- CREATE TABLE archive_copy DEEP CLONE prod_table
- Full independent copy — copies all data files. Changes to clone don't affect source. Uses full storage.
- ALTER TABLE my_table ADD CONSTRAINT positive_amount CHECK (amount > 0)
- Add CHECK constraint. Delta first verifies that all existing rows satisfy it, and the ALTER TABLE fails if any row violates it. After that, every write is checked and a violating write fails.
- CREATE TABLE events ( event_timestamp TIMESTAMP, event_date DATE GENERATED ALWAYS AS (CAST(event_timestamp AS DATE)), payload STRING ) USING DELTA
- Generated column — computed on write, not read. Changing the expression does NOT update existing rows.
- DESCRIBE DETAIL my_table
- Show table metadata: location, format, size, partition columns, number of files, table properties
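To make the `_change_type` values concrete, here is a toy pure-Python model of the rows one commit contributes to the Change Data Feed. It is a sketch under stated assumptions: the table is keyed by a hypothetical `id` column, `diff_versions` is a hypothetical helper, and nothing here calls Delta Lake (columns such as `_commit_timestamp` are left out).

```python
from typing import Dict, List

Row = Dict[str, object]


def diff_versions(before: Dict[int, Row], after: Dict[int, Row], version: int) -> List[Row]:
    """Return CDF-style change rows for one commit that turned `before` into `after`."""
    changes: List[Row] = []
    for key in sorted(before.keys() | after.keys()):
        old, new = before.get(key), after.get(key)
        if old is None:
            # Key only exists after the commit: a new row
            changes.append({**new, "_change_type": "insert", "_commit_version": version})
        elif new is None:
            # Key disappeared: the deleted row's last values are recorded
            changes.append({**old, "_change_type": "delete", "_commit_version": version})
        elif old != new:
            # An update yields two rows: values before and after the change
            changes.append({**old, "_change_type": "update_preimage", "_commit_version": version})
            changes.append({**new, "_change_type": "update_postimage", "_commit_version": version})
    return changes


v1 = {1: {"id": 1, "amount": 10}, 2: {"id": 2, "amount": 20}}
v2 = {1: {"id": 1, "amount": 15}, 3: {"id": 3, "amount": 30}}
changes = diff_versions(v1, v2, version=2)
# Equivalent of: WHERE _change_type = 'update_postimage'
post = [c for c in changes if c["_change_type"] == "update_postimage"]
print([c["_change_type"] for c in changes])
# prints ['update_preimage', 'update_postimage', 'delete', 'insert']
```

Filtering on `update_postimage`, as in the `table_changes` query above, keeps only the new values of updated rows; unchanged rows produce no change records at all.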
Structured Streaming — Advanced Patterns
- df.withWatermark('event_time', '10 minutes') .groupBy(window('event_time', '5 minutes')) .agg(avg('value'))
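- Watermark with windowed aggregation. Events whose event_time is more than 10 minutes behind the maximum event time seen so far can be dropped, and a window's state is finalized once the watermark passes the window's end.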
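- # Stream-static join (Delta static side: latest table version read per micro-batch)
  stream_df.join(static_df, 'user_id', 'left')
- Stream-static join is stateless. With a Delta static table, each micro-batch joins against the latest version of that table, but updates to the static table do not re-join stream records that were already processed. Only new streaming data drives output.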
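- # Stream-stream join (watermarks on both inputs + event-time range condition)
  from pyspark.sql.functions import expr
  joined = stream_a.withWatermark('ts_a', '10 minutes').join(
      stream_b.withWatermark('ts_b', '10 minutes'),
      expr('id_a = id_b AND ts_a >= ts_b - interval 5 minutes AND ts_a <= ts_b + interval 5 minutes')
  )
- Stream-stream join. Join state can only be cleaned up when both inputs have watermarks and the join condition bounds event time in both directions; otherwise state grows without limit. Outer joins require them; inner joins run without them but keep unbounded state.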
- .trigger(availableNow=True)
- Process all available data in multiple micro-batches, then stop. PREFERRED over trigger(once=True) for large backlogs — avoids OOM.
- .trigger(once=True)
- Process all available data in ONE micro-batch, then stop. DEPRECATED — can cause OOM with large datasets. Use availableNow instead.
- # Output modes:
  # append: new rows only (default; with aggregations, requires a watermark)
  # update: changed rows only
  # complete: full result table (aggregations only)
- Three output modes for streaming writes. Complete mode rewrites the entire result — only valid for aggregation queries.
- def upsert_to_delta(batch_df, batch_id):
      # Expose the micro-batch to SQL, then MERGE it into the target
      batch_df.createOrReplaceTempView('updates')
      batch_df.sparkSession.sql("""
          MERGE INTO target t
          USING updates s
          ON t.id = s.id
          WHEN MATCHED THEN UPDATE SET *
          WHEN NOT MATCHED THEN INSERT *
      """)
  (stream_df.writeStream
      .foreachBatch(upsert_to_delta)
      .option('checkpointLocation', '/tmp/checkpoints/upsert')  # hypothetical path
      .start())
- foreachBatch pattern — standard approach for streaming MERGE INTO operations. Receives a batch DataFrame per micro-batch.
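The watermark rule for windowed aggregations can be sketched without Spark. This toy model assumes a single stream, integer event times in minutes, 5-minute tumbling windows and a 10-minute delay; `windowed_avg` is a hypothetical helper, and it simplifies Spark's behavior (no output modes, no state eviction, late rows always dropped rather than "may be dropped").

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Event = Tuple[int, float]  # (event_time in minutes, value)


def windowed_avg(batches: List[List[Event]], window: int = 5, delay: int = 10):
    """Toy model of withWatermark + tumbling-window avg over micro-batches."""
    watermark = float("-inf")
    max_event_time = float("-inf")
    values_by_window: Dict[int, List[float]] = defaultdict(list)
    dropped: List[Event] = []
    for batch in batches:
        for event_time, value in batch:
            if event_time < watermark:
                # Older than the watermark: its window may already be finalized
                dropped.append((event_time, value))
                continue
            window_start = (event_time // window) * window
            values_by_window[window_start].append(value)
            max_event_time = max(max_event_time, event_time)
        # As in Spark, the watermark only advances between micro-batches
        watermark = max_event_time - delay
    averages = {start: sum(vals) / len(vals) for start, vals in sorted(values_by_window.items())}
    return averages, dropped, watermark


batches = [[(0, 1.0), (3, 3.0)], [(20, 5.0)], [(4, 7.0), (15, 2.0)]]
averages, dropped, watermark = windowed_avg(batches)
```

The event at minute 4 arrives after an event at minute 20 pushed the watermark to 10, so it is discarded, while the event at minute 15 is still within the 10-minute tolerance and is aggregated.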
CDC and MERGE INTO — Advanced Patterns
- -- SCD Type 1 (overwrite current values)
  MERGE INTO customers t
  USING updates s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
- SCD Type 1 — no history preserved. Latest values overwrite previous ones.
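- -- SCD Type 2 (preserve history); address is a hypothetical tracked column
  -- NULL merge_key rows never match, so they insert the new version of changed customers
  MERGE INTO dim_customer t
  USING (
    SELECT id AS merge_key, * FROM staged_updates
    UNION ALL
    SELECT NULL AS merge_key, s.* FROM staged_updates s
    JOIN dim_customer d ON s.id = d.id
    WHERE d.is_current = true AND s.address <> d.address
  ) s
  ON t.id = s.merge_key AND t.is_current = true
  WHEN MATCHED AND t.address <> s.address THEN
    UPDATE SET is_current = false, end_date = current_date()
  WHEN NOT MATCHED THEN
    INSERT (id, address, is_current, start_date, end_date)
    VALUES (s.id, s.address, true, current_date(), NULL)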
- SCD Type 2 — closes current record and inserts new version. Preserves full change history with effective dates.
- -- CDC with deduplication before MERGE
  -- (assumes target has the same columns as staging_cdc, minus rn)
  WITH ranked AS (
    SELECT *, ROW_NUMBER() OVER (
      PARTITION BY patient_id ORDER BY change_timestamp DESC
    ) AS rn
    FROM staging_cdc
  )
  MERGE INTO target t
  USING (SELECT * EXCEPT (rn) FROM ranked WHERE rn = 1) s
  ON t.patient_id = s.patient_id
  WHEN MATCHED AND s.op = 'DELETE' THEN DELETE
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED AND s.op != 'DELETE' THEN INSERT *
- CDC processing — deduplicate source by key, take latest change, handle inserts/updates/deletes in one MERGE.
- -- MERGE with schema evolution
  SET spark.databricks.delta.schema.autoMerge.enabled = true;
  MERGE INTO target t
  USING source s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
- Auto-merge schema — new columns in source are automatically added to the target table during MERGE.
- # Deduplicate BEFORE merging (critical!)
  source_deduped = source_df.dropDuplicates(['id'])  # keeps an arbitrary row per id
  # OR, with a window function, keep the latest record per id:
  from pyspark.sql.functions import col, row_number
  from pyspark.sql.window import Window
  w = Window.partitionBy('id').orderBy(col('ts').desc())
  source_deduped = source_df.withColumn('rn', row_number().over(w)).filter('rn = 1').drop('rn')
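- Always deduplicate the source before MERGE. If several source rows match the same target row, the MERGE fails with an error; duplicate source keys that match no target row are all inserted, creating duplicates in the target.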
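The SCD Type 2 behavior described above (close the current record, insert a new version) can be modeled in pure Python to check the expected end state. This is a sketch, not Delta's MERGE: `scd2_apply` and the `address` column are hypothetical, and `updates` is assumed to be deduplicated to one row per `id` beforehand.

```python
from datetime import date
from typing import Dict, List


def scd2_apply(dim: List[dict], updates: List[dict], tracked: List[str], today: date) -> List[dict]:
    """Toy SCD Type 2: close changed current rows and append new current versions."""
    result = [dict(row) for row in dim]  # work on copies; input stays untouched
    current: Dict[int, dict] = {row["id"]: row for row in result if row["is_current"]}
    for upd in updates:
        cur = current.get(upd["id"])
        if cur is not None and all(cur[c] == upd[c] for c in tracked):
            continue  # no tracked attribute changed: keep the current version
        if cur is not None:
            # Matched and changed: close out the old version
            cur["is_current"] = False
            cur["end_date"] = today
        # Insert the new (or first) version as the current row
        new_row = {"id": upd["id"], **{c: upd[c] for c in tracked},
                   "is_current": True, "start_date": today, "end_date": None}
        result.append(new_row)
        current[upd["id"]] = new_row
    return result


dim = [{"id": 1, "address": "Old St", "is_current": True,
        "start_date": date(2024, 1, 1), "end_date": None}]
updates = [{"id": 1, "address": "New Ave"}, {"id": 2, "address": "Main St"}]
history = scd2_apply(dim, updates, tracked=["address"], today=date(2025, 1, 1))
```

Customer 1 ends up with two rows (one closed, one current) and customer 2 with one current row; re-applying an unchanged update adds nothing, which is why the MERGE's matched branch tests for a changed attribute.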
Data Modeling Patterns
- Star Schema:
  - Fact tables: transactions, events, metrics (large, append-heavy)
  - Dimension tables: customers, products, dates (smaller, slowly changing)
  - Join via foreign keys: fact.customer_id → dim_customer.id
- Star schema — central fact tables surrounded by dimension tables. Optimized for analytical queries with simple joins.
- Medallion Architecture (Professional):
  - Bronze: raw, append-only, source-faithful
  - Silver: cleaned, deduplicated, conformed, validated
  - Gold: business aggregates, pre-joined, consumption-ready
- Three-layer architecture. Professional exam tests advanced Silver→Gold patterns, not just the concept.
- -- View (always fresh, recomputes on query)
  CREATE VIEW daily_summary AS
  SELECT date, SUM(revenue) AS total_revenue FROM orders GROUP BY date
- Views recompute on every query — always fresh but slow for complex aggregations on large datasets.
- -- Materialized View (DLT-managed, precomputed; older DLT syntax: CREATE LIVE TABLE)
  CREATE OR REFRESH MATERIALIZED VIEW daily_summary AS
  SELECT date, SUM(revenue) AS total_revenue FROM orders GROUP BY date
- Materialized views in Databricks are managed by DLT. Precomputed for fast reads but may be stale until refreshed.
- -- Fact table with generated columns
  CREATE TABLE fact_sales (
    sale_id BIGINT,
    sale_timestamp TIMESTAMP,
    sale_date DATE GENERATED ALWAYS AS (CAST(sale_timestamp AS DATE)),
    amount DECIMAL(10,2),
    customer_id BIGINT,
    product_id BIGINT
  ) USING DELTA
  CLUSTER BY (sale_date, customer_id)
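- Fact table design: the generated column derives sale_date on write, and Liquid Clustering on (sale_date, customer_id) takes the place of partitioning (Liquid Clustering cannot be combined with PARTITIONED BY or Z-ORDER).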
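The view vs. materialized view trade-off listed above comes down to when the query runs. This toy model assumes an in-memory `orders` list and hypothetical `View` / `MaterializedView` classes; it does not model DLT's incremental refresh, only the freshness behavior.

```python
from typing import Callable, Dict, List

Summary = Dict[str, float]


class View:
    """Stores only the query; every read recomputes it (always fresh)."""

    def __init__(self, query: Callable[[], Summary]):
        self.query = query

    def read(self) -> Summary:
        return self.query()


class MaterializedView:
    """Stores the query result; reads return the stored copy until refresh()."""

    def __init__(self, query: Callable[[], Summary]):
        self.query = query
        self.refresh()

    def refresh(self) -> None:
        self._result = self.query()

    def read(self) -> Summary:
        return self._result


orders: List[dict] = [{"date": "2026-01-01", "revenue": 100.0}]


def daily_summary() -> Summary:
    """Python equivalent of: SELECT date, SUM(revenue) AS total_revenue FROM orders GROUP BY date"""
    totals: Summary = {}
    for order in orders:
        totals[order["date"]] = totals.get(order["date"], 0.0) + order["revenue"]
    return totals


view, mv = View(daily_summary), MaterializedView(daily_summary)
orders.append({"date": "2026-01-01", "revenue": 50.0})  # new data lands after both are defined
```

After the new order lands, `view.read()` already includes it, while `mv.read()` still returns the old total until `mv.refresh()` runs: cheap reads in exchange for possible staleness.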
Testing Strategies
- # Unit test with pytest + local SparkSession
  import pytest
  from pyspark.sql import SparkSession
  @pytest.fixture(scope='session')
  def spark():
      return SparkSession.builder.master('local[*]').getOrCreate()
  def test_dedup(spark):
      data = [(1, 'a'), (1, 'b'), (2, 'c')]
      df = spark.createDataFrame(data, ['id', 'val'])
      result = df.dropDuplicates(['id'])
      assert result.count() == 2
- Unit test pattern — isolate transformation logic into testable functions. Use local SparkSession for fast tests.
- # Integration test with temporary tables
  def test_pipeline_integration(spark):
      # Create test data in a dedicated test catalog/schema
      spark.sql('CREATE SCHEMA IF NOT EXISTS test_catalog.test_schema')
      spark.sql('CREATE TABLE test_catalog.test_schema.input AS SELECT 1 AS id')
      try:
          # run_pipeline: the project's own (hypothetical) entry point under test
          run_pipeline('test_catalog.test_schema')
          result = spark.table('test_catalog.test_schema.output')
          assert result.count() > 0
      finally:
          # Cleanup runs even when the assertion fails
          spark.sql('DROP SCHEMA test_catalog.test_schema CASCADE')
- Integration test — use dedicated test catalogs/schemas in Unity Catalog to isolate test data from production.
- -- Test with shallow clone (zero-copy production data)
  CREATE TABLE test_catalog.test.customers SHALLOW CLONE prod_catalog.main.customers
- Use shallow clones for testing against production-like data without duplicating storage. Fast to create.
- # Test streaming with memory sink
  query = (stream_df.writeStream
      .format('memory')
      .queryName('test_output')
      .outputMode('append')
      .start())
  query.processAllAvailable()
  result = spark.sql('SELECT * FROM test_output')
  assert result.count() > 0
  query.stop()
- Test streaming pipelines using memory sink — writes to an in-memory table for assertions. Useful for unit testing.
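The unit-test advice above is easiest to follow when the transformation is a plain function with no Spark dependency. This sketch uses a hypothetical `keep_latest` function (a deterministic "latest row per key", unlike `dropDuplicates`, which does not guarantee which row survives) plus pytest-style tests that also run as plain calls.

```python
from typing import Dict, List


def keep_latest(rows: List[dict], key: str, order_by: str) -> List[dict]:
    """Keep one row per `key`: the one with the largest `order_by` value, sorted by key."""
    best: Dict[object, dict] = {}
    for row in rows:
        k = row[key]
        if k not in best or row[order_by] > best[k][order_by]:
            best[k] = row
    return sorted(best.values(), key=lambda r: r[key])


# pytest collects functions named test_*; they can also be called directly
def test_keep_latest_picks_newest_row():
    rows = [
        {"id": 1, "ts": 1, "val": "a"},
        {"id": 1, "ts": 2, "val": "b"},
        {"id": 2, "ts": 1, "val": "c"},
    ]
    assert [r["val"] for r in keep_latest(rows, "id", "ts")] == ["b", "c"]


def test_keep_latest_handles_empty_input():
    assert keep_latest([], "id", "ts") == []


if __name__ == "__main__":
    test_keep_latest_picks_newest_row()
    test_keep_latest_handles_empty_input()
    print("all tests passed")
```

Keeping the rule in a pure function means the fast tests cover the logic, while the slower SparkSession or memory-sink tests only need to check the wiring.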
CI/CD and Deployment
- # databricks.yml: multi-environment bundle
  bundle:
    name: my_pipeline
  targets:
    dev:
      default: true
      workspace:
        host: https://dev.cloud.databricks.com
    staging:
      workspace:
        host: https://staging.cloud.databricks.com
    production:
      workspace:
        host: https://prod.cloud.databricks.com
      run_as:
        service_principal_name: prod-sp
- Databricks Asset Bundles — define dev/staging/prod targets. Targets inherit defaults and override only what differs.
- databricks bundle validate
  databricks bundle deploy -t staging
  databricks bundle deploy -t production
- Bundle CLI workflow: validate first, deploy to staging, then promote to production.
- # Notebook parameterization via Jobs API
  env = dbutils.widgets.get('environment')
  catalog = f'{env}_catalog'
  spark.sql(f'USE CATALOG {catalog}')
- Retrieve job parameters with dbutils.widgets.get(). Use to switch catalogs/schemas per environment.
- # REST API: create vs reset
  # POST /api/2.0/jobs/create  → new job on each call (NOT idempotent)
  # POST /api/2.0/jobs/reset   → overwrite all settings of an existing job, by job_id
  # POST /api/2.0/jobs/run-now → trigger an immediate run
- Key API endpoints. /create is NOT idempotent: repeated calls create duplicate jobs. Use /reset to update an existing job; it replaces the job's entire settings.
- # Repair Run (partial retry)
  # Re-executes ONLY failed tasks + their downstream dependents
  # Preserves results of successful tasks
  # Available via Jobs UI or POST /api/2.0/jobs/runs/repair
- Repair Run for partial pipeline failures — avoids re-running the entire job. Fix root cause first or repair will fail again.
- # Multi-environment catalog strategy
  # dev_catalog.schema.table     → development
  # staging_catalog.schema.table → staging
  # prod_catalog.schema.table    → production
  # Code uses a parameterized catalog name
- Environment isolation pattern — separate Unity Catalog catalogs per environment. Same schemas and tables, different catalogs.
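Why /create is unsafe to script repeatedly, and how a reset-based deploy stays idempotent, can be shown with an in-memory stand-in. `FakeJobsAPI` and `deploy` are hypothetical and make no HTTP calls; they only mimic the documented semantics that create always registers a new job and reset overwrites all settings of an existing job.

```python
import itertools
from typing import Dict


class FakeJobsAPI:
    """Hypothetical in-memory stand-in for the Jobs API: no HTTP, no auth."""

    def __init__(self) -> None:
        self._next_id = itertools.count(1)
        self.jobs: Dict[int, dict] = {}  # job_id -> settings

    def create(self, settings: dict) -> int:
        # Like /jobs/create: every call registers a brand-new job
        job_id = next(self._next_id)
        self.jobs[job_id] = dict(settings)
        return job_id

    def reset(self, job_id: int, settings: dict) -> None:
        # Like /jobs/reset: overwrite ALL settings of an existing job
        if job_id not in self.jobs:
            raise KeyError(f"job {job_id} does not exist")
        self.jobs[job_id] = dict(settings)


def deploy(api: FakeJobsAPI, settings: dict) -> int:
    """Idempotent deploy: reset the job with the same name if it exists, else create it."""
    for job_id, existing in api.jobs.items():
        if existing.get("name") == settings["name"]:
            api.reset(job_id, settings)
            return job_id
    return api.create(settings)


api = FakeJobsAPI()
first = deploy(api, {"name": "nightly_etl", "max_concurrent_runs": 1})
second = deploy(api, {"name": "nightly_etl", "max_concurrent_runs": 2})
```

Calling `deploy` twice leaves one job whose settings are the latest ones, whereas two raw `create` calls would leave two jobs. Asset Bundles give the same create-or-update behavior for the jobs they manage, which is one reason to prefer `databricks bundle deploy` over hand-written API scripts.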
Monitoring and Optimization
- -- Spark UI diagnosis checklist:
  -- 1. One task much slower → Data Skew (repartition/salt)
  -- 2. Spill to disk → Insufficient memory (increase executor memory)
  -- 3. High shuffle read/write → Minimize wide transformations
  -- 4. Many small tasks → Too many partitions (coalesce)
- Spark UI interpretation guide — match symptoms to root causes for quick diagnosis.
- -- AQE (Adaptive Query Execution): 3 key optimizations
  -- 1. Coalesce shuffle partitions (merge small partitions)
  -- 2. Convert sort-merge join → broadcast join (if one side is small)
  -- 3. Handle skewed joins (split skewed partitions)
- AQE is enabled by default. It optimizes at runtime using actual statistics — EXPLAIN may not match actual plan.
- EXPLAIN EXTENDED SELECT * FROM orders JOIN customers ON orders.customer_id = customers.id WHERE orders.order_date > '2026-01-01'
- Analyze query plan — look for full table scans, missing predicate pushdown, suboptimal join strategies.
- -- Auto Compaction file sizing behavior:
  -- Default target: ~1 GB per file
  -- Streaming MERGE: auto-tunes to 32-48 MB per file
  -- This is EXPECTED, not a problem to fix
- Auto Compaction adapts file size to write patterns. Small micro-batch writes produce smaller files by design.
- ALTER TABLE my_table CLUSTER BY (region, date)
- Liquid Clustering — modern replacement for partitioning + Z-ORDER. Keys can be changed without full rewrite.
- -- Databricks SQL Alert for data quality
  -- 1. Create a query: SELECT COUNT(*) AS bad_rows FROM t WHERE amount < 0
  -- 2. Set schedule: every 2 minutes
  -- 3. Set alert: trigger when bad_rows > 0
  -- 4. Configure notification (email, Slack, PagerDuty)
- SQL alerts for monitoring — schedule queries and trigger notifications when thresholds are exceeded.
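Checklist item 1 mentions salting as a fix for data skew. The idea can be sketched in pure Python: spread the hot key on the large side over several random sub-keys and replicate the small side once per sub-key so every row still finds its match. `salted_join` and the sample data are hypothetical; in PySpark the same idea is usually expressed with a random salt column on the skewed side and an exploded salt range on the other, and AQE's skew-join handling often makes manual salting unnecessary.

```python
import random
from collections import Counter
from typing import Dict, List, Tuple


def salted_join(big: List[Tuple[str, int]], small: List[Tuple[str, str]], num_salts: int, seed: int = 0):
    """Inner-join `big` to `small` on key, spreading each key of `big` over num_salts sub-keys."""
    rng = random.Random(seed)
    # Skewed side: append a random salt to every key
    salted_big = [((key, rng.randrange(num_salts)), value) for key, value in big]
    # Small side: replicate each row once per salt so every salted key still matches
    salted_small: Dict[Tuple[str, int], List[str]] = {}
    for key, value in small:
        for salt in range(num_salts):
            salted_small.setdefault((key, salt), []).append(value)
    joined = [(skey[0], bval, sval)
              for skey, bval in salted_big
              for sval in salted_small.get(skey, [])]
    # Rows per salted key approximate the work each shuffle partition would receive
    load = Counter(skey for skey, _ in salted_big)
    return joined, load


big = [("hot", i) for i in range(1000)] + [("cold", 0)]
small = [("hot", "H"), ("cold", "C")]
joined, load = salted_join(big, small, num_salts=8)
```

The join result is identical to an unsalted join, but the 1,000 "hot" rows are now split across 8 salted keys instead of landing in a single task; the cost is replicating the small side `num_salts` times.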
Data Governance and Security
- -- Column masking
  CREATE FUNCTION mask_ssn(ssn STRING)
  RETURNS STRING
  RETURN CASE WHEN is_member('compliance_team') THEN ssn ELSE 'XXX-XX-XXXX' END;
  ALTER TABLE customers ALTER COLUMN ssn SET MASK mask_ssn
- Column mask with group check — compliance team sees real SSN, everyone else sees masked value. Applied at query time.
- -- Row-level security (user_regions is a hypothetical mapping table: user_email, region)
  CREATE FUNCTION region_filter(sale_region STRING)
  RETURNS BOOLEAN
  RETURN (
    is_member('global_access')
    OR EXISTS (
      SELECT 1 FROM user_regions m
      WHERE m.user_email = current_user() AND m.region = sale_region
    )
  );
  ALTER TABLE sales SET ROW FILTER region_filter ON (region)
- Row filter: users only see rows whose region is mapped to them in the lookup table. Members of the global_access group bypass the filter.
- -- Dynamic view for access control
  CREATE VIEW secure_customers AS
  SELECT
    customer_id,
    name,
    CASE WHEN is_member('pii_access') THEN email ELSE '***@***' END AS email,
    CASE WHEN is_member('pii_access') THEN phone ELSE '***-***-****' END AS phone
  FROM customers
- Dynamic view — alternative to column masking. Useful when you want to control access via a view rather than table-level masks.
- # Secrets management
  password = dbutils.secrets.get(scope='my_scope', key='db_password')
  # print(password) → outputs '[REDACTED]'
  # But the variable holds the actual value for use in connections
  conn = connect(host='db.example.com', password=password)  # connect(): hypothetical DB client call
- Secrets are redacted when printed but usable in code. The actual value is retrieved — only display is masked.
- -- Query metadata via information_schema
  SELECT table_catalog, table_schema, table_name, column_name
  FROM system.information_schema.columns
  WHERE table_catalog = 'prod_catalog' AND data_type = 'STRING'
- Query information_schema for metadata — find all string columns, audit permissions, list masking policies.
- -- GDPR deletion pattern
  DELETE FROM customers WHERE customer_id = 12345;
  -- Verify deletion: should return 0 rows
  SELECT * FROM customers WHERE customer_id = 12345;
  -- Permanently remove old files with VACUUM (a 0-hour retention needs the safety check disabled)
  SET spark.databricks.delta.retentionDurationCheck.enabled = false;
  VACUUM customers RETAIN 0 HOURS;
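- GDPR right-to-erasure: DELETE removes the rows from the current version, and VACUUM then deletes the old data files that still contain them. Verify with time travel: after VACUUM, reading a version from before the DELETE should fail.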
- -- Audit logging
  SELECT * FROM system.access.audit
  WHERE action_name = 'getTable'
    AND request_params.full_name_arg = 'prod.main.customers'
    AND event_date > current_date() - 7
- Unity Catalog audit logs — track who accessed what data and when. Essential for compliance reporting.
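The column mask and row filter above are evaluated at query time for the user running the query; the stored data never changes. This pure-Python model mirrors that order of operations under stated assumptions: a caller's group set stands in for `is_member`, and an `allowed_regions` set stands in for the per-user region lookup. All names are hypothetical.

```python
from typing import Dict, List, Set


def mask_ssn(ssn: str, groups: Set[str]) -> str:
    """Mirror of the mask_ssn SQL function: only compliance_team sees the raw value."""
    return ssn if "compliance_team" in groups else "XXX-XX-XXXX"


def region_filter(region: str, groups: Set[str], allowed_regions: Set[str]) -> bool:
    """Mirror of the row filter: global_access sees everything, others only their regions."""
    return "global_access" in groups or region in allowed_regions


def query(rows: List[Dict[str, str]], groups: Set[str], allowed_regions: Set[str]) -> List[Dict[str, str]]:
    """Apply the row filter, then the column mask, at read time; stored rows are untouched."""
    return [
        {**row, "ssn": mask_ssn(row["ssn"], groups)}
        for row in rows
        if region_filter(row["region"], groups, allowed_regions)
    ]


table = [
    {"id": "1", "region": "EU", "ssn": "123-45-6789"},
    {"id": "2", "region": "US", "ssn": "987-65-4321"},
]
analyst_view = query(table, groups={"analysts"}, allowed_regions={"EU"})
auditor_view = query(table, groups={"compliance_team", "global_access"}, allowed_regions=set())
```

The analyst sees only the EU row with a masked SSN; the auditor sees both rows unmasked; `table` itself is unchanged, which is the key difference from physically redacting data.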
Streaming Trigger Modes Comparison
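- # Fixed-interval micro-batches (always running)
  .trigger(processingTime='10 seconds')
  # → Starts a micro-batch every 10 seconds (right away if the previous one overran)
  # → Stream runs until stopped
  # → Use for near-real-time pipelines
- Processing-time trigger: the stream never stops on its own and processes micro-batches at the specified interval. This is micro-batch mode, not Spark's experimental continuous processing mode (trigger(continuous=...)).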
- # Process all available, then stop (PREFERRED)
  .trigger(availableNow=True)
  # → Splits backlog into multiple micro-batches
  # → Stops after all data processed
  # → Safe for large backlogs (no OOM)
- availableNow — processes incrementally across micro-batches. Use for scheduled batch-style streaming. Replaced once=True.
- # Process all available in ONE batch (LEGACY)
  .trigger(once=True)
  # → Single micro-batch for ALL data
  # → Can OOM on large backlogs
  # → DEPRECATED in favor of availableNow
- trigger(once=True) — processes everything at once. Deprecated. Use availableNow=True instead.
- # Default trigger (no trigger specified)
  # → Next micro-batch starts as soon as the previous one completes
  # → Similar to processingTime='0 seconds'
  # → Runs until stopped, with no delay between batches
- Default behavior: micro-batches run back to back with no pause between them. Maximizes throughput.
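The difference between `once=True` and `availableNow=True` is how the same backlog is cut into micro-batches. This toy planner is a sketch, not Spark code: `plan_batches` is hypothetical, and it assumes a file source where `availableNow` honors a rate limit such as `maxFilesPerTrigger` while `once` puts the whole backlog into one batch.

```python
from typing import List, Optional


def plan_batches(pending_files: List[str], mode: str,
                 max_files_per_trigger: Optional[int] = None) -> List[List[str]]:
    """Toy model of how a backlog is split into micro-batches before the query stops."""
    if not pending_files:
        return []
    if mode == "once":
        # trigger(once=True): the whole backlog in one micro-batch, rate limit not applied
        return [list(pending_files)]
    if mode == "availableNow":
        # trigger(availableNow=True): same backlog, split by the rate limit into several batches
        step = max_files_per_trigger or len(pending_files)
        return [pending_files[i:i + step] for i in range(0, len(pending_files), step)]
    raise ValueError(f"unsupported mode: {mode}")


backlog = [f"file_{i:03d}.json" for i in range(10)]
once = plan_batches(backlog, "once", max_files_per_trigger=4)
available_now = plan_batches(backlog, "availableNow", max_files_per_trigger=4)
```

Both plans cover exactly the same files and both stop afterwards, but the largest batch under `availableNow` is bounded by the rate limit (4, 4, 2 files here) while `once` handles all 10 at once, which is where the OOM risk comes from.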
Production Job Configuration
- # Job cluster vs All-Purpose cluster
  # Job cluster: created per run, auto-terminates → COST EFFICIENT
  # All-Purpose: persists between runs → for interactive dev only
  # ALWAYS use job clusters for production jobs
- Job clusters are ephemeral and cost-effective. All-purpose clusters are for development only. Never use all-purpose for scheduled production jobs.
- # Task value passing between job tasks
  # Task 1 (Extract):
  dbutils.jobs.taskValues.set(key='row_count', value=1500)
  # Task 2 (Validate):
  count = dbutils.jobs.taskValues.get(taskKey='Extract', key='row_count')
  assert count > 0, 'No rows extracted'
- Pass lightweight values between tasks with taskValues. Recommended over Delta tables for simple values (counts, flags, status).
- # Maximum concurrent runs = 1
  # → Prevents duplicate streaming jobs
  # → Critical for MERGE-based pipelines
  # → Overlapping MERGEs cause conflicts
- Set max concurrent runs to 1 for streaming and MERGE jobs. Overlapping runs on the same target table cause conflicts.
- # Retry configuration
  # max_retries: 3 (reasonable for transient failures)
  # min_retry_interval_millis: 30000 (30 sec backoff)
  # retry_on_timeout: true
  # timeout_seconds: 3600 (1 hour per task)
- Production retry settings — balance reliability with cost. Unlimited retries waste resources on persistent failures.
- # Job notification channels
  # on_start: optional (noisy for frequent jobs)
  # on_success: optional (useful for daily SLA tracking)
  # on_failure: REQUIRED (always alert on failures)
  # on_duration_warning_threshold_exceeded: useful for detecting slowdowns
- Configure notifications per job. At minimum, always set on_failure. Duration threshold catches performance degradation early.
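The retry settings above can be reasoned about with a small model: `max_retries` counts retries after the first attempt, so a persistently failing task runs `1 + max_retries` times, and each retry waits at least `min_retry_interval_millis`. `run_with_retries` is a hypothetical helper, not the Jobs service; it uses a fixed wait and does not model the unlimited (`-1`) setting or timeouts.

```python
import time
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def run_with_retries(task: Callable[[], T], max_retries: int = 3,
                     min_retry_interval_millis: int = 30_000,
                     sleep: Callable[[float], None] = time.sleep) -> Tuple[T, int]:
    """Run `task`; on failure retry up to max_retries times, waiting between attempts."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts
        except Exception:
            if attempts > max_retries:
                raise  # persistent failure: give up and surface the error
            sleep(min_retry_interval_millis / 1000)  # interval converted to seconds
```

A task that fails twice with a transient error and then succeeds finishes on the third attempt after two 30-second waits; a task that always fails is attempted four times and then raises, which is the point at which the on_failure notification matters.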