Pipelines Guide
Sequential and Dataflow pipeline patterns, configurations, and best practices
Version: {VERSION}
Related: Execution Guide, Tasks Guide, Dataflow Guide
Overview
Taskiq-Flow provides two main pipeline types for orchestrating task workflows:
- SequentialPipeline — Manual step chaining for linear workflows
- DataflowPipeline — Automatic DAG construction from task dependencies
For a comprehensive deep-dive into dataflow patterns, see the Dataflow Guide.
This guide explores both types, their use cases, and how to choose between them.
1. Sequential Pipeline
The classic pipeline model where you explicitly chain steps in order.
1.1. Basic Structure
from taskiq_flow import Pipeline

pipeline = (
    Pipeline(broker)
    .call_next(task1)
    .call_next(task2)
    .call_next(task3)
)
Execution order: task1 → task2 → task3 (sequentially; each step waits for the previous one to finish)
1.2. Available Operations
.call_next(task, *args, **kwargs)
Execute a task, passing the previous result as the first argument:
pipeline.call_next(process_data).call_next(save_result)
# process_data receives output of previous step
# save_result receives output of process_data
Parameter binding:
- By position: result becomes first argument
- By name:
pipeline.call_next(task, param_name=previous_result)
Example:
@broker.task
def multiply(value: int, factor: int) -> int:
    return value * factor

pipeline.call_next(add_one).call_next(multiply, factor=3)
# add_one output → multiply(value=...), factor=3
.call_after(task, *args, **kwargs)
Execute a task without consuming the previous result (fire-and-forget within pipeline):
pipeline.call_next(process).call_after(log_completion)
# log_completion runs after process but doesn't receive process's output
Useful for side effects (logging, notifications) that shouldn’t transform the data flow.
.map(task, max_parallel=None)
Apply a task to each element of an iterable result in parallel:
# Previous step returned: [1, 2, 3, 4]
pipeline.map(process_item)
# Runs process_item(1), process_item(2), ... concurrently
# Collects results: [processed1, processed2, ...]
Options:
- max_parallel=10 — limit concurrent executions
- output_name="results" — custom output key (default: task output name)
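As a quick illustration, a minimal sketch where a previous step produces the iterable (list_files and process_item are hypothetical tasks):

# list_files returns an iterable, e.g. ["a.mp3", "b.mp3", ...]
pipeline = (
    Pipeline(broker)
    .call_next(list_files)
    .map(process_item, max_parallel=10)  # at most 10 concurrent executions
)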
.filter(task)
Keep elements where the task returns truthy:
# Previous step returned: [1, 2, 3, 4]
pipeline.filter(is_even)
# Keeps elements where is_even(element) returns True
# Result: [2, 4]
.group(tasks, param_names=None)
Execute multiple independent tasks in parallel, starting from the same input:
pipeline.group(
    [task_a, task_b, task_c],
    param_names=["x", "y", "z"]  # bind input to these parameters
)
# All three tasks receive the same previous result
# Returns: [result_a, result_b, result_c]
2. Dataflow Pipeline
For a comprehensive guide on dataflow patterns, see the Dataflow Guide.
Automatic DAG construction using @pipeline_task(output=...) annotations.
2.1. Declaring Task Outputs
from taskiq_flow import pipeline_task, DataflowPipeline

@broker.task
@pipeline_task(output="features")
def extract_features(data: list[str]) -> dict:
    return {"count": len(data)}

@broker.task
@pipeline_task(output="stats")
def compute_stats(features: dict) -> dict:
    return {"entries": features["count"] * 2}

@broker.task
@pipeline_task(output="report")
def generate_report(stats: dict) -> str:
    return f"Stats: {stats}"
Key: The output parameter declares what this task produces. Downstream tasks declare matching parameter names to consume those outputs.
2.2. Building the Pipeline
pipeline = DataflowPipeline.from_tasks(
    broker,
    [extract_features, compute_stats, generate_report]
)
Automatic dependency resolution:
- extract_features produces features — no dependencies
- compute_stats needs features — depends on extract_features
- generate_report needs stats — depends on compute_stats
Resulting DAG:
extract_features → compute_stats → generate_report
2.3. Multiple Consumers
Multiple tasks can consume the same output; they’ll all wait for the producer:
@broker.task
@pipeline_task(output="features")
def extract(data): ...
@broker.task
@pipeline_task(output="tags")
def tag(features: dict): ... # consumer 1 of features
@broker.task
@pipeline_task(output="embedding")
def embed(features: dict): ... # consumer 2 of features
# Both tag and embed run in parallel after extract completes
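To confirm the fan-out, a small sketch using from_tasks and print_dag() (covered in section 5; the exact output format may differ):

pipeline = DataflowPipeline.from_tasks(broker, [extract, tag, embed])
pipeline.print_dag()
# Illustrative shape:
# Level 0: extract
# Level 1: tag, embed   (both consume "features", so they share a level)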
2.4. Input Parameters
Dataflow pipelines accept external inputs via kiq_dataflow(**kwargs):
results = await pipeline.kiq_dataflow(data=["file1.mp3", "file2.mp3"])
# The `data` parameter is matched to any task needing it
# Must match a parameter name of a task with no producer (external input)
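Tying section 2 together, a minimal end-to-end sketch with the tasks from 2.1 (file names are placeholders):

pipeline = DataflowPipeline.from_tasks(
    broker,
    [extract_features, compute_stats, generate_report],
)

# "data" has no producer in the graph, so it is supplied externally
# and routed to extract_features(data=...)
results = await pipeline.kiq_dataflow(data=["file1.mp3", "file2.mp3"])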
3. Pipeline Configuration
3.1. Adding Tracking
from taskiq_flow import PipelineTrackingManager
tracking = PipelineTrackingManager().with_auto_storage(broker)
pipeline = Pipeline(broker).with_tracking(tracking)
See Tracking Guide for details.
3.2. Setting a Custom Pipeline ID
pipeline.pipeline_id = "my_custom_workflow_001"
# If not set, a UUID is generated automatically
Important for tracking and WebSocket subscriptions.
3.3. Attaching Hooks (WebSocket)
from taskiq_flow.hooks import HookManager
hooks = HookManager()
pipeline = Pipeline(broker).with_hooks(hooks)
See WebSocket Guide.
3.4. Retry & Error Policies
pipeline.with_retry(
    max_attempts=3,
    delay=1.0,
    backoff=2.0
)
pipeline.on_error("continue")  # or "stop"
See Retry Guide.
3.5. Timeouts
pipeline.with_timeout(seconds=60)
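Putting 3.1 through 3.5 together, a sketch of a fully configured pipeline (the calls are the ones shown above; chaining with_tracking and with_hooks in one expression is an assumption, so split them if your version does not return the pipeline):

from taskiq_flow import Pipeline, PipelineTrackingManager
from taskiq_flow.hooks import HookManager

tracking = PipelineTrackingManager().with_auto_storage(broker)
hooks = HookManager()

pipeline = (
    Pipeline(broker)
    .with_tracking(tracking)
    .with_hooks(hooks)
)
pipeline.pipeline_id = "audio_analysis_v1_demo"  # hypothetical ID
pipeline.with_retry(max_attempts=3, delay=1.0, backoff=2.0)
pipeline.on_error("continue")
pipeline.with_timeout(seconds=60)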
4. Pipeline Lifecycle
4.1. Creation → Execution → Completion
1. pipeline = Pipeline(broker) # Create pipeline object
2. pipeline.call_next(...) # Chain steps
3. task = await pipeline.kiq(input) # Launch
4. result = await task.wait_result() # Wait & retrieve
4.2. Reusability
Pipeline objects are single-use. For repeated execution, create a new pipeline or use the PipelineScheduler:
# Correct: create a fresh pipeline each time
async def run_workflow(data):
    pipeline = Pipeline(broker).call_next(step1).call_next(step2)
    return await pipeline.kiq(data)

# For recurring schedules, use PipelineScheduler
from taskiq_flow import PipelineScheduler

scheduler = PipelineScheduler(broker)
await scheduler.schedule(pipeline, cron="* * * * *")
5. Visualizing Pipelines
5.1. ASCII DAG (Console)
pipeline.print_dag()
Example output:
DAG Execution Order:
Level 0: task_a
Level 1: task_b, task_c
Level 2: task_d
5.2. JSON for Web UIs
viz = pipeline.visualize() # returns dict
print(viz)
Structure:
{
    "nodes": [
        {"id": "task_a", "outputs": ["x", "y"]},
        {"id": "task_b", "inputs": ["x"]}
    ],
    "edges": [{"from": "task_a", "to": "task_b"}]
}
5.3. DOT Format (Graphviz)
dot = pipeline.visualize_dot()
with open("pipeline.dot", "w") as f:
    f.write(dot)
# Render: dot -Tpng pipeline.dot -o pipeline.png
Resulting diagram shows nodes, edges, and execution order.
6. Pipeline Inspection (DataflowRegistry)
For advanced use cases, manually construct and inspect the dataflow graph:
from taskiq_flow import DataflowRegistry
registry = DataflowRegistry()
# Register tasks with explicit I/O
registry.register_task(
    task=load_data,
    output="raw",
    inputs=["source"]  # external input
)
registry.register_task(
    task=clean,
    output="clean",
    inputs=["raw"]
)
registry.register_task(
    task=save,
    output="saved",
    inputs=["clean"]
)
# Inspect structure
print("Tasks:", [t.task_name for t in registry.get_tasks()])
print("Outputs:", registry.get_outputs()) # ["raw", "clean", "saved"]
print("External inputs:", registry.get_external_inputs()) # ["source"]
# Find dependencies
producer = registry.get_producer("clean") # returns TaskNode for 'clean'
consumers = registry.get_consumers("raw") # list of tasks needing 'raw'
# Build DAG
dag = registry.build_dag()
dag.print()
order = dag.topological_sort() # list of tasks in execution order
levels = dag.levels # list of lists (parallel groups)
See examples/registry_discovery_example.py for complete usage.
7. Choosing Between Pipeline Types
| Criteria | SequentialPipeline | DataflowPipeline |
|---|---|---|
| Workflow shape | Linear, with occasional branching | Complex DAG with many branches |
| Task dependencies | Implicit (chaining order) | Explicit (@pipeline_task) |
| Parallel needs | Manual (.group()) | Automatic (independent tasks) |
| Flexibility | Full control over order | Declarative; library optimizes |
| Dynamic workflows | Hard (fixed at build time) | Easy (can add tasks flexibly) |
| Best for | ETL linear steps, simple batch | Audio/video processing, ML pipelines |
Rule of thumb:
- SequentialPipeline for simple, fixed-order workflows
- DataflowPipeline for complex, branched, or reusable workflows
8. Best Practices
8.1. Task Naming & Outputs
Use clear, unique output names:
@pipeline_task(output="user_features") # clear
@pipeline_task(output="features_2") # ambiguous (if multiple features exist)
8.2. Avoid Circular Dependencies
DataflowPipeline detects cycles and raises CycleError during build_dag(). Design with forward data flow only.
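For illustration, a sketch of a cycle being rejected, built with the DataflowRegistry from section 6 (step_a and step_b are hypothetical tasks, and the exception type and import path for CycleError are assumptions):

registry = DataflowRegistry()
registry.register_task(task=step_a, output="a", inputs=["b"])  # a needs b
registry.register_task(task=step_b, output="b", inputs=["a"])  # b needs a -> cycle

try:
    registry.build_dag()
except Exception as exc:  # expected to be a CycleError
    print(f"Cycle detected: {exc}")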
8.3. Minimize Shared State
Each task should be pure (output depends only on inputs) for parallel safety.
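For example, a pure task versus one leaning on shared state (the module-level counter is a deliberately bad, hypothetical pattern):

# Pure: output depends only on the inputs, so parallel execution is safe
@broker.task
def normalize(values: list[float]) -> list[float]:
    total = sum(values) or 1.0
    return [v / total for v in values]

# Impure: hidden module-level state makes results depend on execution order
seen_items = 0

@broker.task
def normalize_and_count(values: list[float]) -> list[float]:
    global seen_items
    seen_items += len(values)  # side effect shared across parallel branches
    total = sum(values) or 1.0
    return [v / total for v in values]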
8.4. Version Pipeline IDs
Include version in pipeline IDs for tracking:
pipeline.pipeline_id = f"audio_analysis_v1_{int(time.time())}"
8.5. Use .call_after() for Side Effects
Don’t corrupt the data flow with logging/metrics:
pipeline.call_next(process).call_after(log_result) # correct
pipeline.call_next(process_and_log) # anti-pattern
8.6. Limit Parallelism for Resource-Heavy Tasks
# CPU-intensive transcoding; the previous step returned the list of files
pipeline.map(transcode, max_parallel=2)
8.7. Validate DAG Before Execution
pipeline.print_dag() # Always inspect complex pipelines
input("Press Enter to execute...")
9. Common Pitfalls
| Symptom | Likely cause | Fix |
|---|---|---|
| Task runs twice | .call_next() and dependent task both declared | Remove redundant call; Dataflow manages dependencies |
| Missing output key | @pipeline_task(output=...) doesn’t match downstream param | Align output name with parameter name |
| All tasks sequential | Using Pipeline instead of DataflowPipeline | Switch to DataflowPipeline for automatic parallelism |
| Results None | Forgetting broker.add_middlewares(PipelineMiddleware()) | Add middleware before creating pipelines |
| Stale pipeline reused | Calling kiq() twice on the same pipeline object | Create fresh pipeline per execution |
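Relatedly, a sketch of the middleware fix from the table (the import location of PipelineMiddleware is an assumption; check your taskiq_flow version):

from taskiq_flow import PipelineMiddleware  # assumed import path

broker.add_middlewares(PipelineMiddleware())  # register before building any pipelines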
10. Advanced Patterns
10.1. Hybrid Sequential + Dataflow
Combine both types for maximum control:
# Sequential outer shell
sequential = Pipeline(broker)

# Inside a step, spawn a dataflow sub-pipeline
@broker.task
async def process_batch(data: list) -> dict:
    sub_pipeline = DataflowPipeline.from_tasks(
        broker,
        [subtask1, subtask2, subtask3]
    )
    return await sub_pipeline.kiq_dataflow(data=data)

sequential.call_next(process_batch).call_next(finalize)
10.2. Dynamic Pipeline Construction
Build pipelines at runtime based on configuration:
def build_pipeline(config: dict) -> Pipeline:
    steps = []
    if config.get("preprocess"):
        steps.append(preprocess_task)
    if config.get("analyze"):
        steps.append(analyze_task)
    # ...
    pipeline = Pipeline(broker)
    for step in steps:
        pipeline.call_next(step)
    return pipeline
10.3. Conditional Branching
Use .filter() and condition steps:
high_value = pipeline.filter(is_high_value)
high_value.call_next(premium_processing)
low_value = pipeline.filter(is_low_value)
low_value.call_next(standard_processing)
# Merge back
merged = high_value.group([premium_processing, standard_processing])
See steps/condition.py for IfStep.
11. Summary Checklist
Before running a pipeline, verify:
- Pipeline type chosen appropriately (Sequential vs Dataflow)
- All functions decorated with @broker.task
- Dataflow: all relevant tasks decorated with @pipeline_task(output=…)
- Output names match downstream parameter names exactly
- PipelineMiddleware added to broker
- pipeline_id set if tracking/WebSocket needed
- DAG inspected with print_dag() for complex workflows
- Parallelism limits (max_parallel) set appropriately
- Timeouts configured for long-running tasks
- Example run completed successfully before production use
Further Reading
- Execution Guide — How pipelines run, error handling, timeouts
- Tasks Guide — Writing task functions and decorators
- Examples — End-to-end pipeline demonstrations
Master pipelines to orchestrate any workflow. Next, learn about Task Definition.