杰瑞科技汇

Python如何实现WebSocket接口?

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它允许服务器主动向客户端推送数据,非常适合实时应用,如在线聊天、实时数据推送、在线游戏等。

Python如何实现WebSocket接口?-图1
(图片来源网络,侵删)

在 Python 中,有多个库可以实现 WebSocket,但最常用和最推荐的是 websockets 库,我们将以它为核心,并介绍另一个更底层的库 websockify


核心库:websockets (推荐)

websockets 是一个功能强大、文档完善的库,支持 Python 3.6+,它既可以用于编写客户端,也可以用于编写服务端。

安装

你需要安装这个库:

pip install websockets

使用 websockets 编写一个简单的服务端

服务端负责监听客户端的连接,处理收到的消息,并向客户端发送消息。

Python如何实现WebSocket接口?-图2
(图片来源网络,侵删)

下面是一个简单的 "Echo Server"(回声服务器),它会将客户端发来的任何消息原样返回。

服务端代码 (server.py)

import asyncio
import websockets
# 定义一个异步函数,用于处理每个客户端的连接
async def handler(websocket, path):
    """
    这个函数会在每个新的 WebSocket 连接建立时被调用。
    :param websocket: WebSocket 连接对象,用于收发消息。
    :param path: 客户端连接的路径。
    """
    print(f"新客户端已连接: {websocket.remote_address} on path {path}")
    try:
        # 无限循环,持续等待并接收来自客户端的消息
        async for message in websocket:
            print(f"收到来自客户端的消息: {message}")
            # 将收到的消息原样发送回客户端
            await websocket.send(f"服务器已收到你的消息: {message}")
            print(f"已向客户端回送消息")
    except websockets.exceptions.ConnectionClosed as e:
        # 当客户端正常断开连接时,会触发这个异常
        print(f"客户端 {websocket.remote_address} 已断开连接: {e}")
    except Exception as e:
        print(f"发生错误: {e}")
# 启动 WebSocket 服务器
async def main():
    # 监听所有网络接口的 8765 端口
    host = "0.0.0.0"
    port = 8765
    print(f"WebSocket 服务器启动在 ws://{host}:{port}")
    # websockets.serve 会创建一个 WebSocket 服务器
    # handler 是处理连接的回调函数
    async with websockets.serve(handler, host, port):
        # 保持服务器运行,直到被手动停止 (Ctrl+C)
        await asyncio.Future()  # run_forever()
# 运行主函数
if __name__ == "__main__":
    asyncio.run(main())

如何运行服务端:

  1. 将上述代码保存为 server.py
  2. 在终端中运行:
    python server.py
  3. 你会看到输出:WebSocket 服务器启动在 ws://0.0.0.0:8765,服务器开始等待连接。

使用 websockets 编写一个简单的客户端

客户端用于连接到 WebSocket 服务端,并发送/接收消息。

Python如何实现WebSocket接口?-图3
(图片来源网络,侵删)

客户端代码 (client.py)

import asyncio
import websockets
import json
async def hello():
    # WebSocket 服务器的地址
    uri = "ws://localhost:8765"
    try:
        # 建立连接
        async with websockets.connect(uri) as websocket:
            print(f"已连接到服务器 {uri}")
            # 创建一个任务来持续接收服务器的消息
            receive_task = asyncio.create_task(receive_messages(websocket))
            # 允许用户从控制台输入并发送消息
            while True:
                message = input("请输入要发送的消息 (输入 'exit' 退出): ")
                if message.lower() == 'exit':
                    break
                # 将消息发送给服务器
                await websocket.send(message)
                print(f"已发送消息: {message}")
    except websockets.exceptions.ConnectionClosed:
        print("与服务器连接已关闭。")
    except ConnectionRefusedError:
        print("无法连接到服务器,请确保服务端已启动。")
    finally:
        # 取消接收任务
        if 'receive_task' in locals() and not receive_task.done():
            receive_task.cancel()
# 异步函数,用于接收服务器发来的消息
async def receive_messages(websocket):
    try:
        async for message in websocket:
            print(f"\n收到服务器消息: {message}\n")
    except websockets.exceptions.ConnectionClosed:
        print("接收消息时连接已关闭。")
# 运行客户端
if __name__ == "__main__":
    asyncio.run(hello())

如何运行客户端:

  1. 确保服务端 (server.py) 正在运行。
  2. 将上述代码保存为 client.py
  3. 在另一个终端中运行:
    python client.py
  4. 你可以在客户端的控制台输入任何消息,按回车后,服务端会收到并回显,同时客户端也会显示服务端的回显消息。

更高级的例子:实时数据推送(股票行情)

这个例子展示了 WebSocket 的核心优势:服务器主动向客户端推送数据,而无需客户端轮询。

服务端代码 (stock_server.py)

import asyncio
import websockets
import random
import json
import time
# 模拟一个股票行情生成器
def generate_stock_data():
    symbols = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
    stock = random.choice(symbols)
    price = round(random.uniform(100, 2000), 2)
    change = round(random.uniform(-10, 10), 2)
    timestamp = time.time()
    return {
        "symbol": stock,
        "price": price,
        "change": change,
        "timestamp": timestamp
    }
# 处理连接,将新连接的客户端添加到集合中
connected_clients = set()
async def handler(websocket, path):
    print(f"新客户端已连接: {websocket.remote_address}")
    connected_clients.add(websocket)
    try:
        # 客户端连接后,持续向其推送数据
        while True:
            data = generate_stock_data()
            message = json.dumps(data)
            # 向所有连接的客户端广播消息
            await asyncio.gather(
                *[client.send(message) for client in connected_clients if client != websocket and client.open],
                return_exceptions=True
            )
            # 每2秒推送一次新数据
            await asyncio.sleep(2)
    except websockets.exceptions.ConnectionClosed:
        print(f"客户端 {websocket.remote_address} 已断开连接")
    finally:
        # 客户端断开后,从集合中移除
        connected_clients.remove(websocket)
        print(f"当前连接数: {len(connected_clients)}")
async def main():
    async with websockets.serve(handler, "0.0.0.0", 8766):
        print("股票行情服务器启动在 ws://0.0.0.0:8766")
        await asyncio.Future() # run_forever()
if __name__ == "__main__":
    asyncio.run(main())

客户端代码 (stock_client.py)

import asyncio
import websockets
import json
async def subscribe():
    uri = "ws://localhost:8766"
    async with websockets.connect(uri) as websocket:
        print(f"已连接到股票行情服务器 {uri}")
        print("等待实时数据...")
        try:
            # 客户端只需要不断接收服务器的推送
            async for message in websocket:
                data = json.loads(message)
                print(
                    f"时间戳: {data['timestamp']} | "
                    f"股票: {data['symbol']} | "
                    f"价格: ${data['price']} | "
                    f"涨跌: ${data['change']:+.2f}"
                )
        except websockets.exceptions.ConnectionClosed:
            print("与服务器连接已关闭。")
if __name__ == "__main__":
    asyncio.run(subscribe())

这个例子中,客户端连接后就不需要再发送任何消息,而是被动地接收服务器持续推送的实时数据,这是典型的 "Push" 模型。


其他库和框架

websockify

websockify 通常用作一个代理,它将传统的 WebSocket 流量(如 VNC 的 noVNC)转发到 TCP 端口,它更偏向于底层和代理场景,而不是直接的应用逻辑开发。

FastAPI / Starlette / Daphne

如果你在使用像 FastAPI 这样的现代 Web 框架,它们通常内置了对 WebSocket 的支持,或者可以轻松集成,这种方式的好处是你可以将 WebSocket 端点和你的 REST API 端点放在同一个应用中。

FastAPI WebSocket 示例 (极简)

from fastapi import FastAPI, WebSocket
import asyncio
app = FastAPI
# 使用一个列表来存储所有连接的 WebSocket 客户端
active_connections = []
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    active_connections.append(websocket)
    try:
        while True:
            # 接收来自客户端的消息
            data = await websocket.receive_text()
            # 广播给所有连接的客户端
            for connection in active_connections:
                await connection.send_text(f"服务器广播: {data}")
    except Exception as e:
        print(f"WebSocket 错误: {e}")
    finally:
        active_connections.remove(websocket)
# 你还需要一个 ASGI 服务器来运行它,Uvicorn
# pip install fastapi uvicorn
# uvicorn main:app --reload --host 0.0.0.0 --port 8000

总结与对比

特性 websockets (独立库) FastAPI / Starlette (框架集成)
适用场景 纯粹的 WebSocket 应用,学习协议,小型项目 构建包含 REST API 和 WebSocket 的综合性 Web 服务
易用性 简单直接,概念清晰 需要学习框架,但开发效率高,生态好
功能 核心 WebSocket 功能,非常灵活 提供 WebSocket、路由、依赖注入、数据验证等全套功能
推荐 初学者和专注于 WebSocket 的开发者首选 构建现代、全功能的 Web 应用首选

关键概念回顾

  • 全双工通信:客户端和服务端可以同时、独立地发送和接收数据。
  • 长连接:连接一旦建立,会持续存在,直到客户端或服务端主动关闭。
  • 异步:WebSocket 编程在 Python 中几乎总是使用 asyncio,因为 I/O 操作(网络收发)是异步的,可以高效地处理成千上万个并发连接。
  • 服务端websockets.serve() 用于创建服务器,handler 函数处理每个连接。
  • 客户端websockets.connect() 用于连接服务器,async for message in websocket 用于接收消息,await websocket.send() 用于发送消息。

希望这个详细的指南能帮助你开始在 Python 中使用 WebSocket 接口!

分享:
扫描分享到社交APP
上一篇
下一篇