# Example: quickstart.py

Basic sequential pipeline with map, filter, and group operations.

Version: {VERSION} · File: `examples/quickstart.py`
## Overview

This example demonstrates the fundamentals of Taskiq-Flow using a classic sequential pipeline. It covers:

- Task definition with `@broker.task`
- Pipeline construction with `.call_next()`, `.map()`, and `.filter()`
- Running the pipeline and retrieving results
- Understanding data flow through steps
## Code Walkthrough

```python
import asyncio

from taskiq import InMemoryBroker
from taskiq_flow import Pipeline, PipelineMiddleware

# 1. Initialize broker and add middleware
broker = InMemoryBroker()
broker.add_middlewares(PipelineMiddleware())

# 2. Define tasks
@broker.task
def add_one(value: int) -> int:
    return value + 1

@broker.task
def repeat(value: int, times: int) -> list[int]:
    return [value] * times

@broker.task
def is_positive(value: int) -> bool:
    return value >= 0

# 3. Build pipeline
async def main():
    pipeline = (
        Pipeline(broker)
        .call_next(add_one)          # Step 1: 1 → 2
        .call_next(repeat, times=4)  # Step 2: 2 → [2, 2, 2, 2]
        .map(add_one)                # Step 3: [2, 2, 2, 2] → [3, 3, 3, 3]
        .filter(is_positive)         # Step 4: keep positives (all kept)
    )

    # 4. Execute
    task = await pipeline.kiq(1)
    result = await task.wait_result()
    print("Result:", result.return_value)  # [3, 3, 3, 3]

asyncio.run(main())
```
## Step-by-Step Explanation

### Step 1: `call_next(add_one)`

- Input: `1`
- Operation: `add_one(1) = 2`
- Output: `2`

### Step 2: `call_next(repeat, times=4)`

- Input: `2`
- Operation: `repeat(2, times=4) = [2, 2, 2, 2]`
- Output: `[2, 2, 2, 2]`

### Step 3: `map(add_one)`

- Input: `[2, 2, 2, 2]` (iterable)
- Operation: apply `add_one` to each element in parallel: `add_one(2) = 3` for each of the four elements
- Output: `[3, 3, 3, 3]`

### Step 4: `filter(is_positive)`

- Input: `[3, 3, 3, 3]` (iterable)
- Operation: keep elements where `is_positive(element)` is `True`; all four elements are positive, so all are kept
- Output: `[3, 3, 3, 3]`
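The four steps above reduce to ordinary Python. A minimal sketch (without the broker, middleware, or any Taskiq-Flow machinery) that reproduces the same intermediate values by hand:

```python
def add_one(value: int) -> int:
    return value + 1

def repeat(value: int, times: int) -> list[int]:
    return [value] * times

def is_positive(value: int) -> bool:
    return value >= 0

step1 = add_one(1)                            # 2
step2 = repeat(step1, times=4)                # [2, 2, 2, 2]
step3 = [add_one(v) for v in step2]           # [3, 3, 3, 3]
step4 = [v for v in step3 if is_positive(v)]  # [3, 3, 3, 3]
print(step4)
```

The pipeline adds distributed execution, retries, and result storage on top of this plain data flow, but the values at each step are identical.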
## Key Concepts Demonstrated

- **Task definition**: every pipeline step must be a task (`@broker.task`)
- **Middleware requirement**: `PipelineMiddleware` must be added to the broker
- **Data flow**: each step receives the previous step's output (except `call_after`)
- **Parallel execution**: `.map()` runs elements concurrently
- **Chaining**: each method returns the pipeline, enabling a fluent interface
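The concurrency model behind `.map()` can be illustrated with plain `asyncio.gather`; this is only an analogy for how fan-out over an iterable behaves (concurrent execution, order-preserving results), not Taskiq-Flow's actual scheduler:

```python
import asyncio

async def add_one(value: int) -> int:
    # Stand-in for a worker processing one element.
    await asyncio.sleep(0.01)
    return value + 1

async def fan_out(values: list[int]) -> list[int]:
    # gather() runs all coroutines concurrently and returns
    # results in input order, like .map() over an iterable.
    return await asyncio.gather(*(add_one(v) for v in values))

result = asyncio.run(fan_out([2, 2, 2, 2]))
print(result)  # [3, 3, 3, 3]
```

In a real deployment the elements are dispatched as separate task messages to workers rather than run as coroutines in one process.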
## Running the Example

```shell
python examples/quickstart.py
```

Expected output:

```
Result: [3, 3, 3, 3]
```
## Variations to Try

### Use filter to remove negatives

```python
@broker.task
def subtract_three(value: int) -> int:
    return value - 3  # 2 - 3 = -1 for each element

pipeline = (
    Pipeline(broker)
    .call_next(add_one)
    .call_next(repeat, times=4)
    .map(subtract_three)  # [2, 2, 2, 2] → [-1, -1, -1, -1]
    .filter(is_positive)  # [] (all elements filtered out)
)
```
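Since every element of `[2, 2, 2, 2]` becomes negative after the map step, the filter keeps nothing. A plain-Python sketch of this variation (using a subtraction of 3, matching the function name):

```python
values = [2, 2, 2, 2]
after_map = [v - 3 for v in values]              # [-1, -1, -1, -1]
after_filter = [v for v in after_map if v >= 0]  # []
print(after_filter)
```

An empty list flowing out of `.filter()` is a valid result; any subsequent `.map()` step would simply have nothing to process.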
### Use group for parallel independent tasks

```python
@broker.task
def task_a(x: int) -> int:
    return x * 2

@broker.task
def task_b(x: int) -> int:
    return x + 10

@broker.task
def task_c(x: int) -> int:
    return x ** 2

pipeline = Pipeline(broker).call_next(add_one)  # 1 → 2
pipeline.group([task_a, task_b, task_c], param_names=["x"])
# All three tasks receive 2 and run in parallel
# Result: [4, 12, 4]
```
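The group fan-out above amounts to passing the same input to several independent functions and collecting the results in the order the tasks were listed. A plain-Python sketch of that semantics:

```python
def task_a(x: int) -> int:
    return x * 2

def task_b(x: int) -> int:
    return x + 10

def task_c(x: int) -> int:
    return x ** 2

x = 1 + 1  # call_next(add_one): 1 → 2
# Each function gets the same input; results keep list order.
results = [f(x) for f in (task_a, task_b, task_c)]
print(results)  # [4, 12, 4]
```

Unlike `.map()`, which applies one task to many elements, `.group()` applies many tasks to one input.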
## Learning Path

After this example:

- **Dataflow Pipelines**: automatic DAG construction
- **Task Definition**: advanced task features
- **Tracking**: monitor pipeline execution
- **MapReduce**: batch processing pattern

This example is the "Hello World" of Taskiq-Flow. Master it before moving on to more complex patterns.