杰瑞科技汇

Python schedule模块如何实现定时任务?

什么是 schedule 模块?

schedule 模块是一个轻量级的、纯 Python 实现的库,旨在让 Python 开发者能够像使用 cron(Linux/Unix 系统下的定时任务工具)一样,以非常直观和声明式的方式安排任务。

Python schedule模块如何实现定时任务?-图1
(图片来源网络,侵删)

它的核心思想是:你定义你的任务,然后告诉 schedule 模块在什么时间运行它。

核心特点:

  • 简单易用:API 设计非常直观,上手快。
  • 声明式:代码可读性高,一眼就能看出任务的执行计划。
  • 纯 Python:无需外部依赖或系统服务,非常适合在脚本、Web 应用(如 Flask/Django 后台任务)或小型服务中使用。
  • 灵活性:支持基于 seconds, minutes, hours, days, weeks 的调度,甚至可以指定具体在哪些天(如周一、周五)或哪些分钟(如每小时的第15、45分钟)执行。

安装

你需要安装这个模块,打开你的终端或命令行,运行:

pip install schedule

基本用法

schedule 的使用流程通常遵循三个简单的步骤:

  1. 定义一个任务函数:创建一个你希望定时执行的普通 Python 函数。
  2. 调度任务:使用 schedule 模块提供的方法(如 every().minutes.do())将你的任务函数与一个时间计划关联起来。
  3. 进入无限循环:启动一个后台循环,不断地检查是否有任务到了执行时间,如果有,就调用它。

示例1:每分钟执行一次任务

import schedule
import time
# 1. 定义任务函数
def job():
    print("I'm working...")
# 2. 调度任务:每分钟执行一次 job 函数
schedule.every().minute.do(job)
# 3. 进入无限循环
while True:
    schedule.run_pending()  # 运行所有已到时间的任务
    time.sleep(1)           # 暂停一秒,避免CPU空转

运行这个脚本,你会看到每分钟打印一次 "I'm working..."。


更丰富的调度选项

schedule 模块非常灵活,可以满足各种时间调度需求。

示例2:按不同时间单位调度

import schedule
import time
def job():
    print("I'm working...")
# 每10秒执行一次
schedule.every(10).seconds.do(job)
# 每2分钟执行一次
schedule.every(2).minutes.do(job)
# 每小时执行一次
schedule.every().hour.do(job)
# 每天(午夜0点)执行一次
schedule.every().day.at("00:00").do(job)
# 每周一执行一次
schedule.every().monday.do(job)
# 每周三的 13:15 执行一次
schedule.every().wednesday.at("13:15").do(job)
# 每隔5到10分钟执行一次(随机性)
schedule.every(5).to(10).minutes.do(job)
# 每隔2到3小时执行一次(随机性)
schedule.every(2).to(3).hours.do(job)
# 每个工作日(周一到周五)执行一次
schedule.every().monday.do(job)
schedule.every().tuesday.do(job)
schedule.every().wednesday.do(job)
schedule.every().thursday.do(job)
schedule.every().friday.do(job)
while True:
    schedule.run_pending()
    time.sleep(1)

管理和取消任务

当你创建了多个任务后,可能需要动态地管理它们(根据某些条件停止或修改任务)。

获取任务列表

def job():
    print("A job is running.")
# 创建几个任务
schedule.every().minute.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
# 获取所有任务
all_jobs = schedule.get_jobs()
print(f"当前有 {len(all_jobs)} 个任务:")
for job in all_jobs:
    print(f"- {job}")

取消任务

取消任务需要保留任务的“标签”(tag)或直接使用任务对象。

import schedule
def job():
    print("This job will be cancelled soon.")
# 创建一个带标签的任务
job_to_cancel = schedule.every().minute.do(job).tag("my_unique_job")
# 运行一段时间后...
time.sleep(30) # 假设运行了30秒
# 方法1: 通过标签取消
schedule.clear("my_unique_job")
print("任务已通过标签取消。")
# 方法2: 通过任务对象取消
# schedule.cancel_job(job_to_cancel)
# print("任务已通过任务对象取消。")
# 检查剩余任务
print(f"剩余任务数量: {len(schedule.get_jobs())}")

传递参数给任务函数

如果你的任务函数需要参数,可以直接在 .do() 方法中传递。

import schedule
import time
def greet(name):
    print(f"Hello, {name}!")
def greet_with_args(name, food):
    print(f"Hello, {name}! I would like to eat {food}.")
# 传递一个参数
schedule.every(2).seconds.do(greet, name="Alice")
# 传递多个参数
schedule.every(4).seconds.do(greet_with_args, name="Bob", food="pizza")
while True:
    schedule.run_pending()
    time.sleep(1)

错误处理

在长时间运行的调度器中,一个任务的崩溃不应该导致整个调度器停止,对任务函数进行错误处理非常重要。

import schedule
import time
def bad_job():
    print("This job will raise an exception.")
    raise Exception("Oops! Something went wrong.")
def good_job():
    print("This job is running successfully.")
# 为可能出错的任务添加错误处理
def run_job_with_error_handling(job_func):
    try:
        job_func()
    except Exception as e:
        print(f"Job failed with error: {e}")
# 将包装函数传递给调度器
schedule.every().second.do(run_job_with_error_handling, bad_job)
schedule.every(2).seconds.do(run_job_with_error_handling, good_job)
while True:
    schedule.run_pending()
    time.sleep(1)

在独立线程中运行

如果你在主线程中使用 while True 循环,它会阻塞你的程序,无法处理其他事情(Web 请求),一个更好的做法是在一个独立的线程中运行调度器。

import schedule
import time
import threading
def job():
    print("I'm running in a background thread.")
def run_scheduler():
    while True:
        schedule.run_pending()
        time.sleep(1)
# 在一个单独的线程中启动调度器
scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
scheduler_thread.start()
# 主线程可以继续做其他事情
print("Scheduler is running in the background. Main thread is free.")
print("Doing some other work in the main thread...")
# 模拟主线程的工作
for i in range(10):
    time.sleep(2)
    print(f"Main thread is working... Step {i+1}")
print("Main thread finished.")

注意daemon=True 设置了该线程为“守护线程”,这意味着当主线程结束时,这个守护线程也会自动结束,避免程序无法退出。


schedule vs. APScheduler

schedule 非常适合简单、轻量级的任务,如果你的需求更复杂,可以考虑功能更强大的 APScheduler (Advanced Python Scheduler)。

特性 schedule APScheduler
复杂度 简单,适合快速上手和简单脚本 复杂,功能非常强大
调度类型 基于时间间隔(every X seconds)和固定时间点(at "HH:MM" 支持更多:cron 表达式、日期、间隔、固定时间
持久化 不支持,程序重启后所有任务丢失 支持,可以将任务状态保存到数据库
后台执行 需要自己实现(如使用 threading 内置多种执行器(ThreadPoolExecutor, ProcessPoolExecutor
适用场景 脚本、小型应用、简单的后台任务 Web应用、数据处理服务、需要持久化和高可靠性的场景
  • 如果你只是想在脚本里每隔一段时间执行个简单任务,用 schedule 就足够了,它简单、直观。
  • 如果你正在开发一个 Web 应用(如 Flask/Django),需要处理更复杂的定时任务(比如每天凌晨清理数据),并且希望服务器重启后任务能自动恢复,APScheduler 是更好的选择。
分享:
扫描分享到社交APP
上一篇
下一篇