任务调度是企业级应用的核心功能,用于处理定时任务、异步队列和批量处理。本文详细介绍 IMS 系统的任务调度架构设计。
任务类型分类
IMS 系统中的任务分为以下几类:- 定时任务:按固定时间间隔或 cron 表达式执行
- 延迟任务:指定延迟时间后执行
- 周期任务:按固定频率重复执行
- 依赖任务:前置任务完成后触发
/* 任务定义 */
interface Task {
id: string;
name: string;
type: 'cron' | 'delay' | 'interval' | 'dependent';
handler: string; // 处理器名称
payload: object; // 任务参数
schedule?: string; // Cron 表达式或延迟时间
dependsOn?: string[]; // 依赖任务 ID 列表
retryPolicy?: RetryPolicy; // 重试策略
timeout?: number; // 超时时间(毫秒)
}
/* 重试策略 */
interface RetryPolicy {
maxAttempts: number; // 最大重试次数
retryDelay: number; // 重试间隔(毫秒)
backoffMultiplier: number; // 退避倍数
maxRetryDelay: number; // 最大重试间隔
}
分布式任务调度
使用分布式调度确保任务高可用和负载均衡:
/* 任务调度器 */
class TaskScheduler {
constructor(redis, zk) {
this.redis = redis;
this.lockKey = "task:lock:";
this.queueKey = "task:queue";
}
/* 分布式锁获取 */
async acquireLock(taskId: string, ttl: number): boolean {
const lockValue = generateUUID();
const result = await this.redis.set(
this.lockKey + taskId,
lockValue,
"NX", // 只在不存在时设置
"PX",
ttl
);
return result === "OK";
}
/* 执行任务 */
async executeTask(task: Task) {
// 尝试获取分布式锁
const lock = await this.acquireLock(task.id, 300000);
if (!lock) {
console.log(`Task ${task.id} is being executed by another node`);
return;
}
try {
console.log(`Executing task: ${task.name}`);
await runTaskHandler(task);
} finally {
await this.redis.del(this.lockKey + task.id);
}
}
}
异步任务队列
使用消息队列实现异步任务处理:
/* 任务队列服务 */
class TaskQueueService {
constructor(rabbitmq) {
this.channel = rabbitmq.createChannel();
this.queueName = "ims_tasks";
}
/* 初始化队列 */
async init() {
await this.channel.assertQueue(this.queueName, {
durable: true,
arguments: {
"x-dead-letter-exchange": "ims_tasks_dlx"
}
});
}
/* 投递任务 */
async enqueue(task: Task) {
const message = JSON.stringify(task);
await this.channel.sendToQueue(
this.queueName,
Buffer.from(message),
{ persistent: true }
);
}
/* 消费任务 */
async consume(handler: Function) {
await this.channel.consume(this.queueName, async (msg) => {
if (msg) {
const task = JSON.parse(msg.content.toString());
try {
await handler(task);
this.channel.ack(msg);
} catch (e) {
// 处理失败,重新入队或发送到死信队列
this.channel.nack(msg, false, false);
}
}
});
}
}
任务监控
建议为每个任务配置监控告警,包括:任务执行时长、失败率、积压数量等关键指标。
定时任务管理
基于数据库的定时任务配置:
/* 定时任务调度器 */
class CronScheduler {
constructor(db, scheduler) {
this.db = db;
this.scheduler = scheduler;
}
/* 加载所有定时任务 */
async loadCronJobs() {
const jobs = await this.db.query(
"SELECT * FROM cron_jobs WHERE enabled = true"
);
for (const job of jobs) {
this.registerJob(job);
}
}
/* 注册任务 */
registerJob(job: CronJob) {
this.scheduler.addJob({
id: job.id,
cron: job.cron_expression,
handler: async () => {
await executeTask(job.task_handler, job.task_payload);
},
options: {
scheduled: job.enabled,
timezone: job.timezone || "Asia/Shanghai"
}
});
}
}
/* 常用 Cron 表达式示例 */
// 每天凌晨 2 点执行
// 0 0 2 * * *
// 每周一凌晨 3 点执行
// 0 0 3 * * 1
// 每 5 分钟执行一次
// 0 */5 * * * *
任务执行监控
完整的任务执行追踪:
/* 任务执行记录 */
const AUDIT_FIELDS = [
"task_id", // 任务 ID
"task_name", // 任务名称
"start_time", // 开始时间
"end_time", // 结束时间
"duration", // 执行时长
"status", // 执行状态
"result", // 执行结果
"error_message", // 错误信息
"retry_count", // 重试次数
"executor" // 执行节点
];
/* 统计指标 */
const METRICS = {
totalExecutions: "任务总执行次数",
successRate: "任务成功率",
avgDuration: "平均执行时长",
failedCount: "失败任务数",
queueSize: "队列积压数"
};
总结
任务调度系统设计要点:
- 支持多种任务类型:定时、延迟、周期、依赖
- 使用分布式锁保证任务幂等性
- 消息队列实现异步任务处理
- 合理的重试策略和超时控制
- 完整的任务执行监控和告警