Guide WebSocket
Streaming d’événements en temps réel pour tableaux de bord et monitoring
Version : {VERSION} Lié : Guide de Suivi, Guide API
Aperçu
L’intégration WebSocket de Taskiq-Flow fournit un streaming en direct des événements d’exécution de pipeline — idéal pour construire des tableaux de bord en temps réel, affichages de progression et outils de monitoring.
Ce guide couvre :
- Configuration d’une route WebSocket via FastAPI
- Abonnement des clients aux événements de pipeline
- Types d’événements et charges utiles
- Filtrage d’événements
- Déploiement en production
- Considérations de sécurité
1. Architecture
[Pipeline] → [HookManager] → [WebSocketBridge] → [FastAPI WS Manager] → [Clients]
Composants :
- Pipeline — Émet des événements via des hooks à chaque étape du cycle de vie
- HookManager — Collecte les événements des pipelines
- WebSocketBridge — Connecte HookManager au transport WebSocket
- FastAPI WebSocket Manager — Gère les connexions clients et diffuse via routes FastAPI WebSocket
- Client — Navigateur web, app de monitoring, tableau de bord
Note : Taskiq-Flow utilise une intégration WebSocket FastAPI uniquement. Le serveur picows autonome historique (
get_websocket_server,PipelineWebSocketServer) a été supprimé en v1.1. Les événements WebSocket sont désormais exposés via des endpointsWebSocketFastAPI montés dans votre application FastAPI.
2. Démarrage Rapide
2.1. Configuration Côté Serveur
import asyncio
from taskiq import InMemoryBroker
from taskiq_flow import Pipeline
from taskiq_flow.hooks import HookManager, setup_websocket_bridge
# 1. Create broker and hook manager
broker = InMemoryBroker()
hook_manager = HookManager()
# 2. Configure WebSocket bridge
setup_websocket_bridge(hook_manager) # connect HookManager → WebSocket transport
# 3. Create pipeline with hooks attached
pipeline = Pipeline(broker)
pipeline.pipeline_id = "workflow_demo"
pipeline.with_hooks(hook_manager)
# Add tasks to pipeline...
# 4. For FastAPI: mount the WebSocket route in your app
# (See Section 6 below for the full FastAPI integration example)
# 5. Execute the pipeline
result = await pipeline.kiq(données)
2.2. Connexion Client (JavaScript)
// Se connecter au serveur WebSocket
const ws = new WebSocket('ws://localhost:8000/ws/workflow_demo');
// S'abonner à un pipeline spécifique
ws.onopen = () => {
ws.send(JSON.stringify({
type: 'subscribe',
pipeline_id: 'workflow_demo'
}));
};
// Recevoir les événements
ws.onmessage = (event) => {
const eventData = JSON.parse(event.data);
console.log('Événement pipeline:', eventData);
switch (eventData.type) {
case 'PipelineStartEvent':
showPipelineStarted();
break;
case 'StepStartEvent':
showStepProgress(eventData.step_name);
break;
case 'StepCompleteEvent':
updateProgress(eventData.step_name, eventData.duration_ms);
break;
case 'PipelineCompleteEvent':
showResults(eventData.result);
break;
case 'PipelineErrorEvent':
showError(eventData.error);
break;
}
};
3. Types d’Événements
Tous les événements sont serialisables JSON avec un champ type indiquant le genre d’événement.
3.1. PipelineStartEvent
{
"type": "PipelineStartEvent",
"pipeline_id": "workflow_demo",
"pipeline_type": "sequential",
"timestamp": "2026-04-29T18:50:19+02:00",
"input": {...}
}
Émis quand un pipeline commence son exécution.
3.2. StepStartEvent
{
"type": "StepStartEvent",
"pipeline_id": "workflow_demo",
"step_name": "process_data",
"step_index": 2,
"task_id": "abc123",
"timestamp": "2026-04-29T18:50:19.5+02:00"
}
Émis avant que chaque étape ne démarre.
3.3. StepCompleteEvent
{
"type": "StepCompleteEvent",
"pipeline_id": "workflow_demo",
"step_name": "process_data",
"step_index": 2,
"result": {"processed": 42},
"duration_ms": 150.5,
"timestamp": "2026-04-29T18:50:19.7+02:00"
}
Émis après qu’une étape se termine avec succès.
3.4. PipelineCompleteEvent
{
"type": "PipelineCompleteEvent",
"pipeline_id": "workflow_demo",
"pipeline_type": "sequential",
"status": "COMPLETED",
"duration_ms": 1250.3,
"result": {"final": "output"},
"timestamp": "2026-04-29T18:50:20.5+02:00"
}
Émis quand le pipeline entier se termine avec succès.
3.5. StepErrorEvent
{
"type": "StepErrorEvent",
"pipeline_id": "workflow_demo",
"step_name": "failing_task",
"error": "ValueError: invalid input",
"timestamp": "2026-04-29T18:50:19.9+02:00"
}
Émis quand une étape échoue.
3.6. PipelineErrorEvent
{
"type": "PipelineErrorEvent",
"pipeline_id": "workflow_demo",
"error": "Pipeline échoué à l'étape 'validate'",
"timestamp": "2026-04-29T18:50:20.2+02:00"
}
Émis quand le pipeline abandonne suite à une erreur irrécupérable.
4. Implémentation Côté Client
4.1. Client JavaScript Basique
class MoniteurPipeline {
constructor(url, pipelineId) {
this.url = url;
this.pipelineId = pipelineId;
this.ws = null;
this.events = [];
this.callbacks = {};
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('Connecté au serveur WebSocket');
this.subscribe(this.pipelineId);
};
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
this.handleEvent(data);
};
this.ws.onerror = (err) => {
console.error('Erreur WebSocket:', err);
};
this.ws.onclose = () => {
console.log('Connexion WebSocket fermée');
this.reconnect();
};
}
subscribe(pipelineId) {
this.ws.send(JSON.stringify({
type: 'subscribe',
pipeline_id: pipelineId
}));
}
handleEvent(event) {
this.events.push(event);
const eventType = event.type;
if (this.callbacks[eventType]) {
this.callbacks[eventType](event);
}
// Gestionnaire d'événement générique
if (this.callbacks['*']) {
this.callbacks['*'](event);
}
}
on(eventType, callback) {
this.callbacks[eventType] = callback;
}
reconnect() {
setTimeout(() => this.connect(), 3000);
}
}
// Utilisation
monitor = new MoniteurPipeline('ws://localhost:8000/ws/pipeline_123', 'pipeline_123');
monitor.on('StepCompleteEvent', (event) => {
console.log(`Étape ${event.step_name} complétée en ${event.duration_ms}ms`);
});
monitor.on('PipelineCompleteEvent', (event) => {
console.log('Pipeline terminé avec statut:', event.status);
});
monitor.connect();
4.2. Client Python (pour scripts)
import asyncio
import websockets
import json
async def monitor_pipeline(uri, pipeline_id):
async with websockets.connect(uri) as websocket:
# S'abonner
await websocket.send(json.dumps({
"type": "subscribe",
"pipeline_id": pipeline_id
}))
# Recevoir les événements
async for message in websocket:
event = json.loads(message)
print(f"[{event['type']}] {event}")
if event['type'] == 'PipelineCompleteEvent':
print(f"Pipeline terminé: {event['status']}")
asyncio.run(monitor_pipeline('ws://localhost:8000/ws/pipeline_123', 'pipeline_123'))
5. Gestion des Abonnements
5.1. S’abonner à un Pipeline
Les clients envoient un message d’abonnement:
{
"type": "subscribe",
"pipeline_id": "mon_pipeline_001"
}
Après abonnement, tous les événements pour ce pipeline sont relayés.
5.2. Se Désabonner
{
"type": "unsubscribe",
"pipeline_id": "mon_pipeline_001"
}
5.3. S’abonner à Tous les Pipelines (Caractère générique)
{
"type": "subscribe",
"pipeline_id": "*"
}
Attention : Diffuser tous les événements peut générer un trafic important dans les systèmes à haut débit.
5.4. Multiples Abonnements
Un client peut s’abonner à plusieurs pipelines:
monitor.subscribe('pipeline_1');
monitor.subscribe('pipeline_2');
// Reçoit les événements des deux, distingués par le champ pipeline_id
6. Configuration du Serveur
6.1. WebSocket via Route FastAPI
Les événements WebSocket sont exposés via une route WebSocket FastAPI à /ws/{pipeline_id}. Le client se connecte directement à votre application FastAPI :
from fastapi import FastAPI, WebSocket
from taskiq_flow.integration.websocket.fastapi_ws import (
fastapi_websocket_endpoint,
get_fastapi_ws_manager,
)
app = FastAPI()
@app.websocket("/ws/{pipeline_id}")
async def ws_endpoint(websocket: WebSocket, pipeline_id: str):
await fastapi_websocket_endpoint(websocket, pipeline_id)
Prérequis : Installer FastAPI avec :
pip install "fastapi[standard]".
6.2. CORS et En-têtes de Sécurité
Si derrière un reverse proxy (nginx, Traefik), configurer les en-têtes CORS:
# nginx.conf
location /ws {
proxy_pass http://localhost:8000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
add_header Access-Control-Allow-Origin "*";
add_header Access-Control-Allow-Credentials true;
}
6.3. Terminaison SSL/TLS
Le serveur ne gère pas directement le chiffrement TLS. Terminez TLS au reverse proxy :
# HTTPS → WSS forwarding
location /ws {
proxy_pass http://localhost:8000;
# WSS (WebSocket sécurisé) géré par la config SSL nginx
}
Le client se connecte avec:
const ws = new WebSocket('wss://votredomaine.com/ws');
6.4. Multi-Worker / Transport Redis
Pour multiples processus Python partageant l’état des événements :
from taskiq_flow.transport import RedisPubSubTransport
import redis
redis_client = redis.Redis(host="localhost", port=6379)
transport = RedisPubSubTransport(redis_client)
# Pass the transport when initializing the bridge
from taskiq_flow.hooks.bridge import setup_websocket_bridge
setup_websocket_bridge(hook_manager, use_fastapi=True)
# All workers connected to the same Redis channel share events
# WebSocket is always served via the FastAPI route
Prérequis : Installer l’extra
[brokers]:pip install "taskiq-flow[brokers]"pour le support Redis.
7. Filtrage des Événements
Réduire la bande passante en filtrant côté serveur:
from taskiq_flow.hooks import EventFilter
# Only send events for specific pipelines
filter = EventFilter(pipeline_ids=['pipeline_1', 'pipeline_2'])
hook_manager.add_filter(filter)
# Only send step events (not pipeline-level)
filter = EventFilter(event_types=['StepStartEvent', 'StepCompleteEvent'])
hook_manager.add_filter(filter)
Filtrage côté client également possible:
monitor.on('StepCompleteEvent', (event) => {
if (event.step_name === 'étape_importante') {
highlightStep(event.step_name);
}
});
8. Référence des Messages
Requête d’Abonnement
| Champ | Type | Description |
|---|---|---|
type |
"subscribe" |
Type de message |
pipeline_id |
str ou "*" |
Pipeline auquel s’abonner |
Requête de Désabonnement
| Champ | Type | Description |
|---|---|---|
type |
"unsubscribe" |
Type de message |
pipeline_id |
str |
Pipeline dont se désabonner |
Message d’Événement (serveur → client)
| Champ | Type | Description |
|---|---|---|
type |
str |
Type d’événement (voir Section 3) |
pipeline_id |
str |
ID du pipeline d’origine |
timestamp |
ISO 8601 str |
Horodatage de l’événement |
9. Déploiement en Production
9.1. Déploiement Docker
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "-m", "mon_app_fastapi"]
# docker-compose.yml
version: '3.8'
services:
redis:
image: redis:7-alpine
app:
build: .
ports:
- "8000:8000"
environment:
- REDIS_URL=redis://redis:6379
depends_on:
- redis
9.2. Service Systemd
# /etc/systemd/system/taskiq-flow-ws.service
[Unit]
Description=Taskiq-Flow WebSocket Server
After=network.target
[Service]
Type=simple
User=appuser
WorkingDirectory=/opt/taskiq-flow
ExecStart=/usr/bin/python3 -m mon_app_fastapi
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
9.3. Monitoring
Utilisez l’endpoint /health intégré de l’API FastAPI (voir Guide API).
9.4. Scalabilité
Pour déploiements haut débit :
- Scaling horizontal : Déployer multiples instances de l’application FastAPI avec sessions sticky ou transport Redis Pub/Sub
- Load balancing : Utiliser nginx ou HAProxy avec support WebSocket
- Limites de connexion : Configurer max connexions par worker (limites OS)
- Compression de messages : Activer permessage-deflate pour payloads larges
10. Considérations de Sécurité
10.1. Authentification
Exiger des tokens d’authentification à la connexion:
# Validation côté serveur
async def authenticate(websocket, token):
if not validate_token(token):
await websocket.close(code=4001, reason="Unauthorized")
return False
return True
# Le client envoie le token à la connexion via le header X-API-Key ou JWT
ws = new WebSocket('ws://localhost:8000/ws/pipeline_id');
10.2. Autorisation
Filtrer les événements par permissions utilisateur:
class AuthFilter(EventFilter):
def __init__(self, user_id, pipelines_autorises):
self.user_id = user_id
self.allowed = pipelines_autorises
def should_emit(self, event):
return event.pipeline_id in self.allowed
10.3. Limitation de Débit (Rate Limiting)
Prévenir les abus:
from collections import defaultdict
import time
class LimiteurDebit:
def __init__(self, max_events_per_second=100):
self.limits = defaultdict(list)
def allow(self, client_id):
now = time.time()
self.limits[client_id] = [
t for t in self.limits[client_id] if now - t < 1
]
if len(self.limits[client_id]) < 100:
self.limits[client_id].append(now)
return True
return False
10.4. Chiffrement SSL/TLS (WSS)
Puisque l’intégration WebSocket est FastAPI uniquement, la terminaison SSL est gérée au niveau du reverse proxy :
server {
listen 443 ssl;
server_name ws.taskiq-flow.example.com;
ssl_certificate /etc/letsencrypt/live/ws.example.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/ws.example.com/privkey.pem;
location /ws {
proxy_pass http://localhost:8000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
}
}
// Le client se connecte via wss à travers le proxy
const ws = new WebSocket("wss://ws.taskiq-flow.example.com/ws");
Note : Le
PipelineWebSocketServerhistorique avec SSL inline (ssl_cert/ssl_key) a été supprimé en v1.1. Utilisez un reverse proxy pour la terminaison TLS.
11. Dépannage
Connexion Refusée
Symptôme : Le client ne peut pas se connecter, erreur “Connection refused”.
Corrections :
- Vérifier que l’application FastAPI tourne :
uvicorn app:app - Vérifier les règles firewall permettent le port FastAPI (défaut : 8000)
- S’assurer que la route WebSocket est montée (
/ws/{pipeline_id}) - S’assurer que
setup_websocket_bridge(hook_manager)est appelé avant le pipeline
Aucun Événement Reçu Après Connexion
Symptôme : Connexion réussie, mais aucun événement n’arrive.
Corrections :
- S’assurer que le pipeline a
pipeline_iddéfini - Confirmer que
pipeline.with_hooks(hook_manager)est appelé - Vérifier que
setup_websocket_bridge(hook_manager)est appelé avant le démarrage du pipeline - Vérifier le format du message d’abonnement (voir Section 5)
Utilisation Mémoire Élevée
Symptôme : Mémoire serveur augmente avec le temps.
Corrections :
- Limiter le nombre de pipelines suivis
- Implémenter le nettoyage automatique des clients déconnectés
- Utiliser le transport Redis pour externaliser l’état du processus
- Définir la limite maximale de connexions
Événements Dans le Désordre
Symptôme : Le client reçoit StepComplete avant StepStart.
Corrections :
- Utiliser les garanties de livraison séquentielle (par défaut pour WebSocket)
- S’assurer que tous les hooks sont correctement attachés
- Vérifier les middleware personnalisés qui pourraient émettre des événements de manière asynchrone
12. Résumé
12.1. Résumé Composants
| Composant | Responsabilité |
|---|---|
Pipeline |
Génère événements d’exécution |
HookManager |
Collecte événements des pipelines |
WebSocketBridge |
Achemine événements vers transport WebSocket |
FastAPIWebSocketManager |
Gère connexions clients, diffuse |
Client |
S’abonne, reçoit, affiche événements |
Configuration de base (bridge + route FastAPI) :
# 1. Connecter le pipeline d'événements
hooks = HookManager()
setup_websocket_bridge(hooks)
pipeline = Pipeline(broker).with_hooks(hooks)
# 2. Monter la route WebSocket dans votre app FastAPI
# @app.websocket("/ws/{pipeline_id}")
# async def ws_endpoint(websocket: WebSocket, pipeline_id: str):
# await fastapi_websocket_endpoint(websocket, pipeline_id)
13. Prochaines Étapes
- Guide de Suivi — Backend de stockage et requêtes historiques
- Guide Dataflow — Pipeline DAG complet avec événements compatibles WebSocket
- Guide API — Endpoints REST pour backends de tableau de bord
- Exemples: Démo WebSocket — Code complet fonctionnel
Streamer les événements de pipeline en direct. Combiner avec Stockage de Suivi pour historique persistant.