杰瑞科技汇

How Do You Implement Scheduled Tasks with Python APScheduler?

This guide shows how to use APScheduler (Advanced Python Scheduler) to run scheduled tasks in Python. The examples use the APScheduler 3.x API.

What is APScheduler?

APScheduler is a powerful, lightweight, and in-process task scheduler for Python. It allows you to schedule Python functions (or any callable) to be executed at certain times, on specific intervals, or using a Cron-like syntax.

Key features:

  • Multiple Backends: Jobs can be kept in memory, in a database through SQLAlchemy (SQLite, PostgreSQL, MySQL), or in Redis.
  • Multiple Schedulers: Supports BackgroundScheduler (runs alongside your application in a separate thread), BlockingScheduler (for scripts where the scheduler is the only thing running), and AsyncIOScheduler (for use with asyncio).
  • Flexible Triggers: Schedule jobs by date, interval, or a cron expression.
  • Robust Job Stores: Persist jobs and their state across application restarts.
  • Error Handling: Exceptions raised by jobs are caught and logged through Python's logging module, and event listeners can react to failed or missed jobs.

Installation

First, install the library. If you plan to use a persistent backend, install the matching extra so its dependencies are pulled in as well. The quotes keep shells such as zsh from interpreting the square brackets.

# Basic installation (uses memory as the job store)
pip install apscheduler
# For persistent jobs through SQLAlchemy (for example, SQLite)
pip install "apscheduler[sqlalchemy]"
# For Redis
pip install "apscheduler[redis]"

Core Concepts

Before we dive into code, let's understand the main components of APScheduler:

  • Scheduler (apscheduler.schedulers): The main object that manages jobs and triggers. It's the heart of the library.
  • Job (Job): A scheduled task. A job has a callable, a trigger, and other metadata (like its ID, next run time, etc.).
  • Job Store (JobStore): The backend that stores the jobs. It can be in-memory, a database table, or a Redis key. It allows jobs to persist.
  • Executor (Executor): The component that actually runs the job's callable. The default is a ThreadPoolExecutor, which runs jobs in separate threads to avoid blocking the main application.
  • Trigger (Trigger): The rule that defines when a job should run. The three main types are:
    • date: Run once at a specific point in time.
    • interval: Run repeatedly after a certain time interval.
    • cron: Run based on a cron expression (e.g., "every minute at 30 seconds past").

Basic Usage: The "Hello World" Example

Let's start with the simplest case: running a function once after a 5-second delay. We'll use the BlockingScheduler, which is perfect for simple scripts where you want the scheduler to run in the foreground.

from datetime import datetime, timedelta
from apscheduler.schedulers.blocking import BlockingScheduler
def my_job():
    print("Hello, World! The time is:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 1. Create a scheduler
scheduler = BlockingScheduler()
# 2. Add a job
# - The function to run: my_job
# - The trigger: date, set to 5 seconds from now
# - The id for the job: 'my_job_id'
scheduler.add_job(my_job, 'date', run_date=datetime.now() + timedelta(seconds=5), id='my_job_id')
print("Scheduler started. Job will run in 5 seconds...")
# 3. Start the scheduler (this call blocks until the scheduler is stopped)
scheduler.start()

How to run it:

  1. Save the code as a Python file (e.g., simple_scheduler.py).
  2. Run it from your terminal: python simple_scheduler.py.
  3. You'll see the message "Scheduler started..." and then, after 5 seconds, "Hello, World!". The scheduler keeps running after the job has finished, so press Ctrl+C to stop the script.

Scheduling with Different Triggers

This is where APScheduler becomes truly useful. Let's explore the different trigger types.

a) Interval Trigger

Run a job repeatedly at a fixed interval.

from apscheduler.schedulers.blocking import BlockingScheduler
import time
def print_message():
    print("This message is printed every 3 seconds.", time.strftime("%Y-%m-%d %H:%M:%S"))
scheduler = BlockingScheduler()
# Run print_message every 3 seconds
scheduler.add_job(print_message, 'interval', seconds=3)
print("Scheduler started. Press Ctrl+C to exit.")
try:
    scheduler.start()
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()
    print("Scheduler shut down.")

b) Cron Trigger

Run a job based on a cron expression. This is extremely powerful for recurring tasks.

from apscheduler.schedulers.blocking import BlockingScheduler
import time
def report():
    print("Generating a report...", time.strftime("%Y-%m-%d %H:%M:%S"))
scheduler = BlockingScheduler()
# Run the report every minute at the 30-second mark
scheduler.add_job(report, 'cron', second=30)
print("Scheduler started. Report will run every minute at :30.")
try:
    scheduler.start()
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()

Common Cron Expressions:

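  • minute='*/5': Every 5 minutes, at second 0.
  • hour='9-17', day_of_week='mon-fri': Once per hour, on the hour, from 9:00 to 17:00 on weekdays (Monday to Friday).
  • day=1: On the 1st day of every month, at midnight.

Fields that are less significant than the ones you specify default to their minimum value, which is why hour='9-17' fires once per hour rather than every second of that hour.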

Choosing the Right Scheduler

Using the right scheduler is crucial for your application's architecture.

a) BlockingScheduler

  • Use Case: Simple scripts, standalone applications.
  • Behavior: It blocks the main thread. Once you call scheduler.start(), your script will wait there until the scheduler is shut down.
  • Example: The examples above all use this.

b) BackgroundScheduler

  • Use Case: Web servers (like Flask, Django), long-running applications.
  • Behavior: Runs in the background in a separate thread. It does not block your main application.
  • Example: A web server that needs to perform a task every hour without affecting its responsiveness.
from apscheduler.schedulers.background import BackgroundScheduler
import time
def my_background_task():
    print("This task runs in the background!", time.strftime("%Y-%m-%d %H:%M:%S"))
scheduler = BackgroundScheduler()
scheduler.add_job(my_background_task, 'interval', seconds=5)
scheduler.start()
# Your main application can continue to run
print("Main application is running...")
try:
    # Keep the main thread alive
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    # Shut down the scheduler gracefully
    scheduler.shutdown()
    print("Scheduler shut down.")

c) AsyncIOScheduler

  • Use Case: Applications built with asyncio (like FastAPI, aiohttp).
  • Behavior: Integrates with the asyncio event loop. It uses async/await for job execution.
  • Example:
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def my_async_task():
    print("This is an async task!", asyncio.get_running_loop().time())
async def main():
    scheduler = AsyncIOScheduler()
    # Coroutine functions are added the same way as regular functions
    scheduler.add_job(my_async_task, 'interval', seconds=2, id='async_job_1')
    scheduler.start()
    print("AsyncIO Scheduler started. Press Ctrl+C to exit.")
    try:
        # Keep the event loop running
        await asyncio.sleep(3600)  # Run for an hour
    finally:
        # Runs on normal exit and when Ctrl+C cancels main()
        scheduler.shutdown()
        print("Scheduler shut down.")
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

Job Persistence and Job Stores

If your application restarts, all jobs held in the default in-memory job store are lost. To keep jobs across restarts, you need a persistent job store.

Let's use SQLAlchemyJobStore with an SQLite database.

Install the required dependency:

pip install "apscheduler[sqlalchemy]"

Create a script that uses a persistent job store:

import datetime
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
# 1. Configure the job store
# The URL for the SQLite database
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 2. Configure the executor
executors = {
    'default': ThreadPoolExecutor(20) # 20 worker threads
}
# 3. Configure the job defaults
job_defaults = {
    'coalesce': False, # Run each missed execution separately instead of merging them into one
    'max_instances': 3 # Allow at most 3 concurrent instances of the same job
}
# 4. Create the scheduler with the configuration
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
def persistent_job():
    print("This job is persistent and will survive a restart!", datetime.datetime.now())
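# Add a job that runs every 10 seconds.
# replace_existing=True overwrites the copy stored by a previous run;
# without it, restarting the script raises ConflictingIdError.
scheduler.add_job(persistent_job, 'interval', seconds=10, id='persistent_job_1', replace_existing=True)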
scheduler.start()
print("Persistent scheduler started. Check for 'jobs.sqlite' file.")
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()
    print("Persistent scheduler shut down.")

How it works:

  • The first time you run this script, it will create a file named jobs.sqlite.
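  • If you stop the script and restart it, the scheduler loads the stored jobs from the database, so persistent_job_1 keeps running every 10 seconds.
  • Because the script calls add_job on every start with the same explicit ID, that call needs replace_existing=True. Otherwise the second run fails with ConflictingIdError.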

Managing Jobs

You can add, pause, resume, reschedule, and remove jobs while the scheduler is running.

from apscheduler.schedulers.background import BackgroundScheduler
import time
scheduler = BackgroundScheduler()
scheduler.start()
def job_one():
    print("Job One is running.")
def job_two():
    print("Job Two is running.")
# Add some jobs
scheduler.add_job(job_one, 'interval', seconds=5, id='job1')
scheduler.add_job(job_two, 'interval', seconds=10, id='job2')
print("Jobs added. Waiting for 20 seconds...")
time.sleep(20)
# Pause a job
print("\nPausing job1...")
scheduler.pause_job('job1')
time.sleep(15)
# Resume a job
print("\nResuming job1...")
scheduler.resume_job('job1')
time.sleep(15)
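# Reschedule a job (e.g., change its interval). modify_job() only changes
# job attributes such as name or max_instances, not the trigger arguments.
print("\nRescheduling job2 to run every 5 seconds...")
scheduler.reschedule_job('job2', trigger='interval', seconds=5)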
time.sleep(15)
# Remove a job
print("\nRemoving job2...")
scheduler.remove_job('job2')
time.sleep(10)
# Shut down the scheduler
print("\nShutting down scheduler...")
scheduler.shutdown()

Summary: When to Use APScheduler

Use Case                      | Recommended Scheduler | Job Store
------------------------------|-----------------------|---------------------------
Simple, one-off script        | BlockingScheduler     | In-memory
Web app (Flask/Django)        | BackgroundScheduler   | In-memory or Database
Async web app (FastAPI)       | AsyncIOScheduler      | In-memory or Database
Long-running daemon/service   | BackgroundScheduler   | Database (for persistence)
Complex, recurring batch jobs | BackgroundScheduler   | Database (for persistence)