Guide Dataflow
Construire des pipelines complexes et parallèles grâce à l’orchestration basée sur les données
Version : {VERSION} Lié : Guide des Pipelines, Guide d’Exécution, Guide des Concepts Fondamentaux
Aperçu
Le système dataflow de Taskiq-Flow est la méthode la plus puissante pour orchestrer des workflows complexes. Contrairement aux pipelines séquentiels où vous enchaînez manuellement les étapes, les pipelines dataflow construisent automatiquement un Graphe Orienté Acyclique (DAG) à partir de vos déclarations de tâches, permettant :
- Résolution automatique des dépendances — les tâches déclarent ce qu’elles produisent et consomment
- Parallélisme automatique — les tâches indépendantes s’exécutent simultanément sans configuration manuelle
- Exécution pilotée par les données — le flux de données détermine l’ordre d’exécution
- Construction dynamique de pipelines — ajouter des tâches flexiblement à l’exécution
Ce guide couvre l’ensemble du système dataflow : le DAG, le registre, les décorateurs, le moteur d’exécution et les patterns avancés.
1. Concepts Fondamentaux
1.1. Le paradigme Dataflow
Dans un pipeline dataflow, les tâches sont connectées par dépendances de données plutôt que par un ordre explicite :
Séquentiel : Dataflow :
task1 → task2 → task3 task1 ──→ task2
└──→ task3 (parallèle !)
Chaque tâche déclare :
output: La donnée produite (ex :"features")inputs: Les données consommées (ex :["features", "config"])
La bibliothèque résout automatiquement les dépendances et construit le graphe d’exécution.
1.2. Composants Clés
| Composant | Rôle | Module |
|---|---|---|
@pipeline_task |
Décorateur pour déclarer les E/S d’une tâche | taskiq_flow.decorators |
DataNode |
Représente un artefact de données dans le graphe | taskiq_flow.dataflow.node |
DAG / DAGNode |
Structure de graphe pour le suivi des dépendances | taskiq_flow.dataflow.dag |
DataflowRegistry |
Registre central pour la construction du DAG | taskiq_flow.dataflow.registry |
DataCache |
Stocke les résultats intermédiaires pendant l’exécution | taskiq_flow.dataflow.cache |
DataflowPipeline |
Pipeline de haut niveau avec orchestration dataflow | taskiq_flow.pipeline |
ExecutionEngine |
Exécuteur DAG bas niveau avec parallélisme | taskiq_flow.execution_engine |
2. Déclaration des Tâches Dataflow
2.1. Le décorateur @pipeline_task
Marquez une fonction comme tâche de pipeline avec un output explicite :
from taskiq import InMemoryBroker
from taskiq_flow import pipeline_task, DataflowPipeline
broker = InMemoryBroker()
@broker.task
@pipeline_task(output="features")
async def extract_features(paths: list[str]) -> dict:
"""Extrait les caractéristiques audio depuis des chemins de fichiers."""
return {"tempo": 120.0, "energy": 0.8}
Paramètres :
| Paramètre | Type | Défaut | Description |
|---|---|---|---|
output |
str |
requis | Nom de la donnée produite par cette tâche |
inputs |
list[str] |
None (inféré) |
Noms des données consommées |
retries |
int |
0 |
Nombre de tentatives en cas d’échec |
retry_delay |
float |
1.0 |
Délai initial entre les essais (secondes) |
retry_backoff |
float |
2.0 |
Multiplicateur du délai entre les essais |
retry_jitter |
bool |
True |
Ajouter un jitter aléatoire aux délais |
max_retry_time |
int |
300 |
Temps maximum total pour les retries (secondes) |
resources |
dict |
{} |
Estimation des ressources (memory, cpu) |
2.2. Inférence automatique des entrées
Si inputs n’est pas spécifié, les noms sont inférés depuis la signature de la fonction :
@broker.task
@pipeline_task(output="stats")
def compute_stats(features: dict, config: dict) -> dict:
# inputs automatiquement inférés : ["features", "config"]
return {"count": len(features)}
Les paramètres nommés self, cls, ou ceux ayant une valeur par défaut sont exclus.
2.3. Sorties Multiples
Utilisez @pipeline_task_multi_output lorsqu’une tâche produit plusieurs artefacts :
from taskiq_flow.decorators import pipeline_task_multi_output
@broker.task
@pipeline_task_multi_output(
outputs={"features": dict, "metadata": dict},
retries=2
)
async def process_audio(path: str) -> dict:
features = extract(path)
meta = get_metadata(path)
return {
"features": features, # → sortie "features"
"metadata": meta, # → sortie "metadata"
}
La première clé est la sortie principale ; toutes les clés sont enregistrées comme sorties nommées.
3. Construction des Pipelines Dataflow
3.1. DataflowPipeline.from_tasks()
L’approche recommandée pour la plupart des cas :
pipeline = DataflowPipeline.from_tasks(
broker,
[extract_features, compute_stats, generate_report]
)
Le DAG est construit automatiquement :
extract_featuresproduit"features"— aucune dépendance → Niveau 0compute_statsconsomme"features"→ dépend deextract_features→ Niveau 1generate_reportconsomme"stats"→ dépend decompute_stats→ Niveau 2
3.2. Ajout dynamique de tâches
pipeline = DataflowPipeline(broker)
pipeline.add_dataflow_task(extract_features)
pipeline.add_dataflow_task(compute_stats)
pipeline.add_dataflow_task(generate_report)
# Le DAG est reconstruit paresseusement à la première exécution
results = await pipeline.kiq_dataflow(paths=["chanson.mp3"])
3.3. Fan-Out / Fan-In (Dépendances Multiples)
Des tâches peuvent consommer plusieurs sorties, et plusieurs tâches peuvent partager une dépendance :
@broker.task
@pipeline_task(output="audio")
def load_audio(path: str) -> dict: ...
@broker.task
@pipeline_task(output="transcription")
def transcribe(audio: dict) -> str: ...
@broker.task
@pipeline_task(output="tags")
def generate_tags(audio: dict) -> list[str]: ... # parallèle avec transcribe
@broker.task
@pipeline_task(output="report")
def create_report(
transcription: str,
tags: list[str]
) -> dict: ... # fan-in : attend les deux
pipeline = DataflowPipeline.from_tasks(
broker,
[load_audio, transcribe, generate_tags, create_report]
)
# DAG :
# load_audio → (transcribe ∥ generate_tags) → create_report
3.4. Entrées Externes
Passez des données à l’exécution qui ne sont produites par aucune tâche :
results = await pipeline.kiq_dataflow(
user_id="user_123", # entrée externe
config={"mode": "fast"} # entrée externe
)
Les entrées externes sont identifiées automatiquement — ce sont les paramètres sans producteur correspondant.
4. Construction et Inspection du DAG
4.1. Le DataflowRegistry
Pour des cas avancés, construisez des DAGs manuellement :
from taskiq_flow import DataflowRegistry
registry = DataflowRegistry()
# Enregistrez les tâches avec leurs E/S explicites
registry.register_task(
task=load_data,
output="raw_data",
inputs=["source_url"] # entrée externe
)
registry.register_task(
task=clean_data,
output="clean_data",
inputs=["raw_data"]
)
registry.register_task(
task=save_data,
output="saved",
inputs=["clean_data"]
)
# Inspectez le graphe
print("Tâches:", [t.task_name for t in registry.get_tasks()])
print("Sorties:", registry.get_outputs())
print("Entrées externes:", registry.get_external_inputs())
# Construisez le DAG
dag = registry.build_dag()
dag.print()
4.2. Méthodes d’Inspection du DAG
from taskiq_flow.dataflow import DAG, DAGNode
# Ordre topologique
ordered = dag.topological_sort()
for node in ordered:
print(f"Niveau {node.level}: {node.task_name}")
# Groupes d'exécution parallèle
dag.compute_levels()
for i, level in enumerate(dag.levels):
names = [n.task_name for n in level]
print(f"Niveau {i} (parallèle): {names}")
# Tâches prêtes étant donné un ensemble complété
ready = dag.get_ready_tasks(completed={node_a})
# Visualisation (nécessite networkx)
from taskiq_flow.visualization import DAGVisualizer
viz = DAGVisualizer(dag)
print(viz.to_json())
print(viz.to_graphviz())
print(viz.visualize_ascii())
4.3. Chemin Critique et Groupes Parallèles
from taskiq_flow.visualization import DAGVisualizer
viz = DAGVisualizer(dag)
# Chemin critique = chaîne d'exécution la plus longue
critical = viz.detect_critical_path()
print(f"Chemin critique: {' → '.join(critical)}")
# Groupes de tâches pouvant s'exécuter en parallèle
groups = viz.find_parallelizable_groups()
for i, group in enumerate(groups):
print(f"Groupe parallèle {i}: {group}")
5. Exécution
5.1. Lancer des Pipelines Dataflow
# Exécuter et récupérer toutes les sorties
results = await pipeline.kiq_dataflow(track_paths=["chanson1.mp3", "chanson2.mp3"])
print(results)
# {"audio_features": {...}, "mir_features": {...}, "tags": [...], "embedding": [...]}
5.2. Le Moteur d’Exécution
DataflowPipeline utilise ExecutionEngine en interne pour l’exécution basée sur le DAG :
from taskiq_flow import ExecutionEngine
# Exécution personnalisée avec contrôle fin
engine = ExecutionEngine(
broker=broker,
dag=dag,
max_parallel=10,
error_mode=ErrorHandlingMode.CONTINUE_ON_ERROR,
resource_aware=True,
)
outputs = await engine.execute(
inputs={"source_file": "data.csv"},
pipeline_id="mon_pipeline"
)
Fonctionnalités d’exécution :
- Ordre topologique — les tâches s’exécutent après leurs dépendances
- Exécution parallèle — les tâches indépendantes tournent simultanément
- Retry par tâche — configuré via
@pipeline_task(retries=N) - Modes d’erreur —
FAIL_FAST,CONTINUE_ON_ERROR,SKIP_FAILED - Parallélisme basé sur les ressources — ajuste la concurrence selon CPU/mémoire
5.3. Opérations Map-Reduce
Pour le traitement par lots dans les pipelines :
from taskiq_flow import MapReduce
# Map parallèle
mapped = await MapReduce.map(
broker,
process_item,
items=list(range(100)),
output="processed",
max_parallel=10,
)
# Agrégation
result = await MapReduce.reduce(
broker,
aggregate_results,
mapped.results,
output="final",
initial=0,
)
# Map-reduce combiné
final = await MapReduce.map_reduce(
broker,
map_task=process_item,
reduce_task=aggregate_results,
items=list(range(1000)),
max_parallel=20,
reduce_chunk_size=100,
)
Fonctionnalités map :
- Parallélisme automatique avec
asyncio - Chunking intelligent pour les grands ensembles
- Callbacks de progression
- Collecte des erreurs avec taux de réussite
5.4. Map/Reduce au Niveau Pipeline
DataflowPipeline intègre les opérations map-reduce :
pipeline = DataflowPipeline.from_tasks(
broker, [extract_features]
)
# Ajouter une opération map (traitement parallèle de nombreux éléments)
pipeline.map(
process_track,
track_list,
output="track_features",
max_parallel=10,
)
# Ajouter une opération reduce (agrégation)
pipeline.reduce(
aggregate_features,
input_name="track_features",
output="playlist_stats",
)
# Exécuter
results = await pipeline.kiq_map_reduce(track_list=tracks)
6. Combiner Pipelines Séquentiels et Dataflow
6.1. Pattern Hybride
Utilisez des pipelines séquentiels pour les flux linéaires et dataflow pour les sous-workflows complexes :
# Coquille séquentielle
main_pipeline = Pipeline(broker)
@broker.task
def run_dataflow_subset(data: list) -> dict:
# Pipeline dataflow interne
sub_pipeline = DataflowPipeline.from_tasks(
broker,
[task_a, task_b, task_c]
)
return await sub_pipeline.kiq_dataflow(data=data)
main_pipeline.call_next(run_dataflow_subset).call_next(finalize)
6.2. Planification de Pipelines
Planifiez des pipelines dataflow avec cron ou intervalles :
from taskiq_flow import PipelineScheduler
scheduler = PipelineScheduler(broker)
# Planification cron
await scheduler.schedule(
pipeline,
cron="0 2 * * *", # Quotidien à 2h du matin
kwargs={"paths": ["daily_files/*.mp3"]}
)
# Planification par intervalle
await scheduler.schedule(
pipeline,
interval_seconds=3600, # Toutes les heures
label="hourly_analysis"
)
7. Cache et Résultats Intermédiaires
7.1. DataCache
Le DataCache stocke les résultats intermédiaires pendant l’exécution du pipeline :
from taskiq_flow.dataflow.cache import DataCache
cache = DataCache()
# Stocker des résultats
cache.set("features", {"tempo": 120.0})
cache.set("tags", ["electronic", "dance"])
# Récupérer
features = cache.get("features")
# Vérifier l'existence
if cache.has("embedding"):
embedding = cache.get("embedding")
# Injection automatique de dépendances pour les tâches
inputs = cache.inject(["features", "tags"])
# → {"features": {...}, "tags": [...]}
# Effacer entre les exécutions
cache.clear()
8. Gestion des Erreurs dans Dataflow
8.1. Modes d’Erreur
from taskiq_flow.errors import ErrorHandlingMode
# Arrêter au premier erreur (défaut)
engine = ExecutionEngine(broker, dag, error_mode=ErrorHandlingMode.FAIL_FAST)
# Continuer malgré les erreurs
engine = ExecutionEngine(broker, dag, error_mode=ErrorHandlingMode.CONTINUE_ON_ERROR)
# Ignorer les tâches échouées
engine = ExecutionEngine(broker, dag, error_mode=ErrorHandlingMode.SKIP_FAILED)
8.2. Configuration des Retries
Configurez les retries au niveau de chaque tâche :
@broker.task
@pipeline_task(
output="reliable_feature",
retries=3,
retry_delay=2.0,
retry_backoff=2.0,
)
def fetch_with_retry(url: str) -> dict:
# Sera réessayé jusqu'à 3 fois avec backoff exponentiel
...
9. Gestion des Ressources
9.1. Parallélisme Basé sur les Ressources
Contrôlez le parallélisme en fonction de l’utilisation estimée des ressources :
from taskiq_flow.optimization.parallel import ResourceAwareExecutor
executor = ResourceAwareExecutor(
max_cpu_percent=80.0,
max_memory_percent=80.0,
min_parallel=1,
max_parallel=10,
)
engine = ExecutionEngine(
broker,
dag,
resource_aware=True,
resource_profiles={
"heavy_task": {"estimated_memory_mb": 500, "estimated_cpu_cores": 2.0},
"light_task": {"estimated_memory_mb": 50, "estimated_cpu_cores": 0.5},
},
)
10. Visualisation
10.1. Visualisation Intégrée
# ASCII dans la console
pipeline.print_dag()
# JSON pour interfaces web
viz = pipeline.visualize() # → {"nodes": [...], "edges": [...], "levels": [...]}
# DOT pour Graphviz
dot = pipeline.visualize_dot()
# Rendu : dot -Tpng pipeline.dot -o pipeline.png
10.2. Visualisation Avancée (nécessite networkx)
from taskiq_flow.visualization import DAGVisualizer
viz = DAGVisualizer(dag)
# Formats d'exportation
viz.to_json() # JSON pour frontend
viz.to_cytoscape_json() # Format Cytoscape.js
viz.to_graphviz() # Format DOT
viz.visualize_ascii() # Art ASCII
# Analyse
viz.detect_critical_path() # Chemin d'exécution le plus long
viz.find_parallelizable_groups() # Tâches exécutables en parallèle
10.3. Diagrammes Mermaid
from taskiq_flow.visualization import MermaidGenerator
mermaid = MermaidGenerator(dag)
print(mermaid.to_mermaid()) # Diagramme basique
print(mermaid.to_mermaid_with_styling()) # Stylisé avec couleurs
11. Exemple Complet : Pipeline de Traitement Audio
import asyncio
from taskiq import InMemoryBroker
from taskiq_flow import DataflowPipeline, pipeline_task
broker = InMemoryBroker()
@broker.task
@pipeline_task(output="audio_features")
async def extract_audio(paths: list[str]) -> dict:
return {"duration": 180.0, "tempo": 120.0}
@broker.task
@pipeline_task(output="mir_features")
async def compute_mir(audio_features: dict) -> dict:
return {"key": "C major", "loudness": -12.5}
@broker.task
@pipeline_task(output="tags")
async def generate_tags(mir_features: dict) -> list[str]:
return ["electronic", "dance"]
@broker.task
@pipeline_task(output="embedding")
async def create_embedding(
mir_features: dict,
tags: list[str]
) -> list[float]:
return [0.1, 0.2, 0.3, 0.4, 0.5]
async def main():
pipeline = DataflowPipeline.from_tasks(
broker,
[extract_audio, compute_mir, generate_tags, create_embedding]
)
# Inspecter avant exécution
pipeline.print_dag()
# Exécuter
results = await pipeline.kiq_dataflow(
paths=["track1.mp3", "track2.mp3"]
)
print(results)
# {
# "audio_features": {"duration": 180.0, "tempo": 120.0},
# "mir_features": {"key": "C major", "loudness": -12.5},
# "tags": ["electronic", "dance"],
# "embedding": [0.1, 0.2, 0.3, 0.4, 0.5]
# }
asyncio.run(main())
12. Pièges Courants
| Symptôme | Cause | Solution |
|---|---|---|
| Toutes les tâches s’exécutent séquentiellement | Utilisation de Pipeline au lieu de DataflowPipeline |
Passer à DataflowPipeline |
| Erreurs “sortie manquante” | @pipeline_task(output=...) ne correspond pas au paramètre en aval |
Aligner le nom de sortie avec le nom du paramètre |
| “No DAG built” | kiq_dataflow() appelé sans tâches ajoutées |
Ajouter des tâches via from_tasks() ou add_dataflow_task() |
| Tâches exécutées deux fois | Mélange de .call_next() et dépendances @pipeline_task |
Utiliser une seule approche de manière cohérente |
| Deadlock détecté | Dépendance circulaire dans le flux de données | Redéfinir avec un flux de données avant uniquement |
| Explosion mémoire | Trop de tâches parallèles | Définir max_parallel ou utiliser le mode resource-aware |
13. Conseils de Performance
- Limiter le parallélisme — Utiliser
max_parallelpour contrôler le nombre de tâches simultanées - Utiliser map-reduce pour les lots —
MapReduce.map()avec chunking pour les grands ensembles - Profiler l’utilisation des ressources — Définir
resource_profilespour le parallélisme adaptatif - Éviter les résultats intermédiaires volumineux — Streamer les données quand c’est possible
- Réutiliser le DAG — Construire le DAG une fois, exécuter plusieurs fois avec des entrées différentes
14. Résumé de l’API
DataflowPipeline
| Méthode | Description |
|---|---|
from_tasks(broker, tasks) |
Créer un pipeline à partir d’une liste de tâches |
add_dataflow_task(task) |
Ajouter une tâche dynamiquement |
kiq_dataflow(**inputs) |
Exécuter le pipeline avec orchestration dataflow |
kiq_map_reduce(**inputs) |
Exécuter en mode map-reduce |
kiq_map_reduce_advanced(...) |
Exécution map-reduce avancée avec options complètes |
kiq_map_sweep(task, param_values, ...) |
Balayage multi-dimensionnel de paramètres |
visualize() |
Obtenir le DAG en JSON |
visualize_dot() |
Obtenir le DAG au format DOT |
print_dag() |
Affichage ASCII dans la console |
schedule_with_cron(scheduler, label, cron, **inputs) |
Planification avec expression cron |
schedule_with_labels(scheduler, label, ...) |
Planification avec LabelBasedScheduler |
map(task, items, output, ...) |
Ajouter une opération map |
reduce(task, input_name, output, ...) |
Ajouter une opération reduce |
DataflowRegistry
| Méthode | Description |
|---|---|
register_task(task, output, inputs, **meta) |
Enregistrer une tâche avec ses métadonnées E/S |
build_dag() |
Construire le DAG à partir des tâches enregistrées |
get_producer(data_name) |
Trouver la tâche productrice pour une donnée |
get_consumers(data_name) |
Trouver les tâches consommatrices d’une donnée |
get_external_inputs() |
Lister les entrées externes |
get_outputs() |
Lister toutes les sorties |
get_tasks() |
Lister toutes les tâches enregistrées |
ExecutionEngine
| Méthode | Description |
|---|---|
execute(inputs, pipeline_id) |
Exécuter le DAG avec les entrées fournies |
get_execution_report() |
Obtenir les statistiques d’exécution |
MapReduce
| Méthode | Description |
|---|---|
map(broker, task, items, output, ...) |
Opération map en parallèle |
reduce(broker, task, inputs, output, ...) |
Opération de réduction |
map_reduce(broker, map_task, reduce_task, items, ...) |
Map + reduce combinés |
map_sweep(broker, task, param_values, output, ...) |
Balayage multi-dimensionnel |
Maîtrisez le dataflow pour construire des workflows complexes et parallèles. Pour les patterns séquentiels, voir le Guide des Pipelines.