Exemple: api_example.md
Intégration FastAPI pour gestion distante de pipelines
Version : {VERSION} Fichier : examples/api_example.py
Aperçu
Cet exemple exhaustif démontre comment construire une API REST production-ready pour Taskiq-Flow en utilisant FastAPI. Il couvre:
- Configuration FastAPI avec endpoints visualization pipeline
- Enregistrer pipelines programmatiquement
- Ajouter endpoints personnalisés pour exécution distante pipeline
- Récupérer résultats pipeline via API
- Documentation OpenAPI/Swagger complète
Prérequis: Installer FastAPI et uvicorn:
pip install fastapi uvicorn[standard]
Ce Que Cet Exemple Montre
- Utilisation de
PipelineVisualizationAPIpour endpoints built-in - Enregistrement pipelines avec l’API
- Création endpoints personnalisés pour exécution distante pipeline
- Récupération résultats par task ID
- Structure API complète production
Explication du Code
1. Définir Tâches et Pipeline
Le pipeline suivant illustre un cas d’usage commun de recommandation:
from fastapi import FastAPI, HTTPException
from taskiq import InMemoryBroker
from taskiq_flow import DataflowPipeline, pipeline_task
broker = InMemoryBroker(await_inplace=True)
@broker.task
@pipeline_task(output="user_data")
async def fetch_user_data(user_id: int) -> dict:
"""Récupérer données utilisateur depuis base."""
await asyncio.sleep(0.1)
return {"id": user_id, "name": f"User{user_id}", "email": f"user{user_id}@example.com"}
@broker.task
@pipeline_task(output="order_history")
async def fetch_orders(user_data: dict) -> list:
"""Récupérer historique commandes utilisateur."""
await asyncio.sleep(0.2)
user_id = user_data["id"]
return [{"order_id": 100 + user_id, "total": 99.99}]
@broker.task
@pipeline_task(output="recommendations")
async def generate_recommendations(user_data: dict, order_history: list):
"""Générer recommandations."""
await asyncio.sleep(0.15)
return ["product_A", "product_B", "product_C"]
# Construire pipeline
sample_pipeline = DataflowPipeline.from_tasks(
broker,
[fetch_user_data, fetch_orders, generate_recommendations],
)
sample_pipeline.pipeline_id = "sample_recommendation_pipeline"
Structure du DAG:
flowchart TD
A[fetch_user_data<br/>output: user_data] --> B[fetch_orders<br/>output: order_history]
A --> C[generate_recommendations<br/>output: recommendations]
B --> C
2. Créer App FastAPI avec Visualization API
from taskiq_flow.api import create_visualization_api, PipelineVisualizationAPI
def create_app() -> FastAPI:
app = FastAPI(title="TaskIQ Flow API", version="1.0.0")
# Créer visualization API (monte automatiquement endpoints /pipelines)
viz_api = create_visualization_api(broker, app)
viz_api.add_pipeline("sample_recommendation_pipeline", sample_pipeline)
# Ajouter endpoints personnalisés ci-dessous...
return app
create_visualization_api() ajoute automatiquement endpoints:
GET /pipelines— Lister tous les pipelines enregistrésGET /pipelines/{pipeline_id}— Obtenir pipeline par IDGET /pipelines/{pipeline_id}/dag— DAG JSONGET /pipelines/{pipeline_id}/dag/dot— DAG format DOTGET /pipelines/{pipeline_id}/visualize— Métadonnées complètes
3. Ajouter Endpoint Exécution Personnalisé
@app.post("/pipelines/{pipeline_id}/execute")
async def execute_pipeline(
pipeline_id: str,
parameters: dict[str, Any],
) -> dict[str, Any]:
"""Exécute un pipeline avec paramètres donnés."""
if pipeline_id not in viz_api.pipelines:
raise HTTPException(status_code=404, detail=f"Pipeline {pipeline_id} non trouvé")
pipeline = viz_api.pipelines[pipeline_id]
try:
result = await pipeline.kiq_dataflow(**parameters)
return {
"status": "executed",
"pipeline_id": pipeline_id,
"task_id": result.task_id,
"message": "Pipeline execution started. Use /result/{task_id} pour vérifier statut.",
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
4. Ajouter Endpoint Récupération Résultat
@app.get("/pipelines/result/{task_id}")
async def get_result(task_id: str) -> dict[str, Any]:
"""Récupère le résultat d'une exécution de pipeline."""
try:
result = await broker.result_backend.get_result(task_id)
if result is None:
raise HTTPException(status_code=404, detail=f"Aucun résultat trouvé pour task_id {task_id}")
return {"task_id": task_id, "result": result.return_value}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
5. Lancer le Serveur
uvicorn examples.api_example:create_app --reload --port 8000
Ou programmatiquement:
if __name__ == "__main__":
import uvicorn
uvicorn.run(create_app(), host="0.0.0.0", port=8000)
Référence des Endpoints API
Built-in (depuis create_visualization_api)
| Méthode | Endpoint | Description |
|---|---|---|
| GET | /health |
Health check |
| GET | /pipelines |
Lister pipelines enregistrés |
| POST | /pipelines/{pipeline_id} |
Enregistrer nouveau pipeline |
| GET | /pipelines/{pipeline_id}/status |
Obtenir statut exécution courant |
| GET | /pipelines/{pipeline_id}/dag |
Obtenir DAG en JSON |
| GET | /pipelines/{pipeline_id}/dag/dot |
Obtenir DAG en format DOT |
| GET | /pipelines/{pipeline_id}/visualize |
Métadonnées complètes visualization |
Personnalisés (définis dans exemple)
| Méthode | Endpoint | Description |
|---|---|---|
| POST | /pipelines/{pipeline_id}/execute |
Exécuter pipeline avec paramètres |
| GET | /pipelines/result/{task_id} |
Obtenir résultat par task ID |
Tester l’API
1. Docs Interactives
Ouvrir http://localhost:8000/docs pour Swagger UI.
2. Exécuter Pipeline
curl -X POST "http://localhost:8000/pipelines/sample_recommendation_pipeline/execute" \
-H "Content-Type: application/json" \
-d '{"user_id": 123}'
Réponse:
{
"status": "executed",
"pipeline_id": "sample_recommendation_pipeline",
"task_id": "abc123def456",
"message": "Pipeline execution started..."
}
3. Sonder pour Résultat
curl "http://localhost:8000/pipelines/result/abc123def456"
Réponse:
{
"task_id": "abc123def456",
"result": {
"user_data": {"id": 123, "name": "User123", ...},
"order_history": [...],
"recommendations": ["product_A", "product_B", "product_C"]
}
}
4. Voir DAG
curl "http://localhost:8000/pipelines/sample_recommendation_pipeline/dag"
Retourne structure JSON du graphe pipeline.
Utilisation API Programmatique
Vous pouvez aussi utiliser classes API directement sans HTTP:
from taskiq_flow.api import PipelineVisualizationAPI
app = FastAPI()
viz_api = PipelineVisualizationAPI(broker, app)
# Register pipeline
viz_api.add_pipeline("my_pipe", my_pipeline)
# List registered pipelines
for pid, p in viz_api.pipelines.items():
print(f"Pipeline: {pid}, tasks: {len(p.visualize()['nodes'])}")
# Get visualization
dag_json = my_pipeline.visualize()
dot = my_pipeline.visualize_dot()
Utile pour construire backends dashboard personnalisés ou outils CLI.
Considérations Production
1. Utiliser Broker Persistant
from taskiq import RedisStreamBroker
broker = RedisStreamBroker(redis_url="redis://localhost:6379")
2. Ajouter Authentication
from fastapi import Depends, Security
from fastapi.security import APIKeyHeader
api_key_header = APIKeyHeader(name="X-API-Key")
async def verify_api_key(api_key: str = Security(api_key_header)):
if api_key != os.getenv("API_SECRET"):
raise HTTPException(status_code=403, detail="Clé API invalide")
return api_key
@app.post("/pipelines/{pipeline_id}/execute")
async def execute(..., api_key: str = Security(verify_api_key)):
# ...
2b. Ajouter Authentification JWT
from jose import jwt
from fastapi import Depends
async def get_current_user(token: str = Depends(oauth2_scheme)):
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return payload["sub"]
@app.post("/pipelines/{pipeline_id}/execute")
async def execute(..., user: str = Depends(get_current_user)):
logger.info(f"Utilisateur {user} a exécuté {pipeline_id}")
# ...
2c. Ajouter Autorisation au Niveau Pipeline
from taskiq_flow.security.authorization import PipelineAuthorization
authorization = PipelineAuthorization(rules={
"admin": {"read": ["*"], "write": ["*"]},
"viewer": {"read": ["audio_*"], "write": []},
})
async def check_pipeline_access(
pipeline_id: str = Path(...),
user: dict = Depends(get_current_user),
):
if not authorization.can_read(pipeline_id, user):
raise HTTPException(status_code=403, detail="Accès refusé")
return user
3. Ajouter Rate Limiting
from slowapi import Limiter
limiter = Limiter(key_func=get_remote_address)
@app.post("/pipelines/{pipeline_id}/execute")
@limiter.limit("10/minute")
async def execute(...):
# ...
4. Activer CORS pour Frontend Web
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["https://votre-dashboard.com"],
allow_methods=["*"],
allow_headers=["*"],
)
5. Déployer avec Gunicorn
gunicorn -k uvicorn.workers.UvicornWorker -w 4 main:app --bind 0.0.0.0:8000
Chemin d’Apprentissage
Après cet exemple:
- Guide API — Documentation complète endpoints REST et meilleures pratiques
- Guide WebSocket — Ajouter mises à jour temps réel à votre API
- Guide de Suivi — Stocker historique exécution pour analytics
Cet exemple fournit fondation API complète production-ready. Étendez-le avec authentication, rate limiting, et endpoints personnalisés pour votre cas d’usage spécifique.