Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Add Segment Replication Specific Integration Tests #11773

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1e47a6e
Run few tests with Segment Replication enabled.
Rishikesh1159 Jan 4, 2024
56b1d57
Update reason for ignoring test.
Rishikesh1159 Jan 5, 2024
738df75
remove @ignore to resolve :server:forbiddenApisInternalClusterTest ch…
Rishikesh1159 Jan 5, 2024
086baed
fix spotlessCheck.
Rishikesh1159 Jan 5, 2024
cde361f
add conditional logic of force refresh.
Rishikesh1159 Jan 9, 2024
21d766a
Address comments on PR.
Rishikesh1159 Jan 16, 2024
867ecc4
fix failing errors.
Rishikesh1159 Jan 16, 2024
ac9ae54
Merge branch 'opensearch-project:main' into segrep-enabled-integTest
Rishikesh1159 Jan 22, 2024
6222ecb
Merge branch 'segrep-enabled-integTest' of https://github.com/Rishike…
Rishikesh1159 Jan 22, 2024
2f499df
Use parameterization for running segment replication tests.
Rishikesh1159 Jan 23, 2024
1464a3a
Fix failing tests.
Rishikesh1159 Jan 24, 2024
9c57edc
Fix failing test.
Rishikesh1159 Jan 24, 2024
c8a4caf
Merge branch 'opensearch-project:main' into segrep-enabled-integTest
Rishikesh1159 Jan 28, 2024
230b90e
add new waitForReplication() and refactor.
Rishikesh1159 Jan 28, 2024
1947a0b
Merge branch 'opensearch-project:main' into segrep-enabled-integTest
Rishikesh1159 Jan 31, 2024
7627fb9
Address comments on PR and revert back changes made to SegmentReplica…
Rishikesh1159 Jan 31, 2024
a8ee4cd
Merge branch 'opensearch-project:main' into segrep-enabled-integTest
Rishikesh1159 Jan 31, 2024
dcdc7bf
revert changes made to Segrep tests.
Rishikesh1159 Jan 31, 2024
cdd1f78
Merge branch 'segrep-enabled-integTest' of https://github.com/Rishike…
Rishikesh1159 Jan 31, 2024
3ccd516
Refactor and address comments.
Rishikesh1159 Feb 5, 2024
df63dca
fix failure of using forbidden api new Random().
Rishikesh1159 Feb 5, 2024
1c8ccfb
Add comments to debug.
Rishikesh1159 Feb 5, 2024
19d99d8
Remove non-critical tests from running with segrep.
Rishikesh1159 Feb 7, 2024
718ecb3
Merge branch 'opensearch-project:main' into segrep-enabled-integTest
Rishikesh1159 Feb 7, 2024
0eaa818
Fix test to run with segrep.
Rishikesh1159 Feb 7, 2024
4649c42
Merge branch 'segrep-enabled-integTest' of https://github.com/Rishike…
Rishikesh1159 Feb 7, 2024
bcc0ca0
Merge branch 'opensearch-project:main' into segrep-enabled-integTest
Rishikesh1159 Feb 8, 2024
d2a73b6
separate out refresh and waitForReplication into different methods.
Rishikesh1159 Feb 8, 2024
d7e9027
Merge branch 'segrep-enabled-integTest' of https://github.com/Rishike…
Rishikesh1159 Feb 8, 2024
de8e7e2
refactor with usage of waitForReplication().
Rishikesh1159 Feb 8, 2024
2c305cb
fix parameters passed in factory for IndexStatsIT.
Rishikesh1159 Feb 8, 2024
e6d3703
Update IndexstatsIT to run with segrep
Rishikesh1159 Feb 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationShardStats;
Expand Down Expand Up @@ -175,17 +174,6 @@ private IndexShard getIndexShard(ClusterState state, ShardRouting routing, Strin
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName);
}

/**
* Fetch IndexShard by shardId, multiple shards per node allowed.
*/
protected IndexShard getIndexShard(String node, ShardId shardId, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> id = indexService.shardIds().stream().filter(sid -> sid == shardId.id()).findFirst();
return indexService.getShard(id.get());
}

/**
* Fetch IndexShard, assumes only a single shard per node.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
Expand All @@ -130,7 +131,8 @@ public IndexStatsIT(Settings settings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() }
);
}

Expand Down Expand Up @@ -175,7 +177,7 @@ public void testFieldDataStats() throws InterruptedException {
ensureGreen();
client().prepareIndex("test").setId("1").setSource("field", "value1", "field2", "value1").execute().actionGet();
client().prepareIndex("test").setId("2").setSource("field", "value2", "field2", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
indexRandomForConcurrentSearch("test");

NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
Expand Down Expand Up @@ -299,7 +301,7 @@ public void testClearAllCaches() throws Exception {
client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client().prepareIndex("test").setId("1").setSource("field", "value1").execute().actionGet();
client().prepareIndex("test").setId("2").setSource("field", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
indexRandomForConcurrentSearch("test");

NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
Expand Down Expand Up @@ -667,7 +669,7 @@ public void testSimpleStats() throws Exception {
client().prepareIndex("test1").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test1").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();
refresh();
refreshAndWaitForReplication();

NumShards test1 = getNumShards("test1");
long test1ExpectedWrites = 2 * test1.dataCopies;
Expand All @@ -682,7 +684,13 @@ public void testSimpleStats() throws Exception {
assertThat(stats.getPrimaries().getIndexing().getTotal().getIndexFailedCount(), equalTo(0L));
assertThat(stats.getPrimaries().getIndexing().getTotal().isThrottled(), equalTo(false));
assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis(), equalTo(0L));
assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites));

// This assert should not be done on segrep enabled indices because we are asserting Indexing/Write operations count on
// all primary and replica shards. But in case of segrep, Indexing/Write operation don't happen on replica shards. So we can
// ignore this assert check for segrep enabled indices.
if (isSegmentReplicationEnabledForIndex("test1") == false && isSegmentReplicationEnabledForIndex("test2") == false) {
assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites));
}
assertThat(stats.getTotal().getStore(), notNullValue());
assertThat(stats.getTotal().getMerge(), notNullValue());
assertThat(stats.getTotal().getFlush(), notNullValue());
Expand Down Expand Up @@ -825,6 +833,7 @@ public void testMergeStats() {
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
stats = client().admin().indices().prepareStats().setMerge(true).execute().actionGet();

refreshAndWaitForReplication();
assertThat(stats.getTotal().getMerge(), notNullValue());
assertThat(stats.getTotal().getMerge().getTotal(), greaterThan(0L));
}
Expand All @@ -851,7 +860,7 @@ public void testSegmentsStats() {

client().admin().indices().prepareFlush().get();
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
client().admin().indices().prepareRefresh().get();
refreshAndWaitForReplication();
stats = client().admin().indices().prepareStats().setSegments(true).get();

assertThat(stats.getTotal().getSegments(), notNullValue());
Expand All @@ -869,7 +878,7 @@ public void testAllFlags() throws Exception {
client().prepareIndex("test_index").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test_index_2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();

client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats();
Flag[] values = CommonStatsFlags.Flag.values();
for (Flag flag : values) {
Expand Down Expand Up @@ -1453,6 +1462,7 @@ public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() {
.get()
.status()
);
refreshAndWaitForReplication();
ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).setTranslog(true).get().getShards()[0];
RemoteSegmentStats remoteSegmentStatsFromIndexStats = shard.getStats().getSegments().getRemoteSegmentStats();
assertZeroRemoteSegmentStats(remoteSegmentStatsFromIndexStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.recovery;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.refresh.RefreshResponse;
Expand All @@ -52,10 +54,11 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -69,12 +72,26 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout;

public class RecoveryWhileUnderLoadIT extends OpenSearchIntegTestCase {
public class RecoveryWhileUnderLoadIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

public RecoveryWhileUnderLoadIT(Settings settings) {
super(settings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT).build() },
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() }
);
}

private final Logger logger = LogManager.getLogger(RecoveryWhileUnderLoadIT.class);

public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin {
Expand Down Expand Up @@ -150,7 +167,7 @@ public void testRecoverWhileUnderLoadAllocateReplicasTest() throws Exception {
logger.info("--> indexing threads stopped");

logger.info("--> refreshing the index");
refreshAndAssert();
assertAfterRefreshAndWaitForReplication();
logger.info("--> verifying indexed content");
iterateAssertCount(numberOfShards, 10, indexer.getIds());
}
Expand Down Expand Up @@ -211,7 +228,7 @@ public void testRecoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest() thr
logger.info("--> indexing threads stopped");

logger.info("--> refreshing the index");
refreshAndAssert();
assertAfterRefreshAndWaitForReplication();
logger.info("--> verifying indexed content");
iterateAssertCount(numberOfShards, 10, indexer.getIds());
}
Expand Down Expand Up @@ -325,7 +342,7 @@ public void testRecoverWhileUnderLoadWithReducedAllowedNodes() throws Exception
);

logger.info("--> refreshing the index");
refreshAndAssert();
assertAfterRefreshAndWaitForReplication();
logger.info("--> verifying indexed content");
iterateAssertCount(numberOfShards, 10, indexer.getIds());
}
Expand Down Expand Up @@ -375,7 +392,7 @@ public void testRecoverWhileRelocating() throws Exception {
ensureGreen(TimeValue.timeValueMinutes(5));

logger.info("--> refreshing the index");
refreshAndAssert();
assertAfterRefreshAndWaitForReplication();
logger.info("--> verifying indexed content");
iterateAssertCount(numShards, 10, indexer.getIds());
}
Expand Down Expand Up @@ -474,10 +491,11 @@ private void logSearchResponse(int numberOfShards, long numberOfDocs, int iterat
);
}

private void refreshAndAssert() throws Exception {
private void assertAfterRefreshAndWaitForReplication() throws Exception {
assertBusy(() -> {
RefreshResponse actionGet = client().admin().indices().prepareRefresh().get();
assertAllSuccessful(actionGet);
}, 5, TimeUnit.MINUTES);
waitForReplication();
}
}
Loading
Loading