# API Reference: Execution Engine

ExecutionEngine, DAG, and map-reduce utilities

Version: {VERSION} · Modules: `taskiq_flow.execution_engine`, `taskiq_flow.dataflow.dag`, `taskiq_flow.map_reduce`
## ExecutionEngine

Low-level engine for executing DAGs directly, bypassing the `Pipeline` abstraction.
```python
from taskiq_flow import ExecutionEngine, DataflowRegistry

# Build the registry manually
registry = DataflowRegistry()
registry.register_task(load, output="raw", inputs=[])
registry.register_task(process, output="clean", inputs=["raw"])
registry.register_task(save, output="saved", inputs=["clean"])

# Build the DAG
dag = registry.build_dag()

# Create the engine
engine = ExecutionEngine(broker, dag)

# Execute
results = await engine.execute(inputs={"source": "data.csv"})
# results == {"raw": ..., "clean": ..., "saved": ...}
```
Constructor:

```python
ExecutionEngine(
    broker: BaseBroker,
    dag: DAG,
    max_parallel: int = None,
    on_step_complete: callable = None,
)
```
Methods:

| Method | Signature | Description |
|---|---|---|
| `execute` | `execute(inputs: dict) -> dict` | Run the DAG with the given inputs |
| `execute_async` | `execute_async(inputs: dict) -> AsyncIterator` | Stream results as they complete |
| `cancel` | `cancel()` | Stop a running execution |
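The streaming variant suits progress reporting. A minimal sketch, assuming `execute_async` is an async generator yielding `(task_name, result)` pairs as steps finish (the exact item shape is not specified above) and that `cancel()` may be called from inside the loop:

```python
# Assumed: execute_async yields (task_name, result) pairs per completed step.
engine = ExecutionEngine(broker, dag, max_parallel=4)  # run at most 4 steps at once

async for task_name, result in engine.execute_async(inputs={"source": "data.csv"}):
    print(f"{task_name} completed")
    if result is None:       # illustrative bail-out condition
        engine.cancel()      # stop the remaining steps
        break
```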
Events:

```python
async def on_step(task_name: str, result: Any):
    print(f"Step {task_name} completed")

engine = ExecutionEngine(broker, dag, on_step_complete=on_step)
```
## DAG (Directed Acyclic Graph)

Represents the execution graph of tasks.
```python
from taskiq_flow.dataflow import DAG, DAGNode

dag = DAG()
node = DAGNode(task=my_task, output="result", inputs=["input_a"])
dag.add_node(node)
```
DAG Methods:

| Method | Description |
|---|---|
| `add_node(node: DAGNode)` | Add a task node |
| `add_edge(from_task, to_task)` | Add a dependency |
| `topological_sort() -> list[DAGNode]` | Return the execution order |
| `get_parallel_levels() -> list[list[DAGNode]]` | Group nodes by parallel execution level |
| `validate()` | Check for cycles and missing nodes |
| `print()` | Print an ASCII visualization to the console |
DAG Properties:

| Property | Type | Description |
|---|---|---|
| `nodes` | `list[DAGNode]` | All nodes in the graph |
| `edges` | `set[tuple[DAGNode, DAGNode]]` | Dependency edges |
| `roots` | `list[DAGNode]` | Nodes with no dependencies |
| `leaves` | `list[DAGNode]` | Nodes with no dependents |
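Putting the inspection methods and properties together on the `load`/`process`/`save` graph built earlier; a short sketch, assuming `validate()` raises on a bad graph (its failure mode is not specified above):

```python
dag.validate()  # assumed to raise if the graph has a cycle or a missing node

# Linear execution order
for node in dag.topological_sort():
    print(node.task_name)

# Nodes grouped by level; everything within one level can run in parallel
for level, nodes in enumerate(dag.get_parallel_levels()):
    print(f"level {level}: {[n.task_name for n in nodes]}")

print("entry points:", [n.task_name for n in dag.roots])
print("terminal nodes:", [n.task_name for n in dag.leaves])
```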
## DAGNode

Represents a single task in the DAG, along with its I/O specification.
```python
from taskiq_flow.dataflow import DAGNode

node = DAGNode(
    task=my_task_function,
    output="result_key",
    inputs=["input_a", "input_b"],
    metadata={"description": "My task"},
)
```
Properties:

| Property | Type | Description |
|---|---|---|
| `task` | `Callable` | The task function |
| `task_name` | `str` | Auto-generated or custom name |
| `output` | `str` | Output key (single) |
| `outputs` | `list[str]` | Output keys (multiple) |
| `inputs` | `list[str]` | Required input keys |
| `metadata` | `dict` | Arbitrary metadata |
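The `output`/`outputs` pair implies a node can produce several keys at once. A hedged sketch of a multi-output node, assuming the constructor accepts an `outputs` keyword mirroring the property above (`split_dataset` is a hypothetical task):

```python
# Assumed: DAGNode takes `outputs` in place of `output` for multi-output tasks.
split_node = DAGNode(
    task=split_dataset,                 # hypothetical task function
    outputs=["train_set", "test_set"],  # two downstream keys from one task
    inputs=["clean"],
    metadata={"description": "80/20 train/test split"},
)
dag.add_node(split_node)
```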
## DAGBuilder

Helper for constructing DAGs programmatically (less common; usually use `DataflowRegistry`).
```python
from taskiq_flow import DAGBuilder

builder = DAGBuilder()
builder.add_task(task1, output="a", inputs=[])
builder.add_task(task2, output="b", inputs=["a"])
builder.add_task(task3, output="c", inputs=["a", "b"])
dag = builder.build()
```
Builder Pattern:

```python
dag = (
    DAGBuilder()
    .node(load, output="raw", inputs=[])
    .node(process, output="clean", inputs=["raw"])
    .node(save, output="saved", inputs=["clean"])
    .build()
)
```
## MapReduce

Utility for a parallel map followed by a reduce.
### MapReduce.map

```python
from taskiq_flow import MapReduce

mapped = await MapReduce.map(
    broker,
    map_func,           # task function to apply to each item
    items,              # Iterable of items to process
    output="mapped",    # str, default "mapped"
    max_parallel=None,  # int, optional concurrency cap
)
# Returns: MapReduceResult (behaves like Task)
```
### MapReduce.reduce

```python
reduced = await MapReduce.reduce(
    broker,
    reduce_func,          # aggregation function
    mapped_result,        # output from MapReduce.map
    input_name="mapped",  # str, required: name of the mapped output to consume
    output="reduced",     # str, default "reduced"
)
# Returns: Task (with the final result)
```
### MapReduce.map_reduce (combined)

```python
final = await MapReduce.map_reduce(
    broker,
    map_func,
    items,
    reduce_func,
    map_output="mapped",
    reduce_output="final",
    max_parallel=10,
)
```

All three return `Task` objects; call `.wait_result()` to retrieve the value.
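An end-to-end word-count sketch under those rules; the chunk data and both task functions are illustrative, and `.wait_result()` is assumed to be awaitable (as with taskiq task results):

```python
# Illustrative tasks: count words per chunk, then sum the per-chunk counts.
async def count_words(chunk: str) -> int:
    return len(chunk.split())

async def total(counts: list[int]) -> int:  # assumes the reducer receives all mapped values
    return sum(counts)

chunks = ["alpha beta", "gamma", "delta epsilon zeta"]

task = await MapReduce.map_reduce(
    broker,
    count_words,
    chunks,
    total,
    map_output="counts",
    reduce_output="total",
    max_parallel=3,
)
print(await task.wait_result())  # 6
```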
## DataflowRegistry (Advanced)

Manual task registration for dynamic pipeline construction.
```python
from taskiq_flow import DataflowRegistry

registry = DataflowRegistry()

# Register tasks with explicit I/O
registry.register_task(
    task=extract,
    output="features",
    inputs=["audio_files"],  # external input
)
registry.register_task(
    task=tag,
    output="tags",
    inputs=["features"],  # depends on extract's output
)

# Inspect
print("Tasks:", [t.task_name for t in registry.get_tasks()])
print("Outputs:", registry.get_outputs())
print("External inputs:", registry.get_external_inputs())

# Build the DAG
dag = registry.build_dag()
dag.print()

# Execute via ExecutionEngine
engine = ExecutionEngine(broker, dag)
results = await engine.execute(inputs={"audio_files": files})
```
Registry Queries:

| Method | Description |
|---|---|
| `get_tasks()` | List all registered `TaskNode` objects |
| `get_outputs()` | List all output keys |
| `get_external_inputs()` | List inputs not produced by any task |
| `get_producer(output_key)` | Get the task producing a given output |
| `get_consumers(input_key)` | List tasks consuming a given input |
| `build_dag()` | Construct and validate the DAG, returning it ready to execute |
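Continuing the `extract`/`tag` registry above, the query methods answer "who produces what" before the DAG is built; a short sketch (the printed names assume `task_name` defaults to the function name):

```python
producer = registry.get_producer("features")
print(producer.task_name)              # "extract" (assuming default naming)

for consumer in registry.get_consumers("features"):
    print(consumer.task_name)          # "tag"

# Every external input must be supplied to ExecutionEngine.execute()
print(registry.get_external_inputs())  # ["audio_files"]
```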
## Version Notes

- `ExecutionEngine` introduced in v0.3.0
- `DAG` and `DAGNode` are used internally by `DataflowPipeline`
- `MapReduce` utility available since v0.2.0
## Next Steps
- Tracking API — Monitor execution with PipelineTrackingManager
- WebSocket API — HookManager and event system
- Core API — Pipeline and middleware reference
- Dataflow Audio Pipeline Example — See ExecutionEngine used in a real DAG pipeline
- Registry Discovery Example — Manual DAG construction and ExecutionEngine usage
This page covers advanced, low-level APIs; most users should stick with the `Pipeline` and `DataflowPipeline` abstractions.