Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"Wait/Pause" component for workflows #1686

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion App/FeatureSet/Workflow/Services/QueueWorkflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export default class QueueWorkflow {
public static async addWorkflowToQueue(
executeWorkflow: ExecuteWorkflowType,
scheduleAt?: string,
delay?: number,
): Promise<void> {
const workflowId: ObjectID = executeWorkflow.workflowId;

Expand Down Expand Up @@ -153,7 +154,7 @@ export default class QueueWorkflow {

// Add Workflow Run Log.
let workflowLog: WorkflowLog | null = null;
if (!scheduleAt) {
if (!scheduleAt && !delay) {
// if the workflow is to be run immediately.
const runLog: WorkflowLog = new WorkflowLog();
runLog.workflowId = workflowId;
Expand Down Expand Up @@ -186,6 +187,7 @@ export default class QueueWorkflow {
},
{
scheduleAt: scheduleAt,
delay: delay,
repeatableKey: workflow.repeatableJobKey || undefined,
},
);
Expand Down
87 changes: 86 additions & 1 deletion App/FeatureSet/Workflow/Services/RunWorkflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import WorkflowService from "Common/Server/Services/WorkflowService";
import WorkflowVariableService from "Common/Server/Services/WorkflowVariableService";
import QueryHelper from "Common/Server/Types/Database/QueryHelper";
import ComponentCode, {
Interactive,
RunReturnType,
} from "Common/Server/Types/Workflow/ComponentCode";
import Components from "Common/Server/Types/Workflow/Components/Index";
Expand All @@ -29,6 +30,8 @@ import VMAPI from "Common/Server/Utils/VM/VMAPI";
import Workflow from "Common/Models/DatabaseModels/Workflow";
import WorkflowLog from "Common/Models/DatabaseModels/WorkflowLog";
import WorkflowVariable from "Common/Models/DatabaseModels/WorkflowVariable";
import { ExecuteWorkflowType } from "Common/Server/Types/Workflow/TriggerCode";
import QueueWorkflow from "./QueueWorkflow";

const AllComponents: Dictionary<ComponentMetadata> = loadAllComponentMetadata();

Expand Down Expand Up @@ -83,6 +86,7 @@ export default class RunWorkflow {
select: {
graph: true,
projectId: true,
interactiveData: true,
},
props: {
isRoot: true,
Expand All @@ -97,6 +101,10 @@ export default class RunWorkflow {
throw new BadDataException("Workflow graph not found");
}

const interactive: Interactive | undefined =
workflow.interactiveData &&
(workflow.interactiveData as unknown as Interactive);

this.projectId = workflow.projectId || null;

if (!runProps.workflowLogId) {
Expand Down Expand Up @@ -146,7 +154,9 @@ export default class RunWorkflow {
variables = getVariableResult.variables;

// start execute different components.
let executeComponentId: string = runStack.startWithComponentId;
let executeComponentId: string =
interactive?.componentId ?? runStack.startWithComponentId;
// if there is an interactive node set for this workflow, start from that node, all the previous nodes are already executed

const fifoStackOfComponentsPendingExecution: Array<string> = [
executeComponentId,
Expand Down Expand Up @@ -230,6 +240,80 @@ export default class RunWorkflow {
returnValues: result.returnValues,
};

const interactiveRes: Interactive | undefined = result.interactive;
if (interactiveRes?.waiting) {
this.log(
"Interactive component " +
executeComponentId +
" is in the waiting state, delaying the workflow execution",
);

let delay: number | undefined =
new Date(interactiveRes.nextStateCheck).getTime() -
new Date().getTime();
if (delay < 0) {
delay = undefined;
}

this.log(result.interactive as any);
this.log(result.interactive?.lastTimeChecked);
this.log(new Date(result.interactive!.lastTimeChecked!).toString());
this.log(
"Adding the workflow to the queue with the delay " + delay + " ms",
);

// update workflow log.
await WorkflowLogService.updateOneById({
id: runProps.workflowLogId,
data: {
workflowStatus: WorkflowStatus.Waiting,
logs: this.logs.join("\n"),
completedAt: OneUptimeDate.getCurrentDate(),
},
props: {
isRoot: true,
},
});

await WorkflowService.updateOneById({
data: {
interactiveData: interactiveRes as any, // TS-2589
},
id: runProps.workflowId,
props: {
isRoot: true,
},
});
const executeWorkflow: ExecuteWorkflowType = {
workflowId: workflow.id!,
returnValues: {},
};

await QueueWorkflow.addWorkflowToQueue(
executeWorkflow,
undefined,
delay,
);
this.cleanLogs(variables);

return;
}

if (interactive?.waiting && interactiveRes && !interactiveRes.waiting) {
// The value from the database still has the interactive value in it, need to update it
this.log(
"Updating the workflow database record, removing the interactive data",
);
await WorkflowService.updateOneById({
data: {
interactiveData: interactiveRes as any, //TS-2589
},
id: runProps.workflowId,
props: {
isRoot: true,
},
});
}
const portToBeExecuted: Port | undefined = result.executePort;

if (!portToBeExecuted) {
Expand Down Expand Up @@ -397,6 +481,7 @@ export default class RunWorkflow {
workflowId: this.workflowId!,
workflowLogId: this.workflowLogId!,
projectId: this.projectId!,
nodeId: node.id,
onError: (exception: Exception) => {
this.log(exception);
onError();
Expand Down
17 changes: 17 additions & 0 deletions Common/Models/DatabaseModels/Workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -521,4 +521,21 @@ export default class Workflow extends BaseModel {
nullable: true,
})
public repeatableJobKey?: string = undefined;

// Interactive node, all kind of nodes that stop the workflow and schedule it again at some future time
@ColumnAccessControl({
create: [],
read: [],
update: [],
})
@TableColumn({
isDefaultValueColumn: false,
required: false,
type: TableColumnType.JSON,
})
@Column({
type: ColumnType.JSON,
nullable: true,
})
public interactiveData?: JSONObject = undefined;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class MigrationName1725811636668 implements MigrationInterface {
public name = "MigrationName1725811636668";
public async down(queryRunner: QueryRunner): Promise<any> {
await queryRunner.query(
`ALTER TABLE "Workflow" DROP COLUMN "interactiveData"`,
);
}

public async up(queryRunner: QueryRunner): Promise<any> {
await queryRunner.query(
`ALTER TABLE "Workflow" ADD "interactiveData" jsonb`,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import { MigrationName1725379949648 } from "./1725379949648-MigrationName";
import { MigrationName1725551629492 } from "./1725551629492-MigrationName";
import { MigrationName1725556630384 } from "./1725556630384-MigrationName";
import { MigrationName1725618842598 } from "./1725618842598-MigrationName";
import { MigrationName1725811636668 } from "./1725811636668-MigrationName";

export default [
InitialMigration,
Expand Down Expand Up @@ -106,4 +107,5 @@ export default [
MigrationName1725551629492,
MigrationName1725556630384,
MigrationName1725618842598,
MigrationName1725811636668,
];
6 changes: 4 additions & 2 deletions Common/Server/Infrastructure/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,15 @@ export default class Queue {
options?: {
scheduleAt?: string | undefined;
repeatableKey?: string | undefined;
delay?: number | undefined; // either delay or scheduleAt
},
): Promise<Job> {
const optionsObject: JobsOptions = {
jobId: jobId.toString(),
};

if (options && options.scheduleAt) {
if (options?.delay) {
optionsObject.delay = options.delay;
} else if (options?.scheduleAt) {
optionsObject.repeat = {
pattern: options.scheduleAt,
};
Expand Down
14 changes: 14 additions & 0 deletions Common/Server/Types/Workflow/ComponentCode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,26 @@ export interface RunOptions {
workflowLogId: ObjectID;
workflowId: ObjectID;
projectId: ObjectID;
nodeId: string;
onError: (exception: Exception) => Exception;
}

export interface Interactive {
// If the component returns Interactive property
// and waiting is true
// the workflow stops the execution, saves the current state
// and schedules the new workflow, starting from the id of the node
waiting: boolean;
componentId: string; // where is waiting in the workflow
startedWaiting: Date;
lastTimeChecked?: Date;
nextStateCheck: Date; // component is responsible to define when the next run date happens
}

export interface RunReturnType {
returnValues: JSONObject;
executePort?: Port | undefined;
interactive?: Interactive | undefined;
}

export default class ComponentCode {
Expand Down
2 changes: 2 additions & 0 deletions Common/Server/Types/Workflow/Components/Index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import Dictionary from "Common/Types/Dictionary";
import Text from "Common/Types/Text";
import ComponentID from "Common/Types/Workflow/ComponentID";
import ApiPatch from "./API/Patch";
import Delay from "./Interactive/Delay";

const Components: Dictionary<ComponentCode> = {
[ComponentID.Webhook]: new WebhookTrigger(),
Expand All @@ -53,6 +54,7 @@ const Components: Dictionary<ComponentCode> = {
[ComponentID.ApiPut]: new ApiPut(),
[ComponentID.SendEmail]: new Email(),
[ComponentID.IfElse]: new IfElse(),
[ComponentID.Delay]: new Delay(),
};

for (const baseModelService of Services) {
Expand Down
112 changes: 112 additions & 0 deletions Common/Server/Types/Workflow/Components/Interactive/Delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import ComponentCode, {
Interactive,
RunOptions,
RunReturnType,
} from "../../ComponentCode";
import ComponentMetadata, {
Port,
} from "../../../../../Types/Workflow/Component";
import InteractiveComponents from "../../../../../Types/Workflow/Components/Interactive";
import ComponentID from "../../../../../Types/Workflow/ComponentID";
import BadDataException from "../../../../../Types/Exception/BadDataException";
import { JSONObject } from "../../../../../Types/JSON";
import Workflow from "../../../../../Models/DatabaseModels/Workflow";
import ObjectID from "../../../../../Types/ObjectID";
import WorkflowService from "../../../../Services/WorkflowService";

export default class Delay extends ComponentCode {
public constructor() {
super();

const DelayComponent: ComponentMetadata | undefined =
InteractiveComponents.find((i: ComponentMetadata) => {
return i.id === ComponentID.Delay;
});

if (!DelayComponent) {
throw new BadDataException("Component not found.");
}

this.setMetadata(DelayComponent);
}

public override async run(
args: JSONObject,
options: RunOptions,
): Promise<RunReturnType> {
const outPort: Port | undefined = this.getMetadata().outPorts.find(
(p: Port) => {
return p.id === "out";
},
);

if (!outPort) {
throw options.onError(new BadDataException("Out port not found"));
}

const _delay: string = args["delay"] as string;
const delay: number = parseInt(_delay, 10);
const workflowId: ObjectID = options.workflowId;
const workflow: Workflow | null = await WorkflowService.findOneById({
id: workflowId,
select: {
graph: true,
projectId: true,
interactiveData: true,
},
props: {
isRoot: true,
},
});
if (!workflow) {
throw options.onError(new BadDataException("Workflow not found"));
}
let interactive: Interactive | undefined = workflow.interactiveData
? (workflow.interactiveData as unknown as Interactive)
: undefined; // TODO this is not 100% correct, the Date attributes are still strings, find how to convert
if (interactive?.waiting) {
options.log(
"Interactive data present, workflow already in the waiting state",
);
options.log(workflow.interactiveData);
// already delayed, check is it the same node first
const interactive: Interactive =
workflow.interactiveData as unknown as Interactive;
if (interactive.componentId !== options.nodeId) {
throw options.onError(
new BadDataException(
"Waiting in different interactive component, aborting",
),
);
}
if (
new Date(interactive.startedWaiting).getTime() + delay <
new Date().getTime()
) {
interactive.waiting = false;
options.log("Delay expired, continuing to the next node");
} else {
options.log("Delay not yet expired, continuing with the wait");
}
interactive.lastTimeChecked = new Date();
return Promise.resolve({
returnValues: {},
executePort: outPort,
interactive,
});
}
interactive = {
componentId: options.nodeId,
lastTimeChecked: new Date(),
startedWaiting: new Date(),
nextStateCheck: new Date(new Date().getTime() + delay),
waiting: true,
};
options.log("Initiated the waiting node");
return Promise.resolve({
returnValues: {},
executePort: outPort,
interactive,
});
}
}
1 change: 1 addition & 0 deletions Common/Types/Workflow/ComponentID.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ enum ComponentID {
ApiPatch = "api-patch",
SendEmail = "send-email",
IfElse = "if-else",
Delay = "delay",
}

export default ComponentID;
Loading