Exemple: quickstart.py

Pipeline séquentiel simple avec opérations map, filter, group

Version : {VERSION} Fichier : examples/quickstart.py

Aperçu

Cet exemple démontre les fondamentaux de Taskiq-Flow en utilisant un pipeline séquentiel classique. Il couvre:

  • Définition de tâches avec @broker.task
  • Construction de pipeline avec .call_next(), .map(), .filter()
  • Exécution du pipeline et récupération des résultats
  • Compréhension du flux de données à travers étapes

Explication Pas-à-Pas du Code

import asyncio
from taskiq import InMemoryBroker
from taskiq_flow import Pipeline, PipelineMiddleware

# 1. Initialiser broker et ajouter middleware
broker = InMemoryBroker()
broker.add_middlewares(PipelineMiddleware())

# 2. Définir les tâches
@broker.task
def add_one(value: int) -> int:
    return value + 1

@broker.task
def repeat(value: int, times: int) -> list[int]:
    return [value] * times

@broker.task
def is_positive(value: int) -> bool:
    return value >= 0

# 3. Construire le pipeline
async def main():
    pipeline = (
        Pipeline(broker)
        .call_next(add_one)           # Étape 1: 1 → 2
        .call_next(repeat, times=4)   # Étape 2: 2 → [2,2,2,2]
        .map(add_one)                  # Étape 3: [2,2,2,2] → [3,3,3,3]
        .filter(is_positive)           # Étape 4: garder positifs (tous gardés)
    )

    # 4. Exécuter
    task = await pipeline.kiq(1)
    result = await task.wait_result()
    print("Résultat:", result.return_value)  # [3, 3, 3, 3]

asyncio.run(main())

Explication Étape par Étape

Étape 1: call_next(add_one)

  • Entrée: 1
  • Opération: add_one(1) = 2
  • Sortie: 2

Étape 2: call_next(repeat, times=4)

  • Entrée: 2
  • Opération: repeat(2, times=4) = [2, 2, 2, 2]
  • Sortie: [2, 2, 2, 2]

Étape 3: map(add_one)

  • Entrée: [2, 2, 2, 2] (itérable)
  • Opération: Appliquer add_one à chaque élément en parallèle
    • add_one(2) = 3
    • add_one(2) = 3
    • add_one(2) = 3
    • add_one(2) = 3
  • Sortie: [3, 3, 3, 3]

Étape 4: filter(is_positive)

  • Entrée: [3, 3, 3, 3] (itérable)
  • Opération: Garder éléments où is_positive(element) == True
    • Tous 4 éléments positifs → tous gardés
  • Sortie: [3, 3, 3, 3]

Concepts Clés Démontrés

  1. Définition de tâche — Toute étape de pipeline doit être une tâche (@broker.task)
  2. Exigence middlewarePipelineMiddleware doit être ajouté au broker
  3. Flux de données — Chaque étape reçoit sortie précédente (sauf call_after)
  4. Exécution parallèle.map() exécute éléments concurremment
  5. Enchaînement — Les méthodes retournent pipeline pour interface fluide

Exécuter l’Exemple

python examples/quickstart.py

Sortie attendue:

Résultat: [3, 3, 3, 3]

Variations à Tester

Utiliser filter pour éliminer négatifs

@broker.task
def subtract_three(valeur: int) -> int:
    return valeur - 5  # résultats en [-2, -2, -2, -2]

pipeline = (
    Pipeline(broker)
    .call_next(add_one)
    .call_next(repeat, times=4)
    .map(subtract_three)  # [2,2,2,2] → [-2,-2,-2,-2]
    .filter(is_positive)   # [] — tous filtrés
)

Utiliser group pour tâches indépendantes parallèles

@broker.task
def task_a(x: int) -> int: return x * 2
@broker.task
def task_b(x: int) -> int: return x + 10
@broker.task
def task_c(x: int) -> int: return x ** 2

pipeline = Pipeline(broker).call_next(add_one)  # 1 → 2
pipeline.group([task_a, task_b, task_c], param_names=["x"])
# All three receive 2 and execute in parallel
# Result: [4, 12, 4]

Chemin d’Apprentissage

Après cet exemple:

  1. Pipelines Dataflow — Construction automatique de DAG
  2. Définition des Tâches — Fonctionnalités avancées de tâches
  3. Suivi — Monitor exécutions de pipeline
  4. MapReduce — Motif de traitement par lots

Cet exemple est le “Hello World” de Taskiq-Flow. Maîtriser-le avant de passer à motifs plus complexes.


This site uses Just the Docs, a documentation theme for Jekyll.