Dataflow Guide
Build complex, parallel pipelines using data-driven DAG orchestration
Version: {VERSION} Related: Pipelines Guide, Execution Guide, Core Concepts
Overview
Taskiq-Flow’s dataflow system is the most powerful way to orchestrate complex workflows. Unlike sequential pipelines where you manually chain steps, dataflow pipelines automatically construct a Directed Acyclic Graph (DAG) from your task declarations, enabling:
- Automatic dependency resolution — tasks declare what they produce and consume
- Automatic parallelism — independent tasks run concurrently without manual configuration
- Data-driven execution — the flow of data determines the order of execution
- Dynamic pipeline construction — add tasks flexibly at runtime
This guide covers the complete dataflow system: the DAG, the registry, decorators, the execution engine, and advanced patterns.
1. Core Concepts
1.1. The Dataflow Paradigm
In a dataflow pipeline, tasks are connected by data dependencies rather than explicit ordering:
```
Sequential:                      Dataflow:

task1 → task2 → task3            task1 ──→ task2
                                      └──→ task3   (parallel!)
```
Each task declares:
- `output` — what data it produces (e.g., `"features"`)
- `inputs` — what data it consumes (e.g., `["features", "config"]`)
The library automatically resolves dependencies and builds the execution graph.
1.2. Key Components
| Component | Purpose | Module |
|---|---|---|
| `@pipeline_task` | Decorator to declare task I/O | `taskiq_flow.decorators` |
| `DataNode` | Represents a data artifact in the graph | `taskiq_flow.dataflow.node` |
| `DAG` / `DAGNode` | Graph structure for dependency tracking | `taskiq_flow.dataflow.dag` |
| `DataflowRegistry` | Central registry for task metadata and DAG building | `taskiq_flow.dataflow.registry` |
| `DataCache` | Stores intermediate results during execution | `taskiq_flow.dataflow.cache` |
| `DataflowPipeline` | High-level pipeline using dataflow orchestration | `taskiq_flow.pipeline` |
| `ExecutionEngine` | Low-level DAG executor with parallelism | `taskiq_flow.execution_engine` |
2. Declaring Dataflow Tasks
2.1. The @pipeline_task Decorator
Mark a function as a pipeline task with explicit output:
```python
from taskiq import InMemoryBroker
from taskiq_flow import pipeline_task, DataflowPipeline

broker = InMemoryBroker()

@broker.task
@pipeline_task(output="features")
async def extract_features(paths: list[str]) -> dict:
    """Extract audio features from file paths."""
    return {"tempo": 120.0, "energy": 0.8}
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `output` | `str` | required | Name of the data this task produces |
| `inputs` | `list[str]` | `None` (auto-inferred) | Names of data consumed |
| `retries` | `int` | `0` | Number of retry attempts on failure |
| `retry_delay` | `float` | `1.0` | Initial delay between retries (seconds) |
| `retry_backoff` | `float` | `2.0` | Multiplier for delay between retries |
| `retry_jitter` | `bool` | `True` | Add randomness to retry delays |
| `max_retry_time` | `int` | `300` | Maximum total time for retries (seconds) |
| `resources` | `dict` | `{}` | Estimated resource usage (memory, CPU) |
2.2. Automatic Input Inference
If you don’t specify inputs, they’re inferred from the function signature:
```python
@broker.task
@pipeline_task(output="stats")
def compute_stats(features: dict, config: dict) -> dict:
    # Automatically inferred: inputs=["features", "config"]
    return {"count": len(features)}
```
Parameters named `self` or `cls`, as well as parameters with default values, are excluded from inference.
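For example, a parameter with a default value is treated as configuration rather than as a data dependency (a minimal sketch; `filter_features` and `threshold` are hypothetical names, not part of the library):

```python
@broker.task
@pipeline_task(output="filtered")
def filter_features(features: dict, threshold: float = 0.5) -> dict:
    # Only "features" is inferred as an input; "threshold" has a
    # default value, so it is excluded from dependency inference.
    return {k: v for k, v in features.items()
            if isinstance(v, (int, float)) and v >= threshold}
```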
2.3. Multiple Outputs
Use @pipeline_task_multi_output when a task produces multiple data artifacts:
```python
from taskiq_flow.decorators import pipeline_task_multi_output

@broker.task
@pipeline_task_multi_output(
    outputs={"features": dict, "metadata": dict},
    retries=2,
)
async def process_audio(path: str) -> dict:
    features = extract(path)
    meta = get_metadata(path)
    return {
        "features": features,  # → output "features"
        "metadata": meta,      # → output "metadata"
    }
```
The first key is the primary output; all keys are registered as named outputs.
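Downstream tasks consume a named output through a parameter of the same name, just as with single-output tasks. A minimal sketch (`index_track` is a hypothetical task, not part of the library):

```python
@broker.task
@pipeline_task(output="index_entry")
async def index_track(metadata: dict) -> dict:
    # Depends on the "metadata" output of process_audio.
    return {"title": metadata.get("title"), "indexed": True}
```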
3. Building Dataflow Pipelines
3.1. Using DataflowPipeline.from_tasks()
The recommended approach for most use cases:
```python
pipeline = DataflowPipeline.from_tasks(
    broker,
    [extract_features, compute_stats, generate_report],
)
```
The DAG is built automatically:
- `extract_features` produces `"features"` — no dependencies → Level 0
- `compute_stats` consumes `"features"` → depends on `extract_features` → Level 1
- `generate_report` consumes `"stats"` → depends on `compute_stats` → Level 2
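You can confirm the resulting levels before executing with `print_dag()` (the output below is illustrative; the exact format may differ):

```python
pipeline.print_dag()
# Illustrative output:
#   Level 0: extract_features
#   Level 1: compute_stats
#   Level 2: generate_report
```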
3.2. Adding Tasks Dynamically
```python
pipeline = DataflowPipeline(broker)
pipeline.add_dataflow_task(extract_features)
pipeline.add_dataflow_task(compute_stats)
pipeline.add_dataflow_task(generate_report)

# DAG is rebuilt lazily on first execution
results = await pipeline.kiq_dataflow(paths=["song.mp3"])
```
3.3. Fan-Out / Fan-In (Multiple Dependencies)
Tasks can consume multiple outputs, and multiple tasks can share a dependency:
```python
@broker.task
@pipeline_task(output="audio")
def load_audio(path: str) -> dict: ...

@broker.task
@pipeline_task(output="transcription")
def transcribe(audio: dict) -> str: ...

@broker.task
@pipeline_task(output="tags")
def generate_tags(audio: dict) -> list[str]: ...  # parallel with transcribe

@broker.task
@pipeline_task(output="report")
def create_report(
    transcription: str,
    tags: list[str],
) -> dict: ...  # fan-in: waits for both

pipeline = DataflowPipeline.from_tasks(
    broker,
    [load_audio, transcribe, generate_tags, create_report],
)

# DAG:
# load_audio → (transcribe ∥ generate_tags) → create_report
```
3.4. External Inputs
Pass data at execution time that isn’t produced by any task:
```python
results = await pipeline.kiq_dataflow(
    user_id="user_123",       # external input
    config={"mode": "fast"},  # external input
)
```
External inputs are automatically identified — they’re parameters with no corresponding producer task.
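A parameter becomes an external input simply because no registered task declares it as an output. A minimal sketch (`fetch_profile` is a hypothetical task, not part of the library):

```python
@broker.task
@pipeline_task(output="profile")
async def fetch_profile(user_id: str, config: dict) -> dict:
    # No task produces "user_id" or "config", so both are external
    # inputs and must be passed to kiq_dataflow() at execution time.
    return {"id": user_id, "mode": config["mode"]}
```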
4. DAG Construction & Inspection
4.1. The DataflowRegistry
For advanced use cases, build DAGs manually:
```python
from taskiq_flow import DataflowRegistry

registry = DataflowRegistry()

# Register tasks with explicit I/O
registry.register_task(
    task=load_data,
    output="raw_data",
    inputs=["source_url"],  # external input
)
registry.register_task(
    task=clean_data,
    output="clean_data",
    inputs=["raw_data"],
)
registry.register_task(
    task=save_data,
    output="saved",
    inputs=["clean_data"],
)

# Inspect the graph
print("Tasks:", [t.task_name for t in registry.get_tasks()])
print("Outputs:", registry.get_outputs())
print("External inputs:", registry.get_external_inputs())

# Build DAG
dag = registry.build_dag()
dag.print()
```
4.2. DAG Inspection Methods
```python
from taskiq_flow.dataflow import DAG, DAGNode

# Get topological order
ordered = dag.topological_sort()
for node in ordered:
    print(f"Level {node.level}: {node.task_name}")

# Get parallel execution groups
dag.compute_levels()
for i, level in enumerate(dag.levels):
    names = [n.task_name for n in level]
    print(f"Level {i} (parallel): {names}")

# Find ready tasks given completed set
ready = dag.get_ready_tasks(completed={node_a})

# Visualize DAG (requires networkx)
from taskiq_flow.visualization import DAGVisualizer

viz = DAGVisualizer(dag)
print(viz.to_json())
print(viz.to_graphviz())
print(viz.visualize_ascii())
```
4.3. Detecting Critical Path & Parallel Groups
```python
from taskiq_flow.visualization import DAGVisualizer

viz = DAGVisualizer(dag)

# Critical path = longest execution chain
critical = viz.detect_critical_path()
print(f"Critical path: {' → '.join(critical)}")

# Groups of tasks that can run in parallel
groups = viz.find_parallelizable_groups()
for i, group in enumerate(groups):
    print(f"Parallel group {i}: {group}")
```
5. Execution
5.1. Running Dataflow Pipelines
```python
# Execute and get all outputs
results = await pipeline.kiq_dataflow(track_paths=["song1.mp3", "song2.mp3"])
print(results)
# {"audio_features": {...}, "mir_features": {...}, "tags": [...], "embedding": [...]}
```
5.2. The Execution Engine
DataflowPipeline uses ExecutionEngine internally for DAG-based execution:
```python
from taskiq_flow import ExecutionEngine
from taskiq_flow.errors import ErrorHandlingMode

# Custom execution with fine-grained control
engine = ExecutionEngine(
    broker=broker,
    dag=dag,
    max_parallel=10,
    error_mode=ErrorHandlingMode.CONTINUE_ON_ERROR,
    resource_aware=True,
)

outputs = await engine.execute(
    inputs={"source_file": "data.csv"},
    pipeline_id="my_pipeline",
)
```
Execution features:
- Topological ordering — tasks execute after their dependencies
- Parallel execution — independent tasks run concurrently
- Retry per task — configured via `@pipeline_task(retries=N)`
- Error modes — `FAIL_FAST`, `CONTINUE_ON_ERROR`, `SKIP_FAILED`
- Resource-aware parallelism — adjusts concurrency based on CPU/memory
5.3. Map-Reduce Operations
For batch processing within pipelines:
```python
from taskiq_flow import MapReduce

# Parallel map
mapped = await MapReduce.map(
    broker,
    process_item,
    items=list(range(100)),
    output="processed",
    max_parallel=10,
)

# Reduce results
result = await MapReduce.reduce(
    broker,
    aggregate_results,
    mapped.results,
    output="final",
    initial=0,
)

# Combined map-reduce
final = await MapReduce.map_reduce(
    broker,
    map_task=process_item,
    reduce_task=aggregate_results,
    items=list(range(1000)),
    max_parallel=20,
    reduce_chunk_size=100,
)
```
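The `process_item` and `aggregate_results` callables above are not provided by the library; they are ordinary broker tasks you define yourself. A minimal sketch, assuming the map task takes a single item and the reduce task folds an accumulator with each value (adjust the signatures to whatever your reduce task actually expects):

```python
@broker.task
async def process_item(item: int) -> int:
    # Applied to each element of `items`, up to max_parallel at a time.
    return item * item

@broker.task
async def aggregate_results(acc: int, value: int) -> int:
    # Folds mapped results into a single value, starting from `initial=0`.
    return acc + value
```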
Map features:
- Automatic parallelism with `asyncio`
- Intelligent chunking for large datasets
- Progress callbacks
- Error collection with success-rate reporting
5.4. Pipeline-Level Map/Reduce
DataflowPipeline integrates map-reduce as pipeline operations:
```python
pipeline = DataflowPipeline.from_tasks(
    broker, [extract_features]
)

# Add map operation (process many items in parallel)
pipeline.map(
    process_track,
    track_list,
    output="track_features",
    max_parallel=10,
)

# Add reduce operation (aggregate results)
pipeline.reduce(
    aggregate_features,
    input_name="track_features",
    output="playlist_stats",
)

# Execute
results = await pipeline.kiq_map_reduce(track_list=tracks)
```
6. Combining Sequential and Dataflow Pipelines
6.1. Hybrid Pattern
Use sequential pipelines for linear flows and dataflow for complex sub-workflows:
```python
# Sequential outer shell
main_pipeline = Pipeline(broker)

@broker.task
async def run_dataflow_subset(data: list) -> dict:
    # Inner dataflow pipeline
    sub_pipeline = DataflowPipeline.from_tasks(
        broker,
        [task_a, task_b, task_c],
    )
    return await sub_pipeline.kiq_dataflow(data=data)

main_pipeline.call_next(run_dataflow_subset).call_next(finalize)
```
6.2. Pipeline Scheduling
Schedule dataflow pipelines with cron or intervals:
```python
from taskiq_flow import PipelineScheduler

scheduler = PipelineScheduler(broker)

# Cron-based scheduling
await scheduler.schedule(
    pipeline,
    cron="0 2 * * *",  # Daily at 2 AM
    kwargs={"paths": ["daily_files/*.mp3"]},
)

# Interval-based
await scheduler.schedule(
    pipeline,
    interval_seconds=3600,  # Every hour
    label="hourly_analysis",
)
```
7. Caching & Intermediate Results
7.1. DataCache
The DataCache stores intermediate results during pipeline execution:
```python
from taskiq_flow.dataflow.cache import DataCache

cache = DataCache()

# Store results
cache.set("features", {"tempo": 120.0})
cache.set("tags", ["electronic", "dance"])

# Retrieve
features = cache.get("features")

# Check existence
if cache.has("embedding"):
    embedding = cache.get("embedding")

# Inject dependencies for tasks
inputs = cache.inject(["features", "tags"])
# → {"features": {...}, "tags": [...]}

# Clear between runs
cache.clear()
```
8. Error Handling in Dataflow
8.1. Error Modes
```python
from taskiq_flow.errors import ErrorHandlingMode

# Fail on first error (default)
engine = ExecutionEngine(broker, dag, error_mode=ErrorHandlingMode.FAIL_FAST)

# Continue despite errors
engine = ExecutionEngine(broker, dag, error_mode=ErrorHandlingMode.CONTINUE_ON_ERROR)

# Skip failed tasks
engine = ExecutionEngine(broker, dag, error_mode=ErrorHandlingMode.SKIP_FAILED)
```
8.2. Retry Configuration
Configure retries at the task level:
```python
@broker.task
@pipeline_task(
    output="reliable_feature",
    retries=3,
    retry_delay=2.0,
    retry_backoff=2.0,
)
def fetch_with_retry(url: str) -> dict:
    # Will be retried up to 3 times with exponential backoff
    ...
```
9. Resource Management
9.1. Resource-Aware Parallelism
Control parallelism based on estimated task resource usage:
```python
from taskiq_flow.optimization.parallel import ResourceAwareExecutor

executor = ResourceAwareExecutor(
    max_cpu_percent=80.0,
    max_memory_percent=80.0,
    min_parallel=1,
    max_parallel=10,
)

engine = ExecutionEngine(
    broker,
    dag,
    resource_aware=True,
    resource_profiles={
        "heavy_task": {"estimated_memory_mb": 500, "estimated_cpu_cores": 2.0},
        "light_task": {"estimated_memory_mb": 50, "estimated_cpu_cores": 0.5},
    },
)
```
10. Visualization
10.1. Built-in Visualization
```python
# ASCII in console
pipeline.print_dag()

# JSON for web UIs
viz = pipeline.visualize()  # → {"nodes": [...], "edges": [...], "levels": [...]}

# DOT for Graphviz
dot = pipeline.visualize_dot()
# Render: dot -Tpng pipeline.dot -o pipeline.png
```
10.2. Advanced Visualization (requires networkx)
```python
from taskiq_flow.visualization import DAGVisualizer

viz = DAGVisualizer(dag)

# Export formats
viz.to_json()            # JSON for frontend
viz.to_cytoscape_json()  # Cytoscape.js format
viz.to_graphviz()        # DOT format
viz.visualize_ascii()    # ASCII art

# Analysis
viz.detect_critical_path()        # Longest execution path
viz.find_parallelizable_groups()  # Tasks that can run in parallel
```
10.3. Mermaid Diagrams
```python
from taskiq_flow.visualization import MermaidGenerator

mermaid = MermaidGenerator(dag)
print(mermaid.to_mermaid())               # Basic diagram
print(mermaid.to_mermaid_with_styling())  # Styled with colors
```
11. Full Example: Audio Processing Pipeline
```python
import asyncio

from taskiq import InMemoryBroker
from taskiq_flow import DataflowPipeline, pipeline_task

broker = InMemoryBroker()

@broker.task
@pipeline_task(output="audio_features")
async def extract_audio(paths: list[str]) -> dict:
    return {"duration": 180.0, "tempo": 120.0}

@broker.task
@pipeline_task(output="mir_features")
async def compute_mir(audio_features: dict) -> dict:
    return {"key": "C major", "loudness": -12.5}

@broker.task
@pipeline_task(output="tags")
async def generate_tags(mir_features: dict) -> list[str]:
    return ["electronic", "dance"]

@broker.task
@pipeline_task(output="embedding")
async def create_embedding(
    mir_features: dict,
    tags: list[str],
) -> list[float]:
    return [0.1, 0.2, 0.3, 0.4, 0.5]

async def main():
    pipeline = DataflowPipeline.from_tasks(
        broker,
        [extract_audio, compute_mir, generate_tags, create_embedding],
    )

    # Inspect before execution
    pipeline.print_dag()

    # Execute
    results = await pipeline.kiq_dataflow(
        paths=["track1.mp3", "track2.mp3"]
    )
    print(results)
    # {
    #     "audio_features": {"duration": 180.0, "tempo": 120.0},
    #     "mir_features": {"key": "C major", "loudness": -12.5},
    #     "tags": ["electronic", "dance"],
    #     "embedding": [0.1, 0.2, 0.3, 0.4, 0.5]
    # }

asyncio.run(main())
```
12. Common Pitfalls
| Symptom | Cause | Fix |
|---|---|---|
| All tasks run sequentially | Using `Pipeline` instead of `DataflowPipeline` | Switch to `DataflowPipeline` |
| Missing output errors | `@pipeline_task(output=...)` doesn't match downstream parameter name | Align output name with parameter name |
| "No DAG built" error | Called `kiq_dataflow()` without adding tasks | Add tasks via `from_tasks()` or `add_dataflow_task()` |
| Tasks run twice | Mixed `.call_next()` and `@pipeline_task` dependencies | Use one approach consistently |
| Deadlock detected | Circular dependency in data flow | Redesign with forward data flow only (see the sketch below) |
| Memory explosion | Too many parallel tasks | Set `max_parallel` or use resource-aware mode |
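For the deadlock case, a pair of tasks like the following can never be scheduled because each one waits for the other's output (an illustrative sketch; the exact error raised depends on the library's cycle detection):

```python
@broker.task
@pipeline_task(output="a")
def make_a(b: dict) -> dict: ...  # consumes "b"

@broker.task
@pipeline_task(output="b")
def make_b(a: dict) -> dict: ...  # consumes "a" → cycle: a ↔ b
```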
13. Performance Tips
- Limit parallelism — use `max_parallel` to control the concurrent task count
- Use map-reduce for batches — `MapReduce.map()` with chunking for large datasets
- Profile resource usage — set `resource_profiles` for adaptive parallelism
- Avoid large intermediate results — stream data when possible
- Reuse the DAG — build the DAG once, then execute multiple times with different inputs (see the sketch below)
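For the last point, create the pipeline once and call it repeatedly with different inputs. A minimal sketch reusing the tasks from section 11:

```python
pipeline = DataflowPipeline.from_tasks(
    broker,
    [extract_audio, compute_mir, generate_tags, create_embedding],
)

# The same DAG serves every run; only the external inputs change.
for batch in (["a.mp3"], ["b.mp3", "c.mp3"]):
    results = await pipeline.kiq_dataflow(paths=batch)
```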
14. API Reference Summary
DataflowPipeline
| Method | Description |
|---|---|
| `from_tasks(broker, tasks)` | Create pipeline from task list |
| `add_dataflow_task(task)` | Add task dynamically |
| `kiq_dataflow(**inputs)` | Execute pipeline with dataflow orchestration |
| `kiq_map_reduce(**inputs)` | Execute map-reduce pipeline |
| `kiq_map_reduce_advanced(...)` | Execute with full map-reduce options |
| `kiq_map_sweep(task, param_values, ...)` | Multi-dimensional parameter sweep |
| `visualize()` | Get DAG as JSON dict |
| `visualize_dot()` | Get DAG as DOT string |
| `print_dag()` | Print ASCII representation |
| `schedule_with_cron(scheduler, label, cron, **inputs)` | Schedule with cron expression |
| `schedule_with_labels(scheduler, label, ...)` | Schedule with label-based scheduler |
| `map(task, items, output, ...)` | Add map operation |
| `reduce(task, input_name, output, ...)` | Add reduce operation |
DataflowRegistry
| Method | Description |
|---|---|
| `register_task(task, output, inputs, **meta)` | Register task with I/O metadata |
| `build_dag()` | Construct DAG from registered tasks |
| `get_producer(data_name)` | Find producer task for data |
| `get_consumers(data_name)` | Find consumer tasks for data |
| `get_external_inputs()` | List external input names |
| `get_outputs()` | List all output names |
| `get_tasks()` | List all registered tasks |
ExecutionEngine
| Method | Description |
|---|---|
| `execute(inputs, pipeline_id)` | Execute DAG with inputs |
| `get_execution_report()` | Get execution statistics |
MapReduce
| Method | Description |
|---|---|
| `map(broker, task, items, output, ...)` | Parallel map operation |
| `reduce(broker, task, inputs, output, ...)` | Reduction operation |
| `map_reduce(broker, map_task, reduce_task, items, ...)` | Combined map + reduce |
| `map_sweep(broker, task, param_values, output, ...)` | Multi-dimensional sweep |
Master dataflow to build complex, parallel workflows. For sequential patterns, see Pipelines Guide.