杰瑞科技汇

How Can Python Multiprocessing Run Parallel Tasks Efficiently?

This article looks at Python's multiprocessing module: why it exists, how its main building blocks work, and when to use it instead of threading.

The Problem: The Global Interpreter Lock (GIL)

In CPython, the reference implementation of Python, the Global Interpreter Lock (GIL) is a mutex that protects access to Python objects, preventing multiple native threads from executing Python bytecode at the same time within a single process.

This means that even on a multi-core processor, the standard threading module cannot achieve true parallelism for CPU-bound tasks. It works well for I/O-bound tasks (like waiting for a network request or a file read), because a thread releases the GIL while it waits, but heavy pure-Python computation in threads is effectively limited to one core at a time.

The Solution: The multiprocessing Module

The multiprocessing module sidesteps the GIL by creating separate processes, each with its own Python interpreter and memory space. This allows your program to fully utilize multiple CPU cores for parallel processing.
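
The difference is easiest to see by timing the same CPU-bound function under both modules. The following is a minimal sketch, assuming a standard (GIL-enabled) CPython build; count_down, run_all, and the job sizes are illustrative names and values, not part of any library.

```python
import multiprocessing
import threading
import time


def count_down(n):
    """Pure-Python busy loop: CPU-bound, so it holds the GIL while it runs."""
    while n > 0:
        n -= 1
    return n


def run_all(worker_cls, jobs, n):
    """Start `jobs` workers of the given class and return the elapsed seconds."""
    workers = [worker_cls(target=count_down, args=(n,)) for _ in range(jobs)]
    start = time.perf_counter()
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    jobs, n = 4, 5_000_000  # illustrative sizes; tune them for your machine
    print(f"threads:   {run_all(threading.Thread, jobs, n):.2f}s")
    print(f"processes: {run_all(multiprocessing.Process, jobs, n):.2f}s")
```

On a multi-core machine the process version should usually finish noticeably faster, although the exact numbers depend on the hardware, the Python version, and the cost of starting processes.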

Key Concepts in multiprocessing

  1. Process: The core of the module. It represents an activity that runs in a separate process.
  2. Pool: A high-level interface for distributing a function across multiple input values, dividing the work among a pool of worker processes. This is often the easiest and most efficient way to parallelize tasks.
  3. Inter-Process Communication (IPC): Since processes don't share memory by default, they need explicit ways to communicate. The main options are:
    • Queue: A first-in, first-out (FIFO) structure that is safe for use by multiple processes and lets them exchange objects. It's the most common way to collect results from worker processes.
    • Pipe: A connection between two endpoints, two-way (duplex) by default.
    • Manager: Starts a server process that holds shared objects (lists, dicts, etc.) and lets other processes work with them through proxies. This is flexible but can be slower.
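
Queue gets its own example later in this article, so here is a minimal sketch of the other two IPC options, Pipe and Manager. The function names square_over_pipe and record_square are made up for illustration.

```python
import multiprocessing


def square_over_pipe(conn, n):
    """Send n*n to the other end of the pipe, then close this end."""
    conn.send(n * n)
    conn.close()


def record_square(shared, n):
    """Store n*n under key n in a dict-like object shared with the parent."""
    shared[n] = n * n


if __name__ == "__main__":
    # Pipe: returns two connected endpoints; duplex (two-way) by default.
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=square_over_pipe, args=(child_conn, 7))
    p.start()
    print("from pipe:", parent_conn.recv())
    p.join()

    # Manager: a server process owns the dict; workers reach it via proxies.
    with multiprocessing.Manager() as manager:
        shared = manager.dict()
        workers = [
            multiprocessing.Process(target=record_square, args=(shared, n))
            for n in range(4)
        ]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        print("from manager:", dict(shared))
```

A Pipe suits a fixed pair of processes, while a Manager is convenient when several workers need to update one structure and the extra round trip to the server process is acceptable.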

The Process Class (The Low-Level Approach)

This is the most direct way to create parallelism. You define a target function and create a Process object to run it.

Example: Running Tasks in Parallel

Let's create a simple CPU-bound task (calculating a factorial) and run it in parallel.

import multiprocessing
import time
import os
# A CPU-bound task
def calculate_factorial(n):
    """Calculates the factorial of n."""
    print(f"Process {os.getpid()}: Calculating factorial for {n}")
    result = 1
    for i in range(1, n + 1):
        result *= i
    # Stand in for a longer computation so the overlap is visible
    time.sleep(1)
    return result  # Note: Process discards the target's return value
if __name__ == "__main__":
    # List of numbers to calculate factorial for
    numbers = [5, 10, 15, 20]
    print(f"Main process ID: {os.getpid()}")
    # Create a list to hold the process objects
    processes = []
    # Create and start a process for each number
    for number in numbers:
        p = multiprocessing.Process(target=calculate_factorial, args=(number,))
        processes.append(p)
        p.start()
        print(f"Started process {p.pid}")
    # Wait for all processes to complete
    for p in processes:
        p.join()
    print("All processes have completed.")

How to Run It:

python your_script_name.py

Output Explanation: The "Calculating factorial" messages for the different numbers appear almost simultaneously, each with a different process ID. This shows that the tasks run concurrently in separate processes, which the operating system can schedule on different CPU cores. The script finishes in roughly one second rather than four, because the one-second sleeps overlap. The join() calls make the main script wait for all of the tasks to finish before printing "All processes have completed."
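
Because a Process discards its target's return value, the parent needs another way to learn whether a task succeeded. One option is the exitcode attribute, sketched below; risky_sqrt and the process names are hypothetical.

```python
import multiprocessing


def risky_sqrt(n):
    """Return the square root of n, or raise ValueError for negative input."""
    if n < 0:
        raise ValueError(f"cannot take the square root of {n}")
    return n ** 0.5  # discarded when run as a Process target


if __name__ == "__main__":
    inputs = {"good-input": 9, "bad-input": -1}
    processes = [
        multiprocessing.Process(target=risky_sqrt, args=(n,), name=name)
        for name, n in inputs.items()
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
        # exitcode is None while the process runs, 0 after a clean exit,
        # and 1 when the target raised an uncaught exception.
        print(f"{p.name}: exitcode={p.exitcode}")
```

The child with the negative input also prints its traceback to stderr. To get the computed value itself back to the parent, use a Pool or one of the IPC mechanisms described in this article.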

The Pool Class (The High-Level, Recommended Approach)

Using Process directly can be cumbersome. The Pool class abstracts away the management of a group of worker processes, making it much easier to parallelize a function across a list of inputs.

Example: Using map for Parallel Execution

The pool.map() function is similar to the built-in map(), but it distributes the work across the pool's processes.

import multiprocessing
import time
# The same CPU-bound task
def calculate_factorial(n):
    """Calculates the factorial of n."""
    print(f"Worker process calculating factorial for {n}")
    result = 1
    for i in range(1, n + 1):
        result *= i
    time.sleep(1)
    return result
if __name__ == "__main__":
    numbers = [5, 10, 15, 20]
    # Create a pool of worker processes.
    # By default, it uses the number of available CPU cores.
    with multiprocessing.Pool() as pool:
        print("Starting parallel calculations...")
        # pool.map blocks until all results are ready
        results = pool.map(calculate_factorial, numbers)
    print("\n--- Results ---")
    for number, result in zip(numbers, results):
        print(f"Factorial of {number} is {result}")
    print("All calculations done.")

How it Works:

  1. with multiprocessing.Pool() as pool: creates a pool of worker processes.
  2. pool.map(calculate_factorial, numbers) takes the function calculate_factorial and the list numbers.
  3. It automatically divides the numbers list and distributes the chunks to the available worker processes.
  4. The main process waits (blocks) until all worker processes have finished and returned their results.
  5. The results are collected in the same order as the input numbers.
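
The chunking in step 3 can be tuned with map()'s chunksize argument, and when the order of results doesn't matter, imap_unordered() hands each result back as soon as it is ready. The sketch below assumes a cheap function and 1,000 inputs; the pool size and chunk size are illustrative values, not recommendations.

```python
import multiprocessing


def square(n):
    """A deliberately cheap task, where per-item dispatch overhead dominates."""
    return n * n


if __name__ == "__main__":
    numbers = list(range(1_000))
    with multiprocessing.Pool(processes=4) as pool:
        # chunksize=50 sends the inputs to workers in batches of 50,
        # which reduces communication overhead for small tasks.
        ordered = pool.map(square, numbers, chunksize=50)

        # imap_unordered yields results as workers finish them, so the
        # order is arbitrary; that is fine for an order-independent sum.
        total = sum(pool.imap_unordered(square, numbers, chunksize=50))

    print(ordered[:5])
    print(total)
```

Larger chunks mean fewer round trips between processes but coarser load balancing, so the best value depends on how long each item takes.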

Example: Using apply_async for Non-Blocking Results

If you don't want the main process to block, you can use apply_async(). This returns an AsyncResult object, which you can use to check for completion and retrieve the result later.

import multiprocessing
import time
def calculate_factorial(n):
    """Calculates the factorial of n."""
    print(f"Worker process calculating factorial for {n}")
    result = 1
    for i in range(1, n + 1):
        result *= i
    time.sleep(1)
    return result
if __name__ == "__main__":
    numbers = [5, 10, 15, 20]
    with multiprocessing.Pool() as pool:
        print("Starting non-blocking parallel calculations...")
        # Submit all tasks to the pool
        # apply_async returns an AsyncResult object immediately
        async_results = [pool.apply_async(calculate_factorial, (num,)) for num in numbers]
        print("All tasks submitted. Main process can do other work here...")
        # In a real app, you might do other things here
        # Now, collect the results
        # Calling .get() on an AsyncResult object will block until that specific task is done
        results = []
        for async_result in async_results:
            try:
                result = async_result.get()  # Blocks until this task finishes
            except Exception as e:
                print(f"An error occurred: {e}")
                result = None  # Keep results aligned with numbers
            results.append(result)
    print("\n--- Results ---")
    for number, result in zip(numbers, results):
        print(f"Factorial of {number} is {result}")
    print("All calculations done.")
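
Instead of calling get() on every AsyncResult, apply_async() also accepts callback and error_callback arguments, which the pool invokes in the main process as each task finishes. A minimal sketch, with reciprocal as a made-up task that fails for one input:

```python
import multiprocessing


def reciprocal(n):
    """Return 1/n; raises ZeroDivisionError when n is 0."""
    return 1 / n


if __name__ == "__main__":
    results, errors = [], []
    with multiprocessing.Pool(processes=2) as pool:
        for n in (1, 2, 0, 4):
            pool.apply_async(
                reciprocal,
                (n,),
                callback=results.append,       # called with the return value
                error_callback=errors.append,  # called with the exception
            )
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for the workers to finish

    print(sorted(results))
    print([type(e).__name__ for e in errors])
```

Callbacks should return quickly, because they run on the thread that handles incoming results and a slow callback delays every result behind it.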

Inter-Process Communication (IPC) with Queue

What if the worker processes need to send data back to the main process as they work? A multiprocessing.Queue is perfect for this.

Example: Collecting Results with a Queue

import multiprocessing
import time
import os
# The worker function puts its result into the queue
def worker(queue, number):
    """A worker that calculates and puts the result in a queue."""
    pid = os.getpid()
    print(f"Worker {pid}: Starting for number {number}")
    time.sleep(2) # Simulate a long task
    result = number * number
    print(f"Worker {pid}: Finished, putting result {result} in queue")
    queue.put(result)
if __name__ == "__main__":
    numbers = [1, 2, 3, 4]
    result_queue = multiprocessing.Queue()
    processes = []
    # Create and start a process for each number
    for number in numbers:
        p = multiprocessing.Process(target=worker, args=(result_queue, number))
        processes.append(p)
        p.start()
    # Collect one result per worker BEFORE joining. A child that has put
    # data on a queue may not exit until that data has been consumed, so
    # joining first can deadlock when the results are large.
    print("Collecting results from the queue...")
    for _ in numbers:
        print(f"Got result from queue: {result_queue.get()}")
    # Now wait for all processes to finish
    for p in processes:
        p.join()
    print("\nAll worker processes have finished.")

Key Points:

  • The Queue is created in the main process and passed as an argument to the worker processes.
  • Workers use queue.put() to add items.
  • The main process uses queue.get() to retrieve items. get() is a blocking call.
  • The main process calls get() once per worker, because it knows how many results to expect, and only then joins the workers. Looping on queue.empty() is unreliable across processes, and joining before the queue is drained can deadlock. When the number of results isn't known in advance, have each worker put a special "sentinel" value into the queue to signal completion.
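
The sentinel approach mentioned in the last point can be sketched as follows. It assumes None is never a valid result; SENTINEL, produce_squares, and drain are illustrative names.

```python
import multiprocessing

SENTINEL = None  # assumption: workers never produce None as a real result


def produce_squares(queue, start, stop):
    """Put the square of each number in [start, stop), then one sentinel."""
    for n in range(start, stop):
        queue.put(n * n)
    queue.put(SENTINEL)


def drain(queue, producers):
    """Read from the queue until every producer has sent its sentinel."""
    results = []
    finished = 0
    while finished < producers:
        item = queue.get()  # blocks until an item is available
        if item is SENTINEL:
            finished += 1
        else:
            results.append(item)
    return results


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    ranges = [(0, 3), (3, 6)]
    workers = [
        multiprocessing.Process(target=produce_squares, args=(queue, lo, hi))
        for lo, hi in ranges
    ]
    for w in workers:
        w.start()
    results = drain(queue, len(workers))  # drain the queue first
    for w in workers:
        w.join()                          # then join the workers
    print(sorted(results))
```

Each worker can send any number of results, and the parent stops reading only once it has seen one sentinel per worker, so it never has to poll empty().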

Important: The if __name__ == "__main__": Guard

You must protect the entry point of your script with this if statement whenever child processes are created with the spawn start method, which is the default on Windows and macOS. Since the same script may run on any platform, it is safest to always include the guard.

Why? With spawn, each child process starts a fresh Python interpreter and imports your main module, so every top-level statement in the script runs again in the child. Without the guard, the child would reach the code that starts processes and try to create children of its own. Modern Python versions detect this and raise a RuntimeError rather than spawning processes endlessly, but the program fails either way.

The if __name__ == "__main__": block ensures that the code inside it only runs when the script is executed directly, not when it's imported by another script or by a child process.
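
The effect of the guard can be observed with a small script that does some work at module level and starts a process inside the guard. This is only a sketch: make_greeting and greet are hypothetical helpers, and what gets printed depends on the start method your platform uses.

```python
import multiprocessing

# Module-level code: runs in the parent, and runs again in each child
# whenever the start method re-imports this file (as "spawn" does).
print(f"importing as {__name__!r}")


def make_greeting(name):
    """Build the message that the child process prints."""
    return f"hello from {name}"


def greet(name):
    print(make_greeting(name))


if __name__ == "__main__":
    # Only the directly executed script reaches this block.
    print("default start method:", multiprocessing.get_start_method())
    ctx = multiprocessing.get_context()  # pass "spawn" here to force it
    p = ctx.Process(target=greet, args=("child",))
    p.start()
    p.join()
```

Under spawn, the "importing as" line appears a second time with a module name other than '__main__', which is why the guarded block does not run again in the child. Under fork the child copies the parent's memory instead of re-importing the file, so the line appears only once.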


Summary: multiprocessing vs. threading

| Feature | multiprocessing | threading |
| --- | --- | --- |
| Parallelism | True parallelism on multi-core CPUs. | Concurrency, not parallelism (due to GIL). |
| Memory | Separate memory space. Requires IPC (Queue, Pipe, Manager) to share data. | Shared memory space. Data is easily accessible. |
| GIL | Bypasses the GIL. | Subject to the GIL. |
| Best For | CPU-bound tasks (heavy computations, data processing). | I/O-bound tasks (network calls, file operations, database queries). |
| Overhead | Higher overhead for creating and managing processes. | Lower overhead for creating and managing threads. |