API Reference: WebSocket Integration
HookManager, WebSocket bridge, and real-time event transport via FastAPI
Version: {VERSION} Modules: taskiq_flow.hooks, taskiq_flow.integration.websocket
HookManager
Central event bus that collects pipeline execution events and dispatches them to subscribers.
from taskiq_flow.hooks import HookManager
hook_manager = HookManager()
pipeline = Pipeline(broker).with_hooks(hook_manager)
Events emitted:
- PipelineStartEvent: Pipeline execution started
- StepStartEvent: A step started
- StepCompleteEvent: A step completed
- PipelineCompleteEvent: Pipeline finished successfully
- StepErrorEvent: A step failed
- PipelineErrorEvent: Pipeline failed
Adding Custom Hooks
class MyHook:
    async def on_pipeline_start(self, event):
        print(f"Pipeline {event.pipeline_id} started")

    async def on_step_complete(self, event):
        print(f"Step {event.step_name} finished in {event.duration_ms}ms")

hook = MyHook()
hook_manager.add_hook(hook)
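A hook can also accumulate data instead of printing it. The sketch below records per-step timings and relies only on the two event attributes used above (step_name and duration_ms). The TimingHook class and the SimpleNamespace stand-in events are hypothetical and exist only so the example runs without the library.

```python
import asyncio
from collections import defaultdict
from types import SimpleNamespace


class TimingHook:
    """Hypothetical hook that records step durations per step name."""

    def __init__(self):
        self.durations_ms = defaultdict(list)

    async def on_step_complete(self, event):
        # Only step_name and duration_ms are read from the event.
        self.durations_ms[event.step_name].append(event.duration_ms)

    def average_ms(self, step_name):
        samples = self.durations_ms.get(step_name, [])
        return sum(samples) / len(samples) if samples else 0.0


async def demo():
    hook = TimingHook()
    # Stand-in objects shaped like StepCompleteEvent (an assumption for the demo).
    for ms in (100.0, 200.0):
        await hook.on_step_complete(
            SimpleNamespace(step_name="process_data", duration_ms=ms)
        )
    return hook.average_ms("process_data")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real application such a hook would presumably be registered with hook_manager.add_hook(), as shown above.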
Hook methods (all optional):
| Method | Event |
|---|---|
| on_pipeline_start(event: PipelineStartEvent) | Pipeline started |
| on_step_start(event: StepStartEvent) | Step starting |
| on_step_complete(event: StepCompleteEvent) | Step finished |
| on_pipeline_complete(event: PipelineCompleteEvent) | Pipeline completed |
| on_step_error(event: StepErrorEvent) | Step failed |
| on_pipeline_error(event: PipelineErrorEvent) | Pipeline error |
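Because every hook method is optional, whatever dispatches events has to skip hooks that do not define the method for a given event. The following is a minimal sketch of that idea, not HookManager's actual implementation. The dispatch function and the EVENT_TO_METHOD mapping are assumptions made for illustration.

```python
import asyncio
from types import SimpleNamespace

# Assumed mapping from event type names to the hook method names listed above.
EVENT_TO_METHOD = {
    "PipelineStartEvent": "on_pipeline_start",
    "StepStartEvent": "on_step_start",
    "StepCompleteEvent": "on_step_complete",
    "PipelineCompleteEvent": "on_pipeline_complete",
    "StepErrorEvent": "on_step_error",
    "PipelineErrorEvent": "on_pipeline_error",
}


async def dispatch(hooks, event):
    """Call the matching method on each hook that defines it; return the call count."""
    method_name = EVENT_TO_METHOD[event.type]
    called = 0
    for hook in hooks:
        method = getattr(hook, method_name, None)
        if method is None:
            continue  # this hook does not handle the event
        await method(event)
        called += 1
    return called


class StartOnlyHook:
    """Hypothetical hook that implements a single method."""

    def __init__(self):
        self.seen = []

    async def on_pipeline_start(self, event):
        self.seen.append(event.pipeline_id)


async def demo():
    hook = StartOnlyHook()
    start = SimpleNamespace(type="PipelineStartEvent", pipeline_id="p1")
    step = SimpleNamespace(type="StepStartEvent", pipeline_id="p1")
    handled_start = await dispatch([hook], start)
    handled_step = await dispatch([hook], step)
    return handled_start, handled_step, hook.seen
```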
Event Types
All events are Pydantic models with a type discriminator.
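The type field lets a consumer pick the right model when deserializing an event. The sketch below illustrates the discriminator idea with plain dataclasses rather than the library's Pydantic models, so it runs without dependencies. The two reduced classes and the parse_event helper are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class StepStart:
    pipeline_id: str
    step_name: str
    type: str = "StepStartEvent"


@dataclass
class StepError:
    pipeline_id: str
    step_name: str
    error: str
    type: str = "StepErrorEvent"


# Registry keyed by the discriminator value.
EVENT_CLASSES = {"StepStartEvent": StepStart, "StepErrorEvent": StepError}


def parse_event(payload: dict):
    """Build the dataclass selected by payload['type']."""
    try:
        cls = EVENT_CLASSES[payload["type"]]
    except KeyError:
        raise ValueError(
            f"unknown or missing event type: {payload.get('type')!r}"
        ) from None
    fields = {key: value for key, value in payload.items() if key != "type"}
    return cls(**fields)
```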
PipelineStartEvent
from datetime import datetime

from taskiq_flow.hooks import PipelineStartEvent

event = PipelineStartEvent(
    pipeline_id="my_pipeline",
    pipeline_type="sequential",
    timestamp=datetime.now(),
    input=initial_data,  # the pipeline's initial input
)
Fields:
| Field | Type | Description |
|---|---|---|
| type | Literal["PipelineStartEvent"] | Event type discriminator |
| pipeline_id | str | Pipeline instance ID |
| pipeline_type | str | "sequential" or "dataflow" |
| timestamp | datetime | Event time |
| input | Any | Initial input data |
StepStartEvent
from datetime import datetime

from taskiq_flow.hooks import StepStartEvent

event = StepStartEvent(
    pipeline_id="my_pipeline",
    step_name="process_data",
    step_index=2,
    task_id="task_abc123",
    timestamp=datetime.now(),
)
Fields:
| Field | Type | Description |
|---|---|---|
| type | Literal["StepStartEvent"] | Event type |
| pipeline_id | str | Origin pipeline |
| step_name | str | Task name |
| step_index | int | Position in pipeline (0-indexed) |
| task_id | str | Underlying taskiq task ID |
| timestamp | datetime | Event time |
StepCompleteEvent
from datetime import datetime

from taskiq_flow.hooks import StepCompleteEvent

event = StepCompleteEvent(
    pipeline_id="my_pipeline",
    step_name="process_data",
    step_index=2,
    result={"processed": 42},
    duration_ms=150.5,
    timestamp=datetime.now(),
)
Fields:
| Field | Type | Description |
|---|---|---|
| type | Literal["StepCompleteEvent"] | Event type |
| pipeline_id | str | Origin pipeline |
| step_name | str | Completed task |
| step_index | int | Step position |
| result | Any | Task return value |
| duration_ms | float | Execution time in milliseconds |
| timestamp | datetime | Event time |
PipelineCompleteEvent
from datetime import datetime

from taskiq_flow.hooks import PipelineCompleteEvent

event = PipelineCompleteEvent(
    pipeline_id="my_pipeline",
    pipeline_type="dataflow",
    status="COMPLETED",
    duration_ms=1250.3,
    result={"final": "output"},
    timestamp=datetime.now(),
)
Fields:
| Field | Type | Description |
|---|---|---|
| type | Literal["PipelineCompleteEvent"] | Event type |
| pipeline_id | str | Pipeline ID |
| pipeline_type | str | Pipeline type |
| status | str | "COMPLETED", "FAILED", or "CANCELLED" |
| duration_ms | float | Total execution time in milliseconds |
| result | Any | Final pipeline result |
| timestamp | datetime | Event time |
Error Events
from datetime import datetime

from taskiq_flow.hooks import StepErrorEvent, PipelineErrorEvent

step_error = StepErrorEvent(
    pipeline_id="my_pipeline",
    step_name="failing_task",
    error="ValueError: invalid input",
    timestamp=datetime.now(),
)

pipeline_error = PipelineErrorEvent(
    pipeline_id="my_pipeline",
    error="Pipeline aborted after 3 failures",
    timestamp=datetime.now(),
)
WebSocket Integration (FastAPI)
Taskiq-Flow uses a FastAPI-only WebSocket integration. WebSocket events are served via a FastAPI WebSocket route at /ws/{pipeline_id} in your application.
setup_websocket_bridge
Connects the HookManager event system to the WebSocket transport layer:
from taskiq_flow.hooks import HookManager, setup_websocket_bridge
hook_manager = HookManager()
setup_websocket_bridge(hook_manager)
# Now all pipeline events are forwarded to the WebSocket transport
Once the bridge is active, mount the FastAPI WebSocket endpoint in your application:
from fastapi import FastAPI, WebSocket
from taskiq_flow.integration.websocket.fastapi_ws import fastapi_websocket_endpoint

app = FastAPI()

@app.websocket("/ws/{pipeline_id}")
async def ws_endpoint(websocket: WebSocket, pipeline_id: str):
    await fastapi_websocket_endpoint(websocket, pipeline_id)
Clients connect to ws://<host>:<port>/ws/<pipeline_id> and receive real-time pipeline events. The pipeline_id in the URL path is used to subscribe the client to that specific pipeline’s event channel.
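When a client builds that URL programmatically, the pipeline ID should be percent-encoded so that unusual characters do not corrupt the path. The helper below is a small sketch. The pipeline_ws_url function and its secure flag are hypothetical and not part of Taskiq-Flow.

```python
from urllib.parse import quote


def pipeline_ws_url(host: str, port: int, pipeline_id: str, secure: bool = False) -> str:
    """Build the WebSocket URL for one pipeline, percent-encoding the ID."""
    scheme = "wss" if secure else "ws"
    return f"{scheme}://{host}:{port}/ws/{quote(pipeline_id, safe='')}"


if __name__ == "__main__":
    print(pipeline_ws_url("localhost", 8000, "my_pipeline"))
```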
get_fastapi_ws_manager
Get the singleton FastAPIWebSocketManager instance:
from taskiq_flow.integration.websocket.fastapi_ws import get_fastapi_ws_manager
manager = get_fastapi_ws_manager()
print(f"Active channels: {manager.get_channel_ids()}")
print(f"Client count: {manager.get_client_count('pipeline_1')}")
fastapi_websocket_endpoint
The FastAPI WebSocket route handler. Use it as a route callback:
from taskiq_flow.integration.websocket.fastapi_ws import fastapi_websocket_endpoint
@app.websocket("/ws/{pipeline_id}")
async def ws_endpoint(websocket: WebSocket, pipeline_id: str):
    await fastapi_websocket_endpoint(websocket, pipeline_id)
The endpoint:
- Accepts the WebSocket connection
- Subscribes the client to the pipeline_id channel
- Tracks the connection metric
- Disconnects cleanly on client disconnect
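The bookkeeping behind those steps can be pictured as a map from channel ID to connected clients. The sketch below is a simplified in-memory stand-in, not FastAPIWebSocketManager itself. The ChannelRegistry class is hypothetical, and only the method names get_channel_ids and get_client_count are borrowed from the reference above.

```python
from collections import defaultdict


class ChannelRegistry:
    """Hypothetical in-memory registry of clients per pipeline channel."""

    def __init__(self):
        self._channels = defaultdict(set)

    def subscribe(self, pipeline_id, client):
        self._channels[pipeline_id].add(client)

    def disconnect(self, client):
        # Remove the client everywhere and drop channels that become empty.
        for pipeline_id in list(self._channels):
            self._channels[pipeline_id].discard(client)
            if not self._channels[pipeline_id]:
                del self._channels[pipeline_id]

    def get_channel_ids(self):
        return sorted(self._channels)

    def get_client_count(self, pipeline_id):
        # .get() avoids creating an empty channel as a side effect.
        return len(self._channels.get(pipeline_id, ()))
```

Dropping empty channels on disconnect keeps get_channel_ids() limited to channels that still have at least one listener.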
get_websocket_server ⚠️ Deprecated
:::warning Deprecated since v1.1
The function get_websocket_server() and class PipelineWebSocketServer were removed in v1.1. The standalone picows WebSocket server is no longer supported. Use the FastAPI WebSocket integration instead.
:::
from taskiq_flow.integration.websocket import get_websocket_server
server = get_websocket_server(
    host="0.0.0.0",
    port=8765,
    transport=None,  # Uses default WebSocketTransport
)
await server.start_server()
:::danger Raises RuntimeError
Calling get_websocket_server() always raises RuntimeError. Use the FastAPI integration instead:
from fastapi import FastAPI, WebSocket
from taskiq_flow.integration.websocket.fastapi_ws import fastapi_websocket_endpoint

app = FastAPI()

@app.websocket("/ws/{pipeline_id}")
async def ws_endpoint(websocket: WebSocket, pipeline_id: str):
    await fastapi_websocket_endpoint(websocket, pipeline_id)
:::
Event Filtering
Reduce traffic by filtering events:
from taskiq_flow.hooks import EventFilter

# Only specific pipelines
event_filter = EventFilter(pipeline_ids=["pipeline_1", "pipeline_2"])

# Only step events
event_filter = EventFilter(event_types=["StepStartEvent", "StepCompleteEvent"])

# Both
event_filter = EventFilter(
    pipeline_ids=["*"],  # all pipelines (or specific)
    event_types=["StepCompleteEvent", "PipelineCompleteEvent"],
)

# Named event_filter to avoid shadowing the built-in filter()
hook_manager.add_filter(event_filter)
EventFilter Logic
Event → Check pipeline_id match? → Check event_type match? → Emit?
Both filters are ANDed internally: an event passes if it matches both the pipeline_ids AND event_types filters. Use "*" to match all.
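The AND rule can be written out as a small predicate. This is a sketch of the described behavior, not the EventFilter source. The matches and passes helpers are hypothetical, and treating an omitted list as "match everything" is an assumption based on the single-argument examples above.

```python
def matches(allowed, value):
    """True if the allow-list is unset, contains '*', or contains the value."""
    return allowed is None or "*" in allowed or value in allowed


def passes(event_type, pipeline_id, pipeline_ids=None, event_types=None):
    """An event is emitted only if both checks succeed (logical AND)."""
    return matches(pipeline_ids, pipeline_id) and matches(event_types, event_type)


if __name__ == "__main__":
    print(passes("StepCompleteEvent", "pipeline_1",
                 pipeline_ids=["*"], event_types=["StepCompleteEvent"]))
```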
WebSocket Protocol
Connection
Clients connect via standard WebSocket to the FastAPI route:
ws://localhost:8000/ws/my_pipeline
For secure connections (WSS), terminate TLS at a reverse proxy (nginx, Traefik).
Subscription
After connecting, the client is already subscribed to the pipeline identified by the URL path. Additional subscribe/unsubscribe messages can be sent:
{
  "type": "subscribe",
  "pipeline_id": "my_pipeline"
}
Wildcard subscription (receive all events):
{
  "type": "subscribe",
  "pipeline_id": "*"
}
Unsubscribe:
{
  "type": "unsubscribe",
  "pipeline_id": "my_pipeline"
}
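One way to reason about these messages is as operations on a per-client set of pipeline IDs. The sketch below is an assumption about the semantics, not the server's code. The apply_message and is_subscribed helpers are hypothetical, and treating "*" as matching every pipeline follows the wildcard description above.

```python
import json


def apply_message(subscriptions: set, raw: str) -> set:
    """Return a new subscription set after applying one client message."""
    message = json.loads(raw)
    pipeline_id = message["pipeline_id"]
    if message["type"] == "subscribe":
        return subscriptions | {pipeline_id}
    if message["type"] == "unsubscribe":
        return subscriptions - {pipeline_id}
    raise ValueError(f"unsupported message type: {message['type']!r}")


def is_subscribed(subscriptions: set, pipeline_id: str) -> bool:
    """True if the client should receive events for pipeline_id."""
    return "*" in subscriptions or pipeline_id in subscriptions
```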
Message Format (Server → Client)
All messages are JSON with a type field:
{
  "type": "StepCompleteEvent",
  "pipeline_id": "pipeline_123",
  "step_name": "process_data",
  "duration_ms": 150.2,
  "timestamp": "2026-05-05T16:30:00Z"
}
See the WebSocket Guide for the full field reference.
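On the client side, each frame can be parsed as JSON and routed on its type field. The helper below is a sketch of that step. The decode_message function is hypothetical, and the timestamp handling assumes the ISO 8601 form shown in the sample message.

```python
import json
from datetime import datetime


def decode_message(raw: str) -> dict:
    """Parse a server message and convert its timestamp to an aware datetime."""
    message = json.loads(raw)
    if "type" not in message:
        raise ValueError("message has no 'type' field")
    stamp = message.get("timestamp")
    if isinstance(stamp, str):
        # Normalize a trailing 'Z', which fromisoformat() rejects before Python 3.11.
        message["timestamp"] = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
    return message
```

A caller can then branch on message["type"], for example to update a progress bar on StepCompleteEvent and close the view on PipelineCompleteEvent.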
FastAPI Route Reference
Mounting the WebSocket Endpoint
from fastapi import FastAPI, WebSocket
from taskiq_flow.integration.websocket.fastapi_ws import fastapi_websocket_endpoint

app = FastAPI()

@app.websocket("/ws/{pipeline_id}")
async def ws_endpoint(websocket: WebSocket, pipeline_id: str):
    await fastapi_websocket_endpoint(websocket, pipeline_id)
Querying Connected Clients
from taskiq_flow.integration.websocket.fastapi_ws import get_fastapi_ws_manager
manager = get_fastapi_ws_manager()
client_count = manager.get_client_count("pipeline_1")
Full REST endpoints for querying clients and pipelines are available in the API routes module:
from taskiq_flow.api.routes.websocket import router as ws_router
app.include_router(ws_router, prefix="/api")
Multi-Worker Coordination
For multiple workers sharing event state via Redis:
from taskiq_flow.transport import RedisPubSubTransport
from taskiq_flow.hooks.bridge import setup_websocket_bridge
import redis
redis_client = redis.Redis(host="localhost", port=6379)
transport = RedisPubSubTransport(redis_client)
setup_websocket_bridge(hook_manager, use_fastapi=True)
All workers subscribe to the same Redis pub/sub channel; events from any worker are broadcast to all WebSocket clients connected to any worker. WebSocket connections are always served through the FastAPI application.
Prerequisite: Install the [brokers] extra for Redis support: pip install "taskiq-flow[brokers]"
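The fan-out across workers can be pictured with an in-memory stand-in for the shared channel. The sketch is purely illustrative: InMemoryChannel and Worker are hypothetical classes, and neither Redis nor RedisPubSubTransport is involved.

```python
class InMemoryChannel:
    """Hypothetical stand-in for a shared pub/sub channel."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def publish(self, event):
        for callback in self._subscribers:
            callback(event)


class Worker:
    """Hypothetical worker that relays channel events to its own clients."""

    def __init__(self, channel):
        self.channel = channel
        self.client_inboxes = []
        channel.subscribe(self._relay)

    def connect_client(self):
        inbox = []
        self.client_inboxes.append(inbox)
        return inbox

    def emit(self, event):
        # Events go through the shared channel, never straight to local clients.
        self.channel.publish(event)

    def _relay(self, event):
        for inbox in self.client_inboxes:
            inbox.append(event)
```

Because every worker both publishes to and listens on the same channel, a client attached to one worker also sees events produced by another.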
Production Considerations
Connection Limits
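import asyncio

# Limit concurrent WebSocket connections
MAX_CONNECTIONS = 1000
semaphore = asyncio.Semaphore(MAX_CONNECTIONS)

# In your connection handler:
async def handle_connection(websocket):
    # asyncio.Semaphore.acquire() is a coroutine with no non-blocking mode,
    # so check locked() first and reject the client when no slot is free.
    if semaphore.locked():
        await websocket.close(code=1013, reason="Too many connections")
        return
    await semaphore.acquire()
    try:
        # websocket_service stands for your application's own handler
        await websocket_service.handle(websocket)
    finally:
        semaphore.release()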
Graceful Shutdown
from taskiq_flow.integration.websocket.fastapi_ws import get_fastapi_ws_manager

async def shutdown():
    manager = get_fastapi_ws_manager()
    await manager.close_all()
Monitoring
Expose metrics through an HTTP endpoint backed by the WebSocket manager:
@app.get("/ws/metrics")
async def ws_metrics():
    manager = get_fastapi_ws_manager()
    return {
        "channels": manager.get_channel_ids(),
    }
SSL/TLS
Terminate TLS at a reverse proxy (nginx, Traefik) — never inline in the application:
server {
    listen 443 ssl;

    location /ws {
        proxy_pass http://localhost:8000;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
Note: The legacy PipelineWebSocketServer with inline SSL (ssl_cert/ssl_key parameters) was removed in v1.1. Use a reverse proxy for TLS termination.
Troubleshooting
| Issue | Diagnosis | Fix |
|---|---|---|
| Clients not receiving events | setup_websocket_bridge() not called | Call it before the pipeline starts |
| Connection refused | FastAPI app not running | Start with uvicorn app:app |
| Events delayed | Event filter blocking | Check filter configuration |
| High CPU | Too many connections | Enforce connection limits |
Summary
| Component | Role |
|---|---|
| HookManager | Collects events from pipelines |
| BaseEvent subclasses | Structured event data |
| EventFilter | Selectively broadcasts events |
| setup_websocket_bridge() | Connects HookManager to the WebSocket transport |
| FastAPIWebSocketManager | WebSocket client management (singleton) |
| fastapi_websocket_endpoint() | FastAPI WebSocket route handler |
| FastAPI route /ws/{pipeline_id} | Client-facing WebSocket endpoint |
Minimum FastAPI setup:
from fastapi import FastAPI, WebSocket
from taskiq_flow.integration.websocket.fastapi_ws import fastapi_websocket_endpoint
from taskiq_flow.hooks import HookManager, setup_websocket_bridge

hook_manager = HookManager()
setup_websocket_bridge(hook_manager)

# Pipeline and broker are defined elsewhere in your application
pipeline = Pipeline(broker).with_hooks(hook_manager)

app = FastAPI()

@app.websocket("/ws/{pipeline_id}")
async def ws_endpoint(websocket: WebSocket, pipeline_id: str):
    await fastapi_websocket_endpoint(websocket, pipeline_id)
For client implementation details, see the WebSocket Guide. For event filtering strategies, see that guide’s Section 7.