Spark Dev Associate Cheat Sheet
Quick reference for the Databricks Certified Associate Developer for Apache Spark exam.
Spark Architecture Essentials
- SparkSession: spark = SparkSession.builder.appName('myApp').getOrCreate()
- Single entry point for all Spark functionality. Unifies the older SQLContext and HiveContext and wraps SparkContext, which remains available as spark.sparkContext
- Execution hierarchy: Application > Job > Stage > Task
- Actions trigger jobs, shuffle boundaries create stages, each partition gets one task
- Driver: creates SparkSession, builds DAG, schedules tasks
  Executors: run tasks, store cached data, report results to driver
- Driver is a single point of failure. Executor failures are recoverable via lineage recomputation
- Client mode: driver runs on submission machine
  Cluster mode: driver runs on a cluster node
- Client mode for interactive dev (notebooks). Cluster mode for production jobs
- Parallelism = number of slots (cores) across all executors
  4 executors x 2 cores = 8 tasks in parallel
- Each task runs on one core and processes one partition. More slots = more parallelism
- Transformations: lazy, build a plan (select, filter, join, groupBy)
  Actions: trigger execution (show, collect, count, write, head)
- Nothing executes until an action is called. Transformations return new DataFrames
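The slot arithmetic above can be sketched in plain Python. This is only a back-of-the-envelope model, not a Spark API: task_waves is a hypothetical helper, and it assumes one task per partition and one core per task.

```python
import math

def task_waves(num_partitions: int, num_executors: int, cores_per_executor: int) -> int:
    """Return how many sequential waves of tasks a stage needs.

    Assumes one task per partition and one core per task.
    """
    slots = num_executors * cores_per_executor  # tasks that can run at once
    return math.ceil(num_partitions / slots)

# 4 executors x 2 cores = 8 slots; 200 partitions need 25 waves of 8 tasks.
print(task_waves(200, 4, 2))
```

A partition count that is not a multiple of the slot count leaves some cores idle in the last wave, which is one reason to tune partition counts against cluster size.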
Creating DataFrames
- df = spark.read.csv('/path/to/file.csv', header=True, inferSchema=True)
- Read CSV with header row and automatic type inference. inferSchema reads file twice — slow for large files
- df = spark.read.parquet('/path/to/data.parquet')
- Read Parquet — schema is embedded in the file, no inference needed
- df = spark.read.json('/path/to/data.json')
- Read JSON — schema inferred automatically. Use multiLine=True for multi-line JSON records
- df = spark.read.format('delta').load('/path/to/delta_table')
- Read a Delta Lake table. Supports time travel, ACID transactions, and schema evolution
- from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
  schema = StructType([
      StructField('id', IntegerType(), False),
      StructField('name', StringType(), True),
      StructField('amount', DoubleType(), True)
  ])
  df = spark.read.csv('/path', schema=schema)
- Explicit schema definition — faster than inferSchema and guarantees correct types
- df = spark.read.csv('/path', schema='id INT, name STRING, amount DOUBLE')
- DDL string shorthand for schema definition — simpler than StructType for basic schemas
- df = spark.createDataFrame([ (1, 'Alice', 100.0), (2, 'Bob', 200.0) ], ['id', 'name', 'amount'])
- Create DataFrame from Python list of tuples with column names
DataFrame Transformations — Core Operations
- df.select('id', 'name', 'amount')
- Select specific columns. Returns a new DataFrame with only these columns
- df.select(col('name'), col('amount') * 1.1)
- Select with column expressions. col() from pyspark.sql.functions enables arithmetic and logic
- df.filter(col('age') > 18)
  df.filter((col('status') == 'active') & (col('amount') > 100))
- Filter rows by condition. filter() and where() are identical. Use & (and), | (or), ~ (not) with parentheses
- df.withColumn('total', col('price') * col('quantity'))
- Add or replace a column. If 'total' already exists, it is overwritten
- df.withColumnRenamed('old_name', 'new_name')
- Rename a column. Returns a new DataFrame (original is unchanged)
- df.drop('unwanted_col')
- Remove a column. Returns a new DataFrame without the specified column
- df.distinct()
  df.dropDuplicates(['id', 'name'])
- distinct() removes fully duplicate rows. dropDuplicates() removes duplicates based on the specified columns only and keeps one arbitrary row per combination
- df.orderBy(col('amount').desc())
  df.sort('name', ascending=False)
- Sort DataFrame. orderBy() and sort() are identical. Triggers a shuffle (wide transformation)
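The difference between distinct() and dropDuplicates() can be modelled on plain tuples. This is a sketch of the semantics only, not PySpark code: the helper names are hypothetical, and the model keeps the first row it sees, whereas Spark gives no guarantee about which duplicate survives.

```python
rows = [
    (1, "Alice", 100.0),
    (1, "Alice", 100.0),  # exact duplicate of the first row
    (1, "Alice", 250.0),  # same id and name, different amount
    (2, "Bob", 200.0),
]
columns = ["id", "name", "amount"]

def distinct(rows):
    """Drop rows that match on every column."""
    return list(dict.fromkeys(rows))

def drop_duplicates(rows, columns, subset):
    """Keep one row per distinct combination of the subset columns."""
    positions = [columns.index(c) for c in subset]
    kept = {}
    for row in rows:
        key = tuple(row[i] for i in positions)
        kept.setdefault(key, row)  # this model keeps the first row seen
    return list(kept.values())

print(distinct(rows))                                # 3 rows remain
print(drop_duplicates(rows, columns, ["id", "name"]))  # 2 rows remain
```

When a specific survivor matters (for example the latest record), a window function with ROW_NUMBER is the more predictable choice.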
Joins and Unions
- df1.join(df2, on='id', how='inner')
- Inner join — only matching rows from both sides. 'id' column appears once in result
- df1.join(df2, df1['id'] == df2['id'], 'left')
- Left join with expression. Both 'id' columns appear — use df1['id'] or df2['id'] to disambiguate
- Join types:
  inner: matching rows only
  left / left_outer: all left rows + matching right
  right / right_outer: all right rows + matching left
  full / full_outer: all rows from both sides
  cross: cartesian product (every row x every row)
  left_semi: left rows that have a match (no right columns)
  left_anti: left rows that have NO match
- Know all 7 join types. semi and anti are filtering joins — they don't add columns from the right side
- from pyspark.sql.functions import broadcast
  df1.join(broadcast(df2), on='id', how='inner')
- Broadcast join hint — replicate small df2 to all executors to avoid shuffling large df1
- df1.union(df2)
- Combine rows by POSITION (not column name). Both DataFrames must have the same number of columns
- df1.unionByName(df2, allowMissingColumns=True)
- Combine rows by COLUMN NAME. allowMissingColumns fills missing columns with null
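Two points from this section are easy to get wrong: semi and anti joins only filter the left side, and union() matches columns by position. The sketch below models both on plain Python data. The function names are hypothetical stand-ins, not Spark calls.

```python
left = [
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
    {"id": 3, "name": "Cara"},
]
right = [
    {"id": 1, "city": "Oslo"},
    {"id": 1, "city": "Rome"},  # second match for id 1
    {"id": 3, "city": "Lima"},
]

def inner_join(left, right, key):
    """One output row per matching (left, right) pair."""
    return [{**lrow, **rrow} for lrow in left for rrow in right if lrow[key] == rrow[key]]

def left_semi(left, right, key):
    """Left rows that have at least one match; no right columns, no duplication."""
    right_keys = {rrow[key] for rrow in right}
    return [lrow for lrow in left if lrow[key] in right_keys]

def left_anti(left, right, key):
    """Left rows that have no match."""
    right_keys = {rrow[key] for rrow in right}
    return [lrow for lrow in left if lrow[key] not in right_keys]

def union_by_position(cols1, rows1, cols2, rows2):
    """Append rows as-is; column names of the second input are ignored."""
    if len(cols1) != len(cols2):
        raise ValueError("union requires the same number of columns")
    return cols1, rows1 + rows2

def union_by_name(cols1, rows1, cols2, rows2):
    """Reorder the second input's values to match the first input's columns."""
    order = [cols2.index(c) for c in cols1]
    return cols1, rows1 + [tuple(row[i] for i in order) for row in rows2]

print(len(inner_join(left, right, "id")))  # Alice appears twice
print(left_semi(left, right, "id"))        # Alice appears once
print(left_anti(left, right, "id"))
print(union_by_position(["id", "name"], [(1, "Alice")], ["name", "id"], [("Bob", 2)]))
print(union_by_name(["id", "name"], [(1, "Alice")], ["name", "id"], [("Bob", 2)]))
```

The positional union silently puts "Bob" in the id column, which is the failure mode unionByName avoids.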
Aggregations and Grouping
- from pyspark.sql.functions import count, sum, avg, max, min
  df.groupBy('category').agg(
      count('*').alias('total'),
      sum('amount').alias('total_amount'),
      avg('amount').alias('avg_amount')
  )
- Group by column and apply multiple aggregations. Only grouped and aggregated columns remain
- df.groupBy('category').count()
- Shorthand for counting rows per group. Equivalent to .agg(count('*'))
- from pyspark.sql.functions import collect_list, collect_set
  df.groupBy('user_id').agg(
      collect_list('item').alias('all_items'),
      collect_set('item').alias('unique_items')
  )
- Collect values into arrays. collect_list keeps duplicates, collect_set removes them
- df.groupBy('year', 'month').pivot('category').sum('amount')
- Pivot — rotate distinct values of 'category' column into separate columns with aggregated values
- from pyspark.sql.functions import approx_count_distinct
  df.groupBy('region').agg(
      approx_count_distinct('user_id').alias('unique_users')
  )
- Approximate distinct count — faster than countDistinct for large datasets
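What groupBy(...).pivot(...).sum(...) produces can be sketched on a list of dictionaries. This is a plain-Python model under assumed sample data, not Spark code: pivot_sum is a hypothetical helper, and None stands in for the null Spark reports when a group has no rows for a pivot value.

```python
from collections import defaultdict

sales = [
    {"year": 2026, "month": 1, "category": "books", "amount": 10.0},
    {"year": 2026, "month": 1, "category": "books", "amount": 5.0},
    {"year": 2026, "month": 1, "category": "toys", "amount": 7.0},
    {"year": 2026, "month": 2, "category": "toys", "amount": 3.0},
]

def pivot_sum(rows, group_cols, pivot_col, value_col):
    """Sum value_col per group, with one output column per distinct pivot value."""
    pivot_values = sorted({r[pivot_col] for r in rows})
    # Every group starts with all pivot columns set to None (no data yet).
    totals = defaultdict(lambda: dict.fromkeys(pivot_values))
    for r in rows:
        cell = totals[tuple(r[c] for c in group_cols)]
        cell[r[pivot_col]] = (cell[r[pivot_col]] or 0.0) + r[value_col]
    return dict(totals)

result = pivot_sum(sales, ["year", "month"], "category", "amount")
for group, cells in result.items():
    print(group, cells)
```

The first step of the model, collecting the distinct pivot values, is also the reason an unbounded pivot column is expensive: the set of output columns has to be known before aggregating.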
Column Operations and Functions
- from pyspark.sql.functions import col, lit, when, coalesce
  df.withColumn('status',
      when(col('amount') > 100, 'high')
      .when(col('amount') > 50, 'medium')
      .otherwise('low'))
- Conditional column creation with when/otherwise (equivalent to SQL CASE WHEN)
- df.withColumn('amount', col('amount').cast('double'))
  df.withColumn('date', col('date_str').cast('date'))
- Cast column to a different type. Common casts: string, int, double, date, timestamp
- df.withColumn('first_non_null', coalesce(col('a'), col('b'), col('c')))
- Returns the first non-null value from the given columns
- df.withColumn('constant', lit(42))
  df.withColumn('null_col', lit(None).cast('string'))
- lit() creates a column with a constant value. Use for adding fixed values or explicit nulls
- df.na.fill(0)
  df.na.fill({'amount': 0, 'name': 'Unknown'})
  df.na.drop()
  df.na.drop(subset=['id', 'name'])
- Null handling: fill replaces nulls, drop removes rows with nulls. Both accept column subsets
- df.filter(col('name').isNotNull())
  df.filter(col('email').isNull())
- Filter by null/not-null. Use isNull() and isNotNull() — do NOT use == None
- from pyspark.sql.functions import col, explode, split, array_contains
  df.select('id', explode(col('tags')).alias('tag'))
  df.withColumn('words', split(col('text'), ' '))
  df.filter(array_contains(col('tags'), 'python'))
- Array operations: explode flattens arrays into rows, split creates arrays, array_contains filters
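The evaluation order of when/otherwise and the behavior of coalesce can be modelled in a few lines. This is a plain-Python sketch, not the PySpark API: case_when and this coalesce are hypothetical helpers, with None standing in for SQL null.

```python
def case_when(value, branches, otherwise=None):
    """Return the result of the first branch whose predicate is true.

    A None input never satisfies a comparison, so it falls through to otherwise.
    """
    for predicate, result in branches:
        if value is not None and predicate(value):
            return result
    return otherwise

def coalesce(*values):
    """Return the first value that is not None, or None if all are None."""
    for v in values:
        if v is not None:
            return v
    return None

branches = [(lambda a: a > 100, "high"), (lambda a: a > 50, "medium")]
swapped = list(reversed(branches))  # the broader test now comes first

print(case_when(150, branches, "low"))
print(case_when(150, swapped, "low"))   # 'high' is now unreachable
print(case_when(None, branches, "low"))
print(coalesce(None, 0, 3))             # 0 is a real value, not a null
```

Because the first matching branch wins, conditions should be listed from most specific to least specific.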
Window Functions
- from pyspark.sql.window import Window
  from pyspark.sql.functions import col, row_number, rank, dense_rank
  window = Window.partitionBy('dept').orderBy(col('salary').desc())
  df.withColumn('rank', row_number().over(window))
- ROW_NUMBER assigns unique sequential numbers. RANK leaves gaps after ties. DENSE_RANK has no gaps
- from pyspark.sql.functions import lag, lead
  window = Window.partitionBy('user_id').orderBy('date')
  df.withColumn('prev_amount', lag('amount', 1).over(window))
  df.withColumn('next_amount', lead('amount', 1).over(window))
- LAG accesses previous row's value, LEAD accesses next row's value within the window
- from pyspark.sql.functions import sum as _sum
  window = (Window.partitionBy('account').orderBy('date')
      .rowsBetween(Window.unboundedPreceding, Window.currentRow))
  df.withColumn('running_total', _sum('amount').over(window))
- Running sum — accumulates values from the start of the partition to the current row
- -- SQL window functions
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY dept ORDER BY salary DESC) AS rn,
    RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS rnk,
    LAG(salary, 1) OVER (PARTITION BY dept ORDER BY hire_date) AS prev_salary
  FROM employees
- SQL syntax for window functions. Window functions do NOT reduce row count unlike GROUP BY
- -- Deduplication with window function
  SELECT * FROM (
    SELECT *,
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) AS rn
    FROM my_table
  ) WHERE rn = 1
- Keep only the latest record per id. Must use subquery — window functions cannot appear in WHERE
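How ROW_NUMBER, RANK, and DENSE_RANK differ on ties is easiest to see side by side. The sketch below computes all three for one partition in plain Python. The rankings helper is hypothetical and assumes a descending sort on a single salary column.

```python
def rankings(salaries):
    """Return (salary, row_number, rank, dense_rank) tuples, highest salary first."""
    ordered = sorted(salaries, reverse=True)
    out = []
    rank = dense = 0
    previous = object()  # sentinel that never equals a salary
    for row_number, salary in enumerate(ordered, start=1):
        if salary != previous:
            rank = row_number  # rank jumps to the row position, leaving gaps
            dense += 1         # dense_rank advances by exactly one
            previous = salary
        out.append((salary, row_number, rank, dense))
    return out

for row in rankings([300, 200, 300, 100]):
    print(row)
```

With two employees tied at 300, rank goes 1, 1, 3, 4 while dense_rank goes 1, 1, 2, 3, and row_number stays unique. For deduplication, row_number is the safe choice because exactly one row per partition gets the value 1.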
Spark SQL
- df.createOrReplaceTempView('my_table')
  result = spark.sql('SELECT * FROM my_table WHERE amount > 100')
- Register DataFrame as temp view (session-scoped), then query with SQL. Result is a DataFrame
- df.createOrReplaceGlobalTempView('shared_view')
  spark.sql('SELECT * FROM global_temp.shared_view')
- Global temp view is shared across sessions of the same Spark application and dropped when the application ends. Must use global_temp prefix to access
- SELECT id, name,
    CASE
      WHEN score >= 90 THEN 'A'
      WHEN score >= 80 THEN 'B'
      WHEN score >= 70 THEN 'C'
      ELSE 'F'
    END AS grade
  FROM students
- SQL CASE WHEN — equivalent to PySpark when().otherwise()
- -- Common Table Expression (CTE)
  WITH ranked AS (
    SELECT *,
      ROW_NUMBER() OVER (PARTITION BY dept ORDER BY salary DESC) AS rn
    FROM employees
  )
  SELECT * FROM ranked WHERE rn <= 3
- CTE for readable multi-step SQL queries. Essential for window function filtering
- SELECT /*+ BROADCAST(small_table) */ b.*, s.category_name
  FROM big_table b
  INNER JOIN small_table s ON b.category_id = s.id
- SQL broadcast join hint — same as PySpark broadcast() function
- spark.sql('SELECT * FROM t').filter(col('x') > 10).groupBy('cat').count()
- spark.sql() returns a DataFrame — chain DataFrame methods freely after SQL queries
Reading and Writing Data
- # Save modes
  df.write.mode('overwrite').parquet('/path')      # Replace existing data
  df.write.mode('append').parquet('/path')         # Add to existing data
  df.write.mode('errorIfExists').parquet('/path')  # Fail if data exists (default)
  df.write.mode('ignore').parquet('/path')         # Do nothing if data exists
- Four save modes. Default is errorIfExists. Use overwrite for full refresh, append for incremental
- df.write.partitionBy('year', 'month').parquet('/output')
- Partition output by column values — creates directory structure year=2026/month=01/. Enables partition pruning on reads
- df.write.format('delta').mode('overwrite').save('/delta/table')
- Write as Delta Lake format — adds transaction log, ACID, time travel, schema enforcement
- df = (spark.read
      .option('header', 'true')
      .option('delimiter', '|')
      .option('quote', '"')
      .option('multiLine', 'true')
      .csv('/path/to/file.csv'))
- Common CSV read options: header, delimiter, quote, escape, multiLine, nullValue, dateFormat
- # Number of output files follows the number of partitions
  df.coalesce(1).write.csv('/path')      # Single part file
  df.repartition(10).write.csv('/path')  # Normally 10 part files, fewer if some partitions are empty
- Control output file count via partitions. coalesce reduces the partition count without a full shuffle; repartition sets it exactly at the cost of a full shuffle
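The four save modes differ only in what happens when the target already holds data. A small decision function makes that explicit. This is a plain-Python sketch: resolve_save_mode and its return labels are hypothetical, not part of the DataFrameWriter API.

```python
def resolve_save_mode(mode: str, target_exists: bool) -> str:
    """Return the action a writer takes for a save mode and target state."""
    mode = mode.lower()
    if mode not in ("error", "errorifexists", "overwrite", "append", "ignore"):
        raise ValueError(f"unknown save mode: {mode}")
    if not target_exists:
        return "write"  # every mode simply writes to an empty target
    if mode in ("error", "errorifexists"):
        return "fail"
    if mode == "overwrite":
        return "replace"
    if mode == "append":
        return "append"
    return "skip"  # ignore: leave existing data untouched

for m in ("errorIfExists", "overwrite", "append", "ignore"):
    print(m, resolve_save_mode(m, target_exists=True))
```

Note that ignore does nothing silently when data exists, so a job rerun with that mode can succeed without writing anything.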
Structured Streaming
- stream_df = (spark.readStream.format('kafka')
      .option('kafka.bootstrap.servers', 'host:9092')
      .option('subscribe', 'topic')
      .load())
- Start a streaming source from Kafka. readStream returns a streaming DataFrame
- query = (stream_df.writeStream
      .format('delta')
      .outputMode('append')
      .option('checkpointLocation', '/checkpoint/path')
      .trigger(processingTime='10 seconds')
      .start('/output/path'))
- Write streaming results to Delta with 10-second micro-batches. Checkpoint required for fault tolerance
- Output modes:
  append: only new rows (default; aggregations need a watermark)
  complete: full result table rewritten each trigger (requires aggregation)
  update: only rows changed since the last trigger (behaves like append when the query has no aggregation)
- Choose based on query type: append for raw events, complete for dashboard totals, update for efficient aggregation output
- Triggers:
  processingTime='10 seconds': micro-batch every 10s
  availableNow=True: process all available data, then stop
  once=True: process one batch, then stop (deprecated)
  continuous='1 second': experimental low-latency mode
- availableNow is the bridge between batch and streaming — processes all pending data with streaming benefits (checkpointing)
- from pyspark.sql.functions import window
  (stream_df
      .withWatermark('event_time', '10 minutes')
      .groupBy(window('event_time', '5 minutes'))
      .count())
- Watermark bounds how long Spark waits for late data: events more than 10 minutes behind the latest event time seen may be dropped. Required for windowed aggregations in append mode, and it lets Spark discard old window state
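The interaction between a 5-minute tumbling window and a 10-minute watermark can be sketched with datetime arithmetic. This is a simplified model, not Spark's implementation: the helper names are hypothetical, the watermark is taken as the maximum event time seen minus the delay, and the exact boundary comparison in is_too_late is an assumption of the sketch.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)   # window('event_time', '5 minutes')
DELAY = timedelta(minutes=10)   # withWatermark('event_time', '10 minutes')
EPOCH = datetime(1970, 1, 1)

def window_start(event_time: datetime) -> datetime:
    """Start of the epoch-aligned tumbling window containing event_time."""
    return event_time - (event_time - EPOCH) % WINDOW

def watermark(max_event_time_seen: datetime) -> datetime:
    """Event time before which Spark stops waiting for late data."""
    return max_event_time_seen - DELAY

def is_too_late(event_time: datetime, current_watermark: datetime) -> bool:
    """True if the event's window has already closed (simplified boundary rule)."""
    return window_start(event_time) + WINDOW <= current_watermark

wm = watermark(datetime(2026, 1, 1, 12, 30))          # latest event seen at 12:30
print(window_start(datetime(2026, 1, 1, 12, 7, 30)))  # window [12:05, 12:10)
print(wm)
print(is_too_late(datetime(2026, 1, 1, 12, 12), wm))  # window ended at 12:15
print(is_too_late(datetime(2026, 1, 1, 12, 22), wm))  # window still open
```

The watermark moves with the data, not the wall clock: if no new events arrive, windows stay open.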
Delta Lake Operations
- CREATE TABLE my_table (id INT, name STRING, amount DOUBLE) USING DELTA
- Create a managed Delta table. USING DELTA specifies the Delta Lake format
- DESCRIBE HISTORY my_table
- View table version history — operations, timestamps, users. Essential for auditing and time travel
- SELECT * FROM my_table VERSION AS OF 3;
  SELECT * FROM my_table TIMESTAMP AS OF '2026-01-01';
- Time travel — query historical table state by version number or timestamp
- RESTORE TABLE my_table TO VERSION AS OF 3
- Restore table to a previous version. Creates a new version with the old data
- MERGE INTO target AS t
  USING source AS s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
- Upsert pattern — update existing rows, insert new ones. SET * and INSERT * use all columns
- OPTIMIZE my_table
- Compact small files into larger ones for faster reads. Run after many small appends
- VACUUM my_table RETAIN 168 HOURS
- Delete old data files beyond retention period (default 7 days). Breaks time travel for versions older than retention
- # Schema evolution on write
  df.write.option('mergeSchema', 'true').mode('append').format('delta').save('/path')
- Enable schema evolution to automatically add new columns. Without this, mismatched schemas fail
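The upsert performed by the MERGE statement above can be modelled with a dictionary keyed on id. This is a plain-Python sketch of the row-level outcome only: merge_upsert is a hypothetical helper, and it assumes each id appears at most once in the source.

```python
def merge_upsert(target, source, key="id"):
    """Model of MERGE ... WHEN MATCHED UPDATE SET * / WHEN NOT MATCHED INSERT *.

    Assumes source keys are unique.
    """
    merged = {row[key]: dict(row) for row in target}
    for row in source:
        # Matched keys are overwritten with all source columns;
        # unmatched keys are inserted as new rows.
        merged[row[key]] = dict(row)
    return list(merged.values())

target = [{"id": 1, "amount": 10.0}, {"id": 2, "amount": 20.0}]
source = [{"id": 2, "amount": 25.0}, {"id": 3, "amount": 30.0}]

print(merge_upsert(target, source))
```

Row 1 is untouched, row 2 takes the source values, and row 3 is new. Target rows without a source match are kept, since the statement has no clause that deletes them.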
Performance and Optimization
- Narrow transformations (no shuffle): select, filter, withColumn, map, union
  Wide transformations (shuffle): join, groupBy, repartition, distinct, orderBy
- Wide transformations create stage boundaries and are the primary performance bottleneck
- spark.conf.set('spark.sql.shuffle.partitions', '50')
- Default is 200. Reduce for small datasets (avoid tiny partitions), increase for large (avoid oversized partitions)
- spark.conf.set('spark.sql.autoBroadcastJoinThreshold', '10485760')
- Default 10MB. Tables below this size are auto-broadcast in joins. Set to -1 to disable auto-broadcast
- from pyspark import StorageLevel
  df.cache()                            # Lazy! Caches on next action. Uses MEMORY_AND_DISK
  df.persist(StorageLevel.MEMORY_ONLY)  # Choose storage level
  df.unpersist()                        # Eager: immediately releases cached data
- Cache reused DataFrames. cache() is lazy (requires action to materialize). unpersist() is eager
- df.repartition(100)        # Full shuffle, even distribution, can increase or decrease
  df.repartition('key_col')  # Repartition by column values (co-locate same keys)
  df.coalesce(10)            # No full shuffle, merge adjacent partitions, can only decrease
- repartition for even distribution before joins. coalesce for reducing file count before writes
- Data skew diagnosis: one task takes 10-100x longer than others in same stage
  Fixes: salt the skewed key, repartition, use broadcast join, enable AQE
- More executors do NOT fix skew — the bottleneck is one oversized partition on a single executor
- df.explain(True) # Show parsed, analyzed, optimized, and physical plans
- Inspect query plan to understand optimizations (predicate pushdown, column pruning, join strategy)
- Adaptive Query Execution (AQE): spark.sql.adaptive.enabled = true (default since Spark 3.2)
  - Coalesces small shuffle partitions
  - Converts sort-merge join to broadcast if runtime stats show small table
  - Handles skewed joins automatically
- AQE adjusts plans at runtime based on actual data statistics. Introduced in Spark 3.0 and enabled by default since Spark 3.2
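Key salting, one of the skew fixes listed above, can be illustrated without a cluster. The sketch below hash-partitions a heavily skewed key with and without a salt suffix. It is a model under stated assumptions: zlib.crc32 stands in for Spark's own hash function, and NUM_PARTITIONS, NUM_SALTS, and the helper names are hypothetical.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 8
NUM_SALTS = 8

def partition_of(key: str) -> int:
    """Hash-partition a key (crc32 is a stand-in for Spark's hash)."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS

def partition_sizes(keys):
    """Count how many rows land in each partition."""
    return Counter(partition_of(k) for k in keys)

keys = ["hot"] * 1000  # every row shares one skewed key

# Without salting, all rows hash to the same partition: one oversized task.
unsalted = partition_sizes(keys)

# With salting, the key is split into NUM_SALTS sub-keys that hash independently.
salted_keys = [f"{k}_{i % NUM_SALTS}" for i, k in enumerate(keys)]
salted = partition_sizes(salted_keys)

print("unsalted:", dict(unsalted))
print("salted:  ", dict(salted))
```

In a real join the other side must be expanded with every salt value so that matches are preserved, and an aggregation needs a second pass to combine the per-salt partial results.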
UDFs and Higher-Order Functions
- from pyspark.sql.functions import udf, col
  from pyspark.sql.types import StringType
  @udf(returnType=StringType())
  def clean_text(s):
      return s.strip().lower() if s else None
  df.withColumn('cleaned', clean_text(col('text')))
- Python UDF with decorator syntax. Declare the return type explicitly (it defaults to StringType if omitted). Slow due to Python serialization
- spark.udf.register('clean_text_sql', clean_text)
  spark.sql("SELECT clean_text_sql(text) FROM my_table")
- Register UDF for SQL use. Required for calling UDFs in spark.sql() queries
- # Pandas UDF (vectorized): much faster than row-at-a-time UDFs
  import pandas as pd
  from pyspark.sql.functions import pandas_udf, col
  @pandas_udf('double')
  def multiply(a: pd.Series, b: pd.Series) -> pd.Series:
      return a * b
  df.withColumn('result', multiply(col('price'), col('qty')))
- Pandas UDFs process batches of rows as Pandas Series — 2-100x faster than standard UDFs
- # Higher-order functions (no UDF needed)
  from pyspark.sql.functions import col, transform, filter as array_filter, aggregate
  df.withColumn('doubled', transform(col('values'), lambda x: x * 2))
  df.withColumn('positives', array_filter(col('values'), lambda x: x > 0))
- Built-in higher-order functions for arrays — faster than UDFs and optimized by Catalyst
- Why avoid UDFs:
  - Python UDFs bypass Tungsten and Catalyst optimizer
  - Data serialized JVM → Python → JVM for every row
  - Built-in functions are 2-100x faster
  - Use when() instead of UDF for conditional logic
  - Use expr() for complex SQL expressions
- Always prefer built-in functions over UDFs. Only use UDFs when no built-in alternative exists
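A Python UDF receives None for null inputs, so the body needs an explicit guard. The clean_text body shown earlier can be exercised as an ordinary function, which also reveals a subtlety in its guard. The sketch below compares it with a stricter variant: clean_text_strict is a hypothetical name, and neither function is wrapped in @udf here so that it runs without Spark.

```python
def clean_text(s):
    """Body of the UDF above: a truthiness guard treats '' like null."""
    return s.strip().lower() if s else None

def clean_text_strict(s):
    """Variant that maps only None to None and keeps empty strings."""
    return s.strip().lower() if s is not None else None

for value in ("  Hello ", "", None):
    print(repr(value), repr(clean_text(value)), repr(clean_text_strict(value)))
```

Which behavior is right depends on whether empty strings should count as missing data. Testing the undecorated function like this is a cheap way to check null handling before running the UDF on a cluster.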