Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Issue: Ensure proper locking in WorkflowSweeper to prevent race conditions #214

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,6 @@ void restart(String workflowId, boolean useLatestDefinitions)
*/
WorkflowModel decide(String workflowId);

/**
* @param workflow workflow to be evaluated
* @return updated workflow
*/
WorkflowModel decideWithLock(WorkflowModel workflow);

/**
* @param workflowId id of the workflow to be terminated
* @param reason termination reason to be recorded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,33 +1039,6 @@ public WorkflowModel decide(String workflowId) {
}
}

/**
* This method overloads the {@link #decide(String)}. It will acquire a lock and evaluate the
* state of the workflow.
*
* @param workflow the workflow to evaluate the state for
* @return the workflow
*/
@Override
public WorkflowModel decideWithLock(WorkflowModel workflow) {
if (workflow == null) {
return null;
}
StopWatch watch = new StopWatch();
watch.start();
if (!executionLockService.acquireLock(workflow.getWorkflowId())) {
return null;
}
try {
return decide(workflow);

} finally {
executionLockService.releaseLock(workflow.getWorkflowId());
watch.stop();
Monitors.recordWorkflowDecisionTime(watch.getTime());
}
}

/**
* @param workflow the workflow to evaluate the state for
* @return true if the workflow has completed (success or failed), false otherwise. Note: This
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.TaskModel.Status;
import com.netflix.conductor.model.WorkflowModel;
import com.netflix.conductor.service.ExecutionLockService;

import static com.netflix.conductor.core.config.SchedulerConfiguration.SWEEPER_EXECUTOR_NAME;
import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE;
Expand All @@ -49,6 +50,7 @@ public class WorkflowSweeper {
private final WorkflowRepairService workflowRepairService;
private final QueueDAO queueDAO;
private final ExecutionDAOFacade executionDAOFacade;
private final ExecutionLockService executionLockService;

private static final String CLASS_NAME = WorkflowSweeper.class.getSimpleName();

Expand All @@ -57,12 +59,14 @@ public WorkflowSweeper(
Optional<WorkflowRepairService> workflowRepairService,
ConductorProperties properties,
QueueDAO queueDAO,
ExecutionDAOFacade executionDAOFacade) {
ExecutionDAOFacade executionDAOFacade,
ExecutionLockService executionLockService) {
this.properties = properties;
this.queueDAO = queueDAO;
this.workflowExecutor = workflowExecutor;
this.executionDAOFacade = executionDAOFacade;
this.workflowRepairService = workflowRepairService.orElse(null);
this.executionLockService = executionLockService;
LOGGER.info("WorkflowSweeper initialized.");
}

Expand All @@ -73,25 +77,26 @@ public CompletableFuture<Void> sweepAsync(String workflowId) {
}

public void sweep(String workflowId) {
WorkflowContext workflowContext = new WorkflowContext(properties.getAppId());
WorkflowContext.set(workflowContext);
WorkflowModel workflow = null;
try {
WorkflowContext workflowContext = new WorkflowContext(properties.getAppId());
WorkflowContext.set(workflowContext);
LOGGER.debug("Running sweeper for workflow {}", workflowId);

if (!executionLockService.acquireLock(workflowId)) {
return;
rq-dbrady marked this conversation as resolved.
Show resolved Hide resolved
}
workflow = executionDAOFacade.getWorkflowModel(workflowId, true);

LOGGER.debug("Running sweeper for workflow {}", workflowId);
if (workflowRepairService != null) {
// Verify and repair tasks in the workflow.
workflowRepairService.verifyAndRepairWorkflowTasks(workflow);
}

workflow = workflowExecutor.decideWithLock(workflow);
long decideStartTime = System.currentTimeMillis();
workflow = workflowExecutor.decide(workflow.getWorkflowId());
Monitors.recordWorkflowDecisionTime(System.currentTimeMillis() - decideStartTime);
if (workflow != null && workflow.getStatus().isTerminal()) {
queueDAO.remove(DECIDER_QUEUE, workflowId);
return;
}

} catch (NotFoundException nfe) {
queueDAO.remove(DECIDER_QUEUE, workflowId);
LOGGER.info(
Expand All @@ -100,6 +105,8 @@ public void sweep(String workflowId) {
} catch (Exception e) {
Monitors.error(CLASS_NAME, "sweep");
LOGGER.error("Error running sweep for " + workflowId, e);
} finally {
executionLockService.releaseLock(workflowId);
}
long workflowOffsetTimeout =
workflowOffsetWithJitter(properties.getWorkflowOffsetTimeout().getSeconds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.TaskModel.Status;
import com.netflix.conductor.model.WorkflowModel;
import com.netflix.conductor.service.ExecutionLockService;

import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE;

Expand All @@ -45,6 +46,7 @@ public class TestWorkflowSweeper {
private QueueDAO queueDAO;
private ExecutionDAOFacade executionDAOFacade;
private WorkflowSweeper workflowSweeper;
private ExecutionLockService executionLockService;

private int defaultPostPoneOffSetSeconds = 1800;

Expand All @@ -55,13 +57,15 @@ public void setUp() {
queueDAO = mock(QueueDAO.class);
workflowRepairService = mock(WorkflowRepairService.class);
executionDAOFacade = mock(ExecutionDAOFacade.class);
executionLockService = mock(ExecutionLockService.class);
workflowSweeper =
new WorkflowSweeper(
workflowExecutor,
Optional.of(workflowRepairService),
properties,
queueDAO,
executionDAOFacade);
executionDAOFacade,
executionLockService);
}

@Test
Expand Down
Loading