MLOps: Navigating CI/CD Pipelines with Metaflow
Metaflow: A Brief Overview
If you're seeking more insights into MLOps, consider checking out my other piece titled "An Introduction to MLOps Architecture."
Understanding Metaflow's Purpose
Initially created by Netflix, Metaflow is a Python library aimed at boosting data scientists' efficiency by simplifying the processes of developing, deploying, and managing machine learning applications.
Metaflow covers multiple aspects of the machine learning workflow, from data management to model deployment, through an intuitive API that enables scalable and effective project management. Here's how Metaflow enhances various phases of MLOps:
- Data Management: Metaflow includes built-in features for easily retrieving, storing, and managing data, seamlessly integrating with data lakes and warehouses.
- Model Development: It enables data scientists to utilize well-known tools such as Jupyter for model prototyping, while offering comprehensive version control and experiment tracking.
- Model Deployment: It facilitates deployment across different platforms, ensuring a seamless transition from model training to production with the help of workflow orchestration tools.
Constructing Metaflow Flows
Basic Workflow Structure
A Metaflow workflow, known as a "flow," is organized as a Directed Acyclic Graph (DAG) of operations. Each operation within the flow is referred to as a "step." Below is an example of a basic Metaflow workflow:
from metaflow import FlowSpec, step

class MinimumFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        print("Flow is complete!")

if __name__ == "__main__":
    MinimumFlow()
In this example, the workflow consists of two steps: start and end. The @step decorator marks the functions that will be executed as steps in the workflow. The self.next method is used to indicate the subsequent step to be executed.
One execution of the flow is termed a run. Additional steps can be integrated as needed within the workflow.
Running the Workflow
To execute this workflow, you would run:
python minimum_flow.py run
This will yield the following results:
- Each run receives a unique ID for experiment tracking.
- Step execution is denoted by the step name.
- Each task's process ID (pid) is also tracked.
Together with the flow name and a task ID, the run ID and step name uniquely identify a task in a Metaflow environment; this combined identifier is referred to as a pathspec, and the pid is logged alongside it.
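A pathspec can also be used to look up a single task programmatically through the Metaflow Client API. Here is a minimal sketch; the IDs shown are hypothetical, so substitute the ones printed by your own run:

from metaflow import Task

# Look up one task by its pathspec: FlowName/run_id/step_name/task_id
# (hypothetical IDs; use the ones printed by your own run)
task = Task("MinimumFlow/1234/end/5")
print(task.finished)  # True if the task completed successfully
print(task.stdout)    # stdout captured from the task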
Advanced ML Flow
Now, let's look at a more intricate workflow that includes steps for data loading, model training, saving, and preparing for deployment:
from metaflow import FlowSpec, step
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import joblib
import numpy as np

class DeploymentFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.load_data)

    @step
    def load_data(self):
        print("Loading and splitting data.")
        X, y = np.random.rand(100, 10), np.random.randint(0, 2, 100)
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        self.next(self.train_model)

    @step
    def train_model(self):
        print("Training model.")
        self.model = RandomForestClassifier(random_state=42)
        self.model.fit(self.X_train, self.y_train)
        # Evaluate on the held-out split so the split and accuracy_score are used
        self.accuracy = accuracy_score(self.y_test, self.model.predict(self.X_test))
        print("Test accuracy: {:.2f}".format(self.accuracy))
        self.next(self.save_model)

    @step
    def save_model(self):
        joblib.dump(self.model, "model.joblib")
        print("Model saved.")
        self.next(self.prepare_deployment)

    @step
    def prepare_deployment(self):
        print("Preparing deployment.")
        self.next(self.end)

    @step
    def end(self):
        print("Flow is complete! Model is ready for deployment.")

if __name__ == "__main__":
    DeploymentFlow()
To execute this more complex workflow, run:
python deployment_flow.py run
Each step executes in the order defined by the flow's structure, and the run output reflects that sequence.
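Once the run completes, the serialized model can be loaded back in a separate scoring script. A minimal sketch, assuming model.joblib sits in the current working directory:

import joblib
import numpy as np

# Load the model serialized by the save_model step
model = joblib.load("model.joblib")

# Score a batch of new samples with the same feature dimensionality (10)
new_samples = np.random.rand(5, 10)
print(model.predict(new_samples))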
Using Flow Cards
If you wish to visually inspect the data within your flow, you can utilize the @card decorator, which generates a simple report for data visualization. Here’s an illustration of how this works:
from metaflow import FlowSpec, step, card

class DataFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.data_func)

    @card  # <-- This is where you define logging for a step
    @step
    def data_func(self):
        self.data = [1, 2, 3]
        self.next(self.end)

    @step
    def end(self):
        print("Flow is complete!")

if __name__ == "__main__":
    DataFlow()
To run the flow, use:
python data_flow.py run
To view the card, execute:
python data_flow.py card view data_func
You will receive an output similar to:
Resolving card: DataFlow/1666720670441830/data_func/2
This will generate a file that you can open in your browser to see the results.
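Beyond the default report, a card can carry custom content appended from inside a step. A minimal sketch, assuming the metaflow.cards components and the current object behave as in recent Metaflow releases (the flow and step names here are illustrative):

from metaflow import FlowSpec, step, card, current
from metaflow.cards import Markdown

class CustomCardFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.report)

    @card
    @step
    def report(self):
        self.data = [1, 2, 3]
        # Append a custom Markdown component to this step's card
        current.card.append(Markdown("## Data summary\n- %d values" % len(self.data)))
        self.next(self.end)

    @step
    def end(self):
        print("Flow is complete!")

if __name__ == "__main__":
    CustomCardFlow()

The card is then viewed the same way, for example with python custom_card_flow.py card view report.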
Flow Artifacts
A machine learning workflow generates a substantial amount of information: datasets, metadata, model states, hyperparameters, and metrics. All of it can be tracked, stored, and accessed from other steps in the flow or after a run. Any value assigned to self within a step is persisted by Metaflow and is referred to as an artifact.
from metaflow import FlowSpec, step

class ArtifactFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.create_artifact)

    @step
    def create_artifact(self):
        # Creating a dataset artifact
        self.dataset = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
        # Creating a metadata artifact
        self.metadata_description = "created"
        self.next(self.transform_artifact)

    @step
    def transform_artifact(self):
        # Accessing and transforming these artifacts
        self.dataset = [
            [value * 10 for value in row]
            for row in self.dataset
        ]
        self.metadata_description = "transformed"
        self.next(self.end)

    @step
    def end(self):
        print("Artifact is in state {} with values {}".format(
            self.metadata_description, self.dataset))

if __name__ == "__main__":
    ArtifactFlow()
Inspecting an Artifact
After a flow run, artifacts remain accessible by extracting data from the latest run. You can execute the following in a Jupyter notebook:
from metaflow import Flow
run_artifacts = Flow("ArtifactFlow").latest_run.data
run_artifacts.dataset
This will return:
[[10, 20, 30], [40, 50, 60], [70, 80, 90]]
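You are not limited to the final state of the latest run; artifacts can also be read per step, or for a specific run pinned by its ID. A minimal sketch using the Metaflow Client API (the run ID shown is hypothetical):

from metaflow import Flow, Run

# Artifacts as they existed after specific steps of the latest run
latest = Flow("ArtifactFlow").latest_run
print(latest["create_artifact"].task.data.metadata_description)     # "created"
print(latest["transform_artifact"].task.data.metadata_description)  # "transformed"

# Or pin an exact run by its ID (hypothetical ID shown)
run = Run("ArtifactFlow/1234")
print(run.data.dataset)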
Passing Parameters to a Flow
At times, it's necessary to pass parameters to a flow during runtime. This can occur at any stage of the MLOps workflow, such as providing hyperparameters during training. You can pass values to a metaflow.Parameter, which can be provided as arguments like python flow.py run --lr 0.001, and accessed during the flow run:
from metaflow import FlowSpec, step, Parameter

class ParameterizedFlow(FlowSpec):

    learning_rate = Parameter('lr', default=.01)  # <-- Get the value

    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        print("Learning rate value is {}".format(self.learning_rate))

if __name__ == "__main__":
    ParameterizedFlow()
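Parameters can also declare a type, a default value, and help text. A minimal sketch with illustrative parameter names (not part of the original flow):

from metaflow import FlowSpec, step, Parameter

class TunableFlow(FlowSpec):

    # Illustrative hyperparameters; run e.g.:
    #   python tunable_flow.py run --lr 0.001 --epochs 20
    learning_rate = Parameter('lr', type=float, default=0.01,
                              help="Optimizer learning rate")
    epochs = Parameter('epochs', type=int, default=10,
                       help="Number of training epochs")

    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        print("lr={}, epochs={}".format(self.learning_rate, self.epochs))

if __name__ == "__main__":
    TunableFlow()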
Conclusion
Metaflow offers a comprehensive toolkit for managing the orchestration of MLOps workflows, catering to everything from basic data handling tasks to intricate machine learning processes. Its compatibility with existing Python tools and its straightforward syntax lower the barriers to implementing effective data science solutions. For those keen on exploring Metaflow's capabilities further, the Metaflow Documentation provides extensive guides and tutorials.