Quick Navigation
Advanced MLflow Tracking
- import mlflow with mlflow.start_run(run_name='tuning_parent'): mlflow.log_param('search_strategy', 'bayesian') for lr in [0.01, 0.1, 0.5]: with mlflow.start_run(nested=True): mlflow.log_param('learning_rate', lr) mlflow.log_metric('rmse', train(lr))
- Nested runs for hyperparameter tuning — parent run groups child runs for each parameter combination
- mlflow.autolog() # Enable for sklearn, spark, pytorch, xgboost, lightgbm # Or framework-specific: mlflow.sklearn.autolog(log_models=True, log_input_examples=True) mlflow.spark.autolog() mlflow.pytorch.autolog()
- Autologging — automatically logs parameters, metrics, and models without explicit log calls
- from mlflow.tracking import MlflowClient client = MlflowClient() # Search runs by metric runs = client.search_runs( experiment_ids=['1'], filter_string='metrics.rmse < 0.5', order_by=['metrics.rmse ASC'], max_results=5 )
- Search and compare runs programmatically — filter by metrics, parameters, or tags
- mlflow.log_metric('loss', 0.5, step=1) mlflow.log_metric('loss', 0.3, step=2) mlflow.log_metric('loss', 0.1, step=3) # Creates a metric history for plotting training curves
- Step-based metric logging — track metrics over training iterations for loss curves
- mlflow.set_tag('model_type', 'xgboost') mlflow.set_tag('team', 'fraud_detection') mlflow.log_dict({'feature_importance': importance_dict}, 'feature_importance.json') mlflow.log_artifact('confusion_matrix.png')
- Tags, dictionaries, and artifacts — organize runs and store supplementary outputs
MLflow Model Registry
- import mlflow # Register during logging mlflow.sklearn.log_model(model, 'model', registered_model_name='fraud_detector') # Or register after the fact result = mlflow.register_model( model_uri='runs:/abc123/model', name='fraud_detector' )
- Register a model — creates a new version automatically if the model name already exists
- from mlflow.tracking import MlflowClient client = MlflowClient() # Transition model version stage client.transition_model_version_stage( name='fraud_detector', version=3, stage='Production', archive_existing_versions=True # Auto-archive old Production version )
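- Stage transitions — move models through None → Staging → Production → Archived lifecycle (stages are deprecated since MLflow 2.9 and are not supported in Unity Catalog — use model version aliases such as @champion instead)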
- # Unity Catalog model registry (3-level namespace) import mlflow mlflow.set_registry_uri('databricks-uc') mlflow.register_model( model_uri='runs:/abc123/model', name='catalog.schema.fraud_detector' # 3-level namespace )
- Unity Catalog Model Registry — register models with catalog.schema.model namespace for governance
- # Load model by stage model = mlflow.pyfunc.load_model('models:/fraud_detector/Production') # Load by version model = mlflow.pyfunc.load_model('models:/fraud_detector/3') # Load by alias (UC) model = mlflow.pyfunc.load_model('models:/catalog.schema.fraud_detector@champion')
- Load registered models — by stage name, version number, or alias (Unity Catalog)
- # Tag a model version (e.g., record validation results) from mlflow.tracking import MlflowClient client = MlflowClient() client.set_model_version_tag( name='fraud_detector', version='3', key='validation_status', value='passed' ) # Model Registry webhooks (configured separately via the registry webhooks REST API) can trigger CI/CD actions when models transition between stages
- Model version tags — annotate versions with validation results, approval status, or deployment info
Hyperparameter Tuning with Hyperopt
- from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK search_space = { 'max_depth': hp.choice('max_depth', [3, 5, 7, 10]), 'learning_rate': hp.loguniform('lr', -5, 0), 'n_estimators': hp.quniform('n_est', 50, 500, 50), 'subsample': hp.uniform('subsample', 0.5, 1.0) }
- Define search space — hp.choice (categorical), hp.uniform (continuous), hp.loguniform (log-scale), hp.quniform (discrete)
- def objective(params): params['n_estimators'] = int(params['n_estimators']) model = XGBClassifier(**params) score = cross_val_score(model, X, y, cv=3, scoring='f1').mean() return {'loss': -score, 'status': STATUS_OK} # Hyperopt minimizes 'loss', so negate the score trials = SparkTrials(parallelism=8) # Keep a reference to inspect trial results afterwards with mlflow.start_run(): best = fmin(fn=objective, space=search_space, algo=tpe.suggest, max_evals=100, trials=trials)
- Distributed tuning with SparkTrials — runs up to 8 trials concurrently, each trial executing on a Spark worker
- # SparkTrials parallelism guidelines: # parallelism = number of concurrent trials # Set to number of executor cores / cores per trial # Higher parallelism = faster wall-clock time, but adaptive algorithms like TPE learn less from earlier trials trials = SparkTrials(parallelism=4) # 4 trials run simultaneously # Each trial trains ONE complete model on a single Spark worker (don't use distributed training inside the objective)
- SparkTrials parallelism — controls how many hyperparameter combinations are evaluated simultaneously
- from hyperopt import space_eval # Get best parameters in original format best_params = space_eval(search_space, best) print(f'Best: {best_params}') # Access trial results for trial in trials.trials: print(trial['result']['loss'], trial['misc']['vals'])
- Extract results — space_eval converts Hyperopt indices back to actual parameter values
- # Conditional search space search_space = hp.choice('model_type', [ {'type': 'rf', 'n_estimators': hp.quniform('rf_n', 50, 500, 50), 'max_depth': hp.choice('rf_depth', [5, 10, 20])}, {'type': 'xgb', 'learning_rate': hp.loguniform('xgb_lr', -5, 0), 'max_depth': hp.choice('xgb_depth', [3, 5, 7])} ])
- Conditional/hierarchical search spaces — different parameters based on model type selection
Feature Store
- from databricks.feature_engineering import FeatureEngineeringClient fe = FeatureEngineeringClient() # Create feature table fe.create_table( name='catalog.schema.user_features', primary_keys=['user_id', 'event_timestamp'], # The time series column must be part of the primary key timeseries_columns='event_timestamp', # For point-in-time lookups df=features_df, description='User behavioral features for fraud detection' )
- Create a feature table with timestamp key for point-in-time correctness
- from databricks.feature_engineering import FeatureLookup # Point-in-time lookup for training data training_set = fe.create_training_set( df=events_df, # Must have user_id and event_timestamp feature_lookups=[ FeatureLookup( table_name='catalog.schema.user_features', lookup_key='user_id', timestamp_lookup_key='event_timestamp' # Point-in-time! ) ], label='is_fraud' ) training_df = training_set.load_df()
- Point-in-time lookup — joins features as they existed at event time, preventing data leakage
- # Train and log model with feature lookups with mlflow.start_run(): model = train_model(training_df) fe.log_model( model=model, artifact_path='model', flavor=mlflow.sklearn, training_set=training_set, # Stores feature lookup metadata registered_model_name='fraud_model' )
- Log model with Feature Store — model automatically looks up features at inference time
- # Batch inference — automatically looks up features predictions = fe.score_batch( model_uri='models:/fraud_model/Production', df=new_events_df # Needs the lookup key columns (plus the timestamp lookup key for point-in-time features) ) # Online serving — endpoint auto-fetches features from the published online store # Just send the lookup key, e.g. {"dataframe_records": [{"user_id": "123"}]}, to the endpoint
- Score with automatic feature lookup — batch or real-time, model fetches features automatically
- # Update feature table fe.write_table( name='catalog.schema.user_features', df=updated_features_df, mode='merge' # 'overwrite' or 'merge' ) # Publish for online serving fe.publish_table( name='catalog.schema.user_features', online_store=online_store_spec )
- Update and publish features — merge new data and sync to online store for real-time serving
Model Monitoring & Drift Detection
- # Types of drift: # 1. Data drift: P(X) changes — input feature distributions shift # 2. Concept drift: P(Y|X) changes — relationship between features and target changes # 3. Prediction drift: P(Y_hat) changes — model output distribution shifts # Data drift can occur WITHOUT concept drift (and vice versa) # Always monitor BOTH feature distributions AND prediction quality
- Three types of drift — know the differences for the exam
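- from scipy import stats import pandas as pd # KS test for numeric features (continuous distributions) statistic, p_value = stats.ks_2samp(baseline_feature, current_feature) if p_value < 0.05: print('Significant drift detected') # Chi-squared test for categorical features: compare category COUNTS per sample (categories × samples table) contingency = pd.DataFrame({'baseline': pd.Series(baseline_cat).value_counts(), 'current': pd.Series(current_cat).value_counts()}).fillna(0) chi2, p_value, dof, expected = stats.chi2_contingency(contingency) # Note: pd.crosstab(baseline_cat, current_cat) would pair rows by position — wrong for two independent samples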
- Statistical drift tests — KS for numeric, chi-squared for categorical features
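- from scipy.spatial.distance import jensenshannon import numpy as np # jensenshannon returns the JS DISTANCE (square root of the JS divergence): symmetric, bounded [0, 1] when base=2 # Works for both numeric (binned) and categorical distributions # Use the SAME bin edges for both samples, otherwise the histograms are not comparable bins = np.histogram_bin_edges(np.concatenate([baseline, current]), bins=50) baseline_hist = np.histogram(baseline, bins=bins)[0] current_hist = np.histogram(current, bins=bins)[0] js_dist = jensenshannon(baseline_hist, current_hist, base=2) # Counts are normalized internally print(f'JS distance: {js_dist:.4f}') # 0 = identical, 1 = no overlap at all (js_dist ** 2 = divergence)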
- Jensen-Shannon distance (square root of JS divergence) — symmetric, bounded measure of distribution difference; works for numeric (binned) and categorical features
- # Summary statistics monitoring (simple, low-cost) import pandas as pd stats_baseline = baseline_df.describe() stats_current = current_df.describe() # describe() covers count, mean, std, min, quartiles, max — add df.isna().sum() and df.nunique() to track nulls and distinct counts # Alert if any metric deviates beyond threshold for col in numeric_cols: mean_shift = abs(stats_current[col]['mean'] - stats_baseline[col]['mean']) if mean_shift > threshold: alert(f'Mean shift detected in {col}: {mean_shift}')
- Summary statistics monitoring — cheapest drift detection method, track basic stats over time
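- # Databricks Lakehouse Monitoring from databricks.sdk import WorkspaceClient from databricks.sdk.service.catalog import MonitorInferenceLog, MonitorInferenceLogProblemType w = WorkspaceClient() # Create monitor on inference table monitor = w.quality_monitors.create( table_name='catalog.schema.predictions', assets_dir='/Workspace/Users/<user>/monitoring', # Where dashboards/assets are stored output_schema_name='catalog.schema', inference_log=MonitorInferenceLog( problem_type=MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION, prediction_col='prediction', label_col='label', timestamp_col='timestamp', model_id_col='model_version', granularities=['1 day'] # Time windows for metric aggregation ) )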
- Lakehouse Monitoring — automated drift detection and quality monitoring on inference tables
Model Serving & Deployment
- # Batch inference with Spark UDF import mlflow.pyfunc from pyspark.sql.functions import struct model_udf = mlflow.pyfunc.spark_udf( spark, model_uri='models:/fraud_model/Production' ) predictions = spark.table('new_transactions').withColumn( 'prediction', model_udf(struct('feature1', 'feature2', 'feature3')) )
- Batch inference — wrap MLflow model as Spark UDF to score millions of records
- # Create real-time serving endpoint import requests endpoint_config = { 'name': 'fraud-detector', 'config': { 'served_models': [{ 'model_name': 'fraud_model', 'model_version': '3', 'workload_size': 'Small', 'scale_to_zero_enabled': True }] } } # Use Databricks SDK or REST API to create endpoint
- Real-time serving endpoint — create auto-scaling endpoint with scale-to-zero for variable traffic
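- # A/B testing with traffic splitting endpoint_config = { 'served_models': [ {'name': 'champion', 'model_name': 'fraud_model', 'model_version': '2', 'workload_size': 'Small'}, {'name': 'challenger', 'model_name': 'fraud_model', 'model_version': '3', 'workload_size': 'Small'} ], 'traffic_config': { 'routes': [ {'served_model_name': 'champion', 'traffic_percentage': 90}, {'served_model_name': 'challenger', 'traffic_percentage': 10} ] } } # Percentages are set in traffic_config.routes and must sum to 100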
- A/B testing — split traffic between model versions at the serving endpoint level
- # Query serving endpoint import requests url = f'https://{workspace_url}/serving-endpoints/fraud-detector/invocations' headers = {'Authorization': f'Bearer {token}'} response = requests.post(url, headers=headers, json={ 'dataframe_records': [ {'feature1': 100, 'feature2': 'category_a', 'feature3': 0.5} ] }) response.raise_for_status() # Surface auth/validation errors instead of a confusing KeyError prediction = response.json()['predictions'][0]
- Query real-time endpoint — send JSON payload, receive prediction via REST API
- # Deployment strategies: # Canary: Route 5% → 25% → 50% → 100% gradually, monitor at each step # Blue-Green: Switch 100% traffic instantly, keep old version for quick rollback # Shadow: Send traffic to both models, only serve old model's results, # compare new model's predictions offline (zero user impact) # Champion/Challenger: Same as shadow — new model runs alongside, metrics compared offline
- Deployment strategies — know when to use each approach for the exam
Distributed Deep Learning
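- from pyspark.ml.deepspeed.deepspeed_distributor import DeepspeedTorchDistributor from pyspark.ml.torch.distributor import TorchDistributor def train_fn(): import os import torch import torch.distributed as dist from torch.nn.parallel import DistributedDataParallel as DDP dist.init_process_group('nccl') # Use 'gloo' on CPU; rank/world-size env vars are set by TorchDistributor local_rank = int(os.environ['LOCAL_RANK']) torch.cuda.set_device(local_rank) model = DDP(MyModel().to(local_rank), device_ids=[local_rank]) # DDP syncs gradients across processes # Training loop — give the DataLoader a DistributedSampler so each process sees a different shard for epoch in range(10): loss = train_epoch(model, dataloader) dist.destroy_process_group() return model.module # distributor.run() returns the rank-0 process's return value # 4 training processes spread across the worker nodes (use_gpu defaults to True: one GPU per process) distributor = TorchDistributor(num_processes=4, local_mode=False) model = distributor.run(train_fn)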
- TorchDistributor — distribute PyTorch training across Spark cluster workers
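- # Petastorm for loading Spark data into PyTorch/TensorFlow from petastorm.spark import SparkDatasetConverter, make_spark_converter # Required: parent cache dir where the DataFrame is materialized as Parquet spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'file:///dbfs/tmp/petastorm/cache') converter = make_spark_converter(spark_df) # num_epochs defaults to None (infinite iteration) — set it explicitly for a plain for-loop with converter.make_torch_dataloader(batch_size=64, num_epochs=1) as dataloader: for batch in dataloader: features = batch['features'] labels = batch['label'] # Standard PyTorch training step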
- Petastorm — bridge between Spark DataFrames and PyTorch DataLoaders for efficient data loading
- # Single-node multi-GPU with TorchDistributor distributor = TorchDistributor( num_processes=4, # 4 GPUs local_mode=True, # All on driver node use_gpu=True ) model = distributor.run(train_fn) # Multi-node multi-GPU distributor = TorchDistributor( num_processes=8, # 8 GPUs total local_mode=False, # Across workers use_gpu=True ) model = distributor.run(train_fn)
- GPU distribution modes — local_mode=True for single-node multi-GPU, False for multi-node
- # Log deep learning model with MLflow with mlflow.start_run(): mlflow.pytorch.log_model( pytorch_model=model, artifact_path='model', registered_model_name='dl_classifier', signature=mlflow.models.infer_signature(sample_input, sample_output) ) mlflow.log_param('num_layers', 4) mlflow.log_param('hidden_size', 256)
- Log PyTorch model to MLflow — includes model signature for serving input validation
AutoML
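- from databricks import automl summary = automl.classify( dataset=train_df, target_col='label', primary_metric='f1', timeout_minutes=30 ) # max_trials is deprecated — timeout_minutes bounds the run # Get best trial best_trial = summary.best_trial print(f'Best validation F1: {best_trial.evaluation_metric_score}') # Primary metric on the validation set print(f'Best notebook: {best_trial.notebook_path}') print(f'Best model URI: {best_trial.model_path}')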
- AutoML classify — automated model selection and tuning, returns glass-box notebook
- # AutoML for regression summary = automl.regress( dataset=df, target_col='price', primary_metric='rmse', timeout_minutes=20 ) # AutoML for forecasting summary = automl.forecast( dataset=df, target_col='sales', time_col='date', frequency='D', horizon=30 # Predict 30 days ahead )
- AutoML regression and forecasting — same glass-box approach for different problem types
- # Glass-box output: AutoML generates notebooks you can inspect # 1. Data exploration notebook — EDA, feature analysis # 2. Best trial notebook — full training code, reproducible (imported into the workspace) # 3. Other trial notebooks — saved as MLflow artifacts; import them with automl.import_notebook() # Access generated notebooks for trial in summary.trials: print(f'Trial {trial.mlflow_run_id}: {trial.metrics}') print(f' Notebook: {trial.notebook_path}') # None for trials whose notebook was not imported
- Glass-box approach — AutoML generates readable, editable notebooks (not a black box)
- # Key AutoML features to know: # - Automatic feature type detection (numeric, categorical, text, date) # - Handles missing values and encoding automatically # - Tries multiple algorithms (LightGBM, XGBoost, sklearn, etc.) # - Generates SHAP explanations for feature importance # - Outputs are MLflow runs — all experiments tracked # - You CAN and SHOULD modify the generated notebooks
- AutoML capabilities summary — what it automates and what it doesn't
ML Pipeline Orchestration
- # Databricks Workflows for ML pipelines # Define multi-task job with dependencies: # # Task 1: Feature Engineering (notebook) # └→ Task 2: Model Training (notebook) # └→ Task 3: Model Evaluation (notebook) # └→ Task 4: Register Model (if metrics pass) # └→ Task 5: Deploy to Staging # Each task can be a notebook, Python script, or JAR # Tasks pass data via task values or Delta tables
- Databricks Workflows — orchestrate ML pipeline as directed acyclic graph (DAG) of tasks
- # Pass values between tasks # In Task 1 (training): dbutils.jobs.taskValues.set(key='model_uri', value=model_uri) dbutils.jobs.taskValues.set(key='rmse', value=0.15) # In Task 2 (evaluation): model_uri = dbutils.jobs.taskValues.get( taskKey='training_task', key='model_uri' ) rmse = dbutils.jobs.taskValues.get( taskKey='training_task', key='rmse' )
- Task values — pass small JSON-serializable values (e.g., a model URI or a metric) between workflow tasks without external storage; use Delta tables for larger data
- # CI/CD for ML with Databricks Repos # 1. Feature branch → develop in notebook # 2. Pull request → triggers test workflow # 3. Merge to main → triggers training workflow # 4. Model passes tests → register in Model Registry # 5. Stage transition → triggers deployment workflow # Git integration: # databricks repos update --path /Repos/prod/ml-project --branch main
- CI/CD pattern — Git-triggered workflows for automated ML model training and deployment
- # Retraining triggers: # 1. Scheduled: cron-based (e.g., weekly retrain) # 2. Performance: metric drops below threshold # 3. Drift: statistical drift detected in features or predictions # Example: conditional retraining in workflow if drift_detected or performance_degraded: dbutils.jobs.taskValues.set('should_retrain', True) else: dbutils.jobs.taskValues.set('should_retrain', False) # Next task checks this value to decide whether to retrain
- Retraining triggers — three strategies for when to retrain production models
- # Reproducibility checklist: # 1. Pin library versions: %pip install scikit-learn==1.3.0 # 2. Set random seeds: np.random.seed(42), torch.manual_seed(42) # 3. Use Delta time travel: spark.read.option('timestampAsOf', '2026-01-01').table('data') # 4. Log everything to MLflow: params, metrics, data version, code version # 5. Use Databricks Repos for code versioning
- Reproducibility — essential practices for reproducing ML experiments and results
Production ML Patterns
- # Champion/Challenger Pattern (Shadow Deployment) # 1. Production serves Champion model to ALL users # 2. Challenger model runs in parallel on same requests # 3. Challenger predictions are logged but NOT served # 4. Compare Champion vs Challenger metrics offline # 5. Promote Challenger to Champion only if significantly better # Key: zero user impact during evaluation
- Champion/Challenger — evaluate new model in shadow mode without affecting users
- # Model validation before promotion def validate_model(model_uri, X_test, y_test): model = mlflow.pyfunc.load_model(model_uri) predictions = model.predict(X_test) metrics = { 'accuracy': accuracy_score(y_test, predictions), 'f1': f1_score(y_test, predictions), 'latency_p99': measure_latency(model) } # Gate: quality may be at most 5% worse than the current production model prod_metrics = get_production_metrics() quality_ok = all( metrics[k] >= prod_metrics[k] * 0.95 # Allow 5% tolerance for k in ['accuracy', 'f1'] ) latency_ok = metrics['latency_p99'] <= prod_metrics['latency_p99'] * 1.1 # Lower is better for latency return quality_ok and latency_ok
- Model validation gate — automated checks before promoting to production
- # Handle class imbalance in production # Training strategies (apply to TRAINING data only — never resample validation/test sets): from imblearn.over_sampling import SMOTE X_resampled, y_resampled = SMOTE().fit_resample(X_train, y_train) # Or use class weights (negatives / positives in the training labels): model = XGBClassifier(scale_pos_weight=(y_train == 0).sum() / (y_train == 1).sum()) # Evaluation: prefer precision-recall curves / PR-AUC — ROC-AUC can look overly optimistic when positives are rare # Threshold tuning: optimize F1 or business-specific metric
- Handling imbalanced data — SMOTE, class weights, and appropriate evaluation metrics
- # Time series cross-validation (NEVER random split) from sklearn.model_selection import TimeSeriesSplit # Rows must be sorted by time; X is a NumPy array here (use X.iloc[idx] for a pandas DataFrame) tscv = TimeSeriesSplit(n_splits=5) for train_idx, test_idx in tscv.split(X): X_train, X_test = X[train_idx], X[test_idx] # Train is always BEFORE test in time # Each fold adds more training data # Walk-forward validation for production: # Train on months 1-6, test on month 7 # Train on months 1-7, test on month 8 # Train on months 1-8, test on month 9...
- Time series validation — temporal splits only, never random, to prevent data leakage
- # Feature importance for model interpretability import shap # SHAP values for individual predictions (local explanations) explainer = shap.TreeExplainer(model) shap_values = explainer.shap_values(X_test) # Global feature importance shap.summary_plot(shap_values, X_test) # Permutation importance (model-agnostic) from sklearn.inspection import permutation_importance result = permutation_importance(model, X_test, y_test, n_repeats=10)
- Model interpretability — SHAP for local/global explanations, permutation importance as alternative