定时任务
方法 1:基于优先级队列的定时任务调度
使用优先级队列(例如小顶堆)来管理任务。每个任务按照其执行时间排序,队列的顶部始终是最早需要执行的任务。
实现步骤:
- 任务队列:使用一个优先级队列存储任务,按执行时间排序。
- 调度线程:使用一个独立的线程不断检查队列顶部的任务。
- 时间控制:利用
setTimeout
或setInterval
来周期性检查是否有任务需要执行。
示例代码:
class Task {
constructor(time, action) {
this.time = time; // 任务执行时间的时间戳
this.action = action; // 任务执行的动作
}
}
class TaskScheduler {
constructor() {
this.taskQueue = new MinHeap((a, b) => a.time - b.time);
this.running = false;
}
addTask(task) {
this.taskQueue.insert(task);
if (!this.running) {
this.scheduleNextTask();
}
}
scheduleNextTask() {
if (this.taskQueue.isEmpty()) {
this.running = false;
return;
}
this.running = true;
const now = Date.now();
const nextTask = this.taskQueue.peek();
if (nextTask.time <= now) {
this.taskQueue.pop().action();
this.scheduleNextTask();
} else {
const delay = nextTask.time - now;
setTimeout(() => {
this.taskQueue.pop().action();
this.scheduleNextTask();
}, delay);
}
}
}
class MinHeap {
constructor(compare) {
this.heap = [];
this.compare = compare;
}
insert(value) {
this.heap.push(value);
this.heapifyUp();
}
pop() {
if (this.heap.length === 1) return this.heap.pop();
const top = this.heap[0];
this.heap[0] = this.heap.pop();
this.heapifyDown();
return top;
}
peek() {
return this.heap[0];
}
isEmpty() {
return this.heap.length === 0;
}
heapifyUp() {
let index = this.heap.length - 1;
const element = this.heap[index];
while (index > 0) {
const parentIndex = Math.floor((index - 1) / 2);
const parent = this.heap[parentIndex];
if (this.compare(element, parent) >= 0) break;
this.heap[index] = parent;
index = parentIndex;
}
this.heap[index] = element;
}
heapifyDown() {
let index = 0;
const length = this.heap.length;
const element = this.heap[index];
while (true) {
const leftIndex = 2 * index + 1;
const rightIndex = 2 * index + 2;
let swapIndex = null;
if (leftIndex < length) {
const left = this.heap[leftIndex];
if (this.compare(left, element) < 0) {
swapIndex = leftIndex;
}
}
if (rightIndex < length) {
const right = this.heap[rightIndex];
if (
(swapIndex === null && this.compare(right, element) < 0) ||
(swapIndex !== null && this.compare(right, this.heap[swapIndex]) < 0)
) {
swapIndex = rightIndex;
}
}
if (swapIndex === null) break;
this.heap[index] = this.heap[swapIndex];
index = swapIndex;
}
this.heap[index] = element;
}
}
// 示例用法
const scheduler = new TaskScheduler();
scheduler.addTask(new Task(Date.now() + 5000, () => console.log('Task 1 executed')));
scheduler.addTask(new Task(Date.now() + 10000, () => console.log('Task 2 executed')));
scheduler.addTask(new Task(Date.now() + 3000, () => console.log('Task 3 executed')));
方法 2:基于时间轮的定时任务调度
使用时间轮算法来管理大量的定时任务。时间轮是一种类似于哈希表的调度结构,用于减少任务的调度复杂度。
实现步骤:
- 时间轮结构:设计一个时间轮,每个槽代表一个时间片。
- 槽管理:将任务分配到相应的时间槽中。
- 时间推进:使用定时器周期性地推进时间轮,检查并执行当前时间槽中的任务。
示例代码:
class TimeWheel {
constructor(slotCount, interval) {
this.slotCount = slotCount; // 槽的数量
this.interval = interval; // 时间片间隔
this.currentSlot = 0; // 当前槽
this.slots = new Array(slotCount).fill(null).map(() => []);
this.running = false;
}
addTask(delay, action) {
const targetSlot = (this.currentSlot + Math.floor(delay / this.interval)) % this.slotCount;
this.slots[targetSlot].push(action);
if (!this.running) {
this.start();
}
}
start() {
if (this.running) return;
this.running = true;
this.tick();
}
tick() {
setTimeout(() => {
this.executeCurrentSlot();
this.currentSlot = (this.currentSlot + 1) % this.slotCount;
if (this.running) {
this.tick();
}
}, this.interval);
}
executeCurrentSlot() {
const tasks = this.slots[this.currentSlot];
while (tasks.length > 0) {
const task = tasks.shift();
task();
}
}
stop() {
this.running = false;
}
}
// 示例用法
const timeWheel = new TimeWheel(60, 1000); // 60槽,每个槽代表1秒
timeWheel.addTask(5000, () => console.log('Task 1 executed'));
timeWheel.addTask(10000, () => console.log('Task 2 executed'));
timeWheel.addTask(3000, () => console.log('Task 3 executed'));
方法 3:使用现有任务调度库
利用现有的任务调度库(如 node-cron
、agenda
、bree
等)来管理和调度大量任务。这些库通常提供了高效的任务管理和调度机制,简化了开发工作。
示例代码(使用 node-cron
):
const cron = require('node-cron');
cron.schedule('*/5 * * * * *', () => {
console.log('Task 1 executed every 5 seconds');
});
cron.schedule('*/10 * * * * *', () => {
console.log('Task 2 executed every 10 seconds');
});
cron.schedule('*/15 * * * * *', () => {
console.log('Task 3 executed every 15 seconds');
});
方法 4:使用分布式任务调度系统
对于更大规模的任务调度,可以考虑使用分布式任务调度系统,如 Apache Kafka、Apache Flink、Celery 等。这些系统支持高并发、大规模的任务调度,并且可以处理复杂的任务依赖和调度策略。