WebSocket 提供了全双工通信能力,是 IMS 系统实现实时通知、在线状态、实时协作等功能的基础。本文详细介绍 WebSocket 在 IMS 系统中的架构设计与最佳实践。
WebSocket 连接管理
管理大量并发的 WebSocket 连接:
/* WebSocket 服务 */
class WebSocketServer {
constructor(server) {
this.wss = new WebSocketServer({ server });
this.clients = new Map(); // clientId -> WebSocket
this.userConnections = new Map(); // userId -> Set
this.wss.on('connection', (ws, req) => this.handleConnection(ws, req));
}
handleConnection(ws, req) {
/* 解析 token 认证 */
const token = parseToken(req.url);
const user = verifyToken(token);
if (!user) {
ws.close(4001, 'Unauthorized');
return;
}
/* 生成客户端 ID */
const clientId = generateUUID();
ws.clientId = clientId;
ws.userId = user.id;
/* 存储连接 */
this.clients.set(clientId, ws);
if (!this.userConnections.has(user.id)) {
this.userConnections.set(user.id, new Set());
}
this.userConnections.get(user.id).add(clientId);
/* 绑定事件 */
ws.on('message', (msg) => this.handleMessage(ws, msg));
ws.on('close', () => this.handleClose(ws));
ws.on('error', (err) => this.handleError(ws, err));
/* 发送连接成功消息 */
this.send(ws, { type: 'connected', clientId });
}
handleClose(ws) {
this.clients.delete(ws.clientId);
const userConns = this.userConnections.get(ws.userId);
if (userConns) {
userConns.delete(ws.clientId);
if (userConns.size === 0) {
this.userConnections.delete(ws.userId);
}
}
}
}
心跳检测机制
保持连接活跃,检测断开的客户端:
/* 心跳管理器 */
class HeartbeatManager {
constructor(wsServer) {
this.wsServer = wsServer;
this.pingInterval = 30000; // 30 秒 ping 一次
this.pongTimeout = 10000; // 10 秒未收到 pong 断开
this.timers = new Map(); // clientId -> timer
}
start() {
setInterval(() => this.pingAll(), this.pingInterval);
}
pingAll() {
for (const [clientId, ws] of this.wsServer.clients) {
if (ws.readyState === WebSocket.OPEN) {
this.ping(ws, clientId);
}
}
}
ping(ws, clientId) {
/* 发送 ping */
ws.send(JSON.stringify({ type: 'ping', timestamp: Date.now() }));
/* 设置超时 */
const timer = setTimeout(() => {
if (ws.readyState === WebSocket.OPEN) {
console.log(`Client ${clientId} pong timeout, closing`);
ws.close(4002, 'Pong timeout');
}
}, this.pongTimeout);
this.timers.set(clientId, timer);
}
handlePong(clientId) {
const timer = this.timers.get(clientId);
if (timer) {
clearTimeout(timer);
this.timers.delete(clientId);
}
}
}
消息推送架构
实现高效的批量消息推送:
/* 消息推送服务 */
class MessagePushService {
constructor(wsServer) {
this.wsServer = wsServer;
this.redis = new Redis();
this.subscriber = this.redis.duplicate();
}
init() {
/* 订阅消息队列 */
this.subscriber.subscribe('ims:push');
this.subscriber.on('message', (channel, msg) => {
this.handlePushMessage(msg);
});
}
handlePushMessage(msg) {
const data = JSON.parse(msg);
const { targetType, target, payload } = data;
switch (targetType) {
case 'user':
this.pushToUser(target, payload);
break;
case 'role':
this.pushToRole(target, payload);
break;
case 'all':
this.pushToAll(payload);
break;
}
}
pushToUser(userId, payload) {
const connections = this.wsServer.userConnections.get(userId);
if (!connections) return;
const message = JSON.stringify(payload);
for (const clientId of connections) {
const ws = this.wsServer.clients.get(clientId);
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
}
}
pushToRole(roleId, payload) {
/* 获取角色下的所有用户 */
getUsersByRole(roleId).then(users => {
users.forEach(user => this.pushToUser(user.id, payload));
});
}
}
消息可靠性
- 离线消息存入数据库,重新上线时推送
- 使用消息确认机制确保送达
- 对重要消息做持久化记录
断线重连处理
客户端断线后自动重连:
/* 客户端重连逻辑 */
class WebSocketClient {
constructor(url) {
this.url = url;
this.reconnectDelay = 1000;
this.maxReconnectDelay = 30000;
this.maxReconnectAttempts = 10;
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onclose = (event) => {
if (!event.wasClean) {
this.scheduleReconnect();
}
};
this.ws.onerror = () => {
this.scheduleReconnect();
};
}
scheduleReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('Max reconnect attempts reached');
return;
}
setTimeout(() => {
this.reconnectAttempts++;
this.reconnectDelay = Math.min(
this.reconnectDelay * 2,
this.maxReconnectDelay
);
this.connect();
}, this.reconnectDelay);
}
}
消息类型定义
统一的消息协议格式:
/* 消息协议 */
const MESSAGE_TYPES = {
/* 系统消息 */
PING: 'ping',
PONG: 'pong',
CONNECTED: 'connected',
ERROR: 'error',
/* 业务消息 */
NOTIFICATION: 'notification', // 通知
ORDER_UPDATE: 'order_update', // 订单更新
MESSAGE: 'message', // 站内信
ONLINE_STATUS: 'online_status', // 在线状态
COLLABORATION: 'collaboration', // 实时协作
};
/* 消息格式 */
interface WSMessage {
type: string;
id?: string;
timestamp: number;
payload: object;
}
安全性考虑
- 认证:使用 JWT token 认证,连接时验证
- 加密:生产环境使用 WSS(WebSocket Secure)
- 限流:限制单个连接的消息发送频率
- IP 白名单:对企业内部服务限制访问 IP
总结
- 连接管理:维护客户端与用户映射关系
- 心跳检测:保持连接活跃,检测断开的客户端
- 消息推送:支持按用户、角色、全局推送
- 断线重连:指数退避算法实现稳定重连
- 消息协议:统一的 JSON 消息格式