# Quick Start Guide

Getting up and running with Taskiq-Flow in 5 minutes.

Version: {VERSION} · Prerequisites: Python 3.9+, asyncio basics
## Overview

This guide will help you create your first pipelines with Taskiq-Flow. By the end, you'll understand:

- How to set up a broker and add the `PipelineMiddleware`
- How to define tasks with `@broker.task`
- How to build sequential pipelines with `.call_next()`, `.map()`, and `.filter()`
- How to run pipelines and retrieve results
- How to build basic dataflow pipelines with `@pipeline_task`
## Prerequisites

```bash
pip install taskiq taskiq-flow
```

For this guide, we'll use the in-memory broker, which requires no external services.
## 1. Basic Sequential Pipeline

### 1.1. Setup

Create a new Python file `quickstart_basic.py`:

```python
import asyncio

from taskiq import InMemoryBroker
from taskiq_flow import Pipeline, PipelineMiddleware

# Initialize the broker and add the required middleware
broker = InMemoryBroker()
broker.add_middlewares(PipelineMiddleware())
```
### 1.2. Define Tasks

All functions used in a pipeline must be taskiq tasks (decorated with `@broker.task`):

```python
@broker.task
def add_one(value: int) -> int:
    """Add 1 to the input value."""
    return value + 1


@broker.task
def repeat(value: int, times: int) -> list[int]:
    """Repeat a value multiple times."""
    return [value] * times


@broker.task
def is_positive(value: int) -> bool:
    """Check if value is non-negative."""
    return value >= 0
```
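These examples are plain synchronous functions, but taskiq tasks can also be coroutines. A minimal sketch, assuming standard taskiq behavior (`slow_add_one` is a hypothetical name, not part of this guide's pipeline):

```python
@broker.task
async def slow_add_one(value: int) -> int:
    """Async tasks are awaited by the worker, just as sync tasks are called."""
    await asyncio.sleep(0.1)  # simulate I/O work
    return value + 1
```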
### 1.3. Build & Run the Pipeline

```python
async def main():
    # Build the pipeline by chaining operations
    pipeline = (
        Pipeline(broker)
        .call_next(add_one)          # Step 1: 1 → 2
        .call_next(repeat, times=4)  # Step 2: 2 → [2, 2, 2, 2]
        .map(add_one)                # Step 3: applied to each element → [3, 3, 3, 3]
        .filter(is_positive)         # Step 4: keep elements for which the predicate is True
    )

    # Kick off the pipeline with the initial input
    task = await pipeline.kiq(1)

    # Wait for completion and retrieve the result
    result = await task.wait_result()
    print("Result:", result.return_value)  # Output: [3, 3, 3, 3]


asyncio.run(main())
```
Expected output:

```
Result: [3, 3, 3, 3]
```
### 1.4. How It Works

| Step | Operation | Input | Output |
|---|---|---|---|
| 1 | `.call_next(add_one)` | `1` | `2` |
| 2 | `.call_next(repeat, times=4)` | `2` | `[2, 2, 2, 2]` |
| 3 | `.map(add_one)` | `[2, 2, 2, 2]` | `[3, 3, 3, 3]` (parallel) |
| 4 | `.filter(is_positive)` | `[3, 3, 3, 3]` | `[3, 3, 3, 3]` (unchanged) |
Key points:

- The `PipelineMiddleware` handles task routing; it must be added to your broker.
- Each step receives the previous step's output as input.
- `.map()` and `.filter()` operate on iterable results and process elements in parallel.
- `pipeline.kiq(initial_input)` starts the pipeline and returns a `Task` object.
- `task.wait_result()` waits until the pipeline finishes (see the lifecycle note below).
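One lifecycle detail the example above skips: taskiq brokers expose `startup()` and `shutdown()` hooks. `InMemoryBroker` is forgiving if you omit them, but with a shared broker it's good hygiene to call them around your runs. A sketch of `main()` with explicit lifecycle handling:

```python
async def main():
    await broker.startup()  # initialize broker resources
    try:
        pipeline = Pipeline(broker).call_next(add_one)  # any pipeline, e.g. the one from section 1.3
        task = await pipeline.kiq(1)
        result = await task.wait_result()
        print("Result:", result.return_value)
    finally:
        await broker.shutdown()  # release broker resources cleanly
```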
## 2. Dataflow Pipeline (Automatic DAG)

For more complex workflows, use `DataflowPipeline`, which automatically builds a dependency graph.

### 2.1. Define Tasks with @pipeline_task

Mark task outputs using the `@pipeline_task` decorator:
```python
from taskiq_flow import DataflowPipeline, pipeline_task


@broker.task
@pipeline_task(output="features")
def extract_audio(track_paths: list[str]) -> dict:
    """Extract audio features from tracks."""
    print(f"Extracting features from {len(track_paths)} tracks...")
    return {"duration": 180.0, "tempo": 120.0, "energy": 0.8}


@broker.task
@pipeline_task(output="tags")
def generate_tags(features: dict) -> list[str]:
    """Generate tags based on audio features."""
    print(f"Generating tags from features: {features}")
    return ["electronic", "dance", "upbeat"]


@broker.task
@pipeline_task(output="embedding")
def compute_embedding(features: dict) -> list[float]:
    """Compute a vector embedding from features."""
    print(f"Computing embedding from {features}")
    return [0.1, 0.2, 0.3, 0.4, 0.5]
```
How dependency resolution works:

- `extract_audio` declares `output="features"`.
- `generate_tags` has a parameter `features: dict`, so it automatically depends on `extract_audio`.
- `compute_embedding` also depends on `extract_audio` (same `features` parameter).
- Taskiq-Flow constructs a DAG and runs independent tasks in parallel (the sketch below illustrates the name-matching idea).
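The matching rule itself is easy to picture: compare each task's parameter names against the outputs the other tasks declare. The following is an illustrative reimplementation of that idea in plain Python, not Taskiq-Flow's actual code; the stub functions stand in for the decorated tasks above:

```python
import inspect

def infer_dependencies(tasks: list[tuple]) -> dict[str, list[str]]:
    """Map each task name to the producer tasks whose outputs it consumes."""
    producers = {output: fn for fn, output in tasks}  # output name -> producing function
    deps = {}
    for fn, _ in tasks:
        params = inspect.signature(fn).parameters
        deps[fn.__name__] = [producers[p].__name__ for p in params if p in producers]
    return deps

# Stubs standing in for the decorated tasks:
def extract_audio(track_paths): ...
def generate_tags(features): ...
def compute_embedding(features): ...

print(infer_dependencies([
    (extract_audio, "features"),
    (generate_tags, "tags"),
    (compute_embedding, "embedding"),
]))
# {'extract_audio': [], 'generate_tags': ['extract_audio'], 'compute_embedding': ['extract_audio']}
```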
### 2.2. Build & Execute

```python
async def main():
    # Auto-build the DAG from the task list
    pipeline = DataflowPipeline.from_tasks(
        broker,
        [extract_audio, generate_tags, compute_embedding],
    )

    # Optional: visualize the DAG
    pipeline.print_dag()

    # Execute with input data (only external inputs are needed)
    results = await pipeline.kiq_dataflow(track_paths=["song1.mp3", "song2.mp3"])
    print("Results:", results)
    # Output: {
    #     "features": {"duration": 180.0, ...},
    #     "tags": ["electronic", "dance", "upbeat"],
    #     "embedding": [0.1, 0.2, 0.3, 0.4, 0.5]
    # }


asyncio.run(main())
```
Sample DAG output (printed to the console):

```
DAG Execution Order:
Level 0 (parallel): extract_audio
Level 1 (parallel): generate_tags, compute_embedding
Final outputs: features, tags, embedding
```
### 2.3. Visualizing the Pipeline

```python
# ASCII DAG in the console
pipeline.print_dag()

# JSON representation for web UIs
viz_json = pipeline.visualize()
print(viz_json)

# DOT format for Graphviz
dot = pipeline.visualize_dot()
with open("pipeline.dot", "w") as f:
    f.write(dot)
# Render: dot -Tpng pipeline.dot -o pipeline.png
```
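If you'd rather render from Python than shell out to `dot`, the third-party `graphviz` package can consume the DOT string directly. A sketch, assuming both the package and the Graphviz binaries are installed:

```python
import graphviz  # pip install graphviz (also requires the Graphviz system binaries)

# Render the DOT string produced by pipeline.visualize_dot() to pipeline.png
graphviz.Source(dot).render("pipeline", format="png", cleanup=True)
```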
## 3. Common Patterns

### 3.1. Map-Reduce Pattern

Process items in parallel, then aggregate:

```python
from taskiq_flow import MapReduce

# Map phase: process each track independently
mapped = await MapReduce.map(
    broker,
    process_track,       # task function
    track_list,          # iterable of items
    output="processed",  # name of the intermediate output
    max_parallel=10,     # limit concurrency
)

# Reduce phase: aggregate all results
reduced = await MapReduce.reduce(
    broker,
    aggregate_results,       # aggregation function
    mapped,                  # MapReduceResult object
    input_name="processed",  # consume the mapped output
    output="final_stats",
)

print("Final:", reduced.return_value)
```

See `examples/dataflow_audio_pipeline.py` for a complete audio-processing pipeline.
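To make the map/reduce semantics concrete, here is roughly the same shape in plain asyncio: fan out over the items with bounded concurrency, then hand the collected list to one aggregation step. This is an illustrative sketch only; `process_one` and `aggregate` are hypothetical stand-ins for the tasks above:

```python
import asyncio

async def local_map_reduce(items: list[str], max_parallel: int = 10) -> dict:
    sem = asyncio.Semaphore(max_parallel)  # plays the role of max_parallel

    async def process_one(item: str) -> int:
        async with sem:  # bound the number of in-flight items
            await asyncio.sleep(0.01)  # simulate per-item work
            return len(item)

    # Map phase: run all items concurrently (subject to the semaphore)
    mapped = await asyncio.gather(*(process_one(i) for i in items))

    def aggregate(results: list[int]) -> dict:
        return {"count": len(results), "total": sum(results)}

    # Reduce phase: one aggregation over the collected results
    return aggregate(list(mapped))

print(asyncio.run(local_map_reduce(["song1.mp3", "song2.mp3"])))
# {'count': 2, 'total': 18}
```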
### 3.2. Group Parallel Execution

Run multiple independent tasks simultaneously:

```python
pipeline = Pipeline(broker)
pipeline.group(
    [task_a, task_b, task_c],
    param_names=["input_a", "input_b", "input_c"],
)
# Returns: [result_a, result_b, result_c]
```
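Semantically this behaves like `asyncio.gather` over independent coroutines: results come back in the order the tasks were listed. A rough local analogue:

```python
import asyncio

async def run_group():
    # Result order matches the order of the awaitables, as with pipeline.group
    return list(await asyncio.gather(
        asyncio.sleep(0.1, result="result_a"),
        asyncio.sleep(0.1, result="result_b"),
        asyncio.sleep(0.1, result="result_c"),
    ))

print(asyncio.run(run_group()))  # ['result_a', 'result_b', 'result_c']
```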
### 3.3. Pipeline with Tracking

Monitor pipeline status in real time:

```python
from taskiq_flow import PipelineTrackingManager

tracking = PipelineTrackingManager().with_auto_storage(broker)
pipeline = Pipeline(broker).with_tracking(tracking)

task = await pipeline.kiq(data)

# Check the status later
status = await tracking.get_status(pipeline.pipeline_id)
print(f"Status: {status.status}, Steps completed: {len(status.steps)}")
```
## 4. Running Example Scripts

The `examples/` directory contains complete, runnable demonstrations:

```bash
# Basic sequential pipeline
python examples/quickstart.py

# Tracking and monitoring
python examples/tracking_demo.py

# Scheduled pipelines (cron)
python examples/scheduled_pipeline.py

# Full dataflow DAG with map-reduce
python examples/dataflow_audio_pipeline.py

# Manual DAG construction with DataflowRegistry
python examples/registry_discovery_example.py

# WebSocket event streaming
python examples/websocket_demo.py

# REST API with FastAPI
python examples/api_example.py
```
## 5. Next Steps

With the basics under your belt, explore the deeper guides:
| Topic | Guide |
|---|---|
| Sequential & Dataflow Pipelines | Pipelines Guide |
| Dataflow Deep Dive | Dataflow Guide |
| Task definitions & decorators | Tasks Guide |
| Execution modes & error handling | Execution Guide |
| Real-time monitoring | Tracking Guide |
| Live dashboards | WebSocket Guide |
| Cron scheduling | Scheduling Guide |
| Error recovery | Retry Guide |
| Performance tuning | Performance Guide |
| REST API integration | API Guide |
| Full API reference | API Reference |
## Troubleshooting

### “PipelineMiddleware not found” Error

Symptom: Tasks fail with middleware errors.

Fix: Ensure `PipelineMiddleware()` is added to your broker before creating pipelines:

```python
broker.add_middlewares(PipelineMiddleware())  # Must be called before any pipeline is built
```
### “Task not found” or “Result is None”

Symptom: `wait_result()` returns `None`.

Cause: `InMemoryBroker` only works within a single process. For multi-worker distributed setups, use Redis or another persistent broker.

Fix: Switch to `RedisStreamBroker` with a shared result backend:

```python
from taskiq_flow.broker import RedisStreamBroker

broker = RedisStreamBroker(redis_url="redis://localhost:6379")
```
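If your version of `RedisStreamBroker` does not wire up a result backend automatically, the plain taskiq-redis equivalents look like this. This assumes `taskiq_flow`'s broker mirrors taskiq-redis; check your release notes:

```python
from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker

# Shared result backend so any worker (or process) can read results
result_backend = RedisAsyncResultBackend(redis_url="redis://localhost:6379")
broker = RedisStreamBroker(url="redis://localhost:6379").with_result_backend(result_backend)
```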
### WebSocket Connection Refused

Symptom: The client cannot connect to the WebSocket server.

Fix: Ensure the WebSocket server is running and the port is accessible:

```python
server = get_websocket_server(host="0.0.0.0", port=8765)
await server.start_server()
```
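To verify connectivity from the client side, a minimal listener with the `websockets` package is enough. The URL mirrors the server settings above; the event payload format is Taskiq-Flow specific and treated here as opaque text:

```python
import asyncio
import websockets  # pip install websockets

async def listen():
    async with websockets.connect("ws://localhost:8765") as ws:
        async for message in ws:  # iterate over incoming events
            print("event:", message)

asyncio.run(listen())
```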
## 📚 Further Reading
- Full API Reference — Complete class and method documentation
- Example Gallery — Detailed walkthroughs of each example script
- Project README — Project overview, installation, and philosophy
Ready to dive deeper? Continue with the Pipelines Guide.