Référence API: Moteur d’Exécution
ExecutionEngine, DAG, utilitaires map-reduce, et gestion d’erreurs
Version : {VERSION} Module : taskiq_flow.execution_engine,taskiq_flow.dataflow.dag,taskiq_flow.map_reduce
ExecutionEngine
Moteur de bas niveau pour exécuter des DAGs directement, évitant l’abstraction Pipeline.
from taskiq_flow import ExecutionEngine, DataflowRegistry
# Construire le registre manuellement
registry = DataflowRegistry()
registry.register_task(load, output="raw", inputs=[])
registry.register_task(process, output="clean", inputs=["raw"])
registry.register_task(save, output="saved", inputs=["clean"])
# Construire le DAG
dag = registry.build_dag()
# Créer le moteur
engine = ExecutionEngine(broker, dag)
# Exécuter
results = await engine.execute(inputs={"source": "data.csv"})
# results = {"raw": ..., "clean": ..., "saved": ...}
Constructeur :
ExecutionEngine(
broker: BaseBroker,
dag: DAG,
max_parallel: int = None,
on_step_complete: callable = None
)
Méthodes :
| Méthode | Signature | Description |
|---|---|---|
execute |
execute(inputs: dict) -> dict |
Exécute le DAG avec les entrées données |
execute_async |
execute_async(inputs: dict) -> AsyncIterator |
Stream les résultats au fur et à mesure |
cancel |
cancel() |
Arrête l’exécution en cours |
Événements :
async def on_step(task_name: str, result: Any):
print(f"Étape {task_name} terminée")
engine = ExecutionEngine(broker, dag, on_step_complete=on_step)
DAG (Directed Acyclic Graph)
Représente le graphe d’exécution des tâches.
from taskiq_flow.dataflow import DAG, DAGNode
dag = DAG()
node = DAGNode(task=my_task, output="result", inputs=["input_a"])
dag.add_node(node)
Méthodes DAG :
| Méthode | Description |
|---|---|
add_node(node: DAGNode) |
Ajoute un nœud tâche |
add_edge(from_task, to_task) |
Ajoute une dépendance |
topological_sort() -> list[DAGNode] |
Retourne l’ordre d’exécution |
get_parallel_levels() -> list[list[DAGNode]] |
Groupe les nœuds par niveau d’exécution parallèle |
validate() |
Vérifie cycles, nœuds manquants |
print() |
Visualisation ASCII vers console |
Propriétés DAG :
| Propriété | Type | Description |
|---|---|---|
nodes |
list[DAGNode] |
Tous les nœuds du graphe |
edges |
set[tuple[DAGNode, DAGNode]] |
Arêtes de dépendance |
roots |
list[DAGNode] |
Nœuds sans dépendances |
leaves |
list[DAGNode] |
Nœuds sans dépendants |
DAGNode
Représente une tâche unique dans le DAG avec sa spécification E/S.
from taskiq_flow.dataflow import DAGNode
node = DAGNode(
task=my_task_function,
output="result_key",
inputs=["input_a", "input_b"],
metadata={"description": "Ma tâche"}
)
Propriétés :
| Propriété | Type | Description |
|---|---|---|
task |
Callable |
La fonction tâche |
task_name |
str |
Nom auto-généré ou personnalisé |
output |
str |
Clé de sortie (unique) |
outputs |
list[str] |
Clés de sortie (multiples) |
inputs |
list[str] |
Clés d’entrée requises |
metadata |
dict |
Métadonnées arbitraires |
DAGBuilder
Helper pour construire des DAGs par programmation (moins courant ; utilisez généralement DataflowRegistry).
from taskiq_flow import DAGBuilder
builder = DAGBuilder()
builder.add_task(task1, output="a", inputs=[])
builder.add_task(task2, output="b", inputs=["a"])
builder.add_task(task3, output="c", inputs=["a", "b"])
dag = builder.build()
Pattern Builder :
dag = (DAGBuilder()
.node(load, output="raw", inputs=[])
.node(process, output="clean", inputs=["raw"])
.node(save, output="saved", inputs=["clean"])
.build()
)
MapReduce
Utilitaire pour map parallèle suivi d’un reduce.
MapReduce.map
from taskiq_flow import MapReduce
mapped = await MapReduce.map(
broker,
map_func, # Fonction tâche à appliquer
items: Iterable, # Éléments à traiter
output: str = "mapped",
max_parallel: int = None
)
# Retourne : MapReduceResult (comme une Task)
MapReduce.reduce
reduced = await MapReduce.reduce(
broker,
reduce_func, # Fonction d'agrégation
mapped_result, # Résultat de MapReduce.map
input_name: str, # Nom de la sortie mappée à consommer
output: str = "reduced"
)
# Retourne : Task (avec résultat final)
MapReduce.map_reduce (combiné)
final = await MapReduce.map_reduce(
broker,
map_func,
items,
reduce_func,
map_output="mapped",
reduce_output="final",
max_parallel=10
)
Les trois retournent des objets Task ; appelez .wait_result() pour récupérer la valeur.
DataflowRegistry (Avancé)
Enregistrement manuel des tâches pour construction dynamique de pipeline.
from taskiq_flow import DataflowRegistry
registry = DataflowRegistry()
# Enregistrer les tâches avec E/S explicites
registry.register_task(
task=extract,
output="features",
inputs=["audio_files"] # entrée externe
)
registry.register_task(
task=tag,
output="tags",
inputs=["features"] # dépend de la sortie de extract
)
# Inspection
print("Tâches:", [t.task_name for t in registry.get_tasks()])
print("Sorties:", registry.get_outputs())
print("Entrées externes:", registry.get_external_inputs())
# Construction du DAG
dag = registry.build_dag()
dag.print()
# Exécution via ExecutionEngine
engine = ExecutionEngine(broker, dag)
results = await engine.execute(inputs={"audio_files": files})
Requêtes Registry :
| Méthode | Description |
|---|---|
get_tasks() |
Liste tous les objets TaskNode |
get_outputs() |
Liste toutes les clés de sortie |
get_external_inputs() |
Liste les entrées non produites par une tâche |
get_producer(output_key) |
Retourne la tâche produisant cette sortie |
get_consumers(input_key) |
Liste les tâches consommant cette entrée |
build_dag() |
Construit le DAG, valide, retourne prêt à exécuter |
Notes de Version
- ExecutionEngine introduit en v0.3.0
DAGetDAGNodesont utilisés en interne par DataflowPipeline- MapReduce disponible depuis v0.2.0
Prochaines Étapes
- API Suivi — Surveiller l’exécution avec PipelineTrackingManager
- API WebSocket — HookManager et système d’événements
- API Core — Référence Pipeline et middleware
- Exemple Pipeline Audio Dataflow — Voir ExecutionEngine utilisé dans un pipeline DAG réel
- Exemple Découverte Registry — Construction manuelle de DAG et utilisation d’ExecutionEngine
Pour cas avancés uniquement. 95% des utilisateurs devraient se contenter des abstractions Pipeline et DataflowPipeline.