From 8add7eda56d284f5cb773ab1bfaf3d4aabc35816 Mon Sep 17 00:00:00 2001 From: Bert Verstraete Date: Thu, 4 Apr 2024 21:33:59 +0200 Subject: [PATCH 1/3] fix join completing with errors when a task has failed or some tasks are not terminal yet --- .../conductor/core/execution/tasks/Join.java | 53 +++++++++-------- .../core/execution/tasks/TestJoin.java | 59 +++++++++++++++++++ 2 files changed, 86 insertions(+), 26 deletions(-) create mode 100644 core/src/test/java/com/netflix/conductor/core/execution/tasks/TestJoin.java diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java index 4114e39ab..0f414a28c 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java @@ -13,6 +13,7 @@ package com.netflix.conductor.core.execution.tasks; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; @@ -37,8 +38,6 @@ public Join() { public boolean execute( WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) { - boolean allDone = true; - boolean hasFailures = false; StringBuilder failureReason = new StringBuilder(); StringBuilder optionalTaskFailures = new StringBuilder(); List joinOn = (List) task.getInputData().get("joinOn"); @@ -47,41 +46,44 @@ public boolean execute( joinOn = joinOn.stream() .map(name -> TaskUtils.appendIteration(name, task.getIteration())) - .collect(Collectors.toList()); + .toList(); } + + boolean allTasksTerminal = + joinOn.stream() + .map(workflow::getTaskByRefName) + .allMatch(t -> t != null && t.getStatus().isTerminal()); + for (String joinOnRef : joinOn) { TaskModel forkedTask = workflow.getTaskByRefName(joinOnRef); if (forkedTask == null) { // Task is not even scheduled yet - allDone = false; - break; + continue; } + TaskModel.Status taskStatus = forkedTask.getStatus(); - 
hasFailures = + + // Only add to task output if it's not empty + if (!forkedTask.getOutputData().isEmpty()) { + task.addOutput(joinOnRef, forkedTask.getOutputData()); + } + + var isJoinFailure = !taskStatus.isSuccessful() && !forkedTask.getWorkflowTask().isOptional() - && (!forkedTask.getWorkflowTask().isPermissive() - || joinOn.stream() - .map(workflow::getTaskByRefName) - .allMatch(t -> t.getStatus().isTerminal())); - if (hasFailures) { + && (!forkedTask.getWorkflowTask().isPermissive() || allTasksTerminal); + if (isJoinFailure) { final String failureReasons = joinOn.stream() .map(workflow::getTaskByRefName) + .filter(Objects::nonNull) .filter(t -> !t.getStatus().isSuccessful()) .map(TaskModel::getReasonForIncompletion) .collect(Collectors.joining(" ")); failureReason.append(failureReasons); - } - // Only add to task output if it's not empty - if (!forkedTask.getOutputData().isEmpty()) { - task.addOutput(joinOnRef, forkedTask.getOutputData()); - } - if (!taskStatus.isTerminal()) { - allDone = false; - } - if (hasFailures) { - break; + task.setReasonForIncompletion(failureReason.toString()); + task.setStatus(TaskModel.Status.FAILED); + return true; } // check for optional task failures @@ -95,11 +97,9 @@ public boolean execute( .append(" "); } } - if (allDone || hasFailures || optionalTaskFailures.length() > 0) { - if (hasFailures) { - task.setReasonForIncompletion(failureReason.toString()); - task.setStatus(TaskModel.Status.FAILED); - } else if (optionalTaskFailures.length() > 0) { + + if (allTasksTerminal) { + if (!optionalTaskFailures.isEmpty()) { task.setStatus(TaskModel.Status.COMPLETED_WITH_ERRORS); optionalTaskFailures.append("completed with errors"); task.setReasonForIncompletion(optionalTaskFailures.toString()); @@ -108,6 +108,7 @@ public boolean execute( } return true; } + return false; } diff --git a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestJoin.java b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestJoin.java 
new file mode 100644 index 000000000..1e2f67f94 --- /dev/null +++ b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestJoin.java @@ -0,0 +1,59 @@ +/* + * Copyright 2024 Conductor Authors. + *

<p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0 + * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.tasks; + +import java.util.ArrayList; + +import org.junit.Test; + +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; +import com.netflix.conductor.core.execution.WorkflowExecutor; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.model.WorkflowModel; + +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; + +public class TestJoin { + private final WorkflowExecutor executor = mock(WorkflowExecutor.class); + + @Test + public void test_should_not_mark_join_as_completed_with_errors_when_not_done() { + WorkflowModel workflow = new WorkflowModel(); + var task1 = new TaskModel(); + task1.setStatus(TaskModel.Status.COMPLETED_WITH_ERRORS); + task1.setReferenceTaskName("task1"); + var task1WorkflowTask = new WorkflowTask(); + task1WorkflowTask.setOptional(true); + task1.setWorkflowTask(task1WorkflowTask); + + var joinTask = new TaskModel(); + joinTask.setReferenceTaskName("join"); + joinTask.getInputData() + .put( + "joinOn", + new ArrayList() { + { + add("task1"); + add("task2"); + } + }); + + workflow.getTasks().add(task1); + workflow.getTasks().add(joinTask); + + var join = new Join(); + var result = join.execute(workflow, joinTask, executor); + assertFalse(result); + } +} From 00ec92cd641e6bdd152754ea00a7827c53d436e8 Mon Sep 17 00:00:00 2001 From: Bert Verstraete Date: Fri, 5 Apr 2024 12:10:52 +0200 Subject: [PATCH 2/3] add more tests --- .../core/execution/tasks/TestJoin.java | 185 +++++++++++++++--- 1 file changed, 159 insertions(+), 26 deletions(-) diff --git 
a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestJoin.java b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestJoin.java index 1e2f67f94..66082edd0 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestJoin.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestJoin.java @@ -12,8 +12,10 @@ */ package com.netflix.conductor.core.execution.tasks; -import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; import org.junit.Test; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; @@ -21,39 +23,170 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.*; import static org.mockito.Mockito.mock; public class TestJoin { private final WorkflowExecutor executor = mock(WorkflowExecutor.class); - @Test - public void test_should_not_mark_join_as_completed_with_errors_when_not_done() { + private TaskModel createTask( + String referenceName, + TaskModel.Status status, + boolean isOptional, + boolean isPermissive) { + TaskModel task = new TaskModel(); + task.setStatus(status); + task.setReferenceTaskName(referenceName); + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setOptional(isOptional); + workflowTask.setPermissive(isPermissive); + task.setWorkflowTask(workflowTask); + return task; + } + + private Pair createJoinWorkflow( + List tasks, String... 
extraTaskRefNames) { WorkflowModel workflow = new WorkflowModel(); - var task1 = new TaskModel(); - task1.setStatus(TaskModel.Status.COMPLETED_WITH_ERRORS); - task1.setReferenceTaskName("task1"); - var task1WorkflowTask = new WorkflowTask(); - task1WorkflowTask.setOptional(true); - task1.setWorkflowTask(task1WorkflowTask); - - var joinTask = new TaskModel(); - joinTask.setReferenceTaskName("join"); - joinTask.getInputData() - .put( - "joinOn", - new ArrayList() { - { - add("task1"); - add("task2"); - } - }); - - workflow.getTasks().add(task1); - workflow.getTasks().add(joinTask); + var join = new TaskModel(); + join.setReferenceTaskName("join"); + var taskRefNames = + tasks.stream().map(TaskModel::getReferenceTaskName).collect(Collectors.toList()); + taskRefNames.addAll(List.of(extraTaskRefNames)); + join.getInputData().put("joinOn", taskRefNames); + workflow.getTasks().addAll(tasks); + workflow.getTasks().add(join); + return Pair.of(workflow, join); + } + + @Test + public void testShouldNotMarkJoinAsCompletedWithErrorsWhenNotDone() { + var task1 = createTask("task1", TaskModel.Status.COMPLETED_WITH_ERRORS, true, false); + + // task2 is not scheduled yet, so the join is not completed + var wfJoinPair = createJoinWorkflow(List.of(task1), "task2"); var join = new Join(); - var result = join.execute(workflow, joinTask, executor); + var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); assertFalse(result); } + + @Test + public void testJoinCompletesSuccessfullyWhenAllTasksSucceed() { + var task1 = createTask("task1", TaskModel.Status.COMPLETED, false, false); + var task2 = createTask("task2", TaskModel.Status.COMPLETED, false, false); + + var wfJoinPair = createJoinWorkflow(List.of(task1, task2)); + + var join = new Join(); + var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); + assertTrue("Join task should execute successfully when all tasks succeed", result); + assertEquals( + "Join task status should be 
COMPLETED when all tasks succeed", + TaskModel.Status.COMPLETED, + wfJoinPair.getRight().getStatus()); + } + + @Test + public void testJoinWaitsWhenAnyTaskIsNotTerminal() { + var task1 = createTask("task1", TaskModel.Status.IN_PROGRESS, false, false); + var task2 = createTask("task2", TaskModel.Status.COMPLETED, false, false); + + var wfJoinPair = createJoinWorkflow(List.of(task1, task2)); + + var join = new Join(); + var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); + assertFalse("Join task should wait when any task is not in terminal state", result); + } + + @Test + public void testJoinFailsWhenMandatoryTaskFails() { + // Mandatory task fails + var task1 = createTask("task1", TaskModel.Status.FAILED, false, false); + // Optional task completes with errors + var task2 = createTask("task2", TaskModel.Status.COMPLETED_WITH_ERRORS, true, false); + + var wfJoinPair = createJoinWorkflow(List.of(task1, task2)); + + var join = new Join(); + var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); + assertTrue("Join task should be executed when a mandatory task fails", result); + assertEquals( + "Join task status should be FAILED when a mandatory task fails", + TaskModel.Status.FAILED, + wfJoinPair.getRight().getStatus()); + } + + @Test + public void testJoinCompletesWithErrorsWhenOnlyOptionalTasksFail() { + // Mandatory task succeeds + var task1 = createTask("task1", TaskModel.Status.COMPLETED, false, false); + // Optional task completes with errors + var task2 = createTask("task2", TaskModel.Status.COMPLETED_WITH_ERRORS, true, false); + + var wfJoinPair = createJoinWorkflow(List.of(task1, task2)); + + var join = new Join(); + var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); + assertTrue("Join task should be executed when only optional tasks fail", result); + assertEquals( + "Join task status should be COMPLETED_WITH_ERRORS when only optional tasks fail", + 
TaskModel.Status.COMPLETED_WITH_ERRORS, + wfJoinPair.getRight().getStatus()); + } + + @Test + public void testJoinAggregatesFailureReasonsCorrectly() { + var task1 = createTask("task1", TaskModel.Status.FAILED, false, false); + task1.setReasonForIncompletion("Task1 failed"); + var task2 = createTask("task2", TaskModel.Status.FAILED, false, false); + task2.setReasonForIncompletion("Task2 failed"); + + var wfJoinPair = createJoinWorkflow(List.of(task1, task2)); + + var join = new Join(); + var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); + assertTrue("Join task should be executed when tasks fail", result); + assertEquals( + "Join task status should be FAILED when tasks fail", + TaskModel.Status.FAILED, + wfJoinPair.getRight().getStatus()); + assertTrue( + "Join task reason for incompletion should aggregate failure reasons", + wfJoinPair.getRight().getReasonForIncompletion().contains("Task1 failed") + && wfJoinPair + .getRight() + .getReasonForIncompletion() + .contains("Task2 failed")); + } + + @Test + public void testJoinWaitsForAllTasksBeforeFailingDueToPermissiveTaskFailure() { + // Task 1 is a permissive task that fails. + var task1 = createTask("task1", TaskModel.Status.FAILED, false, true); + // Task 2 is a non-permissive task that eventually succeeds. + var task2 = + createTask( + "task2", + TaskModel.Status.IN_PROGRESS, + false, + false); // Initially not in a terminal state. + + var wfJoinPair = createJoinWorkflow(List.of(task1, task2)); + + // First execution: Task 2 is not yet terminal. + var join = new Join(); + boolean result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); + assertFalse("Join task should wait as not all tasks are terminal", result); + + // Simulate Task 2 reaching a terminal state. + task2.setStatus(TaskModel.Status.COMPLETED); + + // Second execution: Now all tasks are terminal. 
+ result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); + assertTrue("Join task should proceed as now all tasks are terminal", result); + assertEquals( + "Join task should be marked as FAILED due to permissive task failure", + TaskModel.Status.FAILED, + wfJoinPair.getRight().getStatus()); + } } From 6347e09f670fc26c175a69412b33ba35c36dcf2f Mon Sep 17 00:00:00 2001 From: Bert Verstraete Date: Fri, 5 Apr 2024 12:16:38 +0200 Subject: [PATCH 3/3] add more comments to join routine --- .../com/netflix/conductor/core/execution/tasks/Join.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java index 0f414a28c..5b0db258b 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java @@ -37,7 +37,6 @@ public Join() { @SuppressWarnings("unchecked") public boolean execute( WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) { - StringBuilder failureReason = new StringBuilder(); StringBuilder optionalTaskFailures = new StringBuilder(); List joinOn = (List) task.getInputData().get("joinOn"); @@ -57,7 +56,7 @@ public boolean execute( for (String joinOnRef : joinOn) { TaskModel forkedTask = workflow.getTaskByRefName(joinOnRef); if (forkedTask == null) { - // Task is not even scheduled yet + // Continue checking other tasks if a referenced task is not yet scheduled continue; } @@ -68,6 +67,9 @@ public boolean execute( task.addOutput(joinOnRef, forkedTask.getOutputData()); } + // Determine if the join task fails immediately due to a non-optional, non-permissive + // task failure, + // or waits for all tasks to be terminal if the failed task is permissive. 
var isJoinFailure = !taskStatus.isSuccessful() && !forkedTask.getWorkflowTask().isOptional() @@ -98,6 +100,7 @@ public boolean execute( } } + // Finalize the join task's status based on the outcomes of all referenced tasks. if (allTasksTerminal) { if (!optionalTaskFailures.isEmpty()) { task.setStatus(TaskModel.Status.COMPLETED_WITH_ERRORS); @@ -109,6 +112,7 @@ public boolean execute( return true; } + // Task execution not complete, waiting on more tasks to reach terminal state. return false; }