WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它允许服务器主动向客户端推送数据,非常适合实时应用,如在线聊天、实时数据推送、在线游戏等。

在 Python 中,有多个库可以实现 WebSocket,但最常用和最推荐的是 websockets 库,我们将以它为核心,并介绍另一个更底层的库 websockify。
核心库:websockets (推荐)
websockets 是一个功能强大、文档完善的库,支持 Python 3.6+,它既可以用于编写客户端,也可以用于编写服务端。
安装
你需要安装这个库:
pip install websockets
使用 websockets 编写一个简单的服务端
服务端负责监听客户端的连接,处理收到的消息,并向客户端发送消息。

下面是一个简单的 "Echo Server"(回声服务器),它会将客户端发来的任何消息原样返回。
服务端代码 (server.py)
import asyncio
import websockets
# 定义一个异步函数,用于处理每个客户端的连接
async def handler(websocket, path):
"""
这个函数会在每个新的 WebSocket 连接建立时被调用。
:param websocket: WebSocket 连接对象,用于收发消息。
:param path: 客户端连接的路径。
"""
print(f"新客户端已连接: {websocket.remote_address} on path {path}")
try:
# 无限循环,持续等待并接收来自客户端的消息
async for message in websocket:
print(f"收到来自客户端的消息: {message}")
# 将收到的消息原样发送回客户端
await websocket.send(f"服务器已收到你的消息: {message}")
print(f"已向客户端回送消息")
except websockets.exceptions.ConnectionClosed as e:
# 当客户端正常断开连接时,会触发这个异常
print(f"客户端 {websocket.remote_address} 已断开连接: {e}")
except Exception as e:
print(f"发生错误: {e}")
# 启动 WebSocket 服务器
async def main():
# 监听所有网络接口的 8765 端口
host = "0.0.0.0"
port = 8765
print(f"WebSocket 服务器启动在 ws://{host}:{port}")
# websockets.serve 会创建一个 WebSocket 服务器
# handler 是处理连接的回调函数
async with websockets.serve(handler, host, port):
# 保持服务器运行,直到被手动停止 (Ctrl+C)
await asyncio.Future() # run_forever()
# 运行主函数
if __name__ == "__main__":
asyncio.run(main())
如何运行服务端:
- 将上述代码保存为
server.py。 - 在终端中运行:
python server.py
- 你会看到输出:
WebSocket 服务器启动在 ws://0.0.0.0:8765,服务器开始等待连接。
使用 websockets 编写一个简单的客户端
客户端用于连接到 WebSocket 服务端,并发送/接收消息。

客户端代码 (client.py)
import asyncio
import websockets
import json
async def hello():
# WebSocket 服务器的地址
uri = "ws://localhost:8765"
try:
# 建立连接
async with websockets.connect(uri) as websocket:
print(f"已连接到服务器 {uri}")
# 创建一个任务来持续接收服务器的消息
receive_task = asyncio.create_task(receive_messages(websocket))
# 允许用户从控制台输入并发送消息
while True:
message = input("请输入要发送的消息 (输入 'exit' 退出): ")
if message.lower() == 'exit':
break
# 将消息发送给服务器
await websocket.send(message)
print(f"已发送消息: {message}")
except websockets.exceptions.ConnectionClosed:
print("与服务器连接已关闭。")
except ConnectionRefusedError:
print("无法连接到服务器,请确保服务端已启动。")
finally:
# 取消接收任务
if 'receive_task' in locals() and not receive_task.done():
receive_task.cancel()
# 异步函数,用于接收服务器发来的消息
async def receive_messages(websocket):
try:
async for message in websocket:
print(f"\n收到服务器消息: {message}\n")
except websockets.exceptions.ConnectionClosed:
print("接收消息时连接已关闭。")
# 运行客户端
if __name__ == "__main__":
asyncio.run(hello())
如何运行客户端:
- 确保服务端 (
server.py) 正在运行。 - 将上述代码保存为
client.py。 - 在另一个终端中运行:
python client.py
- 你可以在客户端的控制台输入任何消息,按回车后,服务端会收到并回显,同时客户端也会显示服务端的回显消息。
更高级的例子:实时数据推送(股票行情)
这个例子展示了 WebSocket 的核心优势:服务器主动向客户端推送数据,而无需客户端轮询。
服务端代码 (stock_server.py)
import asyncio
import websockets
import random
import json
import time
# 模拟一个股票行情生成器
def generate_stock_data():
symbols = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
stock = random.choice(symbols)
price = round(random.uniform(100, 2000), 2)
change = round(random.uniform(-10, 10), 2)
timestamp = time.time()
return {
"symbol": stock,
"price": price,
"change": change,
"timestamp": timestamp
}
# 处理连接,将新连接的客户端添加到集合中
connected_clients = set()
async def handler(websocket, path):
print(f"新客户端已连接: {websocket.remote_address}")
connected_clients.add(websocket)
try:
# 客户端连接后,持续向其推送数据
while True:
data = generate_stock_data()
message = json.dumps(data)
# 向所有连接的客户端广播消息
await asyncio.gather(
*[client.send(message) for client in connected_clients if client != websocket and client.open],
return_exceptions=True
)
# 每2秒推送一次新数据
await asyncio.sleep(2)
except websockets.exceptions.ConnectionClosed:
print(f"客户端 {websocket.remote_address} 已断开连接")
finally:
# 客户端断开后,从集合中移除
connected_clients.remove(websocket)
print(f"当前连接数: {len(connected_clients)}")
async def main():
async with websockets.serve(handler, "0.0.0.0", 8766):
print("股票行情服务器启动在 ws://0.0.0.0:8766")
await asyncio.Future() # run_forever()
if __name__ == "__main__":
asyncio.run(main())
客户端代码 (stock_client.py)
import asyncio
import websockets
import json
async def subscribe():
uri = "ws://localhost:8766"
async with websockets.connect(uri) as websocket:
print(f"已连接到股票行情服务器 {uri}")
print("等待实时数据...")
try:
# 客户端只需要不断接收服务器的推送
async for message in websocket:
data = json.loads(message)
print(
f"时间戳: {data['timestamp']} | "
f"股票: {data['symbol']} | "
f"价格: ${data['price']} | "
f"涨跌: ${data['change']:+.2f}"
)
except websockets.exceptions.ConnectionClosed:
print("与服务器连接已关闭。")
if __name__ == "__main__":
asyncio.run(subscribe())
这个例子中,客户端连接后就不需要再发送任何消息,而是被动地接收服务器持续推送的实时数据,这是典型的 "Push" 模型。
其他库和框架
websockify
websockify 通常用作一个代理,它将传统的 WebSocket 流量(如 VNC 的 noVNC)转发到 TCP 端口,它更偏向于底层和代理场景,而不是直接的应用逻辑开发。
FastAPI / Starlette / Daphne
如果你在使用像 FastAPI 这样的现代 Web 框架,它们通常内置了对 WebSocket 的支持,或者可以轻松集成,这种方式的好处是你可以将 WebSocket 端点和你的 REST API 端点放在同一个应用中。
FastAPI WebSocket 示例 (极简)
from fastapi import FastAPI, WebSocket
import asyncio
app = FastAPI
# 使用一个列表来存储所有连接的 WebSocket 客户端
active_connections = []
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
active_connections.append(websocket)
try:
while True:
# 接收来自客户端的消息
data = await websocket.receive_text()
# 广播给所有连接的客户端
for connection in active_connections:
await connection.send_text(f"服务器广播: {data}")
except Exception as e:
print(f"WebSocket 错误: {e}")
finally:
active_connections.remove(websocket)
# 你还需要一个 ASGI 服务器来运行它,Uvicorn
# pip install fastapi uvicorn
# uvicorn main:app --reload --host 0.0.0.0 --port 8000
总结与对比
| 特性 | websockets (独立库) |
FastAPI / Starlette (框架集成) |
|---|---|---|
| 适用场景 | 纯粹的 WebSocket 应用,学习协议,小型项目 | 构建包含 REST API 和 WebSocket 的综合性 Web 服务 |
| 易用性 | 简单直接,概念清晰 | 需要学习框架,但开发效率高,生态好 |
| 功能 | 核心 WebSocket 功能,非常灵活 | 提供 WebSocket、路由、依赖注入、数据验证等全套功能 |
| 推荐 | 初学者和专注于 WebSocket 的开发者首选 | 构建现代、全功能的 Web 应用首选 |
关键概念回顾
- 全双工通信:客户端和服务端可以同时、独立地发送和接收数据。
- 长连接:连接一旦建立,会持续存在,直到客户端或服务端主动关闭。
- 异步:WebSocket 编程在 Python 中几乎总是使用
asyncio,因为 I/O 操作(网络收发)是异步的,可以高效地处理成千上万个并发连接。 - 服务端:
websockets.serve()用于创建服务器,handler函数处理每个连接。 - 客户端:
websockets.connect()用于连接服务器,async for message in websocket用于接收消息,await websocket.send()用于发送消息。
希望这个详细的指南能帮助你开始在 Python 中使用 WebSocket 接口!
