perf: 执行队列数量支持设置

This commit is contained in:
xiaojunnuo
2025-12-26 18:17:05 +08:00
parent 888d9591fe
commit cd944882c3
9 changed files with 125 additions and 60 deletions
@@ -1,54 +0,0 @@
import { logger } from "@certd/basic";
export type TaskItem = {
task: ()=>Promise<void>;
}
export class ExecutorQueue{
pendingQueue: TaskItem[] = [];
runningQueue: TaskItem[] = [];
maxRunningCount: number = 10;
setMaxRunningCount(count: number) {
this.maxRunningCount = count;
}
addTask(task: TaskItem) {
this.pendingQueue.push(task);
this.runTask();
}
runTask() {
logger.info(`当前运行队列:${this.runningQueue.length}, 等待队列:${this.pendingQueue.length}`);
if (this.runningQueue.length >= this.maxRunningCount) {
logger.info(`当前运行队列已满,等待队列:${this.pendingQueue.length}`);
return;
}
if (this.pendingQueue.length === 0) {
return;
}
const task = this.pendingQueue.shift();
if (!task) {
return;
}
// 执行任务
this.runningQueue.push(task);
const call = async ()=>{
try{
await task.task();
}finally{
// 任务执行完成,从运行队列中移除
const index = this.runningQueue.indexOf(task);
if (index > -1) {
this.runningQueue.splice(index, 1);
}
// 继续执行下一个任务
this.runTask();
}
}
call()
}
}
export const executorQueue = new ExecutorQueue();
@@ -47,7 +47,7 @@ import { CertInfoService } from "../../monitor/service/cert-info-service.js";
import { TaskServiceBuilder } from "./getter/task-service-getter.js";
import { nanoid } from "nanoid";
import { set } from "lodash-es";
import { executorQueue } from "./executor-queue.js";
import { executorQueue } from "@certd/lib-server";
const runningTasks: Map<string | number, Executor> = new Map();
@@ -373,7 +373,7 @@ export class PipelineService extends BaseService<PipelineEntity> {
return;
}
for (const trigger of pipeline.triggers) {
this.registerCron(pipeline.id, trigger);
this.registerCron(pipeline.id, pipeline.userId, trigger);
}
if (immediateTriggerOnce) {
@@ -461,7 +461,7 @@ export class PipelineService extends BaseService<PipelineEntity> {
logger.info("当前定时器数量:", this.cron.getTaskSize());
}
registerCron(pipelineId, trigger) {
registerCron(pipelineId: number, userId: number, trigger) {
if (pipelineId == null) {
logger.warn("pipelineId为空,无法注册定时任务");
return;
@@ -491,7 +491,7 @@ export class PipelineService extends BaseService<PipelineEntity> {
return;
}
//加入执行队列
executorQueue.addTask({
executorQueue.addTask(userId, {
task: async () => {
try {
await this.run(pipelineId, triggerId);
@@ -678,7 +678,7 @@ export class PipelineService extends BaseService<PipelineEntity> {
this.cron.remove(key);
triggerType = null;
} else {
logger.info(`timer trigger:${key}, ${found.title}, ${found.props}`);
logger.info(`timer trigger:${key}, ${found.title}, ${JSON.stringify(found.props)}`);
triggerType = "timer";
}
}