Guide WebSocket

Streaming d’événements en temps réel pour tableaux de bord et monitoring

Version: 1.0.0 Lié: Guide de Suivi, Guide API

Aperçu

L’intégration WebSocket de Taskiq-Flow fournit un streaming en direct des événements d’exécution de pipeline — idéal pour construire des tableaux de bord en temps réel, affichages de progression et outils de monitoring.

Ce guide couvre :

  • Configuration d’un serveur WebSocket
  • Abonnement des clients aux événements de pipeline
  • Types d’événements et charges utiles
  • Configuration de la couche de transport
  • Considérations de déploiement en production

1. Architecture

[Pipeline] → [HookManager] → [WebSocketBridge] → [Serveur WebSocket] → [Clients]

Composants:

  1. Pipeline — Émet des événements via des hooks à chaque étape du cycle de vie
  2. HookManager — Collecte les événements des pipelines
  3. WebSocketBridge — Connecte HookManager au transport WebSocket
  4. Serveur WebSocket — Gère les connexions clients et diffuse
  5. Client — Navigateur web, app de monitoring, tableau de bord

2. Démarrage Rapide

2.1. Configuration Côté Serveur

import asyncio
from taskiq import InMemoryBroker
from taskiq_flow import Pipeline
from taskiq_flow.hooks import HookManager, setup_websocket_bridge
from taskiq_flow.integration.websocket import get_websocket_server

# 1. Create broker and hook manager
broker = InMemoryBroker()
hook_manager = HookManager()

# 2. Configure WebSocket bridge
setup_websocket_bridge(hook_manager)  # connect HookManager → WebSocket transport

# 3. Create pipeline with hooks attached
pipeline = Pipeline(broker)
pipeline.pipeline_id = "workflow_demo"
pipeline.with_hooks(hook_manager)

# Ajouter des tâches au pipeline...

# 4. Démarrer le serveur WebSocket
async def main():
    serveur = get_websocket_server(host="0.0.0.0", port=8765)
    await serveur.start_server()

    # 5. Exécuter le pipeline
    résultat = await pipeline.kiq(données)

    # 6. Garder le serveur actif (ou intégrer à la boucle d'événements de votre app)
    await asyncio.Event().wait()

asyncio.run(main())

2.2. Connexion Client (JavaScript)

// Se connecter au serveur WebSocket
const ws = new WebSocket('ws://localhost:8765');

// S'abonner à un pipeline spécifique
ws.onopen = () => {
    ws.send(JSON.stringify({
        type: 'subscribe',
        pipeline_id: 'workflow_demo'
    }));
};

// Recevoir les événements
ws.onmessage = (event) => {
    const eventData = JSON.parse(event.data);
    console.log('Événement pipeline:', eventData);

    switch (eventData.type) {
        case 'PipelineStartEvent':
            showPipelineStarted();
            break;
        case 'StepStartEvent':
            showStepProgress(eventData.step_name);
            break;
        case 'StepCompleteEvent':
            updateProgress(eventData.step_name, eventData.duration_ms);
            break;
        case 'PipelineCompleteEvent':
            showResults(eventData.result);
            break;
        case 'PipelineErrorEvent':
            showError(eventData.error);
            break;
    }
};

3. Types d’Événements

Tous les événements sont serialisables JSON avec un champ type indiquant le genre d’événement.

3.1. PipelineStartEvent

{
  "type": "PipelineStartEvent",
  "pipeline_id": "workflow_demo",
  "pipeline_type": "sequential",
  "timestamp": "2026-04-29T18:50:19+02:00",
  "input": {...}
}

Émis quand un pipeline commence son exécution.

3.2. StepStartEvent

{
  "type": "StepStartEvent",
  "pipeline_id": "workflow_demo",
  "step_name": "process_data",
  "step_index": 2,
  "task_id": "abc123",
  "timestamp": "2026-04-29T18:50:19.5+02:00"
}

Émis avant que chaque étape ne démarre.

3.3. StepCompleteEvent

{
  "type": "StepCompleteEvent",
  "pipeline_id": "workflow_demo",
  "step_name": "process_data",
  "step_index": 2,
  "result": {"processed": 42},
  "duration_ms": 150.5,
  "timestamp": "2026-04-29T18:50:19.7+02:00"
}

Émis après qu’une étape se termine avec succès.

3.4. PipelineCompleteEvent

{
  "type": "PipelineCompleteEvent",
  "pipeline_id": "workflow_demo",
  "pipeline_type": "sequential",
  "status": "COMPLETED",
  "duration_ms": 1250.3,
  "result": {"final": "output"},
  "timestamp": "2026-04-29T18:50:20.5+02:00"
}

Émis quand le pipeline entier se termine avec succès.

3.5. StepErrorEvent

{
  "type": "StepErrorEvent",
  "pipeline_id": "workflow_demo",
  "step_name": "failing_task",
  "error": "ValueError: invalid input",
  "timestamp": "2026-04-29T18:50:19.9+02:00"
}

Émis quand une étape échoue.

3.6. PipelineErrorEvent

{
  "type": "PipelineErrorEvent",
  "pipeline_id": "workflow_demo",
  "error": "Pipeline échoué à l'étape 'validate'",
  "timestamp": "2026-04-29T18:50:20.2+02:00"
}

Émis quand le pipeline abandonne suite à une erreur irrécupérable.


4. Implémentation Côté Client

4.1. Client JavaScript Basique

class MoniteurPipeline {
    constructor(url, pipelineId) {
        this.url = url;
        this.pipelineId = pipelineId;
        this.ws = null;
        this.events = [];
        this.callbacks = {};
    }

    connect() {
        this.ws = new WebSocket(this.url);

        this.ws.onopen = () => {
            console.log('Connecté au serveur WebSocket');
            this.subscribe(this.pipelineId);
        };

        this.ws.onmessage = (event) => {
            const data = JSON.parse(event.data);
            this.handleEvent(data);
        };

        this.ws.onerror = (err) => {
            console.error('Erreur WebSocket:', err);
        };

        this.ws.onclose = () => {
            console.log('Connexion WebSocket fermée');
            this.reconnect();
        };
    }

    subscribe(pipelineId) {
        this.ws.send(JSON.stringify({
            type: 'subscribe',
            pipeline_id: pipelineId
        }));
    }

    handleEvent(event) {
        this.events.push(event);
        const eventType = event.type;

        if (this.callbacks[eventType]) {
            this.callbacks[eventType](event);
        }

        // Gestionnaire d'événement générique
        if (this.callbacks['*']) {
            this.callbacks['*'](event);
        }
    }

    on(eventType, callback) {
        this.callbacks[eventType] = callback;
    }

    reconnect() {
        setTimeout(() => this.connect(), 3000);
    }
}

// Utilisation
monitor = new MoniteurPipeline('ws://localhost:8765', 'pipeline_123');
monitor.on('StepCompleteEvent', (event) => {
    console.log(`Étape ${event.step_name} complétée en ${event.duration_ms}ms`);
});
monitor.on('PipelineCompleteEvent', (event) => {
    console.log('Pipeline terminé avec statut:', event.status);
});
monitor.connect();

4.2. Client Python (pour scripts)

import asyncio
import websockets
import json

async def monitor_pipeline(uri, pipeline_id):
    async with websockets.connect(uri) as websocket:
        # S'abonner
        await websocket.send(json.dumps({
            "type": "subscribe",
            "pipeline_id": pipeline_id
        }))

        # Recevoir les événements
        async for message in websocket:
            event = json.loads(message)
            print(f"[{event['type']}] {event}")

            if event['type'] == 'PipelineCompleteEvent':
                print(f"Pipeline terminé: {event['status']}")

asyncio.run(monitor_pipeline('ws://localhost:8765', 'pipeline_123'))

5. Gestion des Abonnements

5.1. S’abonner à un Pipeline

Les clients envoient un message d’abonnement:

{
  "type": "subscribe",
  "pipeline_id": "mon_pipeline_001"
}

Après abonnement, tous les événements pour ce pipeline sont relayés.

5.2. Se Désabonner

{
  "type": "unsubscribe",
  "pipeline_id": "mon_pipeline_001"
}

5.3. S’abonner à Tous les Pipelines (Caractère générique)

{
  "type": "subscribe",
  "pipeline_id": "*"
}

Attention : Diffuser tous les événements peut générer un trafic important dans les systèmes à haut débit.

5.4. Multiples Abonnements

Un client peut s’abonner à plusieurs pipelines:

monitor.subscribe('pipeline_1');
monitor.subscribe('pipeline_2');
// Reçoit les événements des deux, distingués par le champ pipeline_id

6. Configuration du Serveur

6.1. Hôte et Port Personnalisés

# Utiliser une interface et port spécifiques
serveur = get_websocket_server(host='127.0.0.1', port=8765)
await serveur.start_server()

# Ou lier à toutes les interfaces (exposer au réseau)
serveur = get_websocket_server(host='0.0.0.0', port=8765)

6.2. CORS et En-têtes de Sécurité

Si derrière un reverse proxy (nginx, Traefik), configurer les en-têtes CORS:

# nginx.conf
location /ws {
    proxy_pass http://localhost:8765;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    add_header Access-Control-Allow-Origin "*";
    add_header Access-Control-Allow-Credentials true;
}

6.3. Terminaison SSL/TLS

Terminer SSL au reverse proxy:

# HTTPS → WSS forwarding
location /ws {
    proxy_pass http://localhost:8765;
    # WSS (WebSocket sécurisé) géré par la config SSL nginx
}

Le client se connecte avec:

const ws = new WebSocket('wss://votredomaine.com/ws');

6.4. Multiples Workers

Pour multiples processus Python, chacun a besoin de son propre serveur WebSocket sur un port différent (ou utiliser un broker de messages comme Redis Pub/Sub pour coordonner):

# Worker 1
serveur1 = get_websocket_server(port=8765)

# Worker 2
serveur2 = get_websocket_server(port=8766)

# Load balancer distribue les connexions WebSocket

Pour un véritable partage d’événements multi-worker, utiliser le transport Redis:

from taskiq_flow.transport import RedisPubSubTransport

transport = RedisPubSubTransport(client_redis)
serveur = get_websocket_server(transport=transport)
# Maintenant tous les workers partagent l'état des événements via Redis

7. Filtrage des Événements

Réduire la bande passante en filtrant côté serveur:

from taskiq_flow.hooks import EventFilter

# Send only events for specific pipelines
filter = EventFilter(pipeline_ids=['pipeline_1', 'pipeline_2'])
hook_manager.add_filter(filter)

# Only step events (not pipeline-level)
filter = EventFilter(event_types=['StepStartEvent', 'StepCompleteEvent'])
hook_manager.add_filter(filter)

Filtrage côté client également possible:

monitor.on('StepCompleteEvent', (event) => {
    if (event.step_name === 'étape_importante') {
        highlightStep(event.step_name);
    }
});

8. Référence des Messages

Requête d’Abonnement

Champ Type Description
type "subscribe" Type de message
pipeline_id str ou "*" Pipeline auquel s’abonner

Requête de Désabonnement

Champ Type Description
type "unsubscribe" Type de message
pipeline_id str Pipeline dont se désabonner

Message d’Événement (serveur → client)

Champ Type Description
type str Type d’événement (voir Section 3)
pipeline_id str ID du pipeline d’origine
timestamp ISO 8601 str Horodatage de l’événement

Champs additionnels selon le type d’événement (voir ci-dessus).


9. Déploiement en Production

9.1. Déploiement Docker

# Dockerfile
FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["python", "-m", "mon_app_websocket"]
# docker-compose.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
  app:
    build: .
    ports:
      - "8765:8765"
    environment:
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis

9.2. Service Systemd

# /etc/systemd/system/taskiq-flow-ws.service
[Unit]
Description=Serveur WebSocket Taskiq-Flow
After=network.target

[Service]
Type=simple
User=appuser
WorkingDirectory=/opt/taskiq-flow
ExecStart=/usr/bin/python3 -m mon_app_websocket
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

9.3. Monitoring

Health check endpoint:

from aiohttp import web

async def health(request):
    return web.json_response({"status": "healthy"})

app = web.Application()
app.router.add_get('/health', health)

Ou utiliser l’endpoint health intégré (/health) depuis le Guide API.

9.4. Scalabilité

Pour déploiements haut débit:

  • Scaling horizontal : Déployer multiples instances de serveur WebSocket avec sessions sticky ou transport Redis Pub/Sub
  • Load balancing : Utiliser nginx ou HAProxy avec support WebSocket
  • Limites de connexion : Configurer max connexions par worker (limites OS)
  • Compression de messages : Activer permessage-deflate pour payloads larges

10. Considérations de Sécurité

10.1. Authentification

Exiger des tokens d’authentification à la connexion:

# Validation côté serveur
async def authenticate(websocket, token):
    if not validate_token(token):
        await websocket.close(code=4001, reason="Unauthorized")
        return False
    return True

# Le client envoie le token à la connexion
ws = new WebSocket(`ws://localhost:8765?token=${authToken}`);

10.2. Autorisation

Filtrer les événements par permissions utilisateur:

class FiltreAuth(EventFilter):
    def __init__(self, user_id, pipelines_autorises):
        self.user_id = user_id
        self.allowed = pipelines_autorises

    def should_emit(self, event):
        return event.pipeline_id in self.allowed

10.3. Limitation de Débit (Rate Limiting)

Prévenir les abus:

from collections import defaultdict
import time

class LimiteurDebit:
    def __init__(self, max_events_per_second=100):
        self.limits = defaultdict(list)

    def allow(self, client_id):
        now = time.time()
        self.limits[client_id] = [
            t for t in self.limits[client_id] if now - t < 1
        ]
        if len(self.limits[client_id]) < 100:
            self.limits[client_id].append(now)
            return True
        return False

11. Dépannage

Connexion Refusée

Symptôme : Le client ne peut pas se connecter, erreur “Connection refused”.

Corrections

  • Vérifier que le serveur tourne: netstat -lnp | grep 8765
  • Vérifier les règles firewall permettent le port 8765
  • S’assurer que le host binding correspond (0.0.0.0 pour accès externe)

Aucun Événement Reçu Après Connexion

Symptôme : Connexion réussie, mais aucun événement n’arrive.

Corrections

  • S’assurer que le pipeline a pipeline_id défini
  • Confirmer que pipeline.with_hooks(hook_manager) est appelé
  • Vérifier que setup_websocket_bridge(hook_manager) est appelé avant le démarrage du pipeline
  • Vérifier le format du message d’abonnement (voir Section 5)

Utilisation Mémoire Élevée

Symptôme : Mémoire serveur augmente avec le temps.

Corrections :

  • Limiter le nombre de pipelines suivis
  • Implémenter le nettoyage automatique des clients déconnectés
  • Utiliser le transport Redis pour externaliser l’état du processus
  • Définir la limite maximale de connexions

Événements Dans le Désordre

Symptôme : Le client reçoit StepComplete avant StepStart.

Corrections :

  • Utiliser les garanties de livraison séquentielle (par défaut pour WebSocket)
  • S’assurer que tous les hooks sont correctement attachés
  • Vérifier les middleware personnalisés qui pourraient émettre des événements de manière asynchrone

10.4. Chiffrement SSL/TLS (WSS)

Activez les connexions WebSocket chiffrées pour la production :

from taskiq_flow.integration.websocket.server import PipelineWebSocketServer

# Avec certificats SSL
server = PipelineWebSocketServer(
    host="0.0.0.0",
    port=8765,
    ssl_cert="/chemin/vers/cert.pem",
    ssl_key="/chemin/vers/key.pem",
)

# Connexion avec wss://
# Client : new WebSocket("wss://votre-domaine.com/ws")

Via un reverse proxy (recommandé) :

server {
    listen 443 ssl;
    server_name ws.taskiq-flow.example.com;

    ssl_certificate /etc/letsencrypt/live/ws.example.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/ws.example.com/privkey.pem;

    location /ws {
        proxy_pass http://localhost:8765;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
    }
}
// Le client se connecte via wss à travers le proxy
const ws = new WebSocket("wss://ws.taskiq-flow.example.com/ws");

12. Résumé

12.1. Résumé Composants

Composant Responsabilité
Pipeline Génère événements d’exécution
HookManager Collecte événements des pipelines
WebSocketBridge Achemine événements vers transport WebSocket
Serveur WebSocket Gère connexions clients, diffuse
Client S’abonne, reçoit, affiche événements

Script de base (5 lignes):

hooks = HookManager()
setup_websocket_bridge(hooks)
pipeline = Pipeline(broker).with_hooks(hooks)
server = get_websocket_server()
await server.start_server()

13. Prochaines Étapes


Streamer les événements de pipeline en direct. Combiner avec Stockage de Suivi pour historique persistant.



This site uses Just the Docs, a documentation theme for Jekyll.