杰瑞科技汇

Python unittest阻塞问题如何解决?

什么是阻塞?

在编程中,“阻塞”(Blocking)指的是一个进程或线程在执行某个操作时,如果该操作无法立即完成,它就会暂停(或“阻塞”)执行,直到满足某个条件(如 I/O 操作完成、收到响应、锁被释放等)。

Python unittest阻塞问题如何解决?-图1
(图片来源网络,侵删)

unittest 的上下文中,阻塞通常表现为:

  1. 测试用例长时间运行,永不结束:这是最典型的表现,当你运行 python -m unittest discover 时,程序会卡住,不会返回到命令行提示符。
  2. 测试套件中的后续测试被跳过:如果一个测试用例阻塞了,整个测试进程可能就会卡住,导致它后面的所有测试用例都无法执行。

为什么测试会阻塞?常见原因

网络请求超时

这是最常见的原因之一,你的代码向一个外部服务(如 API、数据库)发送请求,但该服务响应缓慢或无响应,导致你的测试用例在等待响应时无限期阻塞。

示例代码(待测试的模块 my_module.py):

import requests
import time
def fetch_data_from_api(url):
    print(f"正在请求: {url}")
    # 没有设置 timeout,如果服务无响应,这里会永久阻塞
    response = requests.get(url)
    response.raise_for_status()  # 如果状态码不是 2xx,会抛出异常
    return response.json()

失败的测试用例 test_my_module.py

Python unittest阻塞问题如何解决?-图2
(图片来源网络,侵删)
import unittest
from my_module import fetch_data_from_api
class TestApi(unittest.TestCase):
    def test_fetch_data(self):
        # 使用一个不存在的或响应缓慢的 URL
        url = "http://httpstat.us/200?sleep=10000" # 这个 URL 会让服务器响应 10 秒钟
        data = fetch_data_from_api(url)
        self.assertEqual(data['status'], 200)
if __name__ == '__main__':
    unittest.main()

当你运行这个测试时,它会打印 "正在请求: ...",然后卡住整整 10 秒钟,如果目标服务器宕机,它会卡住更久甚至永久。

无限循环或逻辑错误

代码中可能存在一个永远不会退出的 while 循环,或者一个错误的逻辑判断,导致程序无法继续执行。

示例代码(待测试的模块 my_module.py):

def process_items(items):
    processed = []
    i = 0
    # 这是一个 Bug,应该使用 while i < len(items)
    while True:
        if items[i]: # 假设 items 是非空的
            processed.append(items[i])
        i += 1 # 这个循环永远不会结束
    return processed

多线程/多进程中的死锁

如果你的代码使用了线程锁(threading.Lock)或进程间通信机制,并且存在死锁风险,测试用例可能会在获取锁的步骤上无限期等待。

示例代码(待测试的模块 my_module.py):

import threading
lock = threading.Lock()
def worker():
    print("Worker 尝试获取锁...")
    lock.acquire()
    print("Worker 已获取锁。")
    # 如果这里忘记释放锁,其他等待这个锁的线程就会永久阻塞
    # lock.release() 
    print("Worker (未释放锁) 结束。")
def test_deadlock():
    t1 = threading.Thread(target=worker)
    t2 = threading.Thread(target=worker)
    t1.start()
    t2.start()
    t1.join() # 主线程会在这里等待 t1 完成
    t2.join() # t1 没有释放锁,t2 永远不会完成,导致 join() 阻塞

如何解决和避免阻塞?

针对不同的原因,有多种解决方案。

设置超时 - 最直接有效的方法

对于网络请求、文件读写等 I/O 操作,设置超时是防止阻塞的最佳实践。

修改后的 my_module.py

import requests
def fetch_data_from_api(url, timeout=5):
    print(f"正在请求: {url}")
    # 设置 timeout=5 秒,5 秒后未收到响应则抛出 requests.exceptions.Timeout 异常
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.json()

修改后的测试用例 test_my_module.py

import unittest
from my_module import fetch_data_from_api
import requests # 需要导入异常类
class TestApi(unittest.TestCase):
    def test_fetch_data_success(self):
        # 使用一个快速响应的 URL
        url = "https://httpbin.org/get"
        data = fetch_data_from_api(url, timeout=2)
        self.assertIn('url', data)
    def test_fetch_data_timeout(self):
        # 使用一个会超时的 URL
        url = "http://httpstat.us/200?sleep=10" # 等待 10 秒
        # 我们期望它抛出 Timeout 异常
        with self.assertRaises(requests.exceptions.Timeout):
            fetch_data_from_api(url, timeout=3) # 设置 3 秒超时
if __name__ == '__main__':
    unittest.main()

第二个测试用例会在 3 秒后失败,并正确抛出 Timeout 异常,而不是卡住 10 秒。

使用 unittestaddCleanupsetUp/tearDown 进行资源管理

确保在测试结束后,所有资源(如线程、锁、套接字、文件)都被正确释放。

示例:管理线程

import unittest
import threading
import time
class TestThread(unittest.TestCase):
    def setUp(self):
        self.thread = None
        # 在每个测试开始前,可以做一些准备工作
    def tearDown(self):
        # 在每个测试结束后,无论成功还是失败,都会执行
        if self.thread and self.thread.is_alive():
            print("测试结束,尝试停止线程...")
            # 这里需要一个机制来通知线程退出,例如设置一个 Event
            # self.stop_event.set()
            # self.thread.join(timeout=1) # 等待线程优雅退出
            # if self.thread.is_alive():
            #     print("线程未能正常停止,可能存在阻塞。")
            pass # 简单示例
    def test_background_worker(self):
        def worker(stop_event):
            while not stop_event.is_set(): # 检查事件以决定是否退出
                print("Worker is running...")
                time.sleep(1)
        # self.stop_event = threading.Event()
        # self.thread = threading.Thread(target=worker, args=(self.stop_event,))
        # self.thread.start()
        # time.sleep(2)
        # self.stop_event.set() # 通知线程停止
        # self.thread.join() # 等待线程结束
        # self.assertFalse(self.thread.is_alive())
        pass # 占位符,展示结构

使用 unittest.mock 模拟外部依赖

如果你的测试阻塞是因为依赖了外部服务(数据库、API、网络等),最佳实践是使用 unittest.mock 来模拟这些依赖,这样,你的测试就不再与真实的外部世界交互,速度飞快且稳定。

示例:模拟 requests.get

import unittest
from unittest import mock
# from my_module import fetch_data_from_api # 假设我们想测试这个函数
# 模拟的响应数据
MOCK_RESPONSE = {
    "status": 200,
    "data": {"message": "This is a mock response"}
}
class TestApiWithMock(unittest.TestCase):
    @mock.patch('my_module.requests.get') # 拦截 my_module 中的 requests.get 调用
    def test_fetch_data_with_mock(self, mock_get):
        # 配置 mock 对象的行为
        mock_get.return_value.status_code = 200
        mock_get.return_value.json.return_value = MOCK_RESPONSE
        # 调用被测函数
        # result = fetch_data_from_api("http://any_url.com")
        # result = fetch_data_from_api("http://any_url.com") # 取消注释以运行
        # 断言
        # mock_get.assert_called_once_with("http://any_url.com", timeout=5)
        # self.assertEqual(result, MOCK_RESPONSE)
        pass # 占位符,展示结构
if __name__ == '__main__':
    unittest.main()

使用 unittestTestCase.subTest 进行隔离

当在一个循环中运行多个相似的测试时,如果其中一个阻塞,整个循环都会卡住。subTest 可以将一个大测试分解成多个独立的小测试,其中一个失败或阻塞不会影响其他的。

示例:

import unittest
class TestNumbers(unittest.TestCase):
    def test_even_numbers(self):
        # 假设 check_number 是一个可能阻塞的函数
        # for number in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]:
        #     with self.subTest(number=number):
        #         self.assertTrue(check_number(number))
        pass # 占位符

使用 unittestmaxDiffassertLogs 进行调试

如果测试因为某些原因挂起,但又很难复现,可以增加日志输出来观察程序执行到了哪一步。

import unittest
import logging
# 配置日志
logging.basicConfig(level=logging.DEBUG)
class TestWithLogging(unittest.TestCase):
    def test_something_that_blocks(self):
        logging.debug("测试开始...")
        # ... 可能阻塞的代码 ...
        logging.debug("测试结束。")
        self.assertTrue(True)

调试已阻塞的测试

如果你的测试已经卡住了,怎么办?

  1. 使用超时运行测试:Python 的 unittest 模块本身不支持为单个测试用例设置超时,但你可以使用 signal 模块(仅限 Unix-like 系统)或 multiprocessing 来实现一个“超时运行器”。

    一个简单的超时运行器示例 (使用 signal):

    import signal
    import unittest
    class TimeoutException(Exception):
        pass
    def timeout_handler(signum, frame):
        raise TimeoutException("测试运行超时!")
    def run_test_with_timeout(test_case_class, timeout=10):
        # 设置信号处理器
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(timeout) # 设置超时(秒)
        try:
            suite = unittest.TestLoader().loadTestsFromTestCase(test_case_class)
            unittest.TextTestRunner(verbosity=2).run(suite)
        except TimeoutException as e:
            print(f"\n[错误] {e}")
        finally:
            signal.alarm(0) # 取消超时设置
    if __name__ == '__main__':
        # 运行你的测试类,并设置 5 秒超时
        run_test_with_timeout(TestApi, timeout=5)
  2. 使用调试器:使用 pdb (Python Debugger) 或 IDE 的调试功能,在测试代码的关键位置设置断点,逐步执行,观察变量的变化和程序的执行流程,从而找到阻塞点。

问题场景 根本原因 解决方案
网络请求卡死 服务无响应,没有超时机制 设置 timeout 参数,并在测试中用 assertRaises 验证超时行为。
I/O 操作卡死 文件、套接字等操作无响应 同上,为 I/O 操作设置超时。
无限循环/逻辑错误 代码 Bug,无法退出 仔细审查代码逻辑,使用 pdb 或日志进行调试。
多线程/多进程死锁 错误的锁使用或资源竞争 遵循正确的锁管理原则,使用 Event 等机制控制线程生命周期,在 tearDown 中清理资源。
依赖外部服务 测试环境不稳定,速度慢 使用 unittest.mock 模拟外部依赖,使测试独立、快速、可靠。
测试套件整体卡死 某个测试用例阻塞 使用 subTest 隔离测试;使用超时运行器强制终止长时间运行的测试。

核心思想:让你的测试快速、独立、确定,避免测试与外部世界(网络、文件系统等)产生不必要的、不可控的交互,通过设置超时和模拟外部依赖,可以极大地提高测试的健壮性和可维护性。

分享:
扫描分享到社交APP
上一篇
下一篇