杰瑞科技汇

Python串口多线程如何高效通信与同步?

为什么需要多线程处理串口?

串口通信的本质是阻塞式 I/O,当你调用 ser.read()ser.readline() 时,程序会一直等待,直到收到数据或超时,如果只有一个线程(主线程),那么在等待串口数据期间,整个程序都会被“卡住”,无法响应用户输入、更新界面或执行其他任何操作。

Python串口多线程如何高效通信与同步?-图1
(图片来源网络,侵删)

使用多线程可以完美解决这个问题:

  • 主线程:负责处理用户交互、更新 GUI 界面、执行其他业务逻辑。
  • 子线程:专门负责串口的读写操作,在后台默默运行,不会阻塞主线程。

核心思路

  1. 创建一个串口读取线程:这个线程在启动后会进入一个循环,持续地从串口读取数据,一旦收到数据,就将其放入一个共享的数据结构(如队列 queue.Queue)中,或者通过回调函数传递给主线程。
  2. 主线程:正常执行自己的任务,当需要从串口获取数据时,它可以从共享的数据结构中读取,或者通过事件(threading.Event)来通知。
  3. 线程同步:必须确保对共享资源的访问是线程安全的。queue.Queue 是 Python 内置的线程安全队列,是实现线程间通信的最佳选择。
  4. 优雅退出:必须提供一个机制来安全地停止子线程,通常使用一个 threading.Event 作为“停止信号”,子线程在每次循环开始时检查这个信号,如果信号被设置,则退出循环。

完整代码示例

下面是一个完整的、可运行的示例,它模拟了一个典型的场景:一个主线程和一个子线程,子线程负责从串口读取数据,并将数据打印出来,主线程可以等待用户输入,然后发送数据到串口。

准备工作:

  1. 安装 pyserial 库(如果还没有安装):
    pip install pyserial
  2. 你需要一个可用的串口,在 Windows 上可能是 COM3,在 Linux/macOS 上可能是 /dev/ttyUSB0/dev/tty.SLAB_USBtoUART,请将代码中的 port 替换为你自己的串口名。
import threading
import queue
import serial
import time
# --- 1. 共享资源和信号量 ---
# 线程安全的队列,用于子线程向主线程传递数据
data_queue = queue.Queue()
# 事件对象,用于通知子线程停止
stop_event = threading.Event()
# --- 2. 串口读取线程 ---
def serial_read_thread(ser_port, baud_rate, data_queue, stop_event):
    """
    专门负责从串口读取数据的线程函数。
    """
    try:
        # 打开串口
        ser = serial.Serial(ser_port, baud_rate, timeout=1)
        print(f"串口读取线程已启动,正在监听 {ser_port}...")
    except serial.SerialException as e:
        print(f"无法打开串口 {ser_port}: {e}")
        # 如果串口打开失败,将错误信息放入队列并返回
        data_queue.put(f"串口错误: {e}")
        return
    while not stop_event.is_set():
        try:
            # 检查是否有数据到达
            if ser.in_waiting > 0:
                # 读取一行数据
                line = ser.readline().decode('utf-8').strip()
                if line:
                    print(f"[子线程收到] {line}")
                    # 将数据放入队列,供主线程或其他部分使用
                    data_queue.put(line)
        except serial.SerialException as e:
            print(f"串口读取错误: {e}")
            break
        except UnicodeDecodeError:
            print("接收到非UTF-8编码的数据,已忽略。")
        # 短暂休眠,避免CPU空转
        time.sleep(0.01)
    # 清理工作
    print("串口读取线程准备退出...")
    if ser.is_open:
        ser.close()
    print("串口读取线程已退出。")
# --- 3. 主线程 ---
def main():
    # --- 配置你的串口 ---
    # !!! 请务必修改为你的实际串口 !!!
    PORT = 'COM3'  #  'COM3', '/dev/ttyUSB0', '/dev/tty.SLAB_USBtoUART'
    BAUD_RATE = 9600
    # 创建并启动串口读取子线程
    read_thread = threading.Thread(
        target=serial_read_thread,
        args=(PORT, BAUD_RATE, data_queue, stop_event),
        daemon=True  # 设置为守护线程,主线程退出时它也会自动退出
    )
    read_thread.start()
    print("主线程运行中,按 'q' 退出程序。")
    print("在另一端(如另一台电脑、Arduino)向此串口发送数据来测试。")
    try:
        while True:
            # --- 主线程可以执行其他任务 ---
            # 检查队列中是否有新数据
            try:
                # 非阻塞方式从队列获取数据
                # timeout=0 表示不等待,如果没有数据就立即返回
                received_data = data_queue.get_nowait()
                print(f"[主线程处理] 收到数据: {received_data}")
            except queue.Empty:
                # 队列为空,是正常情况,不做任何事
                pass
            # 模拟主线程的其他工作
            # 你可以在这里添加GUI更新逻辑
            # print("主线程正在执行其他任务...")
            # time.sleep(0.5)
            # 检查用户输入,用于发送数据
            user_input = input("输入要发送的数据 (或 'q' 退出): ")
            if user_input.lower() == 'q':
                break
            # 发送数据到串口
            try:
                with serial.Serial(PORT, BAUD_RATE, timeout=1) as ser:
                    ser.write(user_input.encode('utf-8') + b'\n')
                    print(f"[主线程发送] 已发送: {user_input}")
            except serial.SerialException as e:
                print(f"发送数据时出错: {e}")
    except KeyboardInterrupt:
        print("\n检测到键盘中断,正在退出...")
    finally:
        # --- 4. 优雅地停止子线程 ---
        print("正在停止子线程...")
        stop_event.set()  # 设置停止信号
        # 等待子线程执行完毕
        read_thread.join(timeout=2)  # 最多等待2秒
        if read_thread.is_alive():
            print("警告: 子线程未能正常退出。")
        else:
            print("子线程已成功停止。")
        print("程序已安全退出。")
if __name__ == "__main__":
    main()

代码分步解析

共享资源和信号量

  • data_queue = queue.Queue(): 创建了一个线程安全的队列,子线程将从串口收到的数据 put 进队列,主线程从队列中 get 数据。queue.Queue 内部已经处理了锁,保证了线程安全。
  • stop_event = threading.Event(): 创建了一个事件对象,这是一个线程间的通信工具,类似于一个“开关”。
    • stop_event.is_set(): 检查开关是否被打开。
    • stop_event.set(): 打开开关。
    • stop_event.clear(): 关闭开关。

串口读取线程 (serial_read_thread)

  • 这个函数是线程的核心,它包含了 while 循环来持续监听串口。
  • while not stop_event.is_set(): 这是线程能够被安全退出的关键,只要主线程没有调用 stop_event.set(),这个循环就会一直运行。
  • ser.readline().decode('utf-8').strip(): 从串口读取一行,解码为字符串,并去除首尾的空白字符。
  • data_queue.put(line): 将收到的数据放入队列,主线程稍后可以取用。
  • time.sleep(0.01): 防止在没有数据时 while 循环疯狂占用 CPU。

主线程 (main)

  • 创建并启动线程:
    read_thread = threading.Thread(...)
    read_thread.start()
  • 守护线程 (daemon=True): 这是一个很好的实践,将子线程设置为守护线程,意味着当主线程结束时,Python 解释器会强制退出这个子线程,即使它正在运行,这可以防止程序因子线程未退出而卡住。
  • 主循环:
    • data_queue.get_nowait(): 主线程尝试从队列中获取数据。get_nowait() 是非阻塞的,如果队列为空,它会立即抛出 queue.Empty 异常,我们捕获它并忽略,这使得主线程在等待数据时不会被阻塞。
    • input(...): 演示了主线程可以与用户交互,当用户输入数据并发送后,主线程会直接操作串口发送。
  • KeyboardInterruptfinally:
    • try...finally 结构确保了无论程序如何退出(正常或异常),清理代码都会被执行。
    • stop_event.set(): 这是“优雅退出”的触发点,它向子线程发送了“该退出了”的信号。
    • read_thread.join(timeout=2): join() 方法会阻塞主线程,直到子线程执行完毕,我们设置了一个超时时间(2秒),以防子线程卡死,导致主线程也永远等待。

更高级的模式:回调函数

除了使用队列,你还可以使用回调函数来处理从串口收到的数据,这种方式在事件驱动的 GUI 编程(如 PyQt, Tkinter)中非常常见。

Python串口多线程如何高效通信与同步?-图2
(图片来源网络,侵删)

思路: 将一个函数(回调函数)传递给子线程,子线程在收到数据后,不存入队列,而是直接调用这个函数,并将数据作为参数传递。

修改后的 serial_read_thread 示例:

def serial_read_thread_with_callback(ser_port, baud_rate, callback, stop_event):
    try:
        ser = serial.Serial(ser_port, baud_rate, timeout=1)
        print(f"串口读取线程 (回调模式) 已启动,正在监听 {ser_port}...")
    except serial.SerialException as e:
        print(f"无法打开串口 {ser_port}: {e}")
        return
    while not stop_event.is_set():
        try:
            if ser.in_waiting > 0:
                line = ser.readline().decode('utf-8').strip()
                if line:
                    print(f"[子线程收到] {line}")
                    # 直接调用回调函数并传递数据
                    callback(line)
        except serial.SerialException as e:
            print(f"串口读取错误: {e}")
            break
        except UnicodeDecodeError:
            print("接收到非UTF-8编码的数据,已忽略。")
        time.sleep(0.01)
    if ser.is_open:
        ser.close()
    print("串口读取线程 (回调模式) 已退出。")
# 在 main 函数中这样调用:
def handle_received_data(data):
    """这是回调函数,由主线程定义,但被子线程调用"""
    print(f"[回调函数处理] 收到数据: {data}")
    # 在这里可以更新GUI界面等
# ...
# main() 函数中:
# read_thread = threading.Thread(
#     target=serial_read_thread_with_callback,
#     args=(PORT, BAUD_RATE, handle_received_data, stop_event),
#     daemon=True
# )
# ...
方法 优点 缺点 适用场景
queue.Queue 简单、通用、线程安全,解耦了生产者和消费者。 主线程需要主动轮询队列来获取数据。 最常用,适用于大多数后台任务,尤其是与GUI结合时。
回调函数 事件驱动,代码逻辑更清晰,响应“即时”。 需要小心处理线程安全问题(如果回调函数要操作GUI元素,必须确保在主线程执行)。 GUI编程(PyQt, Tkinter),需要立即响应数据并更新界面。

对于初学者和大多数应用场景,使用 threading + queue.Queue 是最推荐、最稳健的方案,它清晰地分离了“生产数据”和“消费数据”的逻辑,queue 模块为你处理了所有复杂的锁细节。

分享:
扫描分享到社交APP
上一篇
下一篇