CertPrepNowFREE
DatabricksSpark Dev Associate81 concepts

Spark Dev Associate Cheat Sheet

Quick reference for the Databricks Certified Associate Developer for Apache Spark exam.

Spark Architecture Essentials

SparkSession: spark = SparkSession.builder.appName('myApp').getOrCreate()
Single entry point for all Spark functionality. Replaces SparkContext, SQLContext, and HiveContext
Execution hierarchy: Application > Job > Stage > Task
Actions trigger jobs, shuffle boundaries create stages, each partition gets one task
Driver: creates SparkSession, builds DAG, schedules tasks Executors: run tasks, store cached data, report results to driver
Driver is a single point of failure. Executor failures are recoverable via lineage recomputation
Client mode: driver runs on submission machine Cluster mode: driver runs on a cluster node
Client mode for interactive dev (notebooks). Cluster mode for production jobs
Parallelism = number of slots (cores) across all executors 4 executors x 2 cores = 8 tasks in parallel
Each task runs on one core and processes one partition. More slots = more parallelism
Transformations: lazy, build a plan (select, filter, join, groupBy) Actions: trigger execution (show, collect, count, write, head)
Nothing executes until an action is called. Transformations return new DataFrames

Creating DataFrames

df = spark.read.csv('/path/to/file.csv', header=True, inferSchema=True)
Read CSV with header row and automatic type inference. inferSchema reads file twice — slow for large files
df = spark.read.parquet('/path/to/data.parquet')
Read Parquet — schema is embedded in the file, no inference needed
df = spark.read.json('/path/to/data.json')
Read JSON — schema inferred automatically. Use multiLine=True for multi-line JSON records
df = spark.read.format('delta').load('/path/to/delta_table')
Read a Delta Lake table. Supports time travel, ACID transactions, and schema evolution
from pyspark.sql.types import * schema = StructType([ StructField('id', IntegerType(), False), StructField('name', StringType(), True), StructField('amount', DoubleType(), True) ]) df = spark.read.csv('/path', schema=schema)
Explicit schema definition — faster than inferSchema and guarantees correct types
df = spark.read.csv('/path', schema='id INT, name STRING, amount DOUBLE')
DDL string shorthand for schema definition — simpler than StructType for basic schemas
df = spark.createDataFrame([ (1, 'Alice', 100.0), (2, 'Bob', 200.0) ], ['id', 'name', 'amount'])
Create DataFrame from Python list of tuples with column names

DataFrame Transformations — Core Operations

df.select('id', 'name', 'amount')
Select specific columns. Returns a new DataFrame with only these columns
df.select(col('name'), col('amount') * 1.1)
Select with column expressions. col() from pyspark.sql.functions enables arithmetic and logic
df.filter(col('age') > 18) df.filter((col('status') == 'active') & (col('amount') > 100))
Filter rows by condition. filter() and where() are identical. Use & (and), | (or), ~ (not) with parentheses
df.withColumn('total', col('price') * col('quantity'))
Add or replace a column. If 'total' already exists, it is overwritten
df.withColumnRenamed('old_name', 'new_name')
Rename a column. Returns a new DataFrame (original is unchanged)
df.drop('unwanted_col')
Remove a column. Returns a new DataFrame without the specified column
df.distinct() df.dropDuplicates(['id', 'name'])
distinct() removes fully duplicate rows. dropDuplicates() removes duplicates based on specified columns only
df.orderBy(col('amount').desc()) df.sort('name', ascending=False)
Sort DataFrame. orderBy() and sort() are identical. Triggers a shuffle (wide transformation)

Joins and Unions

df1.join(df2, on='id', how='inner')
Inner join — only matching rows from both sides. 'id' column appears once in result
df1.join(df2, df1['id'] == df2['id'], 'left')
Left join with expression. Both 'id' columns appear — use df1['id'] or df2['id'] to disambiguate
Join types: inner — matching rows only left / left_outer — all left rows + matching right right / right_outer — all right rows + matching left full / full_outer — all rows from both sides cross — cartesian product (every row x every row) left_semi — left rows that have a match (no right columns) left_anti — left rows that have NO match
Know all 7 join types. semi and anti are filtering joins — they don't add columns from the right side
from pyspark.sql.functions import broadcast df1.join(broadcast(df2), on='id', how='inner')
Broadcast join hint — replicate small df2 to all executors to avoid shuffling large df1
df1.union(df2)
Combine rows by POSITION (not column name). Both DataFrames must have the same number of columns
df1.unionByName(df2, allowMissingColumns=True)
Combine rows by COLUMN NAME. allowMissingColumns fills missing columns with null

Aggregations and Grouping

from pyspark.sql.functions import count, sum, avg, max, min df.groupBy('category').agg( count('*').alias('total'), sum('amount').alias('total_amount'), avg('amount').alias('avg_amount') )
Group by column and apply multiple aggregations. Only grouped and aggregated columns remain
df.groupBy('category').count()
Shorthand for counting rows per group. Equivalent to .agg(count('*'))
from pyspark.sql.functions import collect_list, collect_set df.groupBy('user_id').agg( collect_list('item').alias('all_items'), collect_set('item').alias('unique_items') )
Collect values into arrays. collect_list keeps duplicates, collect_set removes them
df.groupBy('year', 'month').pivot('category').sum('amount')
Pivot — rotate distinct values of 'category' column into separate columns with aggregated values
from pyspark.sql.functions import approx_count_distinct df.groupBy('region').agg( approx_count_distinct('user_id').alias('unique_users') )
Approximate distinct count — faster than countDistinct for large datasets

Column Operations and Functions

from pyspark.sql.functions import col, lit, when, coalesce df.withColumn('status', when(col('amount') > 100, 'high') .when(col('amount') > 50, 'medium') .otherwise('low'))
Conditional column creation with when/otherwise (equivalent to SQL CASE WHEN)
df.withColumn('amount', col('amount').cast('double')) df.withColumn('date', col('date_str').cast('date'))
Cast column to a different type. Common casts: string, int, double, date, timestamp
df.withColumn('first_non_null', coalesce(col('a'), col('b'), col('c')))
Returns the first non-null value from the given columns
df.withColumn('constant', lit(42)) df.withColumn('null_col', lit(None).cast('string'))
lit() creates a column with a constant value. Use for adding fixed values or explicit nulls
df.na.fill(0) df.na.fill({'amount': 0, 'name': 'Unknown'}) df.na.drop() df.na.drop(subset=['id', 'name'])
Null handling: fill replaces nulls, drop removes rows with nulls. Both accept column subsets
df.filter(col('name').isNotNull()) df.filter(col('email').isNull())
Filter by null/not-null. Use isNull() and isNotNull() — do NOT use == None
from pyspark.sql.functions import explode, split, array_contains df.select('id', explode(col('tags')).alias('tag')) df.withColumn('words', split(col('text'), ' ')) df.filter(array_contains(col('tags'), 'python'))
Array operations: explode flattens arrays into rows, split creates arrays, array_contains filters

Window Functions

from pyspark.sql.window import Window from pyspark.sql.functions import row_number, rank, dense_rank window = Window.partitionBy('dept').orderBy(col('salary').desc()) df.withColumn('rank', row_number().over(window))
ROW_NUMBER assigns unique sequential numbers. RANK leaves gaps after ties. DENSE_RANK has no gaps
from pyspark.sql.functions import lag, lead window = Window.partitionBy('user_id').orderBy('date') df.withColumn('prev_amount', lag('amount', 1).over(window)) df.withColumn('next_amount', lead('amount', 1).over(window))
LAG accesses previous row's value, LEAD accesses next row's value within the window
from pyspark.sql.functions import sum as _sum window = Window.partitionBy('account').orderBy('date').rowsBetween(Window.unboundedPreceding, Window.currentRow) df.withColumn('running_total', _sum('amount').over(window))
Running sum — accumulates values from the start of the partition to the current row
-- SQL window functions SELECT *, ROW_NUMBER() OVER (PARTITION BY dept ORDER BY salary DESC) AS rn, RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS rnk, LAG(salary, 1) OVER (PARTITION BY dept ORDER BY hire_date) AS prev_salary FROM employees
SQL syntax for window functions. Window functions do NOT reduce row count unlike GROUP BY
-- Deduplication with window function SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY id ORDER BY updated_at DESC ) AS rn FROM my_table ) WHERE rn = 1
Keep only the latest record per id. Must use subquery — window functions cannot appear in WHERE

Spark SQL

df.createOrReplaceTempView('my_table') result = spark.sql('SELECT * FROM my_table WHERE amount > 100')
Register DataFrame as temp view (session-scoped), then query with SQL. Result is a DataFrame
df.createOrReplaceGlobalTempView('shared_view') spark.sql('SELECT * FROM global_temp.shared_view')
Global temp view persists across sessions. Must use global_temp prefix to access
SELECT id, name, CASE WHEN score >= 90 THEN 'A' WHEN score >= 80 THEN 'B' WHEN score >= 70 THEN 'C' ELSE 'F' END AS grade FROM students
SQL CASE WHEN — equivalent to PySpark when().otherwise()
-- Common Table Expression (CTE) WITH ranked AS ( SELECT *, ROW_NUMBER() OVER (PARTITION BY dept ORDER BY salary DESC) AS rn FROM employees ) SELECT * FROM ranked WHERE rn <= 3
CTE for readable multi-step SQL queries. Essential for window function filtering
SELECT /*+ BROADCAST(small_table) */ b.*, s.category_name FROM big_table b INNER JOIN small_table s ON b.category_id = s.id
SQL broadcast join hint — same as PySpark broadcast() function
spark.sql('SELECT * FROM t').filter(col('x') > 10).groupBy('cat').count()
spark.sql() returns a DataFrame — chain DataFrame methods freely after SQL queries

Reading and Writing Data

# Save modes df.write.mode('overwrite').parquet('/path') # Replace existing data df.write.mode('append').parquet('/path') # Add to existing data df.write.mode('errorIfExists').parquet('/path')# Fail if data exists (default) df.write.mode('ignore').parquet('/path') # Do nothing if data exists
Four save modes. Default is errorIfExists. Use overwrite for full refresh, append for incremental
df.write.partitionBy('year', 'month').parquet('/output')
Partition output by column values — creates directory structure year=2026/month=01/. Enables partition pruning on reads
df.write.format('delta').mode('overwrite').save('/delta/table')
Write as Delta Lake format — adds transaction log, ACID, time travel, schema enforcement
spark.read.option('header', 'true') .option('delimiter', '|') .option('quote', '"') .option('multiLine', 'true') .csv('/path/to/file.csv')
CSV read options. header, delimiter, quote, escape, multiLine, nullValue, dateFormat
# Number of output files = number of partitions df.coalesce(1).write.csv('/path') # Single file output df.repartition(10).write.csv('/path') # Exactly 10 files
Control output file count via partitions. coalesce for reducing, repartition for exact count

Structured Streaming

stream_df = spark.readStream.format('kafka') .option('kafka.bootstrap.servers', 'host:9092') .option('subscribe', 'topic') .load()
Start a streaming source from Kafka. readStream returns a streaming DataFrame
query = stream_df.writeStream .format('delta') .outputMode('append') .option('checkpointLocation', '/checkpoint/path') .trigger(processingTime='10 seconds') .start('/output/path')
Write streaming results to Delta with 10-second micro-batches. Checkpoint required for fault tolerance
Output modes: append — only new rows (default, no aggregation without watermark) complete — full result table rewritten each trigger (requires aggregation) update — only changed rows (requires aggregation)
Choose based on query type: append for raw events, complete for dashboard totals, update for efficient aggregation output
Triggers: processingTime='10 seconds' — micro-batch every 10s availableNow=True — process all available, then stop once=True — process one batch, then stop (deprecated) continuous='1 second' — experimental low-latency
availableNow is the bridge between batch and streaming — processes all pending data with streaming benefits (checkpointing)
from pyspark.sql.functions import window stream_df.withWatermark('event_time', '10 minutes') .groupBy(window('event_time', '5 minutes')) .count()
Watermark handles late data — events arriving more than 10 minutes late are dropped. Required for windowed aggregations

Delta Lake Operations

CREATE TABLE my_table (id INT, name STRING, amount DOUBLE) USING DELTA
Create a managed Delta table. USING DELTA specifies the Delta Lake format
DESCRIBE HISTORY my_table
View table version history — operations, timestamps, users. Essential for auditing and time travel
SELECT * FROM my_table VERSION AS OF 3 SELECT * FROM my_table TIMESTAMP AS OF '2026-01-01'
Time travel — query historical table state by version number or timestamp
RESTORE TABLE my_table TO VERSION AS OF 3
Restore table to a previous version. Creates a new version with the old data
MERGE INTO target AS t USING source AS s ON t.id = s.id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *
Upsert pattern — update existing rows, insert new ones. SET * and INSERT * use all columns
OPTIMIZE my_table
Compact small files into larger ones for faster reads. Run after many small appends
VACUUM my_table RETAIN 168 HOURS
Delete old data files beyond retention period (default 7 days). Breaks time travel for versions older than retention
# Schema evolution on write df.write.option('mergeSchema', 'true').mode('append').format('delta').save('/path')
Enable schema evolution to automatically add new columns. Without this, mismatched schemas fail

Performance and Optimization

Narrow transformations (no shuffle): select, filter, withColumn, map, union Wide transformations (shuffle): join, groupBy, repartition, distinct, orderBy
Wide transformations create stage boundaries and are the primary performance bottleneck
spark.conf.set('spark.sql.shuffle.partitions', '50')
Default is 200. Reduce for small datasets (avoid tiny partitions), increase for large (avoid oversized partitions)
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', '10485760')
Default 10MB. Tables below this size are auto-broadcast in joins. Set to -1 to disable auto-broadcast
df.cache() # Lazy! Caches on next action. Uses MEMORY_AND_DISK df.persist(StorageLevel.MEMORY_ONLY) # Choose storage level df.unpersist() # Eager — immediately releases cached data
Cache reused DataFrames. cache() is lazy (requires action to materialize). unpersist() is eager
df.repartition(100) # Full shuffle, even distribution, can increase or decrease df.repartition('key_col') # Repartition by column values (co-locate same keys) df.coalesce(10) # No full shuffle, merge adjacent partitions, can only decrease
repartition for even distribution before joins. coalesce for reducing file count before writes
Data skew diagnosis: one task takes 10-100x longer than others in same stage Fixes: salt the skewed key, repartition, use broadcast join, enable AQE
More executors do NOT fix skew — the bottleneck is one oversized partition on a single executor
df.explain(True) # Show parsed, analyzed, optimized, and physical plans
Inspect query plan to understand optimizations (predicate pushdown, column pruning, join strategy)
Adaptive Query Execution (AQE): spark.sql.adaptive.enabled = true (default) - Coalesces small shuffle partitions - Converts sort-merge join to broadcast if runtime stats show small table - Handles skewed joins automatically
AQE adjusts plans at runtime based on actual data statistics. Enabled by default in Spark 3+

UDFs and Higher-Order Functions

from pyspark.sql.functions import udf from pyspark.sql.types import StringType @udf(returnType=StringType()) def clean_text(s): return s.strip().lower() if s else None df.withColumn('cleaned', clean_text(col('text')))
Python UDF with decorator syntax. Must specify return type. Slow due to Python serialization
spark.udf.register('clean_text_sql', clean_text) spark.sql("SELECT clean_text_sql(text) FROM my_table")
Register UDF for SQL use. Required for calling UDFs in spark.sql() queries
# Pandas UDF (vectorized) — much faster than row-at-a-time UDFs from pyspark.sql.functions import pandas_udf import pandas as pd @pandas_udf('double') def multiply(a: pd.Series, b: pd.Series) -> pd.Series: return a * b df.withColumn('result', multiply(col('price'), col('qty')))
Pandas UDFs process batches of rows as Pandas Series — 2-100x faster than standard UDFs
# Higher-order functions (no UDF needed) from pyspark.sql.functions import transform, filter as array_filter, aggregate df.withColumn('doubled', transform(col('values'), lambda x: x * 2)) df.withColumn('positives', array_filter(col('values'), lambda x: x > 0))
Built-in higher-order functions for arrays — faster than UDFs and optimized by Catalyst
Why avoid UDFs: - Python UDFs bypass Tungsten and Catalyst optimizer - Data serialized JVM → Python → JVM for every row - Built-in functions are 2-100x faster - Use when() instead of UDF for conditional logic - Use expr() for complex SQL expressions
Always prefer built-in functions over UDFs. Only use UDFs when no built-in alternative exists

Ready to test yourself?

Start a timed Spark Dev Associate mock exam or review practice questions by domain.