# Performance Optimization Guide

Resource-aware parallelism, memory optimization, and scaling strategies.

Version: {VERSION} · Related: Execution Guide, Tracking Guide
## Overview

Taskiq-Flow is designed for high-performance asynchronous execution. This guide covers optimization techniques to maximize throughput, minimize latency, and use system resources efficiently.

Topics covered:

- Parallelism tuning (`max_parallel`)
- CPU and RAM profiling
- Task resource profiles
- Memory management strategies
- Bottleneck identification
- Scaling from a single worker to distributed execution
## 1. Understanding the Performance Landscape

Performance optimization involves trade-offs between:

| Dimension | What it affects | Typical trade-off |
|---|---|---|
| Concurrency | Throughput (tasks/second) | Memory usage, context switching |
| Parallelism | CPU utilization | Coordination overhead |
| Latency | Task completion time | Resource consumption |
| Memory | Dataset size capacity | GC pauses, cache efficiency |
| I/O | External service calls | Network bandwidth, connection limits |

Key insight: Taskiq-Flow's parallelism is bounded by the `max_parallel` settings on pipeline steps and by available system resources (CPU cores, RAM).
## 2. Parallelism Tuning

### 2.1. The max_parallel Parameter

Control concurrent task execution at the step level:

```python
# Sequential pipeline
pipeline.map(process_item, items, max_parallel=10)  # Max 10 concurrent

# Dataflow pipeline: configure at the pipeline level
pipeline = DataflowPipeline(broker, max_parallel=20)

# MapReduce
mapped = await MapReduce.map(
    broker,
    process_item,
    items,
    max_parallel=15,
)
```
Default behavior: without `max_parallel`, Taskiq-Flow attempts to run all independent tasks concurrently (essentially unlimited). This is fine for small inputs (<100 tasks) but dangerous for large datasets.
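Conceptually, `max_parallel` acts like a counting semaphore around task dispatch. For intuition, here is a minimal sketch of the same bounding pattern in plain asyncio; `process_item` and `items` stand in for your own task and inputs:

```python
import asyncio

async def bounded_gather(coros, max_parallel: int):
    """Run coroutines concurrently, at most max_parallel at a time."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def bounded(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(bounded(c) for c in coros))

# Usage (inside an async function):
# results = await bounded_gather((process_item(i) for i in items), max_parallel=10)
```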
### 2.2. Determining Optimal max_parallel

#### For I/O-Bound Tasks (network calls, disk I/O)

```python
# High I/O wait, low CPU: can handle many concurrent tasks
pipeline.map(fetch_url, url_list, max_parallel=50)

# Rule of thumb: 2–5× the number of CPU cores
```

Rationale: while one task waits on the network, another uses the CPU. High concurrency keeps the I/O pipeline saturated.
#### For CPU-Bound Tasks (computations, transcoding)

```python
import os

# CPU-intensive: limit to the core count (or slightly higher)
cpu_cores = os.cpu_count() or 4
pipeline.map(transcode, files, max_parallel=cpu_cores + 2)

# Rule of thumb: CPU cores ± 2
```

Rationale: Python's GIL limits true parallelism within a process; asyncio still benefits from multiple cores when tasks release the GIL (NumPy, C extensions). Over-subscription causes context-switching overhead.
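For pure-Python CPU-bound code that never releases the GIL, a common workaround is to offload the computation to a process pool from inside the async task. A minimal sketch with the standard library; `broker` is your Taskiq-Flow broker and `transcode_file` is a placeholder for your own synchronous function:

```python
import asyncio
import os
from concurrent.futures import ProcessPoolExecutor

# One pool per worker process, sized to the machine's core count
executor = ProcessPoolExecutor(max_workers=os.cpu_count() or 4)

@broker.task
async def transcode(path: str):
    loop = asyncio.get_running_loop()
    # transcode_file runs in a separate process, so this process's
    # GIL is no longer the bottleneck
    return await loop.run_in_executor(executor, transcode_file, path)
```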
#### For Mixed Workloads

Profile and adjust. The sketch below assumes the dataflow pipeline is rebuilt with each candidate limit, as in section 2.1:

```python
import time

# Start conservative and sweep upward
for parallel in [5, 10, 20, 50]:
    pipeline = DataflowPipeline(broker, max_parallel=parallel)
    start = time.time()
    await pipeline.kiq_dataflow(data)
    duration = time.time() - start
    print(f"Parallelism {parallel}: {duration:.2f}s")
```

Find the knee of the curve: the point where increasing parallelism yields diminishing returns.
### 2.3. Global Parallelism Limit

Set a global cap across all pipelines:

```python
from taskiq_flow.optimization.parallel import set_max_parallel_tasks

set_max_parallel_tasks(100)  # Never exceed 100 concurrent tasks globally
```

Useful in multi-tenant systems to prevent one pipeline from starving the others.
## 3. Resource-Aware Scheduling

Taskiq-Flow can schedule tasks based on their CPU/RAM requirements. This requires a resource-aware worker pool and is an advanced feature.

### 3.1. Annotating Tasks with Resource Needs

```python
from taskiq_flow import CPUProfile, RAMProfile

@broker.task
@CPUProfile(cpu_units=2)    # Needs 2 CPU cores
@RAMProfile(ram_mb=4096)    # Needs 4 GB RAM
def heavy_computation(data):
    # Will only run on workers with sufficient resources
    pass
```
### 3.2. Resource-Aware Worker Pool

```python
from taskiq_flow import ResourceAwareWorkerPool

pool = ResourceAwareWorkerPool(
    workers=[
        {"cpu_cores": 8, "ram_gb": 32, "labels": {"gpu": True}},
        {"cpu_cores": 4, "ram_gb": 16, "labels": {"gpu": False}},
    ]
)
# Tasks are automatically routed to compatible workers
```

Note: this feature requires a custom worker implementation; standard brokers ignore resource profiles.
## 4. Memory Optimization

### 4.1. Avoid Large In-Memory Data Transfers

Pass references instead of full data:

```python
# ❌ Bad: copies the entire dataset per task call
pipeline.map(process, large_dataset)  # Each task gets a full dataset copy

# ✅ Better: pass identifiers, fetch inside the task
@broker.task
def process(item_id: str):
    item = database.get(item_id)  # Fetch on demand
    return process_item(item)

pipeline.map(process, item_ids)  # Only IDs are passed
```
### 4.2. Stream Large Datasets

Use chunking:

```python
def chunked(iterable, chunk_size=100):
    for i in range(0, len(iterable), chunk_size):
        yield iterable[i:i + chunk_size]

for chunk in chunked(large_list, 100):
    results = await pipeline.kiq_dataflow(chunk)
    # Process results before the next chunk to free memory
```
### 4.3. Clear Results After Use

Pipeline results stay in tracking storage. Clean up when you're done:

```python
# After processing, delete the pipeline record
await tracking.delete_pipeline(pipeline.pipeline_id)
```

Or set a TTL on the storage:

```python
RedisPipelineStorage(redis, ttl_seconds=86400)  # Auto-delete after 1 day
```
## 5. Profiling & Bottleneck Detection

### 5.1. Built-in Timing

With tracking enabled, each step records its duration automatically:

```python
status = await tracking.get_status(pipeline_id)
for step in status.steps:
    print(f"{step.name}: {step.duration_ms}ms")
```

The slowest steps are your optimization targets.
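To surface them quickly, sort the steps by duration. A small sketch using the same `status.steps` fields shown above:

```python
status = await tracking.get_status(pipeline_id)

# Top 5 slowest steps, longest first
for step in sorted(status.steps, key=lambda s: s.duration_ms, reverse=True)[:5]:
    print(f"{step.name}: {step.duration_ms}ms")
```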
### 5.2. Memory Profiling

Use Python's tracemalloc:

```python
import tracemalloc

tracemalloc.start()

# Run the pipeline
await pipeline.kiq(data)

# Check memory usage
current, peak = tracemalloc.get_traced_memory()
print(f"Current: {current / 1024 / 1024:.1f} MB")
print(f"Peak: {peak / 1024 / 1024:.1f} MB")
tracemalloc.stop()
```
### 5.3. CPU Profiling

```python
import cProfile
import pstats

profiler = cProfile.Profile()
profiler.enable()
await pipeline.kiq(data)
profiler.disable()

stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20)  # Top 20 functions
```
### 5.4. Async-Specific Profiling

Use uvloop for a faster event loop:

```python
import uvloop

uvloop.install()  # Replaces the default asyncio event loop
```

Benchmark improvement: uvloop can provide a 2–3× speedup for I/O-bound workloads.
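`uvloop.install()` must run before the event loop is created. A minimal sketch of a typical entry point; the `main` coroutine is a placeholder for your own startup code:

```python
import asyncio

import uvloop

async def main():
    # ... build the broker/pipeline and run it here ...
    pass

if __name__ == "__main__":
    uvloop.install()     # Must be called before asyncio.run()
    asyncio.run(main())
```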
## 6. Database/External Service Optimization

### 6.1. Connection Pooling

For databases (PostgreSQL, Redis), reuse connections:

```python
from asyncpg import create_pool

# Create once at application startup, not per task
pool = await create_pool(database="...", min_size=5, max_size=20)

@broker.task
async def db_task(query: str):
    async with pool.acquire() as conn:
        return await conn.fetch(query)
```
### 6.2. Batch Operations

Instead of many small calls, batch them:

```python
# ❌ N separate round trips
for item in items:
    await db.insert(item)

# ✅ Single batch insert
await db.bulk_insert(items)
```
### 6.3. Cache Results

```python
from functools import lru_cache

@broker.task
@lru_cache(maxsize=1000)
def expensive_computation(key: str):
    # Note: lru_cache is per process; each worker keeps its own cache
    return compute(key)
```

Or use a shared Redis cache:

```python
import json

import redis.asyncio as redis  # Async client, so the awaits below work

cache = redis.Redis(...)

@broker.task
async def cached_task(key: str):
    cached = await cache.get(key)
    if cached:
        return json.loads(cached)
    result = await compute(key)
    await cache.setex(key, 3600, json.dumps(result))
    return result
```
## 7. Distributed Scaling

### 7.1. Multiple Workers

Scale horizontally by running multiple worker processes:

```bash
# Terminal 1
taskiq worker --broker redis://localhost:6379

# Terminal 2
taskiq worker --broker redis://localhost:6379

# Terminal 3
taskiq worker --broker redis://localhost:6379
```

All workers share the same broker (Redis) and process tasks concurrently.

Throughput ≈ (number of workers) × (tasks per worker per second). For example, 4 workers each sustaining 50 tasks/second give roughly 200 tasks/second overall.
### 7.2. Worker Pool Management

Use a process manager (systemd, supervisord, Docker Compose):

```yaml
# docker-compose.yml
services:
  worker-1:
    image: taskiq-flow-worker
    command: taskiq worker --broker ${REDIS_URL}
  worker-2:
    image: taskiq-flow-worker
    command: taskiq worker --broker ${REDIS_URL}
  worker-3:
    image: taskiq-flow-worker
    command: taskiq worker --broker ${REDIS_URL}
```
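Since the three services are identical, a single definition plus Compose's built-in scaling avoids the duplication. This is standard Docker Compose behavior, not Taskiq-Flow specific:

```yaml
# docker-compose.yml
services:
  worker:
    image: taskiq-flow-worker
    command: taskiq worker --broker ${REDIS_URL}
```

```bash
docker compose up --scale worker=3
```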
### 7.3. Queue Prioritization

Route critical pipelines to dedicated queues:

```python
@broker.task(queue="high_priority")
def critical_task(): ...

# Workers can be configured to process specific queues first
```
### 7.4. Geo-Distribution

For low-latency global deployments, deploy workers in multiple regions, backed either by a global broker (e.g., Kafka) or by regional Redis clusters with replication.
## 8. Benchmarking

Measure before and after optimization:

```python
import time

async def benchmark(pipeline, data, iterations=10):
    durations = []
    for _ in range(iterations):
        start = time.perf_counter()
        result = await pipeline.kiq(data)
        await result.wait_result()
        durations.append(time.perf_counter() - start)

    avg = sum(durations) / len(durations)
    # Index-based p95; coarse for small iteration counts
    p95 = sorted(durations)[int(0.95 * len(durations))]
    print(f"Average: {avg:.3f}s, P95: {p95:.3f}s")
    return durations
```

Key metrics:

- Throughput: tasks/second
- P50/P95/P99 latency: median, 95th, and 99th percentile completion times
- Memory peak: maximum RSS (resident set size)
- CPU utilization: percentage of cores in use
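Throughput and peak memory can be captured with the standard library alone. A small sketch building on the `benchmark` helper above, run inside an async context on a Unix system; note that `ru_maxrss` is reported in kilobytes on Linux but bytes on macOS:

```python
import resource
import time

start = time.perf_counter()
durations = await benchmark(pipeline, data, iterations=10)
elapsed = time.perf_counter() - start

print(f"Throughput: {len(durations) / elapsed:.1f} pipeline runs/s")

# Peak RSS of this process (kilobytes on Linux, bytes on macOS)
peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
print(f"Peak RSS: {peak / 1024:.1f} MB")
```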
## 9. Production Checklist

- Set `max_parallel` appropriately for the task type (CPU-bound vs I/O-bound)
- Use connection pooling for external services
- Enable Redis storage for tracking (avoid memory leaks)
- Set TTLs on tracking/result storage
- Configure timeouts on all tasks
- Add retry policies with backoff and jitter (see the sketch after this list)
- Monitor memory usage and set alerts
- Profile slow tasks with cProfile/tracemalloc
- Scale workers horizontally based on queue depth
- Use queue priorities for critical pipelines
- Implement a dead-letter queue (DLQ) and review failed tasks regularly
- Test failure scenarios (network partitions, service outages)
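Retry-policy APIs vary by setup, so here is a broker-agnostic sketch of exponential backoff with full jitter in plain asyncio; `call_service`, `payload`, and the retry limits are illustrative assumptions:

```python
import asyncio
import random

async def retry_with_backoff(func, *args, retries=5, base_delay=0.5, max_delay=30.0):
    """Retry an async callable with exponential backoff and full jitter."""
    for attempt in range(retries):
        try:
            return await func(*args)
        except Exception:
            if attempt == retries - 1:
                raise
            # Full jitter: sleep a random amount up to the exponential cap
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, delay))

# result = await retry_with_backoff(call_service, payload)
```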
## 10. Troubleshooting Performance

### Pipeline Running Slowly

Diagnostic steps:

- Check step durations in tracking:

  ```python
  status = await tracking.get_status(pipeline_id)
  slowest = max(status.steps, key=lambda s: s.duration_ms)
  print(f"Slowest step: {slowest.name} at {slowest.duration_ms}ms")
  ```

- Profile with cProfile to see where time is spent
- Verify `max_parallel` is not set too low
- Check for blocking I/O: use async libraries, or offload blocking calls as sketched below
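Where a task has to call a blocking library, `asyncio.to_thread` (Python 3.9+) keeps the event loop responsive. A minimal sketch; `blocking_http_get` is a placeholder for any synchronous call:

```python
import asyncio

@broker.task
async def fetch(url: str):
    # The blocking call runs in a worker thread, so the event loop
    # can keep scheduling other tasks in the meantime
    return await asyncio.to_thread(blocking_http_get, url)
```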
### High Memory Usage

Causes and fixes:

| Cause | Fix |
|---|---|
| Large dataset in a single step | Chunk data, process in batches |
| Results accumulating in tracking storage | Set a TTL, delete after use |
| Memory leak in task code | Profile with tracemalloc, fix leaks |
| Too many parallel tasks | Reduce max_parallel |
### Worker Starvation

Symptom: tasks are queued but not executing.

Fixes:

- Increase the number of worker processes
- Ensure the broker (Redis) has enough connections
- Check for long-running tasks blocking the queue
- Consider task priorities or separate queues
## 11. Advanced: Custom Executors

For specialized workloads, implement a custom executor:

```python
from taskiq_flow import ExecutionEngine
from taskiq_flow.dataflow import DAG

class GPUOptimizedEngine(ExecutionEngine):
    async def schedule_task(self, task_node, inputs):
        # Custom scheduling logic: route GPU tasks to GPU workers
        if task_node.labels.get("requires_gpu"):
            return await self.gpu_worker_pool.submit(task_node, inputs)
        return await super().schedule_task(task_node, inputs)

engine = GPUOptimizedEngine(broker, dag)
results = await engine.execute(inputs)
```
## 12. Summary

Performance optimization is iterative:

- Measure: establish a baseline with benchmarks
- Identify: find bottlenecks with profiling
- Tune: adjust `max_parallel`, resource profiles, and batching
- Scale: add workers, optimize external services
- Monitor: track metrics in production
- Repeat: optimization never ends

## Next Steps

- Tracking Guide: monitor pipeline metrics
- Dataflow Guide: complete guide to DAG pipelines and dataflow architecture
- API Guide: build custom dashboards for performance data
- Example: Dataflow Audio Pipeline: see optimization in action

Go fast, but measure first.