杰瑞科技汇

Python如何直接调用Go的goroutine?

  • Python: 由 CPython 解释器驱动,使用 GIL(全局解释器锁)进行线程管理。
  • Go: 由 Go 运行时驱动,有自己的调度器来管理轻量级的线程——goroutine。

我们可以通过几种巧妙的方式实现“在 Python 中使用 goroutine”的效果,核心思想是 进程间通信,让 Go 程序作为一个独立的、并发的服务,然后通过某种协议(如 HTTP、gRPC、标准输入/输出)与 Python 程序进行交互。

Python如何直接调用Go的goroutine?-图1
(图片来源网络,侵删)

下面我将介绍几种主流且实用的方法,从简单到复杂排序。


通过标准输入/输出 进行进程间通信

这是最直接的方法,将 Go 程序作为一个子进程启动,Python 通过 stdin 发送任务,通过 stdout 接收结果。

Go 端 (worker.go)

这个 Go 程序会从标准输入读取一行 JSON 格式的任务,处理它(模拟一个耗时操作),然后将结果以 JSON 格式写回标准输出。

package main
import (
    "bufio"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "time"
)
// Task 定义从 Python 接收的任务结构
type Task struct {
    ID     int    `json:"id"`
    Input  string `json:"input"`
}
// Result 定义返回给 Python 的结果结构
type Result struct {
    ID     int    `json:"id"`
    Output string `json:"output"`
}
func main() {
    // 创建一个扫描器,从标准输入逐行读取
    scanner := bufio.NewScanner(os.Stdin)
    for scanner.Scan() {
        // 读取一行任务数据
        line := scanner.Bytes()
        var task Task
        // 解析 JSON
        if err := json.Unmarshal(line, &task); err != nil {
            log.Printf("Error unmarshaling task: %v", err)
            continue
        }
        // 模拟一个耗时操作,比如网络请求或复杂计算
        // 为了演示并发,我们在这里启动一个 goroutine
        // 注意:这个例子中,每个任务一个 goroutine,你可以用更复杂的方式管理它们
        go func(t Task) {
            log.Printf("Worker received task: %+v", t)
            time.Sleep(2 * time.Second) // 模拟 2 秒的工作
            output := fmt.Sprintf("Processed: %s", t.Input)
            result := Result{
                ID:     t.ID,
                Output: output,
            }
            // 将结果编码为 JSON 并写入标准输出
            // 使用 Fprintln 来确保输出后有换行符,方便 Python 读取
            if jsonResult, err := json.Marshal(result); err == nil {
                fmt.Println(string(jsonResult))
            } else {
                log.Printf("Error marshaling result: %v", err)
            }
        }(task)
    }
    if err := scanner.Err(); err != nil {
        log.Fatalf("Error reading from stdin: %v", err)
    }
}

Python 端 (main.py)

Python 使用 subprocess 模块来启动 Go 程序,并发送任务。

Python如何直接调用Go的goroutine?-图2
(图片来源网络,侵删)
import subprocess
import json
import time
import threading
import queue
# 任务队列
tasks_to_do = queue.Queue()
# 结果队列
results = queue.Queue()
def producer():
    """生产者:生成任务并放入任务队列"""
    for i in range(1, 6):
        task = {"id": i, "input": f"Data-{i}"}
        tasks_to_do.put(task)
        print(f"Producer: Added task {i}")
        time.sleep(0.5) # 模拟生产任务的速度
def worker_process():
    """工作进程:启动 Go 程序并处理任务"""
    # 启动 Go 编译后的可执行文件
    # 确保你已经运行了 `go build -o worker worker.go`
    process = subprocess.Popen(
        ['./worker'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    # 启动一个线程来持续读取 Go 程序的输出
    def reader_thread():
        while True:
            line = process.stdout.readline()
            if not line:
                break
            try:
                result = json.loads(line.strip())
                results.put(result)
                print(f"Python: Received result for task {result['id']}: {result['output']}")
            except json.JSONDecodeError:
                print(f"Python: Error decoding JSON: {line.strip()}")
    threading.Thread(target=reader_thread, daemon=True).start()
    # 从任务队列中取出任务并发送给 Go 程序
    while True:
        try:
            task = tasks_to_do.get(timeout=1)
            task_json = json.dumps(task)
            process.stdin.write(task_json + '\n')
            process.stdin.flush()
            print(f"Python: Sent task {task['id']} to Go worker.")
        except queue.Empty:
            # 没有更多任务了,可以关闭输入流
            process.stdin.close()
            break
        except Exception as e:
            print(f"Error sending task: {e}")
            break
    # 等待 Go 程序结束
    process.wait()
    print("Go worker process has terminated.")
if __name__ == "__main__":
    # 1. 编译 Go 代码 (在终端中运行: go build -o worker worker.go)
    # 这里我们假设编译已经完成
    # 2. 启动生产者
    producer_thread = threading.Thread(target=producer)
    producer_thread.start()
    # 3. 启动 Go 工作进程
    worker_process()
    # 4. 等待生产者完成
    producer_thread.join()
    print("\nAll tasks have been processed.")

如何运行:

  1. 将 Go 代码保存为 worker.go,Python 代码保存为 main.py
  2. 在终端编译 Go 代码:go build -o worker worker.go
  3. 运行 Python 脚本:python main.py

你会看到 Python 和 Go 程序并发地处理任务,即使 Go 程序内部有 2 秒的延迟,Python 的生产者也不会被阻塞。


通过 HTTP/gRPC 调用

这种方法更灵活、更强大,适用于构建微服务架构,Go 程序启动一个 HTTP 或 gRPC 服务器,Python 作为客户端向其发送请求。

Go 端 (server.go)

package main
import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"
)
type TaskRequest struct {
    Input string `json:"input"`
}
type TaskResponse struct {
    Output string `json:"output"`
}
// 一个处理 HTTP 请求的处理函数
func processHandler(w http.ResponseWriter, r *http.Request) {
    // 只处理 POST 请求
    if r.Method != http.MethodPost {
        http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
        return
    }
    var req TaskRequest
    // 解析请求体中的 JSON
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "Error decoding request body", http.StatusBadRequest)
        return
    }
    // 启动一个 goroutine 来处理这个请求,这样 HTTP 服务器就不会被阻塞
    go func(input string) {
        log.Printf("Goroutine started processing: %s", input)
        time.Sleep(2 * time.Second) // 模拟耗时操作
        output := fmt.Sprintf("HTTP Processed: %s", input)
        log.Printf("Goroutine finished processing: %s", input)
        // 在实际应用中,你可能需要一个通道或数据库来存储结果
        // 这里为了简单,我们只打印到日志
        // 客户端需要通过轮询或其他方式获取结果
        _ = output
    }(req.Input)
    // 立即返回一个响应,告知请求已接收
    w.WriteHeader(http.StatusAccepted)
    json.NewEncoder(w).Encode(map[string]string{"status": "processing", "message": "Task received and is being processed by a goroutine."})
}
func main() {
    // 设置路由
    http.HandleFunc("/process", processHandler)
    fmt.Println("Go HTTP server starting on :8080...")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatalf("Could not start server: %s\n", err)
    }
}

Python 端 (client.py)

import requests
import json
import time
import threading
def send_task(task_data):
    """向 Go HTTP 服务器发送任务"""
    url = "http://localhost:8080/process"
    try:
        response = requests.post(url, json=task_data)
        response.raise_for_status() # 如果请求失败则抛出异常
        print(f"Client: Sent task '{task_data['input']}'. Server response: {response.json()}")
    except requests.exceptions.RequestException as e:
        print(f"Error sending task: {e}")
if __name__ == "__main__":
    # 模拟多个客户端并发发送请求
    threads = []
    for i in range(1, 6):
        task = {"input": f"HTTP-Data-{i}"}
        thread = threading.Thread(target=send_task, args=(task,))
        threads.append(thread)
        thread.start()
        time.sleep(0.2) # 模拟请求间隔
    # 等待所有线程完成发送
    for thread in threads:
        thread.join()
    print("\nAll tasks have been sent to the Go HTTP server.")
    print("Check the Go server's console logs to see the goroutine output.")

如何运行:

Python如何直接调用Go的goroutine?-图3
(图片来源网络,侵删)
  1. 将 Go 代码保存为 server.go,Python 代码保存为 client.py
  2. 编译并运行 Go 服务器:go run server.go
  3. 在另一个终端运行 Python 客户端:python client.py

你会看到 Python 客户端快速地发送了所有请求并立即收到 202 Accepted 响应,而 Go 服务器则在后台的 goroutine 中慢慢地处理每个任务。


通过 gRPC 调用 (更高效、更类型安全)

gRPC 是 Google 开发的高性能、开源的通用 RPC 框架,使用 Protocol Buffers 作为接口定义语言,它比 HTTP/JSON 更快,更适合内部服务通信。

步骤概述 (更复杂,但功能最强)

  1. 定义 .proto 文件: 定义服务接口和消息格式。
  2. 生成代码: 使用 protoc 编译器为 Go 和 Python 生成客户端和服务端代码。
  3. 实现 Go gRPC 服务器: 编写服务器逻辑,并在处理请求时使用 goroutine。
  4. 编写 Python gRPC 客户端: 调用 Go 服务器提供的方法。

由于 gRPC 的设置相对复杂,这里不提供完整代码,但关键点在于:

  • Go 服务器端:你会在生成的服务器桩代码中实现服务方法,在这个方法里,你会像 HTTP 示例一样启动一个 goroutine 来处理耗时任务,然后立即返回一个 grpc.Errorf 或一个包含状态(如 "PROCESSING")的响应,或者使用 gRPC 的流式功能来异步返回最终结果。
  • Python 客户端:它会调用生成的存根代码来发起 RPC 调用。

这种方法是构建现代、高性能分布式系统的首选。


总结与对比

方法 优点 缺点 适用场景
标准输入/输出 - 简单,无需网络
- 进程间通信直接
- 耦合度高,难以扩展
- 不适合分布式系统
- 错误处理和同步较复杂
简单的脚本任务、将一个 CPU 密集型或阻塞型任务卸载给 Go 子进程。
HTTP/gRPC - 解耦,易于扩展和部署
- 标准化,支持多种语言和客户端
- gRPC 性能极高,类型安全
- 需要网络编程知识
- 设置比标准 I/O 复杂
- 有网络开销(尽管 gRPC 很小)
微服务架构、需要高并发处理的后端服务、Python 作为前端或客户端与 Go 后端通信。
gRPC (子集) - 性能最佳
- 强类型,减少运行时错误
- 支持流式通信
- 设置最复杂,需要学习 .proto 文件 对性能和延迟有极致要求的内部服务通信,如金融交易、实时数据分析。

对于绝大多数想在 Python 中利用 Go 并发能力的场景,通过 HTTP 或 gRPC 调用一个独立的 Go 服务是最佳实践,它将 Python 的优势(丰富的库、快速开发)和 Go 的优势(出色的并发性能、高效的编译型执行)完美地结合在了一起,构建了一个灵活、可维护的系统。

分享:
扫描分享到社交APP
上一篇
下一篇