杰瑞科技汇

How to Correctly Implement Callbacks with Python's Pool

Let's break down callbacks in Python's multiprocessing.Pool. They let you process each result as soon as its task finishes, which helps you build responsive and efficient applications.


The Core Idea: Why Use a Callback?

Imagine you have a list of tasks to run in parallel. A blocking call such as Pool.map waits for every task to complete before it returns the full list of results. This can be inefficient if:

  1. Tasks take different amounts of time: You have to wait for the slowest task to finish to get any results.
  2. You want to process results immediately: As soon as one task finishes, you want to do something with its result (e.g., save it to a database, write it to a file, send a notification).

A callback is a function that you pass to apply_async. The pool calls it automatically in the main process (on an internal result-handling thread) as soon as the corresponding worker task completes and its result is ready.
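A minimal sketch of the mechanism, using apply_async (covered in the next section) and a hypothetical `square` worker in place of real work: every submitted task registers `collect` as its callback, and the callback appends each result to a list that lives in the main process.

```python
from multiprocessing import Pool

def square(n):
    # Runs in a worker process
    return n * n

results = []  # lives in the main process; only the callback appends to it

def collect(value):
    # Runs in the main process once square(n) has returned
    results.append(value)

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        pending = [pool.apply_async(square, args=(n,), callback=collect)
                   for n in range(5)]
        for r in pending:
            r.wait()  # wait until this task has finished
    # Callbacks fire in completion order, so sort before printing
    print(sorted(results))  # [0, 1, 4, 9, 16]
```

Sorting is needed because the list is filled in whatever order the tasks happen to finish.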


The Key Method: apply_async

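To use a callback, you need one of the asynchronous methods: pool.apply_async(), pool.map_async(), or pool.starmap_async(). The blocking pool.apply() and pool.map() do not accept callbacks. This article focuses on apply_async, which submits one task per call and invokes the callback once per task.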

The signature is: pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)

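  • func: The function to be executed in a worker process. It is sent to the worker by pickling, so define it at module level.
  • args: A tuple of positional arguments for func.
  • kwds: A dictionary of keyword arguments for func.
  • callback: The key parameter. A function that is called with the return value of func as its only argument, provided func completes without raising.
  • error_callback: Optional. A function that is called with the exception instance if func raises instead.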
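For comparison, map_async accepts the same callback and error_callback parameters, but its callback fires only once, with the complete list of results in input order, after every item has been processed. A short sketch with a hypothetical `double` worker:

```python
from multiprocessing import Pool

def double(n):
    return 2 * n

calls = []  # records every argument the callback receives

def on_all_done(result_list):
    # Called once, with the whole list, after every item has finished
    calls.append(result_list)

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        async_result = pool.map_async(double, [1, 2, 3], callback=on_all_done)
        async_result.wait()
    print(calls)  # [[2, 4, 6]]
```

So map_async frees the main thread while the work runs, but it does not give you per-item results as they arrive; apply_async does.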

Simple Example: Processing a List of URLs

Let's create a classic example where we measure the length of several web pages (the number of characters in each decoded response body). We want to print each length as soon as its page is fetched, without waiting for the others.

Setup: A Helper Function for the Worker

This is the function that will run in each separate process.

import time
import requests  # third-party library: pip install requests
from multiprocessing import Pool
# This function will be executed in a worker process
def fetch_url_length(url):
    """Fetches a URL and returns its content length."""
    try:
        print(f"Worker: Starting to fetch {url}...")
        response = requests.get(url, timeout=5)
        response.raise_for_status()  # Raise an exception for bad status codes
        length = len(response.text)
        print(f"Worker: Finished fetching {url}. Length: {length}")
        return length
    except requests.RequestException as e:
        print(f"Worker: Error fetching {url}: {e}")
        return -1 # Return an error code

The Callback Function

This function runs in the main process and receives the result from the worker.

# This function will be called in the main process when a worker is done
def print_result(result):
    """Callback function to process the result."""
    print(f"Main Process: Received result -> {result}")
    # You could do anything here: save to a database, write to a file, etc.
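As a sketch of the "write to a file" idea, assuming a hypothetical `measure` worker that returns `(item, len(item))` instead of fetching URLs: the file is opened once in the main process, and the callback writes one line per finished task. The file object is never sent to the workers, so it does not need to be picklable.

```python
import os
import tempfile
from multiprocessing import Pool

def measure(item):
    # Stand-in for real work: return the input together with its length
    return item, len(item)

if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "lengths.txt")
    with open(path, "w") as out, Pool(processes=2) as pool:
        def write_line(result):
            # Runs in the main process, where `out` was opened
            item, length = result
            out.write(f"{item}\t{length}\n")

        tasks = [pool.apply_async(measure, args=(w,), callback=write_line)
                 for w in ["alpha", "beta", "gamma"]]
        for t in tasks:
            t.wait()
    # Lines appear in completion order, so sort them for display
    with open(path) as f:
        print(sorted(f.read().splitlines()))
```

Because the callback is defined inside the main block it can close over `out` directly; only the worker function has to live at module level.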

Putting It All Together

Now, let's use Pool to run the tasks with the callback.

if __name__ == "__main__":
    urls = [
        "https://www.python.org",
        "https://www.github.com",
        "https://www.wikipedia.org",
        "https://www.google.com", # This one might be fast
        "https://this-url-does-not-exist-12345.com" # This one will fail
    ]
    print("Main Process: Starting the pool...")
    # Create a pool of 4 worker processes
    with Pool(processes=4) as pool:
        # Submit tasks to the pool
        async_results = []
        for url in urls:
            # apply_async returns an AsyncResult object immediately
            async_result = pool.apply_async(fetch_url_length, args=(url,), callback=print_result)
            async_results.append(async_result)
        print("Main Process: All tasks submitted. Waiting for workers to complete...")
        # The main process can now do other things if needed.
        # For this example, we just wait for every AsyncResult to be ready.
        # This is required: leaving the `with` block calls pool.terminate(),
        # which kills any task (and skips any callback) still outstanding.
        for result in async_results:
            result.wait()  # Wait for this specific task to finish
    print("Main Process: All work is done.")
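If you do not need the AsyncResult objects themselves, a common alternative is to call pool.close() (no more tasks will be submitted) and then pool.join() (block until the workers exit). join() also waits for the pool's result-handling thread, so all callbacks have run by the time it returns. A sketch with a hypothetical `cube` worker:

```python
from multiprocessing import Pool

def cube(n):
    return n ** 3

collected = []  # filled by the callback in the main process

if __name__ == "__main__":
    with Pool(processes=3) as pool:
        for n in range(4):
            pool.apply_async(cube, args=(n,), callback=collected.append)
        pool.close()  # no further tasks may be submitted
        pool.join()   # block until every worker has finished and exited
    print(sorted(collected))  # [0, 1, 8, 27]
```

A bound method such as `collected.append` works as a callback because callbacks are never pickled; they stay in the main process.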

Expected Output (Order May Vary)

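Notice that each "Main Process: Received result" line appears as soon as its task finishes, interleaved with the workers' status messages and in completion order rather than submission order. That is the key benefit. The fifth URL only starts once one of the four workers becomes free.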

Main Process: Starting the pool...
Main Process: All tasks submitted. Waiting for workers to complete...
Worker: Starting to fetch https://www.python.org...
Worker: Starting to fetch https://www.github.com...
Worker: Starting to fetch https://www.wikipedia.org...
Worker: Starting to fetch https://www.google.com...
Worker: Finished fetching https://www.google.com. Length: 13607
Main Process: Received result -> 13607
Worker: Starting to fetch https://this-url-does-not-exist-12345.com...
Worker: Finished fetching https://www.python.org. Length: 48885
Main Process: Received result -> 48885
Worker: Finished fetching https://www.wikipedia.org. Length: 833421
Main Process: Received result -> 833421
Worker: Finished fetching https://www.github.com. Length: 24301
Main Process: Received result -> 24301
Worker: Error fetching https://this-url-does-not-exist-12345.com: HTTPSConnectionPool(host='this-url-does-not-exist-12345.com', port=443): Max retries exceeded with url: / (Caused by ...)
Main Process: Received result -> -1
Main Process: All work is done.
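The same interleaving can be reproduced without a network. In this sketch a hypothetical `nap` worker sleeps for a given number of seconds; the slow task is submitted first but is normally reported last, because callbacks follow completion order.

```python
import time
from multiprocessing import Pool

def nap(task_id, seconds):
    # Simulate tasks of different durations
    time.sleep(seconds)
    return task_id

finished = []  # task ids in the order their callbacks ran

if __name__ == "__main__":
    delays = {"slow": 1.0, "fast": 0.0, "medium": 0.2}
    with Pool(processes=3) as pool:
        pending = [pool.apply_async(nap, args=(tid, s), callback=finished.append)
                   for tid, s in delays.items()]
        for p in pending:
            p.wait()
    print(finished)
```

The exact order of the two quick tasks depends on scheduling, so only the position of the slow one is predictable in practice.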

Important Concepts and Caveats

Callbacks Run in the Main Process

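The callback function is executed in the process that created the Pool (the main process), but not on your main thread: the pool runs it on an internal result-handling thread. This is important because:

  • The callback can use objects that exist only in the main process (a list, an open file, a logger) without pickling them. Because it runs on a different thread, protect anything your main thread uses at the same time with a lock, and be careful with thread-bound objects: a sqlite3 connection, by default, refuses to be used from any thread other than the one that created it, and many GUI toolkits expect to be updated only from their own main thread.
  • Callbacks for a pool run one at a time on that single thread. A slow callback does not block your main thread, but it delays the delivery of every other result, which is why the Python documentation advises that callbacks complete immediately.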
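A small sketch to confirm where a callback actually runs, using a hypothetical `where_am_i` worker: the callback sees the parent's process id, yet it is not running on the main thread.

```python
import os
import threading
from multiprocessing import Pool

def where_am_i(_):
    # Runs in a worker: report the worker's process id
    return os.getpid()

seen = {}

def record(worker_pid):
    # Runs in the parent process, on the pool's result-handling thread
    seen["worker_pid"] = worker_pid
    seen["callback_pid"] = os.getpid()
    seen["on_main_thread"] = threading.current_thread() is threading.main_thread()

if __name__ == "__main__":
    with Pool(processes=1) as pool:
        pool.apply_async(where_am_i, args=(None,), callback=record).wait()
    print(seen["callback_pid"] == os.getpid())  # True: same process as the pool's creator
    print(seen["worker_pid"] != os.getpid())    # True: the task itself ran elsewhere
    print(seen["on_main_thread"])               # False: a helper thread, not the main thread
```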

The Callback Argument

The callback function must accept exactly one argument: the result of the worker function.

  • If the worker function returns a single value (e.g., length), the callback gets that value.
  • If the worker function returns a tuple (a, b), the callback gets the tuple (a, b).
  • If the worker function returns a dictionary {'key': 'value'}, the callback gets that dictionary.
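from multiprocessing import Pool
def worker_func(x):
    return x * 2, x + 10
def my_callback(result_tuple):
    # result_tuple will be (2, 11) if x was 1
    print(f"Callback received: {result_tuple}")
if __name__ == "__main__":
    with Pool() as pool:
        # Wait for the task before the with block terminates the pool
        pool.apply_async(worker_func, args=(1,), callback=my_callback).wait()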
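Because the callback receives only the return value, it cannot tell on its own which input produced it. Besides returning the input alongside the output (as the tuple example above does), one option is to bind extra context with functools.partial. This sketch uses hypothetical `square` and `store` functions; the partial object is only ever called in the main process, so it is never pickled.

```python
from functools import partial
from multiprocessing import Pool

def square(x):
    return x * x

labelled = {}  # maps a label to the result computed for it

def store(label, result):
    # `label` is bound by partial(); `result` is supplied by the pool
    labelled[label] = result

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        jobs = [pool.apply_async(square, args=(x,), callback=partial(store, f"x={x}"))
                for x in (3, 4)]
        for j in jobs:
            j.wait()
    print(labelled)
```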

Handling Exceptions in the Worker

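By default, if a worker function raises an unhandled exception, calling result.get() on its AsyncResult re-raises that same exception (for example, ZeroDivisionError) in the main process. The callback will NOT be called in this case. Note that result.wait() does not raise, so if you never call get() and supply no error_callback, the failure passes silently.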
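A sketch of that behavior with a hypothetical `divide` worker: wait() returns quietly, successful() reports the failure, get() re-raises the original exception type, and the success callback is never invoked.

```python
from multiprocessing import Pool

def divide(x):
    return 10 / x

callback_calls = []  # stays empty because the task fails

if __name__ == "__main__":
    with Pool(processes=1) as pool:
        res = pool.apply_async(divide, args=(0,), callback=callback_calls.append)
        res.wait()                # never raises; it only blocks
        print(res.successful())   # False
        try:
            res.get()             # re-raises the worker's exception here
        except ZeroDivisionError as exc:
            caught = exc
    print(type(caught).__name__)  # ZeroDivisionError
    print(callback_calls)         # [] (the success callback was skipped)
```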

To handle exceptions, you have two main options:

Option A: Handle the exception inside the worker function. This is the simplest approach. The worker can return a special value (like None, -1, or an error message) that the callback can then check.

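from multiprocessing import Pool
def safe_worker(x):
    try:
        return 10 / x
    except ZeroDivisionError:
        # Report the failure as an ordinary return value
        return "Error: Division by zero"
def callback(result):
    print(f"Callback got: {result}")
if __name__ == "__main__":
    with Pool() as pool:
        # The worker handles its own exception, so the callback is called
        # with the string "Error: Division by zero"
        pool.apply_async(safe_worker, args=(0,), callback=callback).wait()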
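A variation on Option A, assuming a hypothetical `(ok, payload)` convention: returning a status flag alongside the value lets the callback branch cleanly instead of inspecting magic values such as -1 or error strings. The names `checked_divide` and `route` are illustrative only.

```python
from multiprocessing import Pool

def checked_divide(x):
    try:
        return True, 10 / x
    except ZeroDivisionError as exc:
        # Report the error as a plain string in the payload slot
        return False, f"{type(exc).__name__}: {exc}"

successes, failures = [], []

def route(result):
    # Unpack the hypothetical (ok, payload) pair and file it accordingly
    ok, payload = result
    (successes if ok else failures).append(payload)

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        for x in (2, 0, 5):
            pool.apply_async(checked_divide, args=(x,), callback=route)
        pool.close()
        pool.join()
    print(sorted(successes))  # [2.0, 5.0]
    print(failures)           # ['ZeroDivisionError: division by zero']
```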

Option B: Use error_callback. apply_async also accepts an error_callback argument: a function that is called, instead of callback, if the worker raises an exception. It receives the exception instance as its only argument.

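from multiprocessing import Pool
def handle_error(exc):
    print(f"An error occurred in a worker: {type(exc).__name__}: {exc}")
def faulty_worker(x):
    # This will raise a ZeroDivisionError when x == 0
    return 10 / x
if __name__ == "__main__":
    with Pool() as pool:
        # The main callback will not be called.
        # The error_callback will be called instead.
        result = pool.apply_async(
            faulty_worker,
            args=(0,),
            # Callbacks stay in the main process, so a lambda is fine here
            callback=lambda r: print(f"Success: {r}"),  # This won't run
            error_callback=handle_error,
        )
        result.wait()  # Wait before the with block terminates the pool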
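The exception handed to error_callback has been rebuilt in the main process. In CPython, the worker-side traceback text is attached as its `__cause__`, an instance of multiprocessing.pool.RemoteTraceback, which is handy for logging. A sketch that collects errors instead of printing them immediately; `errors` is a hypothetical name, and relying on `__cause__` is a CPython implementation detail rather than documented API.

```python
from multiprocessing import Pool
from multiprocessing.pool import RemoteTraceback

def faulty_worker(x):
    return 10 / x

errors = []  # exceptions delivered to the error callback

def handle_error(exc):
    # exc keeps its original type (ZeroDivisionError here)
    errors.append(exc)

if __name__ == "__main__":
    with Pool(processes=1) as pool:
        pool.apply_async(faulty_worker, args=(0,), error_callback=handle_error).wait()
    exc = errors[0]
    print(type(exc).__name__)                          # ZeroDivisionError
    print(isinstance(exc.__cause__, RemoteTraceback))  # True
    print(exc.__cause__)  # the traceback as formatted inside the worker
```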

When to Use map vs. apply_async with a Callback

  • Scenario: You need to process a list of inputs and get a list of results in the same order.
    Best choice: pool.map(func, iterable)
    Why: Simple, blocks until everything is done, and guarantees input order.
  • Scenario: You have a large number of tasks and want to start processing results as soon as each is ready, regardless of order.
    Best choice: pool.apply_async with callback
    Why: Each result is handled the moment its task finishes, so fast tasks are never held up by slow ones.