Skip to content

Commit

Permalink
fix(raft): notify role change listener only when transition completed
Browse files Browse the repository at this point in the history
  • Loading branch information
romansmirnov committed Dec 7, 2021
1 parent 520e82e commit 91cb50d
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 28 deletions.
32 changes: 30 additions & 2 deletions atomix/cluster/src/main/java/io/atomix/raft/impl/RaftContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ public class RaftContext implements AutoCloseable, HealthMonitorable {
private PersistedSnapshot currentSnapshot;
private volatile HealthStatus health = HealthStatus.HEALTHY;

private boolean ongoingTransition = false;

private long lastHeartbeat;
private final RaftPartitionConfig partitionConfig;
private final int partitionId;
Expand Down Expand Up @@ -291,12 +293,24 @@ public int getMaxAppendsPerFollower() {
}

/**
* Adds a role change listener.
* Adds a role change listener. If there isn't currently a transition ongoing the listener is
* called immediately after adding the listener.
*
* @param listener The role change listener.
*/
public void addRoleChangeListener(final RaftRoleChangeListener listener) {
roleChangeListeners.add(listener);
threadContext.execute(
() -> {
roleChangeListeners.add(listener);

// When a transition is currently ongoing, then the given
// listener will be called when the transition completes.
if (!ongoingTransition) {
// Otherwise, the listener will called directly for the last
// completed transition.
listener.onNewRole(getRole(), getTerm());
}
});
}

/**
Expand Down Expand Up @@ -587,6 +601,8 @@ public void transition(final Role role) {

log.info("Transitioning to {}", role);

startTransition();

// Close the old state.
try {
this.role.stop().get();
Expand Down Expand Up @@ -616,10 +632,14 @@ public void transition(final Role role) {
() -> {
if (this.role == leaderRole) { // ensure no other role change happened in between
notifyRoleChangeListeners();
// Transitioning to leader completes
// once the initial entry gets committed
completeTransition();
}
});
} else {
notifyRoleChangeListeners();
completeTransition();
}
}
}
Expand All @@ -641,6 +661,14 @@ public void removeFailureListener(final FailureListener listener) {
failureListeners.remove(listener);
}

private void startTransition() {
ongoingTransition = true;
}

private void completeTransition() {
ongoingTransition = false;
}

private void notifyRoleChangeListeners() {
try {
roleChangeListeners.forEach(l -> l.onNewRole(role.role(), getTerm()));
Expand Down
43 changes: 38 additions & 5 deletions atomix/cluster/src/test/java/io/atomix/raft/RaftTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -344,15 +344,22 @@ private void waitUntil(final BooleanSupplier condition, int retries) {
public void testRoleChangeNotificationAfterInitialEntryOnLeader() throws Throwable {
// given
final List<RaftServer> servers = createServers(3);
final RaftServer leader = getLeader(servers).get();
final RaftServer previousLeader = getLeader(servers).get();
final long previousLeaderTerm = previousLeader.getTerm();

final CountDownLatch transitionCompleted = new CountDownLatch(1);

servers.forEach(
server ->
server.addRoleChangeListener(
(role, term) ->
assertLastReadInitialEntry(role, term, server, transitionCompleted)));
(role, term) -> {
if (term > previousLeaderTerm) {
assertLastReadInitialEntry(role, term, server, transitionCompleted);
}
}));

// when
leader.stepDown();
previousLeader.stepDown();

// then
assertTrue(transitionCompleted.await(1000, TimeUnit.SECONDS));
Expand Down Expand Up @@ -468,7 +475,7 @@ public void shouldFailOverOnLeaderDisconnect() throws Throwable {
s ->
s.addRoleChangeListener(
(role, term) -> {
if (role == Role.LEADER) {
if (role == Role.LEADER && !s.equals(leader)) {
newLeaderId.set(s.getContext().getCluster().getLocalMember().memberId());
newLeaderElected.countDown();
}
Expand Down Expand Up @@ -577,6 +584,32 @@ public void shouldReSendPollRequestOnTimeouts() throws Throwable {
verify(followerServer, timeout(5000).atLeast(2)).poll(any(), any());
}

@Test
public void shouldNotifyListenerWhenNoTransitionIsOngoing() throws Throwable {
// given
final var listenerLatch = new CountDownLatch(1);
final AtomicReference<Role> roleWithinListener = new AtomicReference<>(null);
final AtomicLong termWithinListener = new AtomicLong(-1L);

final var server = createServers(1).get(0);

// expect
assertThat(server.isLeader()).isTrue();

// when
server.addRoleChangeListener(
(role, term) -> {
roleWithinListener.set(role);
termWithinListener.set(term);
listenerLatch.countDown();
});

// then
assertThat(listenerLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(roleWithinListener.get()).isEqualTo(server.getRole());
assertThat(termWithinListener.get()).isEqualTo(server.getTerm());
}

private void appendEntries(final RaftServer leader, final int count) {
for (int i = 0; i < count; i++) {
appendEntryAsync(leader, 1024);
Expand Down
5 changes: 0 additions & 5 deletions broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,6 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ public void onActorStarting() {

context = transitionContext.getPartitionContext();

registerListenersAndTriggerRoleChange();
registerListeners();
});
} else {
registerListenersAndTriggerRoleChange();
registerListeners();
}
}

Expand Down Expand Up @@ -312,10 +312,9 @@ private ActorFuture<Void> transitionToInactive() {
return inactiveTransitionFuture;
}

private void registerListenersAndTriggerRoleChange() {
private void registerListeners() {
context.getRaftPartition().addRoleChangeListener(this);
context.getComponentHealthMonitor().addFailureListener(this);
onRoleChange(context.getRaftPartition().getRole(), context.getRaftPartition().term());
context.getRaftPartition().getServer().addSnapshotReplicationListener(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
*/
package io.camunda.zeebe.broker.partitioning;

import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

import io.atomix.cluster.AtomixCluster;
import io.atomix.cluster.AtomixClusterBuilder;
Expand Down Expand Up @@ -115,7 +112,7 @@ public void testStepDownInRoleChangedListener() throws Exception {
followerLatch.await(10, TimeUnit.SECONDS);

// single node becomes directly leader again
assertThat(roles, contains(Role.LEADER, Role.LEADER));
assertThat(roles).containsSequence(Role.INACTIVE, Role.LEADER, Role.LEADER);
}

@Test
Expand Down Expand Up @@ -212,12 +209,12 @@ public void testStepDownOnRoleChangeInCluster() throws Exception {

// then
CompletableFuture.allOf(nodeOneFuture, nodeTwoFuture, nodeThreeFuture).join();
assertTrue(latch.await(15, TimeUnit.SECONDS));
assertThat(latch.await(15, TimeUnit.SECONDS)).isTrue();

// expect normal leaders are not the leaders this time
assertEquals(Role.FOLLOWER, nodeRoles.get(0).get(1));
assertEquals(Role.FOLLOWER, nodeRoles.get(1).get(2));
assertEquals(Role.FOLLOWER, nodeRoles.get(2).get(3));
assertThat(nodeRoles.get(0)).containsEntry(1, Role.FOLLOWER);
assertThat(nodeRoles.get(1)).containsEntry(2, Role.FOLLOWER);
assertThat(nodeRoles.get(2)).containsEntry(3, Role.FOLLOWER);

final List<Role> leaderRoles =
nodeRoles.stream()
Expand All @@ -231,8 +228,8 @@ public void testStepDownOnRoleChangeInCluster() throws Exception {
.filter(r -> r == Role.FOLLOWER)
.collect(Collectors.toList());

assertEquals(3, leaderRoles.size());
assertEquals(6, followerRoles.size());
assertThat(leaderRoles).hasSize(3);
assertThat(followerRoles).hasSize(6);
}

private CompletableFuture<Void> startSingleNodeSinglePartitionWithPartitionConsumer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public void shouldGoInactiveAfterFailedFollowerTransition() throws InterruptedEx

// when
schedulerRule.submitActor(partition);
partition.onNewRole(Role.FOLLOWER, 0);
schedulerRule.workUntilDone();
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();

Expand Down Expand Up @@ -207,6 +208,7 @@ public void shouldGoInactiveIfTransitionHasUnrecoverableFailure() throws Interru

// when
schedulerRule.submitActor(partition);
partition.onNewRole(raft.getRole(), raft.term());
schedulerRule.workUntilDone();
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public void shouldTransitionToAndCloseInSequence() {

// then
final InOrder inOrder = Mockito.inOrder(transition);
inOrder.verify(transition).toInactive();
inOrder.verify(transition).toLeader(1);
inOrder.verify(transition).toFollower(1);
inOrder.verify(transition).toInactive();
Expand Down

0 comments on commit 91cb50d

Please sign in to comment.