
Fix Issue: Ensure proper locking in WorkflowSweeper to prevent race conditions #214

Conversation

rq-dbrady (Contributor)

Pull Request type

  • Bugfix
  • Feature
  • Refactoring (no functional changes, no api changes)
  • Build related changes
  • WHOSUSING.md
  • Other (please describe):

NOTE: Please remember to run ./gradlew spotlessApply to fix any format violations.

Changes in this PR

Issue: #213
Correct the locking mechanism in WorkflowSweeper to prevent race conditions.

Previously, the WorkflowSweeper class had a potential race condition due to the order of operations in the sweep method. The workflow was fetched from the executionDaoFacade before acquiring the lock, followed by a verifyAndRepair operation that could mutate its state. This sequence left a small window (~50 to 100 µs) in which a workflow could be seen in two different states by different threads, causing inconsistencies and failures in workflow listeners or completion checks.
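For illustration, the problematic ordering can be sketched as below. All names here (fetchWorkflow, verifyAndRepair, acquireLock, and so on) are hypothetical stand-ins for the Conductor components involved, not the actual implementation; the trace list just records the order of operations so the unlocked window is visible:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the ORIGINAL ordering: the workflow is read and repaired before
// the lock is taken, leaving an unlocked window in which another sweeper
// thread can observe or mutate the same workflow.
class OldSweepOrder {
    final List<String> trace = new ArrayList<>();  // records operation order

    boolean acquireLock(String id)  { trace.add("acquireLock"); return true; }
    void fetchWorkflow(String id)   { trace.add("fetchWorkflow"); }
    void verifyAndRepair(String id) { trace.add("verifyAndRepair"); }
    void decide(String id)          { trace.add("decide"); }
    void releaseLock(String id)     { trace.add("releaseLock"); }

    void sweep(String workflowId) {
        fetchWorkflow(workflowId);    // read happens OUTSIDE the lock
        verifyAndRepair(workflowId);  // may mutate state, still unlocked
        if (!acquireLock(workflowId)) {
            return;
        }
        try {
            decide(workflowId);       // may act on a stale snapshot
        } finally {
            releaseLock(workflowId);
        }
    }
}
```

The race window is everything before the acquireLock call: a second sweeper replica can fetch and repair the same workflow during that span.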

Changes made:

  • Removed the decideWithLock method.
  • Moved the locking logic directly into the sweep method to ensure the workflow is locked before any operations are performed on it.
  • Ensured the workflow lock is only released after it is removed from the queue to prevent race conditions.

Observed issues:

  • Race conditions were observed at large scale (30 replicas in Kubernetes, Redis cluster with Redis lock, ~75-90 workflows/sec).
  • Workflows could remain in a "Running" state even after completion was triggered, leading to listener and completion-check failures.

New implementation in sweep method:

  • Acquire lock before fetching the workflow and performing any operations.
  • Handle verify and repair within the lock.
  • Ensure the workflow is locked throughout the decision process.
  • Release the lock only after removing the workflow from the queue.

This fix ensures atomicity in operations on workflows, preventing the race conditions previously observed.
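The reordered sweep can be sketched as follows. This is a minimal illustration of the lock-first sequence described above, not the actual Conductor code; the method names mirror the components mentioned in this PR (executionLockService, verifyAndRepair, the decider queue) but are stubbed out so the ordering itself can be checked:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the NEW ordering: the lock is acquired before any read or
// mutation, and released only after the workflow leaves the decider queue.
class NewSweepOrder {
    final List<String> trace = new ArrayList<>();  // records operation order

    boolean acquireLock(String id)   { trace.add("acquireLock"); return true; }
    void fetchWorkflow(String id)    { trace.add("fetchWorkflow"); }
    void verifyAndRepair(String id)  { trace.add("verifyAndRepair"); }
    void decide(String id)           { trace.add("decide"); }
    void removeFromQueue(String id)  { trace.add("removeFromQueue"); }
    void releaseLock(String id)      { trace.add("releaseLock"); }

    void sweep(String workflowId) {
        // 1. Lock first: no other sweeper instance can touch this workflow.
        if (!acquireLock(workflowId)) {
            return; // another instance holds the lock; skip this sweep cycle
        }
        try {
            fetchWorkflow(workflowId);     // 2. read state only after locking
            verifyAndRepair(workflowId);   // 3. repair inside the lock
            decide(workflowId);            // 4. run the decider
            removeFromQueue(workflowId);   // 5. dequeue before releasing
        } finally {
            releaseLock(workflowId);       // 6. release last
        }
    }
}
```

Because every read and mutation sits between acquireLock and releaseLock, no second thread can observe the workflow mid-sweep, which is the atomicity property the fix is after.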

@@ -74,24 +79,25 @@ public CompletableFuture<Void> sweepAsync(String workflowId) {

    public void sweep(String workflowId) {
        WorkflowModel workflow = null;
        StopWatch watch = new StopWatch();
manan164 (Contributor):
Hi @rq-dbrady, instead of using the stopwatch, can we try to acquire a lock here and, if successful, do the repair, or otherwise come out of the sweep logic?

rq-dbrady (Contributor, Author):
Currently it follows this logic:

  1. Create stopwatch.
  2. Set the workflow context.
  3. Acquire lock (if it fails, return from sweep).
  4. Get the workflow from the store.
  5. Run repair.
  6. Start stopwatch.
  7. Decide.
  8. Remove from decider queue.
  9. Release lock and stop stopwatch.

Are you proposing we acquire the lock at the start of the method and then start the stopwatch? Something like:

    public void sweep(String workflowId) {
        WorkflowModel workflow = null;
        StopWatch watch = new StopWatch();
        if (!executionLockService.acquireLock(workflowId)) {
            return; // lock held elsewhere; skip this sweep
        }
        watch.start();
        // do repair logic
        // decide, etc.
    }

manan164 (Contributor):

Hi @rq-dbrady, yes. The first thing we should do is try to acquire the lock, then start the stopwatch, run decide, stop the stopwatch, and release the lock. Optionally, we don't need a StopWatch to measure execution time; we can use System.currentTimeMillis().
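As a rough sketch of the System.currentTimeMillis() alternative suggested here, a small timing helper could look like the following; timeMillis is a hypothetical name, not part of Conductor or Commons Lang:

```java
// Hypothetical helper: measures elapsed wall-clock time for a task using
// System.currentTimeMillis() instead of Apache Commons StopWatch.
class SweepTimer {
    static long timeMillis(Runnable task) {
        long start = System.currentTimeMillis();
        task.run();                                  // e.g. the decide step
        return System.currentTimeMillis() - start;   // elapsed milliseconds
    }
}
```

This trades StopWatch's pause/resume features for one fewer dependency; millisecond resolution is adequate for the sweep metrics discussed here.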

rq-dbrady (Contributor, Author):

@manan164, I applied the changes in the suggested order:

  1. Acquire lock
  2. Run repair
  3. Start timer
  4. Decide
  5. End timer
  6. Release lock

rq-dbrady force-pushed the rq-dbrady/redisLockSweeperRaceconditionFixes branch 2 times, most recently from 0c37dca to 76df8d7 on July 23, 2024 at 17:40
rq-dbrady force-pushed the branch from 76df8d7 to 42a708f on July 23, 2024 at 17:41
rq-dbrady changed the title from "fix: Ensure proper locking in WorkflowSweeper to prevent race conditions" to "Fix Issue: Ensure proper locking in WorkflowSweeper to prevent race conditions" on July 23, 2024
v1r3n merged commit 9da6a4e into conductor-oss:main on Jul 28, 2024 (2 checks passed).