Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Add Segment Replication Specific Integration Tests #11773

Merged
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1e47a6e
Run few tests with Segment Replication enabled.
Rishikesh1159 Jan 4, 2024
56b1d57
Update reason for ignoring test.
Rishikesh1159 Jan 5, 2024
738df75
remove @ignore to resolve :server:forbiddenApisInternalClusterTest ch…
Rishikesh1159 Jan 5, 2024
086baed
fix spotlessCheck.
Rishikesh1159 Jan 5, 2024
cde361f
add conditional logic of force refresh.
Rishikesh1159 Jan 9, 2024
21d766a
Address comments on PR.
Rishikesh1159 Jan 16, 2024
867ecc4
fix failing errors.
Rishikesh1159 Jan 16, 2024
ac9ae54
Merge branch 'opensearch-project:main' into segrep-enabled-integTest
Rishikesh1159 Jan 22, 2024
6222ecb
Merge branch 'segrep-enabled-integTest' of https://github.com/Rishike…
Rishikesh1159 Jan 22, 2024
2f499df
Use parameterization for running segment replication tests.
Rishikesh1159 Jan 23, 2024
1464a3a
Fix failing tests.
Rishikesh1159 Jan 24, 2024
9c57edc
Fix failing test.
Rishikesh1159 Jan 24, 2024
c8a4caf
Merge branch 'opensearch-project:main' into segrep-enabled-integTest
Rishikesh1159 Jan 28, 2024
230b90e
add new waitForReplication() and refactor.
Rishikesh1159 Jan 28, 2024
1947a0b
Merge branch 'opensearch-project:main' into segrep-enabled-integTest
Rishikesh1159 Jan 31, 2024
7627fb9
Address comments on PR and revert back changes made to SegmentReplica…
Rishikesh1159 Jan 31, 2024
a8ee4cd
Merge branch 'opensearch-project:main' into segrep-enabled-integTest
Rishikesh1159 Jan 31, 2024
dcdc7bf
revert changes made to Segrep tests.
Rishikesh1159 Jan 31, 2024
cdd1f78
Merge branch 'segrep-enabled-integTest' of https://github.com/Rishike…
Rishikesh1159 Jan 31, 2024
3ccd516
Refactor and address comments.
Rishikesh1159 Feb 5, 2024
df63dca
fix failure of using forbidden api new Random().
Rishikesh1159 Feb 5, 2024
1c8ccfb
Add comments to debug.
Rishikesh1159 Feb 5, 2024
19d99d8
Remove non-critical tests from running with segrep.
Rishikesh1159 Feb 7, 2024
718ecb3
Merge branch 'opensearch-project:main' into segrep-enabled-integTest
Rishikesh1159 Feb 7, 2024
0eaa818
Fix test to run with segrep.
Rishikesh1159 Feb 7, 2024
4649c42
Merge branch 'segrep-enabled-integTest' of https://github.com/Rishike…
Rishikesh1159 Feb 7, 2024
bcc0ca0
Merge branch 'opensearch-project:main' into segrep-enabled-integTest
Rishikesh1159 Feb 8, 2024
d2a73b6
separate out refresh and waitForReplication into different methods.
Rishikesh1159 Feb 8, 2024
d7e9027
Merge branch 'segrep-enabled-integTest' of https://github.com/Rishike…
Rishikesh1159 Feb 8, 2024
de8e7e2
refactor with usage of waitForReplication().
Rishikesh1159 Feb 8, 2024
2c305cb
fix parameters passed in factory for IndexStatsIT.
Rishikesh1159 Feb 8, 2024
e6d3703
Update IndexstatsIT to run with segrep
Rishikesh1159 Feb 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationShardStats;
Expand Down Expand Up @@ -175,17 +174,6 @@ private IndexShard getIndexShard(ClusterState state, ShardRouting routing, Strin
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName);
}

/**
* Fetch IndexShard by shardId, multiple shards per node allowed.
*/
protected IndexShard getIndexShard(String node, ShardId shardId, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> id = indexService.shardIds().stream().filter(sid -> sid == shardId.id()).findFirst();
return indexService.getShard(id.get());
}

/**
* Fetch IndexShard, assumes only a single shard per node.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
Expand All @@ -130,7 +131,9 @@ public IndexStatsIT(Settings settings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT).build() },
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() }
);
}

Expand Down Expand Up @@ -175,7 +178,7 @@ public void testFieldDataStats() throws InterruptedException {
ensureGreen();
client().prepareIndex("test").setId("1").setSource("field", "value1", "field2", "value1").execute().actionGet();
client().prepareIndex("test").setId("2").setSource("field", "value2", "field2", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
indexRandomForConcurrentSearch("test");

NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
Expand Down Expand Up @@ -299,7 +302,7 @@ public void testClearAllCaches() throws Exception {
client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client().prepareIndex("test").setId("1").setSource("field", "value1").execute().actionGet();
client().prepareIndex("test").setId("2").setSource("field", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
indexRandomForConcurrentSearch("test");

NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
Expand Down Expand Up @@ -667,7 +670,7 @@ public void testSimpleStats() throws Exception {
client().prepareIndex("test1").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test1").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();
refresh();
refreshAndWaitForReplication();

NumShards test1 = getNumShards("test1");
long test1ExpectedWrites = 2 * test1.dataCopies;
Expand All @@ -682,7 +685,13 @@ public void testSimpleStats() throws Exception {
assertThat(stats.getPrimaries().getIndexing().getTotal().getIndexFailedCount(), equalTo(0L));
assertThat(stats.getPrimaries().getIndexing().getTotal().isThrottled(), equalTo(false));
assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis(), equalTo(0L));
assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites));

// This assert should not be done on segrep enabled indices because we are asserting Indexing/Write operations count on
// all primary and replica shards. But in case of segrep, Indexing/Write operation don't happen on replica shards. So we can
// ignore this assert check for segrep enabled indices.
if (isSegmentReplicationEnabledForIndex("test1") == false && isSegmentReplicationEnabledForIndex("test2") == false) {
assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites));
}
assertThat(stats.getTotal().getStore(), notNullValue());
assertThat(stats.getTotal().getMerge(), notNullValue());
assertThat(stats.getTotal().getFlush(), notNullValue());
Expand Down Expand Up @@ -825,6 +834,7 @@ public void testMergeStats() {
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
stats = client().admin().indices().prepareStats().setMerge(true).execute().actionGet();

refreshAndWaitForReplication();
assertThat(stats.getTotal().getMerge(), notNullValue());
assertThat(stats.getTotal().getMerge().getTotal(), greaterThan(0L));
}
Expand All @@ -851,7 +861,7 @@ public void testSegmentsStats() {

client().admin().indices().prepareFlush().get();
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
client().admin().indices().prepareRefresh().get();
refreshAndWaitForReplication();
stats = client().admin().indices().prepareStats().setSegments(true).get();

assertThat(stats.getTotal().getSegments(), notNullValue());
Expand All @@ -869,7 +879,7 @@ public void testAllFlags() throws Exception {
client().prepareIndex("test_index").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test_index_2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();

client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats();
Flag[] values = CommonStatsFlags.Flag.values();
for (Flag flag : values) {
Expand Down Expand Up @@ -1453,6 +1463,7 @@ public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() {
.get()
.status()
);
refreshAndWaitForReplication();
ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).setTranslog(true).get().getShards()[0];
RemoteSegmentStats remoteSegmentStatsFromIndexStats = shard.getStats().getSegments().getRemoteSegmentStats();
assertZeroRemoteSegmentStats(remoteSegmentStatsFromIndexStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.recovery;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.refresh.RefreshResponse;
Expand All @@ -52,10 +54,11 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -69,12 +72,26 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout;

public class RecoveryWhileUnderLoadIT extends OpenSearchIntegTestCase {
public class RecoveryWhileUnderLoadIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

public RecoveryWhileUnderLoadIT(Settings settings) {
super(settings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT).build() },
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() }
);
}

private final Logger logger = LogManager.getLogger(RecoveryWhileUnderLoadIT.class);

public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin {
Expand Down Expand Up @@ -476,7 +493,7 @@ private void logSearchResponse(int numberOfShards, long numberOfDocs, int iterat

private void refreshAndAssert() throws Exception {
assertBusy(() -> {
RefreshResponse actionGet = client().admin().indices().prepareRefresh().get();
RefreshResponse actionGet = refreshAndWaitForReplication();
Rishikesh1159 marked this conversation as resolved.
Show resolved Hide resolved
assertAllSuccessful(actionGet);
}, 5, TimeUnit.MINUTES);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.tests.util.LuceneTestCase;
Expand Down Expand Up @@ -92,6 +93,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.FeatureFlagSettings;
Expand All @@ -114,6 +116,7 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand All @@ -123,6 +126,7 @@
import org.opensearch.env.Environment;
import org.opensearch.env.TestEnvironment;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergeSchedulerConfig;
import org.opensearch.index.MockEngineFactoryPlugin;
Expand All @@ -131,10 +135,12 @@
import org.opensearch.index.engine.Segment;
import org.opensearch.index.mapper.CompletionFieldMapper;
import org.opensearch.index.mapper.MockFieldFilterPlugin;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.monitor.os.OsInfo;
import org.opensearch.node.NodeMocksPlugin;
Expand Down Expand Up @@ -182,6 +188,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -1548,22 +1555,20 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma
);
}
}
if (forceRefresh) {
assertNoFailures(
client().admin().indices().prepareRefresh(indicesArray).setIndicesOptions(IndicesOptions.lenientExpandOpen()).get()
);
}
if (dummyDocuments) {
indexRandomForMultipleSlices(indicesArray);
}
if (forceRefresh) {
refreshAndWaitForReplication();
}
}

/*
* This method ingests bogus documents for the given indices such that multiple slices
* are formed. This is useful for testing with the concurrent search use-case as it creates
* multiple slices based on segment count.
* @param indices the indices in which bogus documents should be ingested
* */
* This method ingests bogus documents for the given indices such that multiple slices
* are formed. This is useful for testing with the concurrent search use-case as it creates
* multiple slices based on segment count.
* @param indices the indices in which bogus documents should be ingested
* */
protected void indexRandomForMultipleSlices(String... indices) throws InterruptedException {
Set<List<String>> bogusIds = new HashSet<>();
int refreshCount = randomIntBetween(2, 3);
Expand Down Expand Up @@ -2357,4 +2362,88 @@ protected ClusterState getClusterState() {
return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
}

/**
* Refreshes the indices in the cluster and checks if active/started replica shards
* caught up with primary shard only when Segment Replication is enabled.
* This doesn't wait for inactive/non-started replica shards to become active/started.
*/
protected RefreshResponse refreshAndWaitForReplication(String... indices) {
RefreshResponse refreshResponse = refresh(indices);
if (indices.length == 0) {
indices = getClusterState().routingTable().indicesRouting().keySet().toArray(String[]::new);
}
try {
for (String index : indices) {
if (isSegmentReplicationEnabledForIndex(index)) {
if (isInternalCluster()) {
IndexRoutingTable indexRoutingTable = getClusterState().routingTable().index(index);
if (indexRoutingTable != null) {
assertBusy(() -> {
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
if (primaryRouting.state().toString().equals("STARTED")) {
if (isSegmentReplicationEnabledForIndex(index)) {
final List<ShardRouting> replicaRouting = shardRoutingTable.replicaShards();
final IndexShard primaryShard = getIndexShard(primaryRouting, index);
for (ShardRouting replica : replicaRouting) {
if (replica.state().toString().equals("STARTED")) {
IndexShard replicaShard = getIndexShard(replica, index);
assertEquals(
"replica shards haven't caught up with primary",
getLatestSegmentInfoVersion(primaryShard),
getLatestSegmentInfoVersion(replicaShard)
);
}
}
}
}
}
}, 30, TimeUnit.SECONDS);
}
} else {
throw new IllegalStateException(
"Segment Replication is not supported for testing tests using External Test Cluster"
);
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return refreshResponse;
}

/**
* Checks if Segment Replication is enabled on Index.
*/
protected boolean isSegmentReplicationEnabledForIndex(String index) {
return clusterService().state().getMetadata().isSegmentReplicationEnabled(index);
}

protected IndexShard getIndexShard(ShardRouting routing, String indexName) {
return getIndexShard(getClusterState().nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName);
}

/**
* Fetch IndexShard by shardId, multiple shards per node allowed.
*/
protected IndexShard getIndexShard(String node, ShardId shardId, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> id = indexService.shardIds().stream().filter(sid -> sid.equals(shardId.id())).findFirst();
return indexService.getShard(id.get());
}

/**
* Fetch latest segment info snapshot version of an index.
*/
protected long getLatestSegmentInfoVersion(IndexShard shard) {
try (final GatedCloseable<SegmentInfos> snapshot = shard.getSegmentInfosSnapshot()) {
return snapshot.get().version;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ abstract class ParameterizedOpenSearchIntegTestCase extends OpenSearchIntegTestC

// This method shouldn't be called in setupSuiteScopeCluster(). Only call this method inside single test.
public void indexRandomForConcurrentSearch(String... indices) throws InterruptedException {
if (settings.get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey()).equals("true")) {
if (CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.get(settings)) {
mch2 marked this conversation as resolved.
Show resolved Hide resolved
indexRandomForMultipleSlices(indices);
}
}
Expand Down
Loading