杰瑞科技汇

Python如何模拟WebSocket实现?

  1. 模拟 WebSocket 服务器:用于测试客户端代码,无需依赖真实的后端服务。
  2. 模拟 WebSocket 客户端:用于测试服务器逻辑,或者在没有浏览器环境的情况下进行自动化测试。

下面我将分别介绍如何使用 Python 实现这两种模拟,并提供详细的代码示例。

Python如何模拟WebSocket实现?-图1
(图片来源网络,侵删)

核心概念

在开始之前,我们先快速回顾一下 WebSocket 的基本流程:

  1. 握手:客户端发送一个 HTTP Upgrade 请求,请求将协议从 HTTP/1.1 升级到 WebSocket。
  2. 响应:服务器如果支持 WebSocket,会返回一个 101 Switching Protocols 响应,之后连接就变成了全双工的 WebSocket 通道。
  3. 数据传输:客户端和服务器之间可以互相发送消息(文本或二进制)。
  4. 关闭:任一方可以发送“关闭帧”来终止连接。

我们将使用 Python 内置的 http.serversocketserver 来处理 HTTP 握手,并使用 threading 来处理并发连接。


模拟一个简单的 WebSocket 服务器

这是最常见的需求,比如你想测试你的前端 JavaScript WebSocket 客户端是否工作正常。

我们将创建一个服务器,它可以:

Python如何模拟WebSocket实现?-图2
(图片来源网络,侵删)
  • 接受客户端的连接和握手。
  • 接收客户端发来的消息,并打印出来。
  • 向客户端发送一个“你好,客户端!”的欢迎消息。
  • 保持连接,等待客户端发送“exit”消息来关闭连接。

代码实现

# server.py
import base64
import hashlib
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
import json
# 一个简单的内存存储,用于保存客户端连接
clients = {}
class WebSocketHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # 检查是否是 WebSocket 握手请求
        if self.headers.get("Upgrade", "").lower() == "websocket":
            self.handle_websocket_handshake()
        else:
            self.send_error(400, "Bad Request: Not a WebSocket request.")
    def handle_websocket_handshake(self):
        # 1. 获取 Sec-WebSocket-Key
        websocket_key = self.headers.get("Sec-WebSocket-Key")
        # 2. 生成 Sec-WebSocket-Accept
        # 协议规定:将 Sec-WebSocket-Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" 进行 SHA1 哈希,然后进行 base64 编码
        magic_string = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
        accept_key = base64.b64encode(hashlib.sha1((websocket_key + magic_string).encode('utf-8')).digest()).decode('utf-8')
        # 3. 发送 101 Switching Protocols 响应
        self.send_response(101)
        self.send_header("Upgrade", "websocket")
        self.send_header("Connection", "Upgrade")
        self.send_header("Sec-WebSocket-Accept", accept_key)
        self.end_headers()
        # 4. 握手成功,开始处理数据帧
        # 我们需要一个新的 socket 来处理这个长连接
        # 注意:BaseHTTPRequestHandler 的 wfile 在这里已经无效了
        # 我们需要从请求中获取原始的 socket
        connection_socket = self.connection
        client_address = self.client_address
        # 为这个客户端创建一个唯一的 ID
        client_id = f"{client_address[0]}:{client_address[1]}"
        clients[client_id] = connection_socket
        print(f"Client connected: {client_id}")
        # 发送欢迎消息
        welcome_message = "Hello, Client!"
        self.send_frame(connection_socket, welcome_message.encode('utf-8'))
        # 持续监听客户端消息
        try:
            while True:
                message = self.receive_frame(connection_socket)
                if message is None: # 连接已关闭
                    break
                decoded_message = message.decode('utf-8')
                print(f"Received from {client_id}: {decoded_message}")
                # 如果收到 "exit",则关闭连接
                if decoded_message.lower() == "exit":
                    print(f"Client {client_id} requested to close.")
                    break
                # 回显消息
                response_message = f"Server received: {decoded_message}"
                self.send_frame(connection_socket, response_message.encode('utf-8'))
        except ConnectionResetError:
            print(f"Client {client_id} disconnected unexpectedly.")
        finally:
            print(f"Client {client_id} disconnected.")
            if client_id in clients:
                del clients[client_id]
            connection_socket.close()
    def send_frame(self, sock, payload):
        """构建并发送 WebSocket 数据帧"""
        # 使用一个简单的帧格式: FIN=1, RSV=0, Opcode=0x1 (Text)
        # Mask=0 (服务器发送的消息不需要 mask)
        b1 = 0b10000001  # FIN + Text Frame
        b2 = len(payload)
        # 构建帧头
        if b2 < 126:
            header = bytes([b1, b2])
        elif b2 < 65536:
            header = bytes([b1, 126]) + b2.to_bytes(2, 'big')
        else:
            header = bytes([b1, 127]) + b2.to_bytes(8, 'big')
        sock.send(header + payload)
    def receive_frame(self, sock):
        """接收并解析 WebSocket 数据帧"""
        # 读取前两个字节
        first_two_bytes = sock.recv(2)
        if not first_two_bytes:
            return None # 连接已关闭
        b1, b2 = first_two_bytes
        # 解析 FIN, RSV, Opcode (这里我们只关心 Text Frame)
        # opcode = b1 & 0x0f
        # is_final = (b1 & 0x80) != 0
        # 解析 Mask 和 Payload Length
        is_masked = (b2 & 0x80) != 0
        payload_len = b2 & 0x7f
        # 读取扩展长度(如果需要)
        if payload_len == 126:
            payload_len = int.from_bytes(sock.recv(2), 'big')
        elif payload_len == 127:
            payload_len = int.from_bytes(sock.recv(8), 'big')
        # 读取 Mask Key (如果消息被掩码)
        mask_key = None
        if is_masked:
            mask_key = sock.recv(4)
        # 读取 Payload
        payload = sock.recv(payload_len)
        # 解码 Payload (如果被掩码)
        if is_masked and mask_key:
            decoded_payload = bytes([payload[i] ^ mask_key[i % 4] for i in range(len(payload))])
        else:
            decoded_payload = payload
        return decoded_payload
def run_server():
    server_address = ('', 9001) # 监听所有网络接口的 9001 端口
    httpd = HTTPServer(server_address, WebSocketHandler)
    print("WebSocket server started on ws://localhost:9001")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("Server is shutting down...")
        httpd.shutdown()
if __name__ == '__main__':
    run_server()

如何运行和测试

  1. 保存代码:将上面的代码保存为 server.py

  2. 运行服务器:在终端中执行 python server.py

  3. 测试客户端

    • 使用浏览器开发者工具

      Python如何模拟WebSocket实现?-图3
      (图片来源网络,侵删)
      • 打开浏览器的开发者工具 (F12)。

      • 在 Console (控制台) 中输入以下 JavaScript 代码:

        const ws = new WebSocket('ws://localhost:9001');
        ws.onopen = function(event) {
            console.log('Connection established!');
            ws.send('Hello from browser client!');
        };
        ws.onmessage = function(event) {
            console.log('Message from server:', event.data);
        };
        ws.onclose = function(event) {
            console.log('Connection closed.');
        };
        ws.onerror = function(error) {
            console.error('WebSocket Error:', error);
        };
      • 你会看到控制台输出,并且服务器终端也会打印出接收到的消息。

    • 使用 Python 客户端(见方案二)


模拟一个 WebSocket 客户端

这个场景通常用于自动化测试你的 WebSocket 服务器,或者在没有浏览器环境时与服务器交互。

我们将创建一个客户端,它可以:

  • 连接到上面创建的模拟服务器。
  • 发送一条初始消息。
  • 等待并接收服务器的回复。
  • 保持连接,直到发送 "exit" 或服务器关闭。

代码实现

# client.py
import socket
import base64
import hashlib
import threading
import json
class WebSocketClient:
    def __init__(self, host='localhost', port=9001):
        self.host = host
        self.port = port
        self.socket = None
    def connect(self):
        """连接到 WebSocket 服务器并进行握手"""
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((self.host, self.port))
        # 1. 发送握手请求
        handshake_key = base64.b64encode(hashlib.sha1(b"dummy_key_for_client").digest()).decode('utf-8')
        handshake_request = (
            f"GET / HTTP/1.1\r\n"
            f"Host: {self.host}:{self.port}\r\n"
            f"Upgrade: websocket\r\n"
            f"Connection: Upgrade\r\n"
            f"Sec-WebSocket-Key: {handshake_key}\r\n"
            f"Sec-WebSocket-Version: 13\r\n"
            f"\r\n"
        )
        self.socket.send(handshake_request.encode('utf-8'))
        # 2. 接收并验证握手响应
        response = self.socket.recv(1024).decode('utf-8')
        if "101 Switching Protocols" in response:
            print("Connected to WebSocket server!")
            # 启动一个线程来持续接收消息
            receive_thread = threading.Thread(target=self.receive_messages)
            receive_thread.daemon = True
            receive_thread.start()
        else:
            print("Failed to connect to WebSocket server.")
            self.socket.close()
    def send_message(self, message):
        """发送消息到服务器"""
        if self.socket:
            # 构建帧并发送
            b1 = 0b10000001  # FIN + Text Frame
            payload = message.encode('utf-8')
            b2 = len(payload)
            header = bytes([b1, b2])
            self.socket.send(header + payload)
            print(f"Sent: {message}")
        else:
            print("Socket is not connected.")
    def receive_messages(self):
        """持续接收服务器发来的消息"""
        while True:
            try:
                message = self.receive_frame()
                if message is None:
                    print("Server closed the connection.")
                    break
                print(f"Received: {message.decode('utf-8')}")
            except ConnectionResetError:
                print("Connection lost.")
                break
            except Exception as e:
                print(f"An error occurred: {e}")
                break
        self.socket.close()
    def receive_frame(self):
        """接收并解析服务器发来的数据帧"""
        # 读取前两个字节
        first_two_bytes = self.socket.recv(2)
        if not first_two_bytes:
            return None # 连接已关闭
        b1, b2 = first_two_bytes
        # 解析 Mask 和 Payload Length
        is_masked = (b2 & 0x80) != 0 # 客户端接收的消息通常是被掩码的
        payload_len = b2 & 0x7f
        # 读取扩展长度
        if payload_len == 126:
            payload_len = int.from_bytes(self.socket.recv(2), 'big')
        elif payload_len == 127:
            payload_len = int.from_bytes(self.socket.recv(8), 'big')
        # 读取 Mask Key
        mask_key = None
        if is_masked:
            mask_key = self.socket.recv(4)
        # 读取 Payload
        payload = self.socket.recv(payload_len)
        # 解码 Payload
        if is_masked and mask_key:
            decoded_payload = bytes([payload[i] ^ mask_key[i % 4] for i in range(len(payload))])
        else:
            decoded_payload = payload
        return decoded_payload
    def close(self):
        """关闭连接"""
        if self.socket:
            self.socket.close()
            print("Connection closed.")
if __name__ == '__main__':
    client = WebSocketClient()
    client.connect()
    try:
        # 发送几条消息
        client.send_message("Hello from Python client!")
        # 模拟持续交互
        while True:
            user_input = input("Enter a message to send (or 'exit' to quit): ")
            if user_input.lower() == 'exit':
                client.send_message("exit")
                break
            client.send_message(user_input)
    except KeyboardInterrupt:
        print("\nClient shutting down...")
    finally:
        client.close()

如何运行和测试

  1. 确保服务器运行:在第一个终端中运行 python server.py
  2. 运行客户端:在第二个终端中运行 python client.py
  3. 观察交互
    • 客户端会自动连接并发送 "Hello from Python client!"。
    • 服务器会打印接收到的消息,并回显一条。
    • 客户端会进入一个循环,等待你输入,输入任何内容并回车,客户端就会发送给服务器。
    • 输入 exit 并回车,客户端会发送 "exit" 给服务器,然后关闭连接。

更高级的方案:使用现有的库

虽然手动实现有助于理解 WebSocket 协议,但在实际项目中,我们通常会使用成熟的库来避免处理底层的细节。

模拟服务器:使用 websockets

websockets 是 Python 中最流行的 WebSocket 库之一。

安装

pip install websockets

模拟服务器代码 (server_advanced.py)

import asyncio
import websockets
async def handler(websocket, path):
    print(f"Client connected from {path}")
    try:
        # 发送欢迎消息
        await websocket.send("Hello, Client! (from advanced server)")
        print("Sent: Hello, Client!")
        # 持续接收消息
        async for message in websocket:
            print(f"Received: {message}")
            if message.lower() == "exit":
                print("Client requested to close.")
                await websocket.close()
                break
            # 回显消息
            response = f"Server received: {message}"
            await websocket.send(response)
            print(f"Sent: {response}")
    except websockets.exceptions.ConnectionClosedOK:
        print("Client disconnected gracefully.")
    except websockets.exceptions.ConnectionClosedError:
        print("Client disconnected unexpectedly.")
async def main():
    async with websockets.serve(handler, "localhost", 9002):
        print("Advanced WebSocket server started on ws://localhost:9002")
        await asyncio.Future()  # 永远运行
if __name__ == "__main__":
    asyncio.run(main())

模拟客户端:使用 websockets

客户端代码 (client_advanced.py)

import asyncio
import websockets
async def hello():
    uri = "ws://localhost:9002"
    async with websockets.connect(uri) as websocket:
        print("Connected to server!")
        # 接收欢迎消息
        welcome_message = await websocket.recv()
        print(f"Received: {welcome_message}")
        # 发送消息
        await websocket.send("Hello from advanced client!")
        print("Sent: Hello from advanced client!")
        # 接收回复
        reply = await websocket.recv()
        print(f"Received: {reply}")
        # 模拟持续交互
        while True:
            user_input = input("Enter a message (or 'exit' to quit): ")
            if user_input.lower() == 'exit':
                await websocket.send("exit")
                break
            await websocket.send(user_input)
            # 等待服务器回复
            response = await websocket.recv()
            print(f"Received: {response}")
if __name__ == "__main__":
    asyncio.run(hello())
方案 优点 缺点 适用场景
手动实现 - 深入理解 WebSocket 协议细节
- 无需外部依赖
- 代码量大,易出错
- 功能有限(如不支持二进制帧、ping/pong等)
学习、教学、简单测试
websockets - 代码简洁,功能强大
- 支持异步,性能好
- 稳定可靠,社区活跃
- 需要额外安装 生产环境、复杂应用、自动化测试

对于绝大多数项目,强烈推荐使用 websockets 这样的成熟库,手动实现则更适合于学习和理解协议本身。

分享:
扫描分享到社交APP
上一篇
下一篇