Skip to content

Commit

Permalink
relaxing the join validation for nodes which have only store disabled…
Browse files Browse the repository at this point in the history
… but only publication enabled

Signed-off-by: Rajiv Kumar Vaidyanathan <[email protected]>
  • Loading branch information
rajiv-kv committed Aug 31, 2024
1 parent 15ff668 commit edd2a46
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joi
List<String> reposToValidate = new ArrayList<>(2);
reposToValidate.add(RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY);
reposToValidate.add(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY);
ensureRemoteStoreNodesCompatibility(joiningNode, remotePublicationNode.get(), reposToValidate);
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), reposToValidate);
}
}

Expand Down Expand Up @@ -620,17 +620,15 @@ private static void ensureRepositoryCompatibility(DiscoveryNode joiningNode, Dis
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);

for (String repoToValidate : reposToValidate) {
if (existingRemoteStoreNodeAttribute.equalsForRepositories(joiningRemoteStoreNodeAttribute, reposToValidate) == false) {
throw new IllegalStateException(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in "
+ "comparison with existing node ["
+ existingNode
+ "]"
);
}
if (existingRemoteStoreNodeAttribute.equalsForRepositories(joiningRemoteStoreNodeAttribute, reposToValidate) == false) {
throw new IllegalStateException(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in "
+ "comparison with existing node ["
+ existingNode
+ "]"
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,24 @@ public boolean equalsIgnoreGenerationsWithRepoSkip(@Nullable RepositoriesMetadat
.filter(repo -> !reposToSkip.contains(repo.name()))
.collect(Collectors.toList());

return equalsRepository(currentRepositories, otherRepositories);
}

public boolean equalsIgnoreGenerationsForRepo(@Nullable RepositoriesMetadata other, List<String> reposToValidate) {
if (other == null) {
return false;
}
List<RepositoryMetadata> currentRepositories = repositories.stream()
.filter(repo -> reposToValidate.contains(repo.name()))
.collect(Collectors.toList());
List<RepositoryMetadata> otherRepositories = other.repositories.stream()
.filter(repo -> reposToValidate.contains(repo.name()))
.collect(Collectors.toList());

return equalsRepository(currentRepositories, otherRepositories);
}

public static boolean equalsRepository(List<RepositoryMetadata> currentRepositories, List<RepositoryMetadata> otherRepositories) {
if (otherRepositories.size() != currentRepositories.size()) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;

/**
* A discovery node represents a node that is part of the cluster.
Expand Down Expand Up @@ -475,13 +474,8 @@ public boolean isSearchNode() {
* @return true if the node contains remote store node attributes, false otherwise
*/
public boolean isRemoteStoreNode() {
return this.getAttributes()
.keySet()
.stream()
.anyMatch(
key -> key.startsWith(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY)
|| key.startsWith(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY)
);
return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))
&& this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -270,30 +269,7 @@ public boolean equalsForRepositories(Object otherNode, List<String> repositoryTo
if (otherNode == null || getClass() != otherNode.getClass()) return false;

RemoteStoreNodeAttribute other = (RemoteStoreNodeAttribute) otherNode;
List<RepositoryMetadata> currentRepositories = this.repositoriesMetadata.repositories()
.stream()
.filter(repos -> repositoryToValidate.contains(repos.name()))
.collect(Collectors.toList());

List<RepositoryMetadata> otherRepositories = other.repositoriesMetadata.repositories()
.stream()
.filter(repos -> repositoryToValidate.contains(repos.name()))
.collect(Collectors.toList());

if (otherRepositories.size() != currentRepositories.size()) {
return false;
}
// Sort repos by name for ordered comparison
Comparator<RepositoryMetadata> compareByName = (o1, o2) -> o1.name().compareTo(o2.name());
currentRepositories.sort(compareByName);
otherRepositories.sort(compareByName);

for (int i = 0; i < currentRepositories.size(); i++) {
if (currentRepositories.get(i).equalsIgnoreGenerations(otherRepositories.get(i)) == false) {
return false;
}
}
return true;
return this.getRepositoriesMetadata().equalsIgnoreGenerationsForRepo(other.repositoriesMetadata, repositoryToValidate);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdaterTests.createIndexMetadataWithRemoteStoreSettings;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
Expand Down Expand Up @@ -901,6 +902,7 @@ public void testDontAllowSwitchingCompatibilityModeForClusterWithMultipleVersion

private Map<String, String> getRemoteStoreNodeAttributes() {
Map<String, String> remoteStoreNodeAttributes = new HashMap<>();
remoteStoreNodeAttributes.put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-cluster-repo-1");
remoteStoreNodeAttributes.put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-segment-repo-1");
remoteStoreNodeAttributes.put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-translog-repo-1");
return remoteStoreNodeAttributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,16 +628,7 @@ public void testPreventJoinClusterWithRemoteStateNodeJoiningRemoteStoreCluster()
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(
e.getMessage()
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
);
assertTrue(e.getMessage().equals("a non remote store node [" + joiningNode + "] is trying to join a remote store cluster"));
}

public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster() {
Expand All @@ -657,16 +648,7 @@ public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster()
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(
e.getMessage()
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
);
assertTrue(e.getMessage().equals("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"));
}

public void testUpdatesClusterStateWithSingleNodeCluster() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.indices.ShardLimitValidatorTests.createTestShardLimitService;
import static org.opensearch.node.Node.NODE_ATTRIBUTES;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo;
Expand Down Expand Up @@ -1644,7 +1645,12 @@ public void testNewIndexIsRemoteStoreBackedForRemoteStoreDirectionAndMixedMode()
null
);

Map<String, String> missingTranslogAttribute = Map.of(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-segment-repo-1");
Map<String, String> missingTranslogAttribute = Map.of(
REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
"cluster-state-repo-1",
REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY,
"my-segment-repo-1"
);

DiscoveryNodes finalDiscoveryNodes = DiscoveryNodes.builder()
.add(nonRemoteClusterManagerNode)
Expand Down Expand Up @@ -2292,6 +2298,7 @@ private void verifyRemoteStoreIndexSettings(

private DiscoveryNode getRemoteNode() {
Map<String, String> attributes = new HashMap<>();
attributes.put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-cluster-rep-1");
attributes.put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-segment-repo-1");
attributes.put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-translog-repo-1");
return new DiscoveryNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
Expand Down Expand Up @@ -853,6 +854,8 @@ public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() {

// add a remote node and start primary shard
Map<String, String> remoteStoreNodeAttributes = Map.of(
REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
"REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY",
REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY,
"REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_VALUE",
REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.NONE;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.REMOTE_STORE;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
Expand Down Expand Up @@ -617,6 +618,7 @@ private DiscoveryNode getRemoteNode() {
REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
"REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_VALUE"
);
attributes.put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_VALUE");
return new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX;
import static org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils.SEPARATOR;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
Expand Down Expand Up @@ -533,6 +534,7 @@ private RoutingTable createRoutingTableAllShardsStarted(

private Map<String, String> getRemoteStoreNodeAttributes() {
Map<String, String> remoteStoreNodeAttributes = new HashMap<>();
remoteStoreNodeAttributes.put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-cluster-repo-1");
remoteStoreNodeAttributes.put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-segment-repo-1");
remoteStoreNodeAttributes.put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-translog-repo-1");
return remoteStoreNodeAttributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;

public class IndexShardTestUtils {
public static final String MOCK_STATE_REPO_NAME = "state-test-repo";
public static final String MOCK_SEGMENT_REPO_NAME = "segment-test-repo";
public static final String MOCK_TLOG_REPO_NAME = "tlog-test-repo";

Expand All @@ -37,6 +38,7 @@ public static DiscoveryNode getFakeDiscoNode(String id) {

public static DiscoveryNode getFakeRemoteEnabledNode(String id) {
Map<String, String> remoteNodeAttributes = new HashMap<String, String>();
remoteNodeAttributes.put(RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, MOCK_STATE_REPO_NAME);
remoteNodeAttributes.put(RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, MOCK_SEGMENT_REPO_NAME);
remoteNodeAttributes.put(RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, MOCK_TLOG_REPO_NAME);
return new DiscoveryNode(
Expand Down

0 comments on commit edd2a46

Please sign in to comment.