IMS系统任务调度设计完全指南

任务调度是企业级应用的核心功能,用于处理定时任务、异步队列和批量处理。本文详细介绍 IMS 系统的任务调度架构设计。

任务类型分类

IMS 系统中的任务分为以下几类:
  • 定时任务:按固定时间间隔或 cron 表达式执行
  • 延迟任务:指定延迟时间后执行
  • 周期任务:按固定频率重复执行
  • 依赖任务:前置任务完成后触发
/* 任务定义 */
interface Task {
    id: string;
    name: string;
    type: 'cron' | 'delay' | 'interval' | 'dependent';
    handler: string;           // 处理器名称
    payload: object;             // 任务参数
    schedule?: string;           // Cron 表达式或延迟时间
    dependsOn?: string[];       // 依赖任务 ID 列表
    retryPolicy?: RetryPolicy;  // 重试策略
    timeout?: number;            // 超时时间(毫秒)
}

/* 重试策略 */
interface RetryPolicy {
    maxAttempts: number;      // 最大重试次数
    retryDelay: number;       // 重试间隔(毫秒)
    backoffMultiplier: number; // 退避倍数
    maxRetryDelay: number;    // 最大重试间隔
}

分布式任务调度

使用分布式调度确保任务高可用和负载均衡:

/* 任务调度器 */
class TaskScheduler {
    constructor(redis, zk) {
        this.redis = redis;
        this.lockKey = "task:lock:";
        this.queueKey = "task:queue";
    }

    /* 分布式锁获取 */
    async acquireLock(taskId: string, ttl: number): boolean {
        const lockValue = generateUUID();
        const result = await this.redis.set(
            this.lockKey + taskId,
            lockValue,
            "NX",  // 只在不存在时设置
            "PX",
            ttl
        );
        return result === "OK";
    }

    /* 执行任务 */
    async executeTask(task: Task) {
        // 尝试获取分布式锁
        const lock = await this.acquireLock(task.id, 300000);
        if (!lock) {
            console.log(`Task ${task.id} is being executed by another node`);
            return;
        }

        try {
            console.log(`Executing task: ${task.name}`);
            await runTaskHandler(task);
        } finally {
            await this.redis.del(this.lockKey + task.id);
        }
    }
}

异步任务队列

使用消息队列实现异步任务处理:

/* 任务队列服务 */
class TaskQueueService {
    constructor(rabbitmq) {
        this.channel = rabbitmq.createChannel();
        this.queueName = "ims_tasks";
    }

    /* 初始化队列 */
    async init() {
        await this.channel.assertQueue(this.queueName, {
            durable: true,
            arguments: {
                "x-dead-letter-exchange": "ims_tasks_dlx"
            }
        });
    }

    /* 投递任务 */
    async enqueue(task: Task) {
        const message = JSON.stringify(task);
        await this.channel.sendToQueue(
            this.queueName,
            Buffer.from(message),
            { persistent: true }
        );
    }

    /* 消费任务 */
    async consume(handler: Function) {
        await this.channel.consume(this.queueName, async (msg) => {
            if (msg) {
                const task = JSON.parse(msg.content.toString());
                try {
                    await handler(task);
                    this.channel.ack(msg);
                } catch (e) {
                    // 处理失败,重新入队或发送到死信队列
                    this.channel.nack(msg, false, false);
                }
            }
        });
    }
}

任务监控

建议为每个任务配置监控告警,包括:任务执行时长、失败率、积压数量等关键指标。

定时任务管理

基于数据库的定时任务配置:

/* 定时任务调度器 */
class CronScheduler {
    constructor(db, scheduler) {
        this.db = db;
        this.scheduler = scheduler;
    }

    /* 加载所有定时任务 */
    async loadCronJobs() {
        const jobs = await this.db.query(
            "SELECT * FROM cron_jobs WHERE enabled = true"
        );

        for (const job of jobs) {
            this.registerJob(job);
        }
    }

    /* 注册任务 */
    registerJob(job: CronJob) {
        this.scheduler.addJob({
            id: job.id,
            cron: job.cron_expression,
            handler: async () => {
                await executeTask(job.task_handler, job.task_payload);
            },
            options: {
                scheduled: job.enabled,
                timezone: job.timezone || "Asia/Shanghai"
            }
        });
    }
}

/* 常用 Cron 表达式示例 */
// 每天凌晨 2 点执行
// 0 0 2 * * *

// 每周一凌晨 3 点执行
// 0 0 3 * * 1

// 每 5 分钟执行一次
// 0 */5 * * * *

任务执行监控

完整的任务执行追踪:

/* 任务执行记录 */
const AUDIT_FIELDS = [
    "task_id",        // 任务 ID
    "task_name",      // 任务名称
    "start_time",     // 开始时间
    "end_time",       // 结束时间
    "duration",       // 执行时长
    "status",         // 执行状态
    "result",         // 执行结果
    "error_message",  // 错误信息
    "retry_count",    // 重试次数
    "executor"        // 执行节点
];

/* 统计指标 */
const METRICS = {
    totalExecutions: "任务总执行次数",
    successRate: "任务成功率",
    avgDuration: "平均执行时长",
    failedCount: "失败任务数",
    queueSize: "队列积压数"
};

总结

任务调度系统设计要点:

  • 支持多种任务类型:定时、延迟、周期、依赖
  • 使用分布式锁保证任务幂等性
  • 消息队列实现异步任务处理
  • 合理的重试策略和超时控制
  • 完整的任务执行监控和告警