Skip to content

Commit

Permalink
Merge branch 'main' of github.com:opensearch-project/OpenSearch into …
Browse files Browse the repository at this point in the history
…avgfix
  • Loading branch information
bharath-techie committed Sep 4, 2024
2 parents 87abc91 + d2bc9fc commit 1207ded
Show file tree
Hide file tree
Showing 16 changed files with 241 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010))
- ClusterManagerTaskThrottler Improvements ([#15508](https://github.com/opensearch-project/OpenSearch/pull/15508))
- Reset DiscoveryNodes in all transport node actions request ([#15131](https://github.com/opensearch-project/OpenSearch/pull/15131))
- Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@

import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT;
Expand Down Expand Up @@ -458,7 +459,7 @@ public static void ensureNodesCompatibility(
);
}

ensureRemoteStoreNodesCompatibility(joiningNode, currentNodes, metadata);
ensureRemoteRepositoryCompatibility(joiningNode, currentNodes, metadata);
}

/**
Expand Down Expand Up @@ -491,6 +492,30 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata)
}
}

public static void ensureRemoteRepositoryCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes, Metadata metadata) {
List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());

boolean isClusterRemoteStoreEnabled = existingNodes.stream().anyMatch(DiscoveryNode::isRemoteStoreNode);
if (isClusterRemoteStoreEnabled || joiningNode.isRemoteStoreNode()) {
ensureRemoteStoreNodesCompatibility(joiningNode, currentNodes, metadata);
} else {
ensureRemoteClusterStateNodesCompatibility(joiningNode, currentNodes);
}
}

private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes) {
List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());

assert existingNodes.isEmpty() == false;
Optional<DiscoveryNode> remotePublicationNode = existingNodes.stream()
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
.findFirst();

if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) {
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES);
}
}

/**
* The method ensures homogeneity -
* 1. The joining node has to be a remote store backed if it's joining a remote store backed cluster. Validates
Expand All @@ -506,6 +531,7 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata)
* needs to be modified.
*/
private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes, Metadata metadata) {

List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());

assert existingNodes.isEmpty() == false;
Expand Down Expand Up @@ -587,6 +613,23 @@ private static void ensureRemoteStoreNodesCompatibility(
}
}

private static void ensureRepositoryCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode, List<String> reposToValidate) {

RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);

if (existingRemoteStoreNodeAttribute.equalsForRepositories(joiningRemoteStoreNodeAttribute, reposToValidate) == false) {
throw new IllegalStateException(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in "
+ "comparison with existing node ["
+ existingNode
+ "]"
);
}
}

public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoinValidators(
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,24 @@ public boolean equalsIgnoreGenerationsWithRepoSkip(@Nullable RepositoriesMetadat
.filter(repo -> !reposToSkip.contains(repo.name()))
.collect(Collectors.toList());

return equalsRepository(currentRepositories, otherRepositories);
}

public boolean equalsIgnoreGenerationsForRepo(@Nullable RepositoriesMetadata other, List<String> reposToValidate) {
if (other == null) {
return false;
}
List<RepositoryMetadata> currentRepositories = repositories.stream()
.filter(repo -> reposToValidate.contains(repo.name()))
.collect(Collectors.toList());
List<RepositoryMetadata> otherRepositories = other.repositories.stream()
.filter(repo -> reposToValidate.contains(repo.name()))
.collect(Collectors.toList());

return equalsRepository(currentRepositories, otherRepositories);
}

public static boolean equalsRepository(List<RepositoryMetadata> currentRepositories, List<RepositoryMetadata> otherRepositories) {
if (otherRepositories.size() != currentRepositories.size()) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;

/**
* A discovery node represents a node that is part of the cluster.
Expand Down Expand Up @@ -509,7 +510,8 @@ public boolean isSearchNode() {
* @return true if the node contains remote store node attributes, false otherwise
*/
public boolean isRemoteStoreNode() {
return this.getAttributes().keySet().stream().anyMatch(key -> key.startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX));
return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))
&& this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public class RemoteStoreNodeAttribute {
REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY
);

public static List<String> REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES = List.of(
REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
);

/**
* Creates a new {@link RemoteStoreNodeAttribute}
*/
Expand Down Expand Up @@ -261,6 +266,14 @@ public boolean equalsWithRepoSkip(Object o, List<String> reposToSkip) {
return this.getRepositoriesMetadata().equalsIgnoreGenerationsWithRepoSkip(that.getRepositoriesMetadata(), reposToSkip);
}

public boolean equalsForRepositories(Object otherNode, List<String> repositoryToValidate) {
if (this == otherNode) return true;
if (otherNode == null || getClass() != otherNode.getClass()) return false;

RemoteStoreNodeAttribute other = (RemoteStoreNodeAttribute) otherNode;
return this.getRepositoriesMetadata().equalsIgnoreGenerationsForRepo(other.repositoriesMetadata, repositoryToValidate);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/repositories/IndexId.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public IndexId(String name, String id, int shardPathType) {
public IndexId(final StreamInput in) throws IOException {
this.name = in.readString();
this.id = in.readString();
if (in.getVersion().onOrAfter(Version.CURRENT)) {
if (in.getVersion().onOrAfter(Version.V_2_17_0)) {
this.shardPathType = in.readVInt();
} else {
this.shardPathType = DEFAULT_SHARD_PATH_TYPE;
Expand Down Expand Up @@ -145,7 +145,7 @@ private int computeHashCode() {
public void writeTo(final StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(id);
if (out.getVersion().onOrAfter(Version.CURRENT)) {
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
out.writeVInt(shardPathType);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ public List<IndexId> resolveNewIndices(List<String> indicesToResolve, Map<String

// Visible for testing only
public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final Version repoMetaVersion) throws IOException {
return snapshotsToXContent(builder, repoMetaVersion, Version.CURRENT);
return snapshotsToXContent(builder, repoMetaVersion, Version.V_2_17_0);
}

/**
Expand Down Expand Up @@ -589,7 +589,7 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
for (final IndexId indexId : getIndices().values()) {
builder.startObject(indexId.getName());
builder.field(INDEX_ID, indexId.getId());
if (minNodeVersion.onOrAfter(Version.CURRENT)) {
if (minNodeVersion.onOrAfter(Version.V_2_17_0)) {
builder.field(IndexId.SHARD_PATH_TYPE, indexId.getShardPathType());
}
builder.startArray(SNAPSHOTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ public ClusterState execute(ClusterState currentState) {

logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);

int pathType = clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.CURRENT)
int pathType = clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_2_17_0)
? SHARD_PATH_TYPE.get(repository.getMetadata().settings()).getCode()
: IndexId.DEFAULT_SHARD_PATH_TYPE;
final List<IndexId> indexIds = repositoryData.resolveNewIndices(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdaterTests.createIndexMetadataWithRemoteStoreSettings;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
Expand Down Expand Up @@ -901,6 +902,7 @@ public void testDontAllowSwitchingCompatibilityModeForClusterWithMultipleVersion

private Map<String, String> getRemoteStoreNodeAttributes() {
Map<String, String> remoteStoreNodeAttributes = new HashMap<>();
remoteStoreNodeAttributes.put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-cluster-repo-1");
remoteStoreNodeAttributes.put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-segment-repo-1");
remoteStoreNodeAttributes.put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-translog-repo-1");
return remoteStoreNodeAttributes;
Expand Down
Loading

0 comments on commit 1207ded

Please sign in to comment.