Skip to content

Commit

Permalink
Merge pull request #214 from rq-dbrady/rq-dbrady/redisLockSweeperRace…
Browse files Browse the repository at this point in the history
…conditionFixes

Fix Issue: Ensure proper locking in WorkflowSweeper to prevent race conditions
  • Loading branch information
v1r3n authored Jul 28, 2024
2 parents 7e877cb + 42a708f commit 9da6a4e
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,6 @@ void restart(String workflowId, boolean useLatestDefinitions)
*/
WorkflowModel decide(String workflowId);

/**
 * Evaluates the state of the given workflow while holding that workflow's execution lock.
 *
 * <p>Callers must be prepared for a {@code null} result: the implementation shown in this
 * change returns {@code null} when {@code workflow} is {@code null} or when the lock cannot be
 * acquired, in which case no evaluation takes place.
 *
 * @param workflow workflow to be evaluated
 * @return updated workflow, or {@code null} if the workflow was not evaluated
 */
WorkflowModel decideWithLock(WorkflowModel workflow);

/**
* @param workflowId id of the workflow to be terminated
* @param reason termination reason to be recorded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,33 +1039,6 @@ public WorkflowModel decide(String workflowId) {
}
}

/**
 * Lock-guarded counterpart of {@link #decide(String)}: takes the execution lock keyed by the
 * workflow's id, evaluates the workflow's state, then releases the lock again.
 *
 * <p>The decision time is recorded only when the lock was obtained; the measured interval
 * starts before the lock attempt and ends after the lock has been released.
 *
 * @param workflow the workflow to evaluate the state for; may be {@code null}
 * @return the result of evaluating the workflow, or {@code null} when {@code workflow} is
 *     {@code null} or the lock could not be acquired
 */
@Override
public WorkflowModel decideWithLock(WorkflowModel workflow) {
    // Nothing to evaluate: bail out before touching the lock service or the timer.
    if (workflow == null) {
        return null;
    }

    StopWatch decisionTimer = new StopWatch();
    decisionTimer.start();

    boolean lockAcquired = executionLockService.acquireLock(workflow.getWorkflowId());
    if (!lockAcquired) {
        // Lock not obtained; skip this evaluation without recording a timing.
        return null;
    }

    try {
        WorkflowModel evaluated = decide(workflow);
        return evaluated;
    } finally {
        // Release first, then stop the timer, mirroring the order the metric has always used.
        executionLockService.releaseLock(workflow.getWorkflowId());
        decisionTimer.stop();
        Monitors.recordWorkflowDecisionTime(decisionTimer.getTime());
    }
}

/**
* @param workflow the workflow to evaluate the state for
* @return true if the workflow has completed (success or failed), false otherwise. Note: This
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.TaskModel.Status;
import com.netflix.conductor.model.WorkflowModel;
import com.netflix.conductor.service.ExecutionLockService;

import static com.netflix.conductor.core.config.SchedulerConfiguration.SWEEPER_EXECUTOR_NAME;
import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE;
Expand All @@ -49,6 +50,7 @@ public class WorkflowSweeper {
private final WorkflowRepairService workflowRepairService;
private final QueueDAO queueDAO;
private final ExecutionDAOFacade executionDAOFacade;
private final ExecutionLockService executionLockService;

private static final String CLASS_NAME = WorkflowSweeper.class.getSimpleName();

Expand All @@ -57,12 +59,14 @@ public WorkflowSweeper(
Optional<WorkflowRepairService> workflowRepairService,
ConductorProperties properties,
QueueDAO queueDAO,
ExecutionDAOFacade executionDAOFacade) {
ExecutionDAOFacade executionDAOFacade,
ExecutionLockService executionLockService) {
this.properties = properties;
this.queueDAO = queueDAO;
this.workflowExecutor = workflowExecutor;
this.executionDAOFacade = executionDAOFacade;
this.workflowRepairService = workflowRepairService.orElse(null);
this.executionLockService = executionLockService;
LOGGER.info("WorkflowSweeper initialized.");
}

Expand All @@ -73,25 +77,26 @@ public CompletableFuture<Void> sweepAsync(String workflowId) {
}

public void sweep(String workflowId) {
WorkflowContext workflowContext = new WorkflowContext(properties.getAppId());
WorkflowContext.set(workflowContext);
WorkflowModel workflow = null;
try {
WorkflowContext workflowContext = new WorkflowContext(properties.getAppId());
WorkflowContext.set(workflowContext);
LOGGER.debug("Running sweeper for workflow {}", workflowId);

if (!executionLockService.acquireLock(workflowId)) {
return;
}
workflow = executionDAOFacade.getWorkflowModel(workflowId, true);

LOGGER.debug("Running sweeper for workflow {}", workflowId);
if (workflowRepairService != null) {
// Verify and repair tasks in the workflow.
workflowRepairService.verifyAndRepairWorkflowTasks(workflow);
}

workflow = workflowExecutor.decideWithLock(workflow);
long decideStartTime = System.currentTimeMillis();
workflow = workflowExecutor.decide(workflow.getWorkflowId());
Monitors.recordWorkflowDecisionTime(System.currentTimeMillis() - decideStartTime);
if (workflow != null && workflow.getStatus().isTerminal()) {
queueDAO.remove(DECIDER_QUEUE, workflowId);
return;
}

} catch (NotFoundException nfe) {
queueDAO.remove(DECIDER_QUEUE, workflowId);
LOGGER.info(
Expand All @@ -100,6 +105,8 @@ public void sweep(String workflowId) {
} catch (Exception e) {
Monitors.error(CLASS_NAME, "sweep");
LOGGER.error("Error running sweep for " + workflowId, e);
} finally {
executionLockService.releaseLock(workflowId);
}
long workflowOffsetTimeout =
workflowOffsetWithJitter(properties.getWorkflowOffsetTimeout().getSeconds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.TaskModel.Status;
import com.netflix.conductor.model.WorkflowModel;
import com.netflix.conductor.service.ExecutionLockService;

import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE;

Expand All @@ -45,6 +46,7 @@ public class TestWorkflowSweeper {
private QueueDAO queueDAO;
private ExecutionDAOFacade executionDAOFacade;
private WorkflowSweeper workflowSweeper;
private ExecutionLockService executionLockService;

private int defaultPostPoneOffSetSeconds = 1800;

Expand All @@ -55,13 +57,15 @@ public void setUp() {
queueDAO = mock(QueueDAO.class);
workflowRepairService = mock(WorkflowRepairService.class);
executionDAOFacade = mock(ExecutionDAOFacade.class);
executionLockService = mock(ExecutionLockService.class);
workflowSweeper =
new WorkflowSweeper(
workflowExecutor,
Optional.of(workflowRepairService),
properties,
queueDAO,
executionDAOFacade);
executionDAOFacade,
executionLockService);
}

@Test
Expand Down

0 comments on commit 9da6a4e

Please sign in to comment.