Skip to content

Commit

Permalink
Merge branch 'main' into fix-10682
Browse files Browse the repository at this point in the history
  • Loading branch information
shiv0408 committed Sep 5, 2024
2 parents bd9c314 + f33c786 commit d8a272d
Show file tree
Hide file tree
Showing 52 changed files with 1,792 additions and 319 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428)))
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- [Reader Writer Separation] Add experimental search replica shard type to achieve reader writer separation ([#15237](https://github.com/opensearch-project/OpenSearch/pull/15237))
- [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788))
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363))
Expand All @@ -53,6 +53,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- ClusterManagerTaskThrottler Improvements ([#15508](https://github.com/opensearch-project/OpenSearch/pull/15508))
- Reset DiscoveryNodes in all transport node actions request ([#15131](https://github.com/opensearch-project/OpenSearch/pull/15131))
- Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471))
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.allocation;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
}

public void testSearchReplicaDedicatedIncludes() {
List<String> nodesIds = internalCluster().startNodes(3);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
final String node_2 = nodesIds.get(2);
assertEquals(3, cluster().size());

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0)
)
.execute()
.actionGet();

createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureGreen("test");
// ensure primary is not on node 0 or 1,
IndexShardRoutingTable routingTable = getRoutingTable();
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));

String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId());
String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0;

// set the included nodes to the other open node, search replica should relocate to that node.
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode))
.execute()
.actionGet();
ensureGreen("test");

routingTable = getRoutingTable();
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()));
}

public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() {
List<String> nodesIds = internalCluster().startNodes(3);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
final String node_2 = nodesIds.get(2);
assertEquals(3, cluster().size());

// set filter on 1 node and set search replica count to 2 - should leave 1 unassigned
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1))
.execute()
.actionGet();

logger.info("--> creating an index with no replicas");
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureYellowAndNoInitializingShards("test");
IndexShardRoutingTable routingTable = getRoutingTable();
assertEquals(2, routingTable.searchOnlyReplicas().size());
List<ShardRouting> assignedSearchShards = routingTable.searchOnlyReplicas()
.stream()
.filter(ShardRouting::assignedToNode)
.collect(Collectors.toList());
assertEquals(1, assignedSearchShards.size());
assertEquals(node_1, getNodeName(assignedSearchShards.get(0).currentNodeId()));
assertEquals(1, routingTable.searchOnlyReplicas().stream().filter(ShardRouting::unassigned).count());
}

private IndexShardRoutingTable getRoutingTable() {
IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0);
return routingTable;
}

private String getNodeName(String id) {
return getClusterState().nodes().get(id).getName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;

private Boolean useRemoteStore;

@Before
public void randomizeRemoteStoreEnabled() {
useRemoteStore = randomBoolean();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (useRemoteStore) {
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
}
return super.nodeSettings(nodeOrdinal);
}

@After
public void teardown() {
if (useRemoteStore) {
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
}

public void testReplication() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(docCount, primary, replica);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1)
public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -53,4 +54,15 @@ public void testUpdateFeatureFlagDisabled() {
});
assertTrue(settingsException.getMessage().contains("unknown setting"));
}

public void testFilterAllocationSettingNotRegistered() {
expectThrows(SettingsException.class, () -> {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", "node"))
.execute()
.actionGet();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@

package org.opensearch.indices.settings;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -110,7 +113,6 @@ public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOExceptio
// add back a node
internalCluster().startDataOnlyNode();
ensureGreen(TEST_INDEX);

}

public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOException {
Expand Down Expand Up @@ -175,6 +177,39 @@ public void testSearchReplicaScaling() {
assertActiveSearchShards(0);
}

public void testSearchReplicaRoutingPreference() throws IOException {
int numSearchReplicas = 1;
int numWriterReplicas = 1;
internalCluster().startClusterManagerOnlyNode();
String primaryNodeName = internalCluster().startDataOnlyNode();
createIndex(
TEST_INDEX,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas)
.build()
);
ensureYellow(TEST_INDEX);
client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
// add 2 nodes for the replicas
internalCluster().startDataOnlyNodes(2);
ensureGreen(TEST_INDEX);

assertActiveShardCounts(numSearchReplicas, numWriterReplicas);

// set preference to search replica here - we default to this when there are
// search replicas but tests will randomize this value if unset
SearchResponse response = client().prepareSearch(TEST_INDEX)
.setPreference(Preference.SEARCH_REPLICA.type())
.setQuery(QueryBuilders.matchAllQuery())
.get();

String nodeId = response.getHits().getAt(0).getShard().getNodeId();
IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable();
assertEquals(nodeId, indexShardRoutingTable.searchOnlyReplicas().get(0).currentNodeId());
}

/**
* Helper to assert counts of active shards for each type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import static org.hamcrest.Matchers.lessThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class DeleteSnapshotITV2 extends AbstractSnapshotIntegTestCase {
public class DeleteSnapshotV2IT extends AbstractSnapshotIntegTestCase {

private static final String REMOTE_REPO_NAME = "remote-store-repo-name";

Expand Down Expand Up @@ -206,6 +206,7 @@ public void testDeleteShallowCopyV2MultipleSnapshots() throws Exception {

}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15692")
public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Expand Down Expand Up @@ -276,9 +277,11 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
Path shardPath = Path.of(String.valueOf(indexPath), "0");
Path segmentsPath = Path.of(String.valueOf(shardPath), "segments");
Path translogPath = Path.of(String.valueOf(shardPath), "translog");

// Get total segments remote store directory file count for deleted index and shard 0
int segmentFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath);
int translogFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(translogPath);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

Expand Down Expand Up @@ -312,6 +315,13 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountAfterDeletingSnapshot1));
} catch (Exception e) {}
}, 60, TimeUnit.SECONDS);

assertBusy(() -> {
try {
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(translogPath), lessThan(translogFilesCountBeforeDeletingSnapshot1));
} catch (Exception e) {}
}, 60, TimeUnit.SECONDS);

}

private Settings snapshotV2Settings(Path remoteStoreRepoPath) {
Expand Down
Loading

0 comments on commit d8a272d

Please sign in to comment.