perf: 支持webhook触发流水线,新增触发类型图标显示

This commit is contained in:
xiaojunnuo
2026-01-09 01:10:43 +08:00
parent 2b353094eb
commit 1a29541140
17 changed files with 347 additions and 97 deletions
@@ -33,6 +33,12 @@ export class PipelineEntity {
@Column({ comment: '类型', nullable: true, default: 'cert' })
type: string;
@Column({ name: 'webhook_key', comment: 'webhookkey', length: 100, nullable: true })
webhookKey: string;
@Column({ name: 'trigger_count', comment: '触发器数量', nullable: true, default: 0 })
triggerCount: number;
// custom: 自定义; monitor: 监控;
@Column({ comment: '来源', nullable: true, default: '' })
from: string;
@@ -39,7 +39,7 @@ import { PluginConfigGetter } from "../../plugin/service/plugin-config-getter.js
import dayjs from "dayjs";
import { DbAdapter } from "../../db/index.js";
import { isComm, isPlus } from "@certd/plus-core";
import { logger } from "@certd/basic";
import { logger, utils } from "@certd/basic";
import { UrlService } from "./url-service.js";
import { NotificationService } from "./notification-service.js";
import { UserSuiteEntity, UserSuiteService } from "@certd/commercial-core";
@@ -177,7 +177,7 @@ export class PipelineService extends BaseService<PipelineEntity> {
if (!info.disabled) {
const pipeline = JSON.parse(info.content);
this.registerTriggers(pipeline, false);
}else {
} else {
this.unregisterTriggers(info);
}
}
@@ -214,6 +214,10 @@ export class PipelineService extends BaseService<PipelineEntity> {
//修改
old = await this.info(bean.id);
}
if (!old || !old.webhookKey ) {
bean.webhookKey = await this.genWebhookKey();
}
const isUpdate = bean.id > 0 && old != null;
@@ -273,6 +277,9 @@ export class PipelineService extends BaseService<PipelineEntity> {
pipeline.version = 0;
}
pipeline.version++;
bean.triggerCount = pipeline.triggers?.filter((trigger) => trigger.type === "timer").length || 0;
bean.content = JSON.stringify(pipeline);
await this.addOrUpdate(bean);
await this.registerTrigger(bean);
@@ -816,8 +823,8 @@ export class PipelineService extends BaseService<PipelineEntity> {
if (!isPlus()) {
throw new NeedVIPException("此功能需要升级专业版");
}
const query :any = {}
if(userId && userId>0){
const query: any = {}
if (userId && userId > 0) {
query.userId = userId;
}
await this.repository.update(
@@ -834,8 +841,8 @@ export class PipelineService extends BaseService<PipelineEntity> {
if (!isPlus()) {
throw new NeedVIPException("此功能需要升级专业版");
}
const query :any = {}
if(userId && userId>0){
const query: any = {}
if (userId && userId > 0) {
query.userId = userId;
}
const list = await this.find({
@@ -851,20 +858,20 @@ export class PipelineService extends BaseService<PipelineEntity> {
//清除trigger
pipeline.triggers = []
} else {
if(trigger.random === true){
//随机时间
const start = dayjs().format("YYYY-MM-DD") + " " + trigger.randomRange[0];
let end = dayjs().format("YYYY-MM-DD") + " " + trigger.randomRange[1];
if(trigger.randomRange[1]<trigger.randomRange[0]){
//跨天
end = dayjs().add(1, "day").format("YYYY-MM-DD") + " " + trigger.randomRange[1];
}
const startTime = dayjs(start).valueOf();
const endTime = dayjs(end).valueOf();
const randomTime = Math.floor(Math.random() * (endTime - startTime)) + startTime;
const time = dayjs(randomTime).format(" ss:mm:HH").replaceAll(":", " ").replaceAll(" 0", " ").trim();
set(trigger,"props.cron", `${time} * * *`)
}
if (trigger.random === true) {
//随机时间
const start = dayjs().format("YYYY-MM-DD") + " " + trigger.randomRange[0];
let end = dayjs().format("YYYY-MM-DD") + " " + trigger.randomRange[1];
if (trigger.randomRange[1] < trigger.randomRange[0]) {
//跨天
end = dayjs().add(1, "day").format("YYYY-MM-DD") + " " + trigger.randomRange[1];
}
const startTime = dayjs(start).valueOf();
const endTime = dayjs(end).valueOf();
const randomTime = Math.floor(Math.random() * (endTime - startTime)) + startTime;
const time = dayjs(randomTime).format(" ss:mm:HH").replaceAll(":", " ").replaceAll(" 0", " ").trim();
set(trigger, "props.cron", `${time} * * *`)
}
delete trigger.random
delete trigger.randomRange;
pipeline.triggers = [{
@@ -883,8 +890,8 @@ export class PipelineService extends BaseService<PipelineEntity> {
if (!isPlus()) {
throw new NeedVIPException("此功能需要升级专业版");
}
const query :any = {}
if(userId && userId>0){
const query: any = {}
if (userId && userId > 0) {
query.userId = userId;
}
const list = await this.find({
@@ -935,19 +942,19 @@ export class PipelineService extends BaseService<PipelineEntity> {
ids = list.map(item => item.id);
//异步执行
this.startBatchRerun(userId,ids, force);
this.startBatchRerun(userId, ids, force);
}
startBatchRerun(userId:number, ids: number[], force: boolean) {
startBatchRerun(userId: number, ids: number[], force: boolean) {
for (const id of ids) {
executorQueue.addTask(userId,{
task: async () => {
if (force) {
await this.run(id, null, "ALL");
} else {
await this.run(id, null);
}
executorQueue.addTask(userId, {
task: async () => {
if (force) {
await this.run(id, null, "ALL");
} else {
await this.run(id, null);
}
}
});
}
}
@@ -1107,4 +1114,33 @@ export class PipelineService extends BaseService<PipelineEntity> {
await this.repository.update(id, { disabled });
await this.registerTriggerById(id);
}
async refreshWebhookKey(id: number) {
const webhookKey = await this.genWebhookKey();
await this.repository.update(id, { webhookKey });
return webhookKey;
}
async genWebhookKey() {
return utils.id.simpleNanoId(24);
}
async triggerByWebhook(webhookKey: string) {
const pipelineEntity = await this.findOne({
select: {
id: true,
content: true,
},
where: {
webhookKey
}
})
if (!pipelineEntity) {
throw new Error("webhookKey不存在");
}
const pipeline = JSON.parse(pipelineEntity.content);
const trigger = pipeline.triggers.find((trigger: any) => trigger.type === "webhook");
if (!trigger) {
throw new Error("该流水线的webhook未启用");
}
await this.run(pipelineEntity.id, trigger.id);
}
}