Storage & Cache Middleware Guide
Centralized persistence with StorageMiddleware and Dogpile caching with CacheMiddleware
Version: {VERSION} New in v1.2.0 Related: Execution Guide, API Reference — Storage, API Reference — Cache
Overview
v1.2.0 introduces two new middlewares that modularize persistence and caching concerns:
| Middleware | Responsibility |
|---|---|
| StorageMiddleware | Centralized, pluggable persistence for task results, pipeline state, and execution history |
| CacheMiddleware | Dogpile-based worker caching to avoid redundant task executions |
Both implement the TaskiqMiddleware lifecycle (pre_execute, post_save) and can be active simultaneously with PipelineMiddleware, TransportMiddleware, and PipelineRetryMiddleware.
StorageMiddleware — Centralized Persistence
StorageMiddleware captures every task result and stores it via a configured BaseStorageAdapter. Unlike the previous approach, where tracking and scheduling each persisted their own data independently, there is now one unified store.
Why StorageMiddleware?
- Single source of truth: all task results, pipeline history, and scheduling metadata live in one place
- Pluggable backend: swap InMemory for Redis or SQLite without changing application code
- Auto-detection: StorageAdapterFactory picks the right backend from environment and config
- Isolation: storage, cache, and tracking concerns are each in their own layer
Installation
No extra install required — included in taskiq-flow.
# For Redis backend
pip install redis
# SQLite backend included via aiosqlite (bundled)
Basic Usage
from taskiq import InMemoryBroker
from taskiq_flow import PipelineMiddleware, DataflowPipeline, pipeline_task
from taskiq_flow.middlewares import StorageMiddleware
from taskiq_flow.storage import InMemoryStorageAdapter
broker = InMemoryBroker(await_inplace=True)
# central persistence layer — store all task results automatically
storage = InMemoryStorageAdapter()
broker.add_middlewares(
    StorageMiddleware(storage=storage, enabled=True),
    PipelineMiddleware(),
)
Production with Redis
from taskiq_flow import PipelineMiddleware
from taskiq_flow.middlewares import StorageMiddleware
from taskiq_flow.storage import RedisStorageAdapter
# `broker` is a broker instance created as in Basic Usage
broker.add_middlewares(
    StorageMiddleware(
        storage=RedisStorageAdapter(
            redis_url="redis://localhost:6379",
            ttl_seconds=86400,  # 24-hour retention
        ),
    ),
    PipelineMiddleware(),
)
Task Result Keys
StorageMiddleware stores results under keys derived from the TaskiqMessage labels:
| Key Pattern | Example |
|---|---|
| pipeline:{pipeline_id}:task:{task_id} | pipeline:audio_v1:task:abc123 |
| task:{task_id} | task:abc123 |
Stored value shape:
{
    "task_id": "abc123",
    "pipeline_id": "audio_v1",
    "is_err": false,
    "return_value": "{...}",
    "error": null,
    "execution_time": 0.42
}
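The key patterns and value shape above can be sketched with two small helpers. This is an illustration only, not the library's code: `build_result_key` and `build_result_record` are hypothetical names, and the rule that a missing `pipeline_id` selects the shorter `task:{task_id}` key is an assumption.

```python
from typing import Any, Optional


def build_result_key(task_id: str, pipeline_id: Optional[str] = None) -> str:
    """Return the storage key for a task result (hypothetical helper)."""
    # Assumption: tasks that carry a pipeline_id use the namespaced key.
    if pipeline_id:
        return f"pipeline:{pipeline_id}:task:{task_id}"
    return f"task:{task_id}"


def build_result_record(
    task_id: str,
    pipeline_id: Optional[str],
    return_value: Any,
    error: Optional[str],
    execution_time: float,
) -> dict:
    """Assemble a record with the same fields as the stored value shape."""
    return {
        "task_id": task_id,
        "pipeline_id": pipeline_id,
        "is_err": error is not None,
        "return_value": return_value,
        "error": error,
        "execution_time": execution_time,
    }


print(build_result_key("abc123", "audio_v1"))  # pipeline:audio_v1:task:abc123
print(build_result_key("abc123"))              # task:abc123
```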
Manual Inspector Usage
storage = InMemoryStorageAdapter()
await storage.set("my_key", {"status": "running"}, ttl_seconds=600)
# Later...
result = await storage.get("my_key")
exists = await storage.exists("my_key")
# Pattern-based listing
keys = await storage.keys("pipeline:my_run:*")
# Cleanup expired entries
deleted = await storage.cleanup(ttl_seconds=3600)
TTL and Expiration
All three adapters (InMemory, Redis, SQLite) support per-key TTL. Expired entries are removed lazily when they are accessed and eagerly when cleanup() is called:
# Store with 24-hour TTL
await storage.set("status", {"running": True}, ttl_seconds=86_400)
# Check remaining time before it expires
entry = StorageEntry(key="status", value={"running": True}, ...)
seconds_left = entry.remaining_ttl()
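The difference between lazy and eager expiry may be easier to see in a toy store. The sketch below is not one of the real adapters: `TinyTTLStore` is a hypothetical, synchronous, standard-library-only stand-in, and its `cleanup()` takes no arguments, unlike the `cleanup(ttl_seconds=...)` call shown earlier.

```python
import time
from typing import Any, Dict, Optional, Tuple


class TinyTTLStore:
    """Toy in-memory store illustrating lazy and eager expiry."""

    def __init__(self) -> None:
        # key -> (value, absolute expiry time, or None for "never expires")
        self._data: Dict[str, Tuple[Any, Optional[float]]] = {}

    def set(self, key: str, value: Any, ttl_seconds: Optional[float] = None) -> None:
        expires_at = None if ttl_seconds is None else time.monotonic() + ttl_seconds
        self._data[key] = (value, expires_at)

    def get(self, key: str) -> Any:
        item = self._data.get(key)
        if item is None:
            return None
        value, expires_at = item
        if expires_at is not None and time.monotonic() >= expires_at:
            # Lazy cleanup: the expired entry is dropped when it is read.
            del self._data[key]
            return None
        return value

    def remaining_ttl(self, key: str) -> Optional[float]:
        """Seconds left before expiry; None if the key is missing or has no TTL."""
        item = self._data.get(key)
        if item is None or item[1] is None:
            return None
        return max(0.0, item[1] - time.monotonic())

    def cleanup(self) -> int:
        """Eager cleanup: remove every expired entry and return the count."""
        now = time.monotonic()
        expired = [k for k, (_, exp) in self._data.items() if exp is not None and now >= exp]
        for k in expired:
            del self._data[k]
        return len(expired)


store = TinyTTLStore()
store.set("status", {"running": True}, ttl_seconds=86_400)
store.set("stale", "old", ttl_seconds=0)  # expires immediately
print(store.cleanup())  # 1
```

A lazy-only store never frees memory for keys nobody reads again, which is why a periodic eager sweep is still useful.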
CacheMiddleware — Dogpile Worker Caching
CacheMiddleware prevents redundant task executions by caching task outputs at the worker level. The Dogpile pattern ensures that only one coroutine regenerates an expired entry while others wait.
Why CacheMiddleware?
- Reduce unnecessary work — skip re-executing idempotent tasks whose inputs haven’t changed
- Lower latency — cached results are returned instantly without scheduling
- Stampede protection — Dogpile lock prevents thundering-herd at TTL expiry
- Pluggable backend — InMemory for single-worker, Redis for distributed
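The stampede protection described above can be sketched with a per-key `asyncio.Lock`. This is a simplified, single-process illustration of the Dogpile idea, not CacheMiddleware's implementation: `SingleFlightCache` is a hypothetical name, and TTL handling is left out to keep the locking visible.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class SingleFlightCache:
    """Toy cache where concurrent misses for one key share a single computation."""

    def __init__(self) -> None:
        self._values: Dict[str, Any] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    async def get_or_compute(self, key: str, compute: Callable[[], Awaitable[Any]]) -> Any:
        if key in self._values:
            return self._values[key]  # fast path: no locking on a hit
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:
            # Re-check after acquiring the lock: a waiter finds the value the
            # first coroutine stored and skips the computation.
            if key not in self._values:
                self._values[key] = await compute()
            return self._values[key]


async def demo() -> int:
    calls = 0

    async def expensive() -> str:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.05)  # simulate slow work
        return "result"

    cache = SingleFlightCache()
    results = await asyncio.gather(
        *(cache.get_or_compute("k", expensive) for _ in range(10))
    )
    assert results == ["result"] * 10
    return calls


print(asyncio.run(demo()))  # 1
```

Ten concurrent requests trigger one computation. An in-process lock like this only coordinates coroutines inside one worker; coordinating several workers needs a shared lock, which is where a Redis backend comes in.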
Basic Usage
from taskiq_flow.middlewares import CacheMiddleware
from taskiq_flow.cache import InMemoryCacheAdapter
# Task results are cached for 1 hour by default
broker.add_middlewares(
    CacheMiddleware(
        cache=InMemoryCacheAdapter(),
        default_ttl=3600,
        enabled=True,
    )
)
After this, every task’s result is cached automatically. A second execution of the same task with the same inputs is reduced to a cache lookup.
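For a lookup to hit, the cache key has to be a deterministic function of the task and its inputs. The guide does not say how CacheMiddleware builds its keys, so the following is one plausible scheme rather than the actual one: `make_cache_key` and the `cache:` prefix are hypothetical.

```python
import hashlib
import json


def make_cache_key(task_name: str, args: tuple, kwargs: dict) -> str:
    """Derive a deterministic key from a task name and its arguments."""
    # sort_keys makes the key independent of keyword-argument order;
    # default=str is a crude fallback for values JSON cannot encode.
    payload = json.dumps(
        {"task": task_name, "args": list(args), "kwargs": kwargs},
        sort_keys=True,
        default=str,
    )
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"cache:{task_name}:{digest}"


k1 = make_cache_key("resize", ("img.png",), {"width": 100, "height": 50})
k2 = make_cache_key("resize", ("img.png",), {"height": 50, "width": 100})
k3 = make_cache_key("resize", ("other.png",), {"width": 100, "height": 50})
print(k1 == k2, k1 == k3)  # True False
```

Arguments that change on every call (timestamps, random IDs) produce a new key each time, which shows up as a permanently low hit rate.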
Producer/Consumer Middleware Ordering
Middleware order matters. CacheMiddleware should be placed before StorageMiddleware in the chain so that cache hits short-circuit before any persistence write is attempted:
# Correct ordering — cache checked first, then storage
broker.add_middlewares(
    CacheMiddleware(),     # ← checked first (pre_execute runs first)
    StorageMiddleware(),   # ← persisted if not cached
    PipelineMiddleware(),  # ← orchestrates downstream
)
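The effect of this ordering can be illustrated with a toy model that involves no taskiq machinery. `run_task`, the dict-based cache, and the list-based store are hypothetical stand-ins; the real middlewares hook into `pre_execute` and `post_save` rather than wrapping a function call.

```python
from typing import Callable, Dict, List


def run_task(
    name: str,
    arg: int,
    cache: Dict[str, int],
    store: List[str],
    execute: Callable[[int], int],
) -> int:
    """Cache lookup first; execute and persist only on a miss."""
    key = f"{name}:{arg}"
    if key in cache:
        return cache[key]  # cache hit: nothing below this line runs
    result = execute(arg)
    cache[key] = result    # cache layer records the fresh result
    store.append(key)      # storage layer writes only on a miss
    return result


cache: Dict[str, int] = {}
store: List[str] = []
calls: List[int] = []


def square(x: int) -> int:
    calls.append(x)
    return x * x


run_task("square", 4, cache, store, square)  # miss: executes and persists
run_task("square", 4, cache, store, square)  # hit: returns early
print(len(calls), len(store))  # 1 1
```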
Per-Task Overrides
Set TTL and error-caching per task execution via TaskiqMessage labels:
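# When enqueuing a task, override the cache TTL for this execution.
# taskiq attaches labels through the kicker; keyword arguments passed
# to kiq() are forwarded to the task function itself.
result = await (
    some_task.kicker()
    .with_labels(cache_ttl="7200", cache_errors="true")
    .kiq(input_data)
)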
| Label | Values | Effect |
|---|---|---|
| cache_ttl | int (seconds) | Override the default_ttl for this single execution |
| cache_errors | "true" / "false" | Cache error results when "true" (disabled by default) |
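Because label values arrive as strings, the two overrides in the table have to be parsed before use. The helper below sketches one way to do that; `resolve_cache_options` is a hypothetical name, and falling back to the default when `cache_ttl` cannot be parsed is an assumption, not documented behavior.

```python
from typing import Mapping, Tuple


def resolve_cache_options(
    labels: Mapping[str, str], default_ttl: int = 3600
) -> Tuple[int, bool]:
    """Return (ttl_seconds, cache_errors) for one execution."""
    raw_ttl = labels.get("cache_ttl")
    try:
        ttl = int(raw_ttl) if raw_ttl is not None else default_ttl
    except ValueError:
        # Assumption: an unparsable label falls back to the default TTL.
        ttl = default_ttl
    # Error caching is off unless the label is exactly "true" (case-insensitive).
    cache_errors = str(labels.get("cache_errors", "false")).lower() == "true"
    return ttl, cache_errors


print(resolve_cache_options({"cache_ttl": "7200", "cache_errors": "true"}))  # (7200, True)
print(resolve_cache_options({}))                                             # (3600, False)
```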
StorageAdapterFactory — Zero-Config Setup
StorageAdapterFactory auto-creates the right adapters from TaskiqFlowConfig (read from env vars):
from taskiq_flow.storage.factory import StorageAdapterFactory
from taskiq_flow.config import TaskiqFlowConfig
# Get both middlewares in one call — sensible defaults
config = TaskiqFlowConfig(
    storage_type="redis",  # "redis" | "sqlite" | "inmemory" | "auto"
    storage_redis_url="redis://localhost:6379",
    storage_ttl_seconds=86_400,  # 24 h
    cache_type="redis",
    cache_redis_url="redis://localhost:6379",
    cache_default_ttl=3600,
)
middlewares = StorageAdapterFactory.create_default_middlewares(config=config)
broker.add_middlewares(
    middlewares["cache"],    # CacheMiddleware
    middlewares["storage"],  # StorageMiddleware
    PipelineMiddleware(),
)
Environment variables (all optional):
| Env Var | Description |
|---|---|
| TASKIQ_FLOW_STORAGE_TYPE | "redis", "sqlite", "inmemory", or "auto" |
| TASKIQ_FLOW_STORAGE_REDIS_URL | Redis URL for storage |
| TASKIQ_FLOW_STORAGE_TTL_SECONDS | Default storage TTL |
| TASKIQ_FLOW_CACHE_TYPE | "redis", "inmemory", or "auto" |
| TASKIQ_FLOW_CACHE_REDIS_URL | Redis URL for cache |
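To make the table concrete, here is a standard-library sketch of reading these variables and resolving an "auto" backend. It does not reproduce TaskiqFlowConfig or StorageAdapterFactory: `read_flow_env` and `resolve_backend` are hypothetical, and the "auto" rule shown (Redis when a URL is set, in-memory otherwise) is an assumption about how auto-detection might work.

```python
import os
from typing import Dict, Mapping, Optional

ENV_PREFIX = "TASKIQ_FLOW_"
ENV_NAMES = [
    "STORAGE_TYPE",
    "STORAGE_REDIS_URL",
    "STORAGE_TTL_SECONDS",
    "CACHE_TYPE",
    "CACHE_REDIS_URL",
]


def read_flow_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, Optional[str]]:
    """Collect the TASKIQ_FLOW_* variables from the table into a plain dict."""
    source = os.environ if env is None else env
    return {name.lower(): source.get(ENV_PREFIX + name) for name in ENV_NAMES}


def resolve_backend(kind: Optional[str], redis_url: Optional[str]) -> str:
    """Assumed 'auto' rule: Redis when a URL is configured, else in-memory."""
    kind = (kind or "auto").lower()
    if kind != "auto":
        return kind
    return "redis" if redis_url else "inmemory"


fake_env = {"TASKIQ_FLOW_STORAGE_REDIS_URL": "redis://localhost:6379"}
settings = read_flow_env(fake_env)
print(resolve_backend(settings["storage_type"], settings["storage_redis_url"]))  # redis
print(resolve_backend(settings["cache_type"], settings["cache_redis_url"]))      # inmemory
```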
Comparison: Storage vs Cache
| Aspect | StorageMiddleware | CacheMiddleware |
|---|---|---|
| Purpose | Long-term persistence of task/pipeline state | Short-term deduplication of task results |
| TTL | Hours to days (storage_ttl_seconds) | Minutes to hours (default_ttl) |
| Scope | Pipeline IDs, task IDs, scheduling metadata | Individual task result IDs |
| Backend | InMemory / Redis / SQLite | InMemory / Redis |
| Dogpile stampede protection | N/A | Yes |
| Auto-dedup | N/A | Yes |
Use both together for a complete production setup:
from taskiq_flow.storage.factory import StorageAdapterFactory
from taskiq_flow.config import TaskiqFlowConfig
config = TaskiqFlowConfig(
    storage_type="redis",
    storage_redis_url="redis://localhost:6379",
    cache_type="redis",
    cache_redis_url="redis://localhost:6379",
)
middlewares = StorageAdapterFactory.create_default_middlewares(config=config)
broker.add_middlewares(
    middlewares["cache"],
    middlewares["storage"],
    PipelineMiddleware(),
)
Monitoring
Cache Hit Rate
stats = cache.get_stats()
print(f"Hit rate: {stats['hit_rate']:.1%}") # 94.5%
print(f"Hits: {stats['hits']}, Misses: {stats['misses']}")
Aim for a hit rate above 80% for reproducible pipelines with stable inputs.
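The hit rate is hits divided by total lookups. The sketch below shows that arithmetic together with a check against the 80% target; `hit_rate` and `meets_target` are hypothetical helpers, and the counts are made-up numbers chosen for illustration.

```python
def hit_rate(hits: int, misses: int) -> float:
    """Fraction of lookups served from the cache; 0.0 when there were none."""
    total = hits + misses
    return hits / total if total else 0.0


def meets_target(hits: int, misses: int, target: float = 0.80) -> bool:
    """True when the observed hit rate is above the target."""
    return hit_rate(hits, misses) > target


print(f"{hit_rate(189, 11):.1%}")  # 94.5%
print(meets_target(189, 11))       # True
print(meets_target(3, 7))          # False
```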
Storage Size
all_keys = await storage.keys("*")
print(f"Total stored entries: {len(all_keys)}")
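A raw key count says little about which pipeline is responsible for growth. As a sketch, the keys can be grouped by pipeline using the key patterns from the Task Result Keys table. `count_by_pipeline` is a hypothetical helper, and it assumes pipeline and task IDs contain no colons.

```python
from collections import Counter
from typing import Iterable


def count_by_pipeline(keys: Iterable[str]) -> Counter:
    """Count stored entries per pipeline_id, based on the documented key patterns."""
    counts: Counter = Counter()
    for key in keys:
        parts = key.split(":")
        # Expected shape: pipeline:{pipeline_id}:task:{task_id}
        if len(parts) == 4 and parts[0] == "pipeline" and parts[2] == "task":
            counts[parts[1]] += 1
        else:
            counts["(no pipeline)"] += 1
    return counts


sample_keys = [
    "pipeline:audio_v1:task:abc123",
    "pipeline:audio_v1:task:def456",
    "task:xyz789",
]
print(dict(count_by_pipeline(sample_keys)))  # {'audio_v1': 2, '(no pipeline)': 1}
```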
Troubleshooting
| Symptom | Likely Cause | Fix |
|---|---|---|
| Every task is a cache miss | TTL too short or inputs too variable | Increase default_ttl; check task arguments |
| Cache stampede on expiry | Using InMemoryCacheAdapter without Dogpile | Switch to RedisCacheAdapter (proper distributed locking) |
| Storage grows without bounds | No TTL set on entries | Set ttl_seconds on StorageMiddleware; run cleanup() periodically |
| Workers share stale results | Redis TTL not respected | Verify Redis EXPIRE is applied; check Redis config |
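For the unbounded-growth case, "run cleanup() periodically" can be as simple as a background loop. The sketch below assumes `storage.cleanup(ttl_seconds=...)` is awaitable and returns the number of deleted entries, as the earlier `deleted = await storage.cleanup(...)` example suggests. `periodic_cleanup` and `FakeStorage` are hypothetical, and the loop is bounded by `rounds` only so that the example terminates.

```python
import asyncio
from typing import Any


async def periodic_cleanup(
    storage: Any, ttl_seconds: int, interval: float, rounds: int
) -> int:
    """Call storage.cleanup() every `interval` seconds, `rounds` times in total."""
    total_deleted = 0
    for _ in range(rounds):
        total_deleted += await storage.cleanup(ttl_seconds=ttl_seconds)
        await asyncio.sleep(interval)
    return total_deleted


class FakeStorage:
    """Stand-in adapter that pretends two entries expire on every sweep."""

    async def cleanup(self, ttl_seconds: int) -> int:
        return 2


print(asyncio.run(periodic_cleanup(FakeStorage(), ttl_seconds=3600, interval=0.01, rounds=3)))  # 6
```

In a long-running worker the same coroutine would typically loop indefinitely inside a task created with `asyncio.create_task` and be cancelled on shutdown.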
New in v1.2.0. Both middlewares are additive — drop them into an existing broker without redesign.