WebSocket Guide

Real-time event streaming for live dashboards and monitoring

Version: {VERSION} Related: Tracking Guide, API Guide

Overview

Taskiq-Flow’s WebSocket integration provides live streaming of pipeline execution events — perfect for building real-time dashboards, progress displays, and monitoring tools.

This guide covers:

  • Setting up a WebSocket server
  • Subscribing clients to pipeline events
  • Event types and payloads
  • Transport layer configuration
  • Production deployment considerations

1. Architecture

[Pipeline] → [HookManager] → [WebSocketBridge] → [FastAPI WS Manager] → [Clients]

Components:

  1. Pipeline — Emits events via hooks at each lifecycle stage
  2. HookManager — Collects events from pipelines
  3. WebSocketBridge — Connects HookManager to WebSocket transport
  4. FastAPI WebSocket Manager — Manages client connections and broadcasts via FastAPI WebSocket routes
  5. Client — Web browser, monitoring app, dashboard

Note: Taskiq-Flow uses a FastAPI-only WebSocket integration. The legacy standalone picows server (get_websocket_server, PipelineWebSocketServer) was removed in v1.1. WebSocket events are now exposed through FastAPI WebSocket endpoints mounted in your FastAPI application.


2. Quick Start

2.1. Server-Side Setup

import asyncio
from taskiq import InMemoryBroker
from taskiq_flow import Pipeline
from taskiq_flow.hooks import HookManager, setup_websocket_bridge

# 1. Create broker and hook manager
broker = InMemoryBroker()
hook_manager = HookManager()

# 2. Set up the WebSocket bridge
setup_websocket_bridge(hook_manager)  # connects HookManager → WebSocket transport

# 3. Create a pipeline with hooks attached
pipeline = Pipeline(broker)
pipeline.pipeline_id = "demo_workflow"
pipeline.with_hooks(hook_manager)

# Add tasks to pipeline...

# 4. For FastAPI: mount the WebSocket route in your app
# (See Section 6 below for the full FastAPI integration example)

# 5. Execute the pipeline
result = await pipeline.kiq(data)

2.2. Client Connection (JavaScript)

// Connect to WebSocket server
const ws = new WebSocket('ws://localhost:8765');

// Subscribe to a specific pipeline
ws.onopen = () => {
    ws.send(JSON.stringify({
        type: 'subscribe',
        pipeline_id: 'demo_workflow'
    }));
};

// Receive events
ws.onmessage = (event) => {
    const eventData = JSON.parse(event.data);
    console.log('Pipeline event:', eventData);

    switch (eventData.type) {
        case 'PipelineStartEvent':
            showPipelineStarted();
            break;
        case 'StepStartEvent':
            showStepProgress(eventData.step_name);
            break;
        case 'StepCompleteEvent':
            updateProgress(eventData.step_name, eventData.duration_ms);
            break;
        case 'PipelineCompleteEvent':
            showResults(eventData.result);
            break;
        case 'PipelineErrorEvent':
            showError(eventData.error);
            break;
    }
};

3. Event Types

All events are JSON-serializable with a type field indicating the event kind.

3.1. PipelineStartEvent

{
  "type": "PipelineStartEvent",
  "pipeline_id": "demo_workflow",
  "pipeline_type": "sequential",
  "timestamp": "2026-04-29T18:50:19+02:00",
  "input": {...}
}

Emitted when a pipeline begins execution.

3.2. StepStartEvent

{
  "type": "StepStartEvent",
  "pipeline_id": "demo_workflow",
  "step_name": "process_data",
  "step_index": 2,
  "task_id": "abc123",
  "timestamp": "2026-04-29T18:50:19.5+02:00"
}

Emitted before each step starts.

3.3. StepCompleteEvent

{
  "type": "StepCompleteEvent",
  "pipeline_id": "demo_workflow",
  "step_name": "process_data",
  "step_index": 2,
  "result": {"processed": 42},
  "duration_ms": 150.5,
  "timestamp": "2026-04-29T18:50:19.7+02:00"
}

Emitted after each step completes successfully.

3.4. PipelineCompleteEvent

{
  "type": "PipelineCompleteEvent",
  "pipeline_id": "demo_workflow",
  "pipeline_type": "sequential",
  "status": "COMPLETED",
  "duration_ms": 1250.3,
  "result": {"final": "output"},
  "timestamp": "2026-04-29T18:50:20.5+02:00"
}

Emitted when the entire pipeline finishes successfully.

3.5. StepErrorEvent

{
  "type": "StepErrorEvent",
  "pipeline_id": "demo_workflow",
  "step_name": "failing_task",
  "error": "ValueError: invalid input",
  "timestamp": "2026-04-29T18:50:19.9+02:00"
}

Emitted when a step fails.

3.6. PipelineErrorEvent

{
  "type": "PipelineErrorEvent",
  "pipeline_id": "demo_workflow",
  "error": "Pipeline failed at step 'validate'",
  "timestamp": "2026-04-29T18:50:20.2+02:00"
}

Emitted when the pipeline aborts due to an unrecoverable error.


4. Client-Side Implementation

4.1. Basic JavaScript Client

class PipelineMonitor {
    constructor(url, pipelineId) {
        this.url = url;
        this.pipelineId = pipelineId;
        this.ws = null;
        this.events = [];
        this.callbacks = {};
    }

    connect() {
        this.ws = new WebSocket(this.url);

        this.ws.onopen = () => {
            console.log('Connected to WebSocket server');
            this.subscribe(this.pipelineId);
        };

        this.ws.onmessage = (event) => {
            const data = JSON.parse(event.data);
            this.handleEvent(data);
        };

        this.ws.onerror = (err) => {
            console.error('WebSocket error:', err);
        };

        this.ws.onclose = () => {
            console.log('WebSocket connection closed');
            this.reconnect();
        };
    }

    subscribe(pipelineId) {
        this.ws.send(JSON.stringify({
            type: 'subscribe',
            pipeline_id: pipelineId
        }));
    }

    handleEvent(event) {
        this.events.push(event);
        const eventType = event.type;

        if (this.callbacks[eventType]) {
            this.callbacks[eventType](event);
        }

        // Generic event handler
        if (this.callbacks['*']) {
            this.callbacks['*'](event);
        }
    }

    on(eventType, callback) {
        this.callbacks[eventType] = callback;
    }

    reconnect() {
        setTimeout(() => this.connect(), 3000);
    }
}

// Usage
monitor = new PipelineMonitor('ws://localhost:8765', 'pipeline_123');
monitor.on('StepCompleteEvent', (event) => {
    console.log(`Step ${event.step_name} completed in ${event.duration_ms}ms`);
});
monitor.on('PipelineCompleteEvent', (event) => {
    console.log('Pipeline finished with status:', event.status);
});
monitor.connect();

4.2. Python Client (for scripts)

import asyncio
import websockets
import json

async def monitor_pipeline(uri, pipeline_id):
    async with websockets.connect(uri) as websocket:
        # Subscribe
        await websocket.send(json.dumps({
            "type": "subscribe",
            "pipeline_id": pipeline_id
        }))

        # Receive events
        async for message in websocket:
            event = json.loads(message)
            print(f"[{event['type']}] {event}")

            if event['type'] == 'PipelineCompleteEvent':
                print(f"Pipeline finished: {event['status']}")

asyncio.run(monitor_pipeline('ws://localhost:8765', 'pipeline_123'))

5. Subscription Management

5.1. Subscribing to a Pipeline

Clients send a subscription message:

{
  "type": "subscribe",
  "pipeline_id": "my_pipeline_001"
}

After subscribing, all events for that pipeline are forwarded.

5.2. Unsubscribing

{
  "type": "unsubscribe",
  "pipeline_id": "my_pipeline_001"
}

5.3. Subscribing to All Pipelines (Wildcard)

{
  "type": "subscribe",
  "pipeline_id": "*"
}

Caution: Broadcasting all events can generate significant traffic in high-throughput systems.

5.4. Multiple Subscriptions

A client can subscribe to multiple pipelines:

monitor.subscribe('pipeline_1');
monitor.subscribe('pipeline_2');
// Receive events for both, distinguished by pipeline_id field

6. Server Configuration

6.1. WebSocket via FastAPI Route

WebSocket events are exposed through a FastAPI WebSocket route at /ws/{pipeline_id}. The client connects directly to your FastAPI application:

from fastapi import FastAPI, WebSocket
from taskiq_flow.integration.websocket.fastapi_ws import (
    fastapi_websocket_endpoint,
    get_fastapi_ws_manager,
)

app = FastAPI()

@app.websocket("/ws/{pipeline_id}")
async def ws_endpoint(websocket: WebSocket, pipeline_id: str):
    await fastapi_websocket_endpoint(websocket, pipeline_id)

Prerequisite: Install FastAPI: pip install fastapi uvicorn.

The WebSocket URL is tied to your FastAPI server address:

// Client connects using your FastAPI app address
const ws = new WebSocket('ws://localhost:8000/ws/demo_workflow');

6.2. CORS and Security Headers

If behind a reverse proxy (nginx, Traefik), configure CORS headers:

# nginx.conf
location /ws {
    proxy_pass http://localhost:8765;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    add_header Access-Control-Allow-Origin "*";
    add_header Access-Control-Allow-Credentials true;
}

6.3. SSL/TLS Termination

Terminate SSL at reverse proxy:

# HTTPS → WSS forwarding
location /ws {
    proxy_pass http://localhost:8765;
    # WSS (secure WebSocket) handled by nginx SSL config
}

Client connects with:

const ws = new WebSocket('wss://yourdomain.com/ws');

6.4. Multi-Worker / Redis Transport

For multi-worker deployments where multiple processes need to share event state, use the RedisPubSubTransport:

from taskiq_flow.transport import RedisPubSubTransport
import redis

redis_client = redis.Redis(host="localhost", port=6379)
transport = RedisPubSubTransport(redis_client)

# Pass the transport when initializing the bridge
from taskiq_flow.hooks.bridge import setup_websocket_bridge
setup_websocket_bridge(hook_manager, use_fastapi=True)

# All workers connected to the same Redis channel share events
# WebSocket is always served via the FastAPI route

Prerequisite: Install the [brokers] extra: pip install "taskiq-flow[brokers]" for Redis support.

With this setup, workers share event state via Redis Pub/Sub while WebSocket connections are always managed through the FastAPI application.


7. Filtering Events

Reduce bandwidth by filtering on the server side:

from taskiq_flow.hooks import EventFilter

# Only send events for specific pipelines
filter = EventFilter(pipeline_ids=['pipeline_1', 'pipeline_2'])
hook_manager.add_filter(filter)

# Only send step events (not pipeline-level)
filter = EventFilter(event_types=['StepStartEvent', 'StepCompleteEvent'])
hook_manager.add_filter(filter)

Client-side filtering also possible:

monitor.on('StepCompleteEvent', (event) => {
    if (event.step_name === 'important_step') {
        highlightStep(event.step_name);
    }
});

8. Message Format Reference

Subscription Request

Field Type Description
type "subscribe" Message type
pipeline_id str or "*" Pipeline to subscribe to

Unsubscription Request

Field Type Description
type "unsubscribe" Message type
pipeline_id str Pipeline to unsubscribe from

Event Message (server → client)

Field Type Description
type str Event type (see Section 3)
pipeline_id str Origin pipeline ID
timestamp ISO 8601 str Event time

Additional fields per event type (see above).


9. Production Deployment

9.1. Docker Deployment

# Dockerfile
FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["python", "-m", "my_websocket_app"]
# docker-compose.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
  app:
    build: .
    ports:
      - "8765:8765"
    environment:
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis

9.2. Systemd Service

# /etc/systemd/system/taskiq-flow-ws.service
[Unit]
Description=Taskiq-Flow WebSocket Server
After=network.target

[Service]
Type=simple
User=appuser
WorkingDirectory=/opt/taskiq-flow
ExecStart=/usr/bin/python3 -m my_websocket_app
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

9.3. Monitoring

Health check endpoint:

from aiohttp import web

async def health(request):
    return web.json_response({"status": "healthy"})

app = web.Application()
app.router.add_get('/health', health)

Or use the built-in API health endpoint (/health) from API Guide.

9.4. Scalability

For high-volume deployments:

  • Horizontal scaling: Deploy multiple WebSocket server instances with sticky sessions or Redis Pub/Sub transport
  • Load balancing: Use nginx or HAProxy with WebSocket support
  • Connection limits: Configure max connections per worker (OS limits)
  • Message compression: Enable permessage-deflate for large payloads

10. Security Considerations

10.1. Authentication

Require authentication tokens on connection:

# Server-side validation
async def authenticate(websocket, token):
    if not validate_token(token):
        await websocket.close(code=4001, reason="Unauthorized")
        return False
    return True

# Client sends token upon connection
ws = new WebSocket(`ws://localhost:8765?token=${authToken}`);

10.2. Authorization

Filter events by user permissions:

class AuthFilter(EventFilter):
    def __init__(self, user_id, allowed_pipelines):
        self.user_id = user_id
        self.allowed = allowed_pipelines

    def should_emit(self, event):
        return event.pipeline_id in self.allowed

10.3. Rate Limiting

Prevent abuse:

from collections import defaultdict
import time

class RateLimiter:
    def __init__(self, max_events_per_second=100):
        self.limits = defaultdict(list)

    def allow(self, client_id):
        now = time.time()
        self.limits[client_id] = [
            t for t in self.limits[client_id] if now - t < 1
        ]
        if len(self.limits[client_id]) < 100:
            self.limits[client_id].append(now)
            return True
        return False

10.4. SSL/TLS Encryption (WSS)

Since the WebSocket integration is FastAPI-only, SSL termination is handled at the reverse proxy layer. Do not implement WSS directly at the application level — use a reverse proxy (nginx, Traefik, Caddy):

server {
    listen 443 ssl;
    server_name ws.taskiq-flow.example.com;

    ssl_certificate /etc/letsencrypt/live/ws.example.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/ws.example.com/privkey.pem;

    location /ws {
        proxy_pass http://localhost:8000;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
    }
}

Client connects with:

const ws = new WebSocket("wss://ws.taskiq-flow.example.com/ws");

Note: The legacy PipelineWebSocketServer with inline SSL (ssl_cert/ssl_key parameters) was removed in v1.1. Use a reverse proxy for TLS termination.

server {
    listen 443 ssl;
    server_name ws.taskiq-flow.example.com;

    ssl_certificate /etc/letsencrypt/live/ws.example.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/ws.example.com/privkey.pem;

    location /ws {
        proxy_pass http://localhost:8765;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
    }
}
// Client connects via wss through the proxy
const ws = new WebSocket("wss://ws.taskiq-flow.example.com/ws");

11. Troubleshooting

Connection Refused

Symptom: Client can’t connect, “Connection refused” error.

Fixes:

  • Verify your FastAPI application is running: uvicorn app:app
  • Check firewall rules allow the FastAPI port (default: 8000)
  • Ensure the WebSocket route is mounted (/ws/{pipeline_id})
  • Ensure setup_websocket_bridge(hook_manager) is called before the pipeline starts

No Events Received After Connection

Symptom: Connection succeeds, but no events arrive.

Fixes:

  • Ensure pipeline has pipeline_id set
  • Confirm pipeline.with_hooks(hook_manager) called
  • Verify setup_websocket_bridge(hook_manager) called before pipeline starts
  • Check subscription message format (see Section 5)

High Memory Usage

Symptom: Server memory grows over time.

Fixes:

  • Limit number of tracked pipelines
  • Implement automatic cleanup of disconnected clients
  • Use Redis transport to offload state from process memory
  • Set max connections limit

Events Out of Order

Symptom: Client receives StepComplete before StepStart.

Fixes:

  • Use sequential delivery guarantees (default for WebSocket)
  • Ensure all hooks are correctly attached
  • Check for custom middleware that may emit events asynchronously

12. Summary

Component Responsibility
Pipeline Generates execution events
HookManager Collects events from pipelines
WebSocketBridge Routes events to WebSocket transport
FastAPIWebSocketManager Manages client connections, broadcasts
Client Subscribes, receives, displays events

Basic setup (3 lines for hooks + FastAPI route):

# 1. Connect event pipeline
hooks = HookManager()
setup_websocket_bridge(hooks)
pipeline = Pipeline(broker).with_hooks(hooks)

# 2. Mount WebSocket route in your FastAPI app
# @app.websocket("/ws/{pipeline_id}")
# async def ws_endpoint(websocket: WebSocket, pipeline_id: str):
#     await fastapi_websocket_endpoint(websocket, pipeline_id)

Next Steps


Stream live pipeline events. Combine with Tracking Storage for persistent history.


This site uses Just the Docs, a documentation theme for Jekyll.