Exemple: dataflow_audio_pipeline.py
DAG dataflow complet avec exécution parallèle, map-reduce et visualisation
Version : {VERSION} Fichier : examples/dataflow_audio_pipeline.py
Aperçu
Cet exemple exhaustif démontre toute la puissance de DataflowPipeline avec:
- Construction automatique de DAG depuis dépendances tâches
- Exécution parallèle de tâches indépendantes
- Motif map-reduce pour traitement par lots
- Visualisation de pipeline (DOT, JSON, ASCII)
- Workflows mixtes séquentiels et parallèles
C’est l’exemple de référence pour comprendre l’architecture dataflow.
Ce Que Cet Exemple Montre
- Utilisation du décorateur
@pipeline_taskavec sorties simples et multiples - Résolution automatique de dépendances — les tâches déclarent leurs sorties; tâches en aval consomment par nom de paramètre
- Exécution parallèle — tâches avec même dépendance s’exécutent concurremment
- Motif map-reduce — traitement batch avec
.map()et.reduce() - Visualisation DAG — affichage ASCII, export DOT, JSON
Explication du Code
Définition des Tâches
from taskiq import InMemoryBroker
from taskiq_flow import DataflowPipeline, pipeline_task
broker = InMemoryBroker(await_inplace=True)
# Tâche 1: Extraire caractéristiques audio (aucune dépendance)
@broker.task
@pipeline_task(output="audio_features")
async def extract_audio_features(track_paths: list[str]) -> dict:
features = {...}
return features
# Tâche 2: Calculer features MIR (dépend de audio_features)
@broker.task
@pipeline_task(output="mir_features")
async def compute_mir_features(audio_features: dict) -> dict:
# Reçoit audio_features automatiquement
return {...}
# Tâche 3: Générer tags (dépend de mir_features)
@broker.task
@pipeline_task(output="tags")
async def generate_tags(mir_features: dict) -> list[str]:
return ["electronic", "dance"]
# Tâche 4: Créer embedding (dépend de mir_features ET tags)
@broker.task
@pipeline_task(output="vector")
async def create_embedding(mir_features: dict, tags: list[str]) -> list[float]:
# Reçoit les deux entrées automatiquement
return [0.1, 0.5, 0.8]
Le pipeline construit automatiquement ce DAG:
flowchart TD
A[extract_audio_features] --> B[compute_mir_features]
A --> C[generate_tags]
B --> D[create_embedding]
C --> D
Note: create_embedding dépend à la fois de mir_features (sortie de compute_mir_features) et tags (sortie de generate_tags), donc il s’exécute après que les deux tâches parallèles sont terminées.
---
## Exemple 1: Pipeline Séquentiel avec Dépendances Automatiques
```python
async def example_sequential_pipeline():
pipeline = DataflowPipeline.from_tasks(
broker,
[
extract_audio_features,
compute_mir_features,
generate_tags,
create_embedding,
],
)
pipeline.print_dag()
# Sortie:
# Ordre Exécution DAG:
# Niveau 0 (parallèle): extract_audio_features
# Niveau 1 (parallèle): compute_mir_features
# Niveau 2 (parallèle): generate_tags, create_embedding
# Sorties finales: audio_features, mir_features, tags, vector
résultats = await pipeline.kiq_dataflow(track_paths=["track1.mp3"])
# résultats = {
# "audio_features": {...},
# "mir_features": {...},
# "tags": [...],
# "vector": [...]
# }
Résolution dépendances:
extract_audio_featuresaucune dépendance → exécute en premiercompute_mir_featuresbesoinaudio_features→ exécute après étape 1generate_tagsbesoinmir_features→ exécute après étape 2create_embeddingbesoinmir_featuresettags→ exécute après étapes 2 & 3 complétées
Exemple 2: Exécution Parallèle
Avec ajout de extract_spectral_features qui dépend aussi seulement de audio_features:
@broker.task
@pipeline_task(output="spectral_features")
async def extract_spectral_features(audio_features: dict) -> dict:
await asyncio.sleep(0.2)
return {"spectral_rolloff": 5000.0}
@broker.task
@pipeline_task(output="combined_features")
async def combine_features(
mir_features: dict,
spectral_features: dict,
tags: list[str],
) -> dict:
return {**mir_features, **spectral_features, "tags": tags}
pipeline = DataflowPipeline.from_tasks(
broker,
[
extract_audio_features,
compute_mir_features, # Niveau 1
extract_spectral_features, # Niveau 1 (parallèle à compute_mir_features)
generate_tags, # Niveau 2 (dépend de mir_features)
combine_features, # Niveau 2 (dépend de mir_features + spectral_features + tags)
],
)
Niveaux d’exécution:
- Niveau 0:
extract_audio_features - Niveau 1:
compute_mir_features,extract_spectral_features(parallèle) - Niveau 2:
generate_tags,combine_features(parallèle après leurs dépendances satisfaites)
Exemple 3: Motif Map-Reduce
Traiter multiples pistes en parallèle, puis agréger:
# Map: traiter chaque piste indépendamment
@broker.task
@pipeline_task(output="track_features")
async def process_single_track(track: str) -> dict:
return {"track": track, "duration": 180.0, "bpm": 120}
# Reduce: agréger toutes features de pistes
@broker.task
@pipeline_task(output="playlist_stats")
async def aggregate_track_features(track_features: list[dict]) -> dict:
total_duration = sum(t["duration"] for t in track_features)
avg_bpm = sum(t["bpm"] for t in track_features) / len(track_features)
return {"total_tracks": len(track_features), "total_duration": total_duration, "avg_bpm": avg_bpm}
# Construire pipeline
pipeline = DataflowPipeline(broker)
pipeline.map(
process_single_track,
tracks, # ["track1.mp3", "track2.mp3", ...]
output="track_features",
max_parallel=4,
)
pipeline.reduce(
aggregate_track_features,
input_name="track_features",
output="playlist_stats",
)
résultats = await pipeline.kiq_map_reduce()
# résultats = {"track_features": [...], "playlist_stats": {...}}
Exemple 4: Visualisation
Le pipeline fournit multiples formats de visualisation:
# ASCII art (console)
pipeline.print_dag()
# JSON (for web UIs)
viz_json = pipeline.visualize()
# Structure:
# {
# "nodes": [{"id": "task_name", "outputs": [...], "inputs": [...]}, ...],
# "edges": [{"from": "task_a", "to": "task_b"}],
# "levels": [["task1"], ["task2", "task3"], ...]
# }
# DOT format (for Graphviz)
dot = pipeline.visualize_dot()
# Save and render:
# with open("pipeline.dot", "w") as f:
# f.write(dot)
# Run: dot -Tpng pipeline.dot -o pipeline.png
Exécuter l’Exemple
python examples/dataflow_audio_pipeline.py
Sortie attendue inclut:
- Affichages DAG ASCII montrant ordre exécution
- Représentation DOT DAG extrait
- Structure JSON DAG extrait
Points Clés à Retenir
- Résolution automatique de dépendances — Pas besoin d’enchaîner manuellement; juste déclarer sorties
- Exécution parallèle — Tâches indépendantes s’exécutent concurremment automatiquement
- Programmation dataflow — Tâches sont fonctions pures; sortie va vers entrées
- Débogage visuel —
print_dag()montre exactement comment tâches s’exécuteront - Motifs évolutifs — Map-reduce intégré pour charges batch
Chemin d’Apprentissage
Après cet exemple:
- Guide DataflowPipeline — Plongée profonde fonctionnalités dataflow
- Guide d’Exécution — Parallélisme, timeouts, gestion erreurs
- Guide de Performance — Réglage
max_parallel, profils ressources
C’est l’exemple flagship. Étudiez-le thoroughly pour comprendre modèle dataflow Taskiq-Flow.