Référence API: Intégration WebSocket
HookManager, pont WebSocket, et transport d’événements temps réel
Version : {VERSION} Module : taskiq_flow.hooks,taskiq_flow.transport.websocket
HookManager
Bus central d’événements qui collecte les événements d’exécution de pipeline et les dispatch aux abonnés.
from taskiq_flow.hooks import HookManager
hook_manager = HookManager()
pipeline = Pipeline(broker).with_hooks(hook_manager)
Événements émis:
PipelineStartEvent— Exécution pipeline démarréeStepStartEvent— Une étape a démarréStepCompleteEvent— Une étape complétéePipelineCompleteEvent— Pipeline terminé avec succèsStepErrorEvent— Une étape a échouéPipelineErrorEvent— Pipeline a échoué
Ajouter Hooks Personnalisés
class MonHook:
async def on_pipeline_start(self, event):
print(f"Pipeline {event.pipeline_id} démarré")
async def on_step_complete(self, event):
print(f"Étape {event.step_name} finie en {event.duration_ms}ms")
hook = MonHook()
hook_manager.add_hook(hook)
Méthodes hooks (toutes optionnelles):
| Méthode | Événement |
|---|---|
on_pipeline_start(event: PipelineStartEvent) |
Pipeline démarré |
on_step_start(event: StepStartEvent) |
Étape en démarrage |
on_step_complete(event: StepCompleteEvent) |
Étape terminée |
on_pipeline_complete(event: PipelineCompleteEvent) |
Pipeline complété |
on_step_error(event: StepErrorEvent) |
Étape échouée |
on_pipeline_error(event: PipelineErrorEvent) |
Erreur pipeline |
Types d’Événements
Tous événements sont modèles Pydantic avec discriminateur type.
PipelineStartEvent
from taskiq_flow.hooks import PipelineStartEvent
event = PipelineStartEvent(
pipeline_id="mon_pipeline",
pipeline_type="sequential",
timestamp=datetime.now(),
input=données_initiales
)
Champs:
| Champ | Type | Description |
|---|---|---|
type |
Literal["PipelineStartEvent"] |
Discriminateur type d’événement |
pipeline_id |
str |
Instance pipeline |
pipeline_type |
str |
"sequential" ou "dataflow" |
timestamp |
datetime |
Horodatage événement |
input |
Any |
Données d’entrée initiales |
StepStartEvent
from taskiq_flow.hooks import StepStartEvent
event = StepStartEvent(
pipeline_id="mon_pipeline",
step_name="process_data",
step_index=2,
task_id="task_abc123",
timestamp=datetime.now()
)
Champs:
| Champ | Type | Description |
|---|---|---|
type |
Literal["StepStartEvent"] |
Type événement |
pipeline_id |
str |
Pipeline origine |
step_name |
str |
Nom de la tâche |
step_index |
int |
Position dans pipeline (0-indexé) |
task_id |
str |
ID tâche taskiq sous-jacente |
timestamp |
datetime |
Horodatage événement |
StepCompleteEvent
from taskiq_flow.hooks import StepCompleteEvent
event = StepCompleteEvent(
pipeline_id="mon_pipeline",
step_name="process_data",
step_index=2,
result={"processed": 42},
duration_ms=150.5,
timestamp=datetime.now()
)
Champs:
| Champ | Type | Description |
|---|---|---|
type |
Literal["StepCompleteEvent"] |
Type événement |
pipeline_id |
str |
Pipeline origine |
step_name |
str |
Tâche complétée |
step_index |
int |
Position étape |
result |
Any |
Valeur de retour tâche |
duration_ms |
float |
Temps d’exécution en millisecondes |
timestamp |
datetime |
Horodatage événement |
PipelineCompleteEvent
from taskiq_flow.hooks import PipelineCompleteEvent
event = PipelineCompleteEvent(
pipeline_id="mon_pipeline",
pipeline_type="dataflow",
status="COMPLETED",
duration_ms=1250.3,
result={"final": "output"},
timestamp=datetime.now()
)
Champs:
| Champ | Type | Description |
|---|---|---|
type |
Literal["PipelineCompleteEvent"] |
Type événement |
pipeline_id |
str |
ID pipeline |
pipeline_type |
str |
Type pipeline |
status |
str |
"COMPLETED", "FAILED", "CANCELLED" |
duration_ms |
float |
Temps total exécution |
result |
Any |
Résultat final pipeline |
timestamp |
datetime |
Horodatage événement |
Événements Erreur
from taskiq_flow.hooks import StepErrorEvent, PipelineErrorEvent
erreur_étape = StepErrorEvent(
pipeline_id="mon_pipeline",
step_name="failing_task",
error="ValueError: invalid input",
timestamp=datetime.now()
)
erreur_pipeline = PipelineErrorEvent(
pipeline_id="mon_pipeline",
error="Pipeline abandonné après 3 échecs",
timestamp=datetime.now()
)
Transport WebSocket
setup_websocket_bridge
Connecte HookManager à la couche de transport WebSocket:
from taskiq_flow.hooks import HookManager, setup_websocket_bridge
hook_manager = HookManager()
setup_websocket_bridge(hook_manager)
# Now all hooks are forwarded to the WebSocket server
Cela installe un pont qui transfère les événements du HookManager aux serveurs WebSocket connectés.
get_websocket_server
Factory pour obtenir ou créer un serveur WebSocket:
from taskiq_flow.integration.websocket import get_websocket_server
serveur = get_websocket_server(
host="0.0.0.0",
port=8765,
transport=None # Utilise WebSocketTransport par défaut
)
await serveur.start_server()
Paramètres:
| Paramètre | Type | Défaut | Description |
|---|---|---|---|
host |
str |
"0.0.0.0" |
Adresse d’écoute |
port |
int |
8765 |
Port d’écoute |
transport |
WebSocketTransport | None |
Auto-créé si None |
Le serveur est un singleton par configuration; appels subséquents à get_websocket_server() avec même host/port retournent même instance.
Filtrage d’Événements
Réduire trafic en filter les événements:
from taskiq_flow.hooks import EventFilter
# Seulement pipelines spécifiques
filtre = EventFilter(pipeline_ids=["pipeline_1", "pipeline_2"])
# Seulement événements d'étape
filtre = EventFilter(event_types=["StepStartEvent", "StepCompleteEvent"])
# Both
filter = EventFilter(
pipeline_ids=["*"], # all pipelines (or specific ones)
event_types=["StepCompleteEvent", "PipelineCompleteEvent"]
)
hook_manager.add_filter(filter)
Logique EventFilter
Événement → Vérifier correspondance pipeline_id? → Vérifier correspondance event_type? → Émettre?
Les deux filtres sont OU logique interne: un événement passe s’il correspond AUX DEUX filtres pipeline_ids ET event_types. Utiliser "*" pour tout matcher.
Protocole WebSocket
Connexion
Client se connecte via WebSocket standard:
ws://localhost:8765
Pour connexions sécurisées (WSS), terminer SSL au reverse proxy (nginx, Traefik).
Abonnement
Après connexion, client envoie message abonnement:
{
"type": "subscribe",
"pipeline_id": "mon_pipeline"
}
Abonnement wildcard (recevoir tous événements):
{
"type": "subscribe",
"pipeline_id": "*"
}
Désabonnement:
{
"type": "unsubscribe",
"pipeline_id": "mon_pipeline"
}
Format Message (Serveur → Client)
Tous messages sont JSON avec champ type:
{
"type": "StepCompleteEvent",
"pipeline_id": "pipeline_123",
"step_name": "process_data",
"duration_ms": 150.2,
"timestamp": "2026-05-05T16:30:00Z"
}
Référence complète champs dans Guide WebSocket.
Transport Personnalisé
Pour cas avancés, implémenter son propre transport:
from taskiq_flow.transport import WebSocketTransport
class MonTransport(WebSocketTransport):
async def broadcast(self, event: BaseEvent):
# Logique routage personnalisée (ex: Redis Pub/Sub, Kafka)
await self.redis.publish("pipeline_events", event.json())
transport = MonTransport()
serveur = get_websocket_server(transport=transport)
Coordination Multi-Worker
Pour multiples processus Python partageant état événements:
from taskiq_flow.transport import RedisPubSubTransport
transport = RedisPubSubTransport(client_redis)
serveur = get_websocket_server(transport=transport)
# Tous workers sur même canal Redis partagent événements
Tous workers souscivent au même canal Redis pub/sub; événements de n’importe quel worker sont broadcast aux clients WebSocket connectés à n’importe quel worker.
Considérations Production
Limites de Connexion
import asyncio
# Limiter connexions WebSocket concurrentes
MAX_CONNECTIONS = 1000
sémaphore = asyncio.Semaphore(MAX_CONNECTIONS)
# Dans gestionnaire connexion:
async def handle_connection(websocket):
if not sémaphore.acquire(blocking=False):
await websocket.close(code=1013, reason="Trop de connexions")
return
try:
await websocket_service.handle(websocket)
finally:
sémaphore.release()
Arrêt Gracieux
async def shutdown():
await server.close() # Arrêter accepter nouvelles connexions
await server.wait_closed() # Attendre fermeture connexions existantes
Monitoring
Exposer métriques:
@app.get("/ws/metrics")
async def ws_metrics():
return {
"connexions": server.connection_count(),
"messages_envoyés": server.messages_sent,
"messages_par_seconde": server.rate()
}
Dépannage
| Problème | Diagnostic | Correction |
|---|---|---|
| Clients ne reçoivent événements | setup_websocket_bridge() non appelé |
Appeler avant démarrage pipeline |
| Connexion refusée | Serveur non démarré | Appeler await server.start_server() |
| Événements retardés | Filtre événements bloque | Vérifier configuration filtre |
| CPU élevé | Trop de connexions | Appliquer limites connexions |
Résumé
| Composant | Rôle |
|---|---|
HookManager |
Collecte événements depuis pipelines |
Classes BaseEvent |
Données événements structurées |
EventFilter |
Diffusion sélective événements |
WebSocketTransport |
Transport bas niveau (défaut ou custom) |
WebSocketServer |
Gère connexions clients |
get_websocket_server() |
Factory/singleton accès |
Minimal configuration:
hooks = HookManager()
setup_websocket_bridge(hooks)
pipeline = Pipeline(broker).with_hooks(hooks)
server = get_websocket_server()
await server.start_server()
Pour détails implémentation client, voir Guide WebSocket. Pour stratégies filtrage, section 7 de ce guide.