# API Reference: WebSocket Integration

HookManager, WebSocket bridge, and real-time event transport.

Version: {VERSION} · Modules: `taskiq_flow.hooks`, `taskiq_flow.transport.websocket`
## HookManager

Central event bus that collects pipeline execution events and dispatches them to subscribers.

```python
from taskiq_flow.hooks import HookManager

hook_manager = HookManager()
pipeline = Pipeline(broker).with_hooks(hook_manager)
```
Events emitted:

- `PipelineStartEvent` — pipeline execution started
- `StepStartEvent` — a step started
- `StepCompleteEvent` — a step completed
- `PipelineCompleteEvent` — pipeline finished successfully
- `StepErrorEvent` — a step failed
- `PipelineErrorEvent` — pipeline failed
### Adding Custom Hooks

```python
class MyHook:
    async def on_pipeline_start(self, event):
        print(f"Pipeline {event.pipeline_id} started")

    async def on_step_complete(self, event):
        print(f"Step {event.step_name} finished in {event.duration_ms}ms")

hook = MyHook()
hook_manager.add_hook(hook)
```
Hook methods (all optional):

| Method | Event |
|---|---|
| `on_pipeline_start(event: PipelineStartEvent)` | Pipeline started |
| `on_step_start(event: StepStartEvent)` | Step starting |
| `on_step_complete(event: StepCompleteEvent)` | Step finished |
| `on_pipeline_complete(event: PipelineCompleteEvent)` | Pipeline completed |
| `on_step_error(event: StepErrorEvent)` | Step failed |
| `on_pipeline_error(event: PipelineErrorEvent)` | Pipeline error |
## Event Types

All events are Pydantic models with a `type` discriminator.
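The discriminator lets consumers route events without type checks on the model classes themselves. A minimal sketch of that pattern, using a hypothetical handler registry (the registry is not part of taskiq_flow):

```python
# Route a raw event payload by its "type" discriminator.
def route_event(payload: dict, handlers: dict):
    handler = handlers.get(payload["type"])
    if handler is None:
        return None  # unknown event types are ignored
    return handler(payload)

# Illustrative handlers keyed by the discriminator values documented below
handlers = {
    "StepCompleteEvent": lambda e: f"{e['step_name']} took {e['duration_ms']}ms",
    "PipelineErrorEvent": lambda e: f"pipeline {e['pipeline_id']} failed: {e['error']}",
}
```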
### PipelineStartEvent

```python
from datetime import datetime

from taskiq_flow.hooks import PipelineStartEvent

event = PipelineStartEvent(
    pipeline_id="my_pipeline",
    pipeline_type="sequential",
    timestamp=datetime.now(),
    input=initial_data,
)
```
Fields:

| Field | Type | Description |
|---|---|---|
| `type` | `Literal["PipelineStartEvent"]` | Event type discriminator |
| `pipeline_id` | `str` | Pipeline instance ID |
| `pipeline_type` | `str` | `"sequential"` or `"dataflow"` |
| `timestamp` | `datetime` | Event time |
| `input` | `Any` | Initial input data |
### StepStartEvent

```python
from datetime import datetime

from taskiq_flow.hooks import StepStartEvent

event = StepStartEvent(
    pipeline_id="my_pipeline",
    step_name="process_data",
    step_index=2,
    task_id="task_abc123",
    timestamp=datetime.now(),
)
```
Fields:

| Field | Type | Description |
|---|---|---|
| `type` | `Literal["StepStartEvent"]` | Event type |
| `pipeline_id` | `str` | Origin pipeline |
| `step_name` | `str` | Task name |
| `step_index` | `int` | Position in pipeline (0-indexed) |
| `task_id` | `str` | Underlying taskiq task ID |
| `timestamp` | `datetime` | Event time |
### StepCompleteEvent

```python
from datetime import datetime

from taskiq_flow.hooks import StepCompleteEvent

event = StepCompleteEvent(
    pipeline_id="my_pipeline",
    step_name="process_data",
    step_index=2,
    result={"processed": 42},
    duration_ms=150.5,
    timestamp=datetime.now(),
)
```
Fields:

| Field | Type | Description |
|---|---|---|
| `type` | `Literal["StepCompleteEvent"]` | Event type |
| `pipeline_id` | `str` | Origin pipeline |
| `step_name` | `str` | Completed task |
| `step_index` | `int` | Step position |
| `result` | `Any` | Task return value |
| `duration_ms` | `float` | Execution time in milliseconds |
| `timestamp` | `datetime` | Event time |
### PipelineCompleteEvent

```python
from datetime import datetime

from taskiq_flow.hooks import PipelineCompleteEvent

event = PipelineCompleteEvent(
    pipeline_id="my_pipeline",
    pipeline_type="dataflow",
    status="COMPLETED",
    duration_ms=1250.3,
    result={"final": "output"},
    timestamp=datetime.now(),
)
```
Fields:

| Field | Type | Description |
|---|---|---|
| `type` | `Literal["PipelineCompleteEvent"]` | Event type |
| `pipeline_id` | `str` | Pipeline ID |
| `pipeline_type` | `str` | Pipeline type |
| `status` | `str` | `"COMPLETED"`, `"FAILED"`, or `"CANCELLED"` |
| `duration_ms` | `float` | Total execution time |
| `result` | `Any` | Final pipeline result |
| `timestamp` | `datetime` | Event time |
### Error Events

```python
from datetime import datetime

from taskiq_flow.hooks import StepErrorEvent, PipelineErrorEvent

step_error = StepErrorEvent(
    pipeline_id="my_pipeline",
    step_name="failing_task",
    error="ValueError: invalid input",
    timestamp=datetime.now(),
)

pipeline_error = PipelineErrorEvent(
    pipeline_id="my_pipeline",
    error="Pipeline aborted after 3 failures",
    timestamp=datetime.now(),
)
```
## WebSocket Transport

### setup_websocket_bridge

Connects a HookManager to the WebSocket transport layer:

```python
from taskiq_flow.hooks import HookManager, setup_websocket_bridge

hook_manager = HookManager()
setup_websocket_bridge(hook_manager)
# All events are now forwarded to the WebSocket server
```

This installs a bridge that forwards events from the HookManager to any connected WebSocket servers.
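Conceptually, the bridge is just a hook whose callbacks all hand the event to a transport's `broadcast()`. The internal mechanics here are assumed for illustration, not taskiq_flow's actual implementation:

```python
# Illustrative sketch of what a bridge hook could look like:
# every callback forwards the event to the transport unchanged.

class ForwardingHook:
    def __init__(self, transport):
        self.transport = transport

    async def _forward(self, event):
        await self.transport.broadcast(event)

    # All hook callbacks share one implementation
    on_pipeline_start = on_step_start = on_step_complete = _forward
    on_pipeline_complete = on_step_error = on_pipeline_error = _forward
```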
### get_websocket_server

Factory function to obtain or create a WebSocket server:

```python
from taskiq_flow.integration.websocket import get_websocket_server

server = get_websocket_server(
    host="0.0.0.0",
    port=8765,
    transport=None,  # uses the default WebSocketTransport
)
await server.start_server()
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `host` | `str` | `"0.0.0.0"` | Bind address |
| `port` | `int` | `8765` | Bind port |
| `transport` | `WebSocketTransport \| None` | `None` | Auto-created if `None` |
The server is a singleton per configuration; subsequent calls to get_websocket_server() with the same host/port return the same instance.
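The singleton-per-configuration behavior can be pictured as a factory keyed on `(host, port)`. This is a sketch of the pattern only; the real factory lives inside taskiq_flow:

```python
# Keyed-singleton sketch: one server instance per (host, port) pair.
_servers = {}

class _Server:
    def __init__(self, host, port):
        self.host, self.port = host, port

def get_server(host="0.0.0.0", port=8765):
    key = (host, port)
    if key not in _servers:
        _servers[key] = _Server(host, port)
    return _servers[key]
```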
## Event Filtering

Reduce traffic by filtering events:

```python
from taskiq_flow.hooks import EventFilter

# Only specific pipelines
event_filter = EventFilter(pipeline_ids=["pipeline_1", "pipeline_2"])

# Only step events
event_filter = EventFilter(event_types=["StepStartEvent", "StepCompleteEvent"])

# Both criteria
event_filter = EventFilter(
    pipeline_ids=["*"],  # all pipelines (or list specific IDs)
    event_types=["StepCompleteEvent", "PipelineCompleteEvent"],
)

hook_manager.add_filter(event_filter)
```
### EventFilter Logic

```
Event → pipeline_id matches? → event_type matches? → Emit
```

The two criteria are ANDed: an event is emitted only if it matches both the pipeline_ids and the event_types filter. Use "*" to match all.
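The AND semantics can be sketched as a predicate. One assumption here: an unset criterion is treated as matching everything, which is how the single-criterion examples above behave:

```python
# Sketch of the AND semantics: an event must satisfy both criteria.
# "*" (or an unset criterion) matches everything.

def passes(event_type, pipeline_id, event_types=None, pipeline_ids=None):
    def matches(value, allowed):
        return allowed is None or "*" in allowed or value in allowed
    return matches(event_type, event_types) and matches(pipeline_id, pipeline_ids)
```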
## WebSocket Protocol

### Connection

Clients connect via standard WebSocket:

```
ws://localhost:8765
```

For secure connections (WSS), terminate TLS at a reverse proxy (nginx, Traefik).
### Subscription

After connecting, the client sends a subscription message:

```json
{
  "type": "subscribe",
  "pipeline_id": "my_pipeline"
}
```

Wildcard subscription (receive all events):

```json
{
  "type": "subscribe",
  "pipeline_id": "*"
}
```

Unsubscribe:

```json
{
  "type": "unsubscribe",
  "pipeline_id": "my_pipeline"
}
```
### Message Format (Server → Client)

All messages are JSON with a `type` field:

```json
{
  "type": "StepCompleteEvent",
  "pipeline_id": "pipeline_123",
  "step_name": "process_data",
  "duration_ms": 150.2,
  "timestamp": "2026-05-05T16:30:00Z"
}
```
Full field reference in the WebSocket Guide.
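On the client side, the protocol above amounts to serializing a subscription payload and matching incoming messages against the subscribed pipeline IDs. A small sketch with hypothetical helper names:

```python
import json

def subscribe_message(pipeline_id: str) -> str:
    """Build the subscription payload from the protocol above."""
    return json.dumps({"type": "subscribe", "pipeline_id": pipeline_id})

def should_handle(raw: str, subscriptions: set) -> bool:
    """Client-side check mirroring the wildcard semantics."""
    msg = json.loads(raw)
    return "*" in subscriptions or msg.get("pipeline_id") in subscriptions
```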
## Custom Transport

For advanced use cases, implement your own transport:

```python
from taskiq_flow.hooks import BaseEvent
from taskiq_flow.transport import WebSocketTransport

class MyTransport(WebSocketTransport):
    async def broadcast(self, event: BaseEvent):
        # Custom routing logic (e.g., to Redis Pub/Sub, Kafka, etc.)
        await self.redis.publish("pipeline_events", event.json())

transport = MyTransport()
server = get_websocket_server(transport=transport)
```
## Multi-Worker Coordination

For multiple Python workers sharing event state:

```python
from taskiq_flow.transport import RedisPubSubTransport

transport = RedisPubSubTransport(redis_client)
server = get_websocket_server(transport=transport)
# All workers connected to the same Redis channel share events
```

All workers subscribe to the same Redis pub/sub channel; events published by any worker are broadcast to every connected WebSocket client, regardless of which worker the client is attached to.
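The fan-out behavior can be illustrated with an in-memory stand-in for the Redis channel (this replaces Redis with a plain Python class purely to show the pattern):

```python
# In-memory stand-in for a pub/sub channel: a publish from any
# "worker" reaches every subscriber, mirroring the Redis fan-out.

class Channel:
    def __init__(self):
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def publish(self, message):
        for callback in self.subscribers:
            callback(message)
```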
## Production Considerations

### Connection Limits

```python
import asyncio

# Limit concurrent WebSocket connections
MAX_CONNECTIONS = 1000
semaphore = asyncio.Semaphore(MAX_CONNECTIONS)

# In your connection handler:
async def handle_connection(websocket):
    if semaphore.locked():  # all permits in use
        await websocket.close(code=1013, reason="Too many connections")
        return
    async with semaphore:
        await websocket_service.handle(websocket)
```
### Graceful Shutdown

```python
async def shutdown():
    await server.close()        # stop accepting new connections
    await server.wait_closed()  # wait for existing connections to close
```
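In practice the shutdown coroutine is usually triggered by a process signal. One way to wire that up, assuming a POSIX event loop (`add_signal_handler` is not available on Windows):

```python
import asyncio
import signal

async def main():
    """Run until SIGTERM/SIGINT, then shut down cleanly."""
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        # POSIX only; on Windows handle KeyboardInterrupt instead
        loop.add_signal_handler(sig, stop.set)
    await stop.wait()
    # shutdown() from the snippet above would be awaited here
```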
### Monitoring

Expose metrics:

```python
@app.get("/ws/metrics")
async def ws_metrics():
    return {
        "connections": server.connection_count(),
        "messages_sent": server.messages_sent,
        "messages_per_second": server.rate(),
    }
```
## Troubleshooting

| Issue | Diagnosis | Fix |
|---|---|---|
| Clients not receiving events | `setup_websocket_bridge()` not called | Call it before the pipeline starts |
| Connection refused | Server not started | Call `await server.start_server()` |
| Events delayed | Event filter blocking | Check filter configuration |
| High CPU | Too many connections | Enforce connection limits |
## Summary

| Component | Role |
|---|---|
| `HookManager` | Collects events from pipelines |
| `BaseEvent` subclasses | Structured event data |
| `EventFilter` | Selectively broadcasts events |
| `WebSocketTransport` | Low-level transport (default or custom) |
| `WebSocketServer` | Manages client connections |
| `get_websocket_server()` | Factory/singleton accessor |
Minimum setup:

```python
hooks = HookManager()
setup_websocket_bridge(hooks)
pipeline = Pipeline(broker).with_hooks(hooks)

server = get_websocket_server()
await server.start_server()
```
For client implementation details, see WebSocket Guide. For event filtering strategies, see that guide’s Section 7.