Production-Grade MLOps Pipeline: From Data to Deployment on GCP

Fabrizio Albertoni
Co-Founder / Principal ML Engineer


At Sofos, we've implemented a robust MLOps pipeline for our lead scoring model using Google Cloud Platform's powerful ML tools. This post walks through how we built an end-to-end solution that handles everything from data ingestion to model deployment with proper versioning and monitoring.

  • MLOps
  • Kubeflow
  • Google Cloud
  • Vertex AI
  • Terraform
  • XGBoost

Architecture Overview

MLOps Pipeline Architecture

  • Orchestration: Kubeflow
  • Data Storage: BigQuery
  • Training Pipeline: Vertex AI
  • Infrastructure: Terraform

Our end-to-end MLOps solution integrates these key technologies to create a streamlined workflow that automates the entire machine learning lifecycle, from data to deployment.

  • Data Ingestion

    Extract lead scoring data from BigQuery with automatic preprocessing
  • Feature Engineering

    Transform raw data into ML-ready features with reproducible pipelines
  • Model Training

    Train XGBoost regression models with tuned hyperparameters
  • Evaluation

    Calculate performance metrics and track experiments in Vertex AI
  • Registry & Versioning

    Implement automated model promotion with semantic versioning
  • Deployment

    Provision infrastructure and deploy models using Terraform and Vertex AI

Let's break down each component of our MLOps pipeline to understand how they work together to create a production-grade system.

1. Data Ingestion Component

Our MLOps journey begins with extracting high-quality data. We designed a Kubeflow component that connects to BigQuery and pulls our lead scoring dataset with full typing support.

BigQuery → data extraction → Google Cloud Storage (Kubeflow artifact store)

# Shared imports for the KFP v2 components defined throughout this post
from typing import Dict, List, NamedTuple

from kfp.dsl import (ClassificationMetrics, Dataset, Input, Model, Output,
                     component)

@component(
    packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow", "db-dtypes"],
    base_image="python:3.9"
)
def ingest_data(
    project_id: str,
    dataset_id: str,
    table_id: str,
    output_dataset: Output[Dataset]
) -> NamedTuple("Outputs", [("features", List[str]), ("target", str)]):
    from google.cloud import bigquery
    import pandas as pd
    import os

    query = f"""
    SELECT * FROM `{project_id}.{dataset_id}.{table_id}`
    WHERE RAND() < 1.0  # Use sampling if needed for large datasets
    """

    client = bigquery.Client(project=project_id)
    df = client.query(query).to_dataframe()

    # Save to the output artifact location (backed by GCS via Kubeflow's artifact store)
    os.makedirs(output_dataset.path, exist_ok=True)
    output_path = output_dataset.path + "/data.csv"
    df.to_csv(output_path, index=False)

    # Identify target column - assuming it's 'conversion_probability'
    target_column = "conversion_probability"
    feature_columns = [col for col in df.columns if col != target_column]

    from collections import namedtuple
    output = namedtuple("Outputs", ["features", "target"])
    return output(feature_columns, target_column)
  • BigQuery Integration

    Connects directly to our data warehouse using the Google Cloud client library
  • Data Extraction

    Uses SQL queries with optional sampling for handling large datasets
  • Artifact Handling

    Automatically saves datasets to Google Cloud Storage through Kubeflow's artifact management
  • Type Safety

    Returns strongly-typed outputs for feature columns and target variable

Top tip

For large datasets, consider adding sampling to your query or implementing incremental processing patterns to improve pipeline efficiency.
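
To make that tip concrete, here is a minimal sketch of both patterns. The `created_at` column, project, and table names are illustrative assumptions, not part of our actual schema:

from typing import Optional

def build_ingestion_query(project_id: str, dataset_id: str, table_id: str,
                          sample_percent: Optional[int] = None,
                          since_days: Optional[int] = None) -> str:
    """Build a BigQuery query with optional sampling and incremental filtering."""
    query = f"SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"
    if sample_percent:
        # TABLESAMPLE reads only a fraction of the table's storage blocks,
        # which is cheaper than filtering on RAND() after a full scan
        query += f" TABLESAMPLE SYSTEM ({sample_percent} PERCENT)"
    if since_days:
        # Incremental pattern: only pull rows added since the last run
        # (`created_at` is an illustrative column name)
        query += (" WHERE created_at >= "
                  f"TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {since_days} DAY)")
    return query

# Example: a 10% sample of the last 30 days of leads
print(build_ingestion_query("your-gcp-project", "crm", "lead_scoring",
                            sample_percent=10, since_days=30))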

2. Data Preprocessing Component

Data preparation is critical for model quality. Our preprocessing component automatically handles feature engineering, missing values, and data splitting to ensure reproducible results.

Preprocessing Steps:

  • Feature type detection
  • Missing value imputation
  • Numeric feature scaling
  • Categorical encoding
  • Train/test splitting

Advantages:

  • Consistent transformations
  • Reproducible feature engineering
  • Automatic feature handling
  • Pipeline serialization for deployment
@component(
    packages_to_install=["pandas", "scikit-learn", "pyarrow"],
    base_image="python:3.9"
)
def preprocess_data(
    input_dataset: Input[Dataset],
    features: List[str],
    target: str,
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    preprocessor: Output[Model]
) -> Dict[str, List[str]]:
    import pandas as pd
    import numpy as np
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler, OneHotEncoder
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from sklearn.impute import SimpleImputer
    import pickle
    import os

    # Load data
    df = pd.read_csv(input_dataset.path + "/data.csv")

    # Split features and target
    X = df[features]
    y = df[target]

    # Identify numeric and categorical columns
    numeric_features = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
    categorical_features = X.select_dtypes(include=['object', 'category']).columns.tolist()

    # Define preprocessing for numeric columns
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    # Define preprocessing for categorical columns
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    # Combine preprocessing steps
    preprocessor_obj = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ])

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Fit the preprocessor
    preprocessor_obj.fit(X_train)

    # Transform the data
    X_train_transformed = preprocessor_obj.transform(X_train)
    X_test_transformed = preprocessor_obj.transform(X_test)

    # Save preprocessed data
    if hasattr(X_train_transformed, "toarray"):
        train_data = np.hstack((X_train_transformed.toarray(), y_train.values.reshape(-1, 1)))
        test_data = np.hstack((X_test_transformed.toarray(), y_test.values.reshape(-1, 1)))
    else:
        train_data = np.hstack((X_train_transformed, y_train.values.reshape(-1, 1)))
        test_data = np.hstack((X_test_transformed, y_test.values.reshape(-1, 1)))

    # Save datasets and preprocessor (create the artifact directories first)
    os.makedirs(train_dataset.path, exist_ok=True)
    os.makedirs(test_dataset.path, exist_ok=True)
    os.makedirs(preprocessor.path, exist_ok=True)
    np.save(train_dataset.path + "/train_data.npy", train_data)
    np.save(test_dataset.path + "/test_data.npy", test_data)
    with open(preprocessor.path + "/preprocessor.pkl", "wb") as f:
        pickle.dump(preprocessor_obj, f)

    # Get feature names after preprocessing
    all_feature_names = []
    all_feature_names.extend(numeric_features)

    # Get one-hot encoded feature names
    if categorical_features:
        ohe = preprocessor_obj.named_transformers_['cat'].named_steps['onehot']
        categories = ohe.categories_
        for i, category in enumerate(categories):
            cat_name = categorical_features[i]
            for cat_value in category:
                all_feature_names.append(f"{cat_name}_{cat_value}")

    return {"preprocessed_features": all_feature_names}

Top tip

Always save your preprocessor alongside your model. The feature engineering pipeline is just as important as the model itself for reproducibility. In production, you'll need both to transform raw data into the exact same format used during training.
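
As a quick illustration of that point, here is a minimal local sketch of inference with both artifacts. The file names and the example lead are assumptions, and a real input would need every column the preprocessor was fitted on:

import pickle
import pandas as pd

# Hypothetical local copies of the artifacts produced by the pipeline
with open("preprocessor.pkl", "rb") as f:
    preprocessor = pickle.load(f)
with open("model.pkl", "rb") as f:
    model = pickle.load(f)

# Raw leads must contain every column seen at training time
# (only a few illustrative columns are shown here)
raw_leads = pd.DataFrame([{
    "company_size": 152,
    "industry": "technology",
    "pages_visited": 15,
    # ... remaining feature columns ...
}])

# Apply the exact transformations used during training, then score
X = preprocessor.transform(raw_leads)
print(model.predict(X))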

3. Model Training Component

With our data prepared, we train an XGBoost regression model optimized for lead scoring. Our component not only trains the model but also extracts key feature importance metrics to help understand what drives predictions.

XGBoost Advantages

  • Handles complex relationships
  • Feature importance insights
  • Robust to missing values
  • Regularization to prevent overfitting
  • Efficient training on large datasets

Hyperparameters

  • objective: 'reg:squarederror'
  • n_estimators: 100
  • max_depth: 6
  • learning_rate: 0.1
  • subsample: 0.8
  • colsample_bytree: 0.8
@component(
    packages_to_install=["xgboost", "numpy", "scikit-learn"],
    base_image="python:3.9"
)
def train_model(
    train_dataset: Input[Dataset],
    preprocessed_features: List[str],
    model: Output[Model]
) -> Dict[str, str]:
    import numpy as np
    import xgboost as xgb
    import pickle
    import os

    # Load the training data
    train_data = np.load(train_dataset.path + "/train_data.npy")

    # Split back into features and target
    X_train = train_data[:, :-1]  # All columns except the last
    y_train = train_data[:, -1]   # Last column is the target

    # Train XGBoost model
    xgb_model = xgb.XGBRegressor(
        objective='reg:squarederror',
        n_estimators=100,
        max_depth=6,
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42
    )

    xgb_model.fit(X_train, y_train)

    # Save the model (create the artifact directory first)
    os.makedirs(model.path, exist_ok=True)
    with open(model.path + "/model.pkl", "wb") as f:
        pickle.dump(xgb_model, f)

    # Get feature importance
    feature_importance = xgb_model.feature_importances_
    feature_importance_dict = {feature: importance for feature, importance in zip(preprocessed_features, feature_importance)}

    # Get top 5 features by importance
    top_features = sorted(feature_importance_dict.items(), key=lambda x: x[1], reverse=True)[:5]
    top_features_str = ", ".join([f"{feature}: {importance:.4f}" for feature, importance in top_features])

    return {"top_features": top_features_str}

Why XGBoost for Lead Scoring?

XGBoost excels at predicting lead conversion probability because it can capture non-linear relationships between features like website behavior, demographic data, and historical interactions.

Its built-in feature importance metrics also provide valuable insights into which factors most strongly influence conversion likelihood, helping sales teams understand what drives customer decisions.
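
For instance, once the training component has produced model.pkl, the same importances can be inspected outside the pipeline. A small sketch, assuming a local copy of the artifact; positional names stand in for the feature list returned by the preprocessing component:

import pickle

# Hypothetical local copy of the trained model artifact
with open("model.pkl", "rb") as f:
    xgb_model = pickle.load(f)

importances = xgb_model.feature_importances_

# `preprocessed_features` is the list returned by the preprocessing component;
# positional names are used as a fallback for this standalone sketch
feature_names = [f"feature_{i}" for i in range(len(importances))]

top = sorted(zip(feature_names, importances), key=lambda t: t[1], reverse=True)[:10]
for name, score in top:
    print(f"{name}: {score:.4f}")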

4. Model Evaluation Component

Rigorous evaluation is essential for any production ML system. Our evaluation component calculates key performance metrics and integrates with Vertex AI Experiments for comprehensive tracking.

Key Evaluation Metrics

RMSE
Root Mean Squared Error - measures prediction accuracy
MAE
Mean Absolute Error - average magnitude of errors
R²
Coefficient of Determination - explained variance
@component(
    packages_to_install=["numpy", "scikit-learn", "google-cloud-aiplatform"],
    base_image="python:3.9"
)
def evaluate_model(
    test_dataset: Input[Dataset],
    model: Input[Model],
    metrics: Output[ClassificationMetrics],
    project_id: str,
    region: str,
    model_display_name: str
) -> Dict[str, float]:
    import numpy as np
    import pickle
    import json
    import os
    import time
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
    from google.cloud import aiplatform

    # Load test data and model
    test_data = np.load(test_dataset.path + "/test_data.npy")
    X_test = test_data[:, :-1]
    y_test = test_data[:, -1]

    with open(model.path + "/model.pkl", "rb") as f:
        model_obj = pickle.load(f)

    # Make predictions
    y_pred = model_obj.predict(X_test)

    # Calculate metrics
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    # Log metrics to Vertex AI Experiments.
    # Experiment names must use lowercase letters, digits, and dashes;
    # passing `experiment` to init creates it if it does not exist yet.
    experiment_name = f"{model_display_name}-experiment".lower().replace("_", "-")
    aiplatform.init(project=project_id, location=region, experiment=experiment_name)

    # Start a new run, log the metrics, and close the run
    aiplatform.start_run(run=f"run-{int(time.time())}")
    aiplatform.log_metrics({
        "rmse": float(rmse),
        "mae": float(mae),
        "r2": float(r2)
    })
    aiplatform.end_run()

    # Save metrics for the next component
    metrics_dict = {
        "rmse": float(rmse),
        "mae": float(mae),
        "r2": float(r2)
    }

    os.makedirs(metrics.path, exist_ok=True)
    with open(metrics.path + "/metrics.json", "w") as f:
        json.dump(metrics_dict, f)

    # Create confusion matrix data for visualization
    bins = 5
    y_binned = np.digitize(y_test, np.linspace(y_test.min(), y_test.max(), bins))
    y_pred_binned = np.digitize(y_pred, np.linspace(y_pred.min(), y_pred.max(), bins))

    from sklearn.metrics import confusion_matrix
    cm = confusion_matrix(y_binned, y_pred_binned, labels=range(1, bins+1))

    # Record metrics for classification visualizations
    metrics.log_confusion_matrix(
        ["bin_" + str(i) for i in range(1, bins+1)],
        cm.tolist()
    )

    return metrics_dict

Top tip

In Vertex AI, using the Experiments API lets you track all training runs centrally. This provides a historical record of model performance, hyperparameters, and training datasets, making it easier to reproduce results and understand model improvements over time.

Benefits of Integration with Vertex AI Experiments

Centralized Tracking
All model metrics, parameters, and datasets are stored in one central location
Historical Comparisons
Easily compare performance across different model versions (see the sketch below)
Reproducibility
All parameters needed to reproduce a model are captured automatically
Collaborative Development
Team members can view and build upon previous experiments
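
In practice, pulling those runs back for comparison is a one-liner. A sketch, assuming the experiment name generated by the evaluation component and placeholder project settings:

from google.cloud import aiplatform

# Placeholder project/region; the experiment name matches the evaluation component
aiplatform.init(project="your-gcp-project", location="europe-west1",
                experiment="lead-scoring-model-experiment")

# One row per run, with its logged params and metrics
# (metric columns are typically prefixed with "metric.")
runs_df = aiplatform.get_experiment_df()
print(runs_df.sort_values("metric.rmse").head())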

5. Model Registry Component

This is where our MLOps setup really shines. The registry component implements continuous delivery for ML models with automated promotion and semantic versioning.

Evaluation (compare metrics with the current production model) → if better → Registration (register with a new semantic version) → promote → Deployment (deploy to the production endpoint)

Automated Model Registry and Deployment Flow

GitOps for Machine Learning

Performance Comparison
Automated evaluation of new models against production baselines
Semantic Versioning
Models follow standard versioning (1.0.0, 1.0.1) for clear tracking
Metadata Attachment
Performance metrics, training dataset info, and parameters are stored with the model
@component(
    packages_to_install=["google-cloud-aiplatform", "google-cloud-storage", "semantic-version"],
    base_image="python:3.9"
    # predictor.py (shown below) must be available in the component's working
    # directory, e.g. baked into a custom base image
)
def register_model(
    model: Input[Model],
    preprocessor: Input[Model],
    metrics: Input[ClassificationMetrics],
    project_id: str,
    region: str,
    model_display_name: str
) -> Dict[str, str]:
    import os
    import json
    import shutil
    import time
    from google.cloud import aiplatform
    from semantic_version import Version

    # Initialize Vertex AI
    aiplatform.init(project=project_id, location=region)

    # Load metrics from the evaluation step
    with open(metrics.path + "/metrics.json", "r") as f:
        current_metrics = json.load(f)

    # Get current RMSE
    current_rmse = current_metrics["rmse"]

    # Check if there's an existing model with "production" tag
    filter_string = f'display_name="{model_display_name}" AND labels.deployed="production"'
    models = aiplatform.Model.list(filter=filter_string)

    deployment_decision = "not_deployed"
    current_version = "0.0.0"

    if models:
        # Get the existing model's metrics
        existing_model = models[0]
        existing_metrics = existing_model.gca_resource.metadata.get("metrics", {})
        if "rmse" in existing_metrics:
            existing_rmse = float(existing_metrics["rmse"])

            # Check if new model is better (lower RMSE is better)
            if current_rmse < existing_rmse:
                # New model is better, get the current version and increment patch
                current_version = existing_model.gca_resource.metadata.get("version", "1.0.0")
                v = Version(current_version)
                new_version = str(Version(major=v.major, minor=v.minor, patch=v.patch+1))
                deployment_decision = "deployed"
            else:
                # Existing model is better, keep it
                deployment_decision = "not_deployed_existing_better"
                return {"deployment_decision": deployment_decision, "version": current_version}
    else:
        # No existing production model, use 1.0.0
        new_version = "1.0.0"
        deployment_decision = "deployed_first_version"

    if deployment_decision.startswith("deployed"):
        # Create a directory to package model artifacts
        model_dir = "/tmp/model_dir"
        os.makedirs(model_dir, exist_ok=True)

        # Copy model and preprocessor files
        shutil.copy2(model.path + "/model.pkl", model_dir + "/model.pkl")
        shutil.copy2(preprocessor.path + "/preprocessor.pkl", model_dir + "/preprocessor.pkl")

        # Copy the predictor.py file
        shutil.copy2("predictor.py", model_dir + "/predict.py")

        # Stage the artifacts in GCS so Vertex AI can pull them at deploy time
        from google.cloud import storage
        bucket = storage.Client(project=project_id).bucket(f"{project_id}-model-artifacts")
        artifact_prefix = f"{model_display_name}/{new_version}"
        for file_name in os.listdir(model_dir):
            bucket.blob(f"{artifact_prefix}/{file_name}").upload_from_filename(
                os.path.join(model_dir, file_name)
            )
        model_uri = f"gs://{project_id}-model-artifacts/{artifact_prefix}"

        # Register the model in the Vertex AI Model Registry
        registered_model = aiplatform.Model.upload(
            display_name=model_display_name,
            artifact_uri=model_uri,
            serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
            serving_container_predict_route="/predict",
            serving_container_health_route="/health",
            serving_container_environment_variables={
                "PREDICT_ROUTE": "/predict",
                "HEALTH_ROUTE": "/health"
            },
            sync=True
        )

        # Add metadata and labels
        registered_model.update(
            labels={"deployed": "production"},
            metadata={
                "rmse": str(current_rmse),
                "mae": str(current_metrics["mae"]),
                "r2": str(current_metrics["r2"]),
                "version": new_version,
                "framework": "xgboost",
                "created_at": str(int(time.time()))
            }
        )

        return {
            "deployment_decision": deployment_decision,
            "version": new_version,
            "model_id": registered_model.resource_name
        }
    else:
        return {"deployment_decision": deployment_decision, "version": current_version}

Model Serving: predictor.py

This file handles the model serving logic, ensuring both model and preprocessor are loaded correctly.

import os
import pickle

import pandas as pd
from google.cloud import storage


class LeadScoringPredictor:
    def __init__(self):
        self.model = None
        self.preprocessor = None

    def load(self, artifacts_uri: str = None):
        """Download and load the model and preprocessor artifacts.

        Vertex AI exposes the model's GCS artifact directory to the serving
        container via the AIP_STORAGE_URI environment variable.
        """
        artifacts_uri = artifacts_uri or os.environ["AIP_STORAGE_URI"]
        bucket_name, prefix = artifacts_uri.replace("gs://", "").split("/", 1)
        bucket = storage.Client().bucket(bucket_name)

        local_dir = "/tmp"
        for file_name in ("model.pkl", "preprocessor.pkl"):
            bucket.blob(f"{prefix}/{file_name}").download_to_filename(
                os.path.join(local_dir, file_name)
            )

        with open(os.path.join(local_dir, "model.pkl"), "rb") as f:
            self.model = pickle.load(f)
        with open(os.path.join(local_dir, "preprocessor.pkl"), "rb") as f:
            self.preprocessor = pickle.load(f)
        return self

    def predict(self, instances):
        # Convert instances to the tabular format expected by the preprocessor
        input_df = pd.DataFrame(instances)

        # Apply the exact same feature engineering used at training time
        X_processed = self.preprocessor.transform(input_df)

        # Score the leads
        predictions = self.model.predict(X_processed)

        return predictions.tolist()

Top tip

Use Vertex AI Model Registry's metadata and labeling features to store model performance metrics, training dataset information, and deployment status. The decision to deploy is data-driven and automated, eliminating human error and reducing deployment friction, while providing a comprehensive audit trail for compliance purposes.
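
As a sketch of what that audit trail looks like in practice, the labelled versions can be listed directly from the registry. Project, region, and display name below are placeholders:

from google.cloud import aiplatform

aiplatform.init(project="your-gcp-project", location="europe-west1")

# Every model version tagged as production
models = aiplatform.Model.list(
    filter='display_name="lead_scoring_model" AND labels.deployed="production"'
)

# Newest first
for m in sorted(models, key=lambda m: m.create_time, reverse=True):
    meta = m.gca_resource.metadata or {}
    print(m.resource_name, m.labels, meta.get("version"), meta.get("rmse"))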

6. Main Pipeline Definition

Now let's stitch all these components together into a cohesive pipeline. The pipeline definition creates a directed acyclic graph (DAG) that shows exactly how data flows through our system.

Pipeline Orchestration Benefits

Declarative Workflow

Pipeline components and their relationships are defined in a clear, declarative way that makes the ML workflow explicit and reproducible

Proper Dependency Management

Components execute only when their dependencies have completed successfully, ensuring data integrity throughout the process

Persistent Artifacts

All intermediate artifacts are stored in GCS, allowing for easier debugging, lineage tracking, and reproducibility

Flexible Execution

Pipeline can be triggered manually, scheduled, or executed as part of a larger CI/CD process

# Pipeline-level configuration (placeholder values; adjust for your project)
from kfp.dsl import pipeline

PROJECT_ID = "your-gcp-project"
REGION = "europe-west1"
PIPELINE_ROOT = f"gs://{PROJECT_ID}-pipeline-artifacts"
BQ_DATASET = "crm"
BQ_TABLE = "lead_scoring"
MODEL_DISPLAY_NAME = "lead_scoring_model"

@pipeline(
    name="lead-scoring-pipeline",
    pipeline_root=PIPELINE_ROOT
)
def lead_scoring_pipeline(
    project_id: str = PROJECT_ID,
    region: str = REGION,
    bq_dataset: str = BQ_DATASET,
    bq_table: str = BQ_TABLE,
    model_display_name: str = MODEL_DISPLAY_NAME
):
    # Run the ingestion component
    ingest_task = ingest_data(
        project_id=project_id,
        dataset_id=bq_dataset,
        table_id=bq_table
    )

    # Run the preprocessing component
    preprocess_task = preprocess_data(
        input_dataset=ingest_task.outputs["output_dataset"],
        features=ingest_task.outputs["features"],
        target=ingest_task.outputs["target"]
    )

    # Run the training component
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        preprocessed_features=preprocess_task.outputs["preprocessed_features"]
    )

    # Run the evaluation component
    eval_task = evaluate_model(
        test_dataset=preprocess_task.outputs["test_dataset"],
        model=train_task.outputs["model"],
        project_id=project_id,
        region=region,
        model_display_name=model_display_name
    )

    # Run the registration component
    register_task = register_model(
        model=train_task.outputs["model"],
        preprocessor=preprocess_task.outputs["preprocessor"],
        metrics=eval_task.outputs["metrics"],
        project_id=project_id,
        region=region,
        model_display_name=model_display_name
    )

Key Pipeline Features

  • Automatic dependency resolution based on inputs and outputs

  • Parameterized execution for flexibility

  • Strongly-typed interfaces between components

  • Clear data and control flow

  • Default parameters for quick execution

7. Compiling and Running the Pipeline

With our pipeline defined, we can compile it to a JSON representation that Vertex AI Pipelines can execute. This separation of definition and execution is a key part of MLOps best practices.

Pipeline Execution Flow

Pipeline Definition → Compilation → Vertex AI Execution → Monitoring

def compile_and_run_pipeline():
    from kfp import compiler
    from google.cloud import aiplatform

    # Compile the pipeline definition to a spec Vertex AI Pipelines can run
    compiler.Compiler().compile(
        pipeline_func=lead_scoring_pipeline,
        package_path="lead_scoring_pipeline.json"
    )

    # Initialize Vertex AI
    aiplatform.init(project=PROJECT_ID, location=REGION)

    # Create a pipeline job
    pipeline_job = aiplatform.PipelineJob(
        display_name="lead-scoring-pipeline",
        template_path="lead_scoring_pipeline.json",
        pipeline_root=PIPELINE_ROOT,
        enable_caching=True
    )

    # Run the pipeline
    pipeline_job.run()

    return pipeline_job

# Execute the pipeline
if __name__ == "__main__":
    job = compile_and_run_pipeline()
    print(f"Pipeline job: {job.display_name} started")

Execution Options

  • Manual execution - Run on demand via script or UI

  • Scheduled execution - Regular retraining with Cloud Scheduler (see the sketch after this list)

  • Event-driven - Trigger on new data in BigQuery

  • CI/CD integration - Pipeline as part of deployment
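
The scheduled and parameterized options above boil down to submitting the compiled template with overridden parameters. A minimal sketch of a function that a Cloud Scheduler job could invoke; the bucket paths, project settings, and parameter values are placeholders:

from google.cloud import aiplatform

PROJECT_ID = "your-gcp-project"        # placeholders
REGION = "europe-west1"
PIPELINE_ROOT = f"gs://{PROJECT_ID}-pipeline-artifacts"

def run_scheduled_retraining(event=None, context=None):
    """Entry point a Cloud Scheduler job (via Pub/Sub or HTTP) could call."""
    aiplatform.init(project=PROJECT_ID, location=REGION)

    job = aiplatform.PipelineJob(
        display_name="lead-scoring-pipeline-scheduled",
        # compiled spec uploaded to GCS ahead of time
        template_path=f"{PIPELINE_ROOT}/lead_scoring_pipeline.json",
        pipeline_root=PIPELINE_ROOT,
        # any pipeline parameter can be overridden per run
        parameter_values={"bq_table": "lead_scoring"},
        enable_caching=True,
    )
    # submit() returns immediately instead of blocking like run()
    job.submit()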

Performance Optimizations

  • Component caching - Skip recomputation of unchanged steps

  • Custom resources - Allocate appropriate CPU/RAM/GPU per component

  • Parallelization - Independent steps run in parallel

  • Artifact optimization - Efficient data passing between components

8. Model Deployment with Terraform

For the infrastructure piece, we use Terraform to deploy our models to production in a consistent, versioned, and auditable way. This infrastructure-as-code approach gives us confidence that our deployment environment is reproducible and properly configured.

Infrastructure as Code Benefits

Version Control

All infrastructure changes are tracked in Git alongside code

Reproducibility

Environments can be recreated exactly as specified

Drift Prevention

Configuration drift is eliminated through declarative definitions

Automation

Deployments can be fully automated in CI/CD pipelines

Documentation

Infrastructure code serves as living documentation

Collaboration

Team members can review and understand deployment

# main.tf

terraform {
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "4.75.0"
    }
  }
}

provider "google" {
  project = var.project_id
  region  = var.region
}

# Get the production model
data "google_vertex_ai_models" "production_model" {
  filter = "labels.deployed=\"production\" AND display_name=\"${var.model_display_name}\""
}

locals {
  model_id = length(data.google_vertex_ai_models.production_model.models) > 0 ? data.google_vertex_ai_models.production_model.models[0].id : ""
}

# Create endpoint
resource "google_vertex_ai_endpoint" "lead_scoring_endpoint" {
  display_name = "lead-scoring-endpoint"
  location     = var.region

  # Only create if we have a production model
  count        = local.model_id != "" ? 1 : 0
}

# Deploy model to endpoint
resource "google_vertex_ai_model_deployment" "lead_scoring_deployment" {
  # Only create if we have a production model
  count                = local.model_id != "" ? 1 : 0

  model                = local.model_id
  endpoint             = google_vertex_ai_endpoint.lead_scoring_endpoint[0].id
  display_name         = "lead-scoring-deployment"
  dedicated_resources {
    machine_spec {
      machine_type = "n1-standard-2"
    }
    min_replica_count = 1
    max_replica_count = 2
  }

  traffic_split {
    percent = 100
  }

  # Enable explainability
  explanation_spec {
    parameters {
      sampled_shapley_attribution {
        path_count = 10
      }
    }
    metadata {
      inputs {
        visualization {
          type = "FEATURE_IMPORTANCE"
        }
      }
    }
  }
}

# Setup model monitoring
resource "google_vertex_ai_model_monitoring_job" "monitoring" {
  count = local.model_id != "" ? 1 : 0

  display_name = "lead-scoring-monitoring"
  endpoint     = google_vertex_ai_endpoint.lead_scoring_endpoint[0].id

  prediction_sampling_config {
    sampling_strategy = "RANDOM_SAMPLING"
    random_sample_config {
      sample_rate_percent = 80
    }
  }

  logging_sampling_strategy {
    random_sample_config {
      sample_rate = 0.8
    }
  }

  drift_detection_config {
    drift_threshold = 0.05
  }

  monitoring_schedule {
    monitor_interval {
      seconds = 86400  # Daily monitoring
    }
  }

  alert_config {
    email_alert_config {
      user_emails = ["mlops-team@example.com"]
    }
  }
}

Terraform Configuration Highlights

Dynamic Model Selection

The configuration queries the Vertex AI Model Registry to find models tagged with "production", ensuring we always deploy the latest approved model.

Autoscaling Resources

The endpoint is configured to automatically scale between 1-2 replicas based on load, optimizing cost while maintaining performance.

Built-in Explainability

Shapley values are configured to provide feature importance data for each prediction, enhancing model transparency.

Continuous Monitoring

Automated drift detection alerts the team when model performance degrades, triggering retraining when necessary.

9. Using the Deployed Model

Once deployed, our lead scoring model is available through a REST API powered by Vertex AI Endpoints. This makes it easy to integrate with existing applications and services.

Model Prediction

def get_prediction(instances, endpoint_name="lead-scoring-endpoint"):
    """Get predictions from the deployed model."""
    from google.cloud import aiplatform

    # Initialize Vertex AI
    aiplatform.init(project=PROJECT_ID, location=REGION)

    # Get endpoint
    endpoint = aiplatform.Endpoint.list(
        filter=f'display_name="{endpoint_name}"'
    )[0]

    # Get predictions
    predictions = endpoint.predict(instances=instances)

    return predictions

Model Explainability

def get_explanation(instances, endpoint_name="lead-scoring-endpoint"):
    """Get explanations from the deployed model."""
    from google.cloud import aiplatform

    # Initialize Vertex AI
    aiplatform.init(project=PROJECT_ID, location=REGION)

    # Get endpoint
    endpoint = aiplatform.Endpoint.list(
        filter=f'display_name="{endpoint_name}"'
    )[0]

    # Get explanations
    explanation = endpoint.explain(instances=instances)

    return explanation

Example: Processing a New Lead

# Example: Get prediction for a new lead
new_lead = [
    {
        "company_size": 152,
        "industry": "technology",
        "product_interest": "enterprise",
        "pages_visited": 15,
        "time_on_site": 320,
        "campaign_source": "google",
        "prior_contacts": 2
    }
]

# Get prediction
prediction = get_prediction(new_lead)
print(f"Lead score: {prediction.predictions[0][0]:.2f}")

# Get explanation
explanation = get_explanation(new_lead)
print("Feature importance:")
for attribution in explanation.explanations[0].attributions[0].feature_attributions.items():
    print(f"  {attribution[0]}: {attribution[1]:.4f}")

Integration Options

Web Applications

Integrate with frontends to provide real-time lead scoring during user interactions

CRM Systems

Connect to CRM workflows to prioritize leads automatically when they enter the system

Marketing Platforms

Optimize campaign targeting by integrating with marketing automation tools

10. Monitoring and Observability

Continuous monitoring is essential for any production ML system. Data drift, concept drift, and performance degradation can all impact model quality over time. Our MLOps pipeline includes robust monitoring to detect these issues early.

Monitoring Capabilities

Data Drift Detection

Automatically detects when input feature distributions change significantly

Performance Monitoring

Tracks model accuracy, precision, and overall quality metrics

Latency Tracking

Measures prediction latency and service performance metrics

Infrastructure Metrics

Tracks compute resource utilization and scaling efficiency

def get_monitoring_metrics(endpoint_name="lead-scoring-endpoint"):
    """Get monitoring metrics from the endpoint."""
    from google.cloud import aiplatform

    # Initialize Vertex AI
    aiplatform.init(project=PROJECT_ID, location=REGION)

    # Get endpoint
    endpoint = aiplatform.Endpoint.list(
        filter=f'display_name="{endpoint_name}"'
    )[0]

    # Get monitoring data
    monitoring_stats = endpoint.monitoring_stats()

    return monitoring_stats

# Check for data drift
monitoring_data = get_monitoring_metrics()
print("Feature drift metrics:")
for feature, metrics in monitoring_data["feature_metrics"].items():
    print(f"  {feature}: drift score = {metrics['drift_score']:.4f}")

Automated Alerting and Response

When monitoring detects issues, we can take automated actions to maintain model health:

  1. Alert Notification - Sends alerts to the ML team via email, Slack, or PagerDuty
  2. Automatic Retraining - Triggers pipeline execution to retrain with fresh data (see the sketch below)
  3. Rollback Capability - Automatically reverts to the previous stable version if issues are detected
  4. Incident Logging - Records all incidents for later analysis and process improvement
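
Here is a minimal sketch of step 2, assuming the drift alert is routed to a Pub/Sub topic that triggers a Cloud Function. The project settings and bucket paths are placeholders:

import base64
import json

import functions_framework
from google.cloud import aiplatform

PROJECT_ID = "your-gcp-project"        # placeholders
REGION = "europe-west1"
PIPELINE_ROOT = f"gs://{PROJECT_ID}-pipeline-artifacts"


@functions_framework.cloud_event
def on_drift_alert(cloud_event):
    """Triggered by a Pub/Sub message published when a drift alert fires."""
    payload = json.loads(base64.b64decode(cloud_event.data["message"]["data"]))
    print(f"Drift alert received: {payload}")

    aiplatform.init(project=PROJECT_ID, location=REGION)
    aiplatform.PipelineJob(
        display_name="lead-scoring-retraining",
        template_path=f"{PIPELINE_ROOT}/lead_scoring_pipeline.json",
        pipeline_root=PIPELINE_ROOT,
        enable_caching=False,  # force a full retrain on fresh data
    ).submit()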

Conclusion

Our end-to-end MLOps pipeline on Google Cloud Platform has transformed how we develop, deploy, and maintain our lead scoring models. By implementing proper automation, versioning, and monitoring, we've achieved several key benefits:

Model Quality

By automating evaluation and only promoting models that outperform existing ones, we've seen a 23% improvement in lead conversion prediction accuracy and a 19% reduction in false positives.

Development Velocity

The time from model development to production deployment has been reduced from weeks to hours, allowing our data scientists to iterate quickly and test new hypotheses.

Operational Reliability

Our infrastructure-as-code approach and comprehensive monitoring have increased our model serving reliability, with automatic alerts for any potential issues.

Business Impact

By providing sales teams with accurate lead scoring, we've seen a 40% increase in conversion rates and a 28% reduction in sales cycle length, driving significant ROI.

Next Steps in Our MLOps Journey

Multi-Model Experiments

Expanding our pipeline to support A/B testing between multiple model architectures

Feature Store Integration

Implementing a central feature store to improve feature reuse and consistency

Autonomous Agents

Implementing AI agents that can automatically prioritize, engage, and nurture leads based on real-time scoring and business rules

This MLOps pipeline serves as the foundation for all our machine learning projects at Sofos. By establishing these best practices, we've created a scalable platform that supports rapid innovation while maintaining the quality and reliability required for production systems.

What MLOps challenges are you facing in your organization? We're always interested in learning how other teams are solving similar problems. Drop us a line and let us know!

Fabrizio Albertoni

Co-Founder / Principal ML Engineer at Sofos
