Production-Grade MLOps Pipeline: From Data to Deployment on GCP
Fabrizio Albertoni
Co-Founder / Principal ML Engineer
At Sofos, we've implemented a robust MLOps pipeline for our lead scoring model using Google Cloud Platform's powerful ML tools. This post walks through how we built an end-to-end solution that handles everything from data ingestion to model deployment with proper versioning and monitoring.
- MLOps
- Kubeflow
- Google Cloud
- Vertex AI
- Terraform
- XGBoost
Architecture Overview
MLOps Pipeline Architecture
- Orchestration: Kubeflow
- Data Storage: BigQuery
- Training Pipeline: Vertex AI
- Infrastructure: Terraform
Our end-to-end MLOps solution integrates these key technologies to create a streamlined workflow that automates the entire machine learning lifecycle, from data to deployment.
- Data Ingestion: Extract lead scoring data from BigQuery with automatic preprocessing
- Feature Engineering: Transform raw data into ML-ready features with reproducible pipelines
- Model Training: Train XGBoost regression models with hyperparameter optimization
- Evaluation: Calculate performance metrics and track experiments in Vertex AI
- Registry & Versioning: Implement automated model promotion with semantic versioning
- Deployment: Provision infrastructure and deploy models using Terraform and Vertex AI
Let's break down each component of our MLOps pipeline to understand how they work together to create a production-grade system.
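The component and pipeline definitions below assume a shared set of module-level imports and configuration constants along the lines of this sketch (based on the KFP v2 SDK and the Vertex AI SDK; the constant values are placeholders you would replace with your own project settings):

from typing import Dict, List, NamedTuple

from kfp import compiler
from kfp.dsl import (
    component, pipeline,
    Input, Output, Dataset, Model, ClassificationMetrics,
)
from google.cloud import aiplatform

# Placeholder configuration - replace with your own project values
PROJECT_ID = "your-gcp-project"            # hypothetical project ID
REGION = "europe-west1"                    # hypothetical region
BQ_DATASET = "crm"                         # hypothetical BigQuery dataset
BQ_TABLE = "lead_scoring_training_data"    # hypothetical BigQuery table
MODEL_DISPLAY_NAME = "lead-scoring-model"  # hypothetical model display name
PIPELINE_ROOT = f"gs://{PROJECT_ID}-pipeline-artifacts"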
1. Data Ingestion Component
Our MLOps journey begins with extracting high-quality data. We designed a Kubeflow component that connects to BigQuery and pulls our lead scoring dataset with full typing support.
Data Extraction
@component(
    packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow", "db-dtypes"],  # db-dtypes supports BigQuery to_dataframe() conversions
    base_image="python:3.9"
)
def ingest_data(
    project_id: str,
    dataset_id: str,
    table_id: str,
    output_dataset: Output[Dataset]
) -> NamedTuple("Outputs", [("features", List[str]), ("target", str)]):
    from google.cloud import bigquery
    import pandas as pd
    import os

    query = f"""
    SELECT * FROM `{project_id}.{dataset_id}.{table_id}`
    WHERE RAND() < 1.0  # Use sampling if needed for large datasets
    """
    client = bigquery.Client(project=project_id)
    df = client.query(query).to_dataframe()

    # Save the extract as a pipeline artifact (Kubeflow maps this path to GCS)
    os.makedirs(output_dataset.path, exist_ok=True)
    output_path = output_dataset.path + "/data.csv"
    df.to_csv(output_path, index=False)

    # Identify target column - assuming it's 'conversion_probability'
    target_column = "conversion_probability"
    feature_columns = [col for col in df.columns if col != target_column]

    from collections import namedtuple
    output = namedtuple("Outputs", ["features", "target"])
    return output(feature_columns, target_column)
- BigQuery Integration: Connects directly to our data warehouse using the Google Cloud client library
- Data Extraction: Uses SQL queries with optional sampling for handling large datasets
- Artifact Handling: Automatically saves datasets to Google Cloud Storage through Kubeflow's artifact management
- Type Safety: Returns strongly-typed outputs for feature columns and target variable
Top tip
For large datasets, consider adding sampling to your query or implementing incremental processing patterns to improve pipeline efficiency.
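For example, sampling can be as simple as swapping the query string used in the ingestion component. This sketch assumes BigQuery's TABLESAMPLE clause and a hypothetical 10 percent sample rate; the RAND() variant mirrors the filter already shown above:

# Hypothetical sampled query for very large tables (roughly 10% of storage blocks)
sampled_query = f"""
SELECT *
FROM `{project_id}.{dataset_id}.{table_id}` TABLESAMPLE SYSTEM (10 PERCENT)
"""

# Alternatively, keep the RAND() filter and lower the threshold
random_sample_query = f"""
SELECT *
FROM `{project_id}.{dataset_id}.{table_id}`
WHERE RAND() < 0.1
"""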
2. Data Preprocessing Component
Data preparation is critical for model quality. Our preprocessing component automatically handles feature engineering, missing values, and data splitting to ensure reproducible results.
Preprocessing Steps:
- Feature type detection
- Missing value imputation
- Numeric feature scaling
- Categorical encoding
- Train/test splitting
Advantages:
- Consistent transformations
- Reproducible feature engineering
- Automatic feature handling
- Pipeline serialization for deployment
@component(
    packages_to_install=["pandas", "scikit-learn", "pyarrow"],
    base_image="python:3.9"
)
def preprocess_data(
    input_dataset: Input[Dataset],
    features: List[str],
    target: str,
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    preprocessor: Output[Model]
) -> NamedTuple("Outputs", [("preprocessed_features", List[str])]):
    import os
    import pickle
    import pandas as pd
    import numpy as np
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler, OneHotEncoder
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from sklearn.impute import SimpleImputer

    # Load data
    df = pd.read_csv(input_dataset.path + "/data.csv")

    # Split features and target
    X = df[features]
    y = df[target]

    # Identify numeric and categorical columns
    numeric_features = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
    categorical_features = X.select_dtypes(include=['object', 'category']).columns.tolist()

    # Define preprocessing for numeric columns
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    # Define preprocessing for categorical columns
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    # Combine preprocessing steps
    preprocessor_obj = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ])

    # Split data before fitting so the test set stays untouched
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Fit the preprocessor on the training split, then transform both splits
    preprocessor_obj.fit(X_train)
    X_train_transformed = preprocessor_obj.transform(X_train)
    X_test_transformed = preprocessor_obj.transform(X_test)

    # One-hot encoding may return sparse matrices; densify before stacking the target
    if hasattr(X_train_transformed, "toarray"):
        train_data = np.hstack((X_train_transformed.toarray(), y_train.values.reshape(-1, 1)))
        test_data = np.hstack((X_test_transformed.toarray(), y_test.values.reshape(-1, 1)))
    else:
        train_data = np.hstack((X_train_transformed, y_train.values.reshape(-1, 1)))
        test_data = np.hstack((X_test_transformed, y_test.values.reshape(-1, 1)))

    # Save datasets and the fitted preprocessor as pipeline artifacts
    os.makedirs(train_dataset.path, exist_ok=True)
    os.makedirs(test_dataset.path, exist_ok=True)
    os.makedirs(preprocessor.path, exist_ok=True)
    np.save(train_dataset.path + "/train_data.npy", train_data)
    np.save(test_dataset.path + "/test_data.npy", test_data)
    with open(preprocessor.path + "/preprocessor.pkl", "wb") as f:
        pickle.dump(preprocessor_obj, f)

    # Reconstruct feature names after preprocessing (numeric first, then one-hot columns)
    all_feature_names = []
    all_feature_names.extend(numeric_features)
    if categorical_features:
        ohe = preprocessor_obj.named_transformers_['cat'].named_steps['onehot']
        for cat_name, category in zip(categorical_features, ohe.categories_):
            for cat_value in category:
                all_feature_names.append(f"{cat_name}_{cat_value}")

    from collections import namedtuple
    outputs = namedtuple("Outputs", ["preprocessed_features"])
    return outputs(all_feature_names)
Top tip
Always save your preprocessor alongside your model. The feature engineering pipeline is just as important as the model itself for reproducibility. In production, you'll need both to transform raw data into the exact same format used during training.
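As a quick illustration, the two pickled artifacts produced by the pipeline are meant to be loaded together at inference time. This is a minimal sketch assuming local copies of the files and a hypothetical new_leads.csv containing the same raw feature columns used in training:

import pickle
import pandas as pd

# Load the fitted preprocessing pipeline and the trained model together
with open("preprocessor.pkl", "rb") as f:
    preprocessing_pipeline = pickle.load(f)
with open("model.pkl", "rb") as f:
    trained_model = pickle.load(f)

# Hypothetical batch of new leads with the same raw feature columns used in training
raw_leads = pd.read_csv("new_leads.csv")

# Apply exactly the same transformations, then score
lead_scores = trained_model.predict(preprocessing_pipeline.transform(raw_leads))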
3. Model Training Component
With our data prepared, we train an XGBoost regression model optimized for lead scoring. Our component not only trains the model but also extracts key feature importance metrics to help understand what drives predictions.
XGBoost Advantages
- Handles complex relationships
- Feature importance insights
- Robust to missing values
- Regularization to prevent overfitting
- Efficient training on large datasets
Hyperparameters
- objective: 'reg:squarederror'
- n_estimators: 100
- max_depth: 6
- learning_rate: 0.1
- subsample: 0.8
- colsample_bytree: 0.8
@component(
    packages_to_install=["xgboost", "numpy", "scikit-learn"],
    base_image="python:3.9"
)
def train_model(
    train_dataset: Input[Dataset],
    preprocessed_features: List[str],
    model: Output[Model]
) -> Dict[str, str]:
    import os
    import pickle
    import numpy as np
    import xgboost as xgb

    # Load the training data
    train_data = np.load(train_dataset.path + "/train_data.npy")

    # Split back into features and target
    X_train = train_data[:, :-1]  # All columns except the last
    y_train = train_data[:, -1]   # Last column is the target

    # Train XGBoost model
    xgb_model = xgb.XGBRegressor(
        objective='reg:squarederror',
        n_estimators=100,
        max_depth=6,
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42
    )
    xgb_model.fit(X_train, y_train)

    # Save the model as a pipeline artifact
    os.makedirs(model.path, exist_ok=True)
    with open(model.path + "/model.pkl", "wb") as f:
        pickle.dump(xgb_model, f)

    # Map feature importances back to feature names
    feature_importance = xgb_model.feature_importances_
    feature_importance_dict = {feature: importance for feature, importance in zip(preprocessed_features, feature_importance)}

    # Get top 5 features by importance
    top_features = sorted(feature_importance_dict.items(), key=lambda x: x[1], reverse=True)[:5]
    top_features_str = ", ".join([f"{feature}: {importance:.4f}" for feature, importance in top_features])

    return {"top_features": top_features_str}
Why XGBoost for Lead Scoring?
XGBoost excels at predicting lead conversion probability because it can capture non-linear relationships between features like website behavior, demographic data, and historical interactions.
Its built-in feature importance metrics also provide valuable insights into which factors most strongly influence conversion likelihood, helping sales teams understand what drives customer decisions.
4. Model Evaluation Component
Rigorous evaluation is essential for any production ML system. Our evaluation component calculates key performance metrics and integrates with Vertex AI Experiments for comprehensive tracking.
Key Evaluation Metrics
@component(
    packages_to_install=["numpy", "scikit-learn", "google-cloud-aiplatform"],
    base_image="python:3.9"
)
def evaluate_model(
    test_dataset: Input[Dataset],
    model: Input[Model],
    metrics: Output[ClassificationMetrics],
    project_id: str,
    region: str,
    model_display_name: str
) -> Dict[str, float]:
    import os
    import json
    import time
    import pickle
    import numpy as np
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, confusion_matrix
    from google.cloud import aiplatform

    # Load test data and model
    test_data = np.load(test_dataset.path + "/test_data.npy")
    X_test = test_data[:, :-1]
    y_test = test_data[:, -1]
    with open(model.path + "/model.pkl", "rb") as f:
        model_obj = pickle.load(f)

    # Make predictions
    y_pred = model_obj.predict(X_test)

    # Calculate metrics
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    # Log metrics to Vertex AI Experiments
    # (experiment and run names must use lowercase letters, digits, and dashes)
    experiment_name = f"{model_display_name}-experiment".lower().replace("_", "-")
    aiplatform.init(project=project_id, location=region, experiment=experiment_name)
    aiplatform.start_run(run=f"run-{int(time.time())}")
    aiplatform.log_metrics({
        "rmse": float(rmse),
        "mae": float(mae),
        "r2": float(r2)
    })
    aiplatform.end_run()

    # Save metrics for the next component
    metrics_dict = {
        "rmse": float(rmse),
        "mae": float(mae),
        "r2": float(r2)
    }
    os.makedirs(metrics.path, exist_ok=True)
    with open(metrics.path + "/metrics.json", "w") as f:
        json.dump(metrics_dict, f)

    # Bin the continuous scores to create confusion matrix data for visualization
    bins = 5
    y_binned = np.digitize(y_test, np.linspace(y_test.min(), y_test.max(), bins))
    y_pred_binned = np.digitize(y_pred, np.linspace(y_pred.min(), y_pred.max(), bins))
    cm = confusion_matrix(y_binned, y_pred_binned, labels=range(1, bins + 1))

    # Record metrics for classification visualizations in the pipeline UI
    metrics.log_confusion_matrix(
        ["bin_" + str(i) for i in range(1, bins + 1)],
        cm.tolist()
    )

    return metrics_dict
Top tip
In Vertex AI, using the Experiments API lets you track all training runs centrally. This provides a historical record of model performance, hyperparameters, and training datasets, making it easier to reproduce results and understand model improvements over time.
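For example, once runs are logged they can be pulled back into a notebook for side-by-side comparison. This is a small sketch using the Vertex AI SDK; the experiment name mirrors the convention used in our evaluation component and is otherwise a placeholder:

from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

# Fetch all runs of the lead scoring experiment as a pandas DataFrame
runs_df = aiplatform.get_experiment_df("lead-scoring-model-experiment")

# Sort by RMSE to see which run performed best
print(runs_df.sort_values("metric.rmse").head())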
5. Model Registry Component
This is where our MLOps setup really shines. The registry component implements continuous delivery for ML models with automated promotion and semantic versioning.
Automated Model Registry and Deployment Flow: a newly trained model is promoted to production only if it outperforms the current production model.
GitOps for Machine Learning
@component(
    packages_to_install=["google-cloud-aiplatform", "semantic-version"],
    base_image="python:3.9"
)
def register_model(
    model: Input[Model],
    preprocessor: Input[Model],
    metrics: Input[ClassificationMetrics],
    project_id: str,
    region: str,
    model_display_name: str
) -> Dict[str, str]:
    import os
    import json
    import shutil
    import time
    from google.cloud import aiplatform, storage
    from semantic_version import Version

    # Initialize Vertex AI
    aiplatform.init(project=project_id, location=region)

    # Load metrics from the evaluation step
    with open(metrics.path + "/metrics.json", "r") as f:
        current_metrics = json.load(f)
    current_rmse = current_metrics["rmse"]

    # Check if there's an existing model with the "production" label
    filter_string = f'display_name="{model_display_name}" AND labels.deployed="production"'
    models = aiplatform.Model.list(filter=filter_string)

    deployment_decision = "not_deployed"
    current_version = "0.0.0"
    if models:
        # Read the existing production model's recorded metrics
        # (stored as JSON in the model description when it was registered)
        existing_model = models[0]
        try:
            existing_meta = json.loads(existing_model.description or "{}")
        except json.JSONDecodeError:
            existing_meta = {}
        if "rmse" in existing_meta:
            existing_rmse = float(existing_meta["rmse"])
            # Check if the new model is better (lower RMSE is better)
            if current_rmse < existing_rmse:
                # New model is better: take the current version and increment the patch
                current_version = existing_meta.get("version", "1.0.0")
                v = Version(current_version)
                new_version = str(Version(major=v.major, minor=v.minor, patch=v.patch + 1))
                deployment_decision = "deployed"
            else:
                # Existing model is better, keep it
                deployment_decision = "not_deployed_existing_better"
                return {"deployment_decision": deployment_decision, "version": current_version}
    else:
        # No existing production model, use 1.0.0
        new_version = "1.0.0"
        deployment_decision = "deployed_first_version"

    if deployment_decision.startswith("deployed"):
        # Package model artifacts in a temporary directory
        model_dir = "/tmp/model_dir"
        os.makedirs(model_dir, exist_ok=True)
        shutil.copy2(model.path + "/model.pkl", model_dir + "/model.pkl")
        shutil.copy2(preprocessor.path + "/preprocessor.pkl", model_dir + "/preprocessor.pkl")
        # predictor.py is assumed to be available in the component's working directory
        # (for example, baked into a custom base image or downloaded from GCS)
        shutil.copy2("predictor.py", model_dir + "/predict.py")

        # Upload the packaged artifacts to GCS so Vertex AI can import them
        bucket_name = f"{project_id}-model-artifacts"
        artifact_prefix = f"{model_display_name}/{new_version}"
        bucket = storage.Client(project=project_id).bucket(bucket_name)
        for file_name in os.listdir(model_dir):
            bucket.blob(f"{artifact_prefix}/{file_name}").upload_from_filename(
                os.path.join(model_dir, file_name)
            )
        model_uri = f"gs://{bucket_name}/{artifact_prefix}"

        # Register the model in the Vertex AI Model Registry
        registered_model = aiplatform.Model.upload(
            display_name=model_display_name,
            artifact_uri=model_uri,
            serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
            serving_container_predict_route="/predict",
            serving_container_health_route="/health",
            serving_container_environment_variables={
                "PREDICT_ROUTE": "/predict",
                "HEALTH_ROUTE": "/health"
            },
            sync=True
        )

        # Record the production label plus metrics and version
        # (as JSON in the description, since label values cannot contain dots)
        registered_model.update(
            labels={"deployed": "production"},
            description=json.dumps({
                "rmse": current_rmse,
                "mae": current_metrics["mae"],
                "r2": current_metrics["r2"],
                "version": new_version,
                "framework": "xgboost",
                "created_at": int(time.time())
            })
        )

        return {
            "deployment_decision": deployment_decision,
            "version": new_version,
            "model_id": registered_model.resource_name
        }
    else:
        return {"deployment_decision": deployment_decision, "version": current_version}
Model Serving: predictor.py
This file handles the model serving logic, ensuring both model and preprocessor are loaded correctly.
import pickle
import pandas as pd


class LeadScoringPredictor:
    def __init__(self):
        self.model = None
        self.preprocessor = None

    def load(self):
        # Load the trained model and the fitted preprocessing pipeline saved by the training pipeline
        with open('/tmp/model.pkl', 'rb') as f:
            self.model = pickle.load(f)
        with open('/tmp/preprocessor.pkl', 'rb') as f:
            self.preprocessor = pickle.load(f)
        return self

    def predict(self, instances):
        # Convert instances to the format expected by the preprocessor
        input_df = pd.DataFrame(instances)
        # Apply exactly the same feature engineering used at training time
        X_processed = self.preprocessor.transform(input_df)
        # Make predictions
        predictions = self.model.predict(X_processed)
        return predictions.tolist()
Top tip
Use Vertex AI Model Registry's metadata and labeling features to store model performance metrics, training dataset information, and deployment status. The decision to deploy is data-driven and automated, eliminating human error and reducing deployment friction, while providing a comprehensive audit trail for compliance purposes.
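As a quick illustration of how those labels pay off, any client with Vertex AI access can look up the current production model and its recorded metrics. This sketch follows the same label and description conventions used in our registration component:

from google.cloud import aiplatform
import json

aiplatform.init(project=PROJECT_ID, location=REGION)

# Find whatever model currently carries the production label
production_models = aiplatform.Model.list(
    filter='display_name="lead-scoring-model" AND labels.deployed="production"'
)

for m in production_models:
    recorded = json.loads(m.description or "{}")  # metrics/version stored at registration time
    print(m.resource_name, recorded.get("version"), recorded.get("rmse"))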
6. Main Pipeline Definition
Now let's stitch all these components together into a cohesive pipeline. The pipeline definition creates a directed acyclic graph (DAG) that shows exactly how data flows through our system.
Pipeline Orchestration Benefits
- Pipeline components and their relationships are defined in a clear, declarative way that makes the ML workflow explicit and reproducible
- Components execute only when their dependencies have completed successfully, ensuring data integrity throughout the process
- All intermediate artifacts are stored in GCS, allowing for easier debugging, lineage tracking, and reproducibility
- The pipeline can be triggered manually, scheduled, or executed as part of a larger CI/CD process
@pipeline(
    name="lead-scoring-pipeline",
    pipeline_root=PIPELINE_ROOT
)
def lead_scoring_pipeline(
    project_id: str = PROJECT_ID,
    region: str = REGION,
    bq_dataset: str = BQ_DATASET,
    bq_table: str = BQ_TABLE,
    model_display_name: str = MODEL_DISPLAY_NAME
):
    # Run the ingestion component
    ingest_task = ingest_data(
        project_id=project_id,
        dataset_id=bq_dataset,
        table_id=bq_table
    )

    # Run the preprocessing component
    preprocess_task = preprocess_data(
        input_dataset=ingest_task.outputs["output_dataset"],
        features=ingest_task.outputs["features"],
        target=ingest_task.outputs["target"]
    )

    # Run the training component
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        preprocessed_features=preprocess_task.outputs["preprocessed_features"]
    )

    # Run the evaluation component
    eval_task = evaluate_model(
        test_dataset=preprocess_task.outputs["test_dataset"],
        model=train_task.outputs["model"],
        project_id=project_id,
        region=region,
        model_display_name=model_display_name
    )

    # Run the registration component
    register_task = register_model(
        model=train_task.outputs["model"],
        preprocessor=preprocess_task.outputs["preprocessor"],
        metrics=eval_task.outputs["metrics"],
        project_id=project_id,
        region=region,
        model_display_name=model_display_name
    )
Key Pipeline Features
✓ Automatic dependency resolution based on inputs and outputs
✓ Parameterized execution for flexibility
✓ Strongly-typed interfaces between components
✓ Clear data and control flow
✓ Default parameters for quick execution
7. Compiling and Running the Pipeline
With our pipeline defined, we can compile it to a JSON representation that Vertex AI Pipelines can execute. This separation of definition and execution is a key part of MLOps best practices.
Pipeline Execution Flow
def compile_and_run_pipeline():
    # Compile the pipeline to a portable job spec
    compiler.Compiler().compile(
        pipeline_func=lead_scoring_pipeline,
        package_path="lead_scoring_pipeline.json"
    )

    # Initialize Vertex AI
    aiplatform.init(project=PROJECT_ID, location=REGION)

    # Create a pipeline job
    pipeline_job = aiplatform.PipelineJob(
        display_name="lead-scoring-pipeline",
        template_path="lead_scoring_pipeline.json",
        pipeline_root=PIPELINE_ROOT,
        enable_caching=True
    )

    # Run the pipeline
    pipeline_job.run()
    return pipeline_job


# Execute the pipeline
if __name__ == "__main__":
    job = compile_and_run_pipeline()
    print(f"Pipeline job: {job.display_name} started")
Execution Options
⟳ Manual execution - Run on demand via script or UI
⟳ Scheduled execution - Regular retraining with Cloud Scheduler or Vertex AI pipeline schedules (see the sketch after this list)
⟳ Event-driven - Trigger on new data in BigQuery
⟳ CI/CD integration - Pipeline as part of deployment
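For scheduled retraining, recent versions of the google-cloud-aiplatform SDK can attach a cron schedule directly to a pipeline job; this is a sketch under that assumption (an alternative is Cloud Scheduler triggering a small service that calls compile_and_run_pipeline):

from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

pipeline_job = aiplatform.PipelineJob(
    display_name="lead-scoring-pipeline",
    template_path="lead_scoring_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
)

# Retrain weekly, Mondays at 03:00 UTC (requires an SDK version that supports pipeline schedules)
pipeline_job.create_schedule(
    display_name="lead-scoring-weekly-retrain",
    cron="0 3 * * 1",
)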
Performance Optimizations
⚡ Component caching - Skip recomputation of unchanged steps
⚡ Custom resources - Allocate appropriate CPU/RAM/GPU per component
⚡ Parallelization - Independent steps run in parallel
⚡ Artifact optimization - Efficient data passing between components
8. Model Deployment with Terraform
For the infrastructure piece, we use Terraform to deploy our models to production in a consistent, versioned, and auditable way. This infrastructure-as-code approach gives us confidence that our deployment environment is reproducible and properly configured.
Infrastructure as Code Benefits
- All infrastructure changes are tracked in Git alongside code
- Environments can be recreated exactly as specified
- Configuration drift is eliminated through declarative definitions
- Deployments can be fully automated in CI/CD pipelines
- Infrastructure code serves as living documentation
- Team members can review and understand deployment changes before they are applied
# main.tf
terraform {
required_providers {
google = {
source = "hashicorp/google"
version = "4.75.0"
}
}
}
provider "google" {
project = var.project_id
region = var.region
}
# Get the production model
data "google_vertex_ai_models" "production_model" {
filter = "labels.deployed=\"production\" AND display_name=\"${var.model_display_name}\""
}
locals {
model_id = length(data.google_vertex_ai_models.production_model.models) > 0 ? data.google_vertex_ai_models.production_model.models[0].id : ""
}
# Create endpoint
resource "google_vertex_ai_endpoint" "lead_scoring_endpoint" {
display_name = "lead-scoring-endpoint"
location = var.region
# Only create if we have a production model
count = local.model_id != "" ? 1 : 0
}
# Deploy model to endpoint
resource "google_vertex_ai_model_deployment" "lead_scoring_deployment" {
# Only create if we have a production model
count = local.model_id != "" ? 1 : 0
model = local.model_id
endpoint = google_vertex_ai_endpoint.lead_scoring_endpoint[0].id
display_name = "lead-scoring-deployment"
dedicated_resources {
machine_spec {
machine_type = "n1-standard-2"
}
min_replica_count = 1
max_replica_count = 2
}
traffic_split {
percent = 100
}
# Enable explainability
explanation_spec {
parameters {
sampled_shapley_attribution {
path_count = 10
}
}
metadata {
inputs {
visualization {
type = "FEATURE_IMPORTANCE"
}
}
}
}
}
# Setup model monitoring
resource "google_vertex_ai_model_monitoring_job" "monitoring" {
count = local.model_id != "" ? 1 : 0
display_name = "lead-scoring-monitoring"
endpoint = google_vertex_ai_endpoint.lead_scoring_endpoint[0].id
prediction_sampling_config {
sampling_strategy = "RANDOM_SAMPLING"
random_sample_config {
sample_rate_percent = 80
}
}
logging_sampling_strategy {
random_sample_config {
sample_rate = 0.8
}
}
drift_detection_config {
drift_threshold = 0.05
}
monitoring_schedule {
monitor_interval {
seconds = 86400 # Daily monitoring
}
}
alert_config {
email_alert_config {
user_emails = ["mlops-team@example.com"]
}
}
}
Terraform Configuration Highlights
Dynamic Model Selection
The configuration queries the Vertex AI Model Registry to find models tagged with "production", ensuring we always deploy the latest approved model.
Autoscaling Resources
The endpoint is configured to automatically scale between 1-2 replicas based on load, optimizing cost while maintaining performance.
Built-in Explainability
Shapley values are configured to provide feature importance data for each prediction, enhancing model transparency.
Continuous Monitoring
Automated drift detection alerts the team when model performance degrades, triggering retraining when necessary.
9. Using the Deployed Model
Once deployed, our lead scoring model is available through a REST API powered by Vertex AI Endpoints. This makes it easy to integrate with existing applications and services.
Model Prediction
def get_prediction(instances, endpoint_name="lead-scoring-endpoint"):
    """Get predictions from the deployed model."""
    from google.cloud import aiplatform

    # Initialize Vertex AI
    aiplatform.init(project=PROJECT_ID, location=REGION)

    # Get endpoint
    endpoint = aiplatform.Endpoint.list(
        filter=f'display_name="{endpoint_name}"'
    )[0]

    # Get predictions
    predictions = endpoint.predict(instances=instances)
    return predictions
Model Explainability
def get_explanation(instances, endpoint_name="lead-scoring-endpoint"):
    """Get explanations from the deployed model."""
    from google.cloud import aiplatform

    # Initialize Vertex AI
    aiplatform.init(project=PROJECT_ID, location=REGION)

    # Get endpoint
    endpoint = aiplatform.Endpoint.list(
        filter=f'display_name="{endpoint_name}"'
    )[0]

    # Get explanations (requires an explanation spec on the deployed model)
    explanation = endpoint.explain(instances=instances)
    return explanation
Example: Processing a New Lead
# Example: Get prediction for a new lead
new_lead = [
    {
        "company_size": 152,
        "industry": "technology",
        "product_interest": "enterprise",
        "pages_visited": 15,
        "time_on_site": 320,
        "campaign_source": "google",
        "prior_contacts": 2
    }
]

# Get prediction
prediction = get_prediction(new_lead)
print(f"Lead score: {prediction.predictions[0][0]:.2f}")

# Get explanation
explanation = get_explanation(new_lead)
print("Feature importance:")
for feature, value in explanation.explanations[0].attributions[0].feature_attributions.items():
    print(f"  {feature}: {value:.4f}")
Integration Options
- Integrate with frontends to provide real-time lead scoring during user interactions
- Connect to CRM workflows to prioritize leads automatically when they enter the system (see the sketch after this list)
- Optimize campaign targeting by integrating with marketing automation tools
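As one example of CRM integration, a lightweight HTTP handler can score leads as they are created and write the score back to the CRM. This is a hypothetical sketch using Cloud Functions' functions_framework together with the get_prediction helper above; the payload fields and the update_crm_lead call are placeholders for your CRM's API:

import functions_framework


@functions_framework.http
def score_new_lead(request):
    """Hypothetical webhook called by the CRM when a lead is created."""
    lead = request.get_json()

    # Score the lead with the deployed Vertex AI endpoint
    prediction = get_prediction([lead["features"]])
    score = prediction.predictions[0][0]

    # update_crm_lead is a placeholder for your CRM client's update call
    # update_crm_lead(lead["id"], {"lead_score": score})

    return {"lead_id": lead["id"], "lead_score": score}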
10. Monitoring and Observability
Continuous monitoring is essential for any production ML system. Data drift, concept drift, and performance degradation can all impact model quality over time. Our MLOps pipeline includes robust monitoring to detect these issues early.
Monitoring Capabilities
- Data Drift Detection: Automatically detects when input feature distributions change significantly
- Performance Monitoring: Tracks model accuracy, precision, and overall quality metrics
- Latency Tracking: Measures prediction latency and service performance metrics
- Infrastructure Metrics: Tracks compute resource utilization and scaling efficiency
def get_monitoring_metrics(endpoint_name="lead-scoring-endpoint"):
    """Get monitoring metrics from the endpoint."""
    from google.cloud import aiplatform

    # Initialize Vertex AI
    aiplatform.init(project=PROJECT_ID, location=REGION)

    # Get endpoint
    endpoint = aiplatform.Endpoint.list(
        filter=f'display_name="{endpoint_name}"'
    )[0]

    # Get monitoring data
    monitoring_stats = endpoint.monitoring_stats()
    return monitoring_stats


# Check for data drift
monitoring_data = get_monitoring_metrics()
print("Feature drift metrics:")
for feature, metrics in monitoring_data["feature_metrics"].items():
    print(f"  {feature}: drift score = {metrics['drift_score']:.4f}")
Automated Alerting and Response
When monitoring detects issues, we can take automated actions to maintain model health:
- Sends alerts to the ML team via email, Slack, or PagerDuty
- Triggers pipeline execution to retrain with fresh data (see the sketch after this list)
- Automatically reverts to the previous stable version if issues are detected
- Records all incidents for later analysis and process improvement
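For the automated retraining path, a small handler can submit the compiled training pipeline whenever a drift alert fires. This is a sketch assuming the alert is delivered via Pub/Sub to a Cloud Function; the topic wiring and message contents are hypothetical:

import base64
import json

import functions_framework
from google.cloud import aiplatform


@functions_framework.cloud_event
def retrain_on_drift_alert(cloud_event):
    """Hypothetical Pub/Sub-triggered function that kicks off retraining."""
    payload = json.loads(base64.b64decode(cloud_event.data["message"]["data"]))
    print(f"Drift alert received: {payload}")

    aiplatform.init(project=PROJECT_ID, location=REGION)
    job = aiplatform.PipelineJob(
        display_name="lead-scoring-retrain-on-drift",
        template_path="lead_scoring_pipeline.json",
        pipeline_root=PIPELINE_ROOT,
    )
    # Submit asynchronously; model promotion still depends on the registry comparison step
    job.submit()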
Conclusion
Our end-to-end MLOps pipeline on Google Cloud Platform has transformed how we develop, deploy, and maintain our lead scoring models. By implementing proper automation, versioning, and monitoring, we've achieved several key benefits:
Model Quality
By automating evaluation and only promoting models that outperform existing ones, we've seen a 23% improvement in lead conversion prediction accuracy and a 19% reduction in false positives.
Development Velocity
The time from model development to production deployment has been reduced from weeks to hours, allowing our data scientists to iterate quickly and test new hypotheses.
Operational Reliability
Our infrastructure-as-code approach and comprehensive monitoring have increased our model serving reliability, with automatic alerts for any potential issues.
Business Impact
By providing sales teams with accurate lead scoring, we've seen a 40% increase in conversion rates and a 28% reduction in sales cycle length, driving significant ROI.
Next Steps in Our MLOps Journey
- Expanding our pipeline to support A/B testing between multiple model architectures (see the sketch after this list)
- Implementing a central feature store to improve feature reuse and consistency
- Implementing AI agents that can automatically prioritize, engage, and nurture leads based on real-time scoring and business rules
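Vertex AI endpoints already support splitting traffic across deployed models, so the A/B testing work is mostly a change to the deployment step. Here is a minimal sketch of what a 50/50 split could look like; the challenger model resource name is a placeholder:

from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

endpoint = aiplatform.Endpoint.list(filter='display_name="lead-scoring-endpoint"')[0]
challenger = aiplatform.Model("projects/.../locations/.../models/CHALLENGER_MODEL_ID")  # placeholder

# Send half of the traffic to the challenger, keep the rest on the current champion
endpoint.deploy(
    model=challenger,
    machine_type="n1-standard-2",
    min_replica_count=1,
    traffic_percentage=50,
)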
This MLOps pipeline serves as the foundation for all our machine learning projects at Sofos. By establishing these best practices, we've created a scalable platform that supports rapid innovation while maintaining the quality and reliability required for production systems.
What MLOps challenges are you facing in your organization? We're always interested in learning how other teams are solving similar problems. Drop us a line and let us know!