杰瑞科技汇

Python TimerManager如何高效管理定时任务?

Of course! Let's build a comprehensive and reusable TimerManager in Python.

Python TimerManager如何高效管理定时任务?-图1
(图片来源网络,侵删)

A simple Timer is great, but a TimerManager is essential for handling multiple, concurrent timers, starting and stopping them by name, and getting their status. This is a common requirement in applications like:

  • Automated trading bots.
  • Task schedulers.
  • Game development (e.g., cooldowns, timed events).
  • Any application that needs to track multiple independent durations.

We will build this using Python's built-in threading module, which is perfect for this kind of concurrent, non-blocking task.


The Core Concepts

Before we code, let's outline the key features our TimerManager needs:

  1. Store Timers: It needs a way to keep track of multiple timers. A dictionary is the perfect data structure for this, using a unique name (string) as the key and the timer object itself as the value.
  2. Start a Timer: A function to start a new timer with a given name and duration. It should handle cases where a timer with that name already exists (e.g., stop the old one first).
  3. Stop a Timer: A function to stop a timer by its name.
  4. Check Status: A function to see if a specific timer is currently running.
  5. Get Elapsed Time: A function to get how much time has passed for a specific timer.
  6. Thread-Safety: Since timers run in separate threads, we must protect our shared dictionary (the data store) from race conditions using a threading.Lock.

The TimerManager Class Implementation

Here is the complete, well-commented code for our TimerManager.

Python TimerManager如何高效管理定时任务?-图2
(图片来源网络,侵删)
import threading
import time
from typing import Dict, Optional
class TimerManager:
    """
    Manages multiple named timers running in separate threads.
    """
    def __init__(self):
        """
        Initializes the TimerManager.
        """
        # Stores the timer objects: {name: threading.Timer}
        self._timers: Dict[str, threading.Timer] = {}
        # A lock to ensure thread-safe operations on the _timers dictionary
        self._lock = threading.Lock()
    def start(self, name: str, interval: float, callback: callable) -> bool:
        """
        Starts a new timer with the given name.
        If a timer with the same name already exists, it is stopped and replaced.
        Args:
            name (str): A unique name for the timer.
            interval (float): The time interval in seconds before the callback is invoked.
            callback (callable): The function to call when the timer finishes.
        Returns:
            bool: True if the timer was started successfully, False otherwise.
        """
        with self._lock:
            # Stop any existing timer with the same name
            self.stop(name)
            # Create a new timer
            # The timer's target is a wrapper function that cleans up after itself
            timer = threading.Timer(
                interval,
                self._timer_finished_callback,
                args=[name, callback]
            )
            timer.daemon = True  # Allows the main program to exit even if the timer is running
            self._timers[name] = timer
            timer.start()
            print(f"Timer '{name}' started for {interval:.2f} seconds.")
            return True
    def stop(self, name: str) -> bool:
        """
        Stops a timer by its name.
        Args:
            name (str): The name of the timer to stop.
        Returns:
            bool: True if the timer was found and stopped, False otherwise.
        """
        with self._lock:
            timer = self._timers.pop(name, None)
            if timer:
                timer.cancel()
                # Calculate and print the elapsed time
                elapsed = time.time() - timer.timer_started_time # We'll add this attribute
                print(f"Timer '{name}' stopped. Elapsed time: {elapsed:.2f} seconds.")
                return True
            print(f"Timer '{name}' not found or already stopped.")
            return False
    def is_running(self, name: str) -> bool:
        """
        Checks if a timer with the given name is currently running.
        Args:
            name (str): The name of the timer to check.
        Returns:
            bool: True if the timer is running, False otherwise.
        """
        with self._lock:
            timer = self._timers.get(name)
            # A timer is considered running if it exists and has not been cancelled
            return timer is not None and timer.is_alive()
    def get_elapsed_time(self, name: str) -> Optional[float]:
        """
        Gets the elapsed time for a running timer.
        Args:
            name (str): The name of the timer.
        Returns:
            float: The elapsed time in seconds, or None if the timer is not running.
        """
        with self._lock:
            timer = self._timers.get(name)
            if timer and timer.is_alive():
                # time.time() - the time when the timer's start method was called
                return time.time() - timer.timer_started_time
            return None
    def _timer_finished_callback(self, name: str, callback: callable):
        """
        Internal callback executed when a timer finishes.
        It removes the timer from the dictionary and then calls the user's callback.
        """
        # We need to reacquire the lock to safely modify the dictionary
        # from within the timer's thread.
        with self._lock:
            # Remove the timer from the active list
            self._timers.pop(name, None)
        print(f"Timer '{name}' finished.")
        # Execute the user-provided callback
        callback()
    def list_active_timers(self) -> list:
        """Returns a list of names of all currently active timers."""
        with self._lock:
            return list(self._timers.keys())
# Monkey patch to add start_time attribute to Timer objects
# This is a bit of a hack but makes it easy to track elapsed time.
_original_timer_init = threading.Timer.__init__
def _patched_timer_init(self, *args, **kwargs):
    _original_timer_init(self, *args, **kwargs)
    self.timer_started_time = time.time()
threading.Timer.__init__ = _patched_timer_init

How to Use the TimerManager

Here is a complete example demonstrating how to use the TimerManager.

# --- Callback functions ---
def my_task_callback():
    """This function is called when 'my_task' timer finishes."""
    print("-> Callback for 'my_task': Performing a long-running task now!")
def another_callback():
    """This function is called when 'another_timer' finishes."""
    print("-> Callback for 'another_timer': This is a different task.")
# --- Main execution ---
if __name__ == "__main__":
    print("Starting TimerManager demonstration...")
    manager = TimerManager()
    # Start a few timers
    manager.start("my_task", 5.0, my_task_callback)
    manager.start("another_timer", 3.0, another_callback)
    manager.start("short_timer", 1.5, lambda: print("-> Callback for 'short_timer': Quick task done!"))
    print("\nActive timers:", manager.list_active_timers())
    # Simulate some work and check status
    time.sleep(2)
    print("\n--- After 2 seconds ---")
    print("Is 'my_task' running?", manager.is_running("my_task"))
    print("Is 'another_timer' running?", manager.is_running("another_timer"))
    print("Elapsed time for 'my_task': {:.2f}s".format(manager.get_elapsed_time("my_task")))
    print("Elapsed time for 'another_timer': {:.2f}s".format(manager.get_elapsed_time("another_timer")))
    # Stop one of the timers early
    time.sleep(1)
    print("\n--- Stopping 'my_task' early ---")
    manager.stop("my_task")
    print("Is 'my_task' running after stopping?", manager.is_running("my_task"))
    # Wait for the remaining timers to finish
    print("\nWaiting for remaining timers to finish...")
    time.sleep(5)
    print("\nFinal active timers:", manager.list_active_timers())
    print("Demonstration finished.")

Expected Output:

Starting TimerManager demonstration...
Timer 'my_task' started for 5.00 seconds.
Timer 'another_timer' started for 3.00 seconds.
Timer 'short_timer' started for 1.50 seconds.
Active timers: ['my_task', 'another_timer', 'short_timer']
--- After 2 seconds ---
Is 'my_task' running? True
Is 'another_timer' running? True
Elapsed time for 'my_task': 2.00s
Elapsed time for 'another_timer': 2.00s
Timer 'short_timer' finished.
-> Callback for 'short_timer': Quick task done!
--- Stopping 'my_task' early ---
Timer 'my_task' stopped. Elapsed time: 3.00 seconds.
Is 'my_task' running after stopping? False
Waiting for remaining timers to finish...
Timer 'another_timer' finished.
-> Callback for 'another_timer': This is a different task.
Final active timers: []
Demonstration finished.

Enhancements and Alternatives

This TimerManager is robust and great for many use cases. Here are some ways you could extend it or consider other approaches.

Enhancement 1: Reusable Timers

The current implementation creates a new threading.Timer object every time start() is called. For very frequent, short-lived timers, this could be inefficient. You could create a ThreadPoolExecutor that holds a pool of worker threads and dispatches timer tasks to them, reusing threads instead of creating new ones.

Enhancement 2: More Callback Information

The callback function currently only receives no arguments. You could modify _timer_finished_callback to pass more information to the user's callback, like the timer's name, duration, or elapsed time.

Python TimerManager如何高效管理定时任务?-图3
(图片来源网络,侵删)
# In TimerManager.__init__
self._timer_info = {} # To store interval for each timer
# In start()
self._timer_info[name] = interval
# In _timer_finished_callback()
def _timer_finished_callback(self, name: str, callback: callable):
    with self._lock:
        self._timers.pop(name, None)
        interval = self._timer_info.pop(name, 0) # Get and remove interval info
    print(f"Timer '{name}' (interval: {interval}s) finished.")
    callback(name, interval) # Pass info to callback

Alternative: The schedule Library

For many scheduling needs, a third-party library like schedule is often easier and more powerful. It's designed for running jobs periodically or at specific times.

pip install schedule

Example with schedule:

import schedule
import time
def job():
    print("I'm working...")
# Schedule a job to run every 10 seconds
schedule.every(10).seconds.do(job)
# Schedule a job to run at a specific time
# schedule.every().day.at("10:30").do(job)
while True:
    schedule.run_pending()
    time.sleep(1)

The schedule library is excellent for recurring tasks but is less focused on the one-shot, named timer management we built our TimerManager for. Our custom class gives you fine-grained control over individual, named timers.

分享:
扫描分享到社交APP
上一篇
下一篇