FastAPI-Crons
Modern, powerful cron job scheduler for FastAPI applications with decorators, async support, hooks, and distributed locking.
from fastapi import FastAPI
from fastapi_crons import Crons

app = FastAPI()
crons = Crons(app)

@crons.cron("*/5 * * * *")
async def cleanup_task():
    # Runs every 5 minutes
    print("Cleaning up...")
    return "✅ Cleanup complete"
Why FastAPI-Crons?
Decorator-Based
Define cron jobs with simple, clean decorators. No complex configuration files or boilerplate code.
Async Native
Full support for both sync and async functions. Built for modern Python async/await patterns.
Hooks System
Execute custom logic before, after, or on error. Perfect for logging, metrics, and notifications.
Distributed Locking
Redis-based distributed locking prevents job overlap across multiple instances.
State Tracking
Built-in SQLite and Redis backends track job execution history and status.
CLI Interface
Powerful command-line interface for managing, monitoring, and triggering jobs.
Installation
FastAPI-Crons requires Python 3.10+ and can be installed using pip or your favorite package manager.
pip install fastapi-crons
poetry add fastapi-crons
conda install -c conda-forge fastapi-crons
Dependencies
FastAPI-Crons automatically installs all required dependencies including FastAPI, Typer, croniter, and aiosqlite.
Quick Start
Get up and running with FastAPI-Crons in under 5 minutes.
Create your FastAPI app
from fastapi import FastAPI
from fastapi_crons import Crons, get_cron_router
app = FastAPI(title="My App with Cron Jobs")
crons = Crons(app)
# Add cron management endpoints
app.include_router(get_cron_router(), prefix="/api")
Define your cron jobs
@crons.cron("*/5 * * * *", name="cleanup")
async def cleanup_task():
    # Runs every 5 minutes
    print("🧹 Cleaning up temporary files...")
    return "Cleanup completed"

@crons.cron("0 0 * * *", name="daily_report")
def generate_daily_report():
    # Runs at midnight every day
    print("Generating daily report...")
    return "Report generated"
Run your application
Congratulations!
Your cron jobs are now running! Visit http://localhost:8000/api/crons to see your jobs in action.
Cron Expressions
FastAPI-Crons uses standard cron expressions to define when jobs should run. Master the syntax to schedule jobs precisely.
Minute (0-59)
Hour (0-23)
Day of month (1-31)
Month (1-12)
Day of week (0-6)
Special Characters
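The five fields and the usual special characters (`*` for any value, `,` for lists, `-` for ranges, `/` for steps) can be illustrated with a small matcher. This is a minimal sketch using only the standard library, not how FastAPI-Crons evaluates expressions internally; the names `field_matches` and `cron_matches` are hypothetical. It also simplifies one rule: all five fields must match, whereas traditional cron treats day of month and day of week as alternatives when both are restricted.

```python
from datetime import datetime


def field_matches(field: str, value: int, low: int, high: int) -> bool:
    """Return True if `value` satisfies one cron field such as "*/5" or "1-5,10"."""
    for part in field.split(","):            # "," separates alternatives
        step = 1
        if "/" in part:                      # "/" adds a step, e.g. "*/5"
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":                      # "*" covers the field's full range
            start, end = low, high
        elif "-" in part:                    # "-" gives an inclusive range
            start_text, end_text = part.split("-")
            start, end = int(start_text), int(end_text)
        else:                                # a single number
            start = end = int(part)
        if start <= value <= end and (value - start) % step == 0:
            return True
    return False


def cron_matches(expr: str, when: datetime) -> bool:
    """Check a five-field expression against a datetime (simplified AND semantics)."""
    minute, hour, day, month, weekday = expr.split()
    # Python counts Monday=0..Sunday=6; cron counts Sunday=0..Saturday=6.
    cron_weekday = (when.weekday() + 1) % 7
    return (
        field_matches(minute, when.minute, 0, 59)
        and field_matches(hour, when.hour, 0, 23)
        and field_matches(day, when.day, 1, 31)
        and field_matches(month, when.month, 1, 12)
        and field_matches(weekday, cron_weekday, 0, 6)
    )
```

For example, `cron_matches("*/5 * * * *", datetime(2025, 1, 9, 11, 45))` is true because minute 45 is a multiple of 5, while the same expression does not match 11:47.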
Job Definition
Learn different ways to define cron jobs in FastAPI-Crons, from simple decorators to advanced configurations.
Decorator Method (Recommended)
@crons.cron("0 */2 * * *", name="data_sync", tags=["sync", "data"])
async def sync_data():
    # Runs every 2 hours
    print("Syncing data...")
    return "Data synced successfully"
Instance Method (Advanced)
from fastapi_crons import CronJob

def backup_database():
    print("Backing up database...")

job = CronJob(
    func=backup_database,
    expr="0 2 * * *",
    name="database_backup",
    tags=["backup", "database"]
)
crons.jobs.append(job)
Job Parameters
Scheduler
The Crons class is the heart of FastAPI-Crons, managing job execution, state, and lifecycle.
Singleton Pattern
One scheduler instance across your entire application
Async/Await
Native async support for modern Python applications
Error Handling
Robust error handling with hooks and recovery
State Tracking
Persistent job state and execution history
Scheduler Lifecycle
Startup
Initialize state backend, load jobs, start job loops
Runtime
Execute jobs on schedule, handle hooks, track state
Shutdown
Gracefully stop job loops, cleanup resources
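The three lifecycle phases above can be sketched with plain asyncio. This is an illustration under stated assumptions, not the Crons implementation: `MiniScheduler` is a hypothetical class, and it uses fixed intervals in seconds instead of cron expressions to stay short.

```python
import asyncio
from typing import Awaitable, Callable

JobFunc = Callable[[], Awaitable[None]]


class MiniScheduler:
    """Hypothetical stand-in for illustration; not the fastapi_crons.Crons class."""

    def __init__(self) -> None:
        self._jobs = {}    # job name -> (interval in seconds, coroutine function)
        self._tasks = []   # one asyncio.Task per running job loop

    def add_job(self, name: str, interval: float, func: JobFunc) -> None:
        self._jobs[name] = (interval, func)

    async def _job_loop(self, interval: float, func: JobFunc) -> None:
        # Runtime: sleep until the next run, then execute the job.
        while True:
            await asyncio.sleep(interval)
            await func()

    async def start(self) -> None:
        # Startup: create one loop task per registered job.
        for name, (interval, func) in self._jobs.items():
            task = asyncio.create_task(self._job_loop(interval, func), name=name)
            self._tasks.append(task)

    async def stop(self) -> None:
        # Shutdown: cancel every loop and wait until cancellation has finished.
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks.clear()


async def demo():
    runs = []

    async def tick() -> None:
        runs.append("tick")

    scheduler = MiniScheduler()
    scheduler.add_job("tick", 0.01, tick)
    await scheduler.start()
    await asyncio.sleep(0.1)
    await scheduler.stop()
    count_at_stop = len(runs)
    await asyncio.sleep(0.05)  # nothing should run after stop()
    return count_at_stop, len(runs)
```

Awaiting the cancelled tasks in `stop()` matters: it gives each loop a chance to unwind before resources are released.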
State Management
FastAPI-Crons provides robust state management to track job execution history and status across restarts.
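To make "state across restarts" concrete, here is a minimal sketch of persisting a job's last run time with the standard-library sqlite3 module. The table name `demo_job_runs`, its columns, and the helper functions are hypothetical and chosen for illustration; they are not the schema or API that FastAPI-Crons uses.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) a tiny state store. Pass a file path to persist across restarts."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS demo_job_runs ("
        " name TEXT PRIMARY KEY,"
        " last_run TEXT NOT NULL)"
    )
    return conn


def record_run(conn: sqlite3.Connection, name: str, when: datetime) -> None:
    """Store the latest run time for a job, replacing any earlier value."""
    with conn:  # commits on success, rolls back if the statement raises
        conn.execute(
            "INSERT OR REPLACE INTO demo_job_runs (name, last_run) VALUES (?, ?)",
            (name, when.isoformat()),
        )


def last_run(conn: sqlite3.Connection, name: str) -> Optional[datetime]:
    """Return the stored run time, or None if the job has never run."""
    row = conn.execute(
        "SELECT last_run FROM demo_job_runs WHERE name = ?", (name,)
    ).fetchone()
    return datetime.fromisoformat(row[0]) if row else None
```

With a file path instead of `":memory:"`, a process that restarts can call `last_run()` and see the value written before it stopped.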
SQLite Backend (Default)
- Zero configuration
- File-based storage
- ACID compliance
- Thread-safe operations
from fastapi_crons import Crons
from fastapi_crons.state import SQLiteStateBackend
state = SQLiteStateBackend(db_path="jobs.db")
crons = Crons(state_backend=state)
Redis Backend (Distributed)
- Distributed state
- High performance
- Automatic expiration
- Pub/Sub support
import redis.asyncio as redis
from fastapi_crons.state import RedisStateBackend
redis_client = redis.from_url("redis://localhost:6379")
state = RedisStateBackend(redis_client)
crons = Crons(state_backend=state)
Database Schema
job_state
job_status
Hooks System
Hooks provide powerful extension points to execute custom logic at different stages of job execution.
Before Run Hooks
Execute before job starts. Perfect for setup, validation, and logging.
def log_job_start(job_name: str, context: dict):
    print(f"Starting job: {job_name}")
    print(f"📅 Scheduled: {context['scheduled_time']}")

crons.add_before_run_hook(log_job_start)
After Run Hooks
Execute after successful completion. Great for notifications and cleanup.
async def notify_success(job_name: str, context: dict):
    duration = context['duration']
    print(f"✅ {job_name} completed in {duration:.2f}s")
    # Send notification (send_slack_message is a user-defined helper)
    await send_slack_message(f"Job {job_name} succeeded")

crons.add_after_run_hook(notify_success)
Error Hooks
Execute when job fails. Essential for error handling and alerting.
async def handle_error(job_name: str, context: dict):
    error = context['error']
    print(f"{job_name} failed: {error}")
    # Send alert (send_error_alert is a user-defined helper)
    await send_error_alert(job_name, error)

crons.add_on_error_hook(handle_error)
Hook Context
Hooks receive rich context information about the job execution:
Common Context
job_name - Name of the job
tags - Job tags
expr - Cron expression
instance_id - Scheduler instance ID
manual_trigger - Whether manually triggered
Execution Context
start_time - Job start timestamp
end_time - Job end timestamp
duration - Execution duration (seconds)
success - Success/failure flag
result - Job return value
error - Error message (if failed)
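The way a context like this could be assembled and handed to hooks can be sketched as follows. This is an illustration only: `run_with_hooks` is a hypothetical function, not part of FastAPI-Crons, and it calls sync jobs inline rather than in a worker thread. It fills in a subset of the fields listed above.

```python
import asyncio
import inspect
import time
from typing import Any, Callable, Sequence

Hook = Callable[[str, dict], Any]


async def _call(func: Callable[..., Any], *args: Any) -> Any:
    """Call a sync or async callable and return its final result."""
    result = func(*args)
    if inspect.isawaitable(result):
        result = await result
    return result


async def run_with_hooks(
    job_name: str,
    func: Callable[[], Any],
    before: Sequence[Hook] = (),
    after: Sequence[Hook] = (),
    on_error: Sequence[Hook] = (),
) -> dict:
    """Run one job, firing before hooks, then after hooks or error hooks."""
    context = {"job_name": job_name, "start_time": time.time()}
    for hook in before:
        await _call(hook, job_name, context)
    try:
        context["result"] = await _call(func)
        context["success"] = True
    except Exception as exc:
        context["success"] = False
        context["error"] = str(exc)
    context["end_time"] = time.time()
    context["duration"] = context["end_time"] - context["start_time"]
    for hook in (after if context["success"] else on_error):
        await _call(hook, job_name, context)
    return context


async def demo():
    events = []

    def before(name: str, ctx: dict) -> None:
        events.append(f"before:{name}")

    async def after(name: str, ctx: dict) -> None:
        events.append(f"after:{name}:{ctx['result']}")

    def failed(name: str, ctx: dict) -> None:
        events.append(f"error:{name}:{ctx['error']}")

    def ok_job() -> str:
        return "done"

    def bad_job() -> None:
        raise RuntimeError("disk full")

    await run_with_hooks("ok_job", ok_job, [before], [after], [failed])
    await run_with_hooks("bad_job", bad_job, [before], [after], [failed])
    return events
```

Because `_call` awaits only when the return value is awaitable, the same hook lists can mix plain functions and coroutines, matching the sync and async hook examples earlier in this section.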
Distributed Locking
Prevent job overlap across multiple instances with Redis-based distributed locking.
Configuration
from fastapi_crons import Crons, CronConfig
from fastapi_crons.locking import DistributedLockManager, RedisLockBackend
import redis.asyncio as redis
# Configure Redis connection
redis_client = redis.from_url("redis://localhost:6379")
# Setup distributed locking
config = CronConfig(enable_distributed_locking=True)
lock_backend = RedisLockBackend(redis_client)
lock_manager = DistributedLockManager(lock_backend, config)
crons = Crons(lock_manager=lock_manager, config=config)
Benefits
Prevents Overlap
Ensures jobs don't run simultaneously across instances
Load Distribution
Automatically distributes jobs across available instances
Health Monitoring
Detects and handles instance failures gracefully
Scalability
Scale horizontally without job conflicts
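The idea behind lock-based overlap prevention can be sketched without a Redis server. The class below is a hypothetical in-memory stand-in, not the library's RedisLockBackend: it mimics the common Redis pattern of setting a key only if it is absent, with an expiry, and releasing it only when the caller's token still matches. The `now` parameter exists purely so the example is deterministic.

```python
import time
import uuid
from typing import Optional


class InMemoryLockStore:
    """Hypothetical stand-in for a shared store such as Redis (illustration only)."""

    def __init__(self) -> None:
        self._locks = {}  # lock key -> (owner token, expiry time)

    def acquire(self, key: str, ttl: float, now: Optional[float] = None) -> Optional[str]:
        """Return an owner token if the lock was free or expired, else None."""
        now = time.monotonic() if now is None else now
        holder = self._locks.get(key)
        if holder is not None and holder[1] > now:
            return None  # another instance holds a live lock
        token = uuid.uuid4().hex
        self._locks[key] = (token, now + ttl)
        return token

    def release(self, key: str, token: str) -> bool:
        """Release only if `token` still owns the lock; stale owners are ignored."""
        holder = self._locks.get(key)
        if holder is not None and holder[0] == token:
            del self._locks[key]
            return True
        return False
```

The expiry is what lets other instances recover when a lock holder crashes, and the token check stops an instance whose lock has already expired from releasing a lock that now belongs to someone else.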
Thread Safety
FastAPI-Crons is designed with thread safety in mind, ensuring reliable operation in concurrent environments.
Async Locks
Uses asyncio locks to serialize state operations across concurrent tasks
async with self._lock:
ACID Compliance
SQLite backend ensures atomic database operations
async with db.begin():
Singleton Pattern
Single scheduler instance prevents race conditions
if _instance is None:
Immutable State
Job definitions are immutable after registration
@dataclass(frozen=True)
Concurrency Model
Thread Safety Best Practices
Use Async/Await
Prefer async functions for better concurrency handling
Avoid Shared State
Keep job functions stateless and independent
Use Context Managers
Properly manage resources with async context managers
Handle Exceptions
Use try/except blocks to prevent job failures from affecting others
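Two of these practices, guarding shared state and isolating failures, can be shown together in a short sketch. `RunStats` and `run_isolated` are hypothetical helpers for illustration, not FastAPI-Crons APIs. Note that `asyncio.Lock` coordinates coroutines on a single event loop; it does not protect against access from other OS threads.

```python
import asyncio
from typing import Awaitable, Callable


class RunStats:
    """Shared counters guarded by an asyncio.Lock (hypothetical helper)."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.succeeded = 0
        self.failed = 0

    async def record(self, ok: bool) -> None:
        async with self._lock:  # only one coroutine updates the counters at a time
            if ok:
                self.succeeded += 1
            else:
                self.failed += 1


async def run_isolated(job: Callable[[], Awaitable[None]], stats: RunStats) -> None:
    """Run one job; a failure is recorded instead of propagating to other jobs."""
    try:
        await job()
    except Exception:
        await stats.record(False)
    else:
        await stats.record(True)


async def demo():
    stats = RunStats()

    async def good() -> None:
        await asyncio.sleep(0)

    async def bad() -> None:
        raise ValueError("boom")

    # All three jobs run concurrently; the failing one does not stop the others.
    await asyncio.gather(
        run_isolated(good, stats),
        run_isolated(bad, stats),
        run_isolated(good, stats),
    )
    return stats.succeeded, stats.failed
```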
Manual Execution
Trigger jobs manually for testing, debugging, or immediate execution outside their scheduled times.
Command Line Interface
HTTP API Endpoints
curl -X POST "http://localhost:8000/api/crons/backup_database/run" \
-H "Content-Type: application/json" \
-d '{"force": false}'
Response
{
  "status": "success",
  "message": "Job 'backup_database' executed successfully",
  "execution_time": 2.34,
  "instance_id": "worker-001",
  "timestamp": "2025-01-09T11:47:21Z"
}
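The same request can be issued from Python with only the standard library. This sketch assumes the endpoint path and JSON body shown in the curl example above; `build_run_request` and `trigger_job` are hypothetical helper names, and the base URL is whatever prefix your router is mounted under.

```python
import json
import urllib.request
from urllib.parse import quote


def build_run_request(
    job_name: str,
    base_url: str = "http://localhost:8000/api",
    force: bool = False,
) -> urllib.request.Request:
    """Build the POST request that asks the server to run one job."""
    url = f"{base_url}/crons/{quote(job_name, safe='')}/run"
    body = json.dumps({"force": force}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def trigger_job(job_name: str, **kwargs) -> dict:
    """Send the request and decode the JSON response (needs a running server)."""
    request = build_run_request(job_name, **kwargs)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)
```

Splitting request construction from sending keeps the URL and payload easy to check without a live server.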
Programmatic Execution
import asyncio

from fastapi_crons import Crons

async def trigger_job_manually(job_name: str):
    crons = Crons()
    job = crons.get_job(job_name)
    if not job:
        raise ValueError(f"Job '{job_name}' not found")
    # Execute the job function directly
    if asyncio.iscoroutinefunction(job.func):
        result = await job.func()
    else:
        # Run sync functions in a worker thread so the event loop is not blocked
        result = await asyncio.to_thread(job.func)
    return result

# Usage (from inside an async function, such as a request handler)
result = await trigger_job_manually("backup_database")
Manual Execution Context
When jobs are triggered manually, they receive special context flags:
CLI Commands
Comprehensive command-line interface for managing, monitoring, and debugging your cron jobs.
list-jobs
Display all registered jobs with their status and last run times.
run-job
Manually execute a specific job by name.
status
Show system status and job statistics.
start-scheduler
Start the cron scheduler as a standalone process.
logs
View job execution logs and history.
config
View and modify configuration settings.
Detailed Examples
Job Listing with Filters
System Status Overview
Crons Class
The main scheduler class that manages all cron jobs in your application.
class Crons:
    def __init__(
        self,
        app: FastAPI = None,
        state_backend: StateBackend = None,
        lock_manager: DistributedLockManager = None,
        config: CronConfig = None
    ):
        ...
cron() (Decorator)
Decorator for registering cron jobs.
cron(expr: str, *, name: str = None, tags: List[str] = None)
@crons.cron("0 * * * *", name="hourly")
def my_job():
    pass
get_jobs() (Method)
Returns a list of all registered jobs.
get_jobs() -> List[CronJob]
jobs = crons.get_jobs()
for job in jobs:
    print(job.name)
get_job() (Method)
Returns a specific job by name, or None if no job has that name.
get_job(name: str) -> CronJob | None
job = crons.get_job("backup")
if job:
    print(job.next_run)
add_before_run_hook() (Method)
Adds a hook to execute before a job runs.
add_before_run_hook(hook: HookFunc, job_name: str = None)
crons.add_before_run_hook(log_start)
start() (Async Method)
Starts the cron scheduler.
async start() -> None
await crons.start()
stop() (Async Method)
Stops the cron scheduler gracefully.
async stop() -> None
await crons.stop()
Frequently Asked Questions
Can I use FastAPI-Crons with existing FastAPI applications?
Yes! FastAPI-Crons is designed to integrate seamlessly with existing FastAPI applications. Simply import and initialize the Crons class with your app instance.
How does distributed locking work?
FastAPI-Crons uses Redis-based distributed locking to ensure that jobs don't run simultaneously across multiple instances. This prevents race conditions and duplicate executions in distributed deployments.
Can I run both sync and async functions as cron jobs?
FastAPI-Crons automatically detects whether your function is synchronous or asynchronous and handles execution appropriately using asyncio.to_thread() for sync functions.
How do I monitor job execution and failures?
FastAPI-Crons provides built-in hooks for monitoring, logging, and alerting. You can also use the HTTP endpoints to check job status and execution history programmatically.
What happens if my application restarts?
Job state is persisted in the state backend (SQLite or Redis), so jobs will resume their schedules after application restart. No executions are lost.