杰瑞科技汇

How do I use APScheduler in Python?

Of course! Here is a comprehensive guide to using APScheduler (Advanced Python Scheduler) in Python.


What is APScheduler?

APScheduler is a powerful, in-process task scheduler that lets you schedule Python functions (or any other callable) to be executed at certain times or intervals. It's a library, not a standalone service, which means it runs within your Python application.

Key features:

  • In-Process: Runs inside your application; there is no separate scheduler daemon or service to operate.
  • Multiple Backends: Supports different storage backends for job stores (e.g., Memory, SQLAlchemy, Redis).
  • Multiple Trigger Types: Schedule jobs to run once at a specific date, repeatedly at fixed intervals, or on cron-like schedules.
  • Easy to Use: Simple API for getting started, with advanced options for complex use cases.

Installation

First, you need to install the library. If you plan to use a persistent job store, install the matching extra as well.

# Basic installation (uses the default MemoryJobStore)
pip install apscheduler
# Recommended installations for persistent job stores
pip install apscheduler[sqlalchemy]  # For PostgreSQL, MySQL, SQLite
pip install apscheduler[redis]       # For Redis

Core Concepts

Before diving into code, it's helpful to understand the four main components of APScheduler:

  1. Scheduler (apscheduler.schedulers): The heart of the library. It manages the execution of jobs. You choose a scheduler based on your application's needs (e.g., BackgroundScheduler for web apps).
  2. Job Store (apscheduler.jobstores): Where the scheduler stores jobs and their state. The choice of job store determines whether your jobs survive application restarts.
    • MemoryJobStore: Fast, but all jobs are lost when the program stops. Good for simple scripts.
    • SQLAlchemyJobStore: Stores jobs in a database (SQLite, PostgreSQL, etc.). Jobs persist between restarts.
    • RedisJobStore: Stores jobs in a Redis instance. Good for distributed setups.
  3. Executor (apscheduler.executors): How jobs are actually run. The default is ThreadPoolExecutor, which runs jobs in worker threads. This is crucial for I/O-bound tasks to avoid blocking your main application.
  4. Trigger (apscheduler.triggers): When a job runs. Each job has one trigger: date (run once), interval (run at a fixed period), or cron (run on a cron-like schedule).

Quick Start: The Basics

Let's start with the simplest example using the BackgroundScheduler and the MemoryJobStore. This is perfect for a long-running script or a web application where you don't want the scheduler to block the main execution flow.

Example 1: Simple Interval Scheduling

This script will print "Hello, World!" to the console every 5 seconds.

import time
from apscheduler.schedulers.background import BackgroundScheduler
def my_job():
    print("Hello, World! The time is", time.strftime("%Y-%m-%d %H:%M:%S"))
# 1. Create a scheduler
scheduler = BackgroundScheduler()
# 2. Add a job to the scheduler
# - `my_job`: The function to execute
# - 'interval': The trigger type
# - seconds=5: The interval
scheduler.add_job(my_job, 'interval', seconds=5)
# 3. Start the scheduler
scheduler.start()
print("Scheduler started. Press Ctrl+C to exit.")
# Keep the main thread alive
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    # Shut down the scheduler gracefully
    scheduler.shutdown()
    print("Scheduler shut down.")

How it works:

  1. We import the BackgroundScheduler.
  2. We define the function my_job that we want to run.
  3. We create an instance of BackgroundScheduler.
  4. We use add_job to schedule our function. We specify the trigger type as 'interval' and the interval as 5 seconds.
  5. scheduler.start() begins the scheduler in a background thread.
  6. The while True loop is just to keep the main script alive so the background scheduler has time to work. In a real web app (like Flask or Django), the main server process would keep it alive.

Scheduling with Different Triggers

APScheduler supports three main trigger types.


date Trigger (One-Time Execution)

Schedule a job to run once at a specific date and time.

import time
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
def run_once():
    print("This job will run only once!")
scheduler = BackgroundScheduler()
# Schedule the job to run 10 seconds from now
run_time = datetime.now() + timedelta(seconds=10)
scheduler.add_job(run_once, 'date', run_date=run_time)
scheduler.start()
print(f"Scheduled job to run at {run_time.strftime('%Y-%m-%d %H:%M:%S')}")
# Keep alive
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()

interval Trigger (Repetitive Execution)

Schedule a job to run at fixed intervals.

import time
from apscheduler.schedulers.background import BackgroundScheduler
def my_interval_job():
    print("Interval job is running!")
scheduler = BackgroundScheduler()
# Run every 3 seconds
scheduler.add_job(my_interval_job, 'interval', seconds=3)
# Run every minute
# scheduler.add_job(my_interval_job, 'interval', minutes=1)
scheduler.start()
print("Scheduler started with an interval job.")
# Keep alive
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()

cron Trigger (Powerful, Cron-like Scheduling)

This is the most powerful trigger, allowing you to define complex schedules just like a Unix cron job.

Field         Allowed values
year          4-digit year
month         1-12
day           1-31
week          1-53 (ISO week number)
day_of_week   0-6 or mon-sun (0 = Monday)
hour          0-23
minute        0-59
second        0-59

Every field also accepts the * wildcard and expressions such as */5, a-b, and a,b,c.

Examples:

import time
from apscheduler.schedulers.background import BackgroundScheduler
def cron_job():
    print("Cron job is running!")
scheduler = BackgroundScheduler()
# Run every minute at the 30th second (e.g., 10:00:30, 10:01:30)
scheduler.add_job(cron_job, 'cron', second=30)
# Run every Monday at 8:30 AM
# scheduler.add_job(cron_job, 'cron', day_of_week='mon', hour=8, minute=30)
# Run on the first day of every month at midnight
# scheduler.add_job(cron_job, 'cron', day=1, hour=0, minute=0)
# Run every 5 minutes (on the 0, 5, 10, 15... minute marks)
# scheduler.add_job(cron_job, 'cron', minute='*/5')
scheduler.start()
print("Scheduler started with a cron job.")
# Keep alive
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()

Passing Arguments to Jobs

You can pass arguments and keyword arguments to your job functions using the args and kwargs parameters.

import time
from apscheduler.schedulers.background import BackgroundScheduler
def greet(name, greeting="Hello"):
    print(f"{greeting}, {name}!")
scheduler = BackgroundScheduler()
# Pass positional arguments
scheduler.add_job(greet, 'interval', seconds=3, args=['Alice'])
# Pass keyword arguments
scheduler.add_job(greet, 'interval', seconds=5, kwargs={'name': 'Bob', 'greeting': 'Hi'})
scheduler.start()
print("Scheduler started with jobs that take arguments.")
# Keep alive
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()

Job Persistence with SQLAlchemy

For any serious application, you want your jobs to persist even if the application restarts. The SQLAlchemyJobStore is perfect for this.

Step 1: Choose a database URL (e.g., sqlite:///jobs.sqlite). You don't need to create anything by hand: the SQLite file and the necessary tables are created automatically on first run.

Step 2: Modify the code to use the persistent job store.

import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
# 1. Configure the job store
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 2. Configure the executor
executors = {
    'default': ThreadPoolExecutor(20) # 20 worker threads
}
# 3. Configure the job defaults
job_defaults = {
    'coalesce': False, # Don't run missed jobs all at once
    'max_instances': 3 # Don't run more than 3 instances of the same job
}
# 4. Create the scheduler with the configuration
scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults
)
def my_persistent_job():
    print("This is a persistent job! Time:", time.strftime("%Y-%m-%d %H:%M:%S"))
# Add a job that runs every 10 seconds. replace_existing=True updates the
# stored job instead of raising ConflictingIdError when the same id is
# already in the job store (which it will be after a restart).
scheduler.add_job(my_persistent_job, 'interval', seconds=10,
                  id='my_persistent_job_id', replace_existing=True)
# Start the scheduler
scheduler.start()
print("Persistent scheduler started. The job will survive a restart.")
print("Try stopping the script (Ctrl+C) and running it again. The job will still be there.")
# Keep alive
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()
    print("Persistent scheduler shut down.")

When you run this, a file named jobs.sqlite will be created. If you stop the script and run it again, the job will be picked up from the database and continue running.


Common Operations

You can manage jobs after they've been added.

# ... (scheduler setup from a previous example) ...
from datetime import datetime, timedelta
# Add a job and keep a reference to it
job = scheduler.add_job(my_job, 'interval', seconds=10)
# Get a job by its ID
job_to_modify = scheduler.get_job('my_persistent_job_id')
# Pause a job
scheduler.pause_job('my_persistent_job_id')
# Resume a paused job
scheduler.resume_job('my_persistent_job_id')
# Modify a job's next run time (reschedule it)
scheduler.reschedule_job('my_persistent_job_id', trigger='date', run_date=datetime.now() + timedelta(minutes=5))
# Remove a job
scheduler.remove_job('my_persistent_job_id')
# Get all jobs
all_jobs = scheduler.get_jobs()
print(f"All jobs: {all_jobs}")

Best Practices and Common Pitfalls

  1. Choose the Right Scheduler:

    • BackgroundScheduler: Use for web servers, long-running services, or any application where you don't want the scheduler to block the main thread.
    • BlockingScheduler: Use for simple scripts where the scheduler is the main application. It will block execution until you shut it down.
  2. Use a Persistent Job Store for Production: Never rely on the MemoryJobStore for anything critical. If your server restarts, all scheduled tasks will be lost. Use SQLAlchemyJobStore or RedisJobStore.

  3. Be Careful with max_instances: If a job takes longer to run than its interval, you might have multiple instances running at the same time. Use max_instances to limit this and prevent resource exhaustion.

  4. Handle Exceptions: Jobs run in worker threads. If a job raises an unhandled exception, the scheduler logs it and your main application keeps running, but that run of the job is simply lost. Make your jobs robust, or wrap their bodies in a try...except block.

  5. Shutdown Gracefully: Always call scheduler.shutdown() when your application is closing. By default (wait=True) it lets currently executing jobs finish and cleans up resources before returning; pass wait=False if you need to exit immediately.
