Exemple: scheduled_pipeline.py
Planification de pipelines avec déclencheurs cron et intervalles
Version : {VERSION} Fichier : examples/scheduled_pipeline.py
Aperçu
Cet exemple démontre comment planifier des pipelines pour exécution périodique en utilisant LabelBasedScheduler. Il couvre:
- Planification cron (avec précision seconde)
- Planification par intervalle
- Lister et inspecter jobs planifiés
Note : Cet exemple utilise LabelBasedScheduler, mécanisme de planification par labels de TaskIQ. Pour planification cron production, considérer PipelineScheduler avec intégration APScheduler.
Ce Que Cet Exemple Montre
- Créer un pipeline simple
- Utiliser
LabelBasedSchedulerpour planifier exécutions pipeline - Expressions cron avec précision seconde
- Planification par intervalle
- Lister schedules actifs
Explication du Code
import asyncio
from taskiq import InMemoryBroker
from taskiq_flow import Pipeline, PipelineMiddleware
from taskiq_flow.scheduling import LabelBasedScheduler
# Créer broker
broker = InMemoryBroker(await_inplace=True).with_middlewares(PipelineMiddleware())
# Définir tâche simple
@broker.task
async def log_message(msg: str) -> str:
"""Logger un message."""
return f"Traited: {msg}"
async def main():
# Créer pipeline
pipeline = Pipeline(broker).call_next(log_message)
# Créer scheduler
scheduler = LabelBasedScheduler(broker)
# Planifier avec expression cron (toutes les 5 secondes)
schedule_id = await scheduler.schedule_with_cron(
pipeline=pipeline,
label="every-5-seconds",
cron="*/5 * * * * *", # cron 6 champs pour précision seconde
args=("Hello from scheduled pipeline!",),
)
print(f"Scheduled with cron: {schedule_id}")
# Planifier avec intervalle (toutes les 3 secondes)
interval_id = await scheduler.schedule_with_interval(
pipeline=pipeline,
label="every-3-seconds",
interval_seconds=3,
args=("Interval scheduled run!",),
)
print(f"Scheduled with interval: {interval_id}")
# Attendre quelques exécutions
print("Waiting for pipeline executions (12 seconds)...")
await asyncio.sleep(12)
# Lister jobs planifiés
schedules = scheduler.list_schedules()
print(f"Active schedules: {len(schedules)}")
for sched in schedules:
print(f" - {sched['label']}: cron={sched.get('cron')}, enabled={sched['enabled']}")
asyncio.run(main())
Méthodes de Planification
Planification Cron
schedule_id = await scheduler.schedule_with_cron(
pipeline=pipeline,
label="mon-schedule",
cron="*/5 * * * * *", # Toutes 5 secondes (cron 6 champs)
args=("message",),
)
Format cron 6 champs:seconde minute Heure jour mois jour-semaine
Exemples:
*/5 * * * * *— Toutes les 5 secondes0 * * * * *— Toutes les minutes à seconde 00 0 * * * *— Toutes les heures à minute 0, seconde 0
Planification Intervalle
interval_id = await scheduler.schedule_with_interval(
pipeline=pipeline,
label="interval-3s",
interval_seconds=3,
args=("message",),
)
Exécute toutes les N secondes, indépendamment de l’heure système.
Sortie Attendue
Scheduled with cron: schedule_123456
Scheduled with interval: interval_789012
Waiting for pipeline executions (12 seconds)...
INFO:root:Processed: Hello from scheduled pipeline!
INFO:root:Processed: Interval scheduled run!
INFO:root:Processed: Hello from scheduled pipeline!
INFO:root:Processed: Interval scheduled run!
...
Active schedules: 2
- every-5-seconds: cron=*/5 * * * * *, enabled=True
- every-3-seconds: cron=None, enabled=True
Vous devriez voir le message logué plusieurs fois comme schedules se déclenchent.
Points Clés
Planification par Label
- Chaque schedule requiert un
labelunique (utilisé pour identification) - Les labels peuvent activer/désactiver schedules dynamiquement
- Le scheduler gère persistance schedules selon votre broker
Limite InMemoryBroker
Avec InMemoryBroker, schedules fonctionnent seulement pendant processus en cours; perdus au redémarrage. Pour planification persistante, utiliser brokers Redis avec stores de schedule appropriés.
Multiples Schedules
Vous pouvez planifier même pipeline plusieurs fois avec labels, expressions cron, ou arguments différents.
Variations
Planification Avancée avec PipelineScheduler
Pour planification plus avancée (timezones, gestion misfire), utiliser PipelineScheduler:
from taskiq_flow import PipelineScheduler
scheduler = PipelineScheduler(broker)
job_id = await scheduler.schedule(
pipeline,
cron="0 9 * * *", # Quotidien à 9h
args=("daily",)
)
await scheduler.start()
Voir Guide de Planification pour détails complets sur PipelineScheduler.
Chemin d’Apprentissage
Après cet exemple:
- Guide de Planification — Planification cron et intervalle complète
- PipelineScheduler API — Référence API
- Guide de Retry — Gestion échecs pipelines planifiés
Cet exemple montre bases planification par label. Pour production, explorer PipelineScheduler avec stores de jobs externes (PostgreSQL/Redis).