Skip to content

Commit

Permalink
[Remote translog] Add integration tests for primary term validation (o…
Browse files Browse the repository at this point in the history
…pensearch-project#8406)

---------

Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 authored and vikasvb90 committed Jul 12, 2023
1 parent 7f0bbcb commit 8e99099
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.indices.refresh.RefreshResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.coordination.FollowersChecker;
import org.opensearch.cluster.coordination.LeaderChecker;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.health.ClusterIndexHealth;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.shard.ShardNotFoundException;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.transport.MockTransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)

public class PrimaryTermValidationIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

public void testPrimaryTermValidation() throws Exception {
// Follower checker interval is lower compared to leader checker so that the cluster manager can remove the node
// with network partition faster. The follower check retry count is also kept 1.
Settings clusterSettings = Settings.builder()
.put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "1s")
.put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "20s")
.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 4)
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "1s")
.put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "1s")
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
.build();
internalCluster().startClusterManagerOnlyNode(clusterSettings);

// Create repository
absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);

// Start data nodes and create index
internalCluster().startDataOnlyNodes(2, clusterSettings);
createIndex(INDEX_NAME, remoteTranslogIndexSettings(1));
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

// Get the names of nodes to create network disruption
String primaryNode = primaryNodeName(INDEX_NAME);
String replicaNode = replicaNodeName(INDEX_NAME);
String clusterManagerNode = internalCluster().getClusterManagerName();
logger.info("Node names : clusterManager={} primary={} replica={}", clusterManagerNode, primaryNode, replicaNode);

// Index some docs and validate that both primary and replica node has it. Refresh is triggered to trigger segment replication
// to ensure replica is also upto date.
int numOfDocs = randomIntBetween(5, 10);
for (int i = 0; i < numOfDocs; i++) {
indexSameDoc(clusterManagerNode, INDEX_NAME);
}
refresh(INDEX_NAME);
assertBusy(
() -> assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), numOfDocs)
);
assertBusy(
() -> assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), numOfDocs)
);

// Start network disruption - primary node will be isolated
Set<String> nodesInOneSide = Stream.of(clusterManagerNode, replicaNode).collect(Collectors.toCollection(HashSet::new));
Set<String> nodesInOtherSide = Stream.of(primaryNode).collect(Collectors.toCollection(HashSet::new));
NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
NetworkDisruption.DISCONNECT
);
internalCluster().setDisruptionScheme(networkDisruption);
logger.info("--> network disruption is started");
networkDisruption.startDisrupting();

// Ensure the node which is partitioned is removed from the cluster
assertBusy(() -> {
NodesInfoResponse response = client(clusterManagerNode).admin().cluster().prepareNodesInfo().get();
assertThat(response.getNodes().size(), equalTo(2));
});

// Ensure that the cluster manager has latest information about the index
assertBusy(() -> {
ClusterHealthResponse clusterHealthResponse = client(clusterManagerNode).admin()
.cluster()
.health(new ClusterHealthRequest())
.actionGet(TimeValue.timeValueSeconds(1));
assertTrue(clusterHealthResponse.getIndices().containsKey(INDEX_NAME));
ClusterIndexHealth clusterIndexHealth = clusterHealthResponse.getIndices().get(INDEX_NAME);
assertEquals(ClusterHealthStatus.YELLOW, clusterHealthResponse.getStatus());
assertEquals(1, clusterIndexHealth.getNumberOfShards());
assertEquals(1, clusterIndexHealth.getActiveShards());
assertEquals(1, clusterIndexHealth.getUnassignedShards());
assertEquals(1, clusterIndexHealth.getUnassignedShards());
assertEquals(1, clusterIndexHealth.getActivePrimaryShards());
assertEquals(ClusterHealthStatus.YELLOW, clusterIndexHealth.getStatus());
});

// Index data to the newly promoted primary
indexSameDoc(clusterManagerNode, INDEX_NAME);
RefreshResponse refreshResponse = client(clusterManagerNode).admin()
.indices()
.prepareRefresh(INDEX_NAME)
.setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED)
.execute()
.actionGet();
assertNoFailures(refreshResponse);
assertEquals(1, refreshResponse.getSuccessfulShards());
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), numOfDocs + 1);

// At this point we stop the disruption. Since the follower checker has already failed and cluster manager has removed the node
// from cluster, failed node needs to start discovery process by leader checker call. We stop the disruption to allow the failed
// node to
// communicate with the other node which it assumes has replica.
networkDisruption.stopDisrupting();

// When the index call is made to the stale primary, it makes the primary term validation call to the other node (which
// it assumes has the replica node). At this moment, the stale primary realises that it is no more the primary and the caller
// received the following exception.
ShardNotFoundException exception = assertThrows(ShardNotFoundException.class, () -> indexSameDoc(primaryNode, INDEX_NAME));
assertTrue(exception.getMessage().contains("no such shard"));
ensureStableCluster(3);
ensureGreen(INDEX_NAME);
}

private IndexResponse indexSameDoc(String nodeName, String indexName) {
return client(nodeName).prepareIndex(indexName)
.setId(UUIDs.randomBase64UUID())
.setSource("{\"foo\" : \"bar\"}", XContentType.JSON)
.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
Expand Down Expand Up @@ -100,8 +99,7 @@ protected void putRepository(Path path) {
);
}

@Before
public void setup() {
protected void setupRepo() {
internalCluster().startClusterManagerOnlyNode();
absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.remotestore;

import org.junit.Before;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
Expand Down Expand Up @@ -51,6 +52,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

@Before
public void setup() {
setupRepo();
}

@Override
public Settings indexSettings() {
return remoteStoreIndexSettings(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.remotestore;

import org.junit.Before;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequestBuilder;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
Expand All @@ -28,6 +29,11 @@ public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";

@Before
public void setup() {
setupRepo();
}

public void testStatsResponseFromAllNodes() {

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.remotestore;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.Before;
import org.opensearch.action.admin.indices.close.CloseIndexResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -31,6 +32,11 @@
public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIntegTestCase {
private int shard_count = 5;

@Before
public void setup() {
setupRepo();
}

@Override
public Settings indexSettings() {
return Settings.builder()
Expand Down

0 comments on commit 8e99099

Please sign in to comment.