Pipeline Scheduling Guide
Cron-based, interval, and one-off pipeline scheduling with PipelineScheduler
Version: {VERSION} Related: Execution Guide, Tracking Guide
Overview
Taskiq-Flow includes a powerful scheduling system for running pipelines at specific times or intervals, built on top of APScheduler.
This guide covers:
- PipelineScheduler — Main scheduling interface
- Cron expressions and patterns
- Interval-based scheduling
- One-off executions
- Timezone handling
- Job persistence and management
- Missed execution handling
1. Quick Start
from taskiq_flow import Pipeline, PipelineScheduler
# Create your pipeline
pipeline = Pipeline(broker).call_next(my_task).call_next(another_task)
# Create scheduler
scheduler = PipelineScheduler(broker)
# Schedule to run every minute
job_id = await scheduler.schedule(
pipeline,
cron="* * * * *", # Every minute
args=("some", "data") # Arguments passed to pipeline.kiq()
)
# Start the scheduler (runs in background)
await scheduler.start()
# ... keep your application running ...
# scheduler runs in background tasks
# Shutdown gracefully
await scheduler.shutdown()
That’s the basics. Let’s explore the features in detail.
2. PipelineScheduler
The main class for scheduling pipeline executions.
2.1. Initialization
from taskiq_flow import PipelineScheduler
scheduler = PipelineScheduler(
broker,
store="memory", # "memory" or "sqlite"
store_path="./scheduler_jobs.db" # for sqlite store
)
Storage options:
| Store | Persistence | Multi-worker | Use case |
|---|---|---|---|
| "memory" | ❌ No | ❌ No | Development, single-process |
| "sqlite" | ✅ Yes | ⚠️ Limited* | Single-worker production, simple persistence |
| "postgresql" (via URL) | ✅ Yes | ✅ Yes | Production multi-worker, HA |
| "mysql" (via URL) | ✅ Yes | ✅ Yes | Production multi-worker, alternative |
| "redis" | ❌ No | ❌ No | Not implemented (raises NotImplementedError) |
*SQLite store works with single scheduler instance; multiple workers need PostgreSQL/MySQL.
Recommendation:
- Development/mocks → store="memory"
- Single-worker production → store="sqlite" with a persistent path
- Production multi-worker → store="postgresql://user:pass@host/dbname" (recommended) # pragma: allowlist secret
Note: PostgreSQL and MySQL support is already implemented in JobPersistenceManager and works via the SQLAlchemy async engine. See Advanced Storage (PostgreSQL/MySQL) below.
2.2. Starting & Stopping
# Start scheduler (begins monitoring schedules)
await scheduler.start()
# Run in background while app is alive
# Typically integrated into FastAPI/Quart lifespan events
# Graceful shutdown
await scheduler.shutdown()
# Waits for running jobs to finish, cancels pending
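For example, a minimal FastAPI lifespan integration might look like this (a sketch; assumes the broker and scheduler are created at module scope):
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the scheduler when the app boots...
    await scheduler.start()
    yield
    # ...and shut it down gracefully when the app stops
    await scheduler.shutdown()

app = FastAPI(lifespan=lifespan)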
Automatic start with context manager:
async with PipelineScheduler(broker) as scheduler:
await scheduler.schedule(pipeline, cron="*/5 * * * *")
# Scheduler automatically starts on __aenter__
# ... run your app ...
# Automatically shuts down on __aexit__
3. Scheduling Methods
3.1. Cron Scheduling
job_id = await scheduler.schedule(
pipeline,
cron="0 * * * *", # Every hour at minute 0
args=("input_data",),
kwargs={"key": "value"},
pipeline_id="hourly_job_001"
)
Cron expression format: minute hour day month day-of-week
| Field | Allowed values | Special characters |
|---|---|---|
| Minute | 0-59 | * , - / |
| Hour | 0-23 | * , - / |
| Day | 1-31 | * , - / ? |
| Month | 1-12 | * , - / |
| Day of week | 0-6 (Sun-Sat) | * , - / ? |
Examples:
"*/5 * * * *" # Every 5 minutes
"0 9 * * *" # Daily at 9:00 AM
"0 0 * * 0" # Weekly on Sunday at midnight
"0 0 1 * *" # Monthly on the 1st at midnight
"0 0 1 1 *" # Yearly on January 1st at midnight
3.2. Interval Scheduling
# Run every N seconds/minutes/hours/days/weeks
job_id = await scheduler.schedule_interval(
pipeline,
seconds=30, # Every 30 seconds
# minutes=5, # Every 5 minutes
# hours=1, # Every hour
args=(data,)
)
Note: Interval scheduling uses APScheduler’s IntervalTrigger. Cron is generally preferred for production (more flexible, handles DST).
3.3. One-Off Execution (Run At)
Schedule a single future execution:
from datetime import datetime, timedelta
job_id = await scheduler.schedule_at(
pipeline,
run_at=datetime.now() + timedelta(hours=2), # In 2 hours
args=(payload,)
)
Or schedule for a specific calendar time:
run_time = datetime(2026, 12, 31, 23, 59, 59)
await scheduler.schedule_at(pipeline, run_at=run_time)
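APScheduler localizes naive datetimes to the scheduler's timezone; passing a timezone-aware datetime avoids any ambiguity:
from datetime import datetime, timezone

# An explicit UTC instant, independent of the server's local timezone
run_time = datetime(2026, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
await scheduler.schedule_at(pipeline, run_at=run_time)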
4. Job Configuration
4.1. Job ID
Each scheduled job gets a unique identifier:
job_id = await scheduler.schedule(pipeline, cron="* * * * *")
print(job_id) # e.g., "job_20260505_abcdef123456"
Customize the ID:
job_id = await scheduler.schedule(
pipeline,
cron="0 9 * * *",
job_id="daily_etl_9am" # human-readable ID
)
Useful for later management (update, cancel, list).
4.2. Arguments & Keyword Arguments
Pass arguments to the pipeline’s kiq() method:
await scheduler.schedule(
pipeline,
cron="* * * * *",
args=("positional_arg",), # tuple
kwargs={"option": True}, # dict
pipeline_id="my_pipeline" # explicit pipeline ID
)
The scheduler calls: await pipeline.kiq(*args, **kwargs) on each trigger.
4.3. Pipeline ID
Each scheduled execution can override the pipeline’s default ID:
pipeline = Pipeline(broker) # generates random ID by default
# Schedule with explicit ID (ensures uniqueness for tracking)
await scheduler.schedule(
pipeline,
cron="*/5 * * * *",
pipeline_id="my_pipeline_v1"
)
Best practice: Include a timestamp or version in the ID for tracking:
import time

pipeline_id = f"batch_process_v2_{int(time.time())}"
5. Job Management
5.1. List Scheduled Jobs
jobs = await scheduler.list_jobs()
for job in jobs:
print(f"ID: {job.id}")
print(f" Trigger: {job.trigger}")
print(f" Next run: {job.next_run_time}")
print(f" Pipeline: {job.pipeline_id}")
5.2. Get Job Details
job = await scheduler.get_job(job_id)
if job:
print(f"Job {job.id} is scheduled for {job.next_run_time}")
5.3. Modify a Job
# Reschedule an existing job
await scheduler.reschedule_job(
job_id,
cron="0 */2 * * *" # Change to every 2 hours
)
# Update job arguments
await scheduler.modify_job(
job_id,
args=("new_arg",),
kwargs={"updated": True}
)
5.4. Remove (Cancel) a Job
await scheduler.remove_job(job_id)
# Future executions are cancelled; running job continues
5.5. Pause & Resume
# Temporarily pause a job
await scheduler.pause_job(job_id)
# Resume later
await scheduler.resume_job(job_id)
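A common pattern is a maintenance window, sketched here with the management APIs above:
# Pause every scheduled job, perform maintenance, then resume
jobs = await scheduler.list_jobs()
for job in jobs:
    await scheduler.pause_job(job.id)

# ... run migrations, deploy, etc. ...

for job in jobs:
    await scheduler.resume_job(job.id)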
6. Tracking Scheduled Executions
Each scheduled pipeline execution is automatically tracked if the pipeline has tracking enabled:
tracking = PipelineTrackingManager().with_auto_storage(broker)
pipeline = Pipeline(broker).with_tracking(tracking)
scheduler = PipelineScheduler(broker)
await scheduler.schedule(pipeline, cron="*/5 * * * *")
# Later, query execution history
history = await tracking.get_history()
for run in history:
print(f"Run {run.pipeline_id}: {run.status} at {run.started_at}")
Distinguishing scheduled runs: Use descriptive pipeline_id patterns:
await scheduler.schedule(
pipeline,
cron="0 2 * * *", # Daily 2AM
pipeline_id=f"daily_etl_{datetime.now().strftime('%Y%m%d')}"
)
# Each day gets a unique pipeline ID for tracking
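With a naming convention like this, scheduled runs can be filtered out of the shared history (a sketch, assuming get_history returns runs with the pipeline_id attribute shown above):
history = await tracking.get_history()
etl_runs = [run for run in history if run.pipeline_id.startswith("daily_etl_")]
print(f"{len(etl_runs)} daily ETL runs recorded")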
7. Missed Execution Handling
When a scheduled job’s trigger time is missed (e.g., scheduler downtime, long-running job), APScheduler provides controls:
7.1. Coalesce
Combine multiple missed runs into a single execution:
from apscheduler.triggers.cron import CronTrigger

# coalesce is a job-level option in APScheduler, not a CronTrigger parameter;
# it is forwarded alongside the trigger when the job is scheduled.
trigger = CronTrigger(hour=9, minute=0)
job = await scheduler.schedule(
    pipeline,
    trigger=trigger,
    coalesce=True,  # if the scheduler was down across several triggers, run once on recovery
)
7.2. Max Instances
Prevent overlapping runs of the same job:
# Job won't start a new execution if previous instance is still running
# max_instances is a job-level option, forwarded alongside the trigger
trigger = CronTrigger(minute="*/5")
job = await scheduler.schedule(pipeline, trigger=trigger, max_instances=1)
# If a 9:00 run is still executing at 9:05, the 9:05 run is skipped
7.3. Misfire Grace Time
Allow a window after scheduled time during which execution is still valid:
from apscheduler.triggers.cron import CronTrigger

# If the scheduler restarts within 10 minutes of the scheduled time, still run.
# misfire_grace_time is a job-level option, forwarded alongside the trigger.
trigger = CronTrigger(minute="*/5")
job = await scheduler.schedule(
    pipeline,
    trigger=trigger,
    misfire_grace_time=600,  # 10 minutes, in seconds
)
8. Timezone Handling
By default, APScheduler uses the local system timezone. For production, set explicit timezone:
from apscheduler.triggers.cron import CronTrigger
import pytz
# Schedule for 9:00 AM in New York timezone
trigger = CronTrigger(
hour=9,
minute=0,
timezone=pytz.timezone("America/New_York")
)
job = await scheduler.schedule(pipeline, trigger=trigger)
Or set globally on scheduler:
scheduler = PipelineScheduler(
broker,
timezone="UTC" # or "America/Los_Angeles", "Europe/Paris", ...
)
Daylight Saving Time (DST): Cron triggers with explicit timezone handle DST transitions automatically. Jobs scheduled at “9:00” will still run at 9:00 local time when clocks shift.
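You can verify this against APScheduler directly; here get_next_fire_time lands on 9:00 local time across the US spring-forward transition (March 8, 2026):
from datetime import datetime

import pytz
from apscheduler.triggers.cron import CronTrigger

tz = pytz.timezone("America/New_York")
trigger = CronTrigger(hour=9, minute=0, timezone=tz)

# The day before the shift: the next run is still 9:00 local time, now in EDT
now = tz.localize(datetime(2026, 3, 7, 12, 0))
print(trigger.get_next_fire_time(None, now))  # 2026-03-08 09:00:00-04:00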
9. Custom Triggers
Beyond cron and intervals, use any APScheduler trigger:
from apscheduler.triggers.date import DateTrigger
from datetime import datetime, timedelta
# Run once at specific datetime
trigger = DateTrigger(run_date=datetime(2026, 12, 31, 23, 59, 59))
job = await scheduler.schedule(pipeline, trigger=trigger)
# Run after a delay (from now)
trigger = DateTrigger(run_date=datetime.now() + timedelta(minutes=10))
job = await scheduler.schedule(pipeline, trigger=trigger)
See APScheduler documentation for advanced triggers (calendar-based, etc.).
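APScheduler's combining triggers also work here; for example, OrTrigger fires whenever any of its child triggers would:
from apscheduler.triggers.combining import OrTrigger
from apscheduler.triggers.cron import CronTrigger

# Fire at 9:00 on weekdays, or at 12:00 on Saturdays
trigger = OrTrigger([
    CronTrigger(day_of_week="mon-fri", hour=9),
    CronTrigger(day_of_week="sat", hour=12),
])
job = await scheduler.schedule(pipeline, trigger=trigger)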
10. Error Handling
10.1. Catch Job Execution Errors
Wrap pipeline execution with error handling:
import logging

logger = logging.getLogger(__name__)

@broker.task
async def my_pipeline_task(data):
    try:
        result = await process(data)
        return result
    except Exception as exc:
        # Log the error, then re-raise so the scheduler records the failure
        logger.error(f"Pipeline failed: {exc}")
        raise  # Scheduler records failure, continues with next schedule
10.2. Scheduler-Level Error Callbacks
scheduler = PipelineScheduler(broker)
@scheduler.on_error
async def handle_scheduler_error(job_id, exception):
logger.error(f"Job {job_id} failed with: {exception}")
send_alert_email(job_id, exception)
await scheduler.start()
10.3. Dead Letter Queue (DLQ)
For jobs that repeatedly fail, route to DLQ:
from taskiq_flow.middlewares.retry import RetryMiddleware
# Configure retry with backoff
broker.add_middlewares(
RetryMiddleware(
max_retries=3,
delay=10,
backoff=2
)
)
# After max retries, task goes to DLQ (if broker supports it)
# RedisStreamBroker: dead_letter_stream
# KafkaBroker: dead_letter_topic
11. Monitoring Scheduled Jobs
11.1. Health Check
async def scheduler_health():
    jobs = await scheduler.list_jobs()
    stats = scheduler.get_stats()
    # next_run_time may be None (paused job) and the job list may be empty
    next_runs = [job.next_run_time for job in jobs if job.next_run_time]
    return {
        "scheduled_jobs": len(jobs),
        "running_jobs": stats.active_jobs,
        "next_run": min(next_runs) if next_runs else None,
    }
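This pairs naturally with the FastAPI lifespan setup from section 2.2 (the route name is illustrative):
@app.get("/health/scheduler")
async def scheduler_health_endpoint():
    return await scheduler_health()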
11.2. Logging
Configure structured logging:
import logging
logger = logging.getLogger("taskiq_flow.scheduler")
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Scheduler logs:
# 2026-05-05 10:00:00 - taskiq_flow.scheduler - INFO - Running job daily_etl_9am
# 2026-05-05 10:00:05 - taskiq_flow.scheduler - INFO - Job daily_etl_9am completed successfully
11.3. Metrics
Integrate with Prometheus:
from prometheus_client import Counter, Gauge
SCHEDULED_JOBS = Gauge('scheduled_jobs_total', 'Total scheduled jobs')
JOB_RUNS = Counter('scheduler_job_runs_total', 'Job executions', ['job_id'])
JOB_FAILURES = Counter('scheduler_job_failures_total', 'Job failures', ['job_id'])
class MetricsScheduler(PipelineScheduler):
async def _run_job(self, job_id, pipeline):
JOB_RUNS.labels(job_id=job_id).inc()
try:
await super()._run_job(job_id, pipeline)
except Exception:
JOB_FAILURES.labels(job_id=job_id).inc()
raise
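To expose these counters, prometheus_client's built-in HTTP server can run alongside the scheduler:
from prometheus_client import start_http_server

start_http_server(8000)  # serves metrics on http://localhost:8000/metrics
scheduler = MetricsScheduler(broker)
await scheduler.start()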
12. Production Considerations
12.1. High Availability
For production HA deployments, run multiple scheduler instances with a shared job store:
# Scheduler 1
scheduler1 = PipelineScheduler(
broker,
store="postgresql",
db_url="postgresql+asyncpg://user:pass@host/db" # pragma: allowlist secret
)
# Scheduler 2 (identical config) — only one will acquire jobs
scheduler2 = PipelineScheduler(
broker,
store="postgresql",
db_url="postgresql+asyncpg://user:pass@host/db" # pragma: allowlist secret
)
# APScheduler's job stores use row-level locking; one scheduler per job
See Advanced Storage (PostgreSQL/MySQL) for detailed configuration.
12.2. Long-Running Jobs
If a pipeline execution might exceed its schedule interval:
# Ensure no overlap (max_instances and coalesce are job-level options)
trigger = CronTrigger(minute="*/5")
job = await scheduler.schedule(pipeline, trigger=trigger, max_instances=1, coalesce=True)
# Pipeline itself has timeout
pipeline.with_timeout(seconds=300) # 5 minutes max
12.3. Start-up Behavior
On scheduler restart, missed jobs are handled according to misfire_grace_time:
# Scheduler restarts at 9:05 AM, job was scheduled for 9:00
# With misfire_grace_time=600 (10 min): job runs at 9:05
# With misfire_grace_time=0: job is skipped
trigger = CronTrigger(hour=9)
job = await scheduler.schedule(pipeline, trigger=trigger, misfire_grace_time=600)
12.4. Job Store Size
The job store accumulates job history. Periodically clean up:
# Remove stale jobs whose next run is more than 30 days in the past
old_jobs = await scheduler.list_jobs()
for job in old_jobs:
    # next_run_time may be None for paused jobs
    if job.next_run_time and job.next_run_time < datetime.now() - timedelta(days=30):
        await scheduler.remove_job(job.id)
12.5. Advanced Storage (PostgreSQL/MySQL)
JobPersistenceManager natively supports PostgreSQL and MySQL via SQLAlchemy AsyncEngine.
PostgreSQL Configuration (Recommended for Production)
from taskiq_flow.scheduling.storage import JobPersistenceManager
# PostgreSQL with asyncpg
storage = JobPersistenceManager(
db_url="postgresql+asyncpg://user:pass@localhost:5432/taskiq_flow", # pragma: allowlist secret
async_mode=True,
)
# Using the URL helper
storage = JobPersistenceManager(
db_url=JobPersistenceManager.get_connection_url(
"postgresql",
host="localhost",
port=5432,
user="taskiq",
password="secret", # pragma: allowlist secret
database="taskiq_flow",
),
async_mode=True,
)
MySQL Configuration
storage = JobPersistenceManager(
db_url="mysql+aiomysql://user:pass@localhost:3306/taskiq_flow", # pragma: allowlist secret
async_mode=True,
)
SQLite Configuration (Development)
# Sync (development only)
storage = JobPersistenceManager(
db_url="sqlite:///jobs.db",
async_mode=False,
)
# Async (recommended even for SQLite)
storage = JobPersistenceManager(
db_url="sqlite+aiosqlite:///jobs.db",
async_mode=True,
)
Integration with PipelineScheduler
from taskiq_flow.scheduling.scheduler import PipelineScheduler
scheduler = PipelineScheduler(
broker,
job_store_url="postgresql+asyncpg://user:pass@localhost:5432/taskiq_flow", # pragma: allowlist secret
)
CRUD Operations with JobPersistenceManager
from datetime import datetime, timezone
from taskiq_flow.scheduling.storage import JobPersistenceManager, SchedulerJob, PipelineExecution
storage = JobPersistenceManager(db_url="sqlite+aiosqlite:///test.db", async_mode=True)
# Save a job
job = SchedulerJob(
id="job_001",
pipeline_id="etl_daily",
label="Daily ETL",
cron="0 2 * * *",
timezone="UTC",
)
await storage.save_job(job)
# Load all jobs
jobs = await storage.load_jobs()
for j in jobs:
print(f"{j.id}: {j.cron} - {j.pipeline_id}")
# Save execution history
execution = PipelineExecution(
job_id="job_001",
pipeline_id="etl_daily",
status="success",
started_at=datetime.now(timezone.utc),
completed_at=datetime.now(timezone.utc),
duration_seconds=45.2,
)
await storage.save_execution_history("job_001", execution)
# Retrieve history
history = await storage.get_execution_history("job_001", limit=10)
for run in history:
print(f" {run.status} - {run.duration_seconds}s at {run.started_at}")
| Backend | Async | Multi-worker | Production |
|---|---|---|---|
| SQLite | ✅ sqlite+aiosqlite | ⚠️ Single-writer | Dev / small projects |
| PostgreSQL | ✅ postgresql+asyncpg | ✅ Full | ✅ Recommended |
| MySQL | ✅ mysql+aiomysql | ✅ Full | ✅ Supported |
13. Common Patterns
13.1. Daily ETL Pipeline
await scheduler.schedule(
    etl_pipeline,
    cron="0 2 * * *",  # 2:00 AM daily
    pipeline_id="daily_etl"
)
13.2. Periodic Health Check
health_pipeline = Pipeline(broker).call_next(health_check_task)
await scheduler.schedule_interval(
health_pipeline,
minutes=5,
pipeline_id="health_check_5m"
)
13.3. Dynamic Scheduling
Create and cancel jobs at runtime:
# Schedule on-demand
job_id = await scheduler.schedule_at(
    pipeline,
    run_at=datetime.now() + timedelta(minutes=10)
)
# Cancel if no longer needed
await scheduler.remove_job(job_id)
13.4. Chained Pipelines
Pipeline A triggers Pipeline B via scheduling:
@broker.task
async def pipeline_a_finished(result):
# Schedule pipeline B to run after A completes
job_id = await scheduler.schedule_at(
pipeline_b,
run_at=datetime.now() + timedelta(minutes=5)
)
return job_id
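If no delay is needed, you can skip the scheduler entirely and enqueue B directly with kiq() (see section 4.2):
@broker.task
async def pipeline_a_finished(result):
    # Enqueue pipeline B immediately, passing A's result along
    await pipeline_b.kiq(result)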
14. Troubleshooting
Jobs Not Running
Symptom: Scheduled jobs never execute.
Fixes:
- Ensure await scheduler.start() is called
- Check cron expression validity: CronTrigger.from_crontab("* * * * *")
- Verify the timezone matches the expected time (check the server TZ)
- Confirm the job was successfully scheduled (non-None job_id)
- Check scheduler logs for errors
Duplicate Job Execution
Symptom: Same job runs multiple times concurrently.
Fixes:
- Set the max_instances=1 job option (see section 7.2)
- Use coalesce=True to combine missed runs
- Ensure only one scheduler instance is running (HA needs a shared store)
Job Store Persistence Not Working
Symptom: Jobs disappear after restart despite sqlite store.
Fixes:
- Use store="sqlite" and specify store_path
- Ensure the file path is writable and persists between restarts
- Don't mix memory and sqlite stores in the same app
Timezone Issues
Symptom: Job runs at wrong time (off by hours).
Fixes:
- Set an explicit timezone on the scheduler: PipelineScheduler(broker, timezone="UTC")
- Or on the trigger: CronTrigger(hour=9, timezone=pytz.timezone("America/New_York"))
- Verify the server's system timezone matches expectations
15. Summary
PipelineScheduler provides robust, production-ready scheduling:
| Feature | API |
|---|---|
| Cron | scheduler.schedule(pipeline, cron="* * * * *") |
| Interval | scheduler.schedule_interval(pipeline, minutes=5) |
| One-off | scheduler.schedule_at(pipeline, run_at=datetime) |
| Management | list_jobs(), remove_job(), pause_job() |
| Persistence | SQLite (single-worker), PostgreSQL/MySQL (multi-worker) |
| Tracking | Automatic with PipelineTrackingManager |
| Concurrency | max_instances, coalesce controls |
Typical production setup:
tracking = PipelineTrackingManager().with_storage(RedisPipelineStorage(redis))
pipeline = Pipeline(broker).with_tracking(tracking)
scheduler = PipelineScheduler(
broker,
job_store_url="postgresql+asyncpg://user:pass@host/taskiq_flow", # pragma: allowlist secret
)
await scheduler.start()
# Schedule your jobs...
Next Steps
- Retry Guide — Error recovery and retry policies
- Performance Guide — Optimize scheduled pipeline performance
- Tracking Guide — Monitor scheduled job history
Schedule pipelines like cron jobs. Track them like never before.