杰瑞科技汇

Python多核multiprocess如何高效并行执行任务?

为什么需要 multiprocessing?—— GIL 的限制

在深入 multiprocessing 之前,必须先理解 Python 的 全局解释器锁

Python多核multiprocess如何高效并行执行任务?-图1
(图片来源网络,侵删)
  • GIL 是什么? GIL 是一个互斥锁,它保证在任何时刻,一个 Python 进程中只有一个线程在执行 Python 字节码,这意味着即使在多核处理器上,Python 的多线程也无法实现真正的并行计算,而是通过快速切换线程来实现并发。
  • GIL 的影响? 对于 I/O 密集型 任务(如网络请求、文件读写),线程在等待 I/O 时会释放 GIL,让其他线程运行,因此多线程仍然有效,但对于 CPU 密集型 任务(如数值计算、图像处理),由于 GIL 的存在,多线程并不能利用多核 CPU 的优势,反而因为线程切换的开销而可能比单线程更慢。

multiprocessing 的解决方案: multiprocessing 模块通过创建多个独立的 进程 来绕过 GIL,每个进程都有自己独立的 Python 解释器和内存空间,因此不受 GIL 的限制,可以真正地在多个 CPU 核心上并行运行。


multiprocessing 核心概念与组件

multiprocessing 模块提供了多种方式来创建和管理进程,类似于 threading 模块的 API,但操作的是进程而不是线程。

Process

这是最基础、最核心的组件,用于创建一个新进程。

基本用法:

Python多核multiprocess如何高效并行执行任务?-图2
(图片来源网络,侵删)
import multiprocessing
import time
import os
def worker(num):
    """一个简单的 worker 函数"""
    process_id = os.getpid()
    print(f'Worker {num} is running in Process {process_id}')
    time.sleep(2)  # 模拟耗时任务
    print(f'Worker {num} finished')
if __name__ == '__main__':
    print(f'Main process ID: {os.getpid()}')
    # 创建一个进程对象
    p = multiprocessing.Process(target=worker, args=(1,))
    # 启动进程
    p.start()
    # 主进程继续执行
    print('Main process is doing other work...')
    # 等待子进程结束 (非常重要!)
    p.join()
    print('Main process has finished.')

代码解释:

  1. if __name__ == '__main__'::这是使用 multiprocessing 的标准写法,在 Windows 等平台上,子进程会重新导入父进程的脚本,如果没有这个判断,子进程会再次执行 if 块内的代码,导致无限递归创建进程。
  2. multiprocessing.Process(target=worker, args=(1,))
    • target: 指定子进程要执行的函数。
    • args: 一个元组,传递给 target 函数的位置参数。
  3. p.start():启动进程,这会调用 worker 函数。
  4. p.join():主进程会在此处阻塞,直到子进程 p 执行完毕。如果你不调用 join(),主进程可能会在子进程完成前就结束,导致子进程被意外终止。

Pool

当你需要创建和管理大量相似的工作进程时,手动为每个任务创建一个 Process 对象会变得很繁琐。Pool 类提供了一个进程池,可以方便地分发任务。

Pool 主要有两种使用模式:

a) map() 方法

map() 是一个阻塞式方法,它将一个任务列表分配给进程池中的工作进程,并收集所有结果,最后返回一个结果列表,它的行为类似于内置的 map() 函数。

import multiprocessing
import time
def square(n):
    """计算一个数的平方"""
    time.sleep(1) # 模拟计算耗时
    return n * n
if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5, 6, 7, 8]
    # 创建一个包含4个工作进程的进程池
    with multiprocessing.Pool(processes=4) as pool:
        print("Using pool.map()...")
        # pool.map 会阻塞,直到所有任务完成
        results = pool.map(square, numbers)
    print(f"Results from map: {results}")

b) apply_async() 方法

apply_async() 是一个非阻塞式方法,它提交一个任务到进程池并立即返回一个 AsyncResult 对象,你可以稍后通过这个对象获取结果,这种方式更灵活,适合需要立即处理其他任务或处理耗时不同任务的场景。

import multiprocessing
import time
def square(n):
    """计算一个数的平方"""
    time.sleep(1) # 让不同任务有不同的耗时
    print(f"Calculating square of {n}")
    return n * n
if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5]
    with multiprocessing.Pool(processes=2) as pool:
        print("Using pool.apply_async()...")
        # 提交所有任务,但不会阻塞
        results = [pool.apply_async(square, (num,)) for num in numbers]
        # 主进程可以在这里做其他事情...
        print("All tasks submitted. Main process is free.")
        # 稍后,从 AsyncResult 对象中获取结果
        # get() 方法会阻塞,直到对应的结果准备好
        final_results = [r.get() for r in results]
    print(f"Final results: {final_results}")

Pool 的优势:

  • 自动管理进程: 你不需要手动创建和销毁进程。
  • 任务队列: 内部实现了任务队列,自动将任务分配给空闲的工作进程。
  • 负载均衡: 自动处理任务的分发和负载均衡。

进程间通信

由于每个进程有独立的内存空间,它们不能像线程那样直接共享变量,如果需要进程间共享数据,必须使用特定的 IPC 机制。

  • Queue (队列): Queue 是最常用的进程间通信方式,实现了生产者-消费者模型,一个进程(生产者)可以把数据放入队列,另一个进程(消费者)可以从队列中取出数据。

    import multiprocessing
    def producer(queue):
        for i in range(5):
            queue.put(i)
            print(f'Producer put {i}')
        queue.put(None) # 发送结束信号
    def consumer(queue):
        while True:
            item = queue.get()
            if item is None:
                break # 收到结束信号,退出循环
            print(f'Consumer got {item}')
    if __name__ == '__main__':
        q = multiprocessing.Queue()
        p1 = multiprocessing.Process(target=producer, args=(q,))
        p2 = multiprocessing.Process(target=consumer, args=(q,))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        print("All done.")
  • Pipe (管道): Pipe 创建一对连接对象,代表管道的两端,进程可以通过这些对象的 send()recv() 方法来通信,它适用于两个进程之间的直接通信。

    import multiprocessing
    def worker(conn):
        """子进程接收数据并处理"""
        print("Worker: Receiving data...")
        data = conn.recv()
        print(f"Worker: Received {data}")
        result = data * 2
        conn.send(result)
        conn.close()
    if __name__ == '__main__':
        # 创建一个管道,返回两个连接对象
        parent_conn, child_conn = multiprocessing.Pipe()
        p = multiprocessing.Process(target=worker, args=(child_conn,))
        p.start()
        # 父进程发送数据
        print("Parent: Sending data 10...")
        parent_conn.send(10)
        # 父进程接收返回结果
        print("Parent: Receiving result...")
        result = parent_conn.recv()
        print(f"Parent: Received result {result}")
        p.join()
  • Manager (管理器): Manager 提供了一种高级方式,允许你在不同进程间共享 Python 对象(如列表、字典、Value, Array 等)。Manager 在一个单独的进程中运行所有共享对象,其他进程通过代理来访问这些对象,这种方式比 QueuePipe 更灵活,但速度也慢得多,因为涉及到进程间通信的开销。

    import multiprocessing
    def worker(shared_list, shared_dict):
        shared_list.append(multiprocessing.current_process().name)
        shared_dict['key'] = 'value from ' + multiprocessing.current_process().name
    if __name__ == '__main__':
        with multiprocessing.Manager() as manager:
            # 创建共享的列表和字典
            shared_list = manager.list()
            shared_dict = manager.dict()
            processes = []
            for i in range(3):
                p = multiprocessing.Process(target=worker, args=(shared_list, shared_dict))
                processes.append(p)
                p.start()
分享:
扫描分享到社交APP
上一篇
下一篇