perf: 优化流水线执行时的状态保存性能

This commit is contained in:
xiaojunnuo
2026-04-28 00:33:59 +08:00
parent 00e6d580c2
commit e00830bebc
3 changed files with 51 additions and 6 deletions
+7 -3
View File
@@ -23,6 +23,7 @@ export type ExecutorOptions = {
pipeline: Pipeline;
storage: IStorage;
onChanged: (history: RunHistory) => Promise<void>;
onFinished: (history: RunHistory) => Promise<void>;
accessService: IAccessService;
emailService: IEmailService;
notificationService: INotificationService;
@@ -47,16 +48,19 @@ export class Executor {
lastRuntime!: RunHistory;
options: ExecutorOptions;
abort: AbortController = new AbortController();
_inited = false;
onChanged: (history: RunHistory) => Promise<void>;
onFinished: (history: RunHistory) => Promise<void>;
constructor(options: ExecutorOptions) {
this.options = options;
this.pipeline = cloneDeep(options.pipeline);
this.onChanged = async (history: RunHistory) => {
await options.onChanged(history);
};
this.onFinished = async (history: RunHistory) => {
await options.onFinished(history);
};
this.pipeline.userId = options.user.id;
this.contextFactory = new ContextFactory(options.storage);
this.logger = logger;
@@ -77,7 +81,7 @@ export class Executor {
async cancel() {
this.abort.abort();
this.runtime?.cancel(this.pipeline);
await this.onChanged(this.runtime);
await this.onFinished(this.runtime);
}
async run(runtimeId: any = 0, triggerType: string) {
@@ -111,7 +115,7 @@ export class Executor {
this.logger.error("pipeline 执行失败", e);
} finally {
clearInterval(intervalFlushLogId);
await this.onChanged(this.runtime);
await this.onFinished(this.runtime);
//保存之前移除logs
const lastRuntime: any = {
...this.runtime,
@@ -80,7 +80,7 @@ const development = {
type: 'better-sqlite3',
database: './data/db.sqlite',
synchronize: false, // 如果第一次使用,不存在表,有同步的需求可以写 true
logging: true,
logging: false,
highlightSql: false,
// 配置实体模型 或者 entities: '/entity',
@@ -674,8 +674,8 @@ export class PipelineService extends BaseService<PipelineEntity> {
return;
}
}
const onChanged = async (history: RunHistory) => {
const doSaveHistory = async (history: RunHistory) => {
//保存执行历史
try {
logger.info("保存执行历史:", history.id);
@@ -690,6 +690,46 @@ export class PipelineService extends BaseService<PipelineEntity> {
throw e;
}
};
class HistorySaver {
latest: RunHistory = null;
interval: any = null;
started: boolean = false;
async save(){
const latest = this.latest;
this.latest = null;
if (latest == null) {
return;
}
await doSaveHistory(latest);
}
async start(){
this.started = true
await this.save();
this.interval = setInterval(()=>{
this.save();
}, 1000 * 5);
}
async push(history: RunHistory){
this.latest = history;
if(!this.started){
await this.start();
}
}
async done(){
clearInterval(this.interval);
await this.save();
}
}
const historySaver = new HistorySaver();
const onChanged = async (history: RunHistory)=>{
await historySaver.push(history);
}
const onFinished = async (history: RunHistory)=>{
await onChanged(history);
await historySaver.done();
}
const userId = entity.userId;
const projectId = entity.projectId;
@@ -723,6 +763,7 @@ export class PipelineService extends BaseService<PipelineEntity> {
user,
pipeline,
onChanged,
onFinished,
accessService: accessGetter,
cnameProxyService,
pluginConfigService: this.pluginConfigGetter,