企业 IMS 系统需要实时推送各类通知和消息,包括系统通知、待办提醒、聊天消息等。本文详细介绍消息推送系统的设计与实现,包括 WebSocket 长连接、消息存储、离线推送等核心功能。
一、推送架构概述
/* 消息推送架构 */ ┌─────────────────────────────────────────────────────┐ │ 消息推送系统 │ ├─────────────────────────────────────────────────────┤ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ 消息生产 │ │ 消息路由 │ │ 推送通道 │ │ │ │ (业务服务) │→│ (消息队列) │→│ (Gateway) │ │ │ └─────────────┘ └─────────────┘ └──────┬──────┘ │ └───────────────────────┬──────────────────────┬──────┘ ↓ ↓ ┌──────────────────┐ ┌──────────────────┐ │ 在线用户 │ │ 离线用户 │ │ (WebSocket) │ │ (APNs/FCM/短信) │ └──────────────────┘ └──────────────────┘
二、WebSocket 连接管理
2.1 连接建立
/* WebSocket 握手升级 */ server.on('upgrade', (request, socket, head) => { // 1. 验证 Token const token = parseToken(request.url); if (!token || !verifyToken(token)) { socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); socket.destroy(); return; } // 2. 创建 WebSocket 连接 const ws = new WebSocketServer({ noServer: true }); ws.handleUpgrade(request, socket, head, (ws) => { // 3. 关联用户与连接 const userId = getUserId(token); connectionManager.add(userId, ws); ws.on('message', (msg) => handleMessage(userId, msg)); ws.on('close', () => connectionManager.remove(userId)); ws.on('error', (err) => handleError(userId, err)); }); });
2.2 连接管理器
/* 连接管理器 */ class ConnectionManager { constructor() { this.connections = new Map(); // userId -> WebSocket this.heartbeatInterval = 30000; } add(userId, ws) { this.connections.set(userId, ws); startHeartbeat(userId, ws); console.log(`用户 ${userId} 连接成功,当前在线: ${this.connections.size}`); } remove(userId) { this.connections.delete(userId); console.log(`用户 ${userId} 断开连接,当前在线: ${this.connections.size}`); } send(userId, message) { const ws = this.connections.get(userId); if (ws && ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify(message)); return true; } return false; // 用户离线 } broadcast(message) { for (const [userId, ws] of this.connections) { this.send(userId, message); } } startHeartbeat(userId, ws) { const timer = setInterval(() => { if (ws.readyState !== WebSocket.OPEN) { clearInterval(timer); return; } ws.send(JSON.stringify({ type: 'ping' })); }, this.heartbeatInterval); ws.on('close', () => clearInterval(timer)); } }
三、消息存储设计
/* 消息表结构 */ CREATE TABLE im_message ( id BIGINT PRIMARY KEY AUTO_INCREMENT, msg_id VARCHAR(64) UNIQUE, -- 消息唯一ID sender_id BIGINT NOT NULL, -- 发送者ID receiver_id BIGINT NOT NULL, -- 接收者ID msg_type TINYINT NOT NULL, -- 消息类型:1-文本 2-图片 3-文件 content TEXT, -- 消息内容 status TINYINT DEFAULT 0, -- 状态:0-未读 1-已读 created_at DATETIME DEFAULT CURRENT_TIMESTAMP, read_at DATETIME, -- 阅读时间 INDEX idx_receiver (receiver_id, status, created_at), INDEX idx_sender (sender_id, created_at) ); /* 离线消息存储 */ CREATE TABLE im_offline_message ( id BIGINT PRIMARY KEY AUTO_INCREMENT, user_id BIGINT NOT NULL, msg_id VARCHAR(64) NOT NULL, payload JSON, -- 消息内容 created_at DATETIME DEFAULT CURRENT_TIMESTAMP, INDEX idx_user (user_id) );
四、消息推送流程
/* 消息推送服务 */ class PushService { async sendMessage(senderId, receiverId, content, type) { // 1. 生成消息ID const msgId = generateMessageId(); // 2. 保存消息到数据库 const message = await messageRepo.create({ msgId, senderId, receiverId, content, type }); // 3. 尝试实时推送 const online = connectionManager.send(receiverId, { type: 'message', data: message }); // 4. 如果不在线,存储离线消息 if (!online) { await offlineMessageRepo.save({ userId: receiverId, msgId, payload: message }); } return message; } async sendNotification(userId, title, content, extra) { // 系统通知推送 const notification = { type: 'notification', data: { title, content, extra, timestamp: Date.now() } }; if (!connectionManager.send(userId, notification)) { // 离线用户,走第三方推送 await this.sendOfflinePush(userId, title, content); } } }
五、离线消息同步
/* 用户上线时拉取离线消息 */ async onUserOnline(userId) { // 1. 获取离线消息 const offlineMessages = await offlineMessageRepo.getByUserId(userId); // 2. 批量推送离线消息 if (offlineMessages.length > 0) { connectionManager.send(userId, { type: 'offline_messages', data: offlineMessages }); // 3. 删除离线消息 await offlineMessageRepo.deleteByUserId(userId); } // 4. 标记未读消息为已送达 await messageRepo.updateStatusByReceiver(userId, { status: 'delivered' }); }
离线消息同步策略:
- 用户上线时立即拉取离线消息
- 离线消息设置过期时间(默认7天)
- 大数量离线消息分批推送,避免阻塞
- 推送成功后及时删除离线存储
六、推送策略优化
6.1 消息优先级
| 优先级 | 消息类型 | 推送策略 |
|---|---|---|
| P0 | 系统告警、安全通知 | 强制推送,强制亮屏 |
| P1 | 聊天消息、@消息 | 实时推送,振动提示 |
| P2 | 待办提醒、审批通知 | 实时推送,静音 |
| P3 | 系统公告、资讯 | 合并推送,节省电量 |
6.2 消息合并
/* 消息合并策略 */ // 短时间内多条同类型消息合并为一条 const BATCH_WINDOW = 5000; // 5秒窗口 const MAX_BATCH = 10; class MessageBatcher { add(message) { const key = `${message.receiverId}_${message.type}`; if (!this.batches.has(key)) { this.batches.set(key, { messages: [], timer: setTimeout(() => this.flush(key), BATCH_WINDOW) }); } const batch = this.batches.get(key); batch.messages.push(message); // 达到上限立即发送 if (batch.messages.length >= MAX_BATCH) { clearTimeout(batch.timer); this.flush(key); } } flush(key) { const batch = this.batches.get(key); if (!batch || batch.messages.length === 0) return; // 合并为一条推送 const merged = { type: 'batch', count: batch.messages.length, lastMessage: batch.messages[batch.messages.length - 1] }; connectionManager.send(merged.receiverId, merged); this.batches.delete(key); } }
七、总结
- WebSocket 是实时推送的核心,需要完善的连接管理和心跳机制
- 消息需要持久化存储,支持离线消息同步
- 离线用户需要通过第三方推送通道触达
- 合理使用消息合并和优先级策略,优化推送效果
- 建立完整的消息送达监控体系,保证消息不丢失