
How do you implement parallel computing in Python?

Parallelizing work in Python is a powerful way to speed up both CPU-bound and I/O-bound tasks. The key is to choose the right tool for the job.


Here’s a comprehensive guide, starting with the "why" and "when," moving to the main tools, and providing clear code examples.


The Core Concept: GIL (Global Interpreter Lock)

Before you start, you need to understand the Global Interpreter Lock (GIL).

  • What it is: The GIL is a mutex (a lock) that protects access to Python objects, preventing multiple native threads from executing Python bytecode at the same time within a single process.
  • The Consequence: This means that even if you have a multi-core processor, a single Python process can only use one core at a time for CPU-bound tasks. This seems like a major limitation for parallelism.
  • The Nuance: The GIL is released during I/O operations (like reading a file or making a network request). This is why threading is still very effective for I/O-bound tasks.

The Golden Rule of Python Parallelism (demonstrated in the timing sketch after this list):

  • For CPU-bound tasks (heavy math, data processing): Use the multiprocessing module. It gets around the GIL by using separate processes, each with its own memory and Python interpreter.
  • For I/O-bound tasks (network calls, disk reads): Use the threading module. While one thread is waiting for I/O, the GIL is released, allowing another thread to run.
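
To see the golden rule in action, here is a minimal timing sketch (an illustrative addition, not a formal benchmark): the same pure-Python countdown takes roughly as long on two threads as it does sequentially, because the GIL lets only one thread execute bytecode at a time.

import threading
import time
def countdown(n):
    """A purely CPU-bound loop; it never does I/O, so the GIL is never released."""
    while n > 0:
        n -= 1
N = 10_000_000
# Sequential: two countdowns, one after the other
start = time.time()
countdown(N)
countdown(N)
print(f"Sequential: {time.time() - start:.2f}s")
# Threaded: two countdowns on two threads -- expect similar (or worse) timing
start = time.time()
t1 = threading.Thread(target=countdown, args=(N,))
t2 = threading.Thread(target=countdown, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"Two threads: {time.time() - start:.2f}s")

Run the same countdown with multiprocessing instead and the wall-clock time roughly halves on a multi-core machine.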

The Main Tools for Parallelism

Here are the most common and effective tools, from the standard library to powerful third-party libraries.

| Tool | Best For | How It Works | Pros | Cons |
|---|---|---|---|---|
| multiprocessing | CPU-bound tasks | Spawns separate Python processes, each with its own GIL and memory | Bypasses the GIL; true parallelism | Higher memory usage; inter-process communication (IPC) can be slow |
| threading | I/O-bound tasks | Spawns multiple threads within a single process; they share memory | Low memory overhead; good for I/O | Blocked by the GIL for CPU work; not true parallelism |
| concurrent.futures | High-level interface | A modern, unified API over both ThreadPoolExecutor and ProcessPoolExecutor | Easy to use; clean syntax; integrates well with modern Python (async/await) | Adds a thin layer of abstraction |
| joblib | Data science / ML | Optimized for embarrassingly parallel tasks, especially with NumPy arrays | Extremely fast for array-based data; simple API (Parallel, delayed) | Third-party library (pip install joblib) |
| asyncio | High-concurrency I/O | Uses a single thread and an event loop to manage many tasks | Very lightweight; can handle thousands of concurrent connections | Steeper learning curve; not for CPU-bound tasks |

multiprocessing (For CPU-Bound Tasks)

This is the go-to for heavy computations. We'll use the high-level Pool interface, which manages a pool of worker processes for you.

Scenario: Calculate the square of numbers from 0 to 9. This is a simple CPU-bound task.

import multiprocessing
import time
def square(n):
    """A simple CPU-bound function."""
    # Simulate work; time.sleep only stands in for real computation here
    # (an actual CPU-bound task would be a tight numeric loop)
    time.sleep(0.1)
    result = n * n
    print(f"Process {multiprocessing.current_process().name}: Square of {n} is {result}")
    return result
if __name__ == "__main__":
    numbers = range(10)
    # --- Sequential Execution ---
    start_time = time.time()
    results_seq = [square(n) for n in numbers]
    end_time = time.time()
    print(f"Sequential execution took: {end_time - start_time:.4f} seconds\n")
    # --- Parallel Execution with Pool ---
    # The 'if __name__ == "__main__"' guard is crucial on Windows/macOS
    # to prevent infinite spawning of processes.
    start_time = time.time()
    # Create a pool of worker processes.
    # 'processes' can be set to the number of CPU cores (multiprocessing.cpu_count())
    with multiprocessing.Pool(processes=4) as pool:
        # The 'map' method applies the function to every item in the list
        results_par = pool.map(square, numbers)
    end_time = time.time()
    print(f"Parallel execution took: {end_time - start_time:.4f} seconds")
    print(f"Results are the same: {results_seq == results_par}")

Why if __name__ == "__main__":? On operating systems that use the "spawn" start method (Windows, and macOS since Python 3.8), starting a new process re-imports the script's main module. Without this guard, each new process would try to create another pool of processes, leading to runaway process creation.
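
Worth knowing: if your worker function takes more than one argument, Pool.starmap unpacks argument tuples for you. A minimal sketch (the power function here is a hypothetical example, not from the article):

import multiprocessing
def power(base, exponent):
    """A hypothetical two-argument worker, for illustration only."""
    return base ** exponent
if __name__ == "__main__":
    pairs = [(2, 3), (3, 2), (4, 2)]
    with multiprocessing.Pool(processes=2) as pool:
        # starmap unpacks each tuple into positional arguments
        print(pool.starmap(power, pairs))  # [8, 9, 16]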


threading (For I/O-Bound Tasks)

This is perfect for tasks that spend most of their time waiting, like fetching data from a website.


Scenario: Fetch the title of several web pages.

import threading
import time
import requests
def fetch_title(url):
    """An I/O-bound function."""
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()  # Raise an exception for bad status codes
        title = response.text.split("<title>")[1].split("</title>")[0]
        print(f"Thread {threading.current_thread().name}: Fetched title from {url}")
        return title
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None
if __name__ == "__main__":
    urls = [
        "http://example.com",
        "http://python.org",
        "http://github.com",
        "http://stackoverflow.com",
        "http://google.com"
    ]
    # --- Sequential Execution ---
    start_time = time.time()
    results_seq = [fetch_title(url) for url in urls]
    end_time = time.time()
    print(f"Sequential execution took: {end_time - start_time:.4f} seconds\n")
    # --- Parallel Execution with Threads ---
    start_time = time.time()
    threads = []
    results_par = [None] * len(urls)  # Pre-allocate; a Thread's target return value is discarded
    def worker(i, url):
        # Store each result by index so the main thread can read it after join()
        results_par[i] = fetch_title(url)
    for i, url in enumerate(urls):
        # Create and start one thread per URL
        thread = threading.Thread(target=worker, args=(i, url))
        threads.append(thread)
        thread.start()
    # Wait for all threads to complete
    for thread in threads:
        thread.join()
    end_time = time.time()
    print(f"Threaded execution took: {end_time - start_time:.4f} seconds")

Note: A more modern way to manage threads is with concurrent.futures.ThreadPoolExecutor, which has a cleaner API.


concurrent.futures (The High-Level API)

This is the recommended starting point for most use cases. It provides a clean, consistent interface for both threading and multiprocessing.

A) ThreadPoolExecutor (for I/O-bound)

import concurrent.futures
import requests
def fetch_title(url):
    # ... (same function as above) ...
    try:
        response = requests.get(url, timeout=5)
        return response.text.split("<title>")[1].split("</title>")[0]
    except Exception:
        return None
urls = ["http://example.com", "http://python.org", "http://github.com"]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # map() returns results in the same order as the inputs
    results = list(executor.map(fetch_title, urls))
print(results)
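
One caveat: executor.map yields results in input order, so one slow URL delays everything queued behind it. To handle each result as soon as it finishes, submit tasks individually and iterate with as_completed. A minimal sketch, reusing the fetch_title function defined above:

import concurrent.futures
urls = ["http://example.com", "http://python.org", "http://github.com"]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # submit() returns a Future immediately; as_completed() yields
    # futures in completion order, not input order
    future_to_url = {executor.submit(fetch_title, url): url for url in urls}
    for future in concurrent.futures.as_completed(future_to_url):
        print(f"{future_to_url[future]}: {future.result()}")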

B) ProcessPoolExecutor (for CPU-bound)

import concurrent.futures
import time
def square(n):
    # ... (same function as above) ...
    time.sleep(0.1)
    return n * n
numbers = range(10)
# ProcessPoolExecutor also spawns new processes, so the
# 'if __name__ == "__main__"' guard is needed here too
if __name__ == "__main__":
    # The 'with' statement handles process creation and cleanup
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        # map() applies the function to every item
        results = list(executor.map(square, numbers))
    print(results)
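
Tip: for large inputs, executor.map accepts a chunksize argument (e.g. executor.map(square, numbers, chunksize=100)), which sends work to the processes in batches and can noticeably reduce inter-process communication overhead.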

joblib (The Data Scientist's Friend)

If you work with NumPy arrays or Pandas DataFrames, joblib is incredibly fast and easy to use.

Scenario: Apply a function to chunks of a large array.

from joblib import Parallel, delayed
import numpy as np
# A function that processes a chunk of an array
def process_chunk(chunk):
    # Simulate some computation
    return np.sum(chunk ** 2)
if __name__ == "__main__":
    # Create a large array
    large_array = np.random.rand(1_000_000)
    # Define the number of chunks (and thus, jobs)
    n_chunks = 4
    # Split the array into chunks
    chunks = np.array_split(large_array, n_chunks)
    # Run in parallel
    # n_jobs=-1 uses all available CPU cores
    results = Parallel(n_jobs=-1)(delayed(process_chunk)(chunk) for chunk in chunks)
    # The final result is the sum of the results from each chunk
    final_sum = sum(results)
    print(f"Sum of squares computed in parallel: {final_sum}")
    # For verification, compare with sequential sum
    print(f"Sum of squares (sequential): {np.sum(large_array**2)}")

Summary and How to Choose

  1. Is your task waiting for I/O (network, disk)?
    • Use threading or concurrent.futures.ThreadPoolExecutor. The latter is generally preferred for its cleaner API.
  2. Is your task doing heavy computation (CPU-bound)?
    • Use multiprocessing or concurrent.futures.ProcessPoolExecutor. The latter is the modern, high-level way to do it.
  3. Are you a data scientist working with NumPy/Pandas?
    • Strongly consider joblib. It's optimized for this exact use case and is often the fastest and simplest solution.
  4. Do you need to manage thousands of concurrent network connections with high performance?
    • Learn asyncio. It's a different paradigm (event-loop based) but the right tool for this specific job; a minimal sketch follows this list.
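
Since asyncio appears in the comparison table but has no example above, here is a minimal sketch; asyncio.sleep stands in for a real network call:

import asyncio
async def fetch(name, delay):
    # 'await' yields control to the event loop during the simulated I/O wait
    await asyncio.sleep(delay)
    return f"{name} done after {delay}s"
async def main():
    # gather() runs all three coroutines concurrently on a single thread
    results = await asyncio.gather(
        fetch("task-1", 1.0),
        fetch("task-2", 1.0),
        fetch("task-3", 1.0),
    )
    print(results)
asyncio.run(main())  # finishes in about 1 second total, not 3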

For most general-purpose parallel programming in Python, start with concurrent.futures. It provides the simplicity of threading/multiprocessing with a powerful and consistent interface.
