消息通信

IMS系统消息推送设计完全指南

企业 IMS 系统需要实时推送各类通知和消息,包括系统通知、待办提醒、聊天消息等。本文详细介绍消息推送系统的设计与实现,包括 WebSocket 长连接、消息存储、离线推送等核心功能。

一、推送架构概述

/* 消息推送架构 */
┌─────────────────────────────────────────────────────┐
│                   消息推送系统                       │
├─────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │  消息生产   │  │  消息路由   │  │  推送通道   │ │
│  │  (业务服务) │→│  (消息队列) │→│  (Gateway)  │ │
│  └─────────────┘  └─────────────┘  └──────┬──────┘ │
└───────────────────────┬──────────────────────┬──────┘
                        ↓                      ↓
┌──────────────────┐   ┌──────────────────┐
│  在线用户        │   │  离线用户        │
│  (WebSocket)    │   │  (APNs/FCM/短信) │
└──────────────────┘   └──────────────────┘

二、WebSocket 连接管理

2.1 连接建立

/* WebSocket 握手升级 */
server.on('upgrade', (request, socket, head) => {
  // 1. 验证 Token
  const token = parseToken(request.url);

  if (!token || !verifyToken(token)) {
    socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
    socket.destroy();
    return;
  }

  // 2. 创建 WebSocket 连接
  const ws = new WebSocketServer({ noServer: true });
  ws.handleUpgrade(request, socket, head, (ws) => {
    // 3. 关联用户与连接
    const userId = getUserId(token);
    connectionManager.add(userId, ws);

    ws.on('message', (msg) => handleMessage(userId, msg));
    ws.on('close', () => connectionManager.remove(userId));
    ws.on('error', (err) => handleError(userId, err));
  });
});

2.2 连接管理器

/* 连接管理器 */
class ConnectionManager {
  constructor() {
    this.connections = new Map();  // userId -> WebSocket
    this.heartbeatInterval = 30000;
  }

  add(userId, ws) {
    this.connections.set(userId, ws);
    startHeartbeat(userId, ws);
    console.log(`用户 ${userId} 连接成功,当前在线: ${this.connections.size}`);
  }

  remove(userId) {
    this.connections.delete(userId);
    console.log(`用户 ${userId} 断开连接,当前在线: ${this.connections.size}`);
  }

  send(userId, message) {
    const ws = this.connections.get(userId);
    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(message));
      return true;
    }
    return false;  // 用户离线
  }

  broadcast(message) {
    for (const [userId, ws] of this.connections) {
      this.send(userId, message);
    }
  }

  startHeartbeat(userId, ws) {
    const timer = setInterval(() => {
      if (ws.readyState !== WebSocket.OPEN) {
        clearInterval(timer);
        return;
      }
      ws.send(JSON.stringify({ type: 'ping' }));
    }, this.heartbeatInterval);

    ws.on('close', () => clearInterval(timer));
  }
}

三、消息存储设计

/* 消息表结构 */
CREATE TABLE im_message (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    msg_id VARCHAR(64) UNIQUE,           -- 消息唯一ID
    sender_id BIGINT NOT NULL,          -- 发送者ID
    receiver_id BIGINT NOT NULL,        -- 接收者ID
    msg_type TINYINT NOT NULL,         -- 消息类型:1-文本 2-图片 3-文件
    content TEXT,                        -- 消息内容
    status TINYINT DEFAULT 0,         -- 状态:0-未读 1-已读
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    read_at DATETIME,                     -- 阅读时间
    INDEX idx_receiver (receiver_id, status, created_at),
    INDEX idx_sender (sender_id, created_at)
);

/* 离线消息存储 */
CREATE TABLE im_offline_message (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    msg_id VARCHAR(64) NOT NULL,
    payload JSON,                        -- 消息内容
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_user (user_id)
);

四、消息推送流程

/* 消息推送服务 */
class PushService {
  async sendMessage(senderId, receiverId, content, type) {
    // 1. 生成消息ID
    const msgId = generateMessageId();

    // 2. 保存消息到数据库
    const message = await messageRepo.create({
      msgId, senderId, receiverId, content, type
    });

    // 3. 尝试实时推送
    const online = connectionManager.send(receiverId, {
      type: 'message',
      data: message
    });

    // 4. 如果不在线,存储离线消息
    if (!online) {
      await offlineMessageRepo.save({
        userId: receiverId,
        msgId,
        payload: message
      });
    }

    return message;
  }

  async sendNotification(userId, title, content, extra) {
    // 系统通知推送
    const notification = {
      type: 'notification',
      data: { title, content, extra, timestamp: Date.now() }
    };

    if (!connectionManager.send(userId, notification)) {
      // 离线用户,走第三方推送
      await this.sendOfflinePush(userId, title, content);
    }
  }
}

五、离线消息同步

/* 用户上线时拉取离线消息 */
async onUserOnline(userId) {
  // 1. 获取离线消息
  const offlineMessages = await offlineMessageRepo.getByUserId(userId);

  // 2. 批量推送离线消息
  if (offlineMessages.length > 0) {
    connectionManager.send(userId, {
      type: 'offline_messages',
      data: offlineMessages
    });

    // 3. 删除离线消息
    await offlineMessageRepo.deleteByUserId(userId);
  }

  // 4. 标记未读消息为已送达
  await messageRepo.updateStatusByReceiver(userId, {
    status: 'delivered'
  });
}
离线消息同步策略:
  • 用户上线时立即拉取离线消息
  • 离线消息设置过期时间(默认7天)
  • 大数量离线消息分批推送,避免阻塞
  • 推送成功后及时删除离线存储

六、推送策略优化

6.1 消息优先级

优先级消息类型推送策略
P0系统告警、安全通知强制推送,强制亮屏
P1聊天消息、@消息实时推送,振动提示
P2待办提醒、审批通知实时推送,静音
P3系统公告、资讯合并推送,节省电量

6.2 消息合并

/* 消息合并策略 */
// 短时间内多条同类型消息合并为一条
const BATCH_WINDOW = 5000;  // 5秒窗口
const MAX_BATCH = 10;

class MessageBatcher {
  add(message) {
    const key = `${message.receiverId}_${message.type}`;

    if (!this.batches.has(key)) {
      this.batches.set(key, {
        messages: [],
        timer: setTimeout(() => this.flush(key), BATCH_WINDOW)
      });
    }

    const batch = this.batches.get(key);
    batch.messages.push(message);

    // 达到上限立即发送
    if (batch.messages.length >= MAX_BATCH) {
      clearTimeout(batch.timer);
      this.flush(key);
    }
  }

  flush(key) {
    const batch = this.batches.get(key);
    if (!batch || batch.messages.length === 0) return;

    // 合并为一条推送
    const merged = {
      type: 'batch',
      count: batch.messages.length,
      lastMessage: batch.messages[batch.messages.length - 1]
    };

    connectionManager.send(merged.receiverId, merged);
    this.batches.delete(key);
  }
}

七、总结

  • WebSocket 是实时推送的核心,需要完善的连接管理和心跳机制
  • 消息需要持久化存储,支持离线消息同步
  • 离线用户需要通过第三方推送通道触达
  • 合理使用消息合并和优先级策略,优化推送效果
  • 建立完整的消息送达监控体系,保证消息不丢失