# API Reference: Decorators

Task decorators, `pipeline_task`, and utility decorators.

**Version:** {VERSION} · **Module:** `taskiq_flow.decorators`
## Overview

The `@pipeline_task` decorator annotates taskiq tasks with output declarations, enabling automatic dependency resolution in `DataflowPipeline`.
## `@pipeline_task`

Marks a task with what it produces for downstream consumers.

```python
from taskiq_flow import pipeline_task

@broker.task
@pipeline_task(output="features")
def extract(data: list[str]) -> dict:
    return compute_features(data)
```
**Parameters:**

| Parameter | Type | Description |
|---|---|---|
| `output` | `str` | Single output key name |
| `outputs` | `list[str]` | Multiple output keys (for tuple returns) |
| `inputs` | `list[str]` | Explicit input dependencies (optional, auto-detected; see the explicit `inputs` pattern below) |
| `description` | `str` | Human-readable description (for documentation) |
**Usage patterns:**

**Single output (most common)**

```python
@broker.task
@pipeline_task(output="processed_data")
def process(raw_data: str) -> dict:
    return {"result": raw_data.upper()}
```
**Multiple outputs**

```python
@broker.task
@pipeline_task(outputs=["features", "metadata"])
def split_output(audio: np.ndarray) -> tuple[dict, dict]:
    features = extract_features(audio)
    metadata = extract_meta(audio)
    return features, metadata  # unpacked to both outputs
```

Downstream tasks can consume either output:

```python
@broker.task
@pipeline_task(output="tags")
def tag(features: dict): ...  # consumes 'features' output

@broker.task
@pipeline_task(output="info")
def describe(metadata: dict): ...  # consumes 'metadata' output
```
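**Explicit inputs and description**

The `inputs` and `description` parameters from the table above can be declared directly when auto-detection is not desired. The sketch below is illustrative only: exactly how explicit `inputs` interact with parameter-name auto-detection is an assumption here, and `summarize` is a made-up task.

```python
@broker.task
@pipeline_task(
    output="report",
    inputs=["features", "metadata"],  # declared explicitly instead of auto-detected
    description="Combine upstream feature and metadata outputs",
)
def summarize(features: dict, metadata: dict) -> dict:
    return {"summary": {**features, **metadata}}
```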
## `@pipeline_task_multi_output`

Alias for `@pipeline_task(outputs=[...])`. Provides clarity for multi-output tasks:

```python
from taskiq_flow import pipeline_task_multi_output

@broker.task
@pipeline_task_multi_output(outputs=["x", "y"])
def split(value: int) -> tuple[int, int]:
    return value // 2, value % 2
```
## Utility Functions

### `get_task_outputs(task: Callable) -> list[str]`

Get the declared output keys for a task:

```python
from taskiq_flow import get_task_outputs

outputs = get_task_outputs(extract_task)
print(outputs)  # ['features']
```
### `get_task_inputs(task: Callable) -> list[str]`

Get the declared input dependencies:

```python
from taskiq_flow import get_task_inputs

inputs = get_task_inputs(tag_task)
print(inputs)  # ['features']
```
### `is_pipeline_task(task: Callable) -> bool`

Check whether a function has been decorated with `@pipeline_task`:

```python
from taskiq_flow import is_pipeline_task

if is_pipeline_task(my_func):
    print("This is a pipeline task with output declarations")
```
### `resolve_task_dependencies(tasks: list[Callable]) -> dict`

Build a dependency map:

```python
from taskiq_flow import resolve_task_dependencies

deps = resolve_task_dependencies([task_a, task_b, task_c])
# Returns: {task_a: [], task_b: ['features'], task_c: ['tags']}
```
## Decorator Order

Decorator order matters: `@broker.task` must be outermost (applied last) and `@pipeline_task` innermost (applied first):

```python
# CORRECT
@broker.task
@pipeline_task(output="result")
def my_task(): ...

# INCORRECT (will fail)
@pipeline_task(output="result")
@broker.task
def my_task(): ...
```

Why: Python applies decorators bottom-to-top, so `@pipeline_task` runs first and attaches its metadata to the original function, and `@broker.task` then wraps the result.
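As a plain-Python illustration of the bottom-to-top rule (no taskiq_flow APIs involved), the decorator written closest to `def` runs first and tags the original function, while the outer decorator wraps the already-tagged result:

```python
def attach_metadata(func):
    # Stands in for @pipeline_task: runs first, tags the original function.
    func.output = "result"
    return func

def register(func):
    # Stands in for @broker.task: runs second, wraps the tagged function.
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    wrapper.inner = func
    return wrapper

@register          # applied second (outermost)
@attach_metadata   # applied first (innermost)
def my_task():
    ...

print(my_task.inner.output)  # 'result': the metadata lives on the original function
```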
## Type Hints & Static Analysis

Type hints help IDEs and static checkers understand the dataflow:

```python
from typing import TypedDict

class AudioFeatures(TypedDict):
    duration: float
    tempo: float

@broker.task
@pipeline_task(output="features")
def extract(path: str) -> AudioFeatures:
    return {"duration": 180.0, "tempo": 120.0}

@broker.task
@pipeline_task(output="tags")
def tag(features: AudioFeatures) -> list[str]:  # type-safe
    return ["fast", "electronic"]
```

Using `TypedDict` or Pydantic models provides better IDE autocomplete and mypy checking.
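For the Pydantic variant, a model can take the place of the `TypedDict`. A minimal sketch, assuming the pipeline passes the returned model object through to the downstream parameter unchanged:

```python
from pydantic import BaseModel

class AudioFeatures(BaseModel):
    duration: float
    tempo: float

@broker.task
@pipeline_task(output="features")
def extract(path: str) -> AudioFeatures:
    return AudioFeatures(duration=180.0, tempo=120.0)

@broker.task
@pipeline_task(output="tags")
def tag(features: AudioFeatures) -> list[str]:
    # Attribute access is model-validated, so typos surface under mypy and at runtime.
    return ["fast"] if features.tempo > 110 else ["slow"]
```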
## Versioning & Metadata

Attach a version and other metadata:

```python
@broker.task(
    name="extract_features_v2",
    labels={"version": "2.0.0", "experimental": False}
)
@pipeline_task(
    output="features",
    description="Extract audio features (v2 with improved tempo estimation)"
)
def extract(path: str) -> dict:
    ...
```
## Common Pitfalls

| Pitfall | Consequence | Fix |
|---|---|---|
| Missing `@broker.task` | Task is not registered with the broker | Add the decorator |
| `output` not set | No downstream consumers can depend on it | Always declare `output` for dataflow tasks |
| Output name mismatch | Downstream task doesn't receive its input | Ensure the downstream parameter name matches the upstream output key (see the example below) |
| Using `@pipeline_task` on `SequentialPipeline` tasks | No effect, just unnecessary | Only needed for `DataflowPipeline` |
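To make the name-mismatch pitfall concrete, here is a short before/after sketch reusing the `features` output from earlier examples (the task bodies are placeholders):

```python
@broker.task
@pipeline_task(output="features")
def extract(path: str) -> dict:
    return {"tempo": 120.0}

# MISMATCH: the parameter is named 'feats', so the 'features' output is never wired in
@broker.task
@pipeline_task(output="tags")
def tag_broken(feats: dict) -> list[str]:
    return []

# FIX: rename the parameter to match the upstream output key
@broker.task
@pipeline_task(output="tags")
def tag(features: dict) -> list[str]:
    return ["fast"] if features["tempo"] > 110 else ["slow"]
```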
## Example: Complete Dataflow Pipeline

```python
from taskiq import InMemoryBroker
from taskiq_flow import DataflowPipeline, pipeline_task

broker = InMemoryBroker()

@broker.task
@pipeline_task(output="raw")
def load(source: str) -> dict:
    return {"data": read_file(source)}

@broker.task
@pipeline_task(output="clean")
def clean(raw: dict) -> dict:
    return {"data": preprocess(raw["data"])}

@broker.task
@pipeline_task(output="stats")
def analyze(clean: dict) -> dict:
    return compute_stats(clean["data"])

# Build
pipeline = DataflowPipeline.from_tasks(broker, [load, clean, analyze])

# Execute
results = await pipeline.kiq_dataflow(source="data.csv")
# results = {"raw": {...}, "clean": {...}, "stats": {...}}
```

For the full task API, see the Tasks Guide. For writing custom decorators, extend `BaseTaskDecorator` from `taskiq_flow.decorators`.
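As a starting point for a custom decorator, here is a rough sketch. The hook that `BaseTaskDecorator` expects subclasses to override is not documented in this section, so the `__call__` override and the `_timed` attribute below are assumptions rather than the library's actual interface:

```python
from taskiq_flow.decorators import BaseTaskDecorator

class timed_task(BaseTaskDecorator):
    """Hypothetical decorator: adds a 'timed' flag on top of the base metadata."""

    def __call__(self, func):
        func = super().__call__(func)  # assumed: base class attaches its own metadata
        func._timed = True             # custom attribute for downstream tooling
        return func
```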