IMS系统WebSocket实时通信完全指南

WebSocket 提供了全双工通信能力,是 IMS 系统实现实时通知、在线状态、实时协作等功能的基础。本文详细介绍 WebSocket 在 IMS 系统中的架构设计与最佳实践。

WebSocket 连接管理

管理大量并发的 WebSocket 连接:

/* WebSocket 服务 */
class WebSocketServer {
    constructor(server) {
        this.wss = new WebSocketServer({ server });
        this.clients = new Map();  // clientId -> WebSocket
        this.userConnections = new Map();  // userId -> Set

        this.wss.on('connection', (ws, req) => this.handleConnection(ws, req));
    }

    handleConnection(ws, req) {
        /* 解析 token 认证 */
        const token = parseToken(req.url);
        const user = verifyToken(token);

        if (!user) {
            ws.close(4001, 'Unauthorized');
            return;
        }

        /* 生成客户端 ID */
        const clientId = generateUUID();
        ws.clientId = clientId;
        ws.userId = user.id;

        /* 存储连接 */
        this.clients.set(clientId, ws);

        if (!this.userConnections.has(user.id)) {
            this.userConnections.set(user.id, new Set());
        }
        this.userConnections.get(user.id).add(clientId);

        /* 绑定事件 */
        ws.on('message', (msg) => this.handleMessage(ws, msg));
        ws.on('close', () => this.handleClose(ws));
        ws.on('error', (err) => this.handleError(ws, err));

        /* 发送连接成功消息 */
        this.send(ws, { type: 'connected', clientId });
    }

    handleClose(ws) {
        this.clients.delete(ws.clientId);

        const userConns = this.userConnections.get(ws.userId);
        if (userConns) {
            userConns.delete(ws.clientId);
            if (userConns.size === 0) {
                this.userConnections.delete(ws.userId);
            }
        }
    }
}

心跳检测机制

保持连接活跃,检测断开的客户端:

/* 心跳管理器 */
class HeartbeatManager {
    constructor(wsServer) {
        this.wsServer = wsServer;
        this.pingInterval = 30000;  // 30 秒 ping 一次
        this.pongTimeout = 10000;    // 10 秒未收到 pong 断开
        this.timers = new Map();  // clientId -> timer
    }

    start() {
        setInterval(() => this.pingAll(), this.pingInterval);
    }

    pingAll() {
        for (const [clientId, ws] of this.wsServer.clients) {
            if (ws.readyState === WebSocket.OPEN) {
                this.ping(ws, clientId);
            }
        }
    }

    ping(ws, clientId) {
        /* 发送 ping */
        ws.send(JSON.stringify({ type: 'ping', timestamp: Date.now() }));

        /* 设置超时 */
        const timer = setTimeout(() => {
            if (ws.readyState === WebSocket.OPEN) {
                console.log(`Client ${clientId} pong timeout, closing`);
                ws.close(4002, 'Pong timeout');
            }
        }, this.pongTimeout);

        this.timers.set(clientId, timer);
    }

    handlePong(clientId) {
        const timer = this.timers.get(clientId);
        if (timer) {
            clearTimeout(timer);
            this.timers.delete(clientId);
        }
    }
}

消息推送架构

实现高效的批量消息推送:

/* 消息推送服务 */
class MessagePushService {
    constructor(wsServer) {
        this.wsServer = wsServer;
        this.redis = new Redis();
        this.subscriber = this.redis.duplicate();
    }

    init() {
        /* 订阅消息队列 */
        this.subscriber.subscribe('ims:push');
        this.subscriber.on('message', (channel, msg) => {
            this.handlePushMessage(msg);
        });
    }

    handlePushMessage(msg) {
        const data = JSON.parse(msg);
        const { targetType, target, payload } = data;

        switch (targetType) {
            case 'user':
                this.pushToUser(target, payload);
                break;
            case 'role':
                this.pushToRole(target, payload);
                break;
            case 'all':
                this.pushToAll(payload);
                break;
        }
    }

    pushToUser(userId, payload) {
        const connections = this.wsServer.userConnections.get(userId);
        if (!connections) return;

        const message = JSON.stringify(payload);

        for (const clientId of connections) {
            const ws = this.wsServer.clients.get(clientId);
            if (ws && ws.readyState === WebSocket.OPEN) {
                ws.send(message);
            }
        }
    }

    pushToRole(roleId, payload) {
        /* 获取角色下的所有用户 */
        getUsersByRole(roleId).then(users => {
            users.forEach(user => this.pushToUser(user.id, payload));
        });
    }
}

消息可靠性

  • 离线消息存入数据库,重新上线时推送
  • 使用消息确认机制确保送达
  • 对重要消息做持久化记录

断线重连处理

客户端断线后自动重连:

/* 客户端重连逻辑 */
class WebSocketClient {
    constructor(url) {
        this.url = url;
        this.reconnectDelay = 1000;
        this.maxReconnectDelay = 30000;
        this.maxReconnectAttempts = 10;
    }

    connect() {
        this.ws = new WebSocket(this.url);

        this.ws.onclose = (event) => {
            if (!event.wasClean) {
                this.scheduleReconnect();
            }
        };

        this.ws.onerror = () => {
            this.scheduleReconnect();
        };
    }

    scheduleReconnect() {
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
            console.error('Max reconnect attempts reached');
            return;
        }

        setTimeout(() => {
            this.reconnectAttempts++;
            this.reconnectDelay = Math.min(
                this.reconnectDelay * 2,
                this.maxReconnectDelay
            );
            this.connect();
        }, this.reconnectDelay);
    }
}

消息类型定义

统一的消息协议格式:

/* 消息协议 */
const MESSAGE_TYPES = {
    /* 系统消息 */
    PING: 'ping',
    PONG: 'pong',
    CONNECTED: 'connected',
    ERROR: 'error',

    /* 业务消息 */
    NOTIFICATION: 'notification',  // 通知
    ORDER_UPDATE: 'order_update',  // 订单更新
    MESSAGE: 'message',          // 站内信
    ONLINE_STATUS: 'online_status', // 在线状态
    COLLABORATION: 'collaboration', // 实时协作
};

/* 消息格式 */
interface WSMessage {
    type: string;
    id?: string;
    timestamp: number;
    payload: object;
}

安全性考虑

  • 认证:使用 JWT token 认证,连接时验证
  • 加密:生产环境使用 WSS(WebSocket Secure)
  • 限流:限制单个连接的消息发送频率
  • IP 白名单:对企业内部服务限制访问 IP

总结

  • 连接管理:维护客户端与用户映射关系
  • 心跳检测:保持连接活跃,检测断开的客户端
  • 消息推送:支持按用户、角色、全局推送
  • 断线重连:指数退避算法实现稳定重连
  • 消息协议:统一的 JSON 消息格式