实时网络通信技术详解与实战
在学习本章内容前,请思考这些问题。这将帮助你更好地理解 WebSocket 技术的价值和应用场景。
WebSocket 是一种网络通信协议,提供全双工通信通道,使服务器和客户端之间能够在单个 TCP 连接上进行双向实时数据传输。它是 HTML5 规范的一部分,被设计用来克服 HTTP 协议在实时通信方面的局限性。
与传统的 HTTP 请求-响应模式不同,WebSocket 一旦建立连接,就允许数据在任何时间点、从任何一方发送到另一方,无需重新建立连接,这使得它特别适合需要低延迟、高频率数据交换的应用场景。
WebSocket 协议流程图
WebSocket 协议由 Ian Hickson 起草,并由 IETF 作为 RFC 6455 标准化,同时由 W3C 将 WebSocket API 标准化为 Web IDL 规范的一部分。该协议于 2011 年完成标准化,目的是解决 Web 应用中实时通信的需求,替代当时流行但效率较低的解决方案,如长轮询(Long Polling)和 Comet。
随着移动互联网、物联网和实时 Web 应用的兴起,WebSocket 技术得到了广泛采用,成为现代 Web 开发中不可或缺的一部分。
WebSocket URL 使用 ws://
或 wss://
(安全 WebSocket)协议标识符,类似于 HTTP 使用 http://
和 https://
。
WebSocket 技术特别适合以下应用场景:
思考一个你熟悉的 Web 应用,它如何从 WebSocket 技术中受益?如果要将该应用改造为使用 WebSocket,主要的架构变化会是什么?
在学习本章内容前,请思考这些问题。这将帮助你理解不同协议的优缺点和适用场景。
HTTP 和 WebSocket 在通信模型上有根本性的区别:
特性 | HTTP | WebSocket |
---|---|---|
通信方式 | 单向(请求-响应) | 双向(全双工) |
连接特性 | 短连接(非持久)或多请求持久连接 | 长连接(持久) |
状态 | 无状态 | 有状态 |
头部开销 | 每次请求都有完整的头部 | 仅在建立连接时有完整头部,之后很小 |
实时性 | 依赖客户端请求 | 服务器可主动推送 |
交互模式 | 同步 | 异步 |
适用场景 | 常规网页、RESTful API、数据提交 | 聊天、游戏、实时监控、协作工具 |
HTTP 与 WebSocket 通信模型对比图
在 WebSocket 出现之前,开发者使用多种技术在 HTTP 上模拟实时通信:
客户端定期向服务器发送请求,检查是否有新数据。这种方法简单但效率低下,会产生大量无效请求和服务器负载。
// 简单的轮询示例
function poll() {
fetch('/api/updates')
.then(response => response.json())
.then(data => {
if(data.hasUpdates) {
processUpdates(data);
}
// 5秒后再次轮询
setTimeout(poll, 5000);
})
.catch(error => console.error('轮询出错:', error));
}
poll(); // 开始轮询
客户端发送请求后,服务器保持连接打开,直到有新数据或超时。这减少了轮询的频率,但仍有连接开销。
// 长轮询示例
function longPoll() {
fetch('/api/updates?timeout=30000')
.then(response => response.json())
.then(data => {
processUpdates(data);
// 立即发起新的长轮询请求
longPoll();
})
.catch(error => {
console.error('长轮询出错:', error);
// 出错时稍等后重试
setTimeout(longPoll, 1000);
});
}
longPoll(); // 开始长轮询
允许服务器通过单个 HTTP 连接向客户端推送数据。这是单向的(只能服务器到客户端),但比轮询更高效。
// SSE 示例
const eventSource = new EventSource('/api/events');
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data);
processUpdates(data);
};
eventSource.onerror = function(error) {
console.error('SSE 出错:', error);
eventSource.close();
};
WebSocket 相比 HTTP 实时通信解决方案有显著的性能优势:
性能指标 | 轮询 | 长轮询 | SSE | WebSocket |
---|---|---|---|---|
连接开销 | 高(频繁连接) | 中(较少连接) | 低(单一连接) | 低(单一连接) |
头部开销 | 高 | 高 | 中(仅首次高) | 低(仅首次高) |
数据实时性 | 低(受轮询间隔限制) | 中(有延迟) | 高(服务器推送) | 高(即时双向) |
服务器资源消耗 | 高 | 中 | 低-中 | 低 |
网络带宽使用 | 高 | 中 | 低-中 | 低 |
通信方向 | 客户端到服务器 | 客户端到服务器 | 服务器到客户端 | 双向 |
在高频数据交换(如在线游戏、协作编辑)或需要低延迟的场景中,WebSocket 的性能优势尤为明显。一项研究表明,在高并发环境下,WebSocket 可以比轮询减少高达 90% 的服务器负载。
选择合适的协议应基于应用需求:
分析一个现有的使用长轮询实现"实时"功能的应用,估算如果改用 WebSocket 协议,会节省多少请求次数和带宽。考虑用户数量、更新频率和每次请求的数据量等因素。
请思考这些问题,帮助你理解 WebSocket API 的核心概念和使用模式。
WebSocket 协议是一个独立的 TCP 协议,以 HTTP 握手开始,但随后转换为自己的协议。
WebSocket 使用特殊的 URL 方案:
ws://
- 非加密连接(等同于 http://)wss://
- 加密连接(等同于 https://)示例:wss://echo.websocket.org
在生产环境中,始终使用 wss://
协议以确保数据传输安全。非加密的 ws://
连接容易受到中间人攻击。
WebSocket 连接始于 HTTP 握手,客户端发送特殊的升级请求,服务器接受后协议切换为 WebSocket:
// 客户端请求
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
// 服务器响应
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat
Web 浏览器提供了简洁的原生 WebSocket API,用于创建和管理 WebSocket 连接。
// 创建一个新的 WebSocket 连接
const socket = new WebSocket('wss://example.com/socketserver');
// 可选:指定协议
const chatSocket = new WebSocket('wss://example.com/chat', ['json', 'xml']);
WebSocket API 提供了四个主要事件用于处理连接的生命周期和消息传递:
// 连接建立时触发
socket.onopen = function(event) {
console.log('WebSocket 连接已建立');
// 可以开始发送消息
socket.send('Hello Server!');
};
// 收到消息时触发
socket.onmessage = function(event) {
console.log('收到消息: ' + event.data);
// 处理收到的消息
};
// 连接关闭时触发
socket.onclose = function(event) {
console.log('WebSocket 连接已关闭');
// event.code: 关闭码
// event.reason: 关闭原因
// event.wasClean: 是否是干净关闭
};
// 发生错误时触发
socket.onerror = function(error) {
console.error('WebSocket 错误: ', error);
};
也可以使用 EventListener 模式:
socket.addEventListener('open', function(event) {
console.log('WebSocket 连接已建立');
});
socket.addEventListener('message', function(event) {
console.log('收到消息: ' + event.data);
});
WebSocket API 支持发送文本和二进制数据:
// 发送文本数据
socket.send('Hello Server!');
// 发送 JSON 数据
const data = {
type: 'chat',
message: 'Hello everyone!',
timestamp: Date.now()
};
socket.send(JSON.stringify(data));
// 发送二进制数据
const binaryData = new Uint8Array([0, 1, 2, 3]);
socket.send(binaryData.buffer);
WebSocket 连接可以由客户端或服务器关闭:
// 正常关闭连接
socket.close();
// 带关闭码和原因的关闭
socket.close(1000, "操作完成");
// 常见关闭码:
// 1000 - 正常关闭
// 1001 - 离开(如页面关闭)
// 1002 - 协议错误
// 1003 - 无法接受的数据类型
// 1008 - 违反策略
// 1011 - 服务器错误
WebSocket 对象提供了 readyState
属性,用于检查连接的当前状态:
// 检查连接状态
switch(socket.readyState) {
case WebSocket.CONNECTING: // 0
console.log('连接中...');
break;
case WebSocket.OPEN: // 1
console.log('连接已打开');
// 只有在连接打开时才能发送消息
socket.send('数据');
break;
case WebSocket.CLOSING: // 2
console.log('连接正在关闭');
break;
case WebSocket.CLOSED: // 3
console.log('连接已关闭或无法打开');
break;
}
在发送消息前,总是检查 readyState
是否为 WebSocket.OPEN
,以避免在连接未建立或已关闭时尝试发送数据而导致错误。
为了保持连接活跃并检测连接断开,可以实现心跳机制:
// 实现简单的心跳机制
function heartbeat() {
if (socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify({type: 'ping'}));
}
}
// 每30秒发送一次心跳
const heartbeatInterval = setInterval(heartbeat, 30000);
// 收到服务器响应重置计时器
let timeoutId = null;
socket.onmessage = function(event) {
// 处理消息...
// 清除上一个超时计时器
if (timeoutId) {
clearTimeout(timeoutId);
}
// 设置新的超时计时器
timeoutId = setTimeout(() => {
console.log('连接似乎已断开,尝试重连');
reconnect();
}, 35000); // 稍长于心跳间隔
};
实现可靠的自动重连机制,确保连接中断时能自动恢复:
// 更复杂的 WebSocket 包装器,包含自动重连功能
class ReconnectingWebSocket {
constructor(url, protocols) {
this.url = url;
this.protocols = protocols;
this.socket = null;
this.isConnected = false;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectInterval = 1000; // 起始为1秒
this.maxReconnectInterval = 30000; // 最大30秒
this.callbacks = {
open: [],
message: [],
close: [],
error: []
};
this.connect();
}
connect() {
this.socket = new WebSocket(this.url, this.protocols);
this.socket.onopen = (event) => {
console.log('WebSocket 连接已建立');
this.isConnected = true;
this.reconnectAttempts = 0;
this.callbacks.open.forEach(callback => callback(event));
};
this.socket.onmessage = (event) => {
this.callbacks.message.forEach(callback => callback(event));
};
this.socket.onclose = (event) => {
this.isConnected = false;
this.callbacks.close.forEach(callback => callback(event));
if (!event.wasClean) {
this.reconnect();
}
};
this.socket.onerror = (event) => {
this.callbacks.error.forEach(callback => callback(event));
};
}
reconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.log('达到最大重连次数');
return;
}
this.reconnectAttempts++;
const timeout = Math.min(
this.reconnectInterval * Math.pow(1.5, this.reconnectAttempts),
this.maxReconnectInterval
);
console.log(`尝试在 ${timeout}ms 后重连...`);
setTimeout(() => this.connect(), timeout);
}
addEventListener(type, callback) {
if (this.callbacks[type]) {
this.callbacks[type].push(callback);
}
}
send(data) {
if (this.isConnected) {
this.socket.send(data);
return true;
}
return false;
}
close(code, reason) {
if (this.socket) {
this.socket.close(code, reason);
}
}
}
// 使用示例
const ws = new ReconnectingWebSocket('wss://example.com/socket');
ws.addEventListener('message', function(event) {
console.log('收到消息:', event.data);
});
使用原生 WebSocket API 创建一个简单的聊天应用,包含以下功能:
这个练习将帮助你理解 WebSocket API 的基本操作和管理连接生命周期的实践技巧。
思考这些问题,帮助你选择适合项目需求的 WebSocket 解决方案。
虽然浏览器原生 WebSocket API 功能完善,但第三方库可以提供更多高级特性和便利性:
Socket.IO 是最流行的 WebSocket 库之一,提供了丰富的功能和优雅的回退机制:
// 安装
// npm install socket.io-client
// 使用
import { io } from "socket.io-client";
const socket = io("https://example.com", {
reconnectionDelayMax: 10000,
auth: {
token: "user-token"
}
});
// 事件处理
socket.on("connect", () => {
console.log("已连接,ID:", socket.id);
});
// 发送事件
socket.emit("chat message", { text: "Hello!" });
// 接收事件
socket.on("chat message", (data) => {
console.log("收到消息:", data);
});
// 命名空间和房间
const adminSocket = io("https://example.com/admin");
socket.emit("join", "room1");
// 断开连接
socket.disconnect();
SockJS 提供了一个类似 WebSocket 的 API,但在不支持 WebSocket 的环境中具有多种回退选项:
// 安装
// npm install sockjs-client
// 使用
import SockJS from 'sockjs-client';
const sock = new SockJS('https://example.com/echo');
sock.onopen = function() {
console.log('连接已打开');
sock.send('test');
};
sock.onmessage = function(e) {
console.log('收到消息:', e.data);
};
sock.onclose = function() {
console.log('连接已关闭');
};
特性 | Socket.IO | SockJS |
---|---|---|
基本协议 | 自定义协议(基于 WebSocket) | WebSocket API 兼容 |
回退机制 | 多种回退(长轮询、AJAX等) | 多种回退(XHR流、长轮询等) |
命名空间/房间 | 原生支持 | 不支持(需自行实现) |
自动重连 | 内置 | 需手动实现 |
服务器实现 | 需要 Socket.IO 服务器 | SockJS 服务器(多语言支持) |
生态系统 | 庞大,有丰富的插件 | 多语言支持,与 STOMP 结合良好 |
STOMP(Simple Text Oriented Messaging Protocol)是一个简单的基于帧的协议,可以在 WebSocket 上实现:
// 安装
// npm install @stomp/stompjs
// 使用
import { Client } from '@stomp/stompjs';
const client = new Client({
brokerURL: 'ws://example.com/ws',
connectHeaders: {
login: 'user',
passcode: 'password'
},
debug: function (str) {
console.log(str);
},
reconnectDelay: 5000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000
});
client.onConnect = function (frame) {
// 订阅主题
const subscription = client.subscribe('/topic/messages', function (message) {
const payload = JSON.parse(message.body);
console.log('收到消息:', payload);
});
// 发送消息到指定目的地
client.publish({
destination: '/app/chat',
body: JSON.stringify({ content: 'Hello, STOMP' }),
headers: { priority: '9' }
});
};
client.onStompError = function (frame) {
console.error('STOMP 错误:', frame.headers['message']);
};
client.activate();
STOMP 非常适合与 Spring 后端集成,Spring 提供了完整的 STOMP over WebSocket 支持。
现代前端框架提供了便捷的方式集成 WebSocket 功能:
// 创建 WebSocket 钩子
import { useState, useEffect, useCallback } from 'react';
function useWebSocket(url) {
const [socket, setSocket] = useState(null);
const [isConnected, setIsConnected] = useState(false);
const [messages, setMessages] = useState([]);
useEffect(() => {
// 创建 WebSocket 连接
const ws = new WebSocket(url);
ws.onopen = () => {
setIsConnected(true);
};
ws.onmessage = (event) => {
try {
// 尝试解析JSON数据
const data = JSON.parse(event.data);
setMessages(prev => [...prev, data]);
} catch (e) {
// 如果不是JSON,直接添加文本数据
setMessages(prev => [...prev, event.data]);
}
};
ws.onclose = () => {
setIsConnected(false);
};
ws.onerror = (error) => {
console.error('WebSocket错误:', error);
};
setSocket(ws);
// 清理函数
return () => {
if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) {
ws.close();
}
};
}, [url]);
// 发送消息方法
const sendMessage = useCallback((data) => {
if (socket && socket.readyState === WebSocket.OPEN) {
// 如果数据是对象,转换为JSON字符串
if (typeof data === 'object') {
socket.send(JSON.stringify(data));
} else {
socket.send(data);
}
return true;
}
return false;
}, [socket]);
return { socket, isConnected, messages, sendMessage };
}
// 在组件中使用
function ChatComponent() {
const { isConnected, messages, sendMessage } = useWebSocket('wss://example.com/chat');
const [inputValue, setInputValue] = useState('');
const handleSubmit = (e) => {
e.preventDefault();
if (inputValue && sendMessage(inputValue)) {
setInputValue('');
}
};
return (
状态: {isConnected ? '已连接' : '未连接'}
{messages.map((msg, index) => (
-
{typeof msg === 'object'
? JSON.stringify(msg)
: msg}
))}
);
}
// Vue 3 WebSocket 组合式 API
import { ref, onMounted, onBeforeUnmount } from 'vue';
export function useWebSocket(url) {
const socket = ref(null);
const isConnected = ref(false);
const messages = ref([]);
const connect = () => {
socket.value = new WebSocket(url);
socket.value.onopen = () => {
isConnected.value = true;
};
socket.value.onmessage = (event) => {
messages.value.push(event.data);
};
socket.value.onclose = () => {
isConnected.value = false;
};
};
const sendMessage = (data) => {
if (socket.value && isConnected.value) {
socket.value.send(data);
return true;
}
return false;
};
onMounted(() => {
connect();
});
onBeforeUnmount(() => {
if (socket.value) {
socket.value.close();
}
});
return {
isConnected,
messages,
sendMessage
};
}
// 在组件中使用
{{ isConnected ? '已连接' : '未连接' }}
- {{ msg }}
选择 WebSocket 实现方案时,考虑以下因素:
需要 WebSocket 功能 │ ┌────────────────┬──────┴──────┬────────────────┐ │ │ │ │ 简单需求 需要广泛兼容性 企业级应用 与特定框架集成 │ │ │ │ 原生 WebSocket Socket.IO STOMP 框架特定解决方案 │ │ (React/Vue hooks) │ │ └─────┬───────┘ │ 根据后端技术选择 │ ┌───────────┴───────────┐ │ │ Node.js 后端 Java/Spring 后端 │ │ Socket.IO STOMP over SockJS
对比实现:
这个练习将帮助你理解不同 WebSocket 库的优缺点,为实际项目选择最合适的技术。
思考这些问题,帮助你构建更安全的 WebSocket 应用。
WebSocket 应用面临多种安全威胁,包括:
威胁 | 描述 | 影响 |
---|---|---|
跨站 WebSocket 劫持 | 攻击者利用恶意网站发起 WebSocket 连接至受害者已授权的站点 | 未经授权访问用户数据,执行未授权操作 |
中间人攻击 | 攻击者拦截并可能修改未加密的 WebSocket 通信 | 数据泄露,通信篡改 |
拒绝服务攻击 | 通过大量 WebSocket 连接或消息使服务器资源耗尽 | 服务不可用 |
WebSocket 劫持 | 未授权第三方接管活跃的 WebSocket 连接 | 会话劫持,数据泄露 |
输入验证漏洞 | 未对通过 WebSocket 传输的数据进行适当验证 | 注入攻击,XSS,数据库漏洞 |
保护 WebSocket 应用需采取多层防御策略:
// 始终使用 WSS 而非 WS
const socket = new WebSocket('wss://example.com/socket');
将 HTTP 升级为 HTTPS,同样将 WS 升级为 WSS:
永远不要在生产环境中使用非加密的 WS 协议,即使是内部应用也应使用 WSS。
在建立 WebSocket 连接时验证用户身份:
// 客户端:在握手时包含授权令牌
const token = localStorage.getItem('authToken');
const socket = new WebSocket(`wss://example.com/socket?token=${token}`);
// 或者通过自定义头部(需服务器支持)
fetch('wss://example.com/socket', {
headers: {
'Authorization': `Bearer ${token}`
}
});
服务器端授权验证(Node.js 示例):
// 服务器:WebSocket 连接身份验证
const WebSocket = require('ws');
const url = require('url');
const jwt = require('jsonwebtoken');
const wss = new WebSocket.Server({ noServer: true });
const server = http.createServer();
server.on('upgrade', function (request, socket, head) {
// 从URL查询参数中提取令牌
const pathname = url.parse(request.url).pathname;
const query = url.parse(request.url, true).query;
// 验证令牌
authenticateToken(query.token)
.then(user => {
// 存储用户信息,授权成功
request.user = user;
wss.handleUpgrade(request, socket, head, function (ws) {
wss.emit('connection', ws, request);
});
})
.catch(err => {
// 授权失败,关闭连接
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
});
});
// 验证令牌函数
function authenticateToken(token) {
return new Promise((resolve, reject) => {
if (!token) return reject('需要令牌');
jwt.verify(token, process.env.JWT_SECRET, (err, user) => {
if (err) return reject('无效令牌');
resolve(user);
});
});
}
最佳实践:单独为 WebSocket 连接创建短期令牌,而不是重用主应用的长期令牌。
始终验证从客户端接收的 WebSocket 消息:
// 客户端:发送结构化消息
socket.send(JSON.stringify({
type: 'chat_message',
content: message,
timestamp: Date.now()
}));
// 服务器:验证消息结构和内容
wss.on('connection', (ws, request) => {
const user = request.user; // 从身份验证中
ws.on('message', (message) => {
try {
// 解析并验证消息
const data = JSON.parse(message);
// 检查必要字段
if (!data.type || !data.content) {
return sendError(ws, '无效消息格式');
}
// 验证内容(防止XSS等)
if (!isValidContent(data.content)) {
return sendError(ws, '包含不允许的内容');
}
// 附加已验证的用户信息
data.userId = user.id;
data.username = user.username;
// 处理验证后的消息
handleMessage(data, ws);
} catch (e) {
sendError(ws, '无效的JSON');
}
});
});
function isValidContent(content) {
// 实现内容验证逻辑
// 例如:长度限制、XSS过滤等
return content.length > 0 &&
content.length <= 1000 &&
!/
防止 WebSocket 服务器过载:
// 服务器端速率限制实现
const clients = new Map(); // 跟踪客户端
wss.on('connection', (ws, request) => {
const userId = request.user.id;
// 存储客户端信息
clients.set(ws, {
userId: userId,
connectionTime: Date.now(),
messageCount: 0,
lastMessageTime: Date.now()
});
// 设置连接限制(例如2小时后自动断开)
const connectionTimeout = setTimeout(() => {
ws.close(1000, '连接超时');
}, 2 * 60 * 60 * 1000); // 2小时
ws.on('message', (message) => {
const clientInfo = clients.get(ws);
const now = Date.now();
// 速率限制:每分钟最多60条消息
if (clientInfo.messageCount > 60 &&
(now - clientInfo.lastMessageTime) < 60000) {
ws.send(JSON.stringify({
type: 'error',
message: '消息速率超限,请稍后再试'
}));
return;
}
// 限制消息大小
if (message.length > 10000) { // 10KB 限制
ws.send(JSON.stringify({
type: 'error',
message: '消息过大'
}));
return;
}
// 更新客户端统计信息
clientInfo.messageCount++;
clientInfo.lastMessageTime = now;
// 正常处理消息...
});
ws.on('close', () => {
// 清理
clearTimeout(connectionTimeout);
clients.delete(ws);
});
});
// 服务器全局限制
const MAX_CONNECTIONS = 10000;
wss.on('connection', (ws, request) => {
if (wss.clients.size > MAX_CONNECTIONS) {
ws.close(1013, '服务器已达最大连接数');
return;
}
// 正常连接处理...
});
虽然WebSocket不完全受制于同源策略,但仍应实施适当的CORS保护:
// Node.js服务器CORS验证
server.on('upgrade', function (request, socket, head) {
// 检查源
const origin = request.headers.origin;
const allowedOrigins = [
'https://example.com',
'https://subdomain.example.com'
];
if (!origin || !allowedOrigins.includes(origin)) {
socket.write('HTTP/1.1 403 Forbidden\r\n\r\n');
socket.destroy();
return;
}
// 继续处理连接...
});
实施 WebSocket 安全监控:
// 服务器日志记录示例
wss.on('connection', (ws, request) => {
const ip = request.socket.remoteAddress;
const userId = request.user ? request.user.id : 'anonymous';
console.log(`连接已建立 - 用户: ${userId}, IP: ${ip}, 时间: ${new Date().toISOString()}`);
ws.on('message', (message) => {
// 记录消息统计(不记录完整内容以保护隐私)
console.log(`收到消息 - 用户: ${userId}, 大小: ${message.length}字节, 时间: ${new Date().toISOString()}`);
// 对于可疑消息,记录更多信息
if (message.length > 5000) {
console.warn(`大消息警告 - 用户: ${userId}, 大小: ${message.length}字节`);
// 可以触发安全警报
}
});
ws.on('close', (code, reason) => {
console.log(`连接已关闭 - 用户: ${userId}, 代码: ${code}, 原因: ${reason}, 时间: ${new Date().toISOString()}`);
});
ws.on('error', (error) => {
console.error(`WebSocket错误 - 用户: ${userId}, 错误: ${error.message}, 时间: ${new Date().toISOString()}`);
});
});
在部署 WebSocket 应用前,请确认:
在学习本章内容前,请思考这些问题。这将帮助你更好地理解 Java WebSocket 的核心概念和应用场景。
Java API for WebSocket (JSR 356) 是 Java EE 7 引入的标准规范,提供了在 Java 中创建 WebSocket 应用的官方 API。
使用 Java WebSocket API 创建一个 WebSocket 端点非常简单,只需添加几个注解即可:
package com.example.websocket;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
@ServerEndpoint("/chat")
public class ChatEndpoint {
private static Set<Session> sessions = new CopyOnWriteArraySet<>();
@OnOpen
public void onOpen(Session session) {
sessions.add(session);
System.out.println("新连接建立: " + session.getId());
broadcastMessage("系统消息:用户 " + session.getId() + " 加入了聊天室");
}
@OnMessage
public void onMessage(String message, Session session) {
System.out.println("收到消息: " + message + " 来自 " + session.getId());
broadcastMessage("用户 " + session.getId() + ": " + message);
}
@OnClose
public void onClose(Session session) {
sessions.remove(session);
System.out.println("连接关闭: " + session.getId());
broadcastMessage("系统消息:用户 " + session.getId() + " 离开了聊天室");
}
@OnError
public void onError(Throwable error, Session session) {
System.err.println("错误发生在会话 " + session.getId() + ": " + error.getMessage());
}
private void broadcastMessage(String message) {
for (Session session : sessions) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
System.err.println("发送消息失败: " + e.getMessage());
}
}
}
}
在 Java EE 环境中,WebSocket 端点会被自动扫描并注册,无需额外配置。但在 Servlet 容器(如 Tomcat)中,需要确保在 web.xml 或通过注解配置 WebSocket 支持:
// 使用编程方式注册端点
import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpointConfig;
import javax.servlet.ServletContextListener;
import javax.servlet.ServletContextEvent;
import javax.servlet.annotation.WebListener;
@WebListener
public class WebSocketInitializer implements ServletContextListener {
@Override
public void contextInitialized(ServletContextEvent sce) {
ServerContainer serverContainer = (ServerContainer) sce.getServletContext()
.getAttribute("javax.websocket.server.ServerContainer");
try {
serverContainer.addEndpoint(ChatEndpoint.class);
// 或使用配置类
serverContainer.addEndpoint(ServerEndpointConfig.Builder
.create(AdvancedChatEndpoint.class, "/advancedChat")
.configurator(new CustomConfigurator())
.build());
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
// 清理资源
}
}
Java WebSocket API 也支持创建客户端应用:
package com.example.websocket.client;
import java.net.URI;
import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
@ClientEndpoint
public class WebSocketClient {
private Session session;
public WebSocketClient(URI endpointURI) {
try {
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
container.connectToServer(this, endpointURI);
} catch (Exception e) {
e.printStackTrace();
}
}
@OnOpen
public void onOpen(Session session) {
this.session = session;
System.out.println("已连接到服务器");
}
@OnMessage
public void onMessage(String message) {
System.out.println("收到消息: " + message);
}
public void sendMessage(String message) {
try {
session.getBasicRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
// 使用示例
public static void main(String[] args) {
try {
WebSocketClient client = new WebSocketClient(new URI("ws://localhost:8080/chat"));
// 等待连接建立
Thread.sleep(1000);
// 发送消息
client.sendMessage("Hello from Java client!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
Spring 框架提供了对 WebSocket 的强大支持,尤其是与 STOMP 子协议的集成,使得构建复杂的 WebSocket 应用更加简单。
在 Spring Boot 应用中配置 WebSocket:
package com.example.websocket.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 启用简单的内存消息代理
registry.enableSimpleBroker("/topic", "/queue");
// 设置应用前缀
registry.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注册 STOMP 端点
registry.addEndpoint("/ws")
.setAllowedOrigins("*")
.withSockJS(); // 启用 SockJS 回退
}
}
使用 Spring 的消息映射机制处理 WebSocket 消息:
package com.example.websocket.controller;
import com.example.websocket.model.ChatMessage;
import com.example.websocket.model.OutputMessage;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
import java.text.SimpleDateFormat;
import java.util.Date;
@Controller
public class ChatController {
@MessageMapping("/chat")
@SendTo("/topic/messages")
public OutputMessage send(ChatMessage message) {
String time = new SimpleDateFormat("HH:mm:ss").format(new Date());
return new OutputMessage(message.getFrom(), message.getText(), time);
}
}
// 消息模型
package com.example.websocket.model;
public class ChatMessage {
private String from;
private String text;
// getters and setters
public String getFrom() {
return from;
}
public void setFrom(String from) {
this.from = from;
}
public String getText() {
return text;
}
public void setText(String text) {
this.text = text;
}
}
public class OutputMessage {
private String from;
private String text;
private String time;
public OutputMessage(String from, String text, String time) {
this.from = from;
this.text = text;
this.time = time;
}
// getters and setters
public String getFrom() {
return from;
}
public void setFrom(String from) {
this.from = from;
}
public String getText() {
return text;
}
public void setText(String text) {
this.text = text;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
}
Spring Security 与 WebSocket 集成,实现认证和消息拦截:
package com.example.websocket.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.security.config.annotation.web.messaging.MessageSecurityMetadataSourceRegistry;
import org.springframework.security.config.annotation.web.socket.AbstractSecurityWebSocketMessageBrokerConfigurer;
@Configuration
public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {
@Override
protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
messages
.simpTypeMatchers(SimpMessageType.CONNECT, SimpMessageType.HEARTBEAT, SimpMessageType.DISCONNECT).permitAll()
.simpDestMatchers("/app/**").hasRole("USER")
.simpSubscribeDestMatchers("/topic/**", "/queue/**").hasRole("USER")
.anyMessage().denyAll();
}
@Override
protected boolean sameOriginDisabled() {
// 允许跨域
return true;
}
}
// 自定义通道拦截器
package com.example.websocket.interceptor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.stereotype.Component;
@Component
public class WebSocketInterceptor implements ChannelInterceptor {
@Override
public Message> preSend(Message> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
// 获取用户认证信息或其他自定义逻辑
String authToken = accessor.getFirstNativeHeader("X-Auth-Token");
if (authToken != null) {
// 验证令牌并设置用户信息
// accessor.setUser(...)
}
}
return message;
}
}
Spring WebFlux 提供了响应式 WebSocket 支持,适合构建高并发的 WebSocket 应用:
package com.example.websocket.reactive;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import java.util.HashMap;
import java.util.Map;
import reactor.core.publisher.Flux;
@Configuration
public class ReactiveWebSocketConfig {
@Bean
public HandlerMapping webSocketHandlerMapping() {
Map map = new HashMap<>();
map.put("/reactive-socket", new ReactiveWebSocketHandler());
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
handlerMapping.setOrder(1);
handlerMapping.setUrlMap(map);
return handlerMapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
class ReactiveWebSocketHandler implements WebSocketHandler {
@Override
public reactor.core.publisher.Mono handle(org.springframework.web.reactive.socket.WebSocketSession session) {
// 处理传入消息
Flux output = session.receive()
.map(message -> message.getPayloadAsText())
.map(text -> "响应: " + text)
.map(session::textMessage);
return session.send(output);
}
}
下面是一个使用 Spring Boot 和 WebSocket 构建的股票交易平台后端示例:
src/main/java/com/example/stocktrading/
├── StockTradingApplication.java
├── config/
│ ├── WebSocketConfig.java
│ └── SchedulerConfig.java
├── controller/
│ └── StockController.java
├── model/
│ ├── StockQuote.java
│ └── TradeRequest.java
├── service/
│ ├── StockService.java
│ └── StockQuoteGenerator.java
└── exception/
└── TradingException.java
package com.example.stocktrading.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic");
registry.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/stock-trading")
.setAllowedOrigins("*")
.withSockJS();
}
}
package com.example.stocktrading.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import com.example.stocktrading.model.StockQuote;
import com.example.stocktrading.service.StockQuoteGenerator;
import java.util.List;
@Configuration
@EnableScheduling
public class SchedulerConfig {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private StockQuoteGenerator quoteGenerator;
@Scheduled(fixedRate = 2000) // 每2秒更新一次
public void sendStockQuotes() {
List quotes = quoteGenerator.generateRandomQuotes();
messagingTemplate.convertAndSend("/topic/stock-quotes", quotes);
}
}
package com.example.stocktrading.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
import com.example.stocktrading.exception.TradingException;
import com.example.stocktrading.model.TradeRequest;
import com.example.stocktrading.model.TradeResponse;
import com.example.stocktrading.service.StockService;
@Controller
public class StockController {
@Autowired
private StockService stockService;
@Autowired
private SimpMessagingTemplate messagingTemplate;
@MessageMapping("/trade")
public void executeTrade(TradeRequest request) {
try {
TradeResponse response = stockService.processTrade(request);
// 向特定用户发送交易确认
messagingTemplate.convertAndSendToUser(
request.getUserId(),
"/queue/trade-confirmation",
response
);
// 广播交易更新(不含敏感信息)
messagingTemplate.convertAndSend(
"/topic/market-activity",
stockService.getMarketUpdate(request.getSymbol())
);
} catch (TradingException e) {
// 发送错误响应
messagingTemplate.convertAndSendToUser(
request.getUserId(),
"/queue/errors",
e.getMessage()
);
}
}
@MessageMapping("/subscribe-symbol")
@SendTo("/topic/symbols/")
public void subscribeToSymbol(String symbol) {
// 用户订阅特定股票的更新
// 可以在这里记录用户的订阅以便定向推送
return;
}
}
package com.example.stocktrading.model;
import java.math.BigDecimal;
import java.time.LocalDateTime;
public class StockQuote {
private String symbol;
private BigDecimal price;
private BigDecimal change;
private BigDecimal changePercent;
private long volume;
private LocalDateTime timestamp;
// 构造函数、getter和setter
}
public class TradeRequest {
private String userId;
private String symbol;
private int quantity;
private String orderType; // "BUY" or "SELL"
private BigDecimal limitPrice; // 可选的限价
// 构造函数、getter和setter
}
public class TradeResponse {
private String tradeId;
private String status;
private BigDecimal executedPrice;
private LocalDateTime executionTime;
private String message;
// 构造函数、getter和setter
}
实现一个简单的实时聊天应用,包含以下功能:
这个练习将帮助你理解 Java WebSocket 编程的核心概念和最佳实践。
Python拥有多个强大的WebSocket库,可以轻松实现客户端和服务器端WebSocket应用。接下来我们将介绍几个常用的Python WebSocket库及其使用方法。
websockets
是一个流行的Python库,提供了异步WebSocket客户端和服务器端实现,基于Python的asyncio库。
pip install websockets
以下是一个简单的WebSocket服务器实现:
#!/usr/bin/env python
import asyncio
import websockets
import json
from datetime import datetime
# 存储所有连接的客户端
connected_clients = set()
async def chat(websocket, path):
# 注册客户端
connected_clients.add(websocket)
try:
# 发送欢迎消息
await websocket.send(json.dumps({
"type": "system",
"message": "欢迎加入聊天室!",
"timestamp": datetime.now().strftime("%H:%M:%S")
}))
# 通知其他用户有新用户加入
if len(connected_clients) > 1:
await notify_others(websocket, "system", "新用户加入了聊天室")
# 接收消息并广播
async for message in websocket:
try:
data = json.loads(message)
# 添加时间戳
data["timestamp"] = datetime.now().strftime("%H:%M:%S")
# 广播给所有客户端
await broadcast(json.dumps(data))
except json.JSONDecodeError:
# 处理非JSON格式的消息
await websocket.send(json.dumps({
"type": "error",
"message": "消息格式错误,请发送JSON格式",
"timestamp": datetime.now().strftime("%H:%M:%S")
}))
except websockets.exceptions.ConnectionClosed:
print("客户端连接断开")
finally:
# 客户端断开连接时,从集合中移除
connected_clients.remove(websocket)
# 通知其他用户有用户离开
if connected_clients:
await notify_others(websocket, "system", "有用户离开了聊天室")
async def broadcast(message):
"""向所有连接的客户端广播消息"""
if connected_clients:
await asyncio.gather(
*[client.send(message) for client in connected_clients]
)
async def notify_others(sender, msg_type, message):
"""通知除发送者外的所有客户端"""
others = connected_clients - {sender}
if others:
notification = json.dumps({
"type": msg_type,
"message": message,
"timestamp": datetime.now().strftime("%H:%M:%S")
})
await asyncio.gather(*[client.send(notification) for client in others])
# 启动WebSocket服务器
start_server = websockets.serve(chat, "localhost", 8765)
print("WebSocket服务器启动在 ws://localhost:8765")
# 启动事件循环
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
以下是一个基于asyncio的WebSocket客户端实现:
#!/usr/bin/env python
import asyncio
import websockets
import json
import aioconsole
async def receive_messages(websocket):
"""接收并显示服务器发送的消息"""
try:
async for message in websocket:
data = json.loads(message)
if data["type"] == "system":
print(f"\n[系统消息 {data['timestamp']}] {data['message']}")
elif data["type"] == "chat":
print(f"\n[{data['username']} {data['timestamp']}] {data['message']}")
elif data["type"] == "error":
print(f"\n[错误 {data['timestamp']}] {data['message']}")
except websockets.exceptions.ConnectionClosed:
print("\n连接已关闭")
async def send_messages(websocket, username):
"""从控制台读取消息并发送到服务器"""
try:
while True:
message = await aioconsole.ainput("")
if message.lower() == "/quit":
break
await websocket.send(json.dumps({
"type": "chat",
"username": username,
"message": message
}))
except websockets.exceptions.ConnectionClosed:
pass
async def chat_client():
# 获取用户名
username = input("请输入您的用户名: ")
# 连接到WebSocket服务器
uri = "ws://localhost:8765"
print(f"正在连接到 {uri}...")
try:
async with websockets.connect(uri) as websocket:
print("连接成功!输入消息开始聊天,输入 /quit 退出。")
# 创建两个任务:一个接收消息,一个发送消息
receive_task = asyncio.create_task(receive_messages(websocket))
send_task = asyncio.create_task(send_messages(websocket, username))
# 等待任一任务完成
done, pending = await asyncio.wait(
[receive_task, send_task],
return_when=asyncio.FIRST_COMPLETED
)
# 取消未完成的任务
for task in pending:
task.cancel()
except (websockets.exceptions.InvalidStatusCode, ConnectionRefusedError) as e:
print(f"连接错误: {e}")
except Exception as e:
print(f"发生未知错误: {e}")
# 运行客户端
asyncio.run(chat_client())
提示: 要运行上面的客户端示例,您需要安装 aioconsole
库用于异步控制台输入:pip install aioconsole
FastAPI是一个现代、高性能的Python Web框架,它也提供了内置的WebSocket支持。
pip install fastapi uvicorn
下面是一个基于FastAPI的WebSocket聊天服务器实现:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.middleware.cors import CORSMiddleware
from typing import List, Dict, Optional
import json
from datetime import datetime, timedelta
from pydantic import BaseModel
import jwt
from passlib.context import CryptContext
# 创建FastAPI应用
app = FastAPI(title="FastAPI WebSocket聊天")
# 配置CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 在生产环境中应该限制来源
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 用于储存用户数据(实际应用中应使用数据库)
fake_users_db = {
"admin": {
"username": "admin",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", # "password"
"disabled": False,
}
}
# 用于JWT认证的密钥
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 密码流程
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# 用户模型
class User(BaseModel):
username: str
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
# WebSocket连接管理器
class ConnectionManager:
def __init__(self):
# 存储活跃的WebSocket连接
self.active_connections: Dict[str, WebSocket] = {}
async def connect(self, websocket: WebSocket, user_id: str):
await websocket.accept()
self.active_connections[user_id] = websocket
# 系统消息 - 用户加入
await self.broadcast(
json.dumps({
"type": "system",
"content": f"{user_id} 加入了聊天",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
)
def disconnect(self, user_id: str):
if user_id in self.active_connections:
del self.active_connections[user_id]
async def send_personal_message(self, message: str, user_id: str):
if user_id in self.active_connections:
await self.active_connections[user_id].send_text(message)
async def broadcast(self, message: str):
# 向所有连接的客户端广播消息
for connection in self.active_connections.values():
await connection.send_text(message)
# 创建连接管理器实例
manager = ConnectionManager()
# 辅助函数
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except jwt.PyJWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
# 路由
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
# 认证用户
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# 创建访问令牌
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.websocket("/ws/{token}")
async def websocket_endpoint(websocket: WebSocket, token: str):
try:
# 验证令牌并获取用户
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None or username not in fake_users_db:
await websocket.close(code=1008, reason="Invalid token")
return
# 接受WebSocket连接
await manager.connect(websocket, username)
try:
# 监听WebSocket消息
while True:
data = await websocket.receive_text()
# 解析消息数据
try:
message_data = json.loads(data)
# 添加发送者和时间戳
message_data["sender"] = username
message_data["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 广播消息给所有用户
await manager.broadcast(json.dumps(message_data))
except json.JSONDecodeError:
# 发送错误消息给用户
await manager.send_personal_message(
json.dumps({
"type": "error",
"content": "Invalid JSON format",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}),
username
)
except WebSocketDisconnect:
# 用户断开连接
manager.disconnect(username)
# 通知其他用户
await manager.broadcast(
json.dumps({
"type": "system",
"content": f"{username} 离开了聊天",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
)
except jwt.PyJWTError:
await websocket.close(code=1008, reason="Invalid token")
# 前端页面路由(实际应用中应该使用单独的前端)
@app.get("/")
async def get():
return {"message": "WebSocket聊天服务器正在运行"}
# 运行应用(使用命令:uvicorn app:app --reload)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
下面是一个配合FastAPI WebSocket服务器使用的简单HTML客户端:
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebSocket聊天室</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
#login-form, #chat-container {
border: 1px solid #ccc;
padding: 20px;
border-radius: 5px;
margin-bottom: 20px;
}
#messages {
height: 300px;
overflow-y: auto;
border: 1px solid #eee;
padding: 10px;
margin-bottom: 10px;
border-radius: 5px;
}
.message {
margin-bottom: 10px;
padding: 8px;
border-radius: 5px;
}
.system {
background-color: #f0f0f0;
color: #555;
font-style: italic;
}
.user {
background-color: #e1f5fe;
}
.self {
background-color: #e8f5e9;
text-align: right;
}
.error {
background-color: #ffebee;
color: #c62828;
}
.hidden {
display: none;
}
input[type="text"], input[type="password"], button {
padding: 8px;
margin-bottom: 10px;
}
#message-input {
width: 80%;
margin-right: 10px;
}
.timestamp {
font-size: 0.8em;
color: #999;
display: block;
}
.sender {
font-weight: bold;
}
</style>
</head>
<body>
<h1>WebSocket聊天室</h1>
<div id="login-form">
<h2>登录</h2>
<div>
<label for="username">用户名:</label>
<input type="text" id="username" placeholder="输入用户名" required>
</div>
<div>
<label for="password">密码:</label>
<input type="password" id="password" placeholder="输入密码" required>
</div>
<button id="login-btn">登录</button>
<p id="login-error" style="color: red; display: none;"></p>
</div>
<div id="chat-container" class="hidden">
<h2>聊天室</h2>
<div id="messages"></div>
<div id="input-container">
<input type="text" id="message-input" placeholder="输入消息...">
<button id="send-btn">发送</button>
</div>
</div>
<script>
// DOM元素
const loginForm = document.getElementById('login-form');
const loginError = document.getElementById('login-error');
const chatContainer = document.getElementById('chat-container');
const messagesDiv = document.getElementById('messages');
const messageInput = document.getElementById('message-input');
const sendBtn = document.getElementById('send-btn');
const loginBtn = document.getElementById('login-btn');
let socket = null;
let token = null;
let username = '';
// 登录处理
loginBtn.addEventListener('click', async () => {
username = document.getElementById('username').value;
const password = document.getElementById('password').value;
if (!username || !password) {
showLoginError('用户名和密码不能为空');
return;
}
try {
// 发送登录请求
const response = await fetch('/token', {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
},
body: `username=${encodeURIComponent(username)}&password=${encodeURIComponent(password)}`,
});
const data = await response.json();
if (response.ok) {
token = data.access_token;
// 登录成功,切换到聊天界面
loginForm.classList.add('hidden');
chatContainer.classList.remove('hidden');
// 连接WebSocket
connectWebSocket();
} else {
showLoginError(data.detail || '登录失败');
}
} catch (error) {
showLoginError('连接服务器失败');
console.error('Login error:', error);
}
});
// 显示登录错误
function showLoginError(message) {
loginError.textContent = message;
loginError.style.display = 'block';
}
// 连接WebSocket
function connectWebSocket() {
// 使用JWT token建立WebSocket连接
socket = new WebSocket(`ws://${window.location.host}/ws/${token}`);
// 连接打开
socket.onopen = function(event) {
addMessage('system', '已连接到聊天服务器');
};
// 收到消息
socket.onmessage = function(event) {
const message = JSON.parse(event.data);
if (message.type === 'system') {
addMessage('system', message.content, message.timestamp);
} else if (message.type === 'chat') {
const isSelf = message.sender === username;
addMessage(isSelf ? 'self' : 'user', message.content, message.timestamp, message.sender);
} else if (message.type === 'error') {
addMessage('error', message.content, message.timestamp);
}
};
// 连接关闭
socket.onclose = function(event) {
addMessage('system', '与服务器的连接已关闭');
// 尝试重新连接
setTimeout(() => {
if (token) {
connectWebSocket();
}
}, 5000);
};
// 连接错误
socket.onerror = function(error) {
addMessage('error', '连接错误');
console.error('WebSocket error:', error);
};
}
// 发送消息
sendBtn.addEventListener('click', sendMessage);
messageInput.addEventListener('keypress', function(e) {
if (e.key === 'Enter') {
sendMessage();
}
});
function sendMessage() {
const message = messageInput.value.trim();
if (message && socket && socket.readyState === WebSocket.OPEN) {
// 发送聊天消息
socket.send(JSON.stringify({
type: 'chat',
content: message
}));
// 清空输入框
messageInput.value = '';
}
}
// 向聊天窗口添加消息
function addMessage(type, content, timestamp = null, sender = null) {
const messageElem = document.createElement('div');
messageElem.className = `message ${type}`;
let messageContent = '';
if (sender && type !== 'self') {
messageContent += `<span class="sender">${sender}</span>: `;
}
messageContent += content;
if (timestamp) {
messageContent += `<span class="timestamp">${timestamp}</span>`;
}
messageElem.innerHTML = messageContent;
messagesDiv.appendChild(messageElem);
// 滚动到底部
messagesDiv.scrollTop = messagesDiv.scrollHeight;
}
</script>
</body>
</html>
除了上述库外,Python还有其他几个WebSocket库值得关注:
库 | 异步支持 | 框架集成 | 类型 | 适用场景 |
---|---|---|---|---|
websockets | 是 (asyncio) | 独立 | 纯WebSocket | 需要完全控制WebSocket实现的应用 |
FastAPI | 是 (asyncio) | FastAPI | 纯WebSocket | 构建REST API同时需要WebSocket功能 |
aiohttp | 是 (asyncio) | aiohttp | 纯WebSocket | 异步HTTP客户端/服务器需要WebSocket |
Django Channels | 是 (asyncio) | Django | 多协议 | 在Django应用中添加实时功能 |
Flask-SocketIO | 可选 | Flask | Socket.IO | 在Flask应用中快速添加WebSocket功能 |
尝试实现以下WebSocket练习:
实现一个简单的实时聊天应用,包含以下功能:
这个练习将帮助你理解 Java WebSocket 编程的核心概念和最佳实践。
WebSocket连接的建立过程分为两个主要阶段:HTTP握手和WebSocket协议升级。
// 客户端发起握手请求
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: http://example.com
// 服务器响应
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Upgrade: websocket
- 表示请求升级到WebSocket协议Connection: Upgrade
- 表示这是一个升级连接请求Sec-WebSocket-Key
- 客户端生成的随机密钥,用于安全验证Sec-WebSocket-Version
- 指定使用的WebSocket协议版本Origin
- 表示请求的来源,用于同源策略检查HTTP/1.1 101 Switching Protocols
- 表示协议切换成功Sec-WebSocket-Accept
- 服务器根据客户端密钥计算出的响应值WebSocket连接在其生命周期中会经历不同的状态:
// WebSocket状态常量
const CONNECTING = 0; // 连接尚未建立
const OPEN = 1; // 连接已建立,可以通信
const CLOSING = 2; // 连接正在关闭
const CLOSED = 3; // 连接已关闭或无法建立
WebSocket消息以帧的形式传输,每个帧包含以下部分:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
WebSocket协议通过Ping/Pong帧实现心跳机制,用于保持连接活跃和检测连接状态。
// 服务器发送Ping帧
ws.ping('heartbeat');
// 客户端自动响应Pong帧
ws.on('pong', (data) => {
console.log('收到Pong响应:', data);
});
WebSocket连接可能会因为各种原因断开,需要实现适当的错误处理和重连机制。
class WebSocketClient {
constructor(url) {
this.url = url;
this.ws = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectDelay = 1000;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('连接已建立');
this.reconnectAttempts = 0;
};
this.ws.onclose = (event) => {
console.log('连接已关闭:', event.code, event.reason);
this.handleReconnect();
};
this.ws.onerror = (error) => {
console.error('连接错误:', error);
};
}
handleReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
console.log(`将在 ${delay}ms 后尝试重连 (尝试 ${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
setTimeout(() => {
this.connect();
}, delay);
} else {
console.error('达到最大重连次数,停止重连');
}
}
}
基于上述知识,实现一个具有以下特性的WebSocket客户端:
提示:
在使用WebSocket之前,需要确保开发环境满足以下要求:
服务器端需要完成以下配置:
// Node.js示例
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', function connection(ws) {
console.log('新客户端连接');
ws.on('message', function incoming(message) {
console.log('收到消息:', message);
// 处理消息
});
ws.on('close', function close() {
console.log('客户端断开连接');
});
});
客户端需要完成以下步骤:
// 创建WebSocket连接
const ws = new WebSocket('wss://example.com/ws');
// 连接建立时
ws.onopen = function() {
console.log('连接已建立');
// 发送初始消息
ws.send('Hello Server!');
};
// 接收消息时
ws.onmessage = function(event) {
console.log('收到消息:', event.data);
// 处理接收到的消息
};
// 连接关闭时
ws.onclose = function(event) {
console.log('连接已关闭:', event.code, event.reason);
};
// 发生错误时
ws.onerror = function(error) {
console.error('WebSocket错误:', error);
};
class WebSocketClient {
constructor(url) {
this.url = url;
this.ws = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectDelay = 1000;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('连接已建立');
this.reconnectAttempts = 0;
};
this.ws.onclose = () => {
this.handleReconnect();
};
this.ws.onerror = (error) => {
console.error('连接错误:', error);
};
}
handleReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
setTimeout(() => this.connect(), delay);
}
}
}
开发过程中需要进行以下测试:
在生产环境中需要注意:
使用WebSocket时应该遵循以下最佳实践:
开发过程中可能遇到的常见问题: