Production-Grade MLOps Pipeline: From Data to Deployment on GCP
Fabrizio Albertoni
Co-Founder / Principal ML Engineer
At Sofos, we've implemented a robust MLOps pipeline for our lead scoring model using Google Cloud Platform's powerful ML tools. This post walks through how we built an end-to-end solution that handles everything from data ingestion to model deployment with proper versioning and monitoring.
Architecture Overview
Our pipeline uses:
- Kubeflow Pipelines for orchestration
- BigQuery for data storage
- Vertex AI for experiment tracking, model registry, and deployment
- Terraform for infrastructure as code
The workflow consists of these key components:
- Data ingestion from BigQuery
- Data preprocessing and feature engineering
- Model training using XGBoost
- Model evaluation and metric logging
- Automated model registration and versioning
- Deployment to Vertex AI Endpoints with explainability and monitoring
Let's break down each component.
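One housekeeping note first: the component and pipeline snippets below assume the KFP v2 SDK plus a handful of module-level imports and constants. A minimal sketch of that shared setup follows; the project, dataset, and bucket values are placeholders for your own.

from typing import Dict, List, NamedTuple

from google.cloud import aiplatform
from kfp import compiler
from kfp.dsl import (component, pipeline, Input, Output,
                     Dataset, Model, ClassificationMetrics)

# Placeholder project-level constants -- swap in your own values
PROJECT_ID = "your-gcp-project"
REGION = "us-central1"
BQ_DATASET = "crm"
BQ_TABLE = "lead_scoring_features"
MODEL_DISPLAY_NAME = "lead-scoring-model"
PIPELINE_ROOT = f"gs://{PROJECT_ID}-pipeline-artifacts"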
1. Data Ingestion Component
We start by pulling our lead scoring data from BigQuery:
@component(
packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow"],
base_image="python:3.9"
)
def ingest_data(
project_id: str,
dataset_id: str,
table_id: str,
output_dataset: Output[Dataset]
) -> NamedTuple("Outputs", [("features", List[str]), ("target", str)]):
from google.cloud import bigquery
import pandas as pd
import os
query = f"""
SELECT * FROM `{project_id}.{dataset_id}.{table_id}`
WHERE RAND() < 1.0 # Use sampling if needed for large datasets
"""
client = bigquery.Client(project=project_id)
df = client.query(query).to_dataframe()
    # Save to GCS via the output artifact (create the artifact directory first)
    os.makedirs(output_dataset.path, exist_ok=True)
    output_path = output_dataset.path + "/data.csv"
    df.to_csv(output_path, index=False)
# Identify target column - assuming it's 'conversion_probability'
target_column = "conversion_probability"
feature_columns = [col for col in df.columns if col != target_column]
from collections import namedtuple
output = namedtuple("Outputs", ["features", "target"])
return output(feature_columns, target_column)
This component:
- Connects to BigQuery and extracts data using a SQL query
- Saves the dataset to GCS (automatically handled by Kubeflow's Output artifact)
- Returns the feature columns and target column for downstream components
- Uses KFP's typing system to ensure proper data flow through the pipeline
Pro tip: For large datasets, consider adding sampling to your query or implementing incremental processing patterns.
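To make that tip concrete, here are two hedged variants of the ingestion query: one using BigQuery's TABLESAMPLE clause, the other pulling only a recent window. The created_at column is hypothetical; substitute whatever timestamp your table actually carries.

# Variant 1: let BigQuery sample ~10% of the table server-side
sampled_query = f"""
SELECT * FROM `{project_id}.{dataset_id}.{table_id}` TABLESAMPLE SYSTEM (10 PERCENT)
"""

# Variant 2: incremental processing, only rows landed in the last day
# (assumes a created_at TIMESTAMP column exists)
incremental_query = f"""
SELECT * FROM `{project_id}.{dataset_id}.{table_id}`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
"""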
2. Data Preprocessing Component
Next, we need to prepare our data for model training:
@component(
packages_to_install=["pandas", "scikit-learn", "pyarrow"],
base_image="python:3.9"
)
def preprocess_data(
input_dataset: Input[Dataset],
features: List[str],
target: str,
train_dataset: Output[Dataset],
test_dataset: Output[Dataset],
preprocessor: Output[Model]
) -> NamedTuple("Outputs", [("preprocessed_features", List[str])]):
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
import pickle
# Load data
df = pd.read_csv(input_dataset.path + "/data.csv")
# Split features and target
X = df[features]
y = df[target]
# Identify numeric and categorical columns
numeric_features = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
categorical_features = X.select_dtypes(include=['object', 'category']).columns.tolist()
# Define preprocessing for numeric columns
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
# Define preprocessing for categorical columns
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
('onehot', OneHotEncoder(handle_unknown='ignore'))
])
# Combine preprocessing steps
preprocessor_obj = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Fit the preprocessor
preprocessor_obj.fit(X_train)
# Transform the data
X_train_transformed = preprocessor_obj.transform(X_train)
X_test_transformed = preprocessor_obj.transform(X_test)
# Save preprocessed data
if hasattr(X_train_transformed, "toarray"):
train_data = np.hstack((X_train_transformed.toarray(), y_train.values.reshape(-1, 1)))
test_data = np.hstack((X_test_transformed.toarray(), y_test.values.reshape(-1, 1)))
else:
train_data = np.hstack((X_train_transformed, y_train.values.reshape(-1, 1)))
test_data = np.hstack((X_test_transformed, y_test.values.reshape(-1, 1)))
    # Save datasets and preprocessor (create the artifact directories first)
    import os
    for artifact in (train_dataset, test_dataset, preprocessor):
        os.makedirs(artifact.path, exist_ok=True)
    np.save(train_dataset.path + "/train_data.npy", train_data)
    np.save(test_dataset.path + "/test_data.npy", test_data)
    with open(preprocessor.path + "/preprocessor.pkl", "wb") as f:
        pickle.dump(preprocessor_obj, f)
# Get feature names after preprocessing
all_feature_names = []
all_feature_names.extend(numeric_features)
# Get one-hot encoded feature names
if categorical_features:
ohe = preprocessor_obj.named_transformers_['cat'].named_steps['onehot']
categories = ohe.categories_
for i, category in enumerate(categories):
cat_name = categorical_features[i]
for cat_value in category:
all_feature_names.append(f"{cat_name}_{cat_value}")
return {"preprocessed_features": all_feature_names}
This component handles critical preprocessing tasks:
- Automatic detection of numeric and categorical features
- Missing value imputation (median for numeric, constant for categorical)
- Feature scaling using StandardScaler
- One-hot encoding for categorical features
- Train/test split
- Saving both data and preprocessing pipeline for reproducibility
The preprocessing pipeline is a crucial part of our MLOps setup, as it ensures consistent transformation between training and inference.
Top tip
Always save your preprocessor alongside your model. The feature engineering pipeline is just as important as the model itself for reproducibility. In production, you'll need both to transform raw data into the exact same format used during training.
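As a quick sanity check of that round trip, you can load both artifacts outside the pipeline and score a raw record. This is only a sketch: it assumes local copies of preprocessor.pkl and model.pkl, and that the training table contains exactly the illustrative lead fields used later in this post.

import pickle
import pandas as pd

# Local copies of the artifacts produced by the pipeline
with open("preprocessor.pkl", "rb") as f:
    preprocessor = pickle.load(f)
with open("model.pkl", "rb") as f:
    model = pickle.load(f)

# A raw lead with the same columns used at training time (values are illustrative)
raw_lead = pd.DataFrame([{
    "company_size": 152,
    "industry": "technology",
    "product_interest": "enterprise",
    "pages_visited": 15,
    "time_on_site": 320,
    "campaign_source": "google",
    "prior_contacts": 2
}])

# Same transform as training, then a prediction
score = model.predict(preprocessor.transform(raw_lead))[0]
print(f"Lead score: {score:.2f}")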
3. Model Training Component
Now for the fun part—training our XGBoost regression model:
@component(
packages_to_install=["xgboost", "numpy", "scikit-learn"],
base_image="python:3.9"
)
def train_model(
train_dataset: Input[Dataset],
preprocessed_features: List[str],
model: Output[Model]
) -> Dict[str, str]:
import numpy as np
import xgboost as xgb
import pickle
# Load the training data
train_data = np.load(train_dataset.path + "/train_data.npy")
# Split back into features and target
X_train = train_data[:, :-1] # All columns except the last
y_train = train_data[:, -1] # Last column is the target
# Train XGBoost model
xgb_model = xgb.XGBRegressor(
objective='reg:squarederror',
n_estimators=100,
max_depth=6,
learning_rate=0.1,
subsample=0.8,
colsample_bytree=0.8,
random_state=42
)
xgb_model.fit(X_train, y_train)
    # Save the model into the model artifact directory
    import os
    os.makedirs(model.path, exist_ok=True)
    with open(model.path + "/model.pkl", "wb") as f:
        pickle.dump(xgb_model, f)
# Get feature importance
feature_importance = xgb_model.feature_importances_
feature_importance_dict = {feature: importance for feature, importance in zip(preprocessed_features, feature_importance)}
# Get top 5 features by importance
top_features = sorted(feature_importance_dict.items(), key=lambda x: x[1], reverse=True)[:5]
top_features_str = ", ".join([f"{feature}: {importance:.4f}" for feature, importance in top_features])
return {"top_features": top_features_str}
Our training component:
- Loads preprocessed training data
- Trains an XGBoost regression model optimized for lead scoring
- Saves the trained model as a pickle file
- Extracts and returns feature importance information
XGBoost works well for our lead scoring use case because it handles complex relationships in the data and provides useful feature importance metrics to help us understand what's driving our predictions.
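If you want to dig into those importances outside the pipeline, here is a short sketch, assuming a local copy of the saved model.pkl:

import pickle

# Local copy of the model saved by the training component
with open("model.pkl", "rb") as f:
    model = pickle.load(f)

# Gain-based importance is often more informative than the default split counts;
# keys are positional (f0, f1, ...) because the model was trained on a bare numpy array
gains = model.get_booster().get_score(importance_type="gain")
for feature, gain in sorted(gains.items(), key=lambda kv: kv[1], reverse=True)[:5]:
    print(f"{feature}: {gain:.2f}")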
4. Model Evaluation Component
Evaluation is a key step in any ML pipeline—we need to know if our model meets performance criteria:
@component(
    packages_to_install=["numpy", "scikit-learn", "xgboost", "google-cloud-aiplatform"],
    base_image="python:3.9"
)
def evaluate_model(
test_dataset: Input[Dataset],
model: Input[Model],
metrics: Output[ClassificationMetrics],
project_id: str,
region: str,
model_display_name: str
) -> Dict[str, float]:
import numpy as np
import pickle
import json
import time
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from google.cloud import aiplatform
# Load test data and model
test_data = np.load(test_dataset.path + "/test_data.npy")
X_test = test_data[:, :-1]
y_test = test_data[:, -1]
with open(model.path + "/model.pkl", "rb") as f:
model_obj = pickle.load(f)
# Make predictions
y_pred = model_obj.predict(X_test)
# Calculate metrics
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
    # Log metrics to Vertex AI Experiments
    # (experiment and run names must be lowercase with hyphens, not underscores;
    #  init() gets or creates the experiment)
    experiment_name = f"{model_display_name}-experiment".lower().replace("_", "-")
    aiplatform.init(project=project_id, location=region, experiment=experiment_name)
    # Start a new run and log metrics
    aiplatform.start_run(run=f"run-{int(time.time())}")
    aiplatform.log_metrics({
        "rmse": float(rmse),
        "mae": float(mae),
        "r2": float(r2)
    })
    aiplatform.end_run()
    # Save metrics for the next component
    metrics_dict = {
        "rmse": float(rmse),
        "mae": float(mae),
        "r2": float(r2)
    }
    import os
    os.makedirs(metrics.path, exist_ok=True)
    with open(metrics.path + "/metrics.json", "w") as f:
        json.dump(metrics_dict, f)
    # Create confusion-matrix-style data by binning actual and predicted values
    # on the same scale
    bins = 5
    bin_edges = np.linspace(y_test.min(), y_test.max(), bins)
    y_binned = np.digitize(y_test, bin_edges)
    y_pred_binned = np.clip(np.digitize(y_pred, bin_edges), 1, bins)
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(y_binned, y_pred_binned, labels=range(1, bins+1))
# Record metrics for classification visualizations
metrics.log_confusion_matrix(
["bin_" + str(i) for i in range(1, bins+1)],
cm.tolist()
)
return metrics_dict
This component:
- Evaluates the model on our test set
- Calculates RMSE, MAE, and R² metrics
- Logs metrics to Vertex AI Experiments for tracking
- Creates a pseudo-confusion matrix by binning our regression outputs
- Saves metrics for use in later components
Top tip
In Vertex AI, the Experiments API lets you track all training runs centrally. This provides a historical record of model performance, hyperparameters, and training datasets, making it easier to reproduce results and understand model improvements over time.
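For instance, once a few runs have accumulated, you can pull the whole experiment into a DataFrame and compare runs side by side. This sketch assumes the experiment name produced by the evaluation component above.

from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

# All runs of the experiment as a pandas DataFrame;
# logged metrics appear as metric.* columns (e.g. metric.rmse)
runs_df = aiplatform.get_experiment_df(experiment="lead-scoring-model-experiment")
print(runs_df.sort_values("metric.rmse").head())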
5. Model Registry Component
This is where our MLOps setup really shines. The registry component implements automated model promotion with semantic versioning:
@component(
    packages_to_install=["google-cloud-aiplatform", "google-cloud-storage", "semantic-version"],
    base_image="python:3.9"
    # Note: @component cannot bundle extra files; predictor.py is assumed to be
    # available in the component's working directory at runtime (e.g. baked into
    # a custom base image).
)
def register_model(
model: Input[Model],
preprocessor: Input[Model],
metrics: Input[ClassificationMetrics],
project_id: str,
region: str,
model_display_name: str
) -> Dict[str, str]:
import os
import json
import shutil
import time
from google.cloud import aiplatform
from semantic_version import Version
# Initialize Vertex AI
aiplatform.init(project=project_id, location=region)
# Load metrics from the evaluation step
with open(metrics.path + "/metrics.json", "r") as f:
current_metrics = json.load(f)
# Get current RMSE
current_rmse = current_metrics["rmse"]
# Check if there's an existing model with "production" tag
filter_string = f'display_name="{model_display_name}" AND labels.deployed="production"'
models = aiplatform.Model.list(filter=filter_string)
deployment_decision = "not_deployed"
current_version = "0.0.0"
if models:
        # Get the existing model's metrics (stored as JSON in the model description)
        existing_model = models[0]
        try:
            existing_metrics = json.loads(existing_model.gca_resource.description or "{}")
        except ValueError:
            existing_metrics = {}
        if "rmse" in existing_metrics:
            existing_rmse = float(existing_metrics["rmse"])
            # Check if new model is better (lower RMSE is better)
            if current_rmse < existing_rmse:
                # New model is better, get the current version and increment patch
                current_version = existing_metrics.get("version", "1.0.0")
v = Version(current_version)
new_version = str(Version(major=v.major, minor=v.minor, patch=v.patch+1))
deployment_decision = "deployed"
else:
# Existing model is better, keep it
deployment_decision = "not_deployed_existing_better"
return {"deployment_decision": deployment_decision, "version": current_version}
else:
# No existing production model, use 1.0.0
new_version = "1.0.0"
deployment_decision = "deployed_first_version"
if deployment_decision.startswith("deployed"):
# Create a directory to package model artifacts
model_dir = "/tmp/model_dir"
os.makedirs(model_dir, exist_ok=True)
# Copy model and preprocessor files
shutil.copy2(model.path + "/model.pkl", model_dir + "/model.pkl")
shutil.copy2(preprocessor.path + "/preprocessor.pkl", model_dir + "/preprocessor.pkl")
# Copy the predictor.py file
shutil.copy2("predictor.py", model_dir + "/predict.py")
        # Upload the packaged artifacts to GCS so Vertex AI can pull them
        # (the bucket is assumed to exist already)
        from google.cloud import storage
        bucket_name = f"{project_id}-model-artifacts"
        gcs_prefix = f"{model_display_name}/{new_version}"
        storage_client = storage.Client(project=project_id)
        bucket = storage_client.bucket(bucket_name)
        for file_name in os.listdir(model_dir):
            bucket.blob(f"{gcs_prefix}/{file_name}").upload_from_filename(
                os.path.join(model_dir, file_name))
        model_uri = f"gs://{bucket_name}/{gcs_prefix}"
# Register the model
registered_model = aiplatform.Model.upload(
display_name=model_display_name,
artifact_uri=model_uri,
serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
serving_container_predict_route="/predict",
serving_container_health_route="/health",
serving_container_environment_variables={
"PREDICT_ROUTE": "/predict",
"HEALTH_ROUTE": "/health"
},
sync=True
)
        # Attach metrics and version info (labels for filtering, JSON description for detail)
        registered_model.update(
            labels={"deployed": "production"},
            description=json.dumps({
                "rmse": current_rmse,
                "mae": current_metrics["mae"],
                "r2": current_metrics["r2"],
                "version": new_version,
                "framework": "xgboost",
                "created_at": int(time.time())
            })
        )
return {
"deployment_decision": deployment_decision,
"version": new_version,
"model_id": registered_model.resource_name
}
else:
return {"deployment_decision": deployment_decision, "version": current_version}
Let's take a closer look at our predictor.py file, which handles the model serving logic:
import pickle
import numpy as np
import pandas as pd
from google.cloud import storage
import os
class LeadScoringPredictor:
def __init__(self):
self.model = None
self.preprocessor = None
    def load(self):
        # Vertex AI exposes the model's artifact directory via the AIP_STORAGE_URI
        # environment variable; pull the pickled artifacts down before loading them
        artifact_uri = os.environ.get("AIP_STORAGE_URI")
        if artifact_uri:
            bucket_name, _, prefix = artifact_uri.replace("gs://", "").partition("/")
            bucket = storage.Client().bucket(bucket_name)
            for file_name in ("model.pkl", "preprocessor.pkl"):
                bucket.blob(f"{prefix}/{file_name}").download_to_filename(f"/tmp/{file_name}")
        with open("/tmp/model.pkl", "rb") as f:
            self.model = pickle.load(f)
        with open("/tmp/preprocessor.pkl", "rb") as f:
            self.preprocessor = pickle.load(f)
        return self
def predict(self, instances):
# Convert instances to the format expected by the preprocessor
input_df = pd.DataFrame(instances)
# Preprocess the input data
X_processed = self.preprocessor.transform(input_df)
# Make predictions
predictions = self.model.predict(X_processed)
# Return predictions
return predictions.tolist()
The registry component is the cornerstone of our automated MLOps pipeline, implementing metrics-driven continuous delivery for ML models:
- It checks if the new model outperforms the production model (using RMSE)
- If better, it uploads the model to Vertex AI's Model Registry
- It implements proper semantic versioning (1.0.0, 1.0.1, etc.)
- Attaches metadata including metrics and version information
- Tags models with "production" label for easy filtering
- Creates a custom prediction routine that bundles both model and preprocessor
Top tip
Use Vertex AI Model Registry's metadata and labeling features to store model performance metrics, training dataset information, and deployment status. The decision to deploy is data-driven and automated, eliminating human error and reducing deployment friction, while providing a comprehensive audit trail for compliance purposes.
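Outside the pipeline, those same labels make it easy to look up whatever is currently serving as production, for example:

from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

# Find the model currently labelled as the production lead scorer
models = aiplatform.Model.list(
    filter='display_name="lead-scoring-model" AND labels.deployed="production"'
)
if models:
    prod = models[0]
    print(prod.resource_name)
    print(prod.gca_resource.labels)
    print(prod.gca_resource.description)  # metrics/version JSON written by register_model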
6. Main Pipeline Definition
Now let's stitch all these components together into a cohesive pipeline:
@pipeline(
name="lead-scoring-pipeline",
pipeline_root=PIPELINE_ROOT
)
def lead_scoring_pipeline(
project_id: str = PROJECT_ID,
region: str = REGION,
bq_dataset: str = BQ_DATASET,
bq_table: str = BQ_TABLE,
model_display_name: str = MODEL_DISPLAY_NAME
):
# Run the ingestion component
ingest_task = ingest_data(
project_id=project_id,
dataset_id=bq_dataset,
table_id=bq_table
)
# Run the preprocessing component
preprocess_task = preprocess_data(
input_dataset=ingest_task.outputs["output_dataset"],
features=ingest_task.outputs["features"],
target=ingest_task.outputs["target"]
)
# Run the training component
train_task = train_model(
train_dataset=preprocess_task.outputs["train_dataset"],
preprocessed_features=preprocess_task.outputs["preprocessed_features"]
)
# Run the evaluation component
eval_task = evaluate_model(
test_dataset=preprocess_task.outputs["test_dataset"],
model=train_task.outputs["model"],
project_id=project_id,
region=region,
model_display_name=model_display_name
)
# Run the registration component
register_task = register_model(
model=train_task.outputs["model"],
preprocessor=preprocess_task.outputs["preprocessor"],
metrics=eval_task.outputs["metrics"],
project_id=project_id,
region=region,
model_display_name=model_display_name
)
Our pipeline definition:
- Connects all components in a logical flow
- Handles parameter passing between components
- Makes dependencies explicit
- Sets default parameter values
This declarative style makes our pipeline easy to understand, modify, and maintain.
7. Compiling and Running the Pipeline
With our pipeline defined, we can compile and run it:
def compile_and_run_pipeline():
# Compile the pipeline
compiler.Compiler().compile(
pipeline_func=lead_scoring_pipeline,
package_path="lead_scoring_pipeline.json"
)
# Initialize Vertex AI
aiplatform.init(project=PROJECT_ID, location=REGION)
# Create a pipeline job
pipeline_job = aiplatform.PipelineJob(
display_name="lead-scoring-pipeline",
template_path="lead_scoring_pipeline.json",
pipeline_root=PIPELINE_ROOT,
enable_caching=True
)
# Run the pipeline
pipeline_job.run()
return pipeline_job
# Execute the pipeline
if __name__ == "__main__":
job = compile_and_run_pipeline()
print(f"Pipeline job: {job.display_name} started")
The compiled pipeline can be:
- Run manually as shown above
- Scheduled to run periodically using Cloud Scheduler
- Triggered by new data using Pub/Sub (a sketch of this follows the list)
- Integrated into CI/CD workflows
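As a sketch of the Pub/Sub route, a small Cloud Functions-style handler can submit the compiled pipeline whenever a "new data" message arrives. The function wiring, trigger topic, and GCS path of the compiled spec are assumptions here; the constants are the ones defined earlier.

from google.cloud import aiplatform

def trigger_pipeline(event, context):
    """Hypothetical Pub/Sub-triggered entry point that kicks off a pipeline run."""
    aiplatform.init(project=PROJECT_ID, location=REGION)
    job = aiplatform.PipelineJob(
        display_name="lead-scoring-pipeline",
        template_path="gs://your-bucket/lead_scoring_pipeline.json",  # placeholder path to the compiled spec
        pipeline_root=PIPELINE_ROOT,
        enable_caching=True,
    )
    # submit() returns immediately rather than blocking like run()
    job.submit()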
8. Model Deployment with Terraform
Now for the infrastructure piece. We'll use Terraform to deploy and version our production model to a Vertex AI Endpoint:
# main.tf
terraform {
required_providers {
google = {
source = "hashicorp/google"
version = "4.75.0"
}
}
}
provider "google" {
project = var.project_id
region = var.region
}
# Get the production model
data "google_vertex_ai_models" "production_model" {
filter = "labels.deployed=\"production\" AND display_name=\"${var.model_display_name}\""
}
locals {
model_id = length(data.google_vertex_ai_models.production_model.models) > 0 ? data.google_vertex_ai_models.production_model.models[0].id : ""
}
# Create endpoint
resource "google_vertex_ai_endpoint" "lead_scoring_endpoint" {
display_name = "lead-scoring-endpoint"
location = var.region
# Only create if we have a production model
count = local.model_id != "" ? 1 : 0
}
# Deploy model to endpoint
resource "google_vertex_ai_model_deployment" "lead_scoring_deployment" {
# Only create if we have a production model
count = local.model_id != "" ? 1 : 0
model = local.model_id
endpoint = google_vertex_ai_endpoint.lead_scoring_endpoint[0].id
display_name = "lead-scoring-deployment"
dedicated_resources {
machine_spec {
machine_type = "n1-standard-2"
}
min_replica_count = 1
max_replica_count = 2
}
traffic_split {
percent = 100
}
# Enable explainability
explanation_spec {
parameters {
sampled_shapley_attribution {
path_count = 10
}
}
metadata {
inputs {
visualization {
type = "FEATURE_IMPORTANCE"
}
}
}
}
}
# Setup model monitoring
resource "google_vertex_ai_model_monitoring_job" "monitoring" {
count = local.model_id != "" ? 1 : 0
display_name = "lead-scoring-monitoring"
endpoint = google_vertex_ai_endpoint.lead_scoring_endpoint[0].id
  # Sample 80% of prediction requests for drift analysis
  logging_sampling_strategy {
    random_sample_config {
      sample_rate = 0.8
    }
  }
drift_detection_config {
drift_threshold = 0.05
}
monitoring_schedule {
monitor_interval {
seconds = 86400 # Daily monitoring
}
}
alert_config {
email_alert_config {
user_emails = ["mlops-team@example.com"]
}
}
}
The Terraform config:
- Queries the Vertex AI Model Registry for our production model
- Creates a Vertex AI Endpoint
- Deploys the latest production model to that endpoint
- Configures compute resources with autoscaling
- Enables explainability (Shapley values)
- Sets up model monitoring to detect data drift
- Configures alerts to notify the team of issues
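Once Terraform has applied, a quick Python check confirms what actually landed on the endpoint before you wire up any clients:

from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

# Inspect the endpoint Terraform created: which models are deployed, and how traffic is split
endpoint = aiplatform.Endpoint.list(filter='display_name="lead-scoring-endpoint"')[0]
for deployed in endpoint.gca_resource.deployed_models:
    print(deployed.id, deployed.display_name)
print(endpoint.gca_resource.traffic_split)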
9. Using the Deployed Model
Finally, let's look at how to interact with our deployed model:
def get_prediction(instances, endpoint_name="lead-scoring-endpoint"):
"""Get predictions from the deployed model."""
from google.cloud import aiplatform
# Initialize Vertex AI
aiplatform.init(project=PROJECT_ID, location=REGION)
# Get endpoint
endpoint = aiplatform.Endpoint.list(
filter=f'display_name="{endpoint_name}"'
)[0]
# Get predictions
predictions = endpoint.predict(instances=instances)
return predictions
def get_explanation(instances, endpoint_name="lead-scoring-endpoint"):
"""Get explanations from the deployed model."""
from google.cloud import aiplatform
# Initialize Vertex AI
aiplatform.init(project=PROJECT_ID, location=REGION)
# Get endpoint
endpoint = aiplatform.Endpoint.list(
filter=f'display_name="{endpoint_name}"'
)[0]
# Get explanations
explanation = endpoint.explain(instances=instances)
return explanation
# Example: Get prediction for a new lead
new_lead = [
{
"company_size": 152,
"industry": "technology",
"product_interest": "enterprise",
"pages_visited": 15,
"time_on_site": 320,
"campaign_source": "google",
"prior_contacts": 2
}
]
# Get prediction
prediction = get_prediction(new_lead)
print(f"Lead score: {prediction.predictions[0][0]:.2f}")
# Get explanation
explanation = get_explanation(new_lead)
print("Feature importance:")
for feature, value in explanation.explanations[0].attributions[0].feature_attributions.items():
    print(f"  {feature}: {value:.4f}")
10. Monitoring and Observability
The drift and skew statistics are easiest to browse in the Vertex AI console; programmatically, you can locate the monitoring job attached to the endpoint and check its state:
def get_monitoring_job(endpoint_name="lead-scoring-endpoint"):
    """Find the model monitoring job attached to the endpoint."""
    from google.cloud import aiplatform
    # Initialize Vertex AI
    aiplatform.init(project=PROJECT_ID, location=REGION)
    # Get endpoint
    endpoint = aiplatform.Endpoint.list(
        filter=f'display_name="{endpoint_name}"'
    )[0]
    # Monitoring stats hang off the monitoring job rather than the endpoint itself;
    # detailed per-feature drift scores are visible in the console or via the
    # JobService stats-anomalies API
    jobs = aiplatform.ModelDeploymentMonitoringJob.list()
    return [job for job in jobs if job.gca_resource.endpoint == endpoint.resource_name]
# Check the monitoring jobs guarding the endpoint
for job in get_monitoring_job():
    print(f"{job.display_name}: state = {job.state.name}")
Conclusion
Our MLOps pipeline provides:
- Automation: From data ingestion to deployment
- Quality control: Only better models get promoted to production
- Versioning: Proper semantic versioning for model tracking
- Observability: Comprehensive monitoring and explainability
- Infrastructure as code: Reproducible deployments with Terraform
This end-to-end solution has dramatically improved our lead scoring workflow, reducing the time from model development to deployment from weeks to hours, while ensuring that only high-quality models make it to production.
What MLOps challenges are you facing in your organization? Drop us a line and let us know!