Référence API: Composants Principaux
Pipeline, DataflowPipeline, PipelineMiddleware, PipelineContext et exceptions principales
Version : {VERSION} Module : taskiq_flow.core,taskiq_flow.pipeline,taskiq_flow.middleware
Classes Principales
Pipeline (SequentialPipeline)
Le pipeline séquentiel classique pour l’orchestration linéaire de tâches.
from taskiq_flow import Pipeline
pipeline = Pipeline(broker)
Constructeur:
Pipeline(
broker: BaseBroker,
max_parallel: int = None, # Limite globale de parallélisme
timeout: float = None, # Timeout global en secondes
pipeline_id: str = None # Auto-généré si non fourni
)
Méthodes:
| Méthode | Signature | Description |
|---|---|---|
call_next |
call_next(task, *args, **kwargs) -> Pipeline |
Enchaîne une tâche; passe résultat précédent comme premier arg |
call_after |
call_after(task, *args, **kwargs) -> Pipeline |
Exécute tâche sans consommer résultat précédent |
map |
map(task, max_parallel=None, output_name=None) -> Pipeline |
Applique tâche à chaque élément d’un résultat itérable |
filter |
filter(task) -> Pipeline |
Garde éléments où tâche retourne truthy |
group |
group(tasks, param_names=None) -> Pipeline |
Exécute multiples tâches en parallèle depuis même entrée |
kiq |
kiq(*args, **kwargs) -> Task |
Démarre exécution pipeline |
with_tracking |
with_tracking(tracking_manager) -> Pipeline |
Attache gestionnaire de suivi |
with_hooks |
with_hooks(hook_manager) -> Pipeline |
Attache gestionnaire hooks pour événements |
with_retry |
with_retry(...) -> Pipeline |
Configure politique de retry |
with_timeout |
with_timeout(seconds) -> Pipeline |
Définit timeout |
with_context |
with_context(enable=True) -> Pipeline |
Active passage PipelineContext aux tâches |
Example:
pipeline = (
Pipeline(broker)
.call_next(task1)
.call_next(task2, factor=2)
.map(task3, max_parallel=10)
.filter(validate)
.with_tracking(tracking)
)
result = await pipeline.kiq(initial_input)
DataflowPipeline
Construction automatique de DAG depuis dépendances entre tâches via décorateurs @pipeline_task.
from taskiq_flow import DataflowPipeline
pipeline = DataflowPipeline.from_tasks(
broker,
[task_a, task_b, task_c]
)
Constructeur:
DataflowPipeline(
broker: BaseBroker,
tasks: list[Callable] = None,
max_parallel: int = None,
timeout: float = None,
pipeline_id: str = None
)
Méthodes de Classe:
| Méthode | Description |
|---|---|
from_tasks(broker, tasks, **kwargs) |
Construit pipeline depuis liste de fonctions de tâche avec décorateurs @pipeline_task |
Méthodes d’Instance (la plupart partagées avec Pipeline):
| Méthode | Description |
|---|---|
print_dag() |
Affiche DAG ASCII en console |
visualize() |
Retourne représentation JSON du DAG |
visualize_dot() |
Retourne chaîne DOT Graphviz |
kiq_dataflow(**kwargs) |
Exécute pipeline avec entrées nommées |
Exemple:
@broker.task
@pipeline_task(output="features")
def extract(données): ...
@broker.task
@pipeline_task(output="tags")
def tag(features): ...
pipeline = DataflowPipeline.from_tasks(broker, [extract, tag])
pipeline.print_dag()
# Sortie:
# Niveau 0: extract
# Niveau 1: tag
résultats = await pipeline.kiq_dataflow(data=données_entrée)
# résultats = {"features": ..., "tags": ...}
PipelineMiddleware
Le middleware qui orchestre l’exécution des étapes de pipeline.
from taskiq_flow import PipelineMiddleware
broker.add_middlewares(PipelineMiddleware())
Responsabilités:
- Intercepte completion des tâches
- Détermine prochaine étape à exécuter
- Gère transitions d’état du pipeline
- Passe résultats entre étapes
- Émet événements hooks
Note : Ce middleware doit être ajouté au broker pour que tout pipeline fonctionne.
PipelineContext
Métadonnées passées aux tâches quand with_context(enable=True) est défini.
from taskiq_flow import PipelineContext
@broker.task
async def my_task(data: str, context: PipelineContext):
print(f"Pipeline: {context.pipeline_id}")
print(f"Step: {context.step_index}")
print(f"Task ID: {context.task_id}")
Champs:
| Champ | Type | Description |
|---|---|---|
pipeline_id |
str |
ID unique instance pipeline |
step_index |
int |
Numéro étape courante (0-indexé) |
task_id |
str |
ID tâche taskiq sous-jacente |
execution_mode |
str |
"sequential", "parallel", "map_reduce" |
started_at |
datetime |
Horodatage début pipeline |
broker |
BaseBroker |
Référence instance broker |
Exceptions Principales
Toutes exceptions héritent de classe de base TaskiqFlowError.
from taskiq_flow import TaskiqFlowError
| Exception | Signification | Cause Typique |
|---|---|---|
PipelineError |
Échec générique pipeline | Étape échouée |
CycleError |
Dépendance circulaire détectée | DAG a cycle |
TaskNotFoundError |
Tâche non dans registry | Tâche manquante dans DataflowPipeline |
InvalidOutputError |
Conflit clé de sortie | Deux tâches déclarent même sortie |
ConfigurationError |
Config pipeline invalide | Middleware manquant, paramètres incorrects |
TrackingError |
Échec opération suivi | Stockage indisponible |
Exemple gestion:
try:
résultat = await pipeline.kiq(données)
except CycleError as e:
print(f"Cycle DAG détecté: {e}")
except PipelineError as e:
print(f"Pipeline échoué: {e}")
Utilitaires
DataflowRegistry
Pour construction manuelle de DAG et inspection.
from taskiq_flow import DataflowRegistry
registry = DataflowRegistry()
registry.register_task(tache, output="sortie", inputs=["entrée"])
dag = registry.build_dag()
Voir documentation détaillée dans docs/fr/api/dataflow.md.
ExecutionEngine
Exécuteur de DAG bas niveau pour cas avancés.
from taskiq_flow import ExecutionEngine
engine = ExecutionEngine(broker, dag)
résultats = await engine.execute(inputs={"x": 1, "y": 2})
Voir API docs execution.
PipelineScheduler
Planification cron de pipelines.
from taskiq_flow import PipelineScheduler
scheduler = PipelineScheduler(broker)
await scheduler.schedule(pipeline, cron="* * * * *")
await scheduler.start()
Voir guide planification.
Compatibilité Version
Cette documentation couvre Taskiq-Flow v0.3.0+.
Stabilité API:
PipelineetDataflowPipeline: Stable (v0.3+)- Décorateur
pipeline_task: Stable (v0.3+) PipelineMiddleware: Stable (v0.3+)PipelineScheduler: Stable (v0.3+)PipelineTrackingManager: Stable (v0.3+)
Changements cassants notés dans CHANGELOG.md.
Pour exemples détaillés, voir section Exemples. Pour doc méthode par méthode, se référer aux docstrings Python inline (help(Pipeline)).