杰瑞科技汇

python 实现websocket

核心概念

在开始之前,我们先快速理解一下 WebSocket 的基本流程:

python 实现websocket-图1
(图片来源网络,侵删)
  1. 握手:客户端向服务器发送一个 HTTP 请求,请求头中包含 Upgrade: websocketConnection: Upgrade,以请求将协议从 HTTP 升级为 WebSocket。
  2. 响应:服务器如果支持 WebSocket,会返回一个 101 Switching Protocols 的响应,之后连接就不再是 HTTP 了,而是一个持久的 TCP 连接,双方可以随时发送消息。
  3. 数据传输:数据在 WebSocket 连接上以“帧”(Frame)的形式传输,Python 的库通常会帮我们处理底层的帧格式,我们只需关注发送和接收文本或二进制数据即可。
  4. 关闭:任一方都可以发送一个“关闭帧”来终止连接。

使用 websockets 库(推荐)

websockets 是 Python 中最流行、功能最强大的 WebSocket 客户端和服务器库之一,它基于 asyncio,非常适合处理高并发连接。

安装

你需要安装这个库:

pip install websockets

实现一个简单的 WebSocket 服务器

这个服务器会接收客户端发送的消息,并将其原样返回(一个简单的回声服务器)。

# server.py
import asyncio
import websockets
# async 函数用于处理每个客户端连接
async def handle_connection(websocket, path):
    """
    path 是连接的路径,这里我们暂时不用它
    """
    print(f"新的客户端已连接: {websocket.remote_address}")
    try:
        # 无限循环,持续等待客户端的消息
        async for message in websocket:
            print(f"收到消息: {message}")
            # 将收到的消息原样发送回去
            await websocket.send(f"服务器收到了你的消息: {message}")
    except websockets.exceptions.ConnectionClosed as e:
        print(f"客户端 {websocket.remote_address} 断开连接: {e}")
# 启动 WebSocket 服务器
async def main():
    # host='0.0.0.0' 允许任何 IP 地址的客户端连接
    # port=8765 是我们选择的端口号
    async with websockets.serve(handle_connection, "0.0.0.0", 8765):
        print("WebSocket 服务器已启动,监听 ws://0.0.0.0:8765")
        # 保持服务器运行
        await asyncio.Future()  # run forever
# 运行主函数
asyncio.run(main())

实现一个 WebSocket 客户端

这个客户端会连接到服务器,发送一条消息,然后等待并打印服务器的回复。

python 实现websocket-图2
(图片来源网络,侵删)
# client.py
import asyncio
import websockets
async def connect_to_server():
    uri = "ws://localhost:8765"  # 服务器的地址和端口
    async with websockets.connect(uri) as websocket:
        print("已连接到服务器")
        # 发送一条消息
        message = "你好,服务器!"
        await websocket.send(message)
        print(f"已发送: {message}")
        # 等待服务器的回复
        response = await websocket.recv()
        print(f"收到回复: {response}")
# 运行客户端
asyncio.run(connect_to_server())

如何运行

  1. 启动服务器:在一个终端中运行 python server.py,你会看到 "WebSocket 服务器已启动..." 的提示。
  2. 启动客户端:在另一个终端中运行 python client.py
  3. 观察输出
    • 服务器终端会显示:新的客户端已连接: ('127.0.0.1', 54321)收到消息: 你好,服务器!
    • 客户端终端会显示:已连接到服务器已发送: 你好,服务器!收到回复: 服务器收到了你的消息: 你好,服务器!

使用 Flask-SocketIO(适用于 Web 应用)

如果你的项目是基于 Flask 的 Web 应用,并且你想在网页中使用 WebSocket,Flask-SocketIO 是一个绝佳的选择,它底层使用 Socket.IO 协议,这是 WebSocket 的一个超集,提供了更好的兼容性和额外功能(如房间、广播等)。

安装

pip install Flask-SocketIO

实现一个集成到 Flask 的 WebSocket 服务器

# app.py
from flask import Flask, render_template
from flask_socketio import SocketIO, emit
app = Flask(__name__)
# 设置一个密钥,用于 session
app.config['SECRET_KEY'] = 'your-secret-key'
# 初始化 SocketIO
socketio = SocketIO(app, cors_allowed_origins="*") # cors_allowed_origins 允许跨域
# 当客户端连接时触发
@socketio.on('connect')
def handle_connect():
    print('客户端已连接')
    # 可以主动向客户端发送欢迎消息
    emit('response', {'data': '欢迎连接到 Flask-SocketIO!'})
# 当客户端发送 'message' 事件时触发
@socketio.on('message')
def handle_message(data):
    print(f'收到消息: {data}')
    # 将收到的数据再广播给所有连接的客户端
    emit('response', {'data': f'服务器广播: {data}'})
# 当客户端断开连接时触发
@socketio.on('disconnect')
def handle_disconnect():
    print('客户端已断开连接')
# 一个简单的路由,用于提供 HTML 页面
@app.route('/')
def index():
    return render_template('index.html')
if __name__ == '__main__':
    # 使用 socketio.run 来启动应用,而不是 app.run()
    socketio.run(app, debug=True, host='0.0.0.0', port=5000)

创建一个简单的 HTML 客户端

app.py 同级目录下,创建一个 templates 文件夹,并在其中创建 index.html 文件。

<!-- templates/index.html -->
<!DOCTYPE html>
<html>
<head>Flask-SocketIO 测试</title>
    <script src="https://cdn.socket.io/4.5.4/socket.io.min.js"></script>
</head>
<body>
    <h1>Flask-SocketIO 聊天室</h1>
    <input type="text" id="message_input" placeholder="输入消息...">
    <button onclick="sendMessage()">发送</button>
    <ul id="messages"></ul>
    <script>
        // 连接到服务器
        const socket = io("http://localhost:5000");
        // 监听 'response' 事件,并显示消息
        socket.on('response', function(msg) {
            const item = document.createElement('li');
            item.textContent = msg.data;
            document.getElementById('messages').appendChild(item);
        });
        // 发送消息的函数
        function sendMessage() {
            const input = document.getElementById('message_input');
            const message = input.value;
            if (message) {
                // 发送 'message' 事件到服务器
                socket.emit('message', { data: message });
                input.value = '';
            }
        }
        // 监听连接事件
        socket.on('connect', () => {
            console.log('连接到服务器!');
        });
    </script>
</body>
</html>

如何运行

  1. 启动服务器:运行 python app.py
  2. 打开浏览器:访问 http://127.0.0.1:5000
  3. 测试:在输入框中输入文字并点击发送,你会看到消息出现在下方的列表中,因为服务器配置了广播,所以如果你打开多个浏览器标签页,每个标签页都会收到消息。

手动实现(理解原理)

如果你想深入了解 WebSocket 的工作原理,可以尝试手动实现一个最简单的版本,这需要你处理 HTTP 握手和 WebSocket 数据帧的编解码。

警告:这种方法仅用于学习和理解,生产环境中强烈建议使用成熟的库,因为它们更安全、更高效、功能更全。

python 实现websocket-图3
(图片来源网络,侵删)

服务器实现

# manual_server.py
import socket
import base64
import hashlib
# WebSocket 响应头模板
RESPONSE_TEMPLATE = (
    "HTTP/1.1 101 Switching Protocols\r\n"
    "Upgrade: websocket\r\n"
    "Connection: Upgrade\r\n"
    "Sec-WebSocket-Accept: {accept_key}\r\n"
    "\r\n"
)
def get_accept_key(client_key):
    # WebSocket 握手规范中的魔法字符串
    magic_string = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
    # 将客户端提供的 key 和魔法字符串拼接
    combined = client_key + magic_string
    # 计算SHA1哈希
    sha1_hash = hashlib.sha1(combined.encode()).digest()
    # 将哈希值进行base64编码
    return base64.b64encode(sha1_hash).decode('utf-8')
def handle_handshake(client_socket, client_address):
    # 接收客户端的握手请求
    request = client_socket.recv(1024).decode('utf-8')
    print("收到握手请求:\n", request)
    # 从请求头中提取 Sec-WebSocket-Key
    headers = request.split('\r\n')
    for header in headers:
        if header.startswith('Sec-WebSocket-Key: '):
            client_key = header.split(': ')[1]
            break
    # 计算并生成握手响应
    accept_key = get_accept_key(client_key)
    response = RESPONSE_TEMPLATE.format(accept_key=accept_key)
    client_socket.send(response.encode('utf-8'))
    print("握手成功,连接已升级为 WebSocket")
def simple_decode_message(data):
    # 解码简单的文本帧 (只处理最基本的情况)
    # 第一个字节的第5位是1表示这是一个文本帧
    # 第2个字节的后7位是数据的长度
    if len(data) < 2:
        return None
    payload_length = data[1] & 0x7F
    if payload_length == 126:
        # 长度在2字节扩展中 (这里不处理)
        return None
    elif payload_length == 127:
        # 长度在8字节扩展中 (这里不处理)
        return None
    # 掩码从第4个字节开始
    mask = data[2:6]
    payload_data = data[6:6+payload_length]
    # 使用掩码对数据进行解码
    decoded_bytes = bytearray()
    for i in range(len(payload_data)):
        decoded_bytes.append(payload_data[i] ^ mask[i % 4])
    return decoded_bytes.decode('utf-8')
def main():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(('0.0.0.0', 8765))
    server_socket.listen(5)
    print("手动 WebSocket 服务器已启动,监听 0.0.0.0:8765")
    while True:
        client_socket, client_address = server_socket.accept()
        print(f"新的连接来自: {client_address}")
        # 1. 处理握手
        handle_handshake(client_socket, client_address)
        # 2. 进入消息循环
        try:
            while True:
                # 接收数据
                data = client_socket.recv(4096)
                if not data:
                    break # 连接已关闭
                # 解码消息
                message = simple_decode_message(data)
                if message:
                    print(f"收到消息: {message}")
                    # 简单的回显
                    response = f"手动服务器回复: {message}"
                    # 手动编码响应帧 (只处理最简单的文本帧)
                    response_bytes = response.encode('utf-8')
                    frame = bytearray()
                    # FIN + RSV + Opcode (0x1 for text)
                    frame.append(0b10000001)
                    # Payload length
                    frame.append(len(response_bytes))
                    # Payload data
                    frame.extend(response_bytes)
                    client_socket.send(frame)
        except Exception as e:
            print(f"发生错误: {e}")
        finally:
            client_socket.close()
            print(f"客户端 {client_address} 已断开连接")
if __name__ == '__main__':
    main()

客户端实现

客户端也需要遵循同样的协议,使用 websockets 库作为客户端来测试我们的手动服务器是最方便的。

# test_manual_client.py
import asyncio
import websockets
async def test_manual_server():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        print("已连接到手动服务器")
        await websocket.send("你好,手动服务器!")
        print("已发送消息")
        response = await websocket.recv()
        print(f"收到回复: {response}")
asyncio.run(test_manual_server())

总结与选择

方法 优点 缺点 适用场景
websockets - 轻量级、高性能
- 纯 Python,基于 asyncio
- 语法简洁,文档完善
- 需要理解 asyncio 概念 - 构建独立的 WebSocket 服务
- 后台任务、实时数据推送
- 高性能 I/O 应用
Flask-SocketIO - 与 Flask 无缝集成
- 功能强大(房间、广播、命名空间)
- 前端使用简单
- 依赖较多
- 协议是 Socket.IO,不是纯 WebSocket
- 在 Flask Web 应用中添加实时功能
- 聊天室、实时通知、在线游戏
手动实现 - 深入理解 WebSocket 协议原理
- 无外部依赖
- 代码复杂、易出错
- 功能有限,不安全
- 性能可能不佳
- 学习和研究目的
- 面试中考察基础

对于绝大多数实际项目,推荐使用 websocketsFlask-SocketIO,它们经过了大量实践的检验,能够让你专注于业务逻辑,而不是底层的协议细节。

分享:
扫描分享到社交APP
上一篇
下一篇