Skip to content

Commit

Permalink
[Metadata Immutability] Change different indices lookup objects from …
Browse files Browse the repository at this point in the history
…array type to lists

Changed the arrays to immutable List instances, added new versions of the getters which returns List instances.

Resolves #8647

Signed-off-by: Abdul Muneer Kolarkunnu <[email protected]>
  • Loading branch information
akolarkunnu committed Jul 11, 2024
1 parent 098be2e commit 26886e0
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static java.util.Collections.emptyMap;
import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg;
Expand Down Expand Up @@ -215,13 +216,13 @@ public ClusterHealthResponse(StreamInput in) throws IOException {
}

/** needed for plugins BWC */
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState) {
public ClusterHealthResponse(String clusterName, Set<String> concreteIndices, ClusterState clusterState) {
this(clusterName, concreteIndices, clusterState, -1, -1, -1, TimeValue.timeValueHours(0));
}

public ClusterHealthResponse(
String clusterName,
String[] concreteIndices,
Set<String> concreteIndices,
ClusterState clusterState,
int numberOfPendingTasks,
int numberOfInFlightFetch,
Expand All @@ -242,7 +243,7 @@ public ClusterHealthResponse(
String clusterName,
ClusterState clusterState,
ClusterSettings clusterSettings,
String[] concreteIndices,
Set<String> concreteIndices,
String awarenessAttributeName,
int numberOfPendingTasks,
int numberOfInFlightFetch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
Expand All @@ -71,6 +70,8 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -490,10 +491,10 @@ private ClusterHealthResponse clusterHealth(
logger.trace("Calculating health based on state version [{}]", clusterState.version());
}

String[] concreteIndices;
Set<String> concreteIndices;
if (request.level().equals(ClusterHealthRequest.Level.AWARENESS_ATTRIBUTES)) {
String awarenessAttribute = request.getAwarenessAttribute();
concreteIndices = clusterState.getMetadata().getConcreteAllIndices().toArray(String[]::new);
concreteIndices = clusterState.getMetadata().getConcreteAllIndices();
return new ClusterHealthResponse(
clusterState.getClusterName().value(),
clusterState,
Expand All @@ -508,12 +509,12 @@ private ClusterHealthResponse clusterHealth(
}

try {
concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request);
concreteIndices = Set.of(indexNameExpressionResolver.concreteIndexNames(clusterState, request));
} catch (IndexNotFoundException e) {
// one of the specified indices is not there - treat it as RED.
ClusterHealthResponse response = new ClusterHealthResponse(
clusterState.getClusterName().value(),
Strings.EMPTY_ARRAY,
Collections.emptySet(),
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ protected void clusterManagerOperation(Request request, ClusterState state, Acti
}
ClusterStateHealth streamHealth = new ClusterStateHealth(
state,
dataStream.getIndices().stream().map(Index::getName).toArray(String[]::new)
dataStream.getIndices().stream().map(Index::getName).collect(Collectors.toSet())
);
dataStreamInfos.add(new Response.DataStreamInfo(dataStream, streamHealth.getStatus(), indexTemplate));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* Cluster state health information
Expand All @@ -73,16 +74,16 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
* @param clusterState The current cluster state. Must not be null.
*/
public ClusterStateHealth(final ClusterState clusterState) {
this(clusterState, clusterState.metadata().getConcreteAllIndices().toArray(String[]::new));
this(clusterState, clusterState.metadata().getConcreteAllIndices());
}

/**
* Creates a new <code>ClusterStateHealth</code> instance considering the current cluster state and the provided index names.
*
* @param clusterState The current cluster state. Must not be null.
* @param concreteIndices An array of index names to consider. Must not be null but may be empty.
* @param concreteIndices A set of index names to consider. Must not be null but may be empty.
*/
public ClusterStateHealth(final ClusterState clusterState, final String[] concreteIndices) {
public ClusterStateHealth(final ClusterState clusterState, final Set<String> concreteIndices) {
numberOfNodes = clusterState.nodes().getSize();
numberOfDataNodes = clusterState.nodes().getDataNodes().size();
hasDiscoveredClusterManager = clusterState.nodes().getClusterManagerNodeId() != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -93,7 +94,7 @@ public void testClusterHealth() throws IOException {
TimeValue pendingTaskInQueueTime = TimeValue.timeValueMillis(randomIntBetween(1000, 100000));
ClusterHealthResponse clusterHealth = new ClusterHealthResponse(
"bla",
new String[] { Metadata.ALL },
Set.of(Metadata.ALL),
clusterState,
pendingTasks,
inFlight,
Expand Down Expand Up @@ -121,7 +122,7 @@ public void testClusterHealthVerifyClusterManagerNodeDiscovery() throws IOExcept
TimeValue pendingTaskInQueueTime = TimeValue.timeValueMillis(randomIntBetween(1000, 100000));
ClusterHealthResponse clusterHealth = new ClusterHealthResponse(
"bla",
new String[] { Metadata.ALL },
Set.of(Metadata.ALL),
clusterState,
pendingTasks,
inFlight,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;

import static org.hamcrest.core.IsEqual.equalTo;

public class TransportClusterHealthActionTests extends OpenSearchTestCase {

public void testWaitForInitializingShards() throws Exception {
final String[] indices = { "test" };
final Set<String> indices = Set.of("test");
final ClusterHealthRequest request = new ClusterHealthRequest();
request.waitForNoInitializingShards(true);
ClusterState clusterState = randomClusterStateWithInitializingShards("test", 0);
Expand All @@ -76,7 +77,7 @@ public void testWaitForInitializingShards() throws Exception {
}

public void testWaitForAllShards() {
final String[] indices = { "test" };
final Set<String> indices = Set.of("test");
final ClusterHealthRequest request = new ClusterHealthRequest();
request.waitForActiveShards(ActiveShardCount.ALL);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,8 @@ public void testClusterHealth() throws IOException {
.routingTable(routingTable.build())
.nodes(clusterService.state().nodes())
.build();
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(
clusterState,
IndicesOptions.strictExpand(),
(String[]) null
Set<String> concreteIndices = Set.of(
indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.strictExpand(), (String[]) null)
);
ClusterStateHealth clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices);
logger.info("cluster status: {}, expected {}", clusterStateHealth.getStatus(), counter.status());
Expand All @@ -230,7 +228,7 @@ public void testClusterHealth() throws IOException {

public void testClusterHealthOnIndexCreation() {
final String indexName = "test-idx";
final String[] indices = new String[] { indexName };
final Set<String> indices = Set.of(indexName);
final List<ClusterState> clusterStates = simulateIndexCreationStates(indexName, false);
for (int i = 0; i < clusterStates.size(); i++) {
// make sure cluster health is always YELLOW, up until the last state where it should be GREEN
Expand All @@ -246,7 +244,7 @@ public void testClusterHealthOnIndexCreation() {

public void testClusterHealthOnIndexCreationWithFailedAllocations() {
final String indexName = "test-idx";
final String[] indices = new String[] { indexName };
final Set<String> indices = Set.of(indexName);
final List<ClusterState> clusterStates = simulateIndexCreationStates(indexName, true);
for (int i = 0; i < clusterStates.size(); i++) {
// make sure cluster health is YELLOW up until the final cluster state, which contains primary shard
Expand All @@ -263,7 +261,7 @@ public void testClusterHealthOnIndexCreationWithFailedAllocations() {

public void testClusterHealthOnClusterRecovery() {
final String indexName = "test-idx";
final String[] indices = new String[] { indexName };
final Set<String> indices = Set.of(indexName);
final List<ClusterState> clusterStates = simulateClusterRecoveryStates(indexName, false, false);
for (int i = 0; i < clusterStates.size(); i++) {
// make sure cluster health is YELLOW up until the final cluster state, when it turns GREEN
Expand All @@ -279,7 +277,7 @@ public void testClusterHealthOnClusterRecovery() {

public void testClusterHealthOnClusterRecoveryWithFailures() {
final String indexName = "test-idx";
final String[] indices = new String[] { indexName };
final Set<String> indices = Set.of(indexName);
final List<ClusterState> clusterStates = simulateClusterRecoveryStates(indexName, false, true);
for (int i = 0; i < clusterStates.size(); i++) {
// make sure cluster health is YELLOW up until the final cluster state, which contains primary shard
Expand All @@ -296,7 +294,7 @@ public void testClusterHealthOnClusterRecoveryWithFailures() {

public void testClusterHealthOnClusterRecoveryWithPreviousAllocationIds() {
final String indexName = "test-idx";
final String[] indices = new String[] { indexName };
final Set<String> indices = Set.of(indexName);
final List<ClusterState> clusterStates = simulateClusterRecoveryStates(indexName, true, false);
for (int i = 0; i < clusterStates.size(); i++) {
// because there were previous allocation ids, we should be RED until the primaries are started,
Expand All @@ -319,7 +317,7 @@ public void testClusterHealthOnClusterRecoveryWithPreviousAllocationIds() {

public void testClusterHealthOnClusterRecoveryWithPreviousAllocationIdsAndAllocationFailures() {
final String indexName = "test-idx";
final String[] indices = new String[] { indexName };
final Set<String> indices = Set.of(indexName);
for (final ClusterState clusterState : simulateClusterRecoveryStates(indexName, true, true)) {
final ClusterStateHealth health = new ClusterStateHealth(clusterState, indices);
// if the inactive primaries are due solely to recovery (not failed allocation or previously being allocated)
Expand Down

0 comments on commit 26886e0

Please sign in to comment.