feat: save files

This commit is contained in:
xiaojunnuo
2023-06-25 23:25:56 +08:00
parent 2851a33eb2
commit 671d273e2f
30 changed files with 253 additions and 222 deletions
+20 -8
View File
@@ -1,7 +1,7 @@
import { ConcurrencyStrategy, NotificationWhen, Pipeline, ResultType, Runnable, RunStrategy, Stage, Step, Task } from "../d.ts";
import _ from "lodash";
import { RunHistory, RunnableCollection } from "./run-history";
import { AbstractTaskPlugin, PluginDefine, pluginRegistry } from "../plugin";
import { AbstractTaskPlugin, PluginDefine, pluginRegistry, TaskInstanceContext } from "../plugin";
import { ContextFactory, IContext } from "./context";
import { IStorage } from "./storage";
import { logger } from "../utils/util.log";
@@ -11,6 +11,7 @@ import { IAccessService } from "../access";
import { RegistryItem } from "../registry";
import { Decorator } from "../decorator";
import { IEmailService } from "../service";
import { FileStore } from "./file-store";
export type ExecutorOptions = {
userId: any;
@@ -19,6 +20,7 @@ export type ExecutorOptions = {
onChanged: (history: RunHistory) => Promise<void>;
accessService: IAccessService;
emailService: IEmailService;
fileRootDir?: string;
};
export class Executor {
pipeline: Pipeline;
@@ -59,7 +61,7 @@ export class Executor {
});
await this.notification("success");
} catch (e) {
await this.notification("error");
await this.notification("error", e);
this.logger.error("pipeline 执行失败", e);
} finally {
await this.pipelineContext.setObj("lastRuntime", this.runtime);
@@ -185,28 +187,38 @@ export class Executor {
}
});
const context: any = {
const taskCtx: TaskInstanceContext = {
pipeline: this.pipeline,
step,
lastStatus,
http: request,
logger: this.runtime._loggers[step.id],
accessService: this.options.accessService,
emailService: this.options.emailService,
pipelineContext: this.pipelineContext,
lastStatus,
userContext: this.contextFactory.getContext("user", this.options.userId),
http: request,
fileStore: new FileStore({
scope: this.pipeline.id,
parent: this.runtime.id,
rootDir: this.options.fileRootDir,
}),
};
Decorator.inject(define.autowire, instance, context);
instance.setCtx(taskCtx);
await instance.onInstance();
await instance.execute();
if (instance.result.clearLastStatus) {
if (instance._result.clearLastStatus) {
this.lastStatusMap.clear();
}
//输出到output context
_.forEach(define.output, (item, key) => {
step!.status!.output[key] = instance[key];
step.status!.output[key] = instance[key];
const stepOutputKey = `step.${step.id}.${key}`;
this.runtime.context[stepOutputKey] = instance[key];
});
step.status!.files = instance.getFiles();
}
async notification(when: NotificationWhen, error?: any) {
@@ -0,0 +1,38 @@
import { fileUtils } from "../utils/util.file";
import dayjs from "dayjs";
import path from "path";
import fs from "fs";
export type FileStoreOptions = {
rootDir?: string;
scope: string;
parent: string;
};
export class FileStore {
rootDir: string;
scope: string;
parent: string;
constructor(options?: FileStoreOptions) {
this.rootDir = fileUtils.getFileRootDir(options?.rootDir);
this.scope = options?.scope || "0";
this.parent = options?.parent || "0";
}
readFile(filePath: string) {
if (!fs.existsSync(filePath)) {
return null;
}
return fs.readFileSync(filePath);
}
writeFile(filename: string, file: Buffer) {
const localPath = this.buildFilePath(filename);
fs.writeFileSync(localPath, file);
return localPath;
}
private buildFilePath(filename: string) {
return path.join(this.rootDir, this.scope, dayjs().format("YYYY-MM-DD"), this.parent, filename);
}
}
+2 -11
View File
@@ -1,7 +1,6 @@
import fs from "fs";
import path from "path";
import { fileUtils } from "../utils/util.file";
export interface IStorage {
get(scope: string, namespace: string, version: string, key: string): Promise<string | null>;
@@ -12,15 +11,7 @@ export interface IStorage {
export class FileStorage implements IStorage {
root: string;
constructor(rootDir?: string) {
if (rootDir == null) {
const userHome = process.env.HOME || process.env.USERPROFILE;
rootDir = userHome + "/.certd/storage/";
}
this.root = rootDir;
if (!fs.existsSync(this.root)) {
fs.mkdirSync(this.root, { recursive: true });
}
this.root = fileUtils.getFileRootDir(rootDir);
}
async remove(scope: string, namespace: string, version: string, key: string): Promise<void> {
@@ -55,6 +55,10 @@ export type Trigger = {
type: string;
};
export type FileItem = {
filename: string;
path: string;
};
export type Runnable = {
id: string;
title: string;
@@ -64,6 +68,7 @@ export type Runnable = {
default?: {
[key: string]: any;
};
files?: FileItem[];
};
export type EmailOptions = {
@@ -113,6 +118,7 @@ export type HistoryResultGroup = {
export type HistoryResult = {
input: any;
output: any;
files?: FileItem[];
/**
* 任务状态
*/
+51 -9
View File
@@ -1,5 +1,11 @@
import { Registrable } from "../registry";
import { FormItemProps } from "../d.ts";
import { FileItem, FormItemProps, Pipeline, Runnable, Step } from "../d.ts";
import { FileStore } from "../core/file-store";
import { Logger } from "log4js";
import { IAccessService } from "../access";
import { IEmailService } from "../service";
import { IContext } from "../core";
import { AxiosInstance } from "axios";
export enum ContextScope {
global,
@@ -7,16 +13,11 @@ export enum ContextScope {
runtime,
}
export type Storage = {
scope: ContextScope;
path: string;
};
export type TaskOutputDefine = {
title: string;
value?: any;
storage?: Storage;
};
export type TaskInputDefine = FormItemProps;
export type PluginDefine = Registrable & {
@@ -47,15 +48,56 @@ export type ITaskPlugin = {
export type TaskResult = {
clearLastStatus?: boolean;
files?: FileItem[];
};
export type TaskInstanceContext = {
pipeline: Pipeline;
step: Step;
logger: Logger;
accessService: IAccessService;
emailService: IEmailService;
pipelineContext: IContext;
userContext: IContext;
http: AxiosInstance;
fileStore: FileStore;
lastStatus?: Runnable;
};
export abstract class AbstractTaskPlugin implements ITaskPlugin {
result: TaskResult = {};
_result: TaskResult = { clearLastStatus: false, files: [] };
ctx!: TaskInstanceContext;
clearLastStatus() {
this.result.clearLastStatus = true;
this._result.clearLastStatus = true;
}
getFiles() {
return this._result.files;
}
setCtx(ctx: TaskInstanceContext) {
this.ctx = ctx;
}
saveFile(filename: string, file: Buffer) {
const filePath = this.ctx.fileStore.writeFile(filename, file);
this._result.files!.push({
filename,
path: filePath,
});
}
get pipeline() {
return this.ctx.pipeline;
}
get step() {
return this.ctx.step;
}
async onInstance(): Promise<void> {
return;
}
abstract execute(): Promise<void>;
}
@@ -61,3 +61,11 @@ export function TaskOutput(output?: TaskOutputDefine): PropertyDecorator {
Reflect.defineMetadata(PLUGIN_OUTPUT_KEY, output, target, propertyKey);
};
}
export const PLUGIN_DOWNLOAD_KEY = "pipeline:plugin:download";
export function TaskDownload(output?: TaskOutputDefine): PropertyDecorator {
return (target, propertyKey) => {
target = Decorator.target(target, propertyKey);
Reflect.defineMetadata(PLUGIN_DOWNLOAD_KEY, output, target, propertyKey);
};
}
@@ -0,0 +1,16 @@
import fs from "fs";
function getFileRootDir(rootDir?: string) {
if (rootDir == null) {
const userHome = process.env.HOME || process.env.USERPROFILE;
rootDir = userHome + "/.certd/storage/";
}
if (!fs.existsSync(rootDir)) {
fs.mkdirSync(rootDir, { recursive: true });
}
return rootDir;
}
export const fileUtils = {
getFileRootDir,
};