Retry & Error Handling Guide
Resilient pipeline execution with retry policies, backoff, and dead-letter queues
Version: {VERSION} Related: Execution Guide, Scheduling Guide
Overview
Failures are inevitable in distributed systems. Taskiq-Flow provides comprehensive retry and error handling mechanisms to ensure pipeline robustness.
This guide covers:
- Retry policies at task and pipeline levels
- Exponential backoff strategies
- Dead-letter queues (DLQ) for unrecoverable failures
- Conditional retry logic
- Timeout configuration
- Monitoring retry metrics
1. Understanding Retries
A retry automatically re-executes a failed task with the same inputs. Retry policies define when and how that happens.
When to Retry
✅ Good candidates for retry:
- Network timeouts (external API unavailable)
- Database connection errors (transient)
- Rate limit hits (retry-after header)
- Temporary resource exhaustion
❌ Do NOT retry:
- Validation errors (bad input won’t fix itself)
- Programming errors (bug in code)
- Missing data (won’t reappear)
- Permanent failures (404 Not Found, 401 Unauthorized)
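A practical way to encode this distinction is a small classifier that a custom retry policy (see section 4) can consult. A minimal sketch; the exception groupings are placeholders to adapt to your own error types:

```python
# Transient errors are worth retrying; permanent ones are not.
TRANSIENT_ERRORS = (TimeoutError, ConnectionError)
PERMANENT_ERRORS = (ValueError, PermissionError)  # e.g. validation, auth failures

def is_transient(exc: Exception) -> bool:
    """Return True only if a retry has a chance of succeeding."""
    if isinstance(exc, PERMANENT_ERRORS):
        return False
    return isinstance(exc, TRANSIENT_ERRORS)
```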
2. Retry at Task Level
Configure retry directly on the task decorator:
```python
@broker.task(
    max_retries=3,      # Maximum retry attempts (default: 0 = no retry)
    retry_delay=5.0,    # Seconds between retries
    retry_backoff=2.0,  # Multiply delay by this after each attempt
    retry_timeout=60,   # Overall timeout including retries
)
async def flaky_api_call():
    response = await call_external_api()
    return response.json()
```
Retry sequence:
| Attempt | Delay before attempt | Cumulative wait |
|---|---|---|
| 1 (initial) | 0s | 0s |
| 2 (retry 1) | 5s | 5s |
| 3 (retry 2) | 10s (5 × 2) | 15s |
| 4 (retry 3) | 20s (10 × 2) | 35s |
| Final failure | — | 35s |
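The schedule is plain arithmetic: the delay doubles after each attempt. A standalone snippet that reproduces the table:

```python
# delay starts at retry_delay=5.0 and is multiplied by retry_backoff=2.0 each time
delay, total = 5.0, 0.0
for retry in range(1, 4):
    total += delay
    print(f"retry {retry}: wait {delay:.0f}s (cumulative {total:.0f}s)")
    delay *= 2.0
# retry 1: wait 5s (cumulative 5s)
# retry 2: wait 10s (cumulative 15s)
# retry 3: wait 20s (cumulative 35s)
```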
3. Retry at Pipeline Level
Apply consistent retry policy to all tasks in a pipeline:
```python
pipeline = Pipeline(broker)
pipeline.with_retry(
    max_attempts=3,
    delay=2.0,     # Initial delay
    backoff=1.5,   # Backoff multiplier
    on_retry=None, # Optional callback
)
```
All tasks in this pipeline inherit this policy unless they have their own.
Inheritance precedence: Task-level overrides pipeline-level.
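For example (task names are hypothetical; the decorator options are those from section 2):

```python
pipeline = Pipeline(broker)
pipeline.with_retry(max_attempts=3, delay=2.0)  # pipeline-wide default

@broker.task(max_retries=5, retry_delay=1.0)    # task-level policy wins for this task
async def critical_fetch():
    ...

@broker.task                                    # no own policy: inherits the pipeline default
async def normal_step():
    ...
```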
4. Custom Retry Policies
For fine-grained control, implement a custom `RetryPolicy`:
```python
import random

from taskiq_flow import RetryPolicy

class MyRetryPolicy(RetryPolicy):
    def should_retry(self, attempt: int, exception: Exception) -> bool:
        # Retry only on network errors, with at most 5 attempts
        if attempt >= 5:
            return False
        return isinstance(exception, NetworkError)

    def get_delay(self, attempt: int) -> float:
        # Custom backoff: 2^attempt plus random jitter
        base = 2 ** attempt
        jitter = random.uniform(-0.1, 0.1) * base
        return max(0.5, base + jitter)

pipeline.with_retry(policy=MyRetryPolicy())
```
4.1. Conditional Retry (Only on Specific Exceptions)
```python
from taskiq.exceptions import RetryException

@broker.task
async def task_with_selective_retry():
    try:
        return await call_api()
    except NetworkTimeout:
        # This exception type should be retried
        raise RetryException("Timeout, will retry")
    except InvalidResponse:
        # This error is permanent; don't retry
        raise  # Fails immediately
```
Built-in exception-based retry:
```python
@broker.task(retry_on=[NetworkError, TimeoutError])
async def task():
    # Automatically retried on these exception types
    pass
```
5. Exponential Backoff with Jitter
Avoid the thundering-herd problem, where all retries fire at the same instant:
```python
import random

def exponential_backoff_with_jitter(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_factor: float = 2.0,
    jitter: bool = True,
) -> float:
    """Calculate the retry delay."""
    delay = min(max_delay, base_delay * (backoff_factor ** attempt))
    if jitter:
        # Add ±10% random jitter
        delay *= random.uniform(0.9, 1.1)
    return delay

# Usage in a policy
class JitteredRetryPolicy(RetryPolicy):
    def get_delay(self, attempt: int) -> float:
        return exponential_backoff_with_jitter(attempt, base_delay=2.0)
```
Why jitter? Prevents synchronized retry storms that overwhelm services.
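A quick standalone simulation makes the effect visible: five workers that all reach attempt 3 at once either fire together or spread out:

```python
import random

nominal = 2.0 * (2.0 ** 2)  # base_delay=2.0, backoff_factor=2.0, attempt=2 -> 8s
without_jitter = [nominal] * 5
with_jitter = [nominal * random.uniform(0.9, 1.1) for _ in range(5)]
print(without_jitter)  # [8.0, 8.0, 8.0, 8.0, 8.0] -> a synchronized burst
print(with_jitter)     # e.g. [7.4, 8.6, 8.1, 7.9, 8.4] -> spread over ~1.6s
```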
6. Dead Letter Queues (DLQ)
When all retries are exhausted, failed tasks need somewhere to go.
6.1. Configuring DLQ
```python
from taskiq_flow.middlewares.retry import RetryMiddleware

broker.add_middlewares(
    RetryMiddleware(
        max_retries=3,
        dlq_queue="failed_tasks",  # Tasks go here after exhausting retries
    )
)
```
Behavior:
- Task fails → retry 1 (after delay)
- Fails again → retry 2 (after longer delay)
- Fails again → retry 3
- Fails all retries → moved to the `failed_tasks` queue
6.2. DLQ Inspection & Reprocessing
```python
from taskiq_flow.middlewares.retry import DLQManager

dlq = DLQManager(broker)

# List failed tasks
failed_tasks = await dlq.list_failed()
for task_info in failed_tasks:
    print(f"Task {task_info.task_id} failed: {task_info.error}")

# Replay a failed task (re-queue it for execution)
await dlq.retry_task(task_info.task_id)

# Discard a failed task permanently
await dlq.delete_task(task_info.task_id)

# Bulk-delete entries older than N days
await dlq.cleanup_older_than(days=7)
```
6.3. DLQ Alerting
Set up alerts for tasks that land in the DLQ:

```python
class DLQAlertListener:
    async def on_task_to_dlq(self, task_id: str, error: str):
        # send_slack_alert / create_incident_ticket are your own integrations
        send_slack_alert(f"Task {task_id} failed after retries: {error}")
        create_incident_ticket(task_id, error)

dlq_manager = DLQManager(broker).with_listener(DLQAlertListener())
```
7. Timeouts
Prevent tasks from running indefinitely.
7.1. Task-Level Timeout
```python
@broker.task(timeout=30)  # seconds
async def potentially_slow_task():
    await long_running_operation()
```
If the task exceeds 30 seconds, `asyncio.TimeoutError` is raised and the retry policy applies.
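Timeouts and retries compose; a sketch combining the decorator options from section 2, where each attempt is bounded individually:

```python
@broker.task(timeout=30, max_retries=2, retry_delay=5.0)
async def bounded_task():
    # Each attempt is cancelled after 30s; with two 5s retry delays,
    # the worst case is roughly 3 * 30s + 2 * 5s = 100s in total.
    await long_running_operation()
```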
7.2. Pipeline-Level Timeout
```python
pipeline = Pipeline(broker)
pipeline.with_timeout(seconds=300)  # 5 minutes for the entire pipeline
```
Cancels all running steps when the timeout expires.
7.3. Step-Level Timeout (Advanced)
```python
from taskiq_flow.steps import TimeoutStep

pipeline = Pipeline(broker)
pipeline.call_next(TimeoutStep(my_task, timeout=10.0))
```
8. Error Propagation
8.1. Fail Fast (Default)
The pipeline stops at the first failure:
```python
pipeline = Pipeline(broker)
# By default: on_error="stop"
pipeline.call_next(task1)  # Fails → pipeline stops, task2 never runs
pipeline.call_next(task2)
```
8.2. Continue on Error
Continue executing remaining steps despite failures:
```python
pipeline = Pipeline(broker)
pipeline.on_error("continue")
pipeline.call_next(task1)  # Fails, but task2 still runs
pipeline.call_next(task2)
```
Result: `task2` receives `None` or a partial result; check `result.is_failed`.
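Downstream tasks should therefore handle a missing input defensively; a sketch (`transform()` is a placeholder):

```python
@broker.task
async def task2(upstream_result):
    # Under on_error("continue"), a failed upstream step hands us None.
    if upstream_result is None:
        return {"status": "skipped", "reason": "upstream failure"}
    return transform(upstream_result)
```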
8.3. Compensation (Saga Pattern)
Execute a cleanup task if a step fails:
```python
pipeline = Pipeline(broker)
pipeline.call_next(allocate_resource).on_failure(compensate_allocation)  # Compensation runs if this step fails
pipeline.call_next(process)
```
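The compensation handler is an ordinary task whose job is to undo the side effects of the failed step. A minimal sketch with placeholder helpers (`reserve`, `release`):

```python
@broker.task
async def allocate_resource(order_id: str) -> str:
    return await reserve(order_id)  # placeholder: reserve capacity, return its id

@broker.task
async def compensate_allocation(order_id: str) -> None:
    # Undo the reservation so the system ends in a consistent state.
    await release(order_id)         # placeholder: free the reserved capacity
```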
9. Monitoring Retries
Track retry metrics:
```python
from taskiq_flow import PipelineTrackingManager

tracking = PipelineTrackingManager().with_auto_storage(broker)

# Retry metrics are exposed in PipelineStatus:
status = await tracking.get_status(pipeline_id)
print(f"Steps: {len(status.steps)}")
for step in status.steps:
    if step.retry_count > 0:
        print(f"  {step.name}: retried {step.retry_count} times")
        print(f"  Errors: {step.errors}")
```
Metrics to monitor:
- Retry rate: percentage of tasks that needed at least one retry
- Average retry count per task
- Top failing tasks (most retries)
- DLQ size (tasks that gave up)
- Time spent retrying vs. doing useful work
9.1. Integration with Prometheus
```python
from prometheus_client import Counter, Summary
from taskiq_flow import PipelineMiddleware  # adjust to your actual import path

RETRY_COUNT = Counter('task_retries_total', 'Total retry attempts', ['task_name'])
TASK_FAILURES = Counter('task_failures_total', 'Tasks that failed after retries', ['task_name'])
TASK_DURATION = Summary('task_duration_seconds', 'Task execution time', ['task_name'])

class MetricsMiddleware(PipelineMiddleware):
    async def on_step_complete(self, ctx, result):
        step_name = ctx.task_name
        RETRY_COUNT.labels(step_name).inc(ctx.retry_count)
        TASK_DURATION.labels(step_name).observe(ctx.duration_ms / 1000)
```
10. Best Practices
10.1. Set Reasonable Retry Limits
```python
# Don't retry indefinitely
@broker.task(max_retries=3)     # Good: bounded
async def bounded_task(): ...

@broker.task(max_retries=None)  # Bad: infinite retries
async def unbounded_task(): ...
```
10.2. Use Exponential Backoff
Implemented via `retry_backoff`:

```python
@broker.task(max_retries=5, retry_delay=2.0, retry_backoff=2.0)
async def backed_off_task(): ...
# Delays: 2s, 4s, 8s, 16s, 32s
```
10.3. Add Jitter
Randomize delays to avoid thundering herd:
```python
retry_backoff=2.0, retry_jitter=True  # Add ±10% jitter to each delay
```
10.4. Set Deadlines
```python
# Overall timeout including retries
@broker.task(retry_timeout=300)  # Give up after 5 minutes total
async def deadline_bound_task(): ...
```
10.5. Log Every Retry
```python
import logging

logger = logging.getLogger(__name__)

@broker.task(
    max_retries=3,
    on_retry=lambda attempt, exc: logger.warning(f"Retry {attempt} for task: {exc}"),
)
async def monitored_task(): ...
```
10.6. Separate Transient vs Permanent Errors
```python
@broker.task
async def smart_task():
    try:
        return await call_api()
    except (Timeout, ConnectionError) as e:
        raise RetryException("Transient error") from e  # Will retry
    except NotFoundError:
        raise  # No retry, fail permanently
```
10.7. DLQ for Investigation
Never discard failed tasks without review:
```python
dlq = DLQManager(broker)

# Periodically review the DLQ
failed = await dlq.list_failed(limit=100)
for task in failed:
    logger.error(f"DLQ task {task.task_id}: {task.error}")
    # Consider manual replay or data correction
```
11. Common Pitfalls
| Pitfall | Consequence | Solution |
|---|---|---|
| Infinite retries (`max_retries=None`) | System stuck in a retry loop | Set an explicit maximum |
| No backoff (`delay=0`) | Service overwhelmed | Use exponential backoff |
| Retrying validation errors | Wasted resources | Distinguish error types |
| No DLQ | Failed tasks are lost | Configure a DLQ |
| Timeout shorter than retry delay | Premature timeout | Ensure timeout > sum of retry delays |
| Multiple retries on non-idempotent tasks | Duplicate side effects | Make tasks idempotent (see the sketch below) or limit retries |
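On the last row: idempotency often amounts to keying side effects on a stable identifier so that a retried task becomes a no-op. A sketch with placeholder storage and payment helpers:

```python
@broker.task(max_retries=3)
async def charge_customer(order_id: str, amount_cents: int):
    # Idempotency key: a retry with the same order_id must not charge twice.
    if await already_charged(order_id):                   # placeholder store lookup
        return "already charged"
    await payment_gateway.charge(order_id, amount_cents)  # placeholder client
    await mark_charged(order_id)                          # placeholder store write
```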
12. Summary
| Feature | Task-level | Pipeline-level |
|---|---|---|
| Retry limit | `@broker.task(max_retries=N)` | `pipeline.with_retry(max_attempts=N)` |
| Delay | `retry_delay` | `delay` |
| Backoff | `retry_backoff` | `backoff` |
| Timeout | `timeout` per task | `with_timeout(seconds)` overall |
| DLQ | Via `RetryMiddleware` | Inherited from tasks |
Complete resilient pipeline:
```python
from taskiq_flow.middlewares.retry import RetryMiddleware

tracking = PipelineTrackingManager().with_auto_storage(broker)
pipeline = Pipeline(broker).with_tracking(tracking)
pipeline.with_retry(max_attempts=3, delay=2.0, backoff=2.0)
pipeline.with_timeout(seconds=300)
pipeline.on_error("continue")  # Or use compensation steps

# Add the retry middleware with a DLQ
broker.add_middlewares(RetryMiddleware(max_retries=3, dlq_queue="failed_tasks"))
```
Next Steps
- Performance Guide — Optimize task execution and resource usage
- Scheduling Guide — Automated pipeline retries at scheduled intervals
- Tracking Guide — Monitor retry metrics in production
Failures happen. Retry smart. Track everything.