Example: resource_aware_demo.py
Dynamic parallelism based on CPU/memory
Version: {VERSION}
File: examples/resource_aware_demo.py
Overview
This example demonstrates the ResourceAwareExecutor and TaskResourceProfile features introduced in v0.4.5. It shows how to:
- Annotate tasks with resource requirements (CPU, memory, I/O vs CPU-bound)
- Compute optimal parallelism based on current system resources
- Adjust max_parallel dynamically to avoid overloading the host
- Apply different parallelism strategies for I/O-bound vs CPU-bound tasks
What This Example Shows
- Defining TaskResourceProfile for tasks
- Creating a ResourceAwareExecutor with system limits
- Querying get_optimal_parallelism() for task types
- Using resource profiles in DataflowPipeline
- Manual parallelism tuning guidelines
Code Walkthrough
1. Resource-Aware Executor Setup
from taskiq_flow.optimization import ResourceAwareExecutor, TaskResourceProfile

executor = ResourceAwareExecutor(
    max_cpu_percent=80.0,     # Don't exceed 80% CPU usage
    max_memory_percent=80.0,  # Don't exceed 80% RAM
    min_parallel=1,
    max_parallel=20,
)

# Query optimal parallelism for a given task resource estimate
optimal_light = executor.get_optimal_parallelism(
    task_memory_estimate=50,  # 50 MB per task
    task_cpu_estimate=0.2,    # 0.2 cores per task
)
print(f"Optimal for light tasks: {optimal_light}")

optimal_heavy = executor.get_optimal_parallelism(
    task_memory_estimate=200,  # 200 MB per task
    task_cpu_estimate=1.0,     # 1.0 core per task
)
print(f"Optimal for heavy tasks: {optimal_heavy}")
The executor queries current system load (via psutil) and computes how many tasks of the given profile can run in parallel without exceeding the configured limits.
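The calculation described above can be pictured with a small stand-alone sketch. It is not the library's implementation, which this page does not show. The function `estimate_parallelism` and its snapshot arguments are hypothetical, and in a real run the usage figures would come from calls such as `psutil.cpu_percent()` and `psutil.virtual_memory()`. The sketch assumes that whichever resource runs out of headroom first sets the limit, and that the result is clamped to the configured bounds.

```python
import math


def estimate_parallelism(
    cpu_used_percent: float,
    mem_used_percent: float,
    cpu_count: int,
    total_memory_mb: float,
    task_memory_estimate: float,
    task_cpu_estimate: float,
    max_cpu_percent: float = 80.0,
    max_memory_percent: float = 80.0,
    min_parallel: int = 1,
    max_parallel: int = 20,
) -> int:
    """Hypothetical helper: how many tasks fit in the remaining headroom."""
    # Headroom left before each configured limit is reached (never negative).
    cpu_headroom_cores = max(0.0, max_cpu_percent - cpu_used_percent) / 100.0 * cpu_count
    mem_headroom_mb = max(0.0, max_memory_percent - mem_used_percent) / 100.0 * total_memory_mb

    # Tasks that fit by each resource; a zero estimate imposes no limit.
    by_cpu = math.floor(cpu_headroom_cores / task_cpu_estimate) if task_cpu_estimate > 0 else max_parallel
    by_mem = math.floor(mem_headroom_mb / task_memory_estimate) if task_memory_estimate > 0 else max_parallel

    # The scarcer resource decides, then the result is clamped to the bounds.
    return max(min_parallel, min(max_parallel, by_cpu, by_mem))


# Snapshot values are made up for illustration: 8 cores, 16000 MB of RAM.
idle = dict(cpu_used_percent=0.0, mem_used_percent=20.0, cpu_count=8, total_memory_mb=16000)
busy = dict(cpu_used_percent=30.0, mem_used_percent=50.0, cpu_count=8, total_memory_mb=16000)

light_on_idle = estimate_parallelism(**idle, task_memory_estimate=50, task_cpu_estimate=0.2)
heavy_on_busy = estimate_parallelism(**busy, task_memory_estimate=200, task_cpu_estimate=1.0)
print(light_on_idle, heavy_on_busy)
```

With these made-up snapshots the light profile is capped by max_parallel, while the heavy profile is capped by CPU headroom. A host already above its CPU limit falls back to min_parallel.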
2. Annotating Tasks with Resource Profiles
import asyncio

# `broker` and `pipeline_task` are assumed to be set up earlier in the demo file.

@broker.task
@pipeline_task(
    output="light_result",
    resources=TaskResourceProfile(
        estimated_memory_mb=50,
        estimated_cpu_cores=0.2,
        io_bound=True,
    ),
)
async def light_task(item: int) -> dict:
    await asyncio.sleep(0.1)  # Simulate I/O
    return {"item": item, "result": item * 2}

@broker.task
@pipeline_task(
    output="heavy_result",
    resources=TaskResourceProfile(
        estimated_memory_mb=200,
        estimated_cpu_cores=1.0,
        io_bound=False,
    ),
)
async def heavy_task(item: int) -> dict:
    # Simulate CPU-bound work
    total = 0
    for _ in range(100000):
        total += item * 2
    return {"item": item, "result": total}
TaskResourceProfile fields:
- estimated_memory_mb: Expected memory usage per task instance, in MB
- estimated_cpu_cores: CPU cores required (0.5 = half a core)
- io_bound: True for I/O-heavy tasks (network, disk), False for CPU-heavy tasks
3. Using Resource Profiles in Pipelines
The DataflowPipeline’s max_parallel parameter acts as an upper bound. The ResourceAwareExecutor can be used to compute a dynamic max_parallel before launching:
# Compute optimal parallelism for current system state
current_parallel = executor.get_optimal_parallelism(
    task_memory_estimate=50,
    task_cpu_estimate=0.2,
)

pipeline = DataflowPipeline(broker, max_parallel=current_parallel)
pipeline.map(light_task, items=list(range(20)), output="light_results")
results = await pipeline.kiq_dataflow()
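For mixed workloads, where light and heavy tasks run at the same time, add up the estimated memory and CPU of every task that may run concurrently and size max_parallel against that total rather than against a single profile.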
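The summing step for mixed workloads can be sketched as plain arithmetic. The helpers `combined_footprint` and `fits_budget` are hypothetical and not part of taskiq_flow, and the budget figures are made up. The sketch reuses the light and heavy estimates from the profiles above and assumes every batch runs at its full parallelism at the same moment.

```python
from typing import Iterable, Tuple

# (parallel_count, memory_mb_per_task, cpu_cores_per_task)
Batch = Tuple[int, float, float]


def combined_footprint(batches: Iterable[Batch]) -> Tuple[float, float]:
    """Total memory (MB) and CPU cores if every batch runs at full parallelism."""
    batches = list(batches)
    total_memory_mb = sum(count * mem for count, mem, _ in batches)
    total_cpu_cores = sum(count * cpu for count, _, cpu in batches)
    return total_memory_mb, total_cpu_cores


def fits_budget(batches: Iterable[Batch], memory_budget_mb: float, cpu_budget_cores: float) -> bool:
    """True when the summed footprint stays within both budgets."""
    total_memory_mb, total_cpu_cores = combined_footprint(batches)
    return total_memory_mb <= memory_budget_mb and total_cpu_cores <= cpu_budget_cores


mixed = [
    (10, 50, 0.2),  # 10 light tasks at once
    (4, 200, 1.0),  # 4 heavy tasks at once
]
total_memory_mb, total_cpu_cores = combined_footprint(mixed)
print(f"{total_memory_mb} MB, {total_cpu_cores:.1f} cores")
```

Here the mix needs 1300 MB and 6.0 cores in total. If the summed footprint exceeds the budget, lower the parallelism of the batch that dominates the scarce resource, which in this mix is the heavy tasks for CPU.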
4. Manual Parallelism Tuning Guidelines
import psutil
cpu_count = psutil.cpu_count() or 4
memory_gb = psutil.virtual_memory().total / (1024 ** 3)
# I/O-bound tasks: can oversubscribe CPU (they spend time waiting)
io_parallel = min(50, cpu_count * 5)
# CPU-bound tasks: limit to available cores plus a small buffer, capped at 20
cpu_parallel = min(cpu_count + 2, 20)
print(f"Recommended max_parallel for I/O-bound: {io_parallel}")
print(f"Recommended max_parallel for CPU-bound: {cpu_parallel}")
Start conservative, benchmark, and adjust.
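One way to carry out the benchmark step is to time the same workload at several parallelism levels before settling on a value. The sketch below uses only the standard library: an `asyncio.Semaphore` stands in for max_parallel and `asyncio.sleep` stands in for an I/O-bound task. The names `run_limited` and `benchmark` are hypothetical, and a real benchmark would run the actual pipeline instead.

```python
import asyncio
import time


async def run_limited(n_items: int, limit: int, delay: float = 0.01):
    """Run n_items simulated I/O tasks with at most `limit` running at once."""
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def worker(item: int) -> int:
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)    # Highest concurrency observed so far
            await asyncio.sleep(delay)  # Simulated I/O wait
            active -= 1
            return item * 2

    start = time.perf_counter()
    results = await asyncio.gather(*(worker(i) for i in range(n_items)))
    elapsed = time.perf_counter() - start
    return results, peak, elapsed


def benchmark(levels, n_items: int = 20):
    """Map each parallelism level to (peak concurrency, elapsed seconds)."""
    report = {}
    for limit in levels:
        _, peak, elapsed = asyncio.run(run_limited(n_items, limit))
        report[limit] = (peak, elapsed)
    return report


report = benchmark([1, 5, 20])
for limit, (peak, elapsed) in report.items():
    print(f"max_parallel={limit}: peak={peak}, elapsed={elapsed:.3f}s")
```

For a workload made purely of waiting, elapsed time keeps dropping as the limit grows. With real tasks the curve flattens or reverses once CPU or memory saturates, and that point is a reasonable place to stop raising max_parallel.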
Expected Output
=== Resource-Aware Parallelism Demo ===
Current system state:
CPU Usage: ? (will query at runtime)
Memory: ? (will query at runtime)
--- Light tasks (I/O bound) ---
Optimal parallelism for light tasks: 25
--- Heavy tasks (CPU bound) ---
Optimal parallelism for heavy tasks: 4
Note: Actual values depend on current system load.
=== Pipeline with Resource-Aware Execution ===
Pipeline structure:
[items:20] --light_task--> [light_results]
[items:10] --heavy_task--> [heavy_results]
[light_results, heavy_results] --combine--> [final]
Executing pipeline...
Pipeline completed: {'light_results': [...], 'heavy_results': [...], 'final': {...}}
TaskResourceProfile allows you to annotate tasks with resource requirements.
ResourceAwareExecutor uses these profiles to compute optimal parallelism.
=== Manual Parallelism Tuning ===
System: 8 CPU cores, 15.6 GB RAM
Recommended max_parallel for I/O-bound tasks: 40
Recommended max_parallel for CPU-bound tasks: 10
Start with conservative values and benchmark:
pipeline.map(light_task, items, max_parallel=10)
pipeline.map(heavy_task, items, max_parallel=cpu_count)
=== Resource-Aware Demo Complete ===
Key takeaways:
1. Use TaskResourceProfile to annotate task resource needs
2. ResourceAwareExecutor computes optimal parallelism at runtime
3. Adjust max_parallel based on task type (I/O vs CPU)
4. Monitor system resources and tune accordingly
Key Points
Why Resource-Aware Parallelism?
Without resource awareness, setting max_parallel too high can:
- Exhaust memory → OOM kills
- Saturate CPU → tasks thrash, overall slowdown
- Starve other services on the same host
ResourceAwareExecutor prevents this by querying current system usage and computing safe parallelism levels.
Best Practices
- Profile your tasks: Measure actual memory/CPU usage in production
- Set conservative defaults: Start with max_parallel=5 and increase
- Monitor: Watch system metrics while pipelines run
- Tune per task type: I/O-bound tasks can be more parallel than CPU-bound
- Use min_parallel and max_parallel bounds: ResourceAwareExecutor respects these
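For the profiling practice, a rough first estimate can be taken locally with the standard library before measuring in production. The helper `profile_call` is hypothetical, and `heavy_work` is a synchronous stand-in for the loop in heavy_task. Note that `tracemalloc` only tracks memory allocated by Python, so the figure is a lower bound on what the process actually uses.

```python
import time
import tracemalloc


def profile_call(func, *args, **kwargs):
    """Run func once; report peak Python-heap memory, CPU time, and wall time."""
    tracemalloc.start()
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    try:
        result = func(*args, **kwargs)
    finally:
        cpu_seconds = time.process_time() - cpu_start
        wall_seconds = time.perf_counter() - wall_start
        _, peak_bytes = tracemalloc.get_traced_memory()
        tracemalloc.stop()
    stats = {
        "peak_memory_mb": peak_bytes / (1024 ** 2),
        "cpu_seconds": cpu_seconds,
        "wall_seconds": wall_seconds,
    }
    return result, stats


def heavy_work(item: int) -> int:
    # Synchronous stand-in for the loop inside heavy_task.
    total = 0
    for _ in range(100000):
        total += item * 2
    return total


result, stats = profile_call(heavy_work, 3)
print(result, stats)
```

A cpu_seconds to wall_seconds ratio near 1 suggests a CPU-bound task (io_bound=False), while a ratio near 0 suggests the task mostly waits. The measured peak can seed estimated_memory_mb, with some margin added.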
Integration with Monitoring
Combine with Prometheus metrics:
from taskiq_flow.metrics import MetricsMiddleware
broker.add_middlewares(MetricsMiddleware())
Track:
- taskiq_flow_worker_cpu_usage_percent
- taskiq_flow_worker_memory_usage_bytes
- taskiq_flow_pipeline_executions_total
Learning Path
After this example:
- Performance Guide: Resource-aware parallelism deep dive
- Optimization Module API: Full ResourceAwareExecutor reference
- Tracking Guide: Monitor resource usage over time
The techniques in this example help keep pipelines from overwhelming the host. Always test resource profiles with realistic data volumes.