Example: api_example.py
FastAPI integration for remote pipeline management
Version: {VERSION} File: examples/api_example.py
Overview
This comprehensive example demonstrates how to build a production-ready REST API for Taskiq-Flow using FastAPI. It covers:
- Setting up FastAPI with pipeline visualization endpoints
- Registering pipelines programmatically
- Adding custom endpoints for pipeline execution
- Retrieving pipeline results via API
- Full OpenAPI/Swagger documentation
Prerequisites: Install FastAPI and uvicorn:
pip install fastapi uvicorn[standard]
What This Example Shows
- Using
PipelineVisualizationAPIfor built-in endpoints - Registering pipelines with the API
- Creating custom endpoints to execute pipelines remotely
- Retrieving results by task ID
- Complete production API structure
Code Walkthrough
1. Define Tasks and Pipeline
from fastapi import FastAPI, HTTPException
from taskiq import InMemoryBroker
from taskiq_flow import DataflowPipeline, pipeline_task
broker = InMemoryBroker(await_inplace=True)
@broker.task
@pipeline_task(output="user_data")
async def fetch_user_data(user_id: int) -> dict:
"""Fetch user data from database."""
await asyncio.sleep(0.1)
return {"id": user_id, "name": f"User{user_id}", "email": f"user{user_id}@example.com"}
@broker.task
@pipeline_task(output="order_history")
async def fetch_orders(user_data: dict) -> list:
"""Fetch order history for user."""
await asyncio.sleep(0.2)
user_id = user_data["id"]
return [{"order_id": 100 + user_id, "total": 99.99}]
@broker.task
@pipeline_task(output="recommendations")
async def generate_recommendations(user_data: dict, order_history: list):
"""Generate recommendations."""
await asyncio.sleep(0.15)
return ["product_A", "product_B", "product_C"]
# Build pipeline
sample_pipeline = DataflowPipeline.from_tasks(
broker,
[fetch_user_data, fetch_orders, generate_recommendations],
)
sample_pipeline.pipeline_id = "sample_recommendation_pipeline"
Pipeline DAG structure:
flowchart TD
A[fetch_user_data<br/>output: user_data] --> B[fetch_orders<br/>output: order_history]
A --> C[generate_recommendations<br/>output: recommendations]
B --> C
2. Create FastAPI App with Visualization API
from taskiq_flow.api import create_visualization_api, PipelineVisualizationAPI
def create_app() -> FastAPI:
app = FastAPI(title="TaskIQ Flow API", version="1.0.0")
# Create visualization API (auto-mounts /pipelines endpoints)
viz_api = create_visualization_api(broker, app)
viz_api.add_pipeline("sample_recommendation_pipeline", sample_pipeline)
# Custom endpoints below...
return app
The create_visualization_api() automatically adds these endpoints:
GET /healthGET /pipelinesPOST /pipelines/{pipeline_id}(register)GET /pipelines/{pipeline_id}/statusGET /pipelines/{pipeline_id}/dagGET /pipelines/{pipeline_id}/dag/dotGET /pipelines/{pipeline_id}/visualize
3. Add Custom Execute Endpoint
@app.post("/pipelines/{pipeline_id}/execute")
async def execute_pipeline(
pipeline_id: str,
parameters: dict[str, Any],
) -> dict[str, Any]:
"""Execute a pipeline with given parameters."""
if pipeline_id not in viz_api.pipelines:
raise HTTPException(status_code=404, detail=f"Pipeline {pipeline_id} not found")
pipeline = viz_api.pipelines[pipeline_id]
try:
result = await pipeline.kiq_dataflow(**parameters)
return {
"status": "executed",
"pipeline_id": pipeline_id,
"task_id": result.task_id,
"message": "Pipeline execution started. Use /result/{task_id} to check status.",
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
4. Add Result Retrieval Endpoint
@app.get("/pipelines/result/{task_id}")
async def get_result(task_id: str) -> dict[str, Any]:
"""Get the result of a pipeline execution."""
try:
result = await broker.result_backend.get_result(task_id)
if result is None:
raise HTTPException(status_code=404, detail=f"No result found for task_id {task_id}")
return {"task_id": task_id, "result": result.return_value}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
5. Run the Server
uvicorn examples.api_example:create_app --reload --port 8000
Or programmatically:
if __name__ == "__main__":
import uvicorn
uvicorn.run(create_app(), host="0.0.0.0", port=8000)
API Endpoints Reference
Built-in (from create_visualization_api)
| Method | Endpoint | Description |
|---|---|---|
| GET | /health |
Health check |
| GET | /pipelines |
List all registered pipelines |
| POST | /pipelines/{pipeline_id} |
Register new pipeline |
| GET | /pipelines/{pipeline_id}/status |
Get current execution status |
| GET | /pipelines/{pipeline_id}/dag |
Get DAG as JSON |
| GET | /pipelines/{pipeline_id}/dag/dot |
Get DAG as DOT string |
| GET | /pipelines/{pipeline_id}/visualize |
Full visualization metadata |
Custom (defined in example)
| Method | Endpoint | Description |
|---|---|---|
| POST | /pipelines/{pipeline_id}/execute |
Execute pipeline with parameters |
| GET | /pipelines/result/{task_id} |
Get result by task ID |
Testing the API
1. Interactive Docs
Open http://localhost:8000/docs for Swagger UI.
2. Execute Pipeline
curl -X POST "http://localhost:8000/pipelines/sample_recommendation_pipeline/execute" \
-H "Content-Type: application/json" \
-d '{"user_id": 123}'
Response:
{
"status": "executed",
"pipeline_id": "sample_recommendation_pipeline",
"task_id": "abc123def456",
"message": "Pipeline execution started..."
}
3. Poll for Result
curl "http://localhost:8000/pipelines/result/abc123def456"
Response:
{
"task_id": "abc123def456",
"result": {
"user_data": {"id": 123, "name": "User123", ...},
"order_history": [...],
"recommendations": ["product_A", "product_B", "product_C"]
}
}
4. View DAG
curl "http://localhost:8000/pipelines/sample_recommendation_pipeline/dag"
Returns JSON structure of the pipeline graph.
Programmatic API Usage
You can also use the API classes directly without HTTP:
from taskiq_flow.api import PipelineVisualizationAPI
app = FastAPI()
viz_api = PipelineVisualizationAPI(broker, app)
# Register pipeline
viz_api.add_pipeline("my_pipe", my_pipeline)
# List registered pipelines
for pid, p in viz_api.pipelines.items():
print(f"Pipeline: {pid}, tasks: {len(p.visualize()['nodes'])}")
# Get visualization
dag_json = my_pipeline.visualize()
dot = my_pipeline.visualize_dot()
This is useful for building custom dashboard backends or CLI tools.
Production Considerations
1. Use Persistent Broker
from taskiq import RedisStreamBroker
broker = RedisStreamBroker(redis_url="redis://localhost:6379")
2. Add Authentication
from fastapi import Depends, Security
from fastapi.security import APIKeyHeader
api_key_header = APIKeyHeader(name="X-API-Key")
async def verify_api_key(api_key: str = Security(api_key_header)):
if api_key != os.getenv("API_SECRET"):
raise HTTPException(status_code=403, detail="Invalid API key")
return api_key
@app.post("/pipelines/{pipeline_id}/execute")
async def execute(..., api_key: str = Security(verify_api_key)):
# ...
2b. Add JWT Authentication
from jose import jwt
from fastapi import Depends
async def get_current_user(token: str = Depends(oauth2_scheme)):
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return payload["sub"]
@app.post("/pipelines/{pipeline_id}/execute")
async def execute(..., user: str = Depends(get_current_user)):
logger.info(f"User {user} executed {pipeline_id}")
# ...
2c. Add Pipeline-Level Authorization
from taskiq_flow.security.authorization import PipelineAuthorization
authorization = PipelineAuthorization(rules={
"admin": {"read": ["*"], "write": ["*"]},
"viewer": {"read": ["audio_*"], "write": []},
})
async def check_pipeline_access(
pipeline_id: str = Path(...),
user: dict = Depends(get_current_user),
):
if not authorization.can_read(pipeline_id, user):
raise HTTPException(status_code=403, detail="Access denied")
return user
3. Add Rate Limiting
from slowapi import Limiter
limiter = Limiter(key_func=get_remote_address)
@app.post("/pipelines/{pipeline_id}/execute")
@limiter.limit("10/minute")
async def execute(...):
# ...
4. Enable CORS for Web Frontend
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["https://your-dashboard.com"],
allow_methods=["*"],
allow_headers=["*"],
)
5. Deploy with Gunicorn
gunicorn -k uvicorn.workers.UvicornWorker -w 4 main:app --bind 0.0.0.0:8000
Learning Path
After this example:
- API Guide — Full REST API documentation and best practices
- WebSocket Guide — Add real-time updates to your API
- Tracking Guide — Store execution history for analytics
This example provides a complete, production-ready API foundation. Extend it with authentication, rate limiting, and custom endpoints for your specific use case.