消息队列是IMS系统解耦和异步处理的核心组件。通过消息队列,系统可以处理高并发、实现任务异步执行、提高系统可用性。
消息队列概述
消息队列的主要作用:
- 异步处理:将耗时操作从主流程中分离
- 解耦:生产者和消费者独立演进
- 削峰填谷:缓解高并发对系统的冲击
- 可靠传递:保证消息不丢失
消息队列架构
IMS系统采用RabbitMQ作为消息中间件:
/* 消息队列配置 */
interface QueueConfig {
host: string;
port: number;
username: string;
password: string;
vhost: string;
/* 队列名称 */
queues: {
email: string; // 邮件发送队列 */
sms: string; // 短信队列 */
push: string; // 推送队列 */
sync: string; // 数据同步队列 */
task: string; // 异步任务队列 */
};
}
const defaultConfig: QueueConfig = {
host: "localhost",
port: 5672,
username: "guest",
password: "guest",
vhost: "/",
queues: {
email: "ims.email.queue",
sms: "ims.sms.queue",
push: "ims.push.queue",
sync: "ims.sync.queue",
task: "ims.task.queue"
}
};
消息生产者
封装消息发送的通用接口:
/* 消息生产者 */
class MessageProducer {
private connection: any;
private channel: any;
async connect(config: QueueConfig) {
this.connection = await amqp.connect(config);
this.channel = await this.connection.createChannel();
/* 声明交换机 */
await this.channel.assertExchange(
"ims.direct",
"direct",
{ durable: true }
);
}
/* 发送消息 */
async publish(
queue: string,
message: any,
options?: any
) {
const content = JSON.stringify({
data: message,
timestamp: Date.now(),
id: generateId()
});
await this.channel.sendToQueue(
queue,
Buffer.from(content),
{
persistent: true,
...options
}
);
}
/* 发送邮件消息 */
async sendEmail(to: string, subject: string, body: string) {
await this.publish(defaultConfig.queues.email, {
type: "email",
to,
subject,
body
});
}
/* 发送任务消息 */
async sendTask(taskType: string, payload: any) {
await this.publish(defaultConfig.queues.task, {
type: taskType,
payload
});
}
}
消息消费者
实现消息的可靠消费:
/* 消息消费者 */
class MessageConsumer {
private channel: any;
/* 消费消息 */
async consume(
queue: string,
handler: (msg: any) => Promise<void>
) {
/* 声明队列 */
await this.channel.assertQueue(queue, {
durable: true
});
/* 设置预取数量,保证公平分发 */
await this.channel.prefetch(1);
this.channel.consume(queue, async (msg) => {
if (!msg) return;
try {
const content = JSON.parse(msg.content.toString());
console.log(`Processing message: ${content.id}`);
await handler(content.data);
/* 确认消息 */
this.channel.ack(msg);
} catch (error) {
console.error("Message processing failed:", error);
/* 重新入队 */
this.channel.nack(msg, false, true);
}
});
}
}
死信队列处理
处理无法正常消费的消息:
/* 死信队列配置 */
async setupDeadLetterQueue(channel, queueName: string) {
/* 声明死信交换机 */
await channel.assertExchange(
`${queueName}.dlx",
"direct",
{ durable: true }
);
/* 声明死信队列 */
await channel.assertQueue(`${queueName}.dlq", {
durable: true
});
/* 绑定死信队列 */
await channel.bindQueue(
`${queueName}.dlq",
`${queueName}.dlx",
queueName
);
/* 主队列绑定死信交换机 */
await channel.assertQueue(queueName, {
durable: true,
arguments: {
'x-dead-letter-exchange': `${queueName}.dlx",
'x-dead-letter-routing-key': queueName
}
});
}
延迟队列
实现定时任务功能:
/* 延迟队列实现 */
class DelayedQueue {
/* 使用消息TTL实现延迟 */
async publishDelayed(
channel,
queue: string,
message: any,
delayMs: number
) {
/* 延迟队列名称 */
const delayQueue = `${queue}.delay.${delayMs}`;
/* 声明延迟队列,消息过期后转到主队列 */
await channel.assertQueue(delayQueue, {
durable: true,
arguments: {
'x-message-ttl': delayMs,
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': queue
}
});
/* 发送消息到延迟队列 */
await channel.sendToQueue(
delayQueue,
Buffer.from(JSON.stringify(message)),
{ persistent: true }
);
}
}
/* 使用示例:5分钟后发送提醒 */
await delayedQueue.publishDelayed(
channel,
defaultConfig.queues.task,
{ type: "reminder", taskId: "123" },
5 * 60 * 1000
);
队列使用建议
- 合理设置消息TTL,避免死信队列积压
- 消费者做好幂等处理,防止重复消费
- 监控队列堆积情况,及时告警
- 重要业务使用消息确认机制
总结
- RabbitMQ提供可靠的消息传递
- 生产者与消费者解耦,提高系统扩展性
- 死信队列处理异常消息
- 延迟队列实现定时任务