Example: tracking_demo.py
Pipeline execution tracking with PipelineTrackingManager
Version: {VERSION} File: examples/tracking_demo.py
Overview
This example demonstrates how to monitor pipeline execution in real-time using the PipelineTrackingManager. It covers:
- Setting up tracking with automatic storage selection
- Attaching tracking to a pipeline
- Running a pipeline and checking its status
- Accessing step-by-step execution history
What This Example Shows
- Creating a
PipelineTrackingManagerwith auto-storage - Using
.with_tracking()on a pipeline - Waiting for pipeline completion
- Querying pipeline status from the tracking manager
- Logging step progress
Code Walkthrough
import asyncio
import logging
from taskiq import InMemoryBroker
from taskiq_flow import Pipeline, PipelineMiddleware
from taskiq_flow.tracking import PipelineTrackingManager
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create broker
broker = InMemoryBroker(await_inplace=True)
# Define a task with a delay to show tracking in action
@broker.task
async def slow_task(x: int) -> int:
"""Slow task that doubles the input."""
await asyncio.sleep(1)
print(f"Slow task called with {x}")
return x * 2
async def main():
# 1. Setup tracking with auto-storage selection
tracking_manager = PipelineTrackingManager().with_auto_storage(broker)
# 2. Create middleware with tracking manager
middleware = PipelineMiddleware(tracking_manager=tracking_manager)
broker_with_middleware = broker.with_middlewares(middleware)
# 3. Create pipeline with tracking enabled
pipeline = (
Pipeline(broker_with_middleware)
.with_tracking(manager=tracking_manager)
.call_next(slow_task)
.call_next(slow_task)
)
# 4. Execute the pipeline
result = await pipeline.kiq(10)
await result.wait_result()
# 5. Query the tracking status
pipeline_id = pipeline.pipeline_id
if pipeline_id is None:
raise RuntimeError("Pipeline has no ID")
status = await tracking_manager.get_status(pipeline_id)
if status is None:
raise RuntimeError("Failed to get pipeline status")
logger.info(f"Pipeline status: {status.status}")
logger.info(f"Steps completed: {len(status.steps)}")
asyncio.run(main())
Key Points
Tracking Setup
tracking_manager = PipelineTrackingManager().with_auto_storage(broker)
with_auto_storage()automatically selects the appropriate storage backend based on the broker- For
InMemoryBroker, usesInMemoryPipelineStorage - For Redis brokers, uses
RedisPipelineStorage
Attaching Tracking to Pipeline
pipeline = Pipeline(broker).with_tracking(manager=tracking_manager)
The tracking manager must be attached before calling pipeline.kiq().
Inspecting Status
After execution, the PipelineStatus object contains:
status— Overall status (COMPLETED,FAILED, etc.)steps— List ofStepStatusobjects, one per stepstarted_at/completed_at— Timestampsduration_ms— Total execution timeresult— Final return value (if completed)
Each StepStatus includes:
step_name— Task namestatus— Step’s statusduration_ms— How long the step tookresult— Step’s return value
Expected Output
INFO:__main__:Pipeline status: COMPLETED
INFO:__main__:Steps completed: 2
You’ll also see log messages from the slow_task calls with 1-second delays.
Variations
Access Step Details
for step in status.steps:
logger.info(f"Step '{step.step_name}' took {step.duration_ms:.0f}ms")
if step.result:
logger.info(f" Result: {step.result}")
Track Multiple Pipelines
# Launch several pipelines concurrently
tasks = [pipeline.kiq(i) for i in range(5)]
await asyncio.gather(*[t.wait_result() for t in tasks])
# List all tracked pipelines
all_statuses = await tracking_manager.list_pipelines()
for s in all_statuses:
print(f"{s.pipeline_id}: {s.status}")
Learning Path
After this example:
- Tracking Guide — Full tracking features (storage backends, metrics)
- WebSocket Guide — Real-time streaming of tracking events
- API Guide — Expose tracking data via REST API
This example shows the basics. For production, use Redis storage and add listeners for alerts.