diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 0b4396048..b70c4c5ed 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -105,12 +105,6 @@ void restart(String workflowId, boolean useLatestDefinitions) */ WorkflowModel decide(String workflowId); - /** - * @param workflow workflow to be evaluated - * @return updated workflow - */ - WorkflowModel decideWithLock(WorkflowModel workflow); - /** * @param workflowId id of the workflow to be terminated * @param reason termination reason to be recorded diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorOps.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorOps.java index 8c8db4b57..f40a32297 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorOps.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorOps.java @@ -1039,33 +1039,6 @@ public WorkflowModel decide(String workflowId) { } } - /** - * This method overloads the {@link #decide(String)}. It will acquire a lock and evaluate the - * state of the workflow. - * - * @param workflow the workflow to evaluate the state for - * @return the workflow - */ - @Override - public WorkflowModel decideWithLock(WorkflowModel workflow) { - if (workflow == null) { - return null; - } - StopWatch watch = new StopWatch(); - watch.start(); - if (!executionLockService.acquireLock(workflow.getWorkflowId())) { - return null; - } - try { - return decide(workflow); - - } finally { - executionLockService.releaseLock(workflow.getWorkflowId()); - watch.stop(); - Monitors.recordWorkflowDecisionTime(watch.getTime()); - } - } - /** * @param workflow the workflow to evaluate the state for * @return true if the workflow has completed (success or failed), false otherwise. Note: This diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index b12529a62..3f5a9c5b6 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -35,6 +35,7 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.TaskModel.Status; import com.netflix.conductor.model.WorkflowModel; +import com.netflix.conductor.service.ExecutionLockService; import static com.netflix.conductor.core.config.SchedulerConfiguration.SWEEPER_EXECUTOR_NAME; import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE; @@ -49,6 +50,7 @@ public class WorkflowSweeper { private final WorkflowRepairService workflowRepairService; private final QueueDAO queueDAO; private final ExecutionDAOFacade executionDAOFacade; + private final ExecutionLockService executionLockService; private static final String CLASS_NAME = WorkflowSweeper.class.getSimpleName(); @@ -57,12 +59,14 @@ public WorkflowSweeper( Optional workflowRepairService, ConductorProperties properties, QueueDAO queueDAO, - ExecutionDAOFacade executionDAOFacade) { + ExecutionDAOFacade executionDAOFacade, + ExecutionLockService executionLockService) { this.properties = properties; this.queueDAO = queueDAO; this.workflowExecutor = workflowExecutor; this.executionDAOFacade = executionDAOFacade; this.workflowRepairService = workflowRepairService.orElse(null); + this.executionLockService = executionLockService; LOGGER.info("WorkflowSweeper initialized."); } @@ -73,25 +77,26 @@ public CompletableFuture sweepAsync(String workflowId) { } public void sweep(String workflowId) { + WorkflowContext workflowContext = new WorkflowContext(properties.getAppId()); + WorkflowContext.set(workflowContext); WorkflowModel workflow = null; try { - WorkflowContext workflowContext = new WorkflowContext(properties.getAppId()); - WorkflowContext.set(workflowContext); - LOGGER.debug("Running sweeper for workflow {}", workflowId); - + if (!executionLockService.acquireLock(workflowId)) { + return; + } workflow = executionDAOFacade.getWorkflowModel(workflowId, true); - + LOGGER.debug("Running sweeper for workflow {}", workflowId); if (workflowRepairService != null) { // Verify and repair tasks in the workflow. workflowRepairService.verifyAndRepairWorkflowTasks(workflow); } - - workflow = workflowExecutor.decideWithLock(workflow); + long decideStartTime = System.currentTimeMillis(); + workflow = workflowExecutor.decide(workflow.getWorkflowId()); + Monitors.recordWorkflowDecisionTime(System.currentTimeMillis() - decideStartTime); if (workflow != null && workflow.getStatus().isTerminal()) { queueDAO.remove(DECIDER_QUEUE, workflowId); return; } - } catch (NotFoundException nfe) { queueDAO.remove(DECIDER_QUEUE, workflowId); LOGGER.info( @@ -100,6 +105,8 @@ public void sweep(String workflowId) { } catch (Exception e) { Monitors.error(CLASS_NAME, "sweep"); LOGGER.error("Error running sweep for " + workflowId, e); + } finally { + executionLockService.releaseLock(workflowId); } long workflowOffsetTimeout = workflowOffsetWithJitter(properties.getWorkflowOffsetTimeout().getSeconds()); diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java index 5accfb580..53bd3bb48 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java @@ -29,6 +29,7 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.TaskModel.Status; import com.netflix.conductor.model.WorkflowModel; +import com.netflix.conductor.service.ExecutionLockService; import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE; @@ -45,6 +46,7 @@ public class TestWorkflowSweeper { private QueueDAO queueDAO; private ExecutionDAOFacade executionDAOFacade; private WorkflowSweeper workflowSweeper; + private ExecutionLockService executionLockService; private int defaultPostPoneOffSetSeconds = 1800; @@ -55,13 +57,15 @@ public void setUp() { queueDAO = mock(QueueDAO.class); workflowRepairService = mock(WorkflowRepairService.class); executionDAOFacade = mock(ExecutionDAOFacade.class); + executionLockService = mock(ExecutionLockService.class); workflowSweeper = new WorkflowSweeper( workflowExecutor, Optional.of(workflowRepairService), properties, queueDAO, - executionDAOFacade); + executionDAOFacade, + executionLockService); } @Test