Pipeline Execution Guide
Understanding execution models, modes, and result handling
Version: {VERSION} · Applies to: SequentialPipeline, DataflowPipeline, MapReduce · See also: Dataflow Guide
Overview
This guide covers how Taskiq-Flow executes pipelines, manages concurrency, handles errors, and returns results.
1. Execution Models
1.1. Sequential Execution (Classic Pipeline)
The classic Pipeline executes steps one after another in a linear chain:
```python
pipeline = Pipeline(broker).call_next(task1).call_next(task2).call_next(task3)
# Execution order: task1 → task2 → task3 (each step completes before the next starts)
```
Characteristics:
- Each step waits for the previous to complete
- Results pass directly from one step to the next
- Predictable, deterministic execution order
- Suitable for linear workflows
1.2. Parallel Execution (Dataflow & Map)
DataflowPipeline automatically parallelizes independent tasks:
```python
@broker.task
@pipeline_task(output="features")
def extract(tracks): ...

@broker.task
@pipeline_task(output="tags")
def tag(features): ...  # Runs after extract

@broker.task
@pipeline_task(output="embedding")
def embed(features): ...  # Also runs after extract, in parallel with tag

pipeline = DataflowPipeline.from_tasks(broker, [extract, tag, embed])
# DAG: extract → (tag & embed in parallel)
```
Characteristics:
- Tasks with no unmet dependencies run concurrently
- DAG determines execution order
- Maximum throughput for independent operations
- Controlled by the `max_parallel` parameter on `.map()` and `.reduce()`
1.3. Map-Reduce Parallelism
The MapReduce utility explicitly processes items in parallel:
```python
from taskiq_flow import MapReduce

# Process 100 items with max 10 concurrent workers
result = await MapReduce.map(
    broker,
    process_item,
    items=items_list,
    output="processed",
    max_parallel=10,  # controls concurrency level
)
```
Parallelism control:
- `max_parallel=None` → unlimited concurrency (use with caution)
- `max_parallel=1` → sequential execution
- Recommended starting point: `max_parallel = number_of_cpu_cores * 2` for CPU-bound tasks
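The rule of thumb above can be wrapped in a small helper. A minimal sketch — `pick_max_parallel` and its `io_bound` flag are illustrative names, not part of the taskiq-flow API:

```python
import os

def pick_max_parallel(io_bound: bool = False) -> int:
    """Heuristic starting point for max_parallel; tune by profiling."""
    cores = os.cpu_count() or 1
    # CPU-bound work: ~2x core count; I/O-bound work tolerates much more.
    return cores * 8 if io_bound else cores * 2
```

Pass the result wherever `max_parallel` is accepted, e.g. `MapReduce.map(..., max_parallel=pick_max_parallel())`.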
2. Starting a Pipeline
There are several ways to kick off pipeline execution:
2.1. pipeline.kiq(...) — Fire and Forget
Returns a Task immediately; you must manually wait for results:
```python
task = await pipeline.kiq(initial_input)
# Do other things...
result = await task.wait_result()  # blocks until complete
```
Use when:
- You need to track the task ID for later status checks
- You want to start multiple pipelines concurrently
- You’re building a task queue system
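Starting several pipelines concurrently follows the usual asyncio fan-out pattern: fire every `kiq()` first, then await the results together. The sketch below relies only on the `kiq()`/`wait_result()` shape shown above; `FakePipeline` and `_Handle` are minimal stand-ins so the pattern runs without a broker:

```python
import asyncio

class _Handle:
    """Stand-in for the task handle returned by kiq()."""
    def __init__(self, task):
        self._task = task
    async def wait_result(self):
        return await self._task

class FakePipeline:
    """Stand-in with the same kiq()/wait_result() shape as a real pipeline."""
    async def kiq(self, value):
        return _Handle(asyncio.create_task(self._run(value)))
    async def _run(self, value):
        await asyncio.sleep(0.01)  # pretend the broker does work
        return value * 2

async def fan_out(pipeline, inputs):
    # Start everything first (fire and forget)...
    handles = [await pipeline.kiq(item) for item in inputs]
    # ...then wait for all results concurrently.
    return await asyncio.gather(*(h.wait_result() for h in handles))

results = asyncio.run(fan_out(FakePipeline(), [1, 2, 3]))
```

With a real broker, the same `fan_out` helper works on any object exposing `kiq()` and `wait_result()`.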
2.2. pipeline.kiq_dataflow(...) — Dataflow Convenience
Same as kiq() but specifically for DataflowPipeline, with clearer semantics:
```python
results = await pipeline.kiq_dataflow(track_paths=["a.mp3", "b.mp3"])
# Returns: dict mapping output names → values
```
2.3. pipeline.kiq_map_reduce(...) — Map-Reduce Shortcut
Combined map and reduce in one call:
```python
final = await pipeline.kiq_map_reduce(
    items=items,
    map_output="processed",
    reduce_output="final",
)
```
3. Waiting for Results
3.1. Blocking Wait
```python
task = await pipeline.kiq(data)
result = await task.wait_result()  # blocks
print(result.return_value)
```
Options:
- `wait_result(timeout=30)` — timeout in seconds (raises `asyncio.TimeoutError`)
- `wait_result(raise_on_error=True)` — re-raise exceptions from tasks
3.2. Polling for Status
```python
task = await pipeline.kiq(data)

# Check periodically without blocking
while not task.is_finished:
    await asyncio.sleep(0.5)
    status = await task.get_status()
    print(f"Status: {status}")
```
Useful for progress bars or interactive applications.
3.3. Fetch by Task ID (Distributed)
If you have only the task ID (from another process):
```python
from taskiq import Task

task = Task(task_id="abc123", broker=broker)
result = await task.wait_result()
```
4. Error Handling
4.1. Task-Level Errors
When a single task fails, the pipeline either:
- Stops immediately (default) — remaining tasks are cancelled
- Continues if configured with error handling policies
```python
pipeline = Pipeline(broker)

# Configure to continue despite errors
pipeline.on_error("continue")  # options: "stop", "continue", "retry"

# Or use a retry policy (see Retry Guide)
pipeline.with_retry(
    max_attempts=3,
    delay=5,
    backoff=2,
)
```
4.2. Pipeline-Level Errors
The entire pipeline may fail if:
- A critical task (no consumers) fails
- A task times out
- The broker becomes unavailable
Handle pipeline errors with try/except:
```python
result = await pipeline.kiq(data)
try:
    output = await result.wait_result()
except TaskiqError as exc:
    print(f"Pipeline failed: {exc}")
    # Access partial results if any
    if result.is_failed:
        print(f"Failed at step: {result.failed_step}")
```
4.3. Partial Results on Failure
Even if a pipeline fails, you may have partial results from completed steps:
```python
result = await pipeline.kiq(data)
try:
    output = await result.wait_result()
except PipelineError:
    # Some steps succeeded before failure
    partial = result.partial_results  # dict of completed outputs
    print(f"Partial: {partial}")
```
5. Timeouts
Set timeouts at the pipeline level:
```python
pipeline = Pipeline(broker)

# Global timeout for entire pipeline (seconds)
pipeline.with_timeout(60)

# Or per-task timeout via the taskiq task decorator
@broker.task(timeout=30)
def slow_task(): ...
```
Timeout behavior:
- Exceeding the timeout cancels the running task
- `asyncio.TimeoutError` is raised
- The pipeline state is set to `ERROR`
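Catching the timeout follows standard asyncio practice. The sketch below uses `asyncio.wait_for` around a slow coroutine as a stand-in for awaiting a task result that overruns its budget:

```python
import asyncio

async def slow_result():
    """Stand-in for task.wait_result() on a task that overruns its budget."""
    await asyncio.sleep(10)
    return "done"

async def main():
    try:
        return await asyncio.wait_for(slow_result(), timeout=0.05)
    except asyncio.TimeoutError:
        # Same exception type wait_result(timeout=...) is documented to raise
        return "timed out"

outcome = asyncio.run(main())
```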
6. Execution Context
Each task receives an optional context parameter containing metadata:
```python
from taskiq_flow import PipelineContext

@broker.task
async def my_task(data: str, context: PipelineContext):
    print(f"Pipeline ID: {context.pipeline_id}")
    print(f"Step number: {context.step_index}")
    print(f"Task ID: {context.task_id}")
    return data.upper()
```
Context fields:
| Field | Type | Description |
|---|---|---|
| `pipeline_id` | `str` | Unique identifier of the pipeline instance |
| `step_index` | `int` | Index of this step in the pipeline sequence |
| `task_id` | `str` | ID of the underlying taskiq task |
| `execution_mode` | `str` | `"sequential"`, `"parallel"`, or `"map_reduce"` |
| `started_at` | `datetime` | Timestamp when the pipeline started |
| `broker` | `BaseBroker` | Reference to the task broker |
Enable context passing when building the pipeline:
```python
pipeline = Pipeline(broker).with_context(enable=True)
```
7. Custom Execution Engines (Advanced)
For low-level control, use ExecutionEngine directly:
```python
from taskiq_flow import ExecutionEngine, DAGBuilder
from taskiq_flow.dataflow import DataflowRegistry

# Build registry manually
registry = DataflowRegistry()
registry.register_task(load, output="raw", inputs=[])
registry.register_task(process, output="clean", inputs=["raw"])
registry.register_task(save, output="saved", inputs=["clean"])

# Build DAG
dag = registry.build_dag()

# Create execution engine
engine = ExecutionEngine(broker, dag)

# Execute with custom inputs
results = await engine.execute(inputs={"source_file": "data.csv"})
print(results)  # {"raw": ..., "clean": ..., "saved": ...}
```
When to use ExecutionEngine:
- Building dynamic pipelines at runtime
- Custom scheduling/logic outside Pipeline abstraction
- Inspecting DAG structure before execution
- Integrating with external workflow managers
8. Result Shapes
Different pipeline types return different result structures:
8.1. Sequential Pipeline Results
```python
task = await pipeline.kiq(input)
result = await task.wait_result()
# result.return_value is the final output after all steps
# Example: [3, 3, 3, 3] from our quickstart pipeline
```
8.2. Dataflow Pipeline Results
```python
result = await pipeline.kiq_dataflow(input_data)
# Returns a dict mapping each output name to its value:
# {
#     "features": {...},
#     "tags": [...],
#     "embedding": [...],
# }
```
8.3. MapReduce Results
```python
mapped = await MapReduce.map(...)
print(mapped.return_value)   # List of mapped results

reduced = await MapReduce.reduce(...)
print(reduced.return_value)  # Final aggregated result
```
9. Inspecting Pipeline State
Query pipeline status during or after execution:
```python
from taskiq_flow import PipelineTrackingManager

tracking = PipelineTrackingManager().with_auto_storage(broker)
pipeline = Pipeline(broker).with_tracking(tracking)

task = await pipeline.kiq(data)

# Get detailed status
status = await tracking.get_status(pipeline.pipeline_id)
print(f"Status: {status.status}")     # PENDING, RUNNING, COMPLETED, FAILED
print(f"Steps: {len(status.steps)}")  # Number of completed steps
print(f"Started: {status.started_at}")
print(f"Completed: {status.completed_at}")

# Get step-by-step history
for step in status.steps:
    print(f"  {step.name}: {step.status} ({step.duration_ms}ms)")
```
Status values:
| Status | Meaning |
|---|---|
| `PENDING` | Pipeline queued, not started |
| `RUNNING` | Currently executing |
| `COMPLETED` | Finished successfully |
| `FAILED` | Terminated with error |
| `CANCELLED` | Manually cancelled |
See Tracking Guide for advanced monitoring.
10. Debugging Execution
10.1. Enable Logging
```python
import logging

logging.basicConfig(level=logging.DEBUG)

# Or configure specific loggers
logger = logging.getLogger("taskiq_flow")
logger.setLevel(logging.DEBUG)
```
10.2. Print DAG Before Execution
```python
pipeline.print_dag()
# Shows execution levels and dependencies
```
10.3. Inspect Task Arguments
```python
@broker.task
async def debug_task(data, context: PipelineContext):
    print(f"Received: {data}")
    print(f"Context: pipeline={context.pipeline_id}, step={context.step_index}")
    return data
```
10.4. Middleware for Tracing
```python
from taskiq_flow.middleware import PipelineMiddleware

class DebugMiddleware(PipelineMiddleware):
    async def on_step_complete(self, ctx, result):
        print(f"Step {ctx.task_id} completed with: {result}")
        await super().on_step_complete(ctx, result)

broker.add_middlewares(DebugMiddleware())
```
11. Performance Considerations
11.1. Concurrency Limits
```python
# Limit total parallel tasks globally
from taskiq_flow.optimization.parallel import set_max_parallel_tasks

set_max_parallel_tasks(20)  # never more than 20 tasks simultaneously
```
11.2. Selective Parallelism
Not all tasks benefit from parallel execution:
```python
# CPU-bound tasks: benefit from parallelism up to core count
# I/O-bound tasks: can handle higher parallelism
# Small/fast tasks: overhead may outweigh benefits

# Tip: profile with varying max_parallel values
pipeline.map(process_item, items, max_parallel=8)
```
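One way to profile is to time the same workload at several `max_parallel` values. The sketch below uses a semaphore-limited stand-in (`fake_map`) for the map step so the timing harness itself is runnable; in real code, the timed call would be `pipeline.map(...)`:

```python
import asyncio
import time

async def fake_map(n_items: int, max_parallel: int) -> None:
    """Stand-in workload: n_items short sleeps, at most max_parallel at once."""
    sem = asyncio.Semaphore(max_parallel)

    async def one():
        async with sem:
            await asyncio.sleep(0.01)

    await asyncio.gather(*(one() for _ in range(n_items)))

def profile(n_items: int = 20) -> dict:
    """Time the workload at a few candidate parallelism levels."""
    timings = {}
    for max_parallel in (1, 4, 16):
        start = time.perf_counter()
        asyncio.run(fake_map(n_items, max_parallel))
        timings[max_parallel] = time.perf_counter() - start
    return timings

timings = profile()
```

Pick the smallest `max_parallel` past which the timings stop improving.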
11.3. Memory Footprint
Parallel execution loads more data into memory:
```python
# Process large datasets in chunks
chunks = split_into_chunks(large_list, chunk_size=100)
for chunk in chunks:
    results = await pipeline.kiq_dataflow(chunk)
    # process results before next chunk
```
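`split_into_chunks` is not defined above; a plain-Python version of such a helper might look like this:

```python
def split_into_chunks(items, chunk_size):
    """Yield successive chunks of at most chunk_size items."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")
    for start in range(0, len(items), chunk_size):
        yield items[start:start + chunk_size]

chunks = list(split_into_chunks(list(range(10)), chunk_size=4))
```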
See Performance Guide for detailed optimization strategies.
12. Common Pitfalls
| Issue | Cause | Solution |
|---|---|---|
| Tasks run sequentially | `max_parallel=1` or sequential pipeline type | Use `DataflowPipeline` or increase parallelism |
| `wait_result()` hangs forever | Broker not shared, results lost | Use a persistent broker (Redis) with a result backend |
| Tasks receive wrong inputs | Incorrect parameter naming | Ensure `@pipeline_task(output=...)` matches downstream param names |
| Out-of-order results | Dataflow tasks finishing at different times | Results dict preserves output names, not execution order |
| Memory explosion | Unlimited parallelism | Set `max_parallel` or process in batches |
| Deadlock during execution | Circular dependency or missing external input | Check the data flow graph for cycles; provide all external inputs |
| `kiq_dataflow()` raises "No DAG built" | No tasks added to pipeline | Use `DataflowPipeline.from_tasks()` or `add_dataflow_task()` |
| Partial results only | `continue_on_error=True` with failed tasks | Check `PipelineErrorAggregator` or the execution report for details |
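The cycle check suggested for the deadlock row can be run on any plain dependency mapping with Kahn's algorithm. This standalone sketch is independent of taskiq-flow's internal DAG representation:

```python
from collections import deque

def has_cycle(deps: dict) -> bool:
    """deps maps each task output to the list of inputs it depends on."""
    nodes = set(deps)
    for inputs in deps.values():
        nodes.update(inputs)
    indegree = {n: 0 for n in nodes}
    consumers = {n: [] for n in nodes}
    for node, inputs in deps.items():
        indegree[node] = len(inputs)
        for dep in inputs:
            consumers[dep].append(node)
    # Kahn's algorithm: repeatedly remove nodes with no unmet dependencies.
    ready = deque(n for n in nodes if indegree[n] == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for consumer in consumers[node]:
            indegree[consumer] -= 1
            if indegree[consumer] == 0:
                ready.append(consumer)
    return visited != len(nodes)  # leftover nodes sit on a cycle

ok = has_cycle({"clean": ["raw"], "saved": ["clean"], "raw": []})
bad = has_cycle({"a": ["b"], "b": ["a"]})
```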
13. Summary
| Feature | Sequential Pipeline | DataflowPipeline | MapReduce |
|---|---|---|---|
| Execution | Linear chain | Automatic DAG | Parallel map + reduce |
| Parallelism | None (unless `.group()` used) | Automatic (independent tasks) | Explicit per-map call |
| Control | Manual chaining | Declarative dependencies | Batch-oriented |
| Best for | Simple linear workflows | Complex branched workflows | Bulk data transformation |
Next Steps
- Pipelines Guide — Choosing between pipeline types and patterns
- Dataflow Guide — Complete guide to dataflow pipelines, DAGs, and decorators
- Tracking Guide — Monitoring pipeline status and history
- Performance Guide — Tuning for speed and resource usage
Understanding execution is key to building reliable pipelines. Learn more about Dataflow Pipelines for complex workflows.