Référence API: Décorateurs
Décorateurs de tâches, @pipeline_task, et utilitaires
Version : {VERSION} Module : taskiq_flow.decorators
Aperçu
Le décorateur @pipeline_task annote les tâches taskiq avec des déclarations de sortie, permettant la résolution automatique de dépendances dans DataflowPipeline.
@pipeline_task
Marque une tâche avec ce qu’elle produit pour les consommateurs en aval.
from taskiq_flow import pipeline_task
@broker.task
@pipeline_task(output="features")
def extract(données: list[str]) -> dict:
return compute_features(données)
Paramètres:
| Paramètre | Type | Description |
|---|---|---|
output |
str |
Nom clé sortie unique |
outputs |
list[str] |
Clés sortie multiples (pour retours tuple) |
inputs |
list[str] |
Dépendances entrée explicites (optionnel, auto-détecté) |
description |
str |
Description lisible humain (pour documentation) |
Sortie unique (plus courant)
@broker.task
@pipeline_task(output="données_traitées")
def process(données_brutes: str) -> dict:
return {"result": données_brutes.upper()}
Sorties multiples
@broker.task
@pipeline_task(outputs=["features", "metadata"])
def split_output(audio: np.ndarray) -> tuple[dict, dict]:
features = extract_features(audio)
metadata = extract_meta(audio)
return features, metadata # déballé vers les deux sorties
Les tâches en aval peuvent consommer soit sortie:
@broker.task
@pipeline_task(output="tags")
def tag(features: dict): ... # consomme sortie 'features'
@broker.task
@pipeline_task(output="info")
def describe(metadata: dict): ... # consomme sortie 'metadata'
@pipeline_task_multi_output
Alias pour @pipeline_task(outputs=[...]). Apporte clarté pour tâches multi-sorties:
from taskiq_flow import pipeline_task_multi_output
@broker.task
@pipeline_task_multi_output(outputs=["x", "y"])
def split(valeur: int) -> tuple[int, int]:
return valeur // 2, valeur % 2
Fonctions Utilitaires
get_task_outputs(task: Callable) -> list[str]
Obtenir clés sortie déclarées pour une tâche:
from taskiq_flow import get_task_outputs
outputs = get_task_outputs(extract_task)
print(outputs) # ['features']
get_task_inputs(task: Callable) -> list[str]
Get declared input dependencies:
from taskiq_flow import get_task_inputs
inputs = get_task_inputs(tag_task)
print(inputs) # ['features']
is_pipeline_task(task: Callable) -> bool
Check if function is decorated with @pipeline_task:
from taskiq_flow import is_pipeline_task
if is_pipeline_task(my_function):
print("This is a pipeline task with output declarations")
resolve_task_dependencies(tasks: list[Callable]) -> dict
Build dependency map:
from taskiq_flow import resolve_task_dependencies
deps = resolve_task_dependencies([task_a, task_b, task_c])
# Returns: {task_a: [], task_b: ['features'], task_c: ['tags']}
Decorator Order
The order of decorators matters: @broker.task must be the outermost (applied last), @pipeline_task inner (applied first):
# CORRECT
@broker.task
@pipeline_task(output="result")
def my_task(): ...
# INCORRECT (will fail)
@pipeline_task(output="result")
@broker.task
def my_task(): ...
Why: @broker.task wraps the function; @pipeline_task attaches metadata to the original function. Python applies decorators bottom-to-top.
Pourquoi: `@broker.task` enveloppe la fonction; `@pipeline_task` attache métadonnées à la fonction originale. Python applique décorateurs bas-vers-haut.
---
## Type Hints & Analyse Statique
Les type hints aident IDEs et linters à comprendre le dataflow:
```python
from typing import TypedDict
class AudioFeatures(TypedDict):
duration: float
tempo: float
@broker.task
@pipeline_task(output="features")
def extract(chemin: str) -> AudioFeatures:
return {"duration": 180.0, "tempo": 120.0}
@broker.task
@pipeline_task(output="tags")
def tag(features: AudioFeatures) -> list[str]: # type-safe
return ["rapide", "électronique"]
Utiliser TypedDict ou modèles Pydantic pour meilleure autocomplétion IDE et vérification mypy.
Versionnage & Métadonnées
Attacher version et autres métadonnées:
@broker.task(
nom="extract_features_v2",
labels={"version": "2.0.0", "expérimental": False}
)
@pipeline_task(
output="features",
description="Extraire caractéristiques audio (v2 avec estimation tempo améliorée)"
)
def extract(chemin: str) -> dict:
...
Pièges Courants
| Piège | Conséquence | Correction |
|---|---|---|
@broker.task manquant |
Tâche non enregistrée avec broker | Ajouter décorateur |
output non défini |
Aucun consommateur en aval ne peut en dépendre | Toujours déclarer output pour tâches dataflow |
| Mismatch nom sortie | Tâche en aval ne reçoit pas entrée | S’assurer nom paramètre en aval correspond output amont |
Utiliser @pipeline_task sur tâches SequentialPipeline |
Aucun effet mais inutile | Seulement nécessaire pour DataflowPipeline |
Exemple: Pipeline Dataflow Complet
from taskiq import InMemoryBroker
from taskiq_flow import DataflowPipeline, pipeline_task
broker = InMemoryBroker()
@broker.task
@pipeline_task(output="brut")
def charger(source: str) -> dict:
return {"data": lire_fichier(source)}
@broker.task
@pipeline_task(output="propre")
def nettoyer(brut: dict) -> dict:
return {"data": prétraiter(brut["data"])}
@broker.task
@pipeline_task(output="stats")
def analyser(propre: dict) -> dict:
return calculer_stats(propre["data"])
# Construire
pipeline = DataflowPipeline.from_tasks(broker, [charger, nettoyer, analyser])
# Exécuter
résultats = await pipeline.kiq_dataflow(source="data.csv")
# résultats = {"brut": {...}, "propre": {...}, "stats": {...}}
Pour API tâches complète, voir Guide des Tâches. Pour écrire décorateurs personnalisés, étendre BaseTaskDecorator depuis taskiq_flow.decorators.