Duplicate async system tasks #202

Open
arorashivam opened this issue Jul 8, 2024 · 0 comments

Issue Description
We are seeing duplicate sub-workflows being triggered for a workflow.

On further investigation, we identified a race condition between the sweeper thread and the system task worker thread that leads to a duplicate SUB_WORKFLOW system task. The race condition is not limited to the SUB_WORKFLOW system task; it can be reproduced with any async system task.

Explanation:

  1. System task worker thread-1 polls and acknowledges the task from its corresponding queue and triggers AsyncSystemTaskExecutor::execute. The execute method has not completed yet, so the task status is still SCHEDULED.
  2. Meanwhile, the sweeper sweeps this workflow and finds the task in a repairable state. Because thread-1 has already polled and acknowledged it, the task is no longer in the queue, so the sweeper adds it back to the queue.
  3. Thread-1 then marks the sub_workflow task as IN_PROGRESS.
  4. System task worker thread-2 polls the same task (re-added by the sweeper) from the queue and executes it, starting a new sub-workflow.
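The interleaving in steps 1-4 can be replayed deterministically in a few lines. This is a minimal sketch, not Conductor code: `Task`, `System`, `sweep`, and the `worker_*` methods are hypothetical names, and the model assumes that each execution of a polled task starts a new sub-workflow, as step 4 describes. Replaying the same steps with the sweep placed after the status update shows that only the ordering differs between the duplicate and the non-duplicate case.

```python
from collections import deque


class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.status = "SCHEDULED"


class System:
    """Hypothetical toy model: one async system task, its queue, and the sub-workflows started for it."""

    def __init__(self, task):
        self.task = task
        self.queue = deque([task.task_id])  # the task id waits in its system-task queue
        self.sub_workflows = []             # one (worker, task_id) entry per sub-workflow started

    def worker_poll_and_ack(self):
        # Poll + ack removes the id from the queue while execute() is still running.
        return self.queue.popleft()

    def worker_execute(self, worker, task_id):
        # Assumption of this model: every execution of the task starts a new sub-workflow.
        self.sub_workflows.append((worker, task_id))

    def worker_update_task(self):
        # Stand-in for executionDAOFacade.updateTask(task): the status is persisted last.
        self.task.status = "IN_PROGRESS"

    def sweep(self):
        # Repair rule from step 2: a SCHEDULED task that is not in the queue is re-queued.
        if self.task.status == "SCHEDULED" and self.task.task_id not in self.queue:
            self.queue.append(self.task.task_id)


def replay(sweep_before_update=True):
    s = System(Task("task-1"))
    s.worker_execute("thread-1", s.worker_poll_and_ack())  # step 1: status is still SCHEDULED
    if sweep_before_update:
        s.sweep()               # step 2: the sweeper re-queues the task
        s.worker_update_task()  # step 3: thread-1 marks it IN_PROGRESS
    else:
        s.worker_update_task()  # status persisted before the sweep runs
        s.sweep()               # the sweep sees IN_PROGRESS and does nothing
    while s.queue:              # step 4: thread-2 drains whatever is still queued
        s.worker_execute("thread-2", s.worker_poll_and_ack())
    return s


if __name__ == "__main__":
    print(replay().sub_workflows)
    print(replay(sweep_before_update=False).sub_workflows)
```

With the issue's ordering, `replay().sub_workflows` is `[('thread-1', 'task-1'), ('thread-2', 'task-1')]`: two sub-workflows for one task id. When the status is persisted before the sweep, only `('thread-1', 'task-1')` is recorded.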

Details
Conductor version: 3.20.0
Persistence implementation: Postgres
Queue implementation: Dynoqueues
Lock: Redis
Workflow definition: This definition is for reference only. It shows that we use a SUB_WORKFLOW task, which is the task being duplicated.
```json
{
  "name": "parent_workflow",
  "description": "Parent Workflow",
  "version": 1,
  "schemaVersion": 2,
  "inputParameters": [],
  "tasks": [
    {
      "name": "loopTask",
      "taskReferenceName": "loopTask",
      "type": "DO_WHILE",
      "inputParameters": {
        "batch": "${workflow.input.batch}",
        "batchSize": "${workflow.input.batch.length()}"
      },
      "loopCondition": "$.loopTask['iteration'] < $.batchSize",
      "loopOver": [
        {
          "name": "loopTask_prepare",
          "taskReferenceName": "loopTask_prepare",
          "type": "INLINE",
          "inputParameters": {
            "evaluatorType": "javascript",
            "expression": "function sFun(){ if($.batch === null) {return null} else { return $.batch.get($.iteration-1) } } sFun();",
            "batch": "${loopTask.input.batch}",
            "iteration": "${loopTask.output.iteration}"
          },
          "asyncComplete": false
        },
        {
          "name": "sub_workflow_task",
          "taskReferenceName": "sub_workflow_task",
          "type": "SUB_WORKFLOW",
          "inputParameters": {
            "item": "${loopTask_prepare.output.result}",
            "index": "${loopTask.output.iteration}"
          },
          "asyncComplete": false,
          "isOptional": true,
          "subWorkflowParam": {
            "name": "test_sub_workflow",
            "version": 1
          }
        }
      ],
      "asyncComplete": false
    }
  ],
  "ownerEmail": "[email protected]",
  "outputParameters": {},
  "inputTemplate": {},
  "timeoutSeconds": 86400,
  "timeoutPolicy": "ALERT_ONLY",
  "restartable": true
}
```

Task definition: We do not set any task definitions.
Event handler definition: N/A in this case.

To Reproduce
Steps to reproduce the behavior: Race conditions are not straightforward to reproduce. However, the race can be mimicked as follows:

  1. Add a delay in the AsyncSystemTaskExecutor::execute method, before the call to executionDAOFacade.updateTask(task).
  2. Make the delay long enough for the sweeper to run its sweep logic for this workflow, find the sub_workflow task in the SCHEDULED state with its taskId absent from the queue, and put the task back in the queue.
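The effect of the injected delay can be sketched with real threads. This is a hypothetical Python model, not Conductor code: `Store`, `worker`, `sweeper`, `run`, and `TASK_ID` are illustrative names, the final status write stands in for executionDAOFacade.updateTask(task), and the model assumes that each execution of a polled task starts a sub-workflow. The sweeper runs a fixed interval (`sweep_after`) after the first poll, so any `delay` longer than that interval places the sweep inside the window between polling and persisting the status.

```python
import threading
import time
from collections import deque

TASK_ID = "task-1"  # hypothetical task id


class Store:
    """Hypothetical shared state: task status, its queue, and the sub-workflows started."""

    def __init__(self):
        self.lock = threading.Lock()
        self.status = "SCHEDULED"
        self.queue = deque([TASK_ID])
        self.sub_workflows = []
        self.polled = threading.Event()  # set once a worker has polled/acked the task

    def poll_and_ack(self):
        with self.lock:
            task_id = self.queue.popleft() if self.queue else None
        if task_id is not None:
            self.polled.set()
        return task_id


def worker(store, name, delay):
    if store.poll_and_ack() is None:
        return
    with store.lock:
        store.sub_workflows.append(name)  # this execution starts a sub-workflow
    time.sleep(delay)                     # injected delay before the updateTask(task) stand-in
    with store.lock:
        store.status = "IN_PROGRESS"


def sweeper(store, sweep_after):
    store.polled.wait()                   # sweep a fixed interval after the first poll
    time.sleep(sweep_after)
    with store.lock:
        # Repair: re-queue a SCHEDULED task that is missing from the queue.
        if store.status == "SCHEDULED" and TASK_ID not in store.queue:
            store.queue.append(TASK_ID)


def run(delay, sweep_after=0.1):
    store = Store()
    t1 = threading.Thread(target=worker, args=(store, "thread-1", delay))
    sw = threading.Thread(target=sweeper, args=(store, sweep_after))
    t1.start()
    sw.start()
    sw.join()
    worker(store, "thread-2", 0)          # a second worker polls after the sweep has run
    t1.join()
    return store.sub_workflows


if __name__ == "__main__":
    print(run(delay=0.5))  # delay longer than sweep_after: the sweep lands inside the window
    print(run(delay=0.0))  # no delay: the sweep finds IN_PROGRESS and re-queues nothing
```

With `delay=0.5`, thread-2 picks up the re-queued task and a second sub-workflow is recorded; with `delay=0.0`, the status is already IN_PROGRESS when the sweep runs, so thread-2 finds an empty queue. Because the model depends on timing, it illustrates the window rather than proving the bug.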

Expected behavior
Tasks should not be duplicated.
