FastAPI-Crons

v2.0.0 · Python 3.10+ · MIT License

Modern, powerful cron job scheduler for FastAPI applications with decorators, async support, hooks, and distributed locking.

Easy integration · Type safe
app.py
from fastapi import FastAPI
from fastapi_crons import Crons

app = FastAPI()
crons = Crons(app)

@crons.cron("*/5 * * * *")
async def cleanup_task():
    # Runs every 5 minutes
    print("Cleaning up...")
    return "โœ… Cleanup complete"

Why FastAPI-Crons?

Decorator-Based

Define cron jobs with simple, clean decorators. No complex configuration files or boilerplate code.

Async Native

Full support for both sync and async functions. Built for modern Python async/await patterns.

Hooks System

Execute custom logic before, after, or on error. Perfect for logging, metrics, and notifications.

Distributed Locking

Redis-based distributed locking prevents job overlap across multiple instances.

State Tracking

Built-in SQLite and Redis backends track job execution history and status.

CLI Interface

Powerful command-line interface for managing, monitoring, and triggering jobs.

Installation

FastAPI-Crons requires Python 3.10+ and can be installed using pip or your favorite package manager.

bash
pip install fastapi-crons
bash
poetry add fastapi-crons
bash
conda install -c conda-forge fastapi-crons

Dependencies

FastAPI-Crons automatically installs all required dependencies including FastAPI, Typer, croniter, and aiosqlite.

Quick Start

Get up and running with FastAPI-Crons in under 5 minutes.

Step 1: Create your FastAPI app

python app.py
from fastapi import FastAPI
from fastapi_crons import Crons, get_cron_router

app = FastAPI(title="My App with Cron Jobs")
crons = Crons(app)

# Add cron management endpoints
app.include_router(get_cron_router(), prefix="/api")
Step 2: Define your cron jobs

python
@crons.cron("*/5 * * * *", name="cleanup")
async def cleanup_task():
    # Runs every 5 minutes
    print("๐Ÿงน Cleaning up temporary files...")
    return "Cleanup completed"

@crons.cron("0 0 * * *", name="daily_report")
def generate_daily_report():
    # Runs at midnight every day
    print("๐Ÿ“Š Generating daily report...")
    return "Report generated"
Step 3: Run your application

Terminal
$ uvicorn app:app --reload
INFO: Started server process [12345]
INFO: Waiting for application startup.
INFO: Cron scheduler started with 2 jobs
INFO: Uvicorn running on http://127.0.0.1:8000

🎉 Congratulations!

Your cron jobs are now running! Visit http://localhost:8000/api/crons to see your jobs in action.

Cron Expressions

FastAPI-Crons uses standard cron expressions to define when jobs should run. Master the syntax to schedule jobs precisely.

A cron expression has five fields, read left to right:

* * * * *
│ │ │ │ └─ Weekday (0-6)
│ │ │ └─── Month (1-12)
│ │ └───── Day (1-31)
│ └─────── Hour (0-23)
└───────── Minute (0-59)

Common schedules:

* * * * *      Every minute
*/5 * * * *    Every 5 minutes
0 * * * *      Every hour
0 0 * * *      Daily at midnight
0 0 * * 0      Weekly on Sunday
0 0 1 * *      Monthly on the 1st

Special Characters

  • * (asterisk): matches any value
  • , (comma): separates multiple values
  • - (hyphen): defines ranges
  • / (slash): step values
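
Since croniter ships as a FastAPI-Crons dependency, you can use it to validate an expression and preview its upcoming fire times before attaching it to a job. A minimal sketch:

python
from datetime import datetime
from croniter import croniter  # installed with FastAPI-Crons

expr = "0 9-17/2 * * 1-5"  # every 2 hours, 9:00-17:00, Monday-Friday
assert croniter.is_valid(expr)

it = croniter(expr, datetime.now())
for _ in range(3):
    print(it.get_next(datetime))  # next three scheduled run times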

Job Definition

Learn different ways to define cron jobs in FastAPI-Crons, from simple decorators to advanced configurations.

Decorator Method

Recommended
python
@crons.cron("0 */2 * * *", name="data_sync", tags=["sync", "data"])
async def sync_data():
    # Runs every 2 hours
    print("Syncing data...")
    return "Data synced successfully"

Instance Method

Advanced
python
from fastapi_crons import CronJob

def backup_database():
    print("Backing up database...")

job = CronJob(
    func=backup_database,
    expr="0 2 * * *",
    name="database_backup",
    tags=["backup", "database"]
)

crons.jobs.append(job)

Job Parameters

  • func (Callable): the function to execute (sync or async)
  • expr (str): cron expression defining the schedule
  • name (str | None): unique job name (defaults to the function name)
  • tags (List[str] | None): tags for categorizing jobs

Scheduler

The Crons class is the heart of FastAPI-Crons, managing job execution, state, and lifecycle.

FastAPI App → Crons Scheduler → Job Runners

Singleton Pattern

One scheduler instance across your entire application

Async/Await

Native async support for modern Python applications

Error Handling

Robust error handling with hooks and recovery

State Tracking

Persistent job state and execution history

Scheduler Lifecycle

Startup

Initialize state backend, load jobs, start job loops

Runtime

Execute jobs on schedule, handle hooks, track state

Shutdown

Gracefully stop job loops, cleanup resources
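
Passing your app to Crons(app) wires this lifecycle up automatically. If you need to drive it yourself, a minimal sketch using FastAPI's lifespan protocol and only the documented start() and stop() methods might look like this:

python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi_crons import Crons

crons = Crons()  # no app passed, so we manage the lifecycle ourselves

@asynccontextmanager
async def lifespan(app: FastAPI):
    await crons.start()  # startup: init state backend, load jobs, start loops
    yield
    await crons.stop()   # shutdown: gracefully stop loops, clean up resources

app = FastAPI(lifespan=lifespan)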

State Management

FastAPI-Crons provides robust state management to track job execution history and status across restarts.

SQLite Backend

Default
  • Zero configuration
  • File-based storage
  • ACID compliance
  • Thread-safe operations
python
from fastapi_crons import Crons
from fastapi_crons.state import SQLiteStateBackend

state = SQLiteStateBackend(db_path="jobs.db")
crons = Crons(state_backend=state)

Redis Backend

Distributed
  • Distributed state
  • High performance
  • Automatic expiration
  • Pub/Sub support
python
import redis.asyncio as redis
from fastapi_crons.state import RedisStateBackend

redis_client = redis.from_url("redis://localhost:6379")
state = RedisStateBackend(redis_client)
crons = Crons(state_backend=state)

Database Schema

job_state

  • name TEXT PRIMARY KEY
  • last_run TEXT
  • created_at TEXT
  • updated_at TEXT

job_status

  • name TEXT PRIMARY KEY
  • status TEXT
  • instance_id TEXT
  • started_at TEXT
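
Because the SQLite backend is a plain database file, you can inspect it directly with the standard library. A read-only sketch against the schema above, assuming the jobs.db path from the earlier example:

python
import sqlite3

# Read job state straight from the backend file documented above
with sqlite3.connect("jobs.db") as conn:
    for name, last_run in conn.execute("SELECT name, last_run FROM job_state"):
        print(f"{name}: last ran at {last_run}")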

Hooks System

Hooks provide powerful extension points to execute custom logic at different stages of job execution.

Before Run (setup, validation, logging) → Job Execution (your job function runs) → After Run (cleanup, notifications)

If the job raises, the On Error stage (error handling, alerts) runs instead.

Before Run Hooks

Execute before job starts. Perfect for setup, validation, and logging.

python
def log_job_start(job_name: str, context: dict):
    print(f"๐Ÿš€ Starting job: {job_name}")
    print(f"๐Ÿ“… Scheduled: {context['scheduled_time']}")

crons.add_before_run_hook(log_job_start)

After Run Hooks

Execute after successful completion. Great for notifications and cleanup.

python
async def notify_success(job_name: str, context: dict):
    duration = context['duration']
    print(f"โœ… {job_name} completed in {duration:.2f}s")
    
    # Send notification
    await send_slack_message(f"Job {job_name} succeeded")

crons.add_after_run_hook(notify_success)

Error Hooks

Execute when job fails. Essential for error handling and alerting.

python
async def handle_error(job_name: str, context: dict):
    error = context['error']
    print(f"โŒ {job_name} failed: {error}")
    
    # Send alert
    await send_error_alert(job_name, error)

crons.add_on_error_hook(handle_error)

Hook Context

Hooks receive rich context information about the job execution:

Common Context

  • job_name - Name of the job
  • tags - Job tags
  • expr - Cron expression
  • instance_id - Scheduler instance ID
  • manual_trigger - Whether manually triggered

Execution Context

  • start_time - Job start timestamp
  • end_time - Job end timestamp
  • duration - Execution duration (seconds)
  • success - Success/failure flag
  • result - Job return value
  • error - Error message (if failed)
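
As a sketch of how these keys combine in practice (assuming the execution-context keys are populated by the time after-run hooks fire), a single hook can produce a structured run summary:

python
async def summarize_run(job_name: str, context: dict):
    status = "succeeded" if context.get("success") else "failed"
    tags = ", ".join(context.get("tags", []))
    print(
        f"[{context.get('instance_id')}] {job_name} ({tags}) "
        f"{status} in {context.get('duration', 0):.2f}s"
    )

crons.add_after_run_hook(summarize_run)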

Job Tags

Organize and categorize your jobs with tags for better management and filtering.

python Tagged Jobs Example
@crons.cron("0 2 * * *", tags=["backup", "database", "critical"])
async def backup_database():
    # Critical database backup
    pass

@crons.cron("*/15 * * * *", tags=["cleanup", "maintenance"])
def cleanup_temp_files():
    # Regular maintenance task
    pass

@crons.cron("0 9 * * 1", tags=["reporting", "analytics"])
def weekly_report():
    # Weekly analytics report
    pass

Common Tag Categories

Priority

critical high normal low

Function

backup cleanup sync reporting

System

database api filesystem network

Tag-Based Operations

python
# Hook that only runs for critical jobs
def critical_job_monitor(job_name: str, context: dict):
    if "critical" in context["tags"]:
        # Send immediate notification for critical jobs
        send_priority_alert(job_name, context)

# Apply different monitoring for different job types
def tag_based_monitoring(job_name: str, context: dict):
    tags = context["tags"]
    
    if "backup" in tags:
        log_backup_metrics(job_name, context)
    
    if "reporting" in tags:
        track_report_generation(job_name, context)

Distributed Locking

Prevent job overlap across multiple instances with Redis-based distributed locking.

  • Instance A: running the job (holds the Redis lock job:backup_db)
  • Instance B: waiting for the lock
  • Instance C: skipped

Configuration

python
from fastapi_crons import Crons, CronConfig
from fastapi_crons.locking import DistributedLockManager, RedisLockBackend
import redis.asyncio as redis

# Configure Redis connection
redis_client = redis.from_url("redis://localhost:6379")

# Setup distributed locking
config = CronConfig(enable_distributed_locking=True)
lock_backend = RedisLockBackend(redis_client)
lock_manager = DistributedLockManager(lock_backend, config)

crons = Crons(lock_manager=lock_manager, config=config)
CronConfig options:

  • enable_distributed_locking (bool): enable Redis-based distributed locking. Default: False
  • lock_timeout (int): lock timeout in seconds. Default: 300
  • lock_renewal_interval (int): lock renewal interval in seconds. Default: 60
  • instance_id (str): unique instance identifier. Default: auto-generated
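
Putting the four options together (the values shown are the documented defaults, spelled out explicitly; instance_id would normally be left to auto-generate):

python
from fastapi_crons import CronConfig

config = CronConfig(
    enable_distributed_locking=True,
    lock_timeout=300,           # seconds before an unrenewed lock expires
    lock_renewal_interval=60,   # how often a running job renews its lock
    instance_id="worker-001",   # omit to use an auto-generated ID
)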

Benefits

Prevents Overlap

Ensures jobs don't run simultaneously across instances

Load Distribution

Automatically distributes jobs across available instances

Health Monitoring

Detects and handles instance failures gracefully

Scalability

Scale horizontally without job conflicts

Thread Safety

FastAPI-Crons is designed with thread safety in mind, ensuring reliable operation in concurrent environments.

Async Locks

Uses asyncio locks for thread-safe state operations

async with self._lock:

ACID Compliance

SQLite backend ensures atomic database operations

async with db.begin():

Singleton Pattern

Single scheduler instance prevents race conditions

if _instance is None:

Immutable State

Job definitions are immutable after registration

@dataclass(frozen=True)

Concurrency Model

  • Application Layer: FastAPI app, request handlers
  • Scheduler Layer: Crons instance, job registry
  • Execution Layer: job runners, hook executors
  • Storage Layer: state backend, lock manager

Thread Safety Best Practices

Use Async/Await

Prefer async functions for better concurrency handling

Avoid Shared State

Keep job functions stateless and independent

Use Context Managers

Properly manage resources with async context managers

Handle Exceptions

Use try/except blocks to prevent job failures from affecting others
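
A sketch combining these practices into one job. The aiosqlite usage and the sessions table are illustrative assumptions, not part of FastAPI-Crons:

python
import aiosqlite  # installed as a FastAPI-Crons dependency

@crons.cron("*/10 * * * *", name="prune_sessions", tags=["cleanup"])
async def prune_sessions():
    # Stateless: every resource is acquired and released inside the job
    try:
        async with aiosqlite.connect("app.db") as db:
            await db.execute(
                "DELETE FROM sessions WHERE expires_at < datetime('now')"
            )
            await db.commit()
        return "Expired sessions pruned"
    except Exception as exc:
        # Contain the failure so other jobs are unaffected; re-raise so
        # on-error hooks still see it
        print(f"prune_sessions failed: {exc}")
        raise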

Manual Execution

Trigger jobs manually for testing, debugging, or immediate execution outside their scheduled times.

Command Line Interface

Terminal
$ python -m fastapi_crons run-job backup_database
🚀 Running job 'backup_database' manually...
📊 Job 'backup_database' completed successfully in 2.34s
✅ Manual execution completed

CLI Options

  • --force: force execution even if the job is locked
  • --timeout: set the execution timeout in seconds
  • --verbose: enable verbose output
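
Combined, following the invocation shown above:

Terminal
$ python -m fastapi_crons run-job backup_database --force --timeout 60 --verbose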

HTTP API Endpoints

POST /api/crons/{job_name}/run
bash cURL Example
curl -X POST "http://localhost:8000/api/crons/backup_database/run" \
     -H "Content-Type: application/json" \
     -d '{"force": false}'

Response

json
{
  "status": "success",
  "message": "Job 'backup_database' executed successfully",
  "execution_time": 2.34,
  "instance_id": "worker-001",
  "timestamp": "2025-01-09T11:47:21Z"
}
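
The same request from Python, sketched with httpx (a common async HTTP client, not a FastAPI-Crons dependency):

python
import httpx

async def trigger_backup() -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "http://localhost:8000/api/crons/backup_database/run",
            json={"force": False},
        )
        resp.raise_for_status()
        return resp.json()  # e.g. {"status": "success", ...}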

Programmatic Execution

python
from fastapi_crons import Crons
import asyncio

async def trigger_job_manually(job_name: str):
    crons = Crons()
    job = crons.get_job(job_name)
    
    if not job:
        raise ValueError(f"Job '{job_name}' not found")
    
    # Execute the job function directly
    if asyncio.iscoroutinefunction(job.func):
        result = await job.func()
    else:
        result = await asyncio.to_thread(job.func)
    
    return result

# Usage
result = await trigger_job_manually("backup_database")

Manual Execution Context

When jobs are triggered manually, they receive special context flags:

  • manual_trigger (true): indicates manual execution
  • trigger_time (ISO timestamp): when the manual trigger occurred
  • triggered_by (cli | api | code): how the job was triggered
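
A hook can use these flags to treat manual runs differently, for example to suppress alerting (reusing the hypothetical send_priority_alert helper from the tags example):

python
def alert_unless_manual(job_name: str, context: dict):
    if context.get("manual_trigger"):
        print(f"{job_name} run manually via {context.get('triggered_by')}; skipping alerts")
        return
    send_priority_alert(job_name, context)

crons.add_before_run_hook(alert_unless_manual)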

CLI Commands

Comprehensive command-line interface for managing, monitoring, and debugging your cron jobs.

list-jobs

Display all registered jobs with their status and last run times.

$ python -m fastapi_crons list-jobs

run-job

Manually execute a specific job by name.

$ python -m fastapi_crons run-job backup_db

status

Show system status and job statistics.

$ python -m fastapi_crons status

start-scheduler

Start the cron scheduler as a standalone process.

$ python -m fastapi_crons start-scheduler

logs

View job execution logs and history.

$ python -m fastapi_crons logs --job backup_db

config

View and modify configuration settings.

$ python -m fastapi_crons config-show

Detailed Examples

Job Listing

Job Status Output
$ python -m fastapi_crons list-jobs
┌─────────────────┬─────────────────────┬───────────┬────────────┐
│ Job Name        │ Last Run            │ Status    │ Instance   │
├─────────────────┼─────────────────────┼───────────┼────────────┤
│ backup_database │ 2025-01-09 02:00:00 │ completed │ worker-001 │
│ cleanup_temp    │ 2025-01-09 11:45:00 │ completed │ worker-002 │
│ send_reports    │ Never               │ pending   │ -          │
└─────────────────┴─────────────────────┴───────────┴────────────┘

System Status Overview

System Status
$ python -m fastapi_crons status
🏠 System Status
Instance ID: worker-001
Backend: SQLiteStateBackend
Locking: Distributed (Redis)
📊 Job Statistics
Total Jobs: 12
Running: 1
Completed: 10
Failed: 1

Crons Class

The main scheduler class that manages all cron jobs in your application.

python Class Definition
class Crons:
    def __init__(
        self,
        app: FastAPI = None,
        state_backend: StateBackend = None,
        lock_manager: DistributedLockManager = None,
        config: CronConfig = None
    ):

cron()

Decorator

Decorator for registering cron jobs.

cron(expr: str, *, name: str = None, tags: List[str] = None)
@crons.cron("0 * * * *", name="hourly")
def my_job():
    pass

get_jobs()

Method

Returns a list of all registered jobs.

get_jobs() -> List[CronJob]
jobs = crons.get_jobs()
for job in jobs:
    print(job.name)

get_job()

Method

Get a specific job by name.

get_job(name: str) -> CronJob | None
job = crons.get_job("backup")
if job:
    print(job.next_run)

add_before_run_hook()

Method

Add a hook to execute before job runs.

add_before_run_hook(hook: HookFunc, job_name: str = None)
crons.add_before_run_hook(log_start)

start()

Async Method

Start the cron scheduler.

async start() -> None
await crons.start()

stop()

Async Method

Stop the cron scheduler gracefully.

async stop() -> None
await crons.stop()
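
A sketch of running the scheduler standalone with these two methods, e.g. in a worker process without FastAPI (the CLI's start-scheduler command covers the same use case):

python
import asyncio
from fastapi_crons import Crons

crons = Crons()  # standalone: no FastAPI app attached

@crons.cron("* * * * *", name="heartbeat")
async def heartbeat():
    print("still alive")

async def main():
    await crons.start()
    try:
        await asyncio.Event().wait()  # run until cancelled
    finally:
        await crons.stop()

asyncio.run(main())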

Frequently Asked Questions

Can I use FastAPI-Crons with existing FastAPI applications?

Yes! FastAPI-Crons is designed to integrate seamlessly with existing FastAPI applications. Simply import and initialize the Crons class with your app instance.

How does distributed locking work?

FastAPI-Crons uses Redis-based distributed locking to ensure that jobs don't run simultaneously across multiple instances. This prevents race conditions and duplicate executions in distributed deployments.

Can I run both sync and async functions as cron jobs?

Yes. FastAPI-Crons automatically detects whether your function is synchronous or asynchronous and handles execution appropriately, running sync functions in a worker thread via asyncio.to_thread().

How do I monitor job execution and failures?

FastAPI-Crons provides built-in hooks for monitoring, logging, and alerting. You can also use the HTTP endpoints to check job status and execution history programmatically.

What happens if my application restarts?

Job state is persisted in the state backend (SQLite or Redis), so jobs will resume their schedules after application restart. No executions are lost.