Treat started segments as running when aborting.
On startup (or periodically, in the case of Cassandra storage), the
RepairManager aborts segments that do not have a leader or that belong
to paused repair runs.

However, this code would only abort segments in the RUNNING state. This
is a problem because segments might also be left in the STARTED state
when a Cassandra Reaper instance is restarted. Such segments would
block other segments from running (because when checking for busy hosts,
STARTED segments are treated like RUNNING segments), but would never
finish or be restarted, thus effectively blocking the repair process.

This patch fixes this problem by treating RUNNING and STARTED segments
in the same way when aborting segments that do not have a leader or that
belong to paused repair runs.
smarsching authored and adejanovski committed Apr 9, 2020
1 parent 2cf9456 commit 64b786c
Showing 2 changed files with 13 additions and 6 deletions.
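
The gist of the change, shown before the diff as a minimal self-contained sketch: any segment that is "in flight" (RUNNING or STARTED) and has no leader must be aborted, where previously only RUNNING qualified. Note that the SegmentState enum and shouldAbort helper below are illustrative stand-ins for this sketch, not Reaper's actual RepairSegment.State or RepairManager API.

import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class AbortInFlightSketch {

  // Simplified stand-in for Reaper's RepairSegment.State; the real enum
  // lives in the Reaper codebase and has additional states.
  enum SegmentState { NOT_STARTED, STARTED, RUNNING, DONE }

  // Before this commit, only RUNNING was treated as in flight when aborting,
  // so STARTED segments left behind by a restart were never cleaned up.
  static final Set<SegmentState> IN_FLIGHT =
      EnumSet.of(SegmentState.RUNNING, SegmentState.STARTED);

  static boolean shouldAbort(SegmentState state, boolean hasLeader) {
    // A segment is aborted when it is in flight but no instance leads it.
    return IN_FLIGHT.contains(state) && !hasLeader;
  }

  public static void main(String[] args) {
    // With the fix, a leaderless STARTED segment is aborted just like a
    // leaderless RUNNING one, so it no longer blocks the repair forever.
    for (SegmentState state : List.of(SegmentState.STARTED, SegmentState.RUNNING)) {
      System.out.println(state + ", no leader -> abort = " + shouldAbort(state, false));
    }
  }
}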
@@ -157,8 +157,11 @@ private void abortAllRunningSegmentsWithNoLeader(Collection<RepairRun> runningRe
         .forEach((repairRun) -> {
           Collection<RepairSegment> runningSegments
               = context.storage.getSegmentsWithState(repairRun.getId(), RepairSegment.State.RUNNING);
+          Collection<RepairSegment> startedSegments
+              = context.storage.getSegmentsWithState(repairRun.getId(), RepairSegment.State.STARTED);
 
           abortSegmentsWithNoLeader(repairRun, runningSegments);
+          abortSegmentsWithNoLeader(repairRun, startedSegments);
         });
   }

@@ -187,11 +190,14 @@ private void abortAllRunningSegmentsInKnownPausedRepairRuns(Collection<RepairRun
         .stream()
         .filter((pausedRepairRun) -> repairRunners.containsKey(pausedRepairRun.getId()))
         .forEach((pausedRepairRun) -> {
-          // Abort all running segments for paused repair runs
+          // Abort all running and started segments for paused repair runs
           Collection<RepairSegment> runningSegments
               = context.storage.getSegmentsWithState(pausedRepairRun.getId(), RepairSegment.State.RUNNING);
+          Collection<RepairSegment> startedSegments
+              = context.storage.getSegmentsWithState(pausedRepairRun.getId(), RepairSegment.State.STARTED);
 
           abortSegments(runningSegments, pausedRepairRun);
+          abortSegments(startedSegments, pausedRepairRun);
         });
     } finally {
       repairRunnersLock.unlock();
@@ -223,7 +229,7 @@ private void abortSegmentsWithNoLeader(RepairRun repairRun, Collection<RepairSeg
     repairRunnersLock.lock();
     if (context.storage instanceof IDistributedStorage || !repairRunners.containsKey(repairRun.getId())) {
       // When multiple Reapers are in use, we can get stuck segments when one instance is rebooted
-      // Any segment in RUNNING state but with no leader should be killed
+      // Any segment in RUNNING or STARTED state but with no leader should be killed
       List<UUID> leaders = context.storage instanceof IDistributedStorage
           ? ((IDistributedStorage) context.storage).getLeaders()
           : Collections.emptyList();
@@ -276,7 +282,8 @@ void abortSegments(Collection<RepairSegment> runningSegments, RepairRun repairRu
       try {
         // refresh segment once we're inside leader-election
         segment = context.storage.getRepairSegment(repairRun.getId(), segment.getId()).get();
-        if (RepairSegment.State.RUNNING == segment.getState()) {
+        if (RepairSegment.State.RUNNING == segment.getState()
+            || RepairSegment.State.STARTED == segment.getState()) {
           JmxProxy jmxProxy = ClusterFacade.create(context).connect(
               context.storage.getCluster(repairRun.getClusterName()),
               Arrays.asList(segment.getCoordinatorHost()));
@@ -125,7 +125,7 @@ public void abortRunningSegmentWithNoLeader() throws ReaperException, Interrupte
     context.repairManager.resumeRunningRepairRuns();
 
     // Check that abortSegments was invoked with at least one segment, meaning abortion occurs
-    Mockito.verify(context.repairManager, Mockito.times(1)).abortSegments(Mockito.argThat(new NotEmptyList()), any());
+    Mockito.verify(context.repairManager, Mockito.times(2)).abortSegments(Mockito.argThat(new NotEmptyList()), any());
   }
 
   /**
@@ -202,7 +202,7 @@ public void doNotAbortRunningSegmentWithLeader() throws ReaperException, Interru
     context.repairManager.resumeRunningRepairRuns();
 
     // Check that abortSegments was invoked with an empty list, meaning no abortion occurs
-    Mockito.verify(context.repairManager, Mockito.times(1)).abortSegments(Mockito.argThat(new EmptyList()), any());
+    Mockito.verify(context.repairManager, Mockito.times(2)).abortSegments(Mockito.argThat(new EmptyList()), any());
   }
 
   /**
@@ -349,7 +349,7 @@ public void abortRunningSegmentWithNoRepairRunnerAndNoDistributedStorage()
     context.repairManager.resumeRunningRepairRuns();
 
     // Check that abortSegments was invoked with a non-empty list, meaning abortion occurs
-    Mockito.verify(context.repairManager, Mockito.times(1)).abortSegments(Mockito.argThat(new NotEmptyList()), any());
+    Mockito.verify(context.repairManager, Mockito.times(2)).abortSegments(Mockito.argThat(new NotEmptyList()), any());
   }
 
   @Test
