Référence API : Stockage

Couche de persistance centralisée — adaptateurs, factory et StorageMiddleware

Version : {VERSION} Nouveau en v1.2.0 Module : taskiq_flow.storage, taskiq_flow.middlewares.storage

Aperçu

Taskiq-Flow v1.2.0 introduit une couche de stockage centralisée qui découple les préoccupations de persistance du broker sous-jacent. Le système de stockage offre :

  • Une interface unifiéeBaseStorageAdapter fonctionne avec tous les backends
  • Trois adaptateurs natifs — InMemory, Redis, SQLite/SQLAlchemy
  • Factory d’auto-détectionStorageAdapterFactory choisit le bon backend automatiquement
  • Intégration middlewareStorageMiddleware se branche dans le cycle de vie TaskIQ

Utilisez StorageMiddleware plutôt que du code ad-hoc : il intercepte les événements de tâche et persiste les résultats via un adaptateur interchangeable.


Module taskiq_flow.storage

StorageEntry

from taskiq_flow.storage import StorageEntry
from datetime import datetime, timezone

entry = StorageEntry(
    key="pipeline:run42:task:abc123",
    value={"statut": "terminé", "resultat": 42},
    expires_at=datetime.now(timezone.utc) + timedelta(hours=1),
    metadata={"pipeline_id": "run42"},
)

Conteneur typé pour une valeur stockée avec TTL optionnel et métadonnées.

Attribut Type Description
key str Clé unique de l’entrée
value Any Valeur stockée (recommandé : sérialisable JSON)
created_at datetime Horodatage de création (UTC)
expires_at datetime \| None Horodatage d’expiration ; None = pas d’expiration
metadata dict Métadonnées arbitraires
Méthode Signature Description
is_expired() () -> bool True si l’entrée a expiré
remaining_ttl() () -> float \| None Secondes restantes avant expiration ; None si jamais

BaseStorageAdapter (ABC)

from taskiq_flow.storage import BaseStorageAdapter

class MonAdaptateur(BaseStorageAdapter):
    async def get(self, key: str) -> Any | None: ...
    async def set(self, key: str, value: Any, ttl_seconds=None) -> None: ...
    async def delete(self, key: str) -> bool: ...
    async def exists(self, key: str) -> bool: ...
    async def keys(self, pattern="*") -> list[str]: ...
    async def cleanup(self, ttl_seconds=3600) -> int: ...

Interface abstraite que tous les backends de stockage doivent implémenter. Utilisez-la pour créer un backend personnalisé (PostgreSQL, DynamoDB, etc.).

Méthode Description
get(cle) Récupérer une valeur par clé ; None si absente ou expirée
set(cle, valeur, ttl_seconds) Stocker une valeur avec TTL optionnel en secondes
delete(cle) Supprimer l’entrée ; retourne True si supprimée
exists(cle) Vérifier l’existence d’une clé
keys(motif) Lister les clés correspondant à un motif glob (ex. "pipeline:*")
cleanup(ttl_seconds) Purger les entrées expirées ; retourne le nombre supprimé

InMemoryStorageAdapter

from taskiq_flow.storage import InMemoryStorageAdapter

stockage = InMemoryStorageAdapter()

Adaptateur en mémoire basé sur un dict avec support de TTL par clé. Idéal pour le développement, les tests et les déploiements mono-processus.


RedisStorageAdapter

from taskiq_flow.storage import RedisStorageAdapter

stockage = RedisStorageAdapter(
    redis_url="redis://localhost:6379",
    ttl_seconds=3600,
)

Adaptateur persistant basé sur Redis avec TTL natif et sérialisation JSON.

Fonctionnalité Statut
TTL natif Par clé via EXPIRE Redis
Sérialisation JSON Automatique
Partage distribué Tous les workers partagent le même Redis
Persistance Tant que Redis persiste

SQLiteStorageAdapter

from taskiq_flow.storage import SQLiteStorageAdapter

stockage = SQLiteStorageAdapter(
    db_url="sqlite+aiosqlite:///taskiq-flow.db",
    async_mode=True,
)

Adaptateur SQLite/SQLAlchemy pour une persistance locale sans service externe.


Module taskiq_flow.storage.factory

StorageAdapterFactory

from taskiq_flow.storage.factory import StorageAdapterFactory
config = TaskiqFlowConfig()
adaptateur = StorageAdapterFactory.create_storage_adapter(config=config)

Ordre de priorité pour create_storage_adapter(type="auto") :

Priorité Backend Condition
1 RedisStorageAdapter storage_type="redis" ou broker est RedisBroker
2 SQLiteStorageAdapter storage_type="sqlite" ou "sqlalchemy"
3 InMemoryStorageAdapter Fallback
Méthode de Factory Description
create_storage_adapter(config, broker, …) Crée un BaseStorageAdapter
create_cache_adapter(config, …) Crée un BaseCacheAdapter
create_default_middlewares(config, broker) Crée StorageMiddleware et CacheMiddleware

Module taskiq_flow.middlewares.storage

StorageMiddleware

from taskiq_flow.middlewares import StorageMiddleware
broker.add_middlewares(
    StorageMiddleware(storage=InMemoryStorageAdapter(), enabled=True),
    PipelineMiddleware(),
)

Intercepte le cycle de vie TaskIQ et persiste les résultats de tâche via l’adaptateur de stockage configuré.

Paramètre Type Défaut Description
storage BaseStorageAdapter \| None None Backend de stockage
enabled bool True Active/désactive la persistance
Hook Signature Description
post_save(message, result) Persiste TaskiqResult dans le stockage Clé : task:{task_id} ou pipeline:{pipeline_id}:task:{task_id}

Choix d’un Backend

Backend Cas d’usage Avantages Inconvénients
InMemoryStorageAdapter Dev, tests, mono-processus Zéro dépendance, rapide Volatile, non partagé
RedisStorageAdapter Production, distribué Rapide, partagé, persisté Requiert Redis
SQLiteStorageAdapter Persistance légère sans service externe Pas de service externe Contention mono-écriture

Lectures Associées


Nouveau en v1.2.0. Les adaptateurs de stockage sont entièrement interchangeables : changez l’adaptateur sans toucher la logique métier.


This site uses Just the Docs, a documentation theme for Jekyll.