为什么需要多线程处理串口?
串口通信的本质是阻塞式 I/O,当你调用 ser.read() 或 ser.readline() 时,程序会一直等待,直到收到数据或超时,如果只有一个线程(主线程),那么在等待串口数据期间,整个程序都会被“卡住”,无法响应用户输入、更新界面或执行其他任何操作。

使用多线程可以完美解决这个问题:
- 主线程:负责处理用户交互、更新 GUI 界面、执行其他业务逻辑。
- 子线程:专门负责串口的读写操作,在后台默默运行,不会阻塞主线程。
核心思路
- 创建一个串口读取线程:这个线程在启动后会进入一个循环,持续地从串口读取数据,一旦收到数据,就将其放入一个共享的数据结构(如队列
queue.Queue)中,或者通过回调函数传递给主线程。 - 主线程:正常执行自己的任务,当需要从串口获取数据时,它可以从共享的数据结构中读取,或者通过事件(
threading.Event)来通知。 - 线程同步:必须确保对共享资源的访问是线程安全的。
queue.Queue是 Python 内置的线程安全队列,是实现线程间通信的最佳选择。 - 优雅退出:必须提供一个机制来安全地停止子线程,通常使用一个
threading.Event作为“停止信号”,子线程在每次循环开始时检查这个信号,如果信号被设置,则退出循环。
完整代码示例
下面是一个完整的、可运行的示例,它模拟了一个典型的场景:一个主线程和一个子线程,子线程负责从串口读取数据,并将数据打印出来,主线程可以等待用户输入,然后发送数据到串口。
准备工作:
- 安装
pyserial库(如果还没有安装):pip install pyserial
- 你需要一个可用的串口,在 Windows 上可能是
COM3,在 Linux/macOS 上可能是/dev/ttyUSB0或/dev/tty.SLAB_USBtoUART,请将代码中的port替换为你自己的串口名。
import threading
import queue
import serial
import time
# --- 1. 共享资源和信号量 ---
# 线程安全的队列,用于子线程向主线程传递数据
data_queue = queue.Queue()
# 事件对象,用于通知子线程停止
stop_event = threading.Event()
# --- 2. 串口读取线程 ---
def serial_read_thread(ser_port, baud_rate, data_queue, stop_event):
"""
专门负责从串口读取数据的线程函数。
"""
try:
# 打开串口
ser = serial.Serial(ser_port, baud_rate, timeout=1)
print(f"串口读取线程已启动,正在监听 {ser_port}...")
except serial.SerialException as e:
print(f"无法打开串口 {ser_port}: {e}")
# 如果串口打开失败,将错误信息放入队列并返回
data_queue.put(f"串口错误: {e}")
return
while not stop_event.is_set():
try:
# 检查是否有数据到达
if ser.in_waiting > 0:
# 读取一行数据
line = ser.readline().decode('utf-8').strip()
if line:
print(f"[子线程收到] {line}")
# 将数据放入队列,供主线程或其他部分使用
data_queue.put(line)
except serial.SerialException as e:
print(f"串口读取错误: {e}")
break
except UnicodeDecodeError:
print("接收到非UTF-8编码的数据,已忽略。")
# 短暂休眠,避免CPU空转
time.sleep(0.01)
# 清理工作
print("串口读取线程准备退出...")
if ser.is_open:
ser.close()
print("串口读取线程已退出。")
# --- 3. 主线程 ---
def main():
# --- 配置你的串口 ---
# !!! 请务必修改为你的实际串口 !!!
PORT = 'COM3' # 'COM3', '/dev/ttyUSB0', '/dev/tty.SLAB_USBtoUART'
BAUD_RATE = 9600
# 创建并启动串口读取子线程
read_thread = threading.Thread(
target=serial_read_thread,
args=(PORT, BAUD_RATE, data_queue, stop_event),
daemon=True # 设置为守护线程,主线程退出时它也会自动退出
)
read_thread.start()
print("主线程运行中,按 'q' 退出程序。")
print("在另一端(如另一台电脑、Arduino)向此串口发送数据来测试。")
try:
while True:
# --- 主线程可以执行其他任务 ---
# 检查队列中是否有新数据
try:
# 非阻塞方式从队列获取数据
# timeout=0 表示不等待,如果没有数据就立即返回
received_data = data_queue.get_nowait()
print(f"[主线程处理] 收到数据: {received_data}")
except queue.Empty:
# 队列为空,是正常情况,不做任何事
pass
# 模拟主线程的其他工作
# 你可以在这里添加GUI更新逻辑
# print("主线程正在执行其他任务...")
# time.sleep(0.5)
# 检查用户输入,用于发送数据
user_input = input("输入要发送的数据 (或 'q' 退出): ")
if user_input.lower() == 'q':
break
# 发送数据到串口
try:
with serial.Serial(PORT, BAUD_RATE, timeout=1) as ser:
ser.write(user_input.encode('utf-8') + b'\n')
print(f"[主线程发送] 已发送: {user_input}")
except serial.SerialException as e:
print(f"发送数据时出错: {e}")
except KeyboardInterrupt:
print("\n检测到键盘中断,正在退出...")
finally:
# --- 4. 优雅地停止子线程 ---
print("正在停止子线程...")
stop_event.set() # 设置停止信号
# 等待子线程执行完毕
read_thread.join(timeout=2) # 最多等待2秒
if read_thread.is_alive():
print("警告: 子线程未能正常退出。")
else:
print("子线程已成功停止。")
print("程序已安全退出。")
if __name__ == "__main__":
main()
代码分步解析
共享资源和信号量
data_queue = queue.Queue(): 创建了一个线程安全的队列,子线程将从串口收到的数据put进队列,主线程从队列中get数据。queue.Queue内部已经处理了锁,保证了线程安全。stop_event = threading.Event(): 创建了一个事件对象,这是一个线程间的通信工具,类似于一个“开关”。stop_event.is_set(): 检查开关是否被打开。stop_event.set(): 打开开关。stop_event.clear(): 关闭开关。
串口读取线程 (serial_read_thread)
- 这个函数是线程的核心,它包含了
while循环来持续监听串口。 while not stop_event.is_set(): 这是线程能够被安全退出的关键,只要主线程没有调用stop_event.set(),这个循环就会一直运行。ser.readline().decode('utf-8').strip(): 从串口读取一行,解码为字符串,并去除首尾的空白字符。data_queue.put(line): 将收到的数据放入队列,主线程稍后可以取用。time.sleep(0.01): 防止在没有数据时while循环疯狂占用 CPU。
主线程 (main)
- 创建并启动线程:
read_thread = threading.Thread(...) read_thread.start()
- 守护线程 (
daemon=True): 这是一个很好的实践,将子线程设置为守护线程,意味着当主线程结束时,Python 解释器会强制退出这个子线程,即使它正在运行,这可以防止程序因子线程未退出而卡住。 - 主循环:
data_queue.get_nowait(): 主线程尝试从队列中获取数据。get_nowait()是非阻塞的,如果队列为空,它会立即抛出queue.Empty异常,我们捕获它并忽略,这使得主线程在等待数据时不会被阻塞。input(...): 演示了主线程可以与用户交互,当用户输入数据并发送后,主线程会直接操作串口发送。
KeyboardInterrupt和finally块:try...finally结构确保了无论程序如何退出(正常或异常),清理代码都会被执行。stop_event.set(): 这是“优雅退出”的触发点,它向子线程发送了“该退出了”的信号。read_thread.join(timeout=2):join()方法会阻塞主线程,直到子线程执行完毕,我们设置了一个超时时间(2秒),以防子线程卡死,导致主线程也永远等待。
更高级的模式:回调函数
除了使用队列,你还可以使用回调函数来处理从串口收到的数据,这种方式在事件驱动的 GUI 编程(如 PyQt, Tkinter)中非常常见。

思路: 将一个函数(回调函数)传递给子线程,子线程在收到数据后,不存入队列,而是直接调用这个函数,并将数据作为参数传递。
修改后的 serial_read_thread 示例:
def serial_read_thread_with_callback(ser_port, baud_rate, callback, stop_event):
try:
ser = serial.Serial(ser_port, baud_rate, timeout=1)
print(f"串口读取线程 (回调模式) 已启动,正在监听 {ser_port}...")
except serial.SerialException as e:
print(f"无法打开串口 {ser_port}: {e}")
return
while not stop_event.is_set():
try:
if ser.in_waiting > 0:
line = ser.readline().decode('utf-8').strip()
if line:
print(f"[子线程收到] {line}")
# 直接调用回调函数并传递数据
callback(line)
except serial.SerialException as e:
print(f"串口读取错误: {e}")
break
except UnicodeDecodeError:
print("接收到非UTF-8编码的数据,已忽略。")
time.sleep(0.01)
if ser.is_open:
ser.close()
print("串口读取线程 (回调模式) 已退出。")
# 在 main 函数中这样调用:
def handle_received_data(data):
"""这是回调函数,由主线程定义,但被子线程调用"""
print(f"[回调函数处理] 收到数据: {data}")
# 在这里可以更新GUI界面等
# ...
# main() 函数中:
# read_thread = threading.Thread(
# target=serial_read_thread_with_callback,
# args=(PORT, BAUD_RATE, handle_received_data, stop_event),
# daemon=True
# )
# ...
| 方法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
queue.Queue |
简单、通用、线程安全,解耦了生产者和消费者。 | 主线程需要主动轮询队列来获取数据。 | 最常用,适用于大多数后台任务,尤其是与GUI结合时。 |
| 回调函数 | 事件驱动,代码逻辑更清晰,响应“即时”。 | 需要小心处理线程安全问题(如果回调函数要操作GUI元素,必须确保在主线程执行)。 | GUI编程(PyQt, Tkinter),需要立即响应数据并更新界面。 |
对于初学者和大多数应用场景,使用 threading + queue.Queue 是最推荐、最稳健的方案,它清晰地分离了“生产数据”和“消费数据”的逻辑,queue 模块为你处理了所有复杂的锁细节。
