Skip to content

Commit

Permalink
Fix PrimaryAllocationIT#testForceStaleReplicaToBePromotedToPrimary (#…
Browse files Browse the repository at this point in the history
…35728)


Closes 35497
  • Loading branch information
vladimirdolzhenko authored Nov 20, 2018
1 parent 328448e commit 953c858
Showing 1 changed file with 21 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
Expand All @@ -54,6 +56,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -212,14 +215,26 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
rerouteBuilder.add(new AllocateEmptyPrimaryAllocationCommand(idxName, shardId, storeStatus.getNode().getId(), true));
}
}
rerouteBuilder.get();

ClusterState state = client().admin().cluster().prepareState().get().getState();

Set<String> expectedAllocationIds = useStaleReplica
final Set<String> expectedAllocationIds = useStaleReplica
? Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)
: Collections.emptySet();
assertEquals(expectedAllocationIds, state.metaData().index(idxName).inSyncAllocationIds(0));

final CountDownLatch clusterStateChangeLatch = new CountDownLatch(1);
final ClusterStateListener clusterStateListener = event -> {
final Set<String> allocationIds = event.state().metaData().index(idxName).inSyncAllocationIds(0);
if (expectedAllocationIds.equals(allocationIds)) {
clusterStateChangeLatch.countDown();
}
logger.info("expected allocation ids: {} actual allocation ids: {}", expectedAllocationIds, allocationIds);
};
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, master);
clusterService.addListener(clusterStateListener);

rerouteBuilder.get();

assertTrue(clusterStateChangeLatch.await(30, TimeUnit.SECONDS));
clusterService.removeListener(clusterStateListener);

logger.info("--> check that the stale primary shard gets allocated and that documents are available");
ensureYellow(idxName);
Expand All @@ -235,7 +250,7 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
assertHitCount(client().prepareSearch(idxName).setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L);

// allocation id of old primary was cleaned from the in-sync set
state = client().admin().cluster().prepareState().get().getState();
final ClusterState state = client().admin().cluster().prepareState().get().getState();

assertEquals(Collections.singleton(state.routingTable().index(idxName).shard(0).primary.allocationId().getId()),
state.metaData().index(idxName).inSyncAllocationIds(0));
Expand Down

0 comments on commit 953c858

Please sign in to comment.