perf: 优化定时器

This commit is contained in:
xiaojunnuo
2023-05-24 15:41:35 +08:00
parent 6f6606d76d
commit 3751fcd4c9
29 changed files with 381 additions and 163 deletions
@@ -52,6 +52,7 @@ export class PipelineController extends CrudController<PipelineService> {
await this.service.checkUserId(bean.id, this.ctx.user.id);
}
await this.service.save(bean);
await this.service.registerTriggerById(bean.id);
return this.ok(bean.id);
}
@@ -14,6 +14,9 @@ export class StorageEntity {
@Column({ name: 'namespace', comment: '命名空间' })
namespace: string;
@Column({ comment: 'version', length: 100, nullable: true })
version: string;
@Column({ comment: 'key', length: 100, nullable: true })
key: string;
@@ -12,15 +12,26 @@ export class DbStorage implements IStorage {
this.storageService = storageService;
}
remove(
scope: string,
namespace: string,
version: string,
key: string
): Promise<void> {
throw new Error('Method not implemented.');
}
async get(
scope: string,
namespace: string,
version: string,
key: string
): Promise<string | null> {
const storageEntity = await this.storageService.get({
userId: this.userId,
scope: scope,
namespace: namespace,
version,
key,
});
@@ -33,6 +44,7 @@ export class DbStorage implements IStorage {
async set(
scope: string,
namespace: string,
version: string,
key: string,
value: string
): Promise<void> {
@@ -40,6 +52,7 @@ export class DbStorage implements IStorage {
userId: this.userId,
scope: scope,
namespace: namespace,
version,
key,
value,
});
@@ -42,11 +42,9 @@ export class PipelineService extends BaseService<PipelineEntity> {
async update(entity) {
await super.update(entity);
await this.registerTriggerById(entity.id);
}
private async registerTriggerById(pipelineId) {
public async registerTriggerById(pipelineId) {
if (pipelineId == null) {
return;
}
@@ -70,13 +68,13 @@ export class PipelineService extends BaseService<PipelineEntity> {
const pipeline = JSON.parse(bean.content);
bean.title = pipeline.title;
await this.addOrUpdate(bean);
await this.registerTriggerById(bean.id);
}
/**
* 应用启动后初始加载记录
*/
async onStartup() {
logger.info('加载定时trigger开始');
const idEntityList = await this.repository.find({
select: {
id: true,
@@ -111,7 +109,7 @@ export class PipelineService extends BaseService<PipelineEntity> {
this.registerTriggers(pipeline);
}
}
logger.info('定时器数量:', this.cron.getList());
logger.info('定时器数量:', this.cron.getListSize());
}
registerTriggers(pipeline?: Pipeline) {
@@ -121,6 +119,7 @@ export class PipelineService extends BaseService<PipelineEntity> {
for (const trigger of pipeline.triggers) {
this.registerCron(pipeline.id, trigger);
}
logger.info('当前定时器数量:', this.cron.getListSize());
}
async trigger(id) {
@@ -128,10 +127,11 @@ export class PipelineService extends BaseService<PipelineEntity> {
name: `pipeline.${id}.trigger.once`,
cron: null,
job: async () => {
logger.info('job准备启动,当前定时器数量:', this.cron.getListSize());
await this.run(id, null);
},
});
logger.info('定时器数量:', this.cron.getList());
logger.info('定时器数量:', this.cron.getListSize());
}
registerCron(pipelineId, trigger) {
@@ -141,10 +141,11 @@ export class PipelineService extends BaseService<PipelineEntity> {
}
if (cron.startsWith('*')) {
cron = '0' + cron.substring(1, cron.length);
return;
}
const name = this.buildCronKey(pipelineId, trigger.id);
this.cron.remove(name);
this.cron.register({
name: this.buildCronKey(pipelineId, trigger.id),
name,
cron: cron,
job: async () => {
logger.info('定时任务触发:', pipelineId, trigger.id);
@@ -191,8 +192,13 @@ export class PipelineService extends BaseService<PipelineEntity> {
accessService: this.accessService,
storage: new DbStorage(userId, this.storageService),
});
await executor.run(historyId, triggerType);
try {
await executor.init();
await executor.run(historyId, triggerType);
} catch (e) {
logger.error('执行失败:', e);
throw e;
}
}
private getTriggerType(triggerId, pipeline) {
@@ -230,7 +236,7 @@ export class PipelineService extends BaseService<PipelineEntity> {
//修改pipeline状态
const pipelineEntity = new PipelineEntity();
pipelineEntity.id = parseInt(history.pipeline.id);
pipelineEntity.status = history.pipeline.status.status;
pipelineEntity.status = history.pipeline.status.status + '';
pipelineEntity.lastHistoryTime = history.pipeline.status.startTime;
await this.update(pipelineEntity);
@@ -1,4 +1,4 @@
import { Provide, Scope, ScopeEnum } from "@midwayjs/decorator";
import { Provide, Scope, ScopeEnum } from '@midwayjs/decorator';
import { InjectEntityModel } from '@midwayjs/typeorm';
import { Repository } from 'typeorm';
import { BaseService } from '../../../basic/base-service';
@@ -20,6 +20,7 @@ export class StorageService extends BaseService<StorageEntity> {
scope: any;
namespace: any;
userId: number;
version: string;
key: string;
}) {
if (where.userId == null) {
@@ -35,6 +36,7 @@ export class StorageService extends BaseService<StorageEntity> {
scope: any;
namespace: any;
userId: number;
version: string;
value: string;
key: string;
}) {