ML Associate Cheat Sheet
Quick reference for the Databricks Certified Machine Learning Associate exam.
MLflow Experiment Tracking
- import mlflow
  with mlflow.start_run(run_name='my_experiment'):
      mlflow.log_param('learning_rate', 0.01)
      mlflow.log_metric('rmse', 2.45)
      mlflow.log_artifact('plot.png')
      mlflow.sklearn.log_model(model, 'model')
- Complete MLflow tracking workflow: start a run, log parameters, metrics, artifacts, and the trained model
- mlflow.autolog()          # Enable for all supported frameworks
  mlflow.sklearn.autolog()  # Enable for scikit-learn only
  mlflow.xgboost.autolog()  # Enable for XGBoost only
- Autologging — automatically logs parameters, metrics, and models during training. No manual log calls needed
- with mlflow.start_run(run_name='grid_search'):
      for params in param_grid:
          with mlflow.start_run(nested=True):
              mlflow.log_params(params)
              # train model...
              mlflow.log_metric('rmse', rmse)
- Nested runs — create parent-child hierarchy for hyperparameter search. nested=True is required for child runs
- runs_df = mlflow.search_runs(
      experiment_ids=['123'],
      filter_string='metrics.rmse < 3.0',
      order_by=['metrics.rmse ASC']
  )
- Programmatically search and filter runs by metrics. Returns a pandas DataFrame for analysis
- Autologging does NOT build a parent-child hierarchy for a manual search loop: each child run still needs start_run(nested=True). Autologging records parameters, metrics, and models for the runs it creates.
- Key distinction: autologging vs nested runs serve different purposes
MLflow Model Registry
- mlflow.register_model(
      f'runs:/{best_run_id}/model',
      'production_classifier'
  )
- Register a model from an experiment run. URI format: 'runs:/{run_id}/artifact_path' (forward slash after 'runs:' is required)
- # Also register during logging:
  mlflow.sklearn.log_model(
      model, 'model',
      registered_model_name='production_classifier'
  )
- Register a model directly during logging — creates a new version if the model name already exists
- from mlflow.tracking import MlflowClient
  client = MlflowClient()
  client.transition_model_version_stage(
      name='production_classifier',
      version=3,
      stage='Production'
  )
- Transition a model version between stages: None -> Staging -> Production -> Archived
- # Load model for inference
  model = mlflow.pyfunc.load_model(
      'models:/production_classifier/Production'
  )
- Load a model from the registry by name and stage. Use 'models:/' prefix (not 'runs:/')
- URI Formats:
  - runs:/{run_id}/{artifact_path} → reference a run artifact
  - models:/{name}/{stage} → reference a registered model by stage
  - models:/{name}/{version} → reference a registered model by version
- Know these URI formats — the exam tests the exact syntax including forward slashes
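The URI formats above can be written as two small string builders. This is only a sketch for memorizing the syntax: `run_uri` and `registry_uri` are hypothetical helper names, not part of MLflow, and the run ID is made up.

```python
def run_uri(run_id: str, artifact_path: str = "model") -> str:
    """Build a URI that points at an artifact logged under a run."""
    return f"runs:/{run_id}/{artifact_path}"


def registry_uri(name: str, version_or_stage) -> str:
    """Build a URI that points at a registered model by version or by stage."""
    return f"models:/{name}/{version_or_stage}"


print(run_uri("abc123"))                                    # runs:/abc123/model
print(registry_uri("production_classifier", "Production"))  # models:/production_classifier/Production
print(registry_uri("production_classifier", 3))             # models:/production_classifier/3
```

Note the single forward slash after the scheme in both cases: `runs:/` and `models:/`.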
Spark ML Pipeline Construction
- from pyspark.ml.feature import VectorAssembler
  assembler = VectorAssembler(
      inputCols=['age', 'income', 'score'],
      outputCol='features'
  )
  df_assembled = assembler.transform(df)
- VectorAssembler: combines multiple columns into a single Vector column. Spark ML estimators read their features from one vector column, so it is needed in almost every pipeline
- from pyspark.ml.feature import StringIndexer, OneHotEncoder
  # Step 1: String → Index (REQUIRED before OneHotEncoder)
  indexer = StringIndexer(inputCol='category', outputCol='category_idx')
  # Step 2: Index → One-Hot Vector
  encoder = OneHotEncoder(inputCol='category_idx', outputCol='category_vec')
- StringIndexer → OneHotEncoder chain. OneHotEncoder requires numeric indices — direct string input fails
- from pyspark.ml import Pipeline
  from pyspark.ml.regression import LinearRegression
  pipeline = Pipeline(stages=[
      indexer,    # StringIndexer
      encoder,    # OneHotEncoder
      assembler,  # VectorAssembler
      LinearRegression(featuresCol='features', labelCol='price')
  ])
  model = pipeline.fit(train_df)
  predictions = model.transform(test_df)
- Complete Spark ML Pipeline: chain transformers and estimator. fit() trains all stages, transform() applies them
- from pyspark.ml.feature import StandardScaler
  scaler = StandardScaler(
      inputCol='features',
      outputCol='scaled_features',
      withMean=True,
      withStd=True
  )
  scaler_model = scaler.fit(train_df)
  scaled_df = scaler_model.transform(train_df)
- StandardScaler — fit() learns mean/std from data, transform() applies scaling. Fit on training data ONLY
- from pyspark.ml.feature import Imputer
  imputer = Imputer(
      inputCols=['col1', 'col2'],
      outputCols=['col1_imp', 'col2_imp'],
      strategy='median'  # or 'mean', 'mode'
  )
  imputer_model = imputer.fit(df)           # Learn statistics
  imputed_df = imputer_model.transform(df)  # Apply
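- Imputer: fit() computes the median/mean/mode and returns an ImputerModel; the model's transform() replaces null and NaN values. The Imputer itself is an estimator with no transform(), so fit() must come first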
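The fit-then-transform pattern behind Imputer can be illustrated without Spark. The class below is a toy stand-in written for this sheet (`MedianImputerSketch` is a hypothetical name, not Spark's implementation): `fit()` learns one statistic from the training values, and `transform()` reuses it on any data.

```python
import math
from statistics import mean, median


class MedianImputerSketch:
    """Toy estimator: learns a median in fit(), fills missing values in transform()."""

    def fit(self, values):
        observed = [v for v in values if v is not None and not math.isnan(v)]
        self.fill_value_ = median(observed)
        return self

    def transform(self, values):
        return [
            self.fill_value_ if v is None or math.isnan(v) else v
            for v in values
        ]


train = [1.0, 2.0, None, 4.0, 100.0]   # 100.0 is an outlier
test = [None, 3.0, float("nan")]

imputer = MedianImputerSketch().fit(train)
print(imputer.fill_value_)              # 3.0 (median of 1, 2, 4, 100)
print(mean([1.0, 2.0, 4.0, 100.0]))     # 26.75 (the mean is dragged up by the outlier)
print(imputer.transform(test))          # [3.0, 3.0, 3.0]
```

The statistic comes from the training values only and is then applied unchanged to the test values, which is the same discipline the StandardScaler note describes.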
Feature Engineering Patterns
- from pyspark.ml.feature import Bucketizer
  bucketizer = Bucketizer(
      splits=[0, 18, 35, 55, 100],
      inputCol='age',
      outputCol='age_bucket'
  )
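- Bucketizer: bins continuous features into discrete buckets. n+1 splits define n buckets, each [lower, upper) except the last, which includes its upper bound. Values outside the outermost splits raise an error; use float('-inf') and float('inf') as the first and last splits to cover all values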
- # Median imputation (robust to outliers)
  # Use median when features have significant outliers
  # Use mean when features are normally distributed
  from pyspark.ml.feature import Imputer
  from pyspark.sql.functions import col
  imputer = Imputer(strategy='median')
  # Binary missingness indicator
  df = df.withColumn('col1_missing', col('col1').isNull().cast('int'))
- Missing value handling: median for skewed/outlier data, mean for normal. Add missingness indicators to preserve information
- # One-hot encoding: Use for NOMINAL categories (no order)
  # Ordinal encoding: Use for ORDINAL categories (has order)
  # Do NOT pre-compute one-hot in Feature Store
  # Different models need different encoding strategies
- Encoding strategy: defer one-hot encoding to model pipelines, not Feature Store. Tree-based models don't need it
- # Data leakage prevention:
  # WRONG: scaler.fit(all_data) → split
  # RIGHT: split → scaler.fit(train_only) → transform(both)
  # Best: use Pipeline inside CrossValidator
  # Pipeline ensures fit() only on training folds
  pipeline = Pipeline(stages=[scaler, model])
  cv = CrossValidator(estimator=pipeline, ...)
- Data leakage — always fit preprocessing on training data only. Pipeline inside CV handles this automatically
- # Pandas UDFs: vectorized, up to 100x faster than row-at-a-time UDFs
  import pandas as pd
  from pyspark.sql.functions import col, pandas_udf
  @pandas_udf('double')
  def normalize(s: pd.Series) -> pd.Series:
      # Caution: s is one Arrow batch, so mean()/std() here are per-batch, not global
      return (s - s.mean()) / s.std()
  df = df.withColumn('normalized', normalize(col('value')))
- Pandas UDFs process data in Arrow batches — drastically reduces JVM-Python serialization overhead
- # Rank-based encoding requires a global shuffle: slow
  # Log transformation is element-wise: fast, no shuffle
  # MinMax scaling needs a min/max aggregation: moderate
  # Standard scaling needs a mean/std aggregation: moderate
- Distributed efficiency: element-wise ops (log) are fastest. Global ordering (rank) requires expensive shuffles
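The data leakage rule from this section (split first, then fit preprocessing on the training data only) can be checked with a few numbers. The sketch below uses plain Python rather than Spark; `fit_scaler` and `apply_scaler` are hypothetical helpers, and the population standard deviation is used only to keep the arithmetic simple.

```python
from statistics import mean, pstdev


def fit_scaler(values):
    """Learn the statistics a standard scaler needs."""
    return mean(values), pstdev(values)


def apply_scaler(values, stats):
    """Scale values with statistics learned elsewhere."""
    mu, sigma = stats
    return [(v - mu) / sigma for v in values]


train = [10.0, 12.0, 14.0]
test = [100.0]  # extreme test value that must not influence the scaler

leaky_stats = fit_scaler(train + test)  # WRONG: the test row shifts the mean
clean_stats = fit_scaler(train)         # RIGHT: statistics from train only

train_scaled = apply_scaler(train, clean_stats)
test_scaled = apply_scaler(test, clean_stats)  # reuse the train statistics

print(leaky_stats[0], clean_stats[0])  # 34.0 12.0
```

With the leaky fit, a single test row moves the learned mean from 12.0 to 34.0, so information about the test set has already reached the model before evaluation.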
Model Evaluation Metrics
- from pyspark.ml.evaluation import RegressionEvaluator
  evaluator = RegressionEvaluator(
      labelCol='actual',
      predictionCol='prediction',
      metricName='rmse'  # or 'mae', 'r2', 'mse'
  )
  rmse = evaluator.evaluate(predictions_df)
- RegressionEvaluator — compute RMSE, MAE, R², MSE for regression models on Spark DataFrames
- from pyspark.ml.evaluation import (
      BinaryClassificationEvaluator,
      MulticlassClassificationEvaluator
  )
  # Binary classification
  auc_eval = BinaryClassificationEvaluator(metricName='areaUnderROC')
  # Multiclass classification
  f1_eval = MulticlassClassificationEvaluator(metricName='f1')
  acc_eval = MulticlassClassificationEvaluator(metricName='accuracy')
- Classification evaluators — BinaryClassificationEvaluator for AUC, MulticlassClassificationEvaluator for F1/accuracy
- Regression Metrics:
  - RMSE: penalizes large errors (sensitive to outliers)
  - MAE: average absolute error (robust to outliers)
  - R²: proportion of variance explained (at most 1, higher is better; negative when the model is worse than predicting the mean)
  - MSE: mean squared error (RMSE squared)
  Classification Metrics:
  - Accuracy: overall correct predictions (misleading if imbalanced)
  - Precision: of predicted positives, how many are truly positive
  - Recall: of actual positives, how many are correctly found
  - F1: harmonic mean of precision and recall
- Metric selection guide — know when to use each metric. F1 is ONLY for classification, never regression
- Imbalanced Data (e.g., 2% fraud):
  - Accuracy is MISLEADING (predicting majority = 98% accuracy)
  - Use F1 score or Recall instead
  - Recall when catching ALL positives matters most
  - F1 when balancing precision and recall
- For imbalanced datasets, accuracy is deceptive. Prioritize F1 or recall depending on business requirements
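The 2% fraud example can be worked through directly. The sketch below is plain Python with a hypothetical `classification_metrics` helper; it scores a "model" that always predicts the majority class, and treats precision as 0.0 when there are no positive predictions (a convention chosen for this example).

```python
def classification_metrics(y_true, y_pred):
    """Compute accuracy, precision, recall and F1 for binary labels (1 = positive)."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if (precision + recall) else 0.0)
    return {
        "accuracy": (tp + tn) / len(pairs),
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


y_true = [1] * 2 + [0] * 98    # 2% fraud
always_negative = [0] * 100    # predicts "not fraud" for every row

metrics = classification_metrics(y_true, always_negative)
print(metrics)
# {'accuracy': 0.98, 'precision': 0.0, 'recall': 0.0, 'f1': 0.0}
```

Accuracy looks excellent while recall and F1 show that not a single fraud case was caught.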
- # Log-transformed target: exponentiate BEFORE computing metrics
  import numpy as np
  predictions_original = np.exp(log_predictions)
  actuals_original = np.exp(log_actuals)
  rmse = np.sqrt(np.mean((predictions_original - actuals_original)**2))
- When the target was log-transformed, inverse-transform predictions before computing metrics on the original scale (np.exp for a natural log, np.expm1 if the target was transformed with np.log1p)
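To see why the inverse transform matters, the sketch below computes RMSE on both scales for two made-up predictions, using only the standard library. The numbers are illustrative, not from any real model.

```python
import math


def rmse(predicted, actual):
    """Root mean squared error of two equal-length sequences."""
    squared = [(p - a) ** 2 for p, a in zip(predicted, actual)]
    return math.sqrt(sum(squared) / len(squared))


actuals = [100.0, 1000.0]
predictions = [110.0, 900.0]

# What the model actually sees and outputs when trained on log(target)
log_actuals = [math.log(a) for a in actuals]
log_predictions = [math.log(p) for p in predictions]

rmse_log_scale = rmse(log_predictions, log_actuals)
rmse_original_scale = rmse(
    [math.exp(p) for p in log_predictions],
    [math.exp(a) for a in log_actuals],
)

print(round(rmse_log_scale, 3))       # about 0.1, in log units
print(round(rmse_original_scale, 2))  # about 71.06, in the target's own units
```

The log-scale figure is not wrong, but it is not comparable to an RMSE reported in the target's units, which is usually what the business question asks for.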
Hyperparameter Tuning
- from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
  # lr is the estimator stage used inside `pipeline` (e.g. a LinearRegression instance)
  paramGrid = ParamGridBuilder() \
      .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
      .addGrid(lr.maxIter, [50, 100]) \
      .build()
  cv = CrossValidator(
      estimator=pipeline,
      estimatorParamMaps=paramGrid,
      evaluator=evaluator,
      numFolds=3
  )
  cv_model = cv.fit(train_df)
- CrossValidator with ParamGridBuilder — exhaustive grid search with k-fold CV. Total models = combinations x folds
- from pyspark.ml.tuning import TrainValidationSplit
  tvs = TrainValidationSplit(
      estimator=pipeline,
      estimatorParamMaps=paramGrid,
      evaluator=evaluator,
      trainRatio=0.8  # 80% train, 20% validation
  )
  tvs_model = tvs.fit(train_df)
- TrainValidationSplit — single train/validation split. Faster than CV (trains 1 model per combo vs k models)
- from hyperopt import fmin, tpe, hp, SparkTrials
  def objective(params):
      model = train_model(params)
      rmse = evaluate(model)
      return rmse  # Hyperopt MINIMIZES this
  best = fmin(
      fn=objective,
      space={'lr': hp.uniform('lr', 0.001, 0.1),
             'depth': hp.choice('depth', [3, 5, 7])},
      algo=tpe.suggest,  # Bayesian (TPE)
      max_evals=50,
      trials=SparkTrials(parallelism=4)
  )
- Hyperopt with SparkTrials — distributed Bayesian optimization. fmin MINIMIZES the objective by default
- # Hyperopt MINIMIZES: negate metrics you want to MAXIMIZE
  from hyperopt import STATUS_OK
  def objective(params):
      accuracy = train_and_evaluate(params)
      return -accuracy  # Negate for maximization
      # Or return a dict with a 'loss' key instead:
      # return {'loss': -accuracy, 'status': STATUS_OK}
- Critical trap: Hyperopt minimizes. Return -accuracy (negative) when maximizing. Without negation, it finds the WORST model
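The effect of the missing minus sign is easy to reproduce with a toy minimizer. The sketch below does not use Hyperopt: `toy_minimize` is a hypothetical stand-in that simply returns the candidate with the lowest objective value, and the accuracies are invented.

```python
def toy_minimize(objective, candidates):
    """Stand-in for a minimizer: return the candidate with the LOWEST objective."""
    return min(candidates, key=objective)


# Hypothetical validation accuracy for each tree depth
accuracy_by_depth = {3: 0.81, 5: 0.90, 7: 0.86}


def wrong_objective(depth):
    return accuracy_by_depth[depth]    # minimizing accuracy picks the worst model


def right_objective(depth):
    return -accuracy_by_depth[depth]   # minimizing -accuracy picks the best model


print(toy_minimize(wrong_objective, accuracy_by_depth))  # 3
print(toy_minimize(right_objective, accuracy_by_depth))  # 5
```

Error metrics such as RMSE are already "lower is better" and are returned as-is; only "higher is better" metrics need the negation.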
- Grid Search vs Random vs Bayesian:
  - Grid: exhaustive, all combos. Best for small spaces (2-3 params)
  - Random: samples randomly. Better for large spaces
  - Bayesian (TPE): learns from past trials. Best for expensive models
  Total models trained:
  - CrossValidator: combinations × folds (e.g., 9 × 3 = 27)
  - TrainValidationSplit: combinations × 1 (e.g., 9)
- Search strategy comparison and computational cost calculation — common exam question
- # Too much parallelism hurts Bayesian optimization
  # TPE needs past results to propose better combinations
  # Rule: parallelism = sqrt(max_evals) is a good start
  # If optimization stalls, REDUCE parallelism
- Bayesian optimization with excessive parallelism degrades to random search — reduce parallel workers to allow learning
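The "total models trained" arithmetic from the search strategy comparison can be enumerated rather than memorized. This is a plain-Python sketch; `count_fits` is a hypothetical helper and the grid values are examples chosen to give the 9-combination case.

```python
from itertools import product


def count_fits(grid, num_folds=1):
    """Number of model fits for an exhaustive grid: combinations × folds."""
    combinations = list(product(*grid.values()))
    return len(combinations) * num_folds


grid = {
    "regParam": [0.01, 0.1, 1.0],
    "maxIter": [50, 100, 200],
}

print(count_fits(grid, num_folds=3))  # 27 (CrossValidator with 3 folds: 9 × 3)
print(count_fits(grid))               # 9  (TrainValidationSplit: 9 × 1)
```

Spark also refits the best parameter set on the full training data once the search finishes; this count, like the figures above, leaves that final fit out.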
Model Deployment & Batch Inference
- # Batch inference with spark_udf
  import mlflow.pyfunc
  predict_udf = mlflow.pyfunc.spark_udf(
      spark, model_uri='models:/my_model/Production'
  )
  predictions = df.withColumn(
      'prediction', predict_udf(*feature_columns)
  )
- Batch inference — wrap MLflow model as Spark UDF for distributed scoring across partitions
- # Distributed inference with mapInPandas
  def predict_batch(iterator):
      model = load_model()  # Loaded once per partition (task), then reused
      for batch_df in iterator:
          batch_df['prediction'] = model.predict(batch_df)
          yield batch_df
  result = spark_df.mapInPandas(
      predict_batch, schema=output_schema
  )
- mapInPandas: applies a function to each partition as an iterator of pandas batches. The model is loaded once per partition (task) and reused across that partition's batches
- # Feature Store inference: only pass the primary key
  from databricks.feature_store import FeatureStoreClient
  fs = FeatureStoreClient()
  predictions = fs.score_batch(
      model_uri='models:/my_model/Production',
      df=df_with_keys_only  # Only needs customer_id
  )
- Feature Store auto-retrieves features by primary key during inference — no need to manually join feature tables
- # Train/test split in Spark train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
- DataFrame.randomSplit() — the standard way to split data in Spark. Set seed for reproducibility
- Batch vs Real-time:
  - Batch: large datasets, periodic scoring. Use spark_udf or mapInPandas
  - Real-time: individual requests, low latency. Use Model Serving endpoints
  - Never use serving endpoints for batch workloads
- Choose the right inference pattern based on the use case — exam tests this distinction
AutoML & Data Exploration
- from databricks import automl
  summary = automl.regress(
      dataset=train_df,
      target_col='price',
      timeout_minutes=30
  )
- Databricks AutoML — automatically trains multiple models, tunes hyperparameters, and generates editable notebooks
- # AutoML handles automatically:
  # - Feature scaling and normalization
  # - Hyperparameter optimization
  # - Cross-validation and model evaluation
  # - Model logging to MLflow
  # AutoML does NOT handle:
  # - Exploratory Data Analysis (EDA)
  # - Data understanding and quality assessment
  # - Feature selection decisions
- Know what AutoML does and does NOT do. EDA must be performed separately before running AutoML
- # Quick data profiling
  dbutils.data.summarize(df)  # Histograms + stats
  df.describe()               # Count, mean, stddev, min, max
  df.summary()                # Extended statistics (adds quartiles)
- Data exploration methods: dbutils.data.summarize(df) generates visual histograms, describe() returns summary statistics, and summary() adds the 25%, 50%, and 75% percentiles
- # Feature Store basics
  from databricks.feature_store import FeatureStoreClient
  fs = FeatureStoreClient()
  table_info = fs.get_table('sales_features')
  print(table_info.description)  # .description for metadata
- Feature Store — get_table() returns metadata. Use .description to access the documentation text
pandas API on Spark & DataFrame Operations
- import pyspark.pandas as ps
  # Convert Spark DF to pandas API on Spark DF
  ps_df = ps.DataFrame(spark_df)
  # Or read directly
  ps_df = ps.read_csv('/path/to/data.csv')
  # Use familiar pandas syntax
  ps_df.groupby('category').mean()
  ps_df['new_col'] = ps_df['col1'] * 2
- pandas API on Spark — pandas-compatible syntax on distributed Spark. Minimal code changes from pandas
- # Spark DataFrame filtering (exam-tested syntax)
  from pyspark.sql.functions import col
  df.filter(col('amount') > 0)          # Idiomatic
  df.filter(col('status') == 'active')  # Idiomatic
  df[df['amount'] > 0]      # Accepted by PySpark as shorthand for filter(), but not the form to rely on
  df.loc[df['amount'] > 0]  # pandas only: Spark DataFrames have no .loc
- Spark DataFrames filter with .filter() (or .where()) and col(). .loc[] is pandas-only; boolean bracket indexing happens to work in PySpark, but .filter() is the syntax the exam expects
- # groupBy + aggregation
  from pyspark.sql.functions import avg, count, sum as spark_sum
  df.groupBy('category').agg(
      count('*').alias('total'),
      avg('amount').alias('avg_amount')
  )
- Spark DataFrame aggregation — use groupBy().agg() with SQL functions, not pandas-style methods
- # Grouped apply with applyInPandas
  def train_per_group(pdf):
      model = fit_model(pdf)
      pdf['prediction'] = model.predict(pdf)
      return pdf
  df.groupBy('segment_id').applyInPandas(
      train_per_group, schema=output_schema
  )
- applyInPandas — train separate models per group. Each group processed as a pandas DataFrame
- pandas API on Spark DataFrames:
  - ARE distributed (backed by Spark)
  - Wrap Spark DataFrames with pandas-like methods
  - Do NOT require collect(): they run on the cluster
  - Use ps.DataFrame(spark_df) to convert
- Key fact: pandas API on Spark is distributed — it is NOT a local pandas DataFrame. No collect() needed
ML Fundamentals Quick Reference
- Supervised Learning:
  - Classification: predict categories (spam/not spam)
  - Regression: predict continuous values (house price)
  Unsupervised Learning:
  - Clustering: group similar items (customer segments)
  - Dimensionality Reduction: reduce features (PCA)
- Learning type classification — the most basic ML concept tested on the exam
- Ensemble Methods:
  - Bagging (Random Forest): parallel trees, reduces VARIANCE
    → Bootstrap samples + random feature subsets + average
  - Boosting (XGBoost, GBT): sequential trees, reduces BIAS
    → Each tree corrects previous errors
    → Cannot parallelize iterations
- Bagging vs Boosting — Random Forest = bagging. XGBoost/GBT = boosting. Know which reduces what
- Bias-Variance Tradeoff:
  - High Bias → Underfitting (model too simple)
    → Increase complexity, add features, reduce regularization
  - High Variance → Overfitting (model too complex)
    → Simplify model, add regularization, get more data
  - Goal: minimize total error = bias² + variance + noise
- Bias = systematic error (underfitting), Variance = sensitivity to training data (overfitting)
- Cross-Validation:
  - Overall CV score = mean of fold scores
  - Example: [2.5, 3.1, 2.8, 3.4, 2.7] → mean = 2.9
  - NOT the min, max, or sum
  Grid Search Total:
  - 3 values × 3 values × 3 folds = 27 total models
- CV score calculation and grid search model count — frequently tested arithmetic
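The cross-validation example can be verified in a couple of lines. This uses the fold scores quoted above and only the standard library.

```python
from statistics import mean

fold_rmse = [2.5, 3.1, 2.8, 3.4, 2.7]

cv_score = mean(fold_rmse)   # the overall CV score is the mean of the folds
print(round(cv_score, 2))    # 2.9

# Common wrong answers on the exam:
print(min(fold_rmse), max(fold_rmse))  # 2.5 3.4
```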
- Mixture of Experts:
  - Route predictions through different models based on input features
  - Model A for condition X, Model B for condition Y
  - Also called conditional ensemble learning
- Routing different inputs to specialized models based on feature values
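The routing idea can be sketched with hand-written rules. Everything below is hypothetical: the `route` helper, the `region` feature, and the two "experts" (plain functions standing in for trained models). It shows hard, rule-based routing as the bullet describes, not a learned gating network.

```python
def route(features, experts, default):
    """Send the input to the first expert whose condition matches, else to the default."""
    for condition, model in experts:
        if condition(features):
            return model(features)
    return default(features)


# Stand-ins for trained models: each returns a price estimate
def urban_model(features):
    return 300.0 * features["sqm"]


def rural_model(features):
    return 120.0 * features["sqm"]


def fallback_model(features):
    return 200.0 * features["sqm"]


experts = [
    (lambda f: f["region"] == "urban", urban_model),  # Model A for condition X
    (lambda f: f["region"] == "rural", rural_model),  # Model B for condition Y
]

print(route({"region": "urban", "sqm": 50}, experts, fallback_model))    # 15000.0
print(route({"region": "rural", "sqm": 50}, experts, fallback_model))    # 6000.0
print(route({"region": "unknown", "sqm": 50}, experts, fallback_model))  # 10000.0
```

A default model keeps the router from failing on inputs that match none of the conditions.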