From 3a0b14f08a20dc6ba44eb7d69ab0f6cad5687caf Mon Sep 17 00:00:00 2001 From: Dusan Miloradovic Date: Sun, 8 Sep 2024 18:18:22 +0400 Subject: [PATCH 1/2] Delay interactive node --- .../Workflow/Services/QueueWorkflow.ts | 4 +- .../Workflow/Services/RunWorkflow.ts | 71 ++++++++++- Common/Models/DatabaseModels/Workflow.ts | 17 +++ .../1725811636668-MigrationName.ts | 16 +++ .../Postgres/SchemaMigrations/Index.ts | 2 + Common/Server/Infrastructure/Queue.ts | 6 +- Common/Server/Types/Workflow/ComponentCode.ts | 14 +++ .../Server/Types/Workflow/Components/Index.ts | 2 + .../Workflow/Components/Interactive/Delay.ts | 112 ++++++++++++++++++ Common/Types/Workflow/ComponentID.ts | 1 + Common/Types/Workflow/Components.ts | 8 ++ .../Types/Workflow/Components/Interactive.ts | 45 +++++++ Common/Types/Workflow/WorkflowStatus.ts | 1 + .../UI/Components/Workflow/WorkflowStatus.tsx | 4 + 14 files changed, 299 insertions(+), 4 deletions(-) create mode 100644 Common/Server/Infrastructure/Postgres/SchemaMigrations/1725811636668-MigrationName.ts create mode 100644 Common/Server/Types/Workflow/Components/Interactive/Delay.ts create mode 100644 Common/Types/Workflow/Components/Interactive.ts diff --git a/App/FeatureSet/Workflow/Services/QueueWorkflow.ts b/App/FeatureSet/Workflow/Services/QueueWorkflow.ts index 27565ec1cd9..e6f354007c3 100644 --- a/App/FeatureSet/Workflow/Services/QueueWorkflow.ts +++ b/App/FeatureSet/Workflow/Services/QueueWorkflow.ts @@ -56,6 +56,7 @@ export default class QueueWorkflow { public static async addWorkflowToQueue( executeWorkflow: ExecuteWorkflowType, scheduleAt?: string, + delay?: number, ): Promise { const workflowId: ObjectID = executeWorkflow.workflowId; @@ -153,7 +154,7 @@ export default class QueueWorkflow { // Add Workflow Run Log. let workflowLog: WorkflowLog | null = null; - if (!scheduleAt) { + if (!scheduleAt && !delay) { // if the workflow is to be run immediately. const runLog: WorkflowLog = new WorkflowLog(); runLog.workflowId = workflowId; @@ -186,6 +187,7 @@ export default class QueueWorkflow { }, { scheduleAt: scheduleAt, + delay: delay, repeatableKey: workflow.repeatableJobKey || undefined, }, ); diff --git a/App/FeatureSet/Workflow/Services/RunWorkflow.ts b/App/FeatureSet/Workflow/Services/RunWorkflow.ts index 6a2106b68c7..17cf57b933f 100644 --- a/App/FeatureSet/Workflow/Services/RunWorkflow.ts +++ b/App/FeatureSet/Workflow/Services/RunWorkflow.ts @@ -20,6 +20,7 @@ import WorkflowService from "Common/Server/Services/WorkflowService"; import WorkflowVariableService from "Common/Server/Services/WorkflowVariableService"; import QueryHelper from "Common/Server/Types/Database/QueryHelper"; import ComponentCode, { + Interactive, RunReturnType, } from "Common/Server/Types/Workflow/ComponentCode"; import Components from "Common/Server/Types/Workflow/Components/Index"; @@ -29,6 +30,8 @@ import VMAPI from "Common/Server/Utils/VM/VMAPI"; import Workflow from "Common/Models/DatabaseModels/Workflow"; import WorkflowLog from "Common/Models/DatabaseModels/WorkflowLog"; import WorkflowVariable from "Common/Models/DatabaseModels/WorkflowVariable"; +import { ExecuteWorkflowType } from "Common/Server/Types/Workflow/TriggerCode"; +import QueueWorkflow from "./QueueWorkflow"; const AllComponents: Dictionary = loadAllComponentMetadata(); @@ -83,6 +86,7 @@ export default class RunWorkflow { select: { graph: true, projectId: true, + interactiveData: true, }, props: { isRoot: true, @@ -97,6 +101,10 @@ export default class RunWorkflow { throw new BadDataException("Workflow graph not found"); } + const interactive: Interactive | undefined = + workflow.interactiveData && + (workflow.interactiveData as unknown as Interactive); + this.projectId = workflow.projectId || null; if (!runProps.workflowLogId) { @@ -146,7 +154,9 @@ export default class RunWorkflow { variables = getVariableResult.variables; // start execute different components. - let executeComponentId: string = runStack.startWithComponentId; + let executeComponentId: string = + interactive?.componentId ?? runStack.startWithComponentId; + // if there is an interactive node set for this workflow, start from that node, all the previous nodes are already executed const fifoStackOfComponentsPendingExecution: Array = [ executeComponentId, @@ -230,6 +240,64 @@ export default class RunWorkflow { returnValues: result.returnValues, }; + const interactive: Interactive | undefined = result.interactive; // TODO the actual data conversion, dates are converted into strings when writing to the database + if (interactive?.waiting) { + this.log( + "Interactive component " + + executeComponentId + + " is in the waiting state, delaying the workflow execution", + ); + + let delay: number | undefined = + new Date(interactive.nextStateCheck).getTime() - + new Date().getTime(); + if (delay < 0) { + delay = undefined; + } + + this.log(result.interactive as any); + this.log(result.interactive?.lastTimeChecked); + this.log(new Date(result.interactive!.lastTimeChecked!).toString()); + this.log( + "Adding the workflow to the queue with the delay " + delay + " ms", + ); + + // update workflow log. + await WorkflowLogService.updateOneById({ + id: runProps.workflowLogId, + data: { + workflowStatus: WorkflowStatus.Waiting, + logs: this.logs.join("\n"), + completedAt: OneUptimeDate.getCurrentDate(), + }, + props: { + isRoot: true, + }, + }); + + await WorkflowService.updateOneById({ + data: { + interactiveData: interactive as any, // TS-2589 + }, + id: runProps.workflowId, + props: { + isRoot: true, + }, + }); + const executeWorkflow: ExecuteWorkflowType = { + workflowId: workflow.id!, + returnValues: {}, + }; + + await QueueWorkflow.addWorkflowToQueue( + executeWorkflow, + undefined, + delay, + ); + this.cleanLogs(variables); + + return; + } const portToBeExecuted: Port | undefined = result.executePort; if (!portToBeExecuted) { @@ -397,6 +465,7 @@ export default class RunWorkflow { workflowId: this.workflowId!, workflowLogId: this.workflowLogId!, projectId: this.projectId!, + nodeId: node.id, onError: (exception: Exception) => { this.log(exception); onError(); diff --git a/Common/Models/DatabaseModels/Workflow.ts b/Common/Models/DatabaseModels/Workflow.ts index 574ba32c882..e5f6debc9e7 100644 --- a/Common/Models/DatabaseModels/Workflow.ts +++ b/Common/Models/DatabaseModels/Workflow.ts @@ -521,4 +521,21 @@ export default class Workflow extends BaseModel { nullable: true, }) public repeatableJobKey?: string = undefined; + + // Interactive node, all kind of nodes that stop the workflow and schedule it again at some future time + @ColumnAccessControl({ + create: [], + read: [], + update: [], + }) + @TableColumn({ + isDefaultValueColumn: false, + required: false, + type: TableColumnType.JSON, + }) + @Column({ + type: ColumnType.JSON, + nullable: true, + }) + public interactiveData?: JSONObject = undefined; } diff --git a/Common/Server/Infrastructure/Postgres/SchemaMigrations/1725811636668-MigrationName.ts b/Common/Server/Infrastructure/Postgres/SchemaMigrations/1725811636668-MigrationName.ts new file mode 100644 index 00000000000..5facf8bc73e --- /dev/null +++ b/Common/Server/Infrastructure/Postgres/SchemaMigrations/1725811636668-MigrationName.ts @@ -0,0 +1,16 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class MigrationName1725811636668 implements MigrationInterface { + public name = "MigrationName1725811636668"; + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE "Workflow" DROP COLUMN "interactiveData"`, + ); + } + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE "Workflow" ADD "interactiveData" jsonb`, + ); + } +} diff --git a/Common/Server/Infrastructure/Postgres/SchemaMigrations/Index.ts b/Common/Server/Infrastructure/Postgres/SchemaMigrations/Index.ts index 69b23b1b5fb..f2e1ee524b6 100644 --- a/Common/Server/Infrastructure/Postgres/SchemaMigrations/Index.ts +++ b/Common/Server/Infrastructure/Postgres/SchemaMigrations/Index.ts @@ -51,6 +51,7 @@ import { MigrationName1725379949648 } from "./1725379949648-MigrationName"; import { MigrationName1725551629492 } from "./1725551629492-MigrationName"; import { MigrationName1725556630384 } from "./1725556630384-MigrationName"; import { MigrationName1725618842598 } from "./1725618842598-MigrationName"; +import { MigrationName1725811636668 } from "./1725811636668-MigrationName"; export default [ InitialMigration, @@ -106,4 +107,5 @@ export default [ MigrationName1725551629492, MigrationName1725556630384, MigrationName1725618842598, + MigrationName1725811636668, ]; diff --git a/Common/Server/Infrastructure/Queue.ts b/Common/Server/Infrastructure/Queue.ts index 4a68d36d78a..3a6d5b95f07 100644 --- a/Common/Server/Infrastructure/Queue.ts +++ b/Common/Server/Infrastructure/Queue.ts @@ -94,13 +94,15 @@ export default class Queue { options?: { scheduleAt?: string | undefined; repeatableKey?: string | undefined; + delay?: number | undefined; // either delay or scheduleAt }, ): Promise { const optionsObject: JobsOptions = { jobId: jobId.toString(), }; - - if (options && options.scheduleAt) { + if (options?.delay) { + optionsObject.delay = options.delay; + } else if (options?.scheduleAt) { optionsObject.repeat = { pattern: options.scheduleAt, }; diff --git a/Common/Server/Types/Workflow/ComponentCode.ts b/Common/Server/Types/Workflow/ComponentCode.ts index 33e642ccff9..3026c3bd029 100644 --- a/Common/Server/Types/Workflow/ComponentCode.ts +++ b/Common/Server/Types/Workflow/ComponentCode.ts @@ -11,12 +11,26 @@ export interface RunOptions { workflowLogId: ObjectID; workflowId: ObjectID; projectId: ObjectID; + nodeId: string; onError: (exception: Exception) => Exception; } +export interface Interactive { + // If the component returns Interactive property + // and waiting is true + // the workflow stops the execution, saves the current state + // and schedules the new workflow, starting from the id of the node + waiting: boolean; + componentId: string; // where is waiting in the workflow + startedWaiting: Date; + lastTimeChecked?: Date; + nextStateCheck: Date; // component is responsible to define when the next run date happens +} + export interface RunReturnType { returnValues: JSONObject; executePort?: Port | undefined; + interactive?: Interactive | undefined; } export default class ComponentCode { diff --git a/Common/Server/Types/Workflow/Components/Index.ts b/Common/Server/Types/Workflow/Components/Index.ts index 0e6a8eb05c3..4f6d05b65a3 100644 --- a/Common/Server/Types/Workflow/Components/Index.ts +++ b/Common/Server/Types/Workflow/Components/Index.ts @@ -33,6 +33,7 @@ import Dictionary from "Common/Types/Dictionary"; import Text from "Common/Types/Text"; import ComponentID from "Common/Types/Workflow/ComponentID"; import ApiPatch from "./API/Patch"; +import Delay from "./Interactive/Delay"; const Components: Dictionary = { [ComponentID.Webhook]: new WebhookTrigger(), @@ -53,6 +54,7 @@ const Components: Dictionary = { [ComponentID.ApiPut]: new ApiPut(), [ComponentID.SendEmail]: new Email(), [ComponentID.IfElse]: new IfElse(), + [ComponentID.Delay]: new Delay(), }; for (const baseModelService of Services) { diff --git a/Common/Server/Types/Workflow/Components/Interactive/Delay.ts b/Common/Server/Types/Workflow/Components/Interactive/Delay.ts new file mode 100644 index 00000000000..e9bb26f238d --- /dev/null +++ b/Common/Server/Types/Workflow/Components/Interactive/Delay.ts @@ -0,0 +1,112 @@ +import ComponentCode, { + Interactive, + RunOptions, + RunReturnType, +} from "../../ComponentCode"; +import ComponentMetadata, { + Port, +} from "../../../../../Types/Workflow/Component"; +import InteractiveComponents from "../../../../../Types/Workflow/Components/Interactive"; +import ComponentID from "../../../../../Types/Workflow/ComponentID"; +import BadDataException from "../../../../../Types/Exception/BadDataException"; +import { JSONObject } from "../../../../../Types/JSON"; +import Workflow from "../../../../../Models/DatabaseModels/Workflow"; +import ObjectID from "../../../../../Types/ObjectID"; +import WorkflowService from "../../../../Services/WorkflowService"; + +export default class Delay extends ComponentCode { + public constructor() { + super(); + + const DelayComponent: ComponentMetadata | undefined = + InteractiveComponents.find((i: ComponentMetadata) => { + return i.id === ComponentID.Delay; + }); + + if (!DelayComponent) { + throw new BadDataException("Component not found."); + } + + this.setMetadata(DelayComponent); + } + + public override async run( + args: JSONObject, + options: RunOptions, + ): Promise { + const outPort: Port | undefined = this.getMetadata().outPorts.find( + (p: Port) => { + return p.id === "out"; + }, + ); + + if (!outPort) { + throw options.onError(new BadDataException("Out port not found")); + } + + const _delay: string = args["delay"] as string; + const delay: number = parseInt(_delay, 10); + const workflowId: ObjectID = options.workflowId; + const workflow: Workflow | null = await WorkflowService.findOneById({ + id: workflowId, + select: { + graph: true, + projectId: true, + interactiveData: true, + }, + props: { + isRoot: true, + }, + }); + if (!workflow) { + throw options.onError(new BadDataException("Workflow not found")); + } + if (workflow.interactiveData) { + options.log( + "Interactive data present, workflow already in the waiting state", + ); + options.log(workflow.interactiveData); + // already is delayed, check is it the same node first + const interactive: Interactive = + workflow.interactiveData as unknown as Interactive; // TODO this is not 100% correct, the Date attributes are still strings, find how to convert + if (interactive.componentId !== options.nodeId) { + throw options.onError( + new BadDataException( + "Waiting in different interactive component, aborting", + ), + ); + } + options.log(JSON.stringify(interactive)); + if ( + new Date(interactive.startedWaiting).getTime() + delay < + new Date().getTime() + ) { + options.log("Delay expired, continuing to the next node"); + return Promise.resolve({ + returnValues: {}, + executePort: outPort, + }); + } + options.log("Delay not yet expired, continuing with the wait"); + interactive.lastTimeChecked = new Date(); + return Promise.resolve({ + returnValues: {}, + executePort: outPort, + interactive, + }); + } + const interactive: Interactive = { + componentId: options.nodeId, + lastTimeChecked: new Date(), + startedWaiting: new Date(), + nextStateCheck: new Date(new Date().getTime() + delay), + waiting: true, + }; + options.log("Initated the waiting node"); + return Promise.resolve({ + returnValues: {}, + executePort: outPort, + interactive, + }); + } +} diff --git a/Common/Types/Workflow/ComponentID.ts b/Common/Types/Workflow/ComponentID.ts index 5540f221e5e..69d7aca0194 100644 --- a/Common/Types/Workflow/ComponentID.ts +++ b/Common/Types/Workflow/ComponentID.ts @@ -16,6 +16,7 @@ enum ComponentID { ApiPatch = "api-patch", SendEmail = "send-email", IfElse = "if-else", + Delay = "delay", } export default ComponentID; diff --git a/Common/Types/Workflow/Components.ts b/Common/Types/Workflow/Components.ts index ada54fb0861..53de8f676cf 100644 --- a/Common/Types/Workflow/Components.ts +++ b/Common/Types/Workflow/Components.ts @@ -12,6 +12,7 @@ import ScheduleComponents from "./Components/Schedule"; import SlackComponents from "./Components/Slack"; import WebhookComponents from "./Components/Webhook"; import WorkflowComponents from "./Components/Workflow"; +import InteractiveComponents from "./Components/Interactive"; const components: Array = [ ...LogComponents, @@ -26,6 +27,7 @@ const components: Array = [ ...WorkflowComponents, ...ManualComponents, ...MicrosoftTeamsComponents, + ...InteractiveComponents, ]; export default components; @@ -81,4 +83,10 @@ export const Categories: Array = [ description: "Utils that make workflow design simpler.", icon: IconProp.Window, }, + { + name: "Interactive", + description: + "Pause a workflow until external event occurs - delay expiry, manual or webhook interaction", + icon: IconProp.Stop, + }, ]; diff --git a/Common/Types/Workflow/Components/Interactive.ts b/Common/Types/Workflow/Components/Interactive.ts new file mode 100644 index 00000000000..37da46b4a64 --- /dev/null +++ b/Common/Types/Workflow/Components/Interactive.ts @@ -0,0 +1,45 @@ +import ComponentMetadata, { + ComponentInputType, + ComponentType, +} from "../Component"; +import ComponentID from "../ComponentID"; +import IconProp from "../../Icon/IconProp"; + +const components: Array = [ + { + id: ComponentID.Delay, + title: "Delay", + category: "Interactive", + description: "Wait for fixed amount of time", + iconProp: IconProp.Calendar, + componentType: ComponentType.Component, + arguments: [ + { + type: ComponentInputType.AnyValue, + name: "Milliseconds", + description: "Milliseconds to wait", + required: true, + id: "delay", + }, + ], + returnValues: [], + inPorts: [ + { + title: "In", + description: + "Please connect components to this port for this component to work.", + id: "in", + }, + ], + outPorts: [ + { + title: "Out", + description: + "Connect to this port if you want other components to execute after the delay expired", + id: "out", + }, + ], + }, +]; + +export default components; diff --git a/Common/Types/Workflow/WorkflowStatus.ts b/Common/Types/Workflow/WorkflowStatus.ts index 8a8d105895b..cab85bd6164 100644 --- a/Common/Types/Workflow/WorkflowStatus.ts +++ b/Common/Types/Workflow/WorkflowStatus.ts @@ -4,6 +4,7 @@ enum WorkflowStatus { Success = "Success", Error = "Error", Timeout = "Timeout", + Waiting = "Waiting", WorkflowCountExceeded = "Workflow Count Exceeded", } diff --git a/Common/UI/Components/Workflow/WorkflowStatus.tsx b/Common/UI/Components/Workflow/WorkflowStatus.tsx index a3f655cdca2..40402b4efbd 100644 --- a/Common/UI/Components/Workflow/WorkflowStatus.tsx +++ b/Common/UI/Components/Workflow/WorkflowStatus.tsx @@ -27,6 +27,10 @@ const WorkflowStatusElement: FunctionComponent = ( return ; } + if (props.status === WorkflowStatus.Waiting) { + return ; + } + if (props.status === WorkflowStatus.WorkflowCountExceeded) { return ; } From e15338df6ef4dbcca388803958f3e79c3cabc878 Mon Sep 17 00:00:00 2001 From: Dusan Miloradovic Date: Tue, 10 Sep 2024 09:34:54 +0400 Subject: [PATCH 2/2] bug fix - the interactive data stays written in the database even when the delay was finished --- .../Workflow/Services/RunWorkflow.ts | 24 +++++++++++++++---- .../Workflow/Components/Interactive/Delay.ts | 22 ++++++++--------- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/App/FeatureSet/Workflow/Services/RunWorkflow.ts b/App/FeatureSet/Workflow/Services/RunWorkflow.ts index 17cf57b933f..fca03312968 100644 --- a/App/FeatureSet/Workflow/Services/RunWorkflow.ts +++ b/App/FeatureSet/Workflow/Services/RunWorkflow.ts @@ -240,8 +240,8 @@ export default class RunWorkflow { returnValues: result.returnValues, }; - const interactive: Interactive | undefined = result.interactive; // TODO the actual data conversion, dates are converted into strings when writing to the database - if (interactive?.waiting) { + const interactiveRes: Interactive | undefined = result.interactive; + if (interactiveRes?.waiting) { this.log( "Interactive component " + executeComponentId + @@ -249,7 +249,7 @@ export default class RunWorkflow { ); let delay: number | undefined = - new Date(interactive.nextStateCheck).getTime() - + new Date(interactiveRes.nextStateCheck).getTime() - new Date().getTime(); if (delay < 0) { delay = undefined; @@ -277,7 +277,7 @@ export default class RunWorkflow { await WorkflowService.updateOneById({ data: { - interactiveData: interactive as any, // TS-2589 + interactiveData: interactiveRes as any, // TS-2589 }, id: runProps.workflowId, props: { @@ -298,6 +298,22 @@ export default class RunWorkflow { return; } + + if (interactive?.waiting && interactiveRes && !interactiveRes.waiting) { + // The value from the database still has the interactive value in it, need to update it + this.log( + "Updating the workflow database record, removing the interactive data", + ); + await WorkflowService.updateOneById({ + data: { + interactiveData: interactiveRes as any, //TS-2589 + }, + id: runProps.workflowId, + props: { + isRoot: true, + }, + }); + } const portToBeExecuted: Port | undefined = result.executePort; if (!portToBeExecuted) { diff --git a/Common/Server/Types/Workflow/Components/Interactive/Delay.ts b/Common/Server/Types/Workflow/Components/Interactive/Delay.ts index e9bb26f238d..6c33beae045 100644 --- a/Common/Server/Types/Workflow/Components/Interactive/Delay.ts +++ b/Common/Server/Types/Workflow/Components/Interactive/Delay.ts @@ -61,14 +61,17 @@ export default class Delay extends ComponentCode { if (!workflow) { throw options.onError(new BadDataException("Workflow not found")); } - if (workflow.interactiveData) { + let interactive: Interactive | undefined = workflow.interactiveData + ? (workflow.interactiveData as unknown as Interactive) + : undefined; // TODO this is not 100% correct, the Date attributes are still strings, find how to convert + if (interactive?.waiting) { options.log( "Interactive data present, workflow already in the waiting state", ); options.log(workflow.interactiveData); - // already is delayed, check is it the same node first + // already delayed, check is it the same node first const interactive: Interactive = - workflow.interactiveData as unknown as Interactive; // TODO this is not 100% correct, the Date attributes are still strings, find how to convert + workflow.interactiveData as unknown as Interactive; if (interactive.componentId !== options.nodeId) { throw options.onError( new BadDataException( @@ -76,18 +79,15 @@ export default class Delay extends ComponentCode { ), ); } - options.log(JSON.stringify(interactive)); if ( new Date(interactive.startedWaiting).getTime() + delay < new Date().getTime() ) { + interactive.waiting = false; options.log("Delay expired, continuing to the next node"); - return Promise.resolve({ - returnValues: {}, - executePort: outPort, - }); + } else { + options.log("Delay not yet expired, continuing with the wait"); } - options.log("Delay not yet expired, continuing with the wait"); interactive.lastTimeChecked = new Date(); return Promise.resolve({ returnValues: {}, @@ -95,14 +95,14 @@ export default class Delay extends ComponentCode { interactive, }); } - const interactive: Interactive = { + interactive = { componentId: options.nodeId, lastTimeChecked: new Date(), startedWaiting: new Date(), nextStateCheck: new Date(new Date().getTime() + delay), waiting: true, }; - options.log("Initated the waiting node"); + options.log("Initiated the waiting node"); return Promise.resolve({ returnValues: {}, executePort: outPort,