Category: Builder's Corner
Keyword: Prefect AI workflow tutorial
AI projects require robust workflow orchestration to ensure data pipelines, model training, and deployment steps run reliably and efficiently. Prefect has emerged as a popular tool for orchestrating complex AI workflows, offering flexibility, Pythonic syntax, and strong observability features. In this deep dive, you'll learn how to build a custom AI workflow using Prefect, with hands-on, reproducible steps. For a broader comparison of orchestration tools, see our guide to AI workflow orchestration tools.
Prerequisites
- Python 3.8+ (tested with Python 3.10)
- pip (Python package manager)
- Git (for code versioning, optional but recommended)
- Basic knowledge of:
  - Python scripting and functions
  - Machine learning workflow concepts (e.g., data loading, preprocessing, model training)
  - Terminal/command-line usage
- Optional: Docker (for advanced deployment scenarios)
- Set Up Your Development Environment
- Design Your AI Workflow
  - Download a dataset
  - Preprocess data
  - Train a model
  - Evaluate the model
  - Store results
- Write Your Prefect Tasks and Flow
- Run and Monitor Your Workflow Locally
- Parameterize Your Workflow for Flexibility
- Add Error Handling and Retries
- Integrate with External Services
- Schedule and Deploy Your Workflow
- Monitor and Debug with Prefect UI
- Version Control and Collaboration
First, ensure you have Python and pip installed. You can check your versions:
python --version
pip --version
If you need to install Python, download it from python.org.
It's best practice to use a virtual environment to isolate your dependencies:
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
Next, install Prefect (this tutorial targets the 2.x series and was tested with 2.14):
pip install "prefect>=2.14"
For this tutorial, we'll also use scikit-learn for a simple AI task:
pip install scikit-learn pandas
Confirm installation:
python -c "import prefect; print(prefect.__version__)"
Tip: If you plan to use Prefect Cloud for orchestration and monitoring, sign up at Prefect Cloud.
Before coding, sketch out your workflow. For this tutorial, we'll orchestrate a simple ML pipeline:
Prefect models workflows as flows (the pipeline) and tasks (the steps). Each task is a Python function decorated with @task, and the flow is decorated with @flow.
Create a new file called ai_workflow.py:
from prefect import flow, task
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

@task
def load_data():
    iris = load_iris(as_frame=True)
    df = iris.frame
    return df

@task
def preprocess_data(df):
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test

@task
def train_model(X_train, y_train):
    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_train, y_train)
    return clf

@task
def evaluate_model(clf, X_test, y_test):
    y_pred = clf.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    return acc

@task
def store_results(acc):
    with open("metrics.txt", "w") as f:
        f.write(f"Accuracy: {acc:.4f}\n")
    return "metrics.txt"

@flow
def ai_pipeline():
    df = load_data()
    X_train, X_test, y_train, y_test = preprocess_data(df)
    clf = train_model(X_train, y_train)
    acc = evaluate_model(clf, X_test, y_test)
    result_file = store_results(acc)
    print(f"Workflow complete. Accuracy: {acc:.4f}. Results saved to {result_file}")

if __name__ == "__main__":
    ai_pipeline()
Explanation:
- Each step is a separate `@task`, making it observable and retryable in Prefect.
- The `@flow` function orchestrates the task execution.
- Results are saved to a file for reproducibility.
You can run the workflow directly:
python ai_workflow.py
You should see output like:
Workflow complete. Accuracy: 1.0000. Results saved to metrics.txt
The metrics.txt file will contain your model's accuracy.
To visualize and monitor the workflow with Prefect's UI, start the Prefect server (for local orchestration):
prefect server start
Open http://127.0.0.1:4200/ in your browser to see the Prefect dashboard.
To register and run your flow with the Prefect orchestration engine:
prefect deployment build ai_workflow.py:ai_pipeline -n "Local AI Pipeline"
prefect deployment apply ai_pipeline-deployment.yaml
prefect agent start -q default
prefect deployment run "ai-pipeline/Local AI Pipeline"
This approach lets you schedule, monitor, and retry your AI workflow from the UI.
Prefect allows you to pass parameters to flows and tasks for reusable pipelines. Let's add a parameter for the test size split.
from prefect import flow, task

@task
def preprocess_data(df, test_size):
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=42)
    return X_train, X_test, y_train, y_test

@flow
def ai_pipeline(test_size: float = 0.2):
    df = load_data()
    X_train, X_test, y_train, y_test = preprocess_data(df, test_size)
    clf = train_model(X_train, y_train)
    acc = evaluate_model(clf, X_test, y_test)
    result_file = store_results(acc)
    print(f"Workflow complete. Accuracy: {acc:.4f}. Results saved to {result_file}")

if __name__ == "__main__":
    ai_pipeline(test_size=0.3)
Now you can change the test split ratio by passing a parameter, either in code or via Prefect's UI/API.
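If you created the deployment from the earlier step, the same parameter can also be overridden from the CLI with `--param` (the deployment name below matches the earlier example and may differ in your setup):

```shell
prefect deployment run "ai-pipeline/Local AI Pipeline" --param test_size=0.3
```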
Prefect makes it easy to add retries and timeouts to tasks. For example, if load_data could fail, add:
from prefect import task

@task(retries=3, retry_delay_seconds=10)
def load_data():
    # ... as before
This will retry the task up to 3 times with a 10-second delay between attempts.
Prefect tasks can interact with databases, S3 buckets, APIs, or ML model registries. For example, to store results in S3:
import boto3
from prefect import task

@task
def store_results_s3(acc, bucket, key):
    s3 = boto3.client("s3")
    s3.put_object(Bucket=bucket, Key=key, Body=f"Accuracy: {acc:.4f}\n")
    return f"s3://{bucket}/{key}"
You can then call this task in your flow, passing the bucket and key as parameters.
Prefect supports flexible scheduling. For example, to run your workflow daily:
prefect deployment build ai_workflow.py:ai_pipeline -n "Daily AI Pipeline" --cron "0 8 * * *"
prefect deployment apply ai_pipeline-deployment.yaml
This schedules your workflow to run every day at 8:00 AM UTC. You can manage schedules and monitor runs via the Prefect UI.
The Prefect dashboard shows flow runs, task statuses, logs, and error traces. If a task fails, you can see the stack trace and retry from the UI. This observability is a key advantage of Prefect for AI workflows.
For more on how Prefect compares to other orchestration tools, see Comparing AI Workflow Orchestration Tools: Airflow, Prefect, and Beyond.
Store your workflow code in Git for reproducibility and team collaboration:
git init
git add ai_workflow.py
git commit -m "Initial Prefect AI workflow"
You can trigger flows on code changes or integrate with CI/CD pipelines for advanced automation.
Common Issues & Troubleshooting
- Prefect not found / ImportError: Ensure your virtual environment is activated and Prefect is installed.
- Port 4200 already in use: If `prefect server start` fails, stop other services using the port or change it with the `--port` flag.
- Task retries not working: Check that your task is decorated with `@task(retries=...)` and that exceptions are not being swallowed.
- Deployment errors: Verify your flow function signature matches the deployment command.
- Agent not picking up flows: Ensure the agent is started in the same environment as your deployment and is registered with the correct work queue.
- External service errors (e.g., S3): Check credentials and network access. Use Prefect's `Secret` blocks for managing sensitive information.
Next Steps
- Explore Prefect's blocks for managing secrets and connections to databases, cloud storage, and APIs.
- Integrate with Prefect Cloud for team-based orchestration, advanced scheduling, and cloud-native monitoring.
- Expand your workflow to include data validation, model versioning, and notifications (e.g., Slack, email).
- For a broader perspective on orchestration tools, read Comparing AI Workflow Orchestration Tools: Airflow, Prefect, and Beyond.
By following this tutorial, you've built a robust, parameterized AI workflow with Prefect—ready for real-world ML and data science projects. Prefect's Pythonic syntax, observability, and extensibility make it an excellent choice for custom AI pipelines.
