IMS系统消息队列完全指南

消息队列是IMS系统解耦和异步处理的核心组件。通过消息队列,系统可以处理高并发、实现任务异步执行、提高系统可用性。

消息队列概述

消息队列的主要作用:

  • 异步处理:将耗时操作从主流程中分离
  • 解耦:生产者和消费者独立演进
  • 削峰填谷:缓解高并发对系统的冲击
  • 可靠传递:保证消息不丢失

消息队列架构

IMS系统采用RabbitMQ作为消息中间件:

/* 消息队列配置 */
interface QueueConfig {
    host: string;
    port: number;
    username: string;
    password: string;
    vhost: string;
    /* 队列名称 */
    queues: {
        email: string;        // 邮件发送队列 */
        sms: string;           // 短信队列 */
        push: string;          // 推送队列 */
        sync: string;          // 数据同步队列 */
        task: string;          // 异步任务队列 */
    };
}

const defaultConfig: QueueConfig = {
    host: "localhost",
    port: 5672,
    username: "guest",
    password: "guest",
    vhost: "/",
    queues: {
        email: "ims.email.queue",
        sms: "ims.sms.queue",
        push: "ims.push.queue",
        sync: "ims.sync.queue",
        task: "ims.task.queue"
    }
};

消息生产者

封装消息发送的通用接口:

/* 消息生产者 */
class MessageProducer {
    private connection: any;
    private channel: any;

    async connect(config: QueueConfig) {
        this.connection = await amqp.connect(config);
        this.channel = await this.connection.createChannel();

        /* 声明交换机 */
        await this.channel.assertExchange(
            "ims.direct",
            "direct",
            { durable: true }
        );
    }

    /* 发送消息 */
    async publish(
        queue: string,
        message: any,
        options?: any
    ) {
        const content = JSON.stringify({
            data: message,
            timestamp: Date.now(),
            id: generateId()
        });

        await this.channel.sendToQueue(
            queue,
            Buffer.from(content),
            {
                persistent: true,
                ...options
            }
        );
    }

    /* 发送邮件消息 */
    async sendEmail(to: string, subject: string, body: string) {
        await this.publish(defaultConfig.queues.email, {
            type: "email",
            to,
            subject,
            body
        });
    }

    /* 发送任务消息 */
    async sendTask(taskType: string, payload: any) {
        await this.publish(defaultConfig.queues.task, {
            type: taskType,
            payload
        });
    }
}

消息消费者

实现消息的可靠消费:

/* 消息消费者 */
class MessageConsumer {
    private channel: any;

    /* 消费消息 */
    async consume(
        queue: string,
        handler: (msg: any) => Promise<void>
    ) {
        /* 声明队列 */
        await this.channel.assertQueue(queue, {
            durable: true
        });

        /* 设置预取数量,保证公平分发 */
        await this.channel.prefetch(1);

        this.channel.consume(queue, async (msg) => {
            if (!msg) return;

            try {
                const content = JSON.parse(msg.content.toString());
                console.log(`Processing message: ${content.id}`);

                await handler(content.data);

                /* 确认消息 */
                this.channel.ack(msg);
            } catch (error) {
                console.error("Message processing failed:", error);

                /* 重新入队 */
                this.channel.nack(msg, false, true);
            }
        });
    }
}

死信队列处理

处理无法正常消费的消息:

/* 死信队列配置 */
async setupDeadLetterQueue(channel, queueName: string) {
    /* 声明死信交换机 */
    await channel.assertExchange(
        `${queueName}.dlx",
        "direct",
        { durable: true }
    );

    /* 声明死信队列 */
    await channel.assertQueue(`${queueName}.dlq", {
        durable: true
    });

    /* 绑定死信队列 */
    await channel.bindQueue(
        `${queueName}.dlq",
        `${queueName}.dlx",
        queueName
    );

    /* 主队列绑定死信交换机 */
    await channel.assertQueue(queueName, {
        durable: true,
        arguments: {
            'x-dead-letter-exchange': `${queueName}.dlx",
            'x-dead-letter-routing-key': queueName
        }
    });
}

延迟队列

实现定时任务功能:

/* 延迟队列实现 */
class DelayedQueue {
    /* 使用消息TTL实现延迟 */
    async publishDelayed(
        channel,
        queue: string,
        message: any,
        delayMs: number
    ) {
        /* 延迟队列名称 */
        const delayQueue = `${queue}.delay.${delayMs}`;

        /* 声明延迟队列,消息过期后转到主队列 */
        await channel.assertQueue(delayQueue, {
            durable: true,
            arguments: {
                'x-message-ttl': delayMs,
                'x-dead-letter-exchange': '',
                'x-dead-letter-routing-key': queue
            }
        });

        /* 发送消息到延迟队列 */
        await channel.sendToQueue(
            delayQueue,
            Buffer.from(JSON.stringify(message)),
            { persistent: true }
        );
    }
}

/* 使用示例:5分钟后发送提醒 */
await delayedQueue.publishDelayed(
    channel,
    defaultConfig.queues.task,
    { type: "reminder", taskId: "123" },
    5 * 60 * 1000
);

队列使用建议

  • 合理设置消息TTL,避免死信队列积压
  • 消费者做好幂等处理,防止重复消费
  • 监控队列堆积情况,及时告警
  • 重要业务使用消息确认机制

总结

  • RabbitMQ提供可靠的消息传递
  • 生产者与消费者解耦,提高系统扩展性
  • 死信队列处理异常消息
  • 延迟队列实现定时任务