WebSocket 学习教程

实时网络通信技术详解与实战

目录

1. WebSocket 简介

思考问题

  1. 传统的 HTTP 通信在实现实时应用时有哪些局限性?
  2. 为什么现代 Web 应用需要双向通信能力?
  3. 你能想象哪些场景需要服务器主动向客户端推送数据?
  4. 在实现实时通信时,长轮询和服务器发送事件(SSE)与 WebSocket 相比有什么缺点?
  5. WebSocket 如何影响应用架构和扩展性设计?

在学习本章内容前,请思考这些问题。这将帮助你更好地理解 WebSocket 技术的价值和应用场景。

1.1 什么是 WebSocket

WebSocket 是一种网络通信协议,提供全双工通信通道,使服务器和客户端之间能够在单个 TCP 连接上进行双向实时数据传输。它是 HTML5 规范的一部分,被设计用来克服 HTTP 协议在实时通信方面的局限性。

与传统的 HTTP 请求-响应模式不同,WebSocket 一旦建立连接,就允许数据在任何时间点、从任何一方发送到另一方,无需重新建立连接,这使得它特别适合需要低延迟、高频率数据交换的应用场景。

WebSocket 协议图解

WebSocket 协议流程图

1.2 WebSocket 的起源与发展

WebSocket 协议由 Ian Hickson 起草,并由 IETF 作为 RFC 6455 标准化,同时由 W3C 将 WebSocket API 标准化为 Web IDL 规范的一部分。该协议于 2011 年完成标准化,目的是解决 Web 应用中实时通信的需求,替代当时流行但效率较低的解决方案,如长轮询(Long Polling)和 Comet。

随着移动互联网、物联网和实时 Web 应用的兴起,WebSocket 技术得到了广泛采用,成为现代 Web 开发中不可或缺的一部分。

1.3 WebSocket 的主要特点

WebSocket URL 使用 ws://wss://(安全 WebSocket)协议标识符,类似于 HTTP 使用 http://https://

1.4 WebSocket 的应用场景

WebSocket 技术特别适合以下应用场景:

实践思考

思考一个你熟悉的 Web 应用,它如何从 WebSocket 技术中受益?如果要将该应用改造为使用 WebSocket,主要的架构变化会是什么?

2. WebSocket 与 HTTP 对比

思考问题

  1. HTTP 协议的无状态特性如何影响实时应用的开发?
  2. 在不使用 WebSocket 的情况下,如何实现类似的实时通信功能?
  3. 为什么 WebSocket 比轮询和长轮询更适合高频数据交换?
  4. 什么样的应用场景下,HTTP 可能比 WebSocket 更合适?
  5. WebSocket 和 HTTP/2 推送功能有何异同?

在学习本章内容前,请思考这些问题。这将帮助你理解不同协议的优缺点和适用场景。

2.1 通信模型比较

HTTP 和 WebSocket 在通信模型上有根本性的区别:

特性 HTTP WebSocket
通信方式 单向(请求-响应) 双向(全双工)
连接特性 短连接(非持久)或多请求持久连接 长连接(持久)
状态 无状态 有状态
头部开销 每次请求都有完整的头部 仅在建立连接时有完整头部,之后很小
实时性 依赖客户端请求 服务器可主动推送
交互模式 同步 异步
适用场景 常规网页、RESTful API、数据提交 聊天、游戏、实时监控、协作工具
HTTP vs WebSocket 通信模型对比

HTTP 与 WebSocket 通信模型对比图

2.2 HTTP 实时通信解决方案

在 WebSocket 出现之前,开发者使用多种技术在 HTTP 上模拟实时通信:

2.2.1 轮询(Polling)

客户端定期向服务器发送请求,检查是否有新数据。这种方法简单但效率低下,会产生大量无效请求和服务器负载。

// 简单的轮询示例
function poll() {
    fetch('/api/updates')
        .then(response => response.json())
        .then(data => {
            if(data.hasUpdates) {
                processUpdates(data);
            }
            // 5秒后再次轮询
            setTimeout(poll, 5000);
        })
        .catch(error => console.error('轮询出错:', error));
}

poll(); // 开始轮询

2.2.2 长轮询(Long Polling)

客户端发送请求后,服务器保持连接打开,直到有新数据或超时。这减少了轮询的频率,但仍有连接开销。

// 长轮询示例
function longPoll() {
    fetch('/api/updates?timeout=30000')
        .then(response => response.json())
        .then(data => {
            processUpdates(data);
            // 立即发起新的长轮询请求
            longPoll();
        })
        .catch(error => {
            console.error('长轮询出错:', error);
            // 出错时稍等后重试
            setTimeout(longPoll, 1000);
        });
}

longPoll(); // 开始长轮询

2.2.3 服务器发送事件(Server-Sent Events, SSE)

允许服务器通过单个 HTTP 连接向客户端推送数据。这是单向的(只能服务器到客户端),但比轮询更高效。

// SSE 示例
const eventSource = new EventSource('/api/events');

eventSource.onmessage = function(event) {
    const data = JSON.parse(event.data);
    processUpdates(data);
};

eventSource.onerror = function(error) {
    console.error('SSE 出错:', error);
    eventSource.close();
};

2.3 性能与资源使用对比

WebSocket 相比 HTTP 实时通信解决方案有显著的性能优势:

性能指标 轮询 长轮询 SSE WebSocket
连接开销 高(频繁连接) 中(较少连接) 低(单一连接) 低(单一连接)
头部开销 中(仅首次高) 低(仅首次高)
数据实时性 低(受轮询间隔限制) 中(有延迟) 高(服务器推送) 高(即时双向)
服务器资源消耗 低-中
网络带宽使用 低-中
通信方向 客户端到服务器 客户端到服务器 服务器到客户端 双向

在高频数据交换(如在线游戏、协作编辑)或需要低延迟的场景中,WebSocket 的性能优势尤为明显。一项研究表明,在高并发环境下,WebSocket 可以比轮询减少高达 90% 的服务器负载。

2.4 何时选择 WebSocket 或 HTTP

选择合适的协议应基于应用需求:

适合使用 WebSocket 的场景:

适合使用 HTTP 的场景:

实践思考

分析一个现有的使用长轮询实现"实时"功能的应用,估算如果改用 WebSocket 协议,会节省多少请求次数和带宽。考虑用户数量、更新频率和每次请求的数据量等因素。

3. WebSocket API

思考问题

  1. 浏览器原生 WebSocket API 提供了哪些基本功能?
  2. WebSocket 连接的各个生命周期事件分别代表什么状态?
  3. 如何判断一个 WebSocket 连接是否仍然活跃?
  4. WebSocket 协议如何处理不同类型的数据(文本、二进制)?
  5. 在开发中,如何优雅地处理 WebSocket 重连逻辑?

请思考这些问题,帮助你理解 WebSocket API 的核心概念和使用模式。

3.1 WebSocket 协议基础

WebSocket 协议是一个独立的 TCP 协议,以 HTTP 握手开始,但随后转换为自己的协议。

3.1.1 WebSocket URL

WebSocket 使用特殊的 URL 方案:

示例:wss://echo.websocket.org

在生产环境中,始终使用 wss:// 协议以确保数据传输安全。非加密的 ws:// 连接容易受到中间人攻击。

3.1.2 WebSocket 握手

WebSocket 连接始于 HTTP 握手,客户端发送特殊的升级请求,服务器接受后协议切换为 WebSocket:

// 客户端请求
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13

// 服务器响应
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

3.2 浏览器 WebSocket API

Web 浏览器提供了简洁的原生 WebSocket API,用于创建和管理 WebSocket 连接。

3.2.1 创建 WebSocket 连接

// 创建一个新的 WebSocket 连接
const socket = new WebSocket('wss://example.com/socketserver');

// 可选:指定协议
const chatSocket = new WebSocket('wss://example.com/chat', ['json', 'xml']);

3.2.2 WebSocket 事件

WebSocket API 提供了四个主要事件用于处理连接的生命周期和消息传递:

// 连接建立时触发
socket.onopen = function(event) {
    console.log('WebSocket 连接已建立');
    // 可以开始发送消息
    socket.send('Hello Server!');
};

// 收到消息时触发
socket.onmessage = function(event) {
    console.log('收到消息: ' + event.data);
    // 处理收到的消息
};

// 连接关闭时触发
socket.onclose = function(event) {
    console.log('WebSocket 连接已关闭');
    // event.code: 关闭码
    // event.reason: 关闭原因
    // event.wasClean: 是否是干净关闭
};

// 发生错误时触发
socket.onerror = function(error) {
    console.error('WebSocket 错误: ', error);
};

也可以使用 EventListener 模式:

socket.addEventListener('open', function(event) {
    console.log('WebSocket 连接已建立');
});

socket.addEventListener('message', function(event) {
    console.log('收到消息: ' + event.data);
});

3.2.3 发送数据

WebSocket API 支持发送文本和二进制数据:

// 发送文本数据
socket.send('Hello Server!');

// 发送 JSON 数据
const data = {
    type: 'chat',
    message: 'Hello everyone!',
    timestamp: Date.now()
};
socket.send(JSON.stringify(data));

// 发送二进制数据
const binaryData = new Uint8Array([0, 1, 2, 3]);
socket.send(binaryData.buffer);

3.2.4 关闭连接

WebSocket 连接可以由客户端或服务器关闭:

// 正常关闭连接
socket.close();

// 带关闭码和原因的关闭
socket.close(1000, "操作完成");

// 常见关闭码:
// 1000 - 正常关闭
// 1001 - 离开(如页面关闭)
// 1002 - 协议错误
// 1003 - 无法接受的数据类型
// 1008 - 违反策略
// 1011 - 服务器错误

3.3 WebSocket 连接状态

WebSocket 对象提供了 readyState 属性,用于检查连接的当前状态:

// 检查连接状态
switch(socket.readyState) {
    case WebSocket.CONNECTING: // 0
        console.log('连接中...');
        break;
    case WebSocket.OPEN: // 1
        console.log('连接已打开');
        // 只有在连接打开时才能发送消息
        socket.send('数据');
        break;
    case WebSocket.CLOSING: // 2
        console.log('连接正在关闭');
        break;
    case WebSocket.CLOSED: // 3
        console.log('连接已关闭或无法打开');
        break;
}

在发送消息前,总是检查 readyState 是否为 WebSocket.OPEN,以避免在连接未建立或已关闭时尝试发送数据而导致错误。

3.4 高级功能和最佳实践

3.4.1 心跳机制

为了保持连接活跃并检测连接断开,可以实现心跳机制:

// 实现简单的心跳机制
function heartbeat() {
    if (socket.readyState === WebSocket.OPEN) {
        socket.send(JSON.stringify({type: 'ping'}));
    }
}

// 每30秒发送一次心跳
const heartbeatInterval = setInterval(heartbeat, 30000);

// 收到服务器响应重置计时器
let timeoutId = null;
socket.onmessage = function(event) {
    // 处理消息...
    
    // 清除上一个超时计时器
    if (timeoutId) {
        clearTimeout(timeoutId);
    }
    
    // 设置新的超时计时器
    timeoutId = setTimeout(() => {
        console.log('连接似乎已断开,尝试重连');
        reconnect();
    }, 35000); // 稍长于心跳间隔
};

3.4.2 自动重连

实现可靠的自动重连机制,确保连接中断时能自动恢复:

// 更复杂的 WebSocket 包装器,包含自动重连功能
class ReconnectingWebSocket {
    constructor(url, protocols) {
        this.url = url;
        this.protocols = protocols;
        this.socket = null;
        this.isConnected = false;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.reconnectInterval = 1000; // 起始为1秒
        this.maxReconnectInterval = 30000; // 最大30秒
        this.callbacks = {
            open: [],
            message: [],
            close: [],
            error: []
        };
        
        this.connect();
    }
    
    connect() {
        this.socket = new WebSocket(this.url, this.protocols);
        
        this.socket.onopen = (event) => {
            console.log('WebSocket 连接已建立');
            this.isConnected = true;
            this.reconnectAttempts = 0;
            this.callbacks.open.forEach(callback => callback(event));
        };
        
        this.socket.onmessage = (event) => {
            this.callbacks.message.forEach(callback => callback(event));
        };
        
        this.socket.onclose = (event) => {
            this.isConnected = false;
            this.callbacks.close.forEach(callback => callback(event));
            
            if (!event.wasClean) {
                this.reconnect();
            }
        };
        
        this.socket.onerror = (event) => {
            this.callbacks.error.forEach(callback => callback(event));
        };
    }
    
    reconnect() {
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
            console.log('达到最大重连次数');
            return;
        }
        
        this.reconnectAttempts++;
        const timeout = Math.min(
            this.reconnectInterval * Math.pow(1.5, this.reconnectAttempts),
            this.maxReconnectInterval
        );
        
        console.log(`尝试在 ${timeout}ms 后重连...`);
        setTimeout(() => this.connect(), timeout);
    }
    
    addEventListener(type, callback) {
        if (this.callbacks[type]) {
            this.callbacks[type].push(callback);
        }
    }
    
    send(data) {
        if (this.isConnected) {
            this.socket.send(data);
            return true;
        }
        return false;
    }
    
    close(code, reason) {
        if (this.socket) {
            this.socket.close(code, reason);
        }
    }
}

// 使用示例
const ws = new ReconnectingWebSocket('wss://example.com/socket');
ws.addEventListener('message', function(event) {
    console.log('收到消息:', event.data);
});

实践练习

使用原生 WebSocket API 创建一个简单的聊天应用,包含以下功能:

  1. 连接到公共 WebSocket 服务器(如 wss://echo.websocket.org)
  2. 发送和接收文本消息
  3. 实现基本的心跳检测功能
  4. 实现简单的重连机制
  5. 显示连接状态

这个练习将帮助你理解 WebSocket API 的基本操作和管理连接生命周期的实践技巧。

4. WebSocket 库与框架

思考问题

  1. 在什么情况下应该使用第三方 WebSocket 库而非原生 API?
  2. Socket.IO 相比原生 WebSocket 提供了哪些额外功能?
  3. 企业级应用中,为何 STOMP 协议经常与 WebSocket 结合使用?
  4. 如何在现有框架(如 React、Vue、Angular)中集成 WebSocket 功能?
  5. 如何评估一个 WebSocket 库的性能和可靠性?

思考这些问题,帮助你选择适合项目需求的 WebSocket 解决方案。

4.1 客户端 WebSocket 库

虽然浏览器原生 WebSocket API 功能完善,但第三方库可以提供更多高级特性和便利性:

4.1.1 Socket.IO 客户端

Socket.IO 是最流行的 WebSocket 库之一,提供了丰富的功能和优雅的回退机制:

// 安装
// npm install socket.io-client

// 使用
import { io } from "socket.io-client";

const socket = io("https://example.com", {
    reconnectionDelayMax: 10000,
    auth: {
        token: "user-token"
    }
});

// 事件处理
socket.on("connect", () => {
    console.log("已连接,ID:", socket.id);
});

// 发送事件
socket.emit("chat message", { text: "Hello!" });

// 接收事件
socket.on("chat message", (data) => {
    console.log("收到消息:", data);
});

// 命名空间和房间
const adminSocket = io("https://example.com/admin");
socket.emit("join", "room1");

// 断开连接
socket.disconnect();

4.1.2 SockJS

SockJS 提供了一个类似 WebSocket 的 API,但在不支持 WebSocket 的环境中具有多种回退选项:

// 安装
// npm install sockjs-client

// 使用
import SockJS from 'sockjs-client';

const sock = new SockJS('https://example.com/echo');

sock.onopen = function() {
    console.log('连接已打开');
    sock.send('test');
};

sock.onmessage = function(e) {
    console.log('收到消息:', e.data);
};

sock.onclose = function() {
    console.log('连接已关闭');
};

Socket.IO 与 SockJS 对比

特性 Socket.IO SockJS
基本协议 自定义协议(基于 WebSocket) WebSocket API 兼容
回退机制 多种回退(长轮询、AJAX等) 多种回退(XHR流、长轮询等)
命名空间/房间 原生支持 不支持(需自行实现)
自动重连 内置 需手动实现
服务器实现 需要 Socket.IO 服务器 SockJS 服务器(多语言支持)
生态系统 庞大,有丰富的插件 多语言支持,与 STOMP 结合良好

4.2 STOMP 协议

STOMP(Simple Text Oriented Messaging Protocol)是一个简单的基于帧的协议,可以在 WebSocket 上实现:

// 安装
// npm install @stomp/stompjs

// 使用
import { Client } from '@stomp/stompjs';

const client = new Client({
    brokerURL: 'ws://example.com/ws',
    connectHeaders: {
        login: 'user',
        passcode: 'password'
    },
    debug: function (str) {
        console.log(str);
    },
    reconnectDelay: 5000,
    heartbeatIncoming: 4000,
    heartbeatOutgoing: 4000
});

client.onConnect = function (frame) {
    // 订阅主题
    const subscription = client.subscribe('/topic/messages', function (message) {
        const payload = JSON.parse(message.body);
        console.log('收到消息:', payload);
    });

    // 发送消息到指定目的地
    client.publish({
        destination: '/app/chat',
        body: JSON.stringify({ content: 'Hello, STOMP' }),
        headers: { priority: '9' }
    });
};

client.onStompError = function (frame) {
    console.error('STOMP 错误:', frame.headers['message']);
};

client.activate();

STOMP 非常适合与 Spring 后端集成,Spring 提供了完整的 STOMP over WebSocket 支持。

4.3 框架集成

现代前端框架提供了便捷的方式集成 WebSocket 功能:

4.3.1 React 集成

// 创建 WebSocket 钩子
import { useState, useEffect, useCallback } from 'react';

function useWebSocket(url) {
    const [socket, setSocket] = useState(null);
    const [isConnected, setIsConnected] = useState(false);
    const [messages, setMessages] = useState([]);

    useEffect(() => {
        // 创建 WebSocket 连接
        const ws = new WebSocket(url);
        
        ws.onopen = () => {
            setIsConnected(true);
        };
        
        ws.onmessage = (event) => {
            try {
                // 尝试解析JSON数据
                const data = JSON.parse(event.data);
                setMessages(prev => [...prev, data]);
            } catch (e) {
                // 如果不是JSON,直接添加文本数据
                setMessages(prev => [...prev, event.data]);
            }
        };
        
        ws.onclose = () => {
            setIsConnected(false);
        };
        
        ws.onerror = (error) => {
            console.error('WebSocket错误:', error);
        };
        
        setSocket(ws);
        
        // 清理函数
        return () => {
            if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) {
                ws.close();
            }
        };
    }, [url]);
    
    // 发送消息方法
    const sendMessage = useCallback((data) => {
        if (socket && socket.readyState === WebSocket.OPEN) {
            // 如果数据是对象,转换为JSON字符串
            if (typeof data === 'object') {
                socket.send(JSON.stringify(data));
            } else {
                socket.send(data);
            }
            return true;
        }
        return false;
    }, [socket]);
    
    return { socket, isConnected, messages, sendMessage };
}

// 在组件中使用
function ChatComponent() {
    const { isConnected, messages, sendMessage } = useWebSocket('wss://example.com/chat');
    const [inputValue, setInputValue] = useState('');
    
    const handleSubmit = (e) => {
        e.preventDefault();
        if (inputValue && sendMessage(inputValue)) {
            setInputValue('');
        }
    };
    
    return (
        
状态: {isConnected ? '已连接' : '未连接'}
    {messages.map((msg, index) => (
  • {typeof msg === 'object' ? JSON.stringify(msg) : msg}
  • ))}
setInputValue(e.target.value)} placeholder="输入消息..." disabled={!isConnected} />
); }

4.3.2 Vue 集成

// Vue 3 WebSocket 组合式 API
import { ref, onMounted, onBeforeUnmount } from 'vue';

export function useWebSocket(url) {
    const socket = ref(null);
    const isConnected = ref(false);
    const messages = ref([]);
    
    const connect = () => {
        socket.value = new WebSocket(url);
        
        socket.value.onopen = () => {
            isConnected.value = true;
        };
        
        socket.value.onmessage = (event) => {
            messages.value.push(event.data);
        };
        
        socket.value.onclose = () => {
            isConnected.value = false;
        };
    };
    
    const sendMessage = (data) => {
        if (socket.value && isConnected.value) {
            socket.value.send(data);
            return true;
        }
        return false;
    };
    
    onMounted(() => {
        connect();
    });
    
    onBeforeUnmount(() => {
        if (socket.value) {
            socket.value.close();
        }
    });
    
    return {
        isConnected,
        messages,
        sendMessage
    };
}

// 在组件中使用


4.4 选择合适的解决方案

选择 WebSocket 实现方案时,考虑以下因素:

WebSocket 库选择决策树

                                    需要 WebSocket 功能
                                            │
                    ┌────────────────┬──────┴──────┬────────────────┐
                    │                │             │                │
                 简单需求      需要广泛兼容性     企业级应用      与特定框架集成
                    │                │             │                │
              原生 WebSocket      Socket.IO      STOMP       框架特定解决方案
                                     │             │         (React/Vue hooks)
                                     │             │
                                     └─────┬───────┘
                                           │
                                     根据后端技术选择
                                           │
                               ┌───────────┴───────────┐
                               │                       │
                          Node.js 后端             Java/Spring 后端
                               │                       │
                          Socket.IO              STOMP over SockJS
                

实践练习

对比实现:

  1. 使用原生 WebSocket API 实现一个简单的聊天应用
  2. 使用 Socket.IO 实现同样的功能
  3. 使用 STOMP 实现同样的功能
  4. 比较三种实现在代码复杂度、功能完整性和扩展性方面的差异

这个练习将帮助你理解不同 WebSocket 库的优缺点,为实际项目选择最合适的技术。

5. WebSocket 安全

思考问题

  1. WebSocket 连接面临哪些常见的安全威胁?
  2. 为什么应该使用 WSS 而非 WS 协议?
  3. 如何验证 WebSocket 连接的身份?
  4. WebSocket 消息需要哪些验证措施?
  5. 如何防止 WebSocket 服务器过载?

思考这些问题,帮助你构建更安全的 WebSocket 应用。

5.1 WebSocket 安全威胁

WebSocket 应用面临多种安全威胁,包括:

5.1.1 常见 WebSocket 安全漏洞

威胁 描述 影响
跨站 WebSocket 劫持 攻击者利用恶意网站发起 WebSocket 连接至受害者已授权的站点 未经授权访问用户数据,执行未授权操作
中间人攻击 攻击者拦截并可能修改未加密的 WebSocket 通信 数据泄露,通信篡改
拒绝服务攻击 通过大量 WebSocket 连接或消息使服务器资源耗尽 服务不可用
WebSocket 劫持 未授权第三方接管活跃的 WebSocket 连接 会话劫持,数据泄露
输入验证漏洞 未对通过 WebSocket 传输的数据进行适当验证 注入攻击,XSS,数据库漏洞

5.2 WebSocket 安全最佳实践

保护 WebSocket 应用需采取多层防御策略:

5.2.1 传输安全

// 始终使用 WSS 而非 WS
const socket = new WebSocket('wss://example.com/socket');

将 HTTP 升级为 HTTPS,同样将 WS 升级为 WSS:

永远不要在生产环境中使用非加密的 WS 协议,即使是内部应用也应使用 WSS。

5.2.2 身份验证与授权

在建立 WebSocket 连接时验证用户身份:

// 客户端:在握手时包含授权令牌
const token = localStorage.getItem('authToken');
const socket = new WebSocket(`wss://example.com/socket?token=${token}`);

// 或者通过自定义头部(需服务器支持)
fetch('wss://example.com/socket', {
    headers: {
        'Authorization': `Bearer ${token}`
    }
});

服务器端授权验证(Node.js 示例):

// 服务器:WebSocket 连接身份验证
const WebSocket = require('ws');
const url = require('url');
const jwt = require('jsonwebtoken');

const wss = new WebSocket.Server({ noServer: true });
const server = http.createServer();

server.on('upgrade', function (request, socket, head) {
    // 从URL查询参数中提取令牌
    const pathname = url.parse(request.url).pathname;
    const query = url.parse(request.url, true).query;
    
    // 验证令牌
    authenticateToken(query.token)
        .then(user => {
            // 存储用户信息,授权成功
            request.user = user;
            wss.handleUpgrade(request, socket, head, function (ws) {
                wss.emit('connection', ws, request);
            });
        })
        .catch(err => {
            // 授权失败,关闭连接
            socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
            socket.destroy();
        });
});

// 验证令牌函数
function authenticateToken(token) {
    return new Promise((resolve, reject) => {
        if (!token) return reject('需要令牌');
        
        jwt.verify(token, process.env.JWT_SECRET, (err, user) => {
            if (err) return reject('无效令牌');
            resolve(user);
        });
    });
}

最佳实践:单独为 WebSocket 连接创建短期令牌,而不是重用主应用的长期令牌。

5.2.3 消息验证和清理

始终验证从客户端接收的 WebSocket 消息:

// 客户端:发送结构化消息
socket.send(JSON.stringify({
    type: 'chat_message',
    content: message,
    timestamp: Date.now()
}));

// 服务器:验证消息结构和内容
wss.on('connection', (ws, request) => {
    const user = request.user; // 从身份验证中
    
    ws.on('message', (message) => {
        try {
            // 解析并验证消息
            const data = JSON.parse(message);
            
            // 检查必要字段
            if (!data.type || !data.content) {
                return sendError(ws, '无效消息格式');
            }
            
            // 验证内容(防止XSS等)
            if (!isValidContent(data.content)) {
                return sendError(ws, '包含不允许的内容');
            }
            
            // 附加已验证的用户信息
            data.userId = user.id;
            data.username = user.username;
            
            // 处理验证后的消息
            handleMessage(data, ws);
        } catch (e) {
            sendError(ws, '无效的JSON');
        }
    });
});

function isValidContent(content) {
    // 实现内容验证逻辑
    // 例如:长度限制、XSS过滤等
    return content.length > 0 && 
           content.length <= 1000 && 
           !/

5.2.4 速率限制与资源保护

防止 WebSocket 服务器过载:

// 服务器端速率限制实现
const clients = new Map(); // 跟踪客户端

wss.on('connection', (ws, request) => {
    const userId = request.user.id;
    
    // 存储客户端信息
    clients.set(ws, {
        userId: userId,
        connectionTime: Date.now(),
        messageCount: 0,
        lastMessageTime: Date.now()
    });
    
    // 设置连接限制(例如2小时后自动断开)
    const connectionTimeout = setTimeout(() => {
        ws.close(1000, '连接超时');
    }, 2 * 60 * 60 * 1000); // 2小时
    
    ws.on('message', (message) => {
        const clientInfo = clients.get(ws);
        const now = Date.now();
        
        // 速率限制:每分钟最多60条消息
        if (clientInfo.messageCount > 60 && 
            (now - clientInfo.lastMessageTime) < 60000) {
            ws.send(JSON.stringify({
                type: 'error',
                message: '消息速率超限,请稍后再试'
            }));
            return;
        }
        
        // 限制消息大小
        if (message.length > 10000) { // 10KB 限制
            ws.send(JSON.stringify({
                type: 'error',
                message: '消息过大'
            }));
            return;
        }
        
        // 更新客户端统计信息
        clientInfo.messageCount++;
        clientInfo.lastMessageTime = now;
        
        // 正常处理消息...
    });
    
    ws.on('close', () => {
        // 清理
        clearTimeout(connectionTimeout);
        clients.delete(ws);
    });
});

// 服务器全局限制
const MAX_CONNECTIONS = 10000;
wss.on('connection', (ws, request) => {
    if (wss.clients.size > MAX_CONNECTIONS) {
        ws.close(1013, '服务器已达最大连接数');
        return;
    }
    // 正常连接处理...
});

5.2.5 同源策略与CORS

虽然WebSocket不完全受制于同源策略,但仍应实施适当的CORS保护:

// Node.js服务器CORS验证
server.on('upgrade', function (request, socket, head) {
    // 检查源
    const origin = request.headers.origin;
    const allowedOrigins = [
        'https://example.com', 
        'https://subdomain.example.com'
    ];
    
    if (!origin || !allowedOrigins.includes(origin)) {
        socket.write('HTTP/1.1 403 Forbidden\r\n\r\n');
        socket.destroy();
        return;
    }
    
    // 继续处理连接...
});

5.3 监控与审计

实施 WebSocket 安全监控:

  • 记录所有连接尝试,包括成功和失败
  • 记录异常行为(高消息速率、大消息、格式错误的消息)
  • 实施实时异常检测
  • 保留连接和消息日志用于事后审计
// 服务器日志记录示例
wss.on('connection', (ws, request) => {
    const ip = request.socket.remoteAddress;
    const userId = request.user ? request.user.id : 'anonymous';
    
    console.log(`连接已建立 - 用户: ${userId}, IP: ${ip}, 时间: ${new Date().toISOString()}`);
    
    ws.on('message', (message) => {
        // 记录消息统计(不记录完整内容以保护隐私)
        console.log(`收到消息 - 用户: ${userId}, 大小: ${message.length}字节, 时间: ${new Date().toISOString()}`);
        
        // 对于可疑消息,记录更多信息
        if (message.length > 5000) {
            console.warn(`大消息警告 - 用户: ${userId}, 大小: ${message.length}字节`);
            // 可以触发安全警报
        }
    });
    
    ws.on('close', (code, reason) => {
        console.log(`连接已关闭 - 用户: ${userId}, 代码: ${code}, 原因: ${reason}, 时间: ${new Date().toISOString()}`);
    });
    
    ws.on('error', (error) => {
        console.error(`WebSocket错误 - 用户: ${userId}, 错误: ${error.message}, 时间: ${new Date().toISOString()}`);
    });
});

安全审查清单

在部署 WebSocket 应用前,请确认:

  • ✅ 使用 WSS 而非 WS 协议
  • ✅ 实施了强健的身份验证机制
  • ✅ 所有收到的消息都经过验证和清理
  • ✅ 已配置适当的速率限制
  • ✅ 已实施资源限制(连接超时、消息大小限制)
  • ✅ 已配置合适的日志记录
  • ✅ 已实施 CORS 保护
  • ✅ 已进行漏洞测试(包括DoS和未授权访问测试)
  • ✅ 已有安全事件响应计划

6. Java WebSocket 整合

思考问题

  1. Java 中有哪些实现 WebSocket 的主要技术和框架?
  2. JSR 356 规范与 Spring WebSocket 有何异同?
  3. 在企业应用中,如何选择合适的 Java WebSocket 实现方案?
  4. Java WebSocket 应用的部署需要考虑哪些特殊因素?
  5. 如何测试 Java WebSocket 应用?

在学习本章内容前,请思考这些问题。这将帮助你更好地理解 Java WebSocket 的核心概念和应用场景。

6.1 Java WebSocket API (JSR 356)

Java API for WebSocket (JSR 356) 是 Java EE 7 引入的标准规范,提供了在 Java 中创建 WebSocket 应用的官方 API。

6.1.1 创建 WebSocket 服务端

使用 Java WebSocket API 创建一个 WebSocket 端点非常简单,只需添加几个注解即可:

package com.example.websocket;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint("/chat")
public class ChatEndpoint {
    
    private static Set<Session> sessions = new CopyOnWriteArraySet<>();
    
    @OnOpen
    public void onOpen(Session session) {
        sessions.add(session);
        System.out.println("新连接建立: " + session.getId());
        broadcastMessage("系统消息:用户 " + session.getId() + " 加入了聊天室");
    }
    
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("收到消息: " + message + " 来自 " + session.getId());
        broadcastMessage("用户 " + session.getId() + ": " + message);
    }
    
    @OnClose
    public void onClose(Session session) {
        sessions.remove(session);
        System.out.println("连接关闭: " + session.getId());
        broadcastMessage("系统消息:用户 " + session.getId() + " 离开了聊天室");
    }
    
    @OnError
    public void onError(Throwable error, Session session) {
        System.err.println("错误发生在会话 " + session.getId() + ": " + error.getMessage());
    }
    
    private void broadcastMessage(String message) {
        for (Session session : sessions) {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                System.err.println("发送消息失败: " + e.getMessage());
            }
        }
    }
}

6.1.2 服务端配置

在 Java EE 环境中,WebSocket 端点会被自动扫描并注册,无需额外配置。但在 Servlet 容器(如 Tomcat)中,需要确保在 web.xml 或通过注解配置 WebSocket 支持:

// 使用编程方式注册端点
import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpointConfig;
import javax.servlet.ServletContextListener;
import javax.servlet.ServletContextEvent;
import javax.servlet.annotation.WebListener;

@WebListener
public class WebSocketInitializer implements ServletContextListener {
    
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        ServerContainer serverContainer = (ServerContainer) sce.getServletContext()
                .getAttribute("javax.websocket.server.ServerContainer");
        try {
            serverContainer.addEndpoint(ChatEndpoint.class);
            // 或使用配置类
            serverContainer.addEndpoint(ServerEndpointConfig.Builder
                    .create(AdvancedChatEndpoint.class, "/advancedChat")
                    .configurator(new CustomConfigurator())
                    .build());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        // 清理资源
    }
}

6.1.3 WebSocket 客户端 API

Java WebSocket API 也支持创建客户端应用:

package com.example.websocket.client;

import java.net.URI;
import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

@ClientEndpoint
public class WebSocketClient {
    
    private Session session;
    
    public WebSocketClient(URI endpointURI) {
        try {
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            container.connectToServer(this, endpointURI);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        System.out.println("已连接到服务器");
    }
    
    @OnMessage
    public void onMessage(String message) {
        System.out.println("收到消息: " + message);
    }
    
    public void sendMessage(String message) {
        try {
            session.getBasicRemote().sendText(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    // 使用示例
    public static void main(String[] args) {
        try {
            WebSocketClient client = new WebSocketClient(new URI("ws://localhost:8080/chat"));
            // 等待连接建立
            Thread.sleep(1000);
            // 发送消息
            client.sendMessage("Hello from Java client!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

6.2 Spring WebSocket

Spring 框架提供了对 WebSocket 的强大支持,尤其是与 STOMP 子协议的集成,使得构建复杂的 WebSocket 应用更加简单。

6.2.1 Spring WebSocket 配置

在 Spring Boot 应用中配置 WebSocket:

package com.example.websocket.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 启用简单的内存消息代理
        registry.enableSimpleBroker("/topic", "/queue");
        // 设置应用前缀
        registry.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 注册 STOMP 端点
        registry.addEndpoint("/ws")
                .setAllowedOrigins("*")
                .withSockJS(); // 启用 SockJS 回退
    }
}

6.2.2 控制器和消息处理

使用 Spring 的消息映射机制处理 WebSocket 消息:

package com.example.websocket.controller;

import com.example.websocket.model.ChatMessage;
import com.example.websocket.model.OutputMessage;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;

import java.text.SimpleDateFormat;
import java.util.Date;

@Controller
public class ChatController {

    @MessageMapping("/chat")
    @SendTo("/topic/messages")
    public OutputMessage send(ChatMessage message) {
        String time = new SimpleDateFormat("HH:mm:ss").format(new Date());
        return new OutputMessage(message.getFrom(), message.getText(), time);
    }
}

// 消息模型
package com.example.websocket.model;

public class ChatMessage {
    private String from;
    private String text;

    // getters and setters
    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }
}

public class OutputMessage {
    private String from;
    private String text;
    private String time;

    public OutputMessage(String from, String text, String time) {
        this.from = from;
        this.text = text;
        this.time = time;
    }

    // getters and setters
    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
}

6.2.3 用户认证与消息拦截

Spring Security 与 WebSocket 集成,实现认证和消息拦截:

package com.example.websocket.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.security.config.annotation.web.messaging.MessageSecurityMetadataSourceRegistry;
import org.springframework.security.config.annotation.web.socket.AbstractSecurityWebSocketMessageBrokerConfigurer;

@Configuration
public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {

    @Override
    protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
        messages
            .simpTypeMatchers(SimpMessageType.CONNECT, SimpMessageType.HEARTBEAT, SimpMessageType.DISCONNECT).permitAll()
            .simpDestMatchers("/app/**").hasRole("USER")
            .simpSubscribeDestMatchers("/topic/**", "/queue/**").hasRole("USER")
            .anyMessage().denyAll();
    }

    @Override
    protected boolean sameOriginDisabled() {
        // 允许跨域
        return true;
    }
}

// 自定义通道拦截器
package com.example.websocket.interceptor;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.stereotype.Component;

@Component
public class WebSocketInterceptor implements ChannelInterceptor {

    @Override
    public Message preSend(Message message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            // 获取用户认证信息或其他自定义逻辑
            String authToken = accessor.getFirstNativeHeader("X-Auth-Token");
            if (authToken != null) {
                // 验证令牌并设置用户信息
                // accessor.setUser(...)
            }
        }
        
        return message;
    }
}

6.3 Spring WebFlux WebSocket

Spring WebFlux 提供了响应式 WebSocket 支持,适合构建高并发的 WebSocket 应用:

package com.example.websocket.reactive;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

import java.util.HashMap;
import java.util.Map;

import reactor.core.publisher.Flux;

@Configuration
public class ReactiveWebSocketConfig {

    @Bean
    public HandlerMapping webSocketHandlerMapping() {
        Map map = new HashMap<>();
        map.put("/reactive-socket", new ReactiveWebSocketHandler());

        SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
        handlerMapping.setOrder(1);
        handlerMapping.setUrlMap(map);
        return handlerMapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

class ReactiveWebSocketHandler implements WebSocketHandler {

    @Override
    public reactor.core.publisher.Mono handle(org.springframework.web.reactive.socket.WebSocketSession session) {
        // 处理传入消息
        Flux output = session.receive()
                .map(message -> message.getPayloadAsText())
                .map(text -> "响应: " + text)
                .map(session::textMessage);

        return session.send(output);
    }
}

6.4 实际应用示例:实时股票交易平台

下面是一个使用 Spring Boot 和 WebSocket 构建的股票交易平台后端示例:

6.4.1 项目结构

src/main/java/com/example/stocktrading/
├── StockTradingApplication.java
├── config/
│   ├── WebSocketConfig.java
│   └── SchedulerConfig.java
├── controller/
│   └── StockController.java
├── model/
│   ├── StockQuote.java
│   └── TradeRequest.java
├── service/
│   ├── StockService.java
│   └── StockQuoteGenerator.java
└── exception/
    └── TradingException.java

6.4.2 WebSocket 配置

package com.example.stocktrading.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic");
        registry.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stock-trading")
               .setAllowedOrigins("*")
               .withSockJS();
    }
}

6.4.3 定时推送股票行情

package com.example.stocktrading.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

import com.example.stocktrading.model.StockQuote;
import com.example.stocktrading.service.StockQuoteGenerator;

import java.util.List;

@Configuration
@EnableScheduling
public class SchedulerConfig {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;
    
    @Autowired
    private StockQuoteGenerator quoteGenerator;
    
    @Scheduled(fixedRate = 2000) // 每2秒更新一次
    public void sendStockQuotes() {
        List quotes = quoteGenerator.generateRandomQuotes();
        messagingTemplate.convertAndSend("/topic/stock-quotes", quotes);
    }
}

6.4.4 控制器处理交易请求

package com.example.stocktrading.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;

import com.example.stocktrading.exception.TradingException;
import com.example.stocktrading.model.TradeRequest;
import com.example.stocktrading.model.TradeResponse;
import com.example.stocktrading.service.StockService;

@Controller
public class StockController {

    @Autowired
    private StockService stockService;
    
    @Autowired
    private SimpMessagingTemplate messagingTemplate;
    
    @MessageMapping("/trade")
    public void executeTrade(TradeRequest request) {
        try {
            TradeResponse response = stockService.processTrade(request);
            // 向特定用户发送交易确认
            messagingTemplate.convertAndSendToUser(
                request.getUserId(), 
                "/queue/trade-confirmation", 
                response
            );
            
            // 广播交易更新(不含敏感信息)
            messagingTemplate.convertAndSend(
                "/topic/market-activity", 
                stockService.getMarketUpdate(request.getSymbol())
            );
        } catch (TradingException e) {
            // 发送错误响应
            messagingTemplate.convertAndSendToUser(
                request.getUserId(), 
                "/queue/errors", 
                e.getMessage()
            );
        }
    }
    
    @MessageMapping("/subscribe-symbol")
    @SendTo("/topic/symbols/")
    public void subscribeToSymbol(String symbol) {
        // 用户订阅特定股票的更新
        // 可以在这里记录用户的订阅以便定向推送
        return;
    }
}

6.4.5 数据模型

package com.example.stocktrading.model;

import java.math.BigDecimal;
import java.time.LocalDateTime;

public class StockQuote {
    private String symbol;
    private BigDecimal price;
    private BigDecimal change;
    private BigDecimal changePercent;
    private long volume;
    private LocalDateTime timestamp;
    
    // 构造函数、getter和setter
}

public class TradeRequest {
    private String userId;
    private String symbol;
    private int quantity;
    private String orderType; // "BUY" or "SELL"
    private BigDecimal limitPrice; // 可选的限价
    
    // 构造函数、getter和setter
}

public class TradeResponse {
    private String tradeId;
    private String status;
    private BigDecimal executedPrice;
    private LocalDateTime executionTime;
    private String message;
    
    // 构造函数、getter和setter
}

实践练习

实现一个简单的实时聊天应用,包含以下功能:

  1. 使用 JSR 356 或 Spring WebSocket 创建服务端
  2. 实现用户加入/离开通知
  3. 支持一对一私聊和群聊
  4. 添加基本的用户认证
  5. 实现消息持久化(可选)

这个练习将帮助你理解 Java WebSocket 编程的核心概念和最佳实践。

7. Python中的WebSocket实现

Python拥有多个强大的WebSocket库,可以轻松实现客户端和服务器端WebSocket应用。接下来我们将介绍几个常用的Python WebSocket库及其使用方法。

7.1 websockets库

websockets是一个流行的Python库,提供了异步WebSocket客户端和服务器端实现,基于Python的asyncio库。

7.1.1 安装

pip install websockets

7.1.2 服务器端实现

以下是一个简单的WebSocket服务器实现:

#!/usr/bin/env python

import asyncio
import websockets
import json
from datetime import datetime

# 存储所有连接的客户端
connected_clients = set()

async def chat(websocket, path):
    # 注册客户端
    connected_clients.add(websocket)
    
    try:
        # 发送欢迎消息
        await websocket.send(json.dumps({
            "type": "system",
            "message": "欢迎加入聊天室!",
            "timestamp": datetime.now().strftime("%H:%M:%S")
        }))
        
        # 通知其他用户有新用户加入
        if len(connected_clients) > 1:
            await notify_others(websocket, "system", "新用户加入了聊天室")
        
        # 接收消息并广播
        async for message in websocket:
            try:
                data = json.loads(message)
                # 添加时间戳
                data["timestamp"] = datetime.now().strftime("%H:%M:%S")
                
                # 广播给所有客户端
                await broadcast(json.dumps(data))
            except json.JSONDecodeError:
                # 处理非JSON格式的消息
                await websocket.send(json.dumps({
                    "type": "error",
                    "message": "消息格式错误,请发送JSON格式",
                    "timestamp": datetime.now().strftime("%H:%M:%S")
                }))
                
    except websockets.exceptions.ConnectionClosed:
        print("客户端连接断开")
    finally:
        # 客户端断开连接时,从集合中移除
        connected_clients.remove(websocket)
        # 通知其他用户有用户离开
        if connected_clients:
            await notify_others(websocket, "system", "有用户离开了聊天室")

async def broadcast(message):
    """向所有连接的客户端广播消息"""
    if connected_clients:
        await asyncio.gather(
            *[client.send(message) for client in connected_clients]
        )

async def notify_others(sender, msg_type, message):
    """通知除发送者外的所有客户端"""
    others = connected_clients - {sender}
    if others:
        notification = json.dumps({
            "type": msg_type,
            "message": message,
            "timestamp": datetime.now().strftime("%H:%M:%S")
        })
        await asyncio.gather(*[client.send(notification) for client in others])

# 启动WebSocket服务器
start_server = websockets.serve(chat, "localhost", 8765)

print("WebSocket服务器启动在 ws://localhost:8765")

# 启动事件循环
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

7.1.3 客户端实现

以下是一个基于asyncio的WebSocket客户端实现:

#!/usr/bin/env python

import asyncio
import websockets
import json
import aioconsole

async def receive_messages(websocket):
    """接收并显示服务器发送的消息"""
    try:
        async for message in websocket:
            data = json.loads(message)
            
            if data["type"] == "system":
                print(f"\n[系统消息 {data['timestamp']}] {data['message']}")
            elif data["type"] == "chat":
                print(f"\n[{data['username']} {data['timestamp']}] {data['message']}")
            elif data["type"] == "error":
                print(f"\n[错误 {data['timestamp']}] {data['message']}")
    except websockets.exceptions.ConnectionClosed:
        print("\n连接已关闭")

async def send_messages(websocket, username):
    """从控制台读取消息并发送到服务器"""
    try:
        while True:
            message = await aioconsole.ainput("") 
            
            if message.lower() == "/quit":
                break
                
            await websocket.send(json.dumps({
                "type": "chat",
                "username": username,
                "message": message
            }))
    except websockets.exceptions.ConnectionClosed:
        pass

async def chat_client():
    # 获取用户名
    username = input("请输入您的用户名: ")
    
    # 连接到WebSocket服务器
    uri = "ws://localhost:8765"
    print(f"正在连接到 {uri}...")
    
    try:
        async with websockets.connect(uri) as websocket:
            print("连接成功!输入消息开始聊天,输入 /quit 退出。")
            
            # 创建两个任务:一个接收消息,一个发送消息
            receive_task = asyncio.create_task(receive_messages(websocket))
            send_task = asyncio.create_task(send_messages(websocket, username))
            
            # 等待任一任务完成
            done, pending = await asyncio.wait(
                [receive_task, send_task],
                return_when=asyncio.FIRST_COMPLETED
            )
            
            # 取消未完成的任务
            for task in pending:
                task.cancel()
                
    except (websockets.exceptions.InvalidStatusCode, ConnectionRefusedError) as e:
        print(f"连接错误: {e}")
    except Exception as e:
        print(f"发生未知错误: {e}")

# 运行客户端
asyncio.run(chat_client())

提示: 要运行上面的客户端示例,您需要安装 aioconsole 库用于异步控制台输入:pip install aioconsole

7.2 FastAPI与WebSockets

FastAPI是一个现代、高性能的Python Web框架,它也提供了内置的WebSocket支持。

7.2.1 安装

pip install fastapi uvicorn

7.2.2 实现聊天服务器

下面是一个基于FastAPI的WebSocket聊天服务器实现:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.middleware.cors import CORSMiddleware
from typing import List, Dict, Optional
import json
from datetime import datetime, timedelta
from pydantic import BaseModel
import jwt
from passlib.context import CryptContext

# 创建FastAPI应用
app = FastAPI(title="FastAPI WebSocket聊天")

# 配置CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 在生产环境中应该限制来源
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 用于储存用户数据(实际应用中应使用数据库)
fake_users_db = {
    "admin": {
        "username": "admin",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",  # "password"
        "disabled": False,
    }
}

# 用于JWT认证的密钥
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 密码流程
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 用户模型
class User(BaseModel):
    username: str
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Optional[str] = None

# WebSocket连接管理器
class ConnectionManager:
    def __init__(self):
        # 存储活跃的WebSocket连接
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        await websocket.accept()
        self.active_connections[user_id] = websocket
        
        # 系统消息 - 用户加入
        await self.broadcast(
            json.dumps({
                "type": "system",
                "content": f"{user_id} 加入了聊天",
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            })
        )

    def disconnect(self, user_id: str):
        if user_id in self.active_connections:
            del self.active_connections[user_id]

    async def send_personal_message(self, message: str, user_id: str):
        if user_id in self.active_connections:
            await self.active_connections[user_id].send_text(message)

    async def broadcast(self, message: str):
        # 向所有连接的客户端广播消息
        for connection in self.active_connections.values():
            await connection.send_text(message)

# 创建连接管理器实例
manager = ConnectionManager()

# 辅助函数
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except jwt.PyJWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

# 路由
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # 认证用户
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # 创建访问令牌
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.websocket("/ws/{token}")
async def websocket_endpoint(websocket: WebSocket, token: str):
    try:
        # 验证令牌并获取用户
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None or username not in fake_users_db:
            await websocket.close(code=1008, reason="Invalid token")
            return
            
        # 接受WebSocket连接
        await manager.connect(websocket, username)
        
        try:
            # 监听WebSocket消息
            while True:
                data = await websocket.receive_text()
                
                # 解析消息数据
                try:
                    message_data = json.loads(data)
                    # 添加发送者和时间戳
                    message_data["sender"] = username
                    message_data["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    
                    # 广播消息给所有用户
                    await manager.broadcast(json.dumps(message_data))
                    
                except json.JSONDecodeError:
                    # 发送错误消息给用户
                    await manager.send_personal_message(
                        json.dumps({
                            "type": "error",
                            "content": "Invalid JSON format",
                            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        }),
                        username
                    )
                    
        except WebSocketDisconnect:
            # 用户断开连接
            manager.disconnect(username)
            # 通知其他用户
            await manager.broadcast(
                json.dumps({
                    "type": "system",
                    "content": f"{username} 离开了聊天",
                    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                })
            )
    except jwt.PyJWTError:
        await websocket.close(code=1008, reason="Invalid token")

# 前端页面路由(实际应用中应该使用单独的前端)
@app.get("/")
async def get():
    return {"message": "WebSocket聊天服务器正在运行"}

# 运行应用(使用命令:uvicorn app:app --reload)
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

7.2.3 HTML客户端页面

下面是一个配合FastAPI WebSocket服务器使用的简单HTML客户端:

<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>WebSocket聊天室</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
        }
        #login-form, #chat-container {
            border: 1px solid #ccc;
            padding: 20px;
            border-radius: 5px;
            margin-bottom: 20px;
        }
        #messages {
            height: 300px;
            overflow-y: auto;
            border: 1px solid #eee;
            padding: 10px;
            margin-bottom: 10px;
            border-radius: 5px;
        }
        .message {
            margin-bottom: 10px;
            padding: 8px;
            border-radius: 5px;
        }
        .system {
            background-color: #f0f0f0;
            color: #555;
            font-style: italic;
        }
        .user {
            background-color: #e1f5fe;
        }
        .self {
            background-color: #e8f5e9;
            text-align: right;
        }
        .error {
            background-color: #ffebee;
            color: #c62828;
        }
        .hidden {
            display: none;
        }
        input[type="text"], input[type="password"], button {
            padding: 8px;
            margin-bottom: 10px;
        }
        #message-input {
            width: 80%;
            margin-right: 10px;
        }
        .timestamp {
            font-size: 0.8em;
            color: #999;
            display: block;
        }
        .sender {
            font-weight: bold;
        }
    </style>
</head>
<body>
    <h1>WebSocket聊天室</h1>
    
    <div id="login-form">
        <h2>登录</h2>
        <div>
            <label for="username">用户名:</label>
            <input type="text" id="username" placeholder="输入用户名" required>
        </div>
        <div>
            <label for="password">密码:</label>
            <input type="password" id="password" placeholder="输入密码" required>
        </div>
        <button id="login-btn">登录</button>
        <p id="login-error" style="color: red; display: none;"></p>
    </div>
    
    <div id="chat-container" class="hidden">
        <h2>聊天室</h2>
        <div id="messages"></div>
        <div id="input-container">
            <input type="text" id="message-input" placeholder="输入消息...">
            <button id="send-btn">发送</button>
        </div>
    </div>
    
    <script>
        // DOM元素
        const loginForm = document.getElementById('login-form');
        const loginError = document.getElementById('login-error');
        const chatContainer = document.getElementById('chat-container');
        const messagesDiv = document.getElementById('messages');
        const messageInput = document.getElementById('message-input');
        const sendBtn = document.getElementById('send-btn');
        const loginBtn = document.getElementById('login-btn');
        
        let socket = null;
        let token = null;
        let username = '';
        
        // 登录处理
        loginBtn.addEventListener('click', async () => {
            username = document.getElementById('username').value;
            const password = document.getElementById('password').value;
            
            if (!username || !password) {
                showLoginError('用户名和密码不能为空');
                return;
            }
            
            try {
                // 发送登录请求
                const response = await fetch('/token', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/x-www-form-urlencoded',
                    },
                    body: `username=${encodeURIComponent(username)}&password=${encodeURIComponent(password)}`,
                });
                
                const data = await response.json();
                
                if (response.ok) {
                    token = data.access_token;
                    // 登录成功,切换到聊天界面
                    loginForm.classList.add('hidden');
                    chatContainer.classList.remove('hidden');
                    // 连接WebSocket
                    connectWebSocket();
                } else {
                    showLoginError(data.detail || '登录失败');
                }
            } catch (error) {
                showLoginError('连接服务器失败');
                console.error('Login error:', error);
            }
        });
        
        // 显示登录错误
        function showLoginError(message) {
            loginError.textContent = message;
            loginError.style.display = 'block';
        }
        
        // 连接WebSocket
        function connectWebSocket() {
            // 使用JWT token建立WebSocket连接
            socket = new WebSocket(`ws://${window.location.host}/ws/${token}`);
            
            // 连接打开
            socket.onopen = function(event) {
                addMessage('system', '已连接到聊天服务器');
            };
            
            // 收到消息
            socket.onmessage = function(event) {
                const message = JSON.parse(event.data);
                
                if (message.type === 'system') {
                    addMessage('system', message.content, message.timestamp);
                } else if (message.type === 'chat') {
                    const isSelf = message.sender === username;
                    addMessage(isSelf ? 'self' : 'user', message.content, message.timestamp, message.sender);
                } else if (message.type === 'error') {
                    addMessage('error', message.content, message.timestamp);
                }
            };
            
            // 连接关闭
            socket.onclose = function(event) {
                addMessage('system', '与服务器的连接已关闭');
                // 尝试重新连接
                setTimeout(() => {
                    if (token) {
                        connectWebSocket();
                    }
                }, 5000);
            };
            
            // 连接错误
            socket.onerror = function(error) {
                addMessage('error', '连接错误');
                console.error('WebSocket error:', error);
            };
        }
        
        // 发送消息
        sendBtn.addEventListener('click', sendMessage);
        messageInput.addEventListener('keypress', function(e) {
            if (e.key === 'Enter') {
                sendMessage();
            }
        });
        
        function sendMessage() {
            const message = messageInput.value.trim();
            if (message && socket && socket.readyState === WebSocket.OPEN) {
                // 发送聊天消息
                socket.send(JSON.stringify({
                    type: 'chat',
                    content: message
                }));
                
                // 清空输入框
                messageInput.value = '';
            }
        }
        
        // 向聊天窗口添加消息
        function addMessage(type, content, timestamp = null, sender = null) {
            const messageElem = document.createElement('div');
            messageElem.className = `message ${type}`;
            
            let messageContent = '';
            
            if (sender && type !== 'self') {
                messageContent += `<span class="sender">${sender}</span>: `;
            }
            
            messageContent += content;
            
            if (timestamp) {
                messageContent += `<span class="timestamp">${timestamp}</span>`;
            }
            
            messageElem.innerHTML = messageContent;
            messagesDiv.appendChild(messageElem);
            
            // 滚动到底部
            messagesDiv.scrollTop = messagesDiv.scrollHeight;
        }
    </script>
</body>
</html>

7.3 其他Python WebSocket库

除了上述库外,Python还有其他几个WebSocket库值得关注:

  • aiohttp - 提供异步WebSocket客户端和服务器支持
  • Tornado - 包含WebSocket支持的异步网络库
  • Django Channels - 为Django添加WebSocket支持
  • Flask-SocketIO - 将Socket.IO集成到Flask应用中
  • pywebsocket - 适用于WSGI服务器的WebSocket实现

Python WebSocket库对比

异步支持 框架集成 类型 适用场景
websockets 是 (asyncio) 独立 纯WebSocket 需要完全控制WebSocket实现的应用
FastAPI 是 (asyncio) FastAPI 纯WebSocket 构建REST API同时需要WebSocket功能
aiohttp 是 (asyncio) aiohttp 纯WebSocket 异步HTTP客户端/服务器需要WebSocket
Django Channels 是 (asyncio) Django 多协议 在Django应用中添加实时功能
Flask-SocketIO 可选 Flask Socket.IO 在Flask应用中快速添加WebSocket功能

7.4 选择合适的Python WebSocket库的考虑因素

  • 现有技术栈:如果您已经使用Django或Flask,选择与其集成的WebSocket库可能更容易
  • 性能需求:对于高性能应用,基于asyncio的库通常是更好的选择
  • 跨平台兼容性:如果需要支持旧浏览器,考虑提供回退机制的库
  • 扩展性:评估是否需要自定义WebSocket协议或其他功能
  • 学习曲线:一些库比其他库更容易上手

Python WebSocket练习

尝试实现以下WebSocket练习:

  1. 使用websockets库创建一个简单的聊天服务器和客户端
  2. 在FastAPI中实现一个实时数据仪表板,使用WebSocket推送数据更新
  3. 创建一个多房间聊天应用,支持用户创建和加入不同的聊天室
  4. 实现一个协作绘图应用,使用WebSocket同步用户的绘图动作

实践练习

实现一个简单的实时聊天应用,包含以下功能:

  1. 使用 JSR 356 或 Spring WebSocket 创建服务端
  2. 实现用户加入/离开通知
  3. 支持一对一私聊和群聊
  4. 添加基本的用户认证
  5. 实现消息持久化(可选)

这个练习将帮助你理解 Java WebSocket 编程的核心概念和最佳实践。

WebSocket连接过程详解

连接建立过程

WebSocket连接的建立过程分为两个主要阶段:HTTP握手和WebSocket协议升级。

// 客户端发起握手请求
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: http://example.com

// 服务器响应
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

握手请求关键字段说明:

  • Upgrade: websocket - 表示请求升级到WebSocket协议
  • Connection: Upgrade - 表示这是一个升级连接请求
  • Sec-WebSocket-Key - 客户端生成的随机密钥,用于安全验证
  • Sec-WebSocket-Version - 指定使用的WebSocket协议版本
  • Origin - 表示请求的来源,用于同源策略检查

握手响应关键字段说明:

  • HTTP/1.1 101 Switching Protocols - 表示协议切换成功
  • Sec-WebSocket-Accept - 服务器根据客户端密钥计算出的响应值

连接状态转换

WebSocket连接在其生命周期中会经历不同的状态:

// WebSocket状态常量
const CONNECTING = 0;  // 连接尚未建立
const OPEN = 1;        // 连接已建立,可以通信
const CLOSING = 2;     // 连接正在关闭
const CLOSED = 3;      // 连接已关闭或无法建立

状态转换过程:

  1. CONNECTING (0)
    • 创建WebSocket对象时进入此状态
    • 正在进行HTTP握手
    • 可以监听onopen和onerror事件
  2. OPEN (1)
    • 握手成功,连接已建立
    • 可以发送和接收消息
    • 可以监听onmessage事件
  3. CLOSING (2)
    • 正在关闭连接
    • 正在进行关闭握手
    • 不再发送新消息
  4. CLOSED (3)
    • 连接已完全关闭
    • 可以监听onclose事件
    • 无法发送或接收消息

消息帧格式

WebSocket消息以帧的形式传输,每个帧包含以下部分:

0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data continued ...                |
+---------------------------------------------------------------+

帧字段说明:

  • FIN (1 bit) - 表示这是消息的最后一个片段
  • RSV1, RSV2, RSV3 (各1 bit) - 保留位,必须为0
  • Opcode (4 bits) - 定义帧的类型:
    • 0x0: 继续帧
    • 0x1: 文本帧
    • 0x2: 二进制帧
    • 0x8: 连接关闭
    • 0x9: Ping
    • 0xA: Pong
  • Mask (1 bit) - 表示是否使用掩码
  • Payload Length (7 bits) - 负载长度
  • Masking-key (32 bits) - 掩码密钥(如果Mask=1)
  • Payload Data - 实际数据

心跳机制

WebSocket协议通过Ping/Pong帧实现心跳机制,用于保持连接活跃和检测连接状态。

// 服务器发送Ping帧
ws.ping('heartbeat');

// 客户端自动响应Pong帧
ws.on('pong', (data) => {
    console.log('收到Pong响应:', data);
});

心跳机制的作用:

  • 保持连接活跃,防止被中间设备(如防火墙)断开
  • 检测连接是否仍然有效
  • 测量网络延迟
  • 触发网络层保活机制

心跳实现建议:

  • 服务器定期发送Ping帧(如每30秒)
  • 客户端收到Ping后自动回复Pong
  • 如果多次未收到Pong响应,可以认为连接已断开
  • 可以根据网络状况动态调整心跳间隔

错误处理与重连机制

WebSocket连接可能会因为各种原因断开,需要实现适当的错误处理和重连机制。

class WebSocketClient {
    constructor(url) {
        this.url = url;
        this.ws = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.reconnectDelay = 1000;
        this.connect();
    }
    
    connect() {
        this.ws = new WebSocket(this.url);
        
        this.ws.onopen = () => {
            console.log('连接已建立');
            this.reconnectAttempts = 0;
        };
        
        this.ws.onclose = (event) => {
            console.log('连接已关闭:', event.code, event.reason);
            this.handleReconnect();
        };
        
        this.ws.onerror = (error) => {
            console.error('连接错误:', error);
        };
    }
    
    handleReconnect() {
        if (this.reconnectAttempts < this.maxReconnectAttempts) {
            this.reconnectAttempts++;
            const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
            
            console.log(`将在 ${delay}ms 后尝试重连 (尝试 ${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
            
            setTimeout(() => {
                this.connect();
            }, delay);
        } else {
            console.error('达到最大重连次数,停止重连');
        }
    }
}

错误处理要点:

  • 监听onerror事件处理连接错误
  • 监听onclose事件处理连接关闭
  • 实现指数退避重连策略
  • 设置最大重连次数限制
  • 在重连过程中保持用户界面响应

常见错误码:

  • 1000: 正常关闭
  • 1001: 端点离开
  • 1002: 协议错误
  • 1003: 不支持的数据类型
  • 1006: 异常关闭
  • 1007: 无效数据
  • 1008: 策略违规
  • 1009: 消息太大
  • 1010: 需要扩展
  • 1011: 内部错误

实践练习

练习:实现一个健壮的WebSocket客户端

基于上述知识,实现一个具有以下特性的WebSocket客户端:

  1. 支持自动重连机制
  2. 实现心跳检测
  3. 处理各种连接状态
  4. 支持消息重发机制
  5. 提供连接状态监控

提示:

  • 使用状态机管理连接状态
  • 实现消息队列处理重发
  • 添加详细的日志记录
  • 考虑网络状况对重连策略的影响

WebSocket使用流程

1. 环境准备

在使用WebSocket之前,需要确保开发环境满足以下要求:

  • 现代浏览器支持(Chrome、Firefox、Safari、Edge等)
  • 服务器端WebSocket支持(Node.js、Java、Python等)
  • 网络环境允许WebSocket连接(某些网络可能限制WebSocket)

2. 服务器端设置

服务器端需要完成以下配置:

2.1 选择WebSocket服务器实现

  • Node.js: ws、Socket.IO、uWebSockets.js
  • Java: Spring WebSocket、Netty
  • Python: websockets、Django Channels

2.2 配置WebSocket服务器

// Node.js示例
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', function connection(ws) {
    console.log('新客户端连接');
    
    ws.on('message', function incoming(message) {
        console.log('收到消息:', message);
        // 处理消息
    });
    
    ws.on('close', function close() {
        console.log('客户端断开连接');
    });
});

2.3 配置安全措施

  • 启用WSS(WebSocket Secure)
  • 配置CORS策略
  • 实现身份验证机制
  • 设置消息大小限制

3. 客户端实现

客户端需要完成以下步骤:

3.1 创建WebSocket连接

// 创建WebSocket连接
const ws = new WebSocket('wss://example.com/ws');

// 连接建立时
ws.onopen = function() {
    console.log('连接已建立');
    // 发送初始消息
    ws.send('Hello Server!');
};

// 接收消息时
ws.onmessage = function(event) {
    console.log('收到消息:', event.data);
    // 处理接收到的消息
};

// 连接关闭时
ws.onclose = function(event) {
    console.log('连接已关闭:', event.code, event.reason);
};

// 发生错误时
ws.onerror = function(error) {
    console.error('WebSocket错误:', error);
};

3.2 实现消息处理

  • 定义消息格式(JSON、Protocol Buffers等)
  • 实现消息序列化和反序列化
  • 处理不同类型的消息
  • 实现消息队列和重发机制

3.3 实现重连机制

class WebSocketClient {
    constructor(url) {
        this.url = url;
        this.ws = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.reconnectDelay = 1000;
        this.connect();
    }
    
    connect() {
        this.ws = new WebSocket(this.url);
        
        this.ws.onopen = () => {
            console.log('连接已建立');
            this.reconnectAttempts = 0;
        };
        
        this.ws.onclose = () => {
            this.handleReconnect();
        };
        
        this.ws.onerror = (error) => {
            console.error('连接错误:', error);
        };
    }
    
    handleReconnect() {
        if (this.reconnectAttempts < this.maxReconnectAttempts) {
            this.reconnectAttempts++;
            const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
            setTimeout(() => this.connect(), delay);
        }
    }
}

4. 测试与调试

开发过程中需要进行以下测试:

4.1 连接测试

  • 测试正常连接建立
  • 测试断线重连
  • 测试网络切换场景
  • 测试服务器重启场景

4.2 消息测试

  • 测试消息发送和接收
  • 测试大消息传输
  • 测试消息重发机制
  • 测试消息顺序

4.3 性能测试

  • 测试并发连接数
  • 测试消息吞吐量
  • 测试内存使用情况
  • 测试CPU使用率

5. 部署与监控

在生产环境中需要注意:

5.1 部署配置

  • 配置负载均衡
  • 设置适当的超时时间
  • 配置防火墙规则
  • 设置SSL证书

5.2 监控指标

  • 连接数统计
  • 消息吞吐量
  • 错误率统计
  • 响应时间监控

5.3 日志记录

  • 记录连接事件
  • 记录错误信息
  • 记录性能指标
  • 实现日志轮转

6. 最佳实践

使用WebSocket时应该遵循以下最佳实践:

6.1 连接管理

  • 实现优雅的关闭机制
  • 使用心跳保持连接活跃
  • 实现自动重连机制
  • 处理网络切换场景

6.2 消息处理

  • 使用二进制消息减少开销
  • 实现消息压缩
  • 使用消息队列处理积压
  • 实现消息确认机制

6.3 安全性

  • 始终使用WSS(WebSocket Secure)
  • 实现适当的身份验证
  • 验证消息来源
  • 限制消息大小

7. 常见问题与解决方案

开发过程中可能遇到的常见问题:

7.1 连接问题

  • 连接被防火墙阻止
    • 解决方案:使用WSS或配置防火墙规则
  • 连接频繁断开
    • 解决方案:实现心跳机制和自动重连

7.2 性能问题

  • 消息延迟
    • 解决方案:优化消息格式,使用二进制传输
  • 内存泄漏
    • 解决方案:正确管理连接和资源

7.3 兼容性问题

  • 浏览器兼容性
    • 解决方案:使用polyfill或降级方案
  • 协议版本不匹配
    • 解决方案:明确指定协议版本