Exemple: websocket_demo.py
Streaming d’événements en temps réel via WebSockets
Version : {VERSION} Fichier : examples/websocket_demo.py
Aperçu
Cet exemple démontre comment configurer un serveur WebSocket qui diffuse en temps réel les événements d’exécution de pipeline. Il couvre:
- Créer un
HookManageret le connecter au transport WebSocket - Démarrer un serveur WebSocket sur un host/port spécifique
- S’abonner aux événements de pipeline depuis un client
- Observer les événements de complétion d’étape en direct
Note : Ceci est une démo minimale. Pour production, ajouter authentication, gestion d’erreurs, et gestion connexions robuste.
Ce Que Cet Exemple Montre
- Configuration
HookManageravecsetup_websocket_bridge() - Attacher hooks à un pipeline
- Démarrer le serveur WebSocket
- Comment les clients peuvent se connecter et s’abonner
- Les messages d’événements broadcastés
Explication du Code
import asyncio
from taskiq import InMemoryBroker
from taskiq_flow import Pipeline
from taskiq_flow.hooks import HookManager, setup_websocket_bridge
from taskiq_flow.integration.websocket import get_websocket_server
from taskiq_flow.middleware import PipelineMiddleware
# Create broker
broker = InMemoryBroker(await_inplace=True).with_middlewares(PipelineMiddleware())
# Define simple tasks
@broker.task
def add_one(x: int) -> int:
return x + 1
@broker.task
def multiply_by_two(x: int) -> int:
return x * 2
async def main():
# 1. Configure hook manager and WebSocket bridge
hook_manager = HookManager()
setup_websocket_bridge(hook_manager)
# 2. Create pipeline and attach hooks
pipeline = Pipeline(broker)
pipeline.pipeline_id = "websocket_demo"
pipeline.call_next(add_one, param_name="x")
pipeline.call_next(multiply_by_two, param_name="x")
pipeline.with_hooks(hook_manager)
# 3. Start WebSocket server in background
websocket_server = get_websocket_server()
_ = asyncio.create_task(
websocket_server.start_server("127.0.0.1", 8765),
)
print("WebSocket server started on ws://127.0.0.1:8765")
msg = '{"pipeline_id": "websocket_demo"}'
print(f"Connect a WebSocket client and subscribe with: {msg}")
print("Then run the pipeline to see real-time events...")
# Wait for server to start
await asyncio.sleep(1)
# 4. Execute the pipeline
result = await pipeline.kiq(5) # Start with 5 → 6 → 12
print(f"Pipeline result: {result}")
# Keep server alive briefly
await asyncio.sleep(5)
print("Demo complete. Server will shut down.")
asyncio.run(main())
Séquence d’Événements
Quand le pipeline s’exécute, événements suivants sont broadcastés:
- PipelineStartEvent
{"type": "PipelineStartEvent", "pipeline_id": "websocket_demo", "timestamp": "..."} - StepStartEvent (pour add_one)
{"type": "StepStartEvent", "pipeline_id": "websocket_demo", "step_name": "add_one", ...} - StepCompleteEvent (pour add_one)
{"type": "StepCompleteEvent", "pipeline_id": "websocket_demo", "step_name": "add_one", "result": 6, "duration_ms": 1.2, ...} -
StepStartEvent (pour multiply_by_two)
-
StepCompleteEvent (pour multiply_by_two)
- PipelineCompleteEvent
{"type": "PipelineCompleteEvent", "pipeline_id": "websocket_demo", "status": "COMPLETED", "result": 12, ...}
Implémentation Client (JavaScript)
Ouvrir console navigateur ou script Node.js:
// Se connecter au serveur WebSocket
const ws = new WebSocket('ws://127.0.0.1:8765');
// S'abonner au pipeline de démo
ws.onopen = () => {
console.log('Connecté au serveur WebSocket');
ws.send(JSON.stringify({
type: 'subscribe',
pipeline_id: 'websocket_demo'
}));
};
// Gérer événements entrants
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('Événement:', data.type, data);
switch (data.type) {
case 'StepCompleteEvent':
console.log(`Étape ${data.step_name} finie:`, data.result);
break;
case 'PipelineCompleteEvent':
console.log('Pipeline terminé avec statut:', data.status);
console.log('Résultat final:', data.result);
break;
}
};
Points Clés Configuration
1. Créer HookManager
hook_manager = HookManager()
2. Installer Pont WebSocket
setup_websocket_bridge(hook_manager)
Cela connecte système événements HookManager au transport WebSocket.
3. Attacher Hooks au Pipeline
pipeline = Pipeline(broker).with_hooks(hook_manager)
Sans ceci, le pipeline n’émettra pas événements vers WebSocket.
4. Define pipeline_id
pipeline.pipeline_id = "my_pipeline"
Necessary for clients to subscribe to specific pipelines.
5. Start Server
server = get_websocket_server(host="127.0.0.1", port=8765)
await server.start_server()
Personnalisation
Change Port
server = get_websocket_server(port=9000)
Multiple Pipelines
pipeline1 = Pipeline(broker).with_hooks(hook_manager)
pipeline1.pipeline_id = "pipeline_1"
pipeline2 = Pipeline(broker).with_hooks(hook_manager)
pipeline2.pipeline_id = "pipeline_2"
Clients can subscribe to specific pipeline IDs.
Event Filtering
from taskiq_flow.hooks import EventFilter
# Only send step completion events
filter = EventFilter(
pipeline_ids=["*"],
event_types=["StepCompleteEvent", "PipelineCompleteEvent"]
)
hook_manager.add_filter(filter)
Multiple Pipelines
pipeline1 = Pipeline(broker).with_hooks(hook_manager)
pipeline1.pipeline_id = "pipeline_1"
pipeline2 = Pipeline(broker).with_hooks(hook_manager)
pipeline2.pipeline_id = "pipeline_2"
Clients can subscribe to specific pipeline IDs.
Event Filtering
Multiples Pipelines
pipeline1 = Pipeline(broker).with_hooks(hook_manager)
pipeline1.pipeline_id = "pipeline_1"
pipeline2 = Pipeline(broker).with_hooks(hook_manager)
pipeline2.pipeline_id = "pipeline_2"
Les clients peuvent s’abonner à IDs pipeline spécifiques.
Filtrage Événements
from taskiq_flow.hooks import EventFilter
# Only send step completion events
filter = EventFilter(
pipeline_ids=["*"],
event_types=["StepCompleteEvent", "PipelineCompleteEvent"]
)
hook_manager.add_filter(filter)
Dépannage
Aucun Événement Reçu
- Vérifier
setup_websocket_bridge(hook_manager)appelé avantpipeline.kiq() - Vérifier
pipeline.with_hooks(hook_manager)appelé - Vérifier
pipeline.pipeline_iddéfini
Connexion Refusée
- Vérifier
await server.start_server()appelé avant connexion - Vérifier host/port correspondent client connection string
Événements Dans Désordre
WebSocket livre messages en ordre; si désordre, vérifier problèmes réseau ou middleware custom émettant événements incorrectement.
Chemin d’Apprentissage
Après cet exemple:
- Guide WebSocket — Configuration WebSocket complète, filtrage, déploiement production
- Guide de Suivi — Stockage historique données alongside événements temps réel
- Guide API — Exposer via REST pour clients non-WebSocket
Cet exemple montre bases streaming temps réel. Pour production, ajouter authentication, pooling connexions, et scaling horizontal avec transport Redis Pub/Sub.