{
"metadata": {
"language_info": {
"name": "python"
},
"lastEditStatus": {
"notebookId": "6wwgc5yvkslbtwzqxiyl",
"authorId": "317811122459",
"authorName": "ADMIN",
"authorEmail": "[email protected]",
"sessionId": "ccfa6938-7d2b-4e2a-aee9-a515762cbb80",
"lastEditTime": 1762192477049
}
},
"nbformat_minor": 2,
"nbformat": 4,
"cells": [
{
"cell_type": "markdown",
"metadata": {
"name": "cell1"
},
"source": [
"# Distributed Hyperparameter Tuning with Experiment Tracking in Snowflake\n",
"\n",
"This notebook demonstrates how to use Snowflake's ML capabilities for:\n",
"1. **Experiment Tracking** - Log parameters, metrics, and models\n",
"2. **Distributed HPO** - Parallel hyperparameter optimization at scale\n",
"3. **Container Runtime** - Leverage Snowpark Container Services for ML workloads\n",
"\n",
"We'll build a classification model using the Wine Quality dataset and optimize it using distributed hyperparameter tuning while tracking all experiments in Snowflake.\n"
],
"id": "ce110000-1111-2222-3333-ffffff000000"
},
{
"cell_type": "markdown",
"metadata": {
"name": "cell2",
"collapsed": false
},
"source": "## Prerequisites\n\n- Snowflake account with a database and schema\n- CREATE EXPERIMENT privilege on your schema\n- snowflake-ml-python >= 1.9.1\n- Notebook configured for Container Runtime on SPCS (Compute Pool with instance type `CPU_X64_S`)\n",
"id": "ce110000-1111-2222-3333-ffffff000001"
},
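{
"cell_type": "markdown",
"metadata": {
"name": "cell40",
"collapsed": false
},
"source": "### Optional: Verify the Package Version\n\nAs a quick sanity check, the next cell confirms the installed `snowflake-ml-python` meets the minimum version above. This is a minimal sketch, assuming the package exposes its version string as `snowflake.ml.version.VERSION`; adjust if your runtime reports the version differently.\n",
"id": "ce110000-1111-2222-3333-ffffff000101"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "cell41",
"language": "python"
},
"outputs": [],
"source": "# Sanity check: confirm snowflake-ml-python >= 1.9.1.\n# Assumption: the version string is exposed as snowflake.ml.version.VERSION.\nfrom packaging.version import Version\nfrom snowflake.ml.version import VERSION\n\nREQUIRED = \"1.9.1\"\nprint(f\"snowflake-ml-python version: {VERSION}\")\nassert Version(VERSION) >= Version(REQUIRED), f\"Need snowflake-ml-python >= {REQUIRED}, found {VERSION}\"",
"id": "ce110000-1111-2222-3333-ffffff000102"
},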
{
"cell_type": "markdown",
"metadata": {
"name": "cell3"
},
"source": [
"## Step 1: Setup and Data Loading\n"
],
"id": "ce110000-1111-2222-3333-ffffff000002"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "cell4",
"language": "python"
},
"outputs": [],
"source": "import pandas as pd\nimport numpy as np\nfrom datetime import datetime\nfrom sklearn.model_selection import train_test_split\nfrom sklearn.preprocessing import StandardScaler\nfrom sklearn import metrics\nfrom xgboost import XGBClassifier\n\nfrom snowflake.snowpark.context import get_active_session\nfrom snowflake.snowpark import Session\nfrom snowflake.ml.experiment.experiment_tracking import ExperimentTracking\nfrom snowflake.ml.modeling import tune\nfrom snowflake.ml.modeling.tune.search import RandomSearch, BayesOpt\nfrom snowflake.ml.data.data_connector import DataConnector\nfrom snowflake.ml.runtime_cluster import scale_cluster\n\n# Get active Snowflake session\nsession = get_active_session()\nprint(f\"Connected to Snowflake: {session.get_current_database()}.{session.get_current_schema()}\")\n\n# Create dated experiment name for tracking runs over time\nexperiment_date = datetime.now().strftime(\"%Y%m%d\")\nexperiment_name = f\"Wine_Quality_Classification_{experiment_date}\"\nprint(f\"\\nExperiment Name: {experiment_name}\")\n",
"id": "ce110000-1111-2222-3333-ffffff000003"
},
{
"cell_type": "markdown",
"metadata": {
"name": "cell5"
},
"source": [
"### Generate Wine Quality Classification Dataset\n",
"\n",
"We'll create a synthetic dataset inspired by wine quality prediction. The goal is to classify wines as high quality (1) or standard quality (0) based on chemical properties.\n"
],
"id": "ce110000-1111-2222-3333-ffffff000004"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "cell6",
"language": "python"
},
"outputs": [],
"source": "# Generate synthetic wine quality dataset\nnp.random.seed(42)\nn_samples = 20000\n\n# Feature generation with realistic correlations\ndata = {\n \"FIXED_ACIDITY\": np.random.normal(7.0, 1.5, n_samples),\n \"VOLATILE_ACIDITY\": np.random.gamma(2, 0.2, n_samples),\n \"CITRIC_ACID\": np.random.beta(2, 5, n_samples),\n \"RESIDUAL_SUGAR\": np.random.lognormal(1, 0.8, n_samples),\n \"CHLORIDES\": np.random.gamma(3, 0.02, n_samples),\n \"FREE_SULFUR_DIOXIDE\": np.random.normal(30, 15, n_samples),\n \"TOTAL_SULFUR_DIOXIDE\": np.random.normal(120, 40, n_samples),\n \"DENSITY\": np.random.normal(0.997, 0.003, n_samples),\n \"PH\": np.random.normal(3.2, 0.3, n_samples),\n \"SULPHATES\": np.random.gamma(4, 0.15, n_samples),\n \"ALCOHOL\": np.random.normal(10.5, 1.5, n_samples)\n}\n\ndf = pd.DataFrame(data)\n\n# Create quality target based on feature combinations\nquality_score = (\n 0.3 * (df[\"ALCOHOL\"] - df[\"ALCOHOL\"].mean()) / df[\"ALCOHOL\"].std() +\n 0.2 * (df[\"CITRIC_ACID\"] - df[\"CITRIC_ACID\"].mean()) / df[\"CITRIC_ACID\"].std() -\n 0.25 * (df[\"VOLATILE_ACIDITY\"] - df[\"VOLATILE_ACIDITY\"].mean()) / df[\"VOLATILE_ACIDITY\"].std() +\n 0.15 * (df[\"SULPHATES\"] - df[\"SULPHATES\"].mean()) / df[\"SULPHATES\"].std() +\n np.random.normal(0, 0.3, n_samples) # Add noise\n)\n\n# Binary classification: 1 = high quality, 0 = standard quality\ndf[\"QUALITY\"] = (quality_score > quality_score.quantile(0.6)).astype(int)\n\nprint(f\"Dataset shape: {df.shape}\")\nprint(f\"\\nClass distribution:\\n{df['QUALITY'].value_counts()}\")\nprint(f\"\\nFeature statistics:\\n{df.describe()}\")\n",
"id": "ce110000-1111-2222-3333-ffffff000005"
},
{
"cell_type": "markdown",
"metadata": {
"name": "cell7"
},
"source": [
"### Prepare Train/Validation/Test Splits\n"
],
"id": "ce110000-1111-2222-3333-ffffff000006"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "cell8",
"language": "python"
},
"outputs": [],
"source": "# Separate features and target\nX = df.drop('QUALITY', axis=1)\ny = df['QUALITY']\n\n# Create train/val/test splits\nX_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=0.15, random_state=42, stratify=y)\nX_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.18, random_state=42, stratify=y_temp)\n\n# Scale features\nscaler = StandardScaler()\nX_train_scaled = pd.DataFrame(scaler.fit_transform(X_train), columns=X_train.columns)\nX_val_scaled = pd.DataFrame(scaler.transform(X_val), columns=X_val.columns)\nX_test_scaled = pd.DataFrame(scaler.transform(X_test), columns=X_test.columns)\n\nprint(f\"Training set: {X_train_scaled.shape[0]} samples\")\nprint(f\"Validation set: {X_val_scaled.shape[0]} samples\")\nprint(f\"Test set: {X_test_scaled.shape[0]} samples\")\n",
"id": "ce110000-1111-2222-3333-ffffff000007"
},
{
"cell_type": "markdown",
"metadata": {
"name": "cell9"
},
"source": [
"## Step 2: Baseline Model with Experiment Tracking\n",
"\n",
"Before running distributed HPO, let's train a baseline model and log it to Snowflake Experiment Tracking.\n"
],
"id": "ce110000-1111-2222-3333-ffffff000008"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "cell10",
"language": "python"
},
"outputs": [],
"source": "# Initialize Experiment Tracking\nexp = ExperimentTracking(session=session)\nexp.set_experiment(experiment_name)\n\n# Note: Snowflake supports autologging for certain ML frameworks, but this example uses \n# explicit logging (exp.log_params, exp.log_metrics) to demonstrate a framework-agnostic \n# approach. Explicit logging works with any ML library (scikit-learn, XGBoost, PyTorch, \n# TensorFlow, custom frameworks, etc.) and gives you precise control over what gets logged, \n# without requiring integration with Snowflake's modeling APIs.\n\n# Train baseline model\nwith exp.start_run(run_name=\"baseline_xgboost\") as run:\n # Define baseline parameters\n baseline_params = {\n 'n_estimators': 100,\n 'max_depth': 6,\n 'learning_rate': 0.1,\n 'subsample': 0.8,\n 'colsample_bytree': 0.8,\n 'gamma': 0.1,\n 'min_child_weight': 8,\n 'random_state': 42,\n }\n \n # Log parameters\n exp.log_params(baseline_params)\n \n # Train model\n baseline_model = XGBClassifier(**baseline_params)\n baseline_model.fit(X_train_scaled, y_train)\n \n # Evaluate on validation set\n y_val_pred = baseline_model.predict(X_val_scaled)\n y_val_proba = baseline_model.predict_proba(X_val_scaled)[:, 1]\n \n # Calculate metrics\n val_metrics = {\n 'val_accuracy': metrics.accuracy_score(y_val, y_val_pred),\n 'val_precision': metrics.precision_score(y_val, y_val_pred),\n 'val_recall': metrics.recall_score(y_val, y_val_pred),\n 'val_f1': metrics.f1_score(y_val, y_val_pred),\n 'val_roc_auc': metrics.roc_auc_score(y_val, y_val_proba)\n }\n \n # Log metrics\n exp.log_metrics(val_metrics)\n \n print(\"Baseline Model Performance:\")\n for metric, value in val_metrics.items():\n print(f\" {metric}: {value:.4f}\")\n",
"id": "ce110000-1111-2222-3333-ffffff000009"
},
{
"cell_type": "markdown",
"metadata": {
"name": "cell11",
"collapsed": false
},
"source": "## Step 3: Distributed Hyperparameter Optimization\n\nNow we'll use Snowflake's distributed HPO capabilities to find optimal hyperparameters. The HPO workload will:\n- Scale across multiple nodes in the SPCS compute pool\n- Run trials in parallel for faster optimization\n- Automatically log all trials to Experiment Tracking\n",
"id": "ce110000-1111-2222-3333-ffffff000010"
},
{
"cell_type": "markdown",
"metadata": {
"name": "cell12"
},
"source": [
"### Prepare Data Connectors\n",
"\n",
"Convert our pandas DataFrames to Snowflake DataConnectors for distributed processing.\n"
],
"id": "ce110000-1111-2222-3333-ffffff000011"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "cell13",
"language": "python"
},
"outputs": [],
"source": "# Combine features and target for each split\ntrain_df = pd.concat([X_train_scaled, y_train.reset_index(drop=True)], axis=1)\nval_df = pd.concat([X_val_scaled, y_val.reset_index(drop=True)], axis=1)\n\n# Create DataConnectors\ndataset_map = {\n \"train\": DataConnector.from_dataframe(session.create_dataframe(train_df)),\n \"val\": DataConnector.from_dataframe(session.create_dataframe(val_df)),\n}\n\nprint(\"Data connectors created successfully\")\n",
"id": "ce110000-1111-2222-3333-ffffff000012"
},
{
"cell_type": "markdown",
"metadata": {
"name": "cell14"
},
"source": [
"### Define Training Function with Experiment Tracking\n",
"\n",
"The training function will be executed for each trial. It integrates both HPO and Experiment Tracking.\n"
],
"id": "ce110000-1111-2222-3333-ffffff000013"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "cell15",
"language": "python"
},
"outputs": [],
"source": "def train_function():\n \"\"\"\n Training function executed for each HPO trial.\n Integrates with both TunerContext and ExperimentTracking.\n \"\"\" \n trial_session = Session.builder.getOrCreate()\n \n # Get tuner context\n tuner_context = tune.get_tuner_context()\n params = tuner_context.get_hyper_params()\n dm = tuner_context.get_dataset_map()\n \n # Initialize experiment tracking for this trial\n exp = ExperimentTracking(session=trial_session)\n exp.set_experiment(experiment_name)\n with exp.start_run():\n # Log hyperparameters\n exp.log_params(params)\n \n # Load data\n train_data = dm[\"train\"].to_pandas()\n val_data = dm[\"val\"].to_pandas()\n \n # Separate features and target\n X_train = train_data.drop('QUALITY', axis=1)\n y_train = train_data['QUALITY']\n X_val = val_data.drop('QUALITY', axis=1)\n y_val = val_data['QUALITY']\n \n # Train model with hyperparameters from HPO\n model = XGBClassifier(**params)\n model.fit(X_train, y_train)\n \n # Evaluate on validation set\n y_val_pred = model.predict(X_val)\n y_val_proba = model.predict_proba(X_val)[:, 1]\n \n # Calculate validation metrics\n val_metrics = {\n 'val_accuracy': metrics.accuracy_score(y_val, y_val_pred),\n 'val_precision': metrics.precision_score(y_val, y_val_pred),\n 'val_recall': metrics.recall_score(y_val, y_val_pred),\n 'val_f1': metrics.f1_score(y_val, y_val_pred),\n 'val_roc_auc': metrics.roc_auc_score(y_val, y_val_proba)\n }\n \n # Log metrics to experiment tracking\n exp.log_metrics(val_metrics)\n \n # Report to HPO framework (optimize on validation F1)\n tuner_context.report(metrics=val_metrics, model=model)\n",
"id": "ce110000-1111-2222-3333-ffffff000014"
},
{
"cell_type": "markdown",
"metadata": {
"name": "cell16"
},
"source": [
"### Define Search Space\n",
"\n",
"We'll define the hyperparameter search space using Snowflake's sampling functions.\n"
],
"id": "ce110000-1111-2222-3333-ffffff000015"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "cell17",
"language": "python"
},
"outputs": [],
"source": "# Define search space for XGBoost\nsearch_space = {\n 'n_estimators': tune.randint(50, 300),\n 'max_depth': tune.randint(3, 15),\n 'learning_rate': tune.loguniform(0.01, 0.3),\n 'subsample': tune.uniform(0.5, 1.0),\n 'colsample_bytree': tune.uniform(0.5, 1.0),\n 'gamma': tune.uniform(0.0, 0.5),\n 'min_child_weight': tune.randint(1, 10),\n 'random_state': 42,\n}\n\nprint(\"Search space defined:\")\nfor param, space in search_space.items():\n print(f\" {param}: {space}\")\n",
"id": "ce110000-1111-2222-3333-ffffff000016"
},
{
"cell_type": "markdown",
"metadata": {
"name": "cell18",
"collapsed": false
},
"source": "### Configure and Run HPO\n\nConfigure the tuner to:\n- Maximize F1 score\n- Run 50 trials with random search\n- Execute trials in parallel across available nodes\n",
"id": "ce110000-1111-2222-3333-ffffff000017"
},
{
"cell_type": "markdown",
"id": "dd695b29-e15f-46ba-8388-b4f5932a84ad",
"metadata": {
"name": "cell23",
"collapsed": false
},
"source": "#### Monitor Node Activity with the Ray Dashboard\nUse the output url to access the dashboard"
},
{
"cell_type": "code",
"id": "4736f03a-e044-4133-8a6e-7d90066fb9ed",
"metadata": {
"language": "python",
"name": "cell22"
},
"outputs": [],
"source": "from snowflake.ml.runtime_cluster import get_ray_dashboard_url\nget_ray_dashboard_url()",
"execution_count": null
},
{
"cell_type": "markdown",
"id": "75ffd2fe-7fbe-4e9f-b595-7e5794c7d828",
"metadata": {
"name": "cell24",
"collapsed": false
},
"source": "#### Run HPO"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "cell19",
"language": "python"
},
"outputs": [],
"source": "# Scale cluster for distributed processing\nprint(\"Scaling cluster for distributed HPO...\")\nscale_cluster(10) # Scale up nodes\n\n# Configure tuner\ntuner_config = tune.TunerConfig(\n metric='val_f1',\n mode='max',\n search_alg=RandomSearch(),\n num_trials=50\n)\n\n# Create tuner\ntuner = tune.Tuner(\n train_func=train_function,\n search_space=search_space,\n tuner_config=tuner_config\n)\n\nprint(\"Starting distributed hyperparameter optimization...\")\n\n# Run HPO\ntry:\n results = tuner.run(dataset_map=dataset_map)\n print(\"\\nHPO completed successfully\")\nexcept Exception as e:\n print(f\"\\nError during HPO: {e}\")\n raise\nfinally:\n # Scale cluster back down\n scale_cluster(1)\n print(\"Cluster scaled back to 1 node\")\n",
"id": "ce110000-1111-2222-3333-ffffff000018"
},
{
"cell_type": "markdown",
"metadata": {
"name": "cell20",
"collapsed": false
},
"source": [
"## Step 4: Analyze Results\n"
],
"id": "ce110000-1111-2222-3333-ffffff000019"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "cell21",
"language": "python"
},
"outputs": [],
"source": "# Display all results\nprint(\"BEST MODEL FOUND\")\nprint(\"=\"*60)\n\n# Extract best hyperparameters\nprint(f\"\\nBest Parameters:\")\nbest_model = results.best_model\nparams = best_model.get_xgb_params()\nprint(params)\n\n# Compare with baseline\nbest_f1 = results.best_result['val_f1'][0]\nbaseline_f1 = val_metrics['val_f1'] # From baseline model\nimprovement = ((best_f1 - baseline_f1) / baseline_f1) * 100\n\nprint(f\"\\nPerformance Comparison:\")\nprint(f\" Baseline F1: {baseline_f1:.4f}\")\nprint(f\" Best HPO F1: {best_f1:.4f}\")\nprint(f\" Improvement: {improvement:+.2f}%\")\n\n# Get test set f1 score\ny_test_pred = best_model.predict(X_test_scaled)\ntest_f1 = metrics.f1_score(y_test, y_test_pred)\nprint(f\"\\n\\n Best HPO Test Set F1: {test_f1:.4f}\")\n\nresults.best_result",
"id": "ce110000-1111-2222-3333-ffffff000020"
},
{
"cell_type": "markdown",
"metadata": {
"name": "cell30",
"collapsed": false
},
"source": "## Step 5: View Results in Snowflake UI\n\nAll experiment runs are now available in the Snowflake UI:\n\n1. Navigate to **AI & ML > Experiments** in the left sidebar\n2. Find the `Wine_Quality_Classification_YYYYMMDD` experiment (with today's date)\n3. Compare runs, view metrics, and analyze results\n\n**Note**: Each time you run this notebook on a different day, it creates a new dated experiment, allowing you to track model performance over time and across different data versions.\n\nThe UI provides:\n- Side-by-side run comparisons\n- Metric visualizations\n- Parameter distributions\n- Model artifacts and metadata\n",
"id": "ce110000-1111-2222-3333-ffffff000029"
},
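{
"cell_type": "markdown",
"metadata": {
"name": "cell44",
"collapsed": false
},
"source": "### Optional: Inspect Experiments from SQL\n\nFor a programmatic view, the cell below sketches one way to list experiment objects from SQL. This assumes experiments are schema-level objects that a `SHOW EXPERIMENTS` command can enumerate in your account; if the command is unavailable, use the UI navigation above.\n",
"id": "ce110000-1111-2222-3333-ffffff000103"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "cell45",
"language": "python"
},
"outputs": [],
"source": "# Assumption: experiments are schema-level objects visible to SHOW.\n# If SHOW EXPERIMENTS is not supported in your account, use the UI instead.\nsession.sql(\"SHOW EXPERIMENTS\").show()",
"id": "ce110000-1111-2222-3333-ffffff000104"
},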
{
"cell_type": "markdown",
"metadata": {
"name": "cell31",
"collapsed": false
},
"source": "## Summary\n\nIn this notebook, we demonstrated:\n\n1. **Experiment Tracking**: Logged parameters and metrics to Snowflake\n2. **Distributed HPO**: Ran 50 trials in parallel across multiple nodes\n3. **Integration**: Combined both capabilities for comprehensive ML experimentation\n",
"id": "ce110000-1111-2222-3333-ffffff000030"
},
{
"cell_type": "markdown",
"metadata": {
"name": "cell32",
"collapsed": false
},
"source": "## Next Steps\n\n### Extend this Example\n\n1. **Adjust the search space** - Modify hyperparameter ranges based on your problem domain and data size\n2. **Increase trial count** - Scale to 100-200 trials for more thorough optimization\n3. **Scale compute clusters** - Adjust `scale_cluster()` to increase or decrease parallelism\n4. **Deploy the winning model** - Register to Snowflake Model Registry\n\n\n\n",
"id": "ce110000-1111-2222-3333-ffffff000031"
}
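,
{
"cell_type": "markdown",
"metadata": {
"name": "cell46",
"collapsed": false
},
"source": "### Sketch: Register the Best Model\n\nThe cell below illustrates step 4 above with a minimal sketch using `snowflake.ml.registry.Registry`. The model name, version name, and sample input shown are illustrative placeholders; adapt them to your environment.\n",
"id": "ce110000-1111-2222-3333-ffffff000105"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "cell47",
"language": "python"
},
"outputs": [],
"source": "# Sketch: register the best HPO model to the Snowflake Model Registry.\n# The model and version names below are placeholders for illustration.\nfrom snowflake.ml.registry import Registry\n\nregistry = Registry(session=session)\nmodel_version = registry.log_model(\n    best_model,\n    model_name=\"WINE_QUALITY_XGB\",\n    version_name=\"V1\",\n    sample_input_data=X_test_scaled.head(10),\n)\nprint(f\"Registered: {model_version.model_name} version {model_version.version_name}\")",
"id": "ce110000-1111-2222-3333-ffffff000106"
}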
]
}