消息队列

消息队列实践

IMS 系统通过消息队列实现异步处理、削峰填谷、系统解耦。本文介绍 RabbitMQ 在 IMS 中的应用,包括可靠投递、顺序消费、事务消息等核心场景。

一、为什么需要消息队列

  • 异步处理:非核心流程异步执行,提升响应速度
  • 削峰填谷:高峰期消息堆积,平滑处理突发流量
  • 系统解耦:生产者和消费者独立演进
  • 最终一致性:分布式事务的补偿机制基础

二、可靠消息投递

消息投递需要保证不丢失,常见模式:

  • 生产者确认:publisher confirm 机制
  • 持久化:消息和队列都要持久化
  • 消费确认:手动 ACK 确认处理成功
/* 生产者确认 */
channel.confirmSelect();
channel.addConfirmListener((ack, tag) -> {
    // 消息投递成功
}, (nick, tag) -> {
    // 投递失败,重试或记录
});

/* 消费者手动 ACK */
 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
     String message = new String(delivery.getBody(), "UTF-8");
     // 业务处理
     processMessage(message);
     // 手动确认
     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
 };
 channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});

三、消息顺序性

某些业务要求消息顺序处理,如订单状态流转:创建 → 支付 → 发货 → 签收。

  • 方案一:单队列:同一业务的消息发送到同一个队列
  • 方案二:hash 路由:用业务 ID 做 routing key
/* 同一订单的消息用相同 routing key */
-- 订单创建
channel.basicPublish("order.exchange", "order.123", ...);

-- 订单支付
channel.basicPublish("order.exchange", "order.123", ...);

-- 订单发货
channel.basicPublish("order.exchange", "order.123", ...);

四、消息幂等性

消费者可能收到重复消息,需要保证幂等处理。

  • 唯一ID:每条消息携带全局唯一 ID
  • 去重表:用 Redis 或数据库记录已处理消息 ID
  • 状态机:利用业务状态流转的天然幂等性

五、TTL 与死信队列

/* 设置消息 TTL(5分钟) */
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .expiration("300000")
    .build();

/* 配置死信队列 */
Map args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlx.order");
channel.queueDeclare("order.queue", true, false, false, args);
IMS 消息队列应用场景:
  • 订单状态变更通知上下游系统
  • 用户操作日志异步写入
  • 报表数据异步计算
  • 定时任务触发(工作流超时检查)

六、总结

  • 可靠投递:持久化 + 生产者确认 + 消费 ACK
  • 顺序消息:同一业务 ID 发到同一队列
  • 幂等处理:用唯一 ID 去重
  • TTL + 死信队列处理超时消息