From Notebooks to Pipelines: Supercharge Your MLOps with Argo on Kubernetes

Yashod Perera
Aug 25, 2024


Notebooks, workflows, and programs have long been used to accomplish machine learning tasks, each with its own advantages and disadvantages. In this post, we will dive into how you can use Kubernetes efficiently to power your ML work.

Combination of MLOps and Kubernetes

Machine learning, or AI as we often call it, started more than 60 years ago as a niche field, but with the recent rise of GenAI almost everyone is using it. In the early days people trained and tested their models in notebooks; later, MLOps was introduced to streamline machine learning pipelines.

What is MLOps?

Simple scripts or notebooks are not sufficient to handle the complexity of machine learning workflows, so practitioners brought continuous integration and deployment to the ML world. The result is automated pipelines that manage the entire ML lifecycle, from data processing to model deployment, and that is the birth of MLOps.

What is Argo Workflows?

Argo Workflows provides a Kubernetes-native workflow engine that supports both step-based and Directed Acyclic Graph (DAG) workflows.

For more granular details, please refer to my earlier article on Argo Workflows.

Argo Workflows can be used to run ML pipelines (or, in Argo terms, workflows) while making use of GPU resources.
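
To get a feel for the format before we build an ML-specific pipeline, here is a minimal step-based workflow sketch. The step names, template names, and the alpine image are purely illustrative, not part of the pipeline we build later.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-steps-
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: say-hello        # steps listed under the same "- -" group run in parallel
            template: echo
    - name: echo
      container:
        image: alpine:3.19
        command: [echo, "hello from Argo"]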

Why Move from Notebooks to Kubernetes Pipelines?

Today, people typically run their ML pipelines as scripts or notebooks on GPU-powered machines. The following illustrates a simple machine learning pipeline.

Simple Machine Learning Pipeline

Problems with traditional Jupyter Notebooks and Scripts

People have been using Jupyter notebooks and scripts for a while to execute these workflows, but they come with the following issues:

  • Lack of scalability — ML pipelines may need more resources at certain stages, and notebooks cannot scale up automatically without manual intervention.
  • Reproducibility — Reproducibility is a cornerstone of robust ML practice, but the stateful nature of notebooks (cells run out of order, hidden state) introduces inconsistencies.
  • Resource utilization — Notebooks and scripts cannot schedule individual steps on different machines depending on their requirements. For example, in the workflow above we do not need a GPU-powered machine for data collection, but we do need one for model training. Such capabilities require additional programmatic setup.
  • Version control — Versioning pipelines is challenging without a proper version control system (a problem GitOps practices have solved).

How Argo Workflows Addresses Those Issues

  • Scalability — In Argo each step runs as a pod, and we can define the resources each step needs. This is both scalable and cost effective, because resources are used only where they are required.
  • Reproducibility — Argo workflows are stateless, which makes runs reproducible.
  • Resource utilisation — Argo provides the ability, out of the box, to allocate GPU resources only to the relevant steps (covered later in this tutorial).
  • Additional capabilities — Argo provides parallel execution out of the box using DAGs, approval gates, conditional logic, dependency management, and highly customisable workflows with retries, as shown in the sketch below.
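
For instance, retries are a single field on a template. The following is a minimal sketch; the template name, image, and command are placeholders for illustration only.

- name: train-model
  retryStrategy:
    limit: "3"              # retry the step up to 3 times
    retryPolicy: OnFailure  # only retry when the container fails
  container:
    image: python:3.9
    command: ["python", "-c", "print('training...')"]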

Designing Your ML Pipelines on Argo

Let’s create your first ML pipeline with Argo. It is named ml-workflow, has four steps, and uses a volume to pass data between the steps, as follows.

Sample Code

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: ml-pipeline-
spec:
  entrypoint: ml-workflow
  volumeClaimTemplates:
    - metadata:
        name: data-volume
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 5Gi
  templates:
    - name: ml-workflow
      dag:
        tasks:
          - name: data-preprocessing
            template: data-preprocess
          - name: model-training
            template: train-model
            dependencies: [data-preprocessing]
          - name: model-evaluation
            template: evaluate-model
            dependencies: [model-training]
          - name: model-storage
            template: store-model
            dependencies: [model-evaluation]

    - name: data-preprocess
      container:
        image: python:3.9        # assumes pandas is available (use a custom image or install it)
        command: ["python", "-c"]
        args:
          - |
            import pandas as pd

            # Placeholder: read the raw data (e.g. exported from your database) from the shared volume
            df = pd.read_csv('/mnt/data/input.csv')
            df_cleaned = df.dropna()
            df_cleaned.to_csv('/mnt/data/cleaned_data.csv', index=False)
        volumeMounts:
          - name: data-volume
            mountPath: /mnt/data

    - name: train-model
      container:
        image: python:3.9        # assumes pandas, scikit-learn and joblib are available
        command: ["python", "-c"]
        args:
          - |
            from sklearn.model_selection import train_test_split
            from sklearn.ensemble import RandomForestClassifier
            import joblib
            import pandas as pd

            # Load preprocessed data
            df = pd.read_csv('/mnt/data/cleaned_data.csv')
            X = df.drop('target', axis=1)
            y = df['target']

            # Split data and train model
            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
            model = RandomForestClassifier()
            model.fit(X_train, y_train)
            joblib.dump(model, '/mnt/data/model.joblib')
        volumeMounts:
          - name: data-volume
            mountPath: /mnt/data

    - name: evaluate-model
      container:
        image: python:3.9        # assumes pandas, scikit-learn and joblib are available
        command: ["python", "-c"]
        args:
          - |
            from sklearn.metrics import accuracy_score
            import joblib
            import pandas as pd

            # Load model and test data
            model = joblib.load('/mnt/data/model.joblib')
            df = pd.read_csv('/mnt/data/cleaned_data.csv')
            X = df.drop('target', axis=1)
            y = df['target']

            # Evaluate model
            predictions = model.predict(X)
            accuracy = accuracy_score(y, predictions)
            print(f'Model accuracy: {accuracy}')
        volumeMounts:
          - name: data-volume
            mountPath: /mnt/data

    - name: store-model
      container:
        image: amazon/aws-cli    # assumes AWS credentials are provided (e.g. via a Secret or IAM role)
        command: ["sh", "-c"]
        args:
          - aws s3 cp /mnt/data/model.joblib s3://yashod-ml/model.joblib
        volumeMounts:
          - name: data-volume
            mountPath: /mnt/data
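
Once you save this manifest (for example as ml-pipeline.yaml, or whatever name you prefer), you can submit it with the Argo CLI using argo submit --watch. Each step then runs as its own pod, and you can follow the logs per step from the Argo UI or with argo logs.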

Hey, wait! We need to allocate resources dynamically, attach GPUs to only some steps, add conditions, and more. How can we do all that?

Allocate Dynamic Resources

In Argo you can control the amount of resources allocated to each step. The example below shows how to specify resource requests and limits.

- name: data-preprocess
  container:
    image: python:3.9
    command: ["python", "-c"]
    args:
      - |
        import pandas as pd

        # Placeholder code for data preprocessing
        df = pd.read_csv('/mnt/data/input.csv')
        df_cleaned = df.dropna()
        df_cleaned.to_csv('/mnt/data/cleaned_data.csv', index=False)
    resources:
      requests:
        memory: "1Gi"
        cpu: "2"
      limits:
        memory: "2Gi"
        cpu: "4"
    volumeMounts:
      - name: data-volume
        mountPath: /mnt/data

Allocate GPU for relevant steps

In ML some steps are GPU intensive while others can be handled by CPUs, so allocating a GPU for the whole pipeline is costly. Argo provides the capability to attach a GPU only where it is needed.

Prerequisites:

  • NVIDIA Device Plugin: Ensure that the NVIDIA device plugin is deployed in your Kubernetes cluster. This plugin is necessary for Kubernetes to manage GPUs.
  • GPU Nodes: Your Kubernetes cluster must have nodes with GPUs available.

- name: train-model
  container:
    image: tensorflow/tensorflow:2.6.0-gpu   # assumes pandas and scikit-learn are also installed in the image
    command: ["python", "-c"]
    args:
      - |
        from tensorflow import keras
        from sklearn.model_selection import train_test_split
        import pandas as pd

        # Load preprocessed data
        df = pd.read_csv('/mnt/data/cleaned_data.csv')
        X = df.drop('target', axis=1)
        y = df['target']

        # Split data and train model
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
        model = keras.Sequential([
            keras.layers.Dense(128, activation='relu', input_shape=(X_train.shape[1],)),
            keras.layers.Dense(64, activation='relu'),
            keras.layers.Dense(1, activation='sigmoid')
        ])
        model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
        model.fit(X_train, y_train, epochs=10, batch_size=32)
        model.save('/mnt/data/model.h5')
    resources:
      limits:
        nvidia.com/gpu: 1   # Request 1 GPU; Kubernetes schedules this pod onto a GPU node
    volumeMounts:
      - name: data-volume
        mountPath: /mnt/data

Conditional Training

The following example shows how we can reuse the train and evaluate steps in a loop. It iterates up to 5 times, and as soon as the accuracy reaches 75% or more it uploads the model to S3.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: ml-training-pipeline-
spec:
  entrypoint: train-until-accuracy
  arguments:
    parameters:
      - name: iteration
        value: "0"           # start counting iterations at 0
  templates:
    - name: train-until-accuracy
      inputs:
        parameters:
          - name: iteration
      steps:
        - - name: train-model
            template: train-model
        - - name: evaluate-accuracy
            template: evaluate-accuracy
            arguments:
              parameters:
                - name: test-accuracy
                  value: "{{steps.train-model.outputs.parameters.test-accuracy}}"
                - name: iteration
                  value: "{{inputs.parameters.iteration}}"
        - - name: check-accuracy
            template: accuracy-loop
            arguments:
              parameters:
                - name: test-accuracy
                  value: "{{steps.evaluate-accuracy.outputs.parameters.test-accuracy}}"
                - name: iteration
                  value: "{{steps.evaluate-accuracy.outputs.parameters.iteration}}"

    - name: train-model
      outputs:
        parameters:
          - name: test-accuracy
            valueFrom:
              path: /tmp/test-accuracy   # the training script is expected to write the accuracy here
      container:
        image: your-ml-image:latest      # placeholder for your own training image
        command: [python]
        args: ["train_model.py"]

    - name: evaluate-accuracy
      inputs:
        parameters:
          - name: test-accuracy
          - name: iteration
      outputs:
        parameters:
          - name: test-accuracy
            valueFrom:
              path: /tmp/test-accuracy
          - name: iteration
            valueFrom:
              path: /tmp/iteration
      script:
        image: python:3.8
        command: [python]
        source: |
          # Argo substitutes the input parameters into the script before it runs
          test_accuracy = float("{{inputs.parameters.test-accuracy}}")
          iteration = int("{{inputs.parameters.iteration}}")

          # Write the values to files so they can be exposed as output parameters
          with open("/tmp/test-accuracy", "w") as f:
              f.write(str(test_accuracy))

          with open("/tmp/iteration", "w") as f:
              f.write(str(iteration + 1))

    - name: accuracy-loop
      inputs:
        parameters:
          - name: test-accuracy
          - name: iteration
      steps:
        - - name: check-and-upload
            template: upload-if-accurate
            when: "{{inputs.parameters.test-accuracy}} >= 75"
            arguments:
              parameters:
                - name: test-accuracy
                  value: "{{inputs.parameters.test-accuracy}}"
        - - name: train-again
            # Recurse into the entrypoint while accuracy is low and iterations remain
            template: train-until-accuracy
            when: "{{inputs.parameters.test-accuracy}} < 75 && {{inputs.parameters.iteration}} < 5"
            arguments:
              parameters:
                - name: iteration
                  value: "{{inputs.parameters.iteration}}"

    - name: upload-if-accurate
      inputs:
        parameters:
          - name: test-accuracy
      container:
        image: amazon/aws-cli:latest
        command: [sh, -c]
        args: ["aws s3 cp /path/to/your/model s3://yashod-ml/your-model-name"]

The second part of this blog will cover more advanced use cases.

Best Practices for ML Pipelines on Kubernetes

  • Containerisation and dependency management — Make sure models and their dependencies are containerised with Docker so the code runs consistently across environments.
  • Resource management and auto scaling — Identify the CPU and memory requests for each step and whether it needs a GPU, and consider cluster auto scaling so nodes are added only when required.
  • Security best practices — Use Kubernetes Secrets for sensitive information such as the cloud credentials used in the upload step; a sketch is shown below.
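
As a minimal sketch of the last point, the store-model step above could pull its AWS credentials from a Kubernetes Secret. The Secret name aws-credentials and its keys are assumptions for illustration; use whatever your cluster defines.

- name: store-model
  container:
    image: amazon/aws-cli
    command: ["sh", "-c"]
    args:
      - aws s3 cp /mnt/data/model.joblib s3://yashod-ml/model.joblib
    env:
      # Inject credentials from a Secret instead of hard-coding them in the workflow
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: aws-credentials        # hypothetical Secret name
            key: access-key-id
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: aws-credentials
            key: secret-access-key
    volumeMounts:
      - name: data-volume
        mountPath: /mnt/data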

This is the first part of From Notebooks to Pipelines, a tutorial series on how Argo Workflows can be used for MLOps. I hope it gave you a basic understanding of how Argo Workflows can power your ML pipelines.


If you found this helpful, please hit that 👏 and share it on social media :).
