IMS 系统通过消息队列实现异步处理、削峰填谷、系统解耦。本文介绍 RabbitMQ 在 IMS 中的应用,包括可靠投递、顺序消费、事务消息等核心场景。
一、为什么需要消息队列
- 异步处理:非核心流程异步执行,提升响应速度
- 削峰填谷:高峰期消息堆积,平滑处理突发流量
- 系统解耦:生产者和消费者独立演进
- 最终一致性:分布式事务的补偿机制基础
二、可靠消息投递
消息投递需要保证不丢失,常见模式:
- 生产者确认:publisher confirm 机制
- 持久化:消息和队列都要持久化
- 消费确认:手动 ACK 确认处理成功
/* 生产者确认 */ channel.confirmSelect(); channel.addConfirmListener((ack, tag) -> { // 消息投递成功 }, (nick, tag) -> { // 投递失败,重试或记录 }); /* 消费者手动 ACK */ DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); // 业务处理 processMessage(message); // 手动确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
三、消息顺序性
某些业务要求消息顺序处理,如订单状态流转:创建 → 支付 → 发货 → 签收。
- 方案一:单队列:同一业务的消息发送到同一个队列
- 方案二:hash 路由:用业务 ID 做 routing key
/* 同一订单的消息用相同 routing key */ -- 订单创建 channel.basicPublish("order.exchange", "order.123", ...); -- 订单支付 channel.basicPublish("order.exchange", "order.123", ...); -- 订单发货 channel.basicPublish("order.exchange", "order.123", ...);
四、消息幂等性
消费者可能收到重复消息,需要保证幂等处理。
- 唯一ID:每条消息携带全局唯一 ID
- 去重表:用 Redis 或数据库记录已处理消息 ID
- 状态机:利用业务状态流转的天然幂等性
五、TTL 与死信队列
/* 设置消息 TTL(5分钟) */ AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .expiration("300000") .build(); /* 配置死信队列 */ Mapargs = new HashMap<>(); args.put("x-dead-letter-exchange", "dlx.exchange"); args.put("x-dead-letter-routing-key", "dlx.order"); channel.queueDeclare("order.queue", true, false, false, args);
IMS 消息队列应用场景:
- 订单状态变更通知上下游系统
- 用户操作日志异步写入
- 报表数据异步计算
- 定时任务触发(工作流超时检查)
六、总结
- 可靠投递:持久化 + 生产者确认 + 消费 ACK
- 顺序消息:同一业务 ID 发到同一队列
- 幂等处理:用唯一 ID 去重
- TTL + 死信队列处理超时消息