Production-Grade MLOps Pipeline: From Data to Deployment on GCP

Fabrizio Albertoni
Co-Founder / Principal ML Engineer

At Sofos, we've implemented a robust MLOps pipeline for our lead scoring model using Google Cloud Platform's powerful ML tools. This post walks through how we built an end-to-end solution that handles everything from data ingestion to model deployment with proper versioning and monitoring.

Architecture Overview

Our pipeline uses:

  • Kubeflow Pipelines for orchestration
  • BigQuery for data storage
  • Vertex AI for experiment tracking, model registry, and deployment
  • Terraform for infrastructure as code

The workflow consists of these key components:

  1. Data ingestion from BigQuery
  2. Data preprocessing and feature engineering
  3. Model training using XGBoost
  4. Model evaluation and metric logging
  5. Automated model registration and versioning
  6. Deployment to Vertex AI Endpoints with explainability and monitoring

Let's break down each component.
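A note on setup: the component snippets below assume a shared preamble of KFP and Vertex AI imports, plus a handful of module-level constants. Something like the following (the constant values are illustrative, not our real project settings):

from typing import Dict, List, NamedTuple

from google.cloud import aiplatform
from kfp import compiler
from kfp.dsl import (
    ClassificationMetrics,
    Dataset,
    Input,
    Model,
    Output,
    component,
    pipeline,
)

# Module-level constants used throughout (values are illustrative)
PROJECT_ID = "your-gcp-project"
REGION = "us-central1"
PIPELINE_ROOT = f"gs://{PROJECT_ID}-pipelines"
BQ_DATASET = "crm"
BQ_TABLE = "lead_events"
MODEL_DISPLAY_NAME = "lead-scoring-model"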

1. Data Ingestion Component

We start by pulling our lead scoring data from BigQuery:

@component(
    packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow"],
    base_image="python:3.9"
)
def ingest_data(
    project_id: str,
    dataset_id: str,
    table_id: str,
    output_dataset: Output[Dataset]
) -> NamedTuple("Outputs", [("features", List[str]), ("target", str)]):
    from google.cloud import bigquery
    import pandas as pd
    import os

    query = f"""
    SELECT * FROM `{project_id}.{dataset_id}.{table_id}`
    WHERE RAND() < 1.0  # Use sampling if needed for large datasets
    """

    client = bigquery.Client(project=project_id)
    df = client.query(query).to_dataframe()

    # Save to the output artifact location (KFP syncs this path to GCS)
    os.makedirs(output_dataset.path, exist_ok=True)
    output_path = output_dataset.path + "/data.csv"
    df.to_csv(output_path, index=False)

    # Identify target column - assuming it's 'conversion_probability'
    target_column = "conversion_probability"
    feature_columns = [col for col in df.columns if col != target_column]

    from collections import namedtuple
    output = namedtuple("Outputs", ["features", "target"])
    return output(feature_columns, target_column)

This component:

  • Connects to BigQuery and extracts data using a SQL query
  • Saves the dataset to GCS (automatically handled by Kubeflow's Output artifact)
  • Returns the feature columns and target column for downstream components
  • Uses KFP's typing system to ensure proper data flow through the pipeline

Pro tip: For large datasets, consider adding sampling to your query or implementing incremental processing patterns.
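For instance, BigQuery's TABLESAMPLE clause reads only a subset of the table's storage blocks instead of scanning everything, which keeps ingestion cheap on very large tables (the 10 percent figure below is arbitrary):

query = f"""
SELECT * FROM `{project_id}.{dataset_id}.{table_id}`
TABLESAMPLE SYSTEM (10 PERCENT)
"""
df = client.query(query).to_dataframe()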

2. Data Preprocessing Component

Next, we need to prepare our data for model training:

@component(
    packages_to_install=["pandas", "scikit-learn", "pyarrow"],
    base_image="python:3.9"
)
def preprocess_data(
    input_dataset: Input[Dataset],
    features: List[str],
    target: str,
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    preprocessor: Output[Model]
) -> NamedTuple("Outputs", [("preprocessed_features", List[str])]):
    import pandas as pd
    import numpy as np
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler, OneHotEncoder
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from sklearn.impute import SimpleImputer
    import os
    import pickle

    # Load data
    df = pd.read_csv(input_dataset.path + "/data.csv")

    # Split features and target
    X = df[features]
    y = df[target]

    # Identify numeric and categorical columns
    numeric_features = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
    categorical_features = X.select_dtypes(include=['object', 'category']).columns.tolist()

    # Define preprocessing for numeric columns
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    # Define preprocessing for categorical columns
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    # Combine preprocessing steps
    preprocessor_obj = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ])

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Fit the preprocessor
    preprocessor_obj.fit(X_train)

    # Transform the data
    X_train_transformed = preprocessor_obj.transform(X_train)
    X_test_transformed = preprocessor_obj.transform(X_test)

    # Save preprocessed data
    if hasattr(X_train_transformed, "toarray"):
        train_data = np.hstack((X_train_transformed.toarray(), y_train.values.reshape(-1, 1)))
        test_data = np.hstack((X_test_transformed.toarray(), y_test.values.reshape(-1, 1)))
    else:
        train_data = np.hstack((X_train_transformed, y_train.values.reshape(-1, 1)))
        test_data = np.hstack((X_test_transformed, y_test.values.reshape(-1, 1)))

    # Save datasets and preprocessor to their artifact locations
    os.makedirs(train_dataset.path, exist_ok=True)
    os.makedirs(test_dataset.path, exist_ok=True)
    os.makedirs(preprocessor.path, exist_ok=True)
    np.save(train_dataset.path + "/train_data.npy", train_data)
    np.save(test_dataset.path + "/test_data.npy", test_data)
    with open(preprocessor.path + "/preprocessor.pkl", "wb") as f:
        pickle.dump(preprocessor_obj, f)

    # Get feature names after preprocessing
    all_feature_names = []
    all_feature_names.extend(numeric_features)

    # Get one-hot encoded feature names
    if categorical_features:
        ohe = preprocessor_obj.named_transformers_['cat'].named_steps['onehot']
        categories = ohe.categories_
        for i, category in enumerate(categories):
            cat_name = categorical_features[i]
            for cat_value in category:
                all_feature_names.append(f"{cat_name}_{cat_value}")

    # Return a NamedTuple so the output is addressable by name downstream
    from collections import namedtuple
    outputs = namedtuple("Outputs", ["preprocessed_features"])
    return outputs(all_feature_names)

This component handles critical preprocessing tasks:

  • Automatic detection of numeric and categorical features
  • Missing value imputation (median for numeric, constant for categorical)
  • Feature scaling using StandardScaler
  • One-hot encoding for categorical features
  • Train/test split
  • Saving both data and preprocessing pipeline for reproducibility

The preprocessing pipeline is a crucial part of our MLOps setup, as it ensures consistent transformation between training and inference.

Top tip

Always save your preprocessor alongside your model. The feature engineering pipeline is just as important as the model itself for reproducibility. In production, you'll need both to transform raw data into the exact same format used during training.
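At inference time that means loading both artifacts together; a minimal sketch (the file paths and the new_leads variable are illustrative):

import pickle

import pandas as pd

# Load both artifacts produced by the pipeline
with open("preprocessor.pkl", "rb") as f:
    preprocessor = pickle.load(f)
with open("model.pkl", "rb") as f:
    model = pickle.load(f)

# new_leads: list of dicts containing every raw feature column from training
raw = pd.DataFrame(new_leads)
scores = model.predict(preprocessor.transform(raw))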

3. Model Training Component

Now for the fun part—training our XGBoost regression model:

@component(
    packages_to_install=["xgboost", "numpy", "scikit-learn"],
    base_image="python:3.9"
)
def train_model(
    train_dataset: Input[Dataset],
    preprocessed_features: List[str],
    model: Output[Model]
) -> Dict[str, str]:
    import numpy as np
    import xgboost as xgb
    import os
    import pickle

    # Load the training data
    train_data = np.load(train_dataset.path + "/train_data.npy")

    # Split back into features and target
    X_train = train_data[:, :-1]  # All columns except the last
    y_train = train_data[:, -1]   # Last column is the target

    # Train XGBoost model
    xgb_model = xgb.XGBRegressor(
        objective='reg:squarederror',
        n_estimators=100,
        max_depth=6,
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42
    )

    xgb_model.fit(X_train, y_train)

    # Save the model to the artifact location
    os.makedirs(model.path, exist_ok=True)
    with open(model.path + "/model.pkl", "wb") as f:
        pickle.dump(xgb_model, f)

    # Get feature importance
    feature_importance = xgb_model.feature_importances_
    feature_importance_dict = {feature: importance for feature, importance in zip(preprocessed_features, feature_importance)}

    # Get top 5 features by importance
    top_features = sorted(feature_importance_dict.items(), key=lambda x: x[1], reverse=True)[:5]
    top_features_str = ", ".join([f"{feature}: {importance:.4f}" for feature, importance in top_features])

    return {"top_features": top_features_str}

Our training component:

  • Loads preprocessed training data
  • Trains an XGBoost regression model optimized for lead scoring
  • Saves the trained model as a pickle file
  • Extracts and returns feature importance information

XGBoost works well for our lead scoring use case because it handles complex relationships in the data and provides useful feature importance metrics to help us understand what's driving our predictions.

4. Model Evaluation Component

Evaluation is a key step in any ML pipeline—we need to know if our model meets performance criteria:

@component(
    packages_to_install=["numpy", "scikit-learn", "google-cloud-aiplatform"],
    base_image="python:3.9"
)
def evaluate_model(
    test_dataset: Input[Dataset],
    model: Input[Model],
    metrics: Output[ClassificationMetrics],
    project_id: str,
    region: str,
    model_display_name: str
) -> Dict[str, float]:
    import numpy as np
    import pickle
    import json
    import os
    import time
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
    from google.cloud import aiplatform

    # Load test data and model
    test_data = np.load(test_dataset.path + "/test_data.npy")
    X_test = test_data[:, :-1]
    y_test = test_data[:, -1]

    with open(model.path + "/model.pkl", "rb") as f:
        model_obj = pickle.load(f)

    # Make predictions
    y_pred = model_obj.predict(X_test)

    # Calculate metrics
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    # Log metrics to Vertex AI Experiments. Initializing with an experiment
    # name creates the experiment if it doesn't exist yet. Experiment and run
    # names must be lowercase letters, digits, and dashes.
    experiment_name = f"{model_display_name}-experiment"
    aiplatform.init(project=project_id, location=region, experiment=experiment_name)

    # Start a new run and log the evaluation metrics
    aiplatform.start_run(run=f"run-{int(time.time())}")
    aiplatform.log_metrics({
        "rmse": float(rmse),
        "mae": float(mae),
        "r2": float(r2)
    })
    aiplatform.end_run()

    # Save metrics for the next component
    metrics_dict = {
        "rmse": float(rmse),
        "mae": float(mae),
        "r2": float(r2)
    }

    os.makedirs(metrics.path, exist_ok=True)
    with open(metrics.path + "/metrics.json", "w") as f:
        json.dump(metrics_dict, f)

    # Create a pseudo-confusion matrix by binning actual and predicted values
    # against the same equal-width edges derived from the test labels
    bins = 5
    bin_edges = np.linspace(y_test.min(), y_test.max(), bins)
    y_binned = np.digitize(y_test, bin_edges)
    y_pred_binned = np.clip(np.digitize(y_pred, bin_edges), 1, bins)

    from sklearn.metrics import confusion_matrix
    cm = confusion_matrix(y_binned, y_pred_binned, labels=range(1, bins+1))

    # Record metrics for classification visualizations
    metrics.log_confusion_matrix(
        ["bin_" + str(i) for i in range(1, bins+1)],
        cm.tolist()
    )

    return metrics_dict

This component:

  • Evaluates the model on our test set
  • Calculates RMSE, MAE, and R² metrics
  • Logs metrics to Vertex AI Experiments for tracking
  • Creates a pseudo-confusion matrix by binning our regression outputs
  • Saves metrics for use in later components

Top tip

In Vertex AI, the Experiments API lets you track all training runs centrally. This provides a historical record of model performance, hyperparameters, and training datasets, making it easier to reproduce results and understand model improvements over time.
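For example, the SDK can pull every run of an experiment into a pandas DataFrame for side-by-side comparison (a small sketch; the experiment name follows the convention used in our evaluation component, and logged metrics land in "metric."-prefixed columns):

from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

# One row per run, with parameters and logged metrics as columns
runs_df = aiplatform.get_experiment_df("lead-scoring-model-experiment")
print(runs_df[["run_name", "metric.rmse", "metric.mae", "metric.r2"]])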

5. Model Registry Component

This is where our MLOps setup really shines. The registry component implements automated model promotion with semantic versioning:

@component(
    packages_to_install=["google-cloud-aiplatform", "google-cloud-storage", "semantic-version"],
    base_image="python:3.9"
)
def register_model(
    model: Input[Model],
    preprocessor: Input[Model],
    metrics: Input[ClassificationMetrics],
    project_id: str,
    region: str,
    model_display_name: str
) -> Dict[str, str]:
    import os
    import json
    import shutil
    import time
    from google.cloud import aiplatform
    from semantic_version import Version

    # Initialize Vertex AI
    aiplatform.init(project=project_id, location=region)

    # Load metrics from the evaluation step
    with open(metrics.path + "/metrics.json", "r") as f:
        current_metrics = json.load(f)

    # Get current RMSE
    current_rmse = current_metrics["rmse"]

    # Check if there's an existing model with "production" tag
    filter_string = f'display_name="{model_display_name}" AND labels.deployed="production"'
    models = aiplatform.Model.list(filter=filter_string)

    deployment_decision = "not_deployed"
    current_version = "0.0.0"

    if models:
        # Read the existing model's metrics, which we store as JSON in the
        # model description at registration time (see below)
        existing_model = models[0]
        try:
            existing_meta = json.loads(existing_model.description or "{}")
        except ValueError:
            existing_meta = {}

        if "rmse" in existing_meta:
            existing_rmse = float(existing_meta["rmse"])

            # Check if new model is better (lower RMSE is better)
            if current_rmse < existing_rmse:
                # New model is better: increment the patch version
                current_version = existing_meta.get("version", "1.0.0")
                new_version = str(Version(current_version).next_patch())
                deployment_decision = "deployed"
            else:
                # Existing model is better, keep it
                deployment_decision = "not_deployed_existing_better"
                return {"deployment_decision": deployment_decision, "version": current_version}
    else:
        # No existing production model, use 1.0.0
        new_version = "1.0.0"
        deployment_decision = "deployed_first_version"

    if deployment_decision.startswith("deployed"):
        # Create a directory to package model artifacts
        model_dir = "/tmp/model_dir"
        os.makedirs(model_dir, exist_ok=True)

        # Copy model and preprocessor files
        shutil.copy2(model.path + "/model.pkl", model_dir + "/model.pkl")
        shutil.copy2(preprocessor.path + "/preprocessor.pkl", model_dir + "/preprocessor.pkl")

        # Write the serving module into the package. Lightweight KFP components
        # can't bundle local files, so PREDICTOR_SOURCE is assumed to be a
        # module-level string holding the predictor.py code shown below.
        with open(model_dir + "/predictor.py", "w") as f:
            f.write(PREDICTOR_SOURCE)

        # Stage the package to GCS -- Model.upload requires a gs:// artifact URI
        from google.cloud import storage
        bucket_name = f"{project_id}-model-artifacts"
        blob_prefix = f"{model_display_name}/{new_version}"
        bucket = storage.Client(project=project_id).bucket(bucket_name)
        for filename in os.listdir(model_dir):
            bucket.blob(f"{blob_prefix}/{filename}").upload_from_filename(
                os.path.join(model_dir, filename)
            )
        model_uri = f"gs://{bucket_name}/{blob_prefix}"

        # Register the model with the production label
        registered_model = aiplatform.Model.upload(
            display_name=model_display_name,
            artifact_uri=model_uri,
            serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
            serving_container_predict_route="/predict",
            serving_container_health_route="/health",
            labels={"deployed": "production"},
            sync=True
        )

        # Record metrics and version info as JSON in the model description,
        # which is what the comparison logic above reads back
        registered_model.update(
            description=json.dumps({
                "rmse": current_rmse,
                "mae": current_metrics["mae"],
                "r2": current_metrics["r2"],
                "version": new_version,
                "framework": "xgboost",
                "created_at": int(time.time())
            })
        )

        return {
            "deployment_decision": deployment_decision,
            "version": new_version,
            "model_id": registered_model.resource_name
        }
    else:
        return {"deployment_decision": deployment_decision, "version": current_version}

Let's take a closer look at our predictor.py file, which handles the model serving logic:

import os
import pickle
import pandas as pd
from google.cloud import storage

class LeadScoringPredictor:
    def __init__(self):
        self.model = None
        self.preprocessor = None

    def load(self):
        # Vertex AI exposes the GCS location of the model artifacts to the
        # serving container through the AIP_STORAGE_URI environment variable
        artifact_uri = os.environ["AIP_STORAGE_URI"]
        bucket_name, _, prefix = artifact_uri[len("gs://"):].partition("/")
        bucket = storage.Client().bucket(bucket_name)
        for filename in ("model.pkl", "preprocessor.pkl"):
            blob_name = f"{prefix}/{filename}" if prefix else filename
            bucket.blob(blob_name).download_to_filename(f"/tmp/{filename}")

        with open('/tmp/model.pkl', 'rb') as f:
            self.model = pickle.load(f)
        with open('/tmp/preprocessor.pkl', 'rb') as f:
            self.preprocessor = pickle.load(f)
        return self

    def predict(self, instances):
        # Convert instances to the DataFrame format expected by the preprocessor
        input_df = pd.DataFrame(instances)

        # Apply the same transformations used during training
        X_processed = self.preprocessor.transform(input_df)

        # Make predictions and return them as a JSON-serializable list
        predictions = self.model.predict(X_processed)
        return predictions.tolist()

The registry component is the cornerstone of our automated MLOps pipeline, implementing GitOps-style continuous delivery for ML models:

  • Checks whether the new model outperforms the production model (using RMSE)
  • Uploads the model to Vertex AI's Model Registry if it's better
  • Implements proper semantic versioning (1.0.0, 1.0.1, and so on; see the sketch below)
  • Attaches metadata including metrics and version information
  • Tags models with a "production" label for easy filtering
  • Packages a custom prediction routine that bundles both model and preprocessor
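As a quick illustration of the version-bump logic, here's how the semantic-version library increments versions (a minimal sketch; the promotion threshold itself lives in the component above):

from semantic_version import Version

current = Version("1.2.3")
print(current.next_patch())  # 1.2.4 -- e.g. an automated retrain, same features
print(current.next_minor())  # 1.3.0 -- e.g. new features added to the model
print(current.next_major())  # 2.0.0 -- e.g. a breaking change to the input schema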

Top tip

Use Vertex AI Model Registry's labels and metadata to store model performance metrics, training dataset information, and deployment status. With that in place, the decision to deploy becomes data-driven and automated, eliminating human error and reducing deployment friction while providing a comprehensive audit trail for compliance purposes.
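For example, pulling up everything currently tagged for production is a one-liner against the registry (using the labels set during registration):

from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

# List every model carrying the "production" label
for m in aiplatform.Model.list(filter='labels.deployed="production"'):
    print(m.display_name, m.resource_name, m.labels)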

6. Main Pipeline Definition

Now let's stitch all these components together into a cohesive pipeline:

@pipeline(
    name="lead-scoring-pipeline",
    pipeline_root=PIPELINE_ROOT
)
def lead_scoring_pipeline(
    project_id: str = PROJECT_ID,
    region: str = REGION,
    bq_dataset: str = BQ_DATASET,
    bq_table: str = BQ_TABLE,
    model_display_name: str = MODEL_DISPLAY_NAME
):
    # Run the ingestion component
    ingest_task = ingest_data(
        project_id=project_id,
        dataset_id=bq_dataset,
        table_id=bq_table
    )

    # Run the preprocessing component
    preprocess_task = preprocess_data(
        input_dataset=ingest_task.outputs["output_dataset"],
        features=ingest_task.outputs["features"],
        target=ingest_task.outputs["target"]
    )

    # Run the training component
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        preprocessed_features=preprocess_task.outputs["preprocessed_features"]
    )

    # Run the evaluation component
    eval_task = evaluate_model(
        test_dataset=preprocess_task.outputs["test_dataset"],
        model=train_task.outputs["model"],
        project_id=project_id,
        region=region,
        model_display_name=model_display_name
    )

    # Run the registration component
    register_task = register_model(
        model=train_task.outputs["model"],
        preprocessor=preprocess_task.outputs["preprocessor"],
        metrics=eval_task.outputs["metrics"],
        project_id=project_id,
        region=region,
        model_display_name=model_display_name
    )

Our pipeline definition:

  • Connects all components in a logical flow
  • Handles parameter passing between components
  • Makes dependencies explicit
  • Sets default parameter values

This declarative style makes our pipeline easy to understand, modify, and maintain.

7. Compiling and Running the Pipeline

With our pipeline defined, we can compile and run it:

def compile_and_run_pipeline():
    # Compile the pipeline
    compiler.Compiler().compile(
        pipeline_func=lead_scoring_pipeline,
        package_path="lead_scoring_pipeline.json"
    )

    # Initialize Vertex AI
    aiplatform.init(project=PROJECT_ID, location=REGION)

    # Create a pipeline job
    pipeline_job = aiplatform.PipelineJob(
        display_name="lead-scoring-pipeline",
        template_path="lead_scoring_pipeline.json",
        pipeline_root=PIPELINE_ROOT,
        enable_caching=True
    )

    # Run the pipeline
    pipeline_job.run()

    return pipeline_job

# Execute the pipeline
if __name__ == "__main__":
    job = compile_and_run_pipeline()
    print(f"Pipeline job: {job.display_name} started")

The compiled pipeline can be:

  • Run manually as shown above
  • Scheduled to run periodically using Cloud Scheduler or a pipeline schedule (see the sketch below)
  • Triggered by new data using Pub/Sub
  • Integrated into CI/CD workflows
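For periodic retraining, recent versions of the Vertex AI SDK can also attach a cron schedule directly to a pipeline job, without a separate Cloud Scheduler setup (a sketch; the schedule name and cron string are ours):

from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

pipeline_job = aiplatform.PipelineJob(
    display_name="lead-scoring-pipeline",
    template_path="lead_scoring_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    enable_caching=True
)

# Retrain nightly at 02:00 instead of submitting the job immediately
pipeline_job.create_schedule(
    display_name="lead-scoring-nightly",
    cron="0 2 * * *"
)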

8. Model Deployment with Terraform

Now for the infrastructure piece. We'll use Terraform to deploy and version our production model to a Vertex AI Endpoint:

# main.tf

terraform {
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "4.75.0"
    }
  }
}

provider "google" {
  project = var.project_id
  region  = var.region
}

# Get the production model.
# NOTE: Terraform's Vertex AI coverage is still evolving; treat this data
# source and the deployment/monitoring resources below as illustrative of the
# intended setup -- they may require the google-beta provider in practice.
data "google_vertex_ai_models" "production_model" {
  filter = "labels.deployed=\"production\" AND display_name=\"${var.model_display_name}\""
}

locals {
  model_id = length(data.google_vertex_ai_models.production_model.models) > 0 ? data.google_vertex_ai_models.production_model.models[0].id : ""
}

# Create endpoint
resource "google_vertex_ai_endpoint" "lead_scoring_endpoint" {
  display_name = "lead-scoring-endpoint"
  location     = var.region

  # Only create if we have a production model
  count        = local.model_id != "" ? 1 : 0
}

# Deploy model to endpoint
resource "google_vertex_ai_model_deployment" "lead_scoring_deployment" {
  # Only create if we have a production model
  count        = local.model_id != "" ? 1 : 0

  model        = local.model_id
  endpoint     = google_vertex_ai_endpoint.lead_scoring_endpoint[0].id
  display_name = "lead-scoring-deployment"

  # Dedicated compute with autoscaling between one and two replicas
  dedicated_resources {
    machine_spec {
      machine_type = "n1-standard-2"
    }
    min_replica_count = 1
    max_replica_count = 2
  }

  traffic_split {
    percent = 100
  }

  # Enable explainability (sampled Shapley attributions)
  explanation_spec {
    parameters {
      sampled_shapley_attribution {
        path_count = 10
      }
    }
    metadata {
      inputs {
        visualization {
          type = "FEATURE_IMPORTANCE"
        }
      }
    }
  }
}

# Setup model monitoring
resource "google_vertex_ai_model_monitoring_job" "monitoring" {
  count = local.model_id != "" ? 1 : 0

  display_name = "lead-scoring-monitoring"
  endpoint     = google_vertex_ai_endpoint.lead_scoring_endpoint[0].id

  # Randomly sample 80% of prediction requests for drift analysis
  logging_sampling_strategy {
    random_sample_config {
      sample_rate = 0.8
    }
  }

  drift_detection_config {
    drift_threshold = 0.05
  }

  monitoring_schedule {
    monitor_interval {
      seconds = 86400  # Daily monitoring
    }
  }

  alert_config {
    email_alert_config {
      user_emails = ["mlops-team@example.com"]
    }
  }
}

The Terraform config:

  • Queries the Vertex AI Model Registry for our production model
  • Creates a Vertex AI Endpoint
  • Deploys the latest production model to that endpoint
  • Configures compute resources with autoscaling
  • Enables explainability (Shapley values)
  • Sets up model monitoring to detect data drift
  • Configures alerts to notify the team of issues

9. Using the Deployed Model

Finally, let's look at how to interact with our deployed model:

def get_prediction(instances, endpoint_name="lead-scoring-endpoint"):
    """Get predictions from the deployed model."""
    from google.cloud import aiplatform

    # Initialize Vertex AI
    aiplatform.init(project=PROJECT_ID, location=REGION)

    # Get endpoint
    endpoint = aiplatform.Endpoint.list(
        filter=f'display_name="{endpoint_name}"'
    )[0]

    # Get predictions
    predictions = endpoint.predict(instances=instances)

    return predictions

def get_explanation(instances, endpoint_name="lead-scoring-endpoint"):
    """Get explanations from the deployed model."""
    from google.cloud import aiplatform

    # Initialize Vertex AI
    aiplatform.init(project=PROJECT_ID, location=REGION)

    # Get endpoint
    endpoint = aiplatform.Endpoint.list(
        filter=f'display_name="{endpoint_name}"'
    )[0]

    # Get explanations
    explanation = endpoint.explain(instances=instances)

    return explanation

# Example: Get prediction for a new lead
new_lead = [
    {
        "company_size": 152,
        "industry": "technology",
        "product_interest": "enterprise",
        "pages_visited": 15,
        "time_on_site": 320,
        "campaign_source": "google",
        "prior_contacts": 2
    }
]

# Get prediction
prediction = get_prediction(new_lead)
print(f"Lead score: {prediction.predictions[0][0]:.2f}")

# Get explanation
explanation = get_explanation(new_lead)
print("Feature importance:")
for name, value in explanation.explanations[0].attributions[0].feature_attributions.items():
    print(f"  {name}: {value:.4f}")

10. Monitoring and Observability

You can access the monitoring data through the Vertex AI console or programmatically:

def get_monitoring_job(display_name="lead-scoring-monitoring"):
    """Look up the model monitoring job created by Terraform."""
    from google.cloud import aiplatform

    # Initialize Vertex AI
    aiplatform.init(project=PROJECT_ID, location=REGION)

    # The monitoring stats themselves are easiest to inspect in the console;
    # the SDK surface for reading them programmatically varies by version,
    # so treat this lookup as a sketch.
    jobs = aiplatform.ModelDeploymentMonitoringJob.list(
        filter=f'display_name="{display_name}"'
    )
    return jobs[0] if jobs else None

# Check the monitoring job's status; feature drift scores and alerts surface
# in the Vertex AI console and via the email alerts configured above
job = get_monitoring_job()
if job:
    print(f"Monitoring job state: {job.state}")

Conclusion

Our MLOps pipeline provides:

  • Automation: From data ingestion to deployment
  • Quality control: Only better models get promoted to production
  • Versioning: Proper semantic versioning for model tracking
  • Observability: Comprehensive monitoring and explainability
  • Infrastructure as code: Reproducible deployments with Terraform

This end-to-end solution has dramatically improved our lead scoring workflow, reducing the time from model development to deployment from weeks to hours, while ensuring that only high-quality models make it to production.

What MLOps challenges are you facing in your organization? Drop us a line and let us know!
