diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4ff22076f5530..e65cc30aecd7e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Feature] Expose term frequency in Painless script score context ([#9081](https://github.com/opensearch-project/OpenSearch/pull/9081))
- Add support for reading partial files to HDFS repository ([#9513](https://github.com/opensearch-project/OpenSearch/issues/9513))
- Add support for extensions to search responses using SearchExtBuilder ([#9379](https://github.com/opensearch-project/OpenSearch/pull/9379))
+- [Remote State] Create service to publish cluster state to remote store ([#9160](https://github.com/opensearch-project/OpenSearch/pull/9160))
- [BWC and API enforcement] Decorate the existing APIs with proper annotations (part 1) ([#9520](https://github.com/opensearch-project/OpenSearch/pull/9520))
- Add concurrent segment search related metrics to node and index stats ([#9622](https://github.com/opensearch-project/OpenSearch/issues/9622))
- Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507))
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index 6cb4992932b8e..e00e7e3bf4ea7 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -97,6 +97,7 @@
import org.opensearch.gateway.DanglingIndicesState;
import org.opensearch.gateway.GatewayService;
import org.opensearch.gateway.PersistedClusterStateService;
+import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
@@ -664,7 +665,11 @@ public void apply(Settings value, Settings current, Settings previous) {
// Related to monitoring of task cancellation
TaskCancellationMonitoringSettings.IS_ENABLED_SETTING,
- TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING
+ TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING,
+
+ // Remote cluster state settings
+ RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
+ RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING
)
)
);
diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
index af894bdbc117e..02f1e5049b95c 100644
--- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
+++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
@@ -60,8 +60,11 @@
import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.env.NodeMetadata;
+import org.opensearch.gateway.remote.ClusterMetadataManifest;
+import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.plugins.MetadataUpgrader;
+import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
@@ -84,19 +87,19 @@
/**
* Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
*
- * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that
- * the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link
- * ClusterState#metadata()} because it might be stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and
- * non-stale state, and cluster-manager-ineligible nodes receive the real cluster state from the elected cluster-manager after joining the cluster.
+ * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that the state being
+ * loaded when constructing the instance of this class is not necessarily the state that will be used as {@link ClusterState#metadata()} because it might be
+ * stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and non-stale state, and cluster-manager-ineligible nodes
+ * receive the real cluster state from the elected cluster-manager after joining the cluster.
*
* @opensearch.internal
*/
public class GatewayMetaState implements Closeable {
/**
- * Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially
- * stale (since it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is
- * restarted as a cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state.
+ * Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially stale (since
+ * it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is restarted as a
+ * cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state.
*/
public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG";
@@ -234,8 +237,8 @@ Metadata upgradeMetadataForNode(
}
/**
- * This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current
- * version. The MetadataIndexUpgradeService might also update obsolete settings if needed.
+ * This method calls {@link MetadataIndexUpgradeService} to make sure that indices are compatible with the current version. The MetadataIndexUpgradeService
+ * might also update obsolete settings if needed.
*
+ * @return input metadata if no upgrade is needed or an upgraded metadata
*/
@@ -599,4 +602,121 @@ public void close() throws IOException {
IOUtils.close(persistenceWriter.getAndSet(null));
}
}
+
+ /**
+ * Encapsulates the writing of metadata to a remote store using {@link RemoteClusterStateService}.
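+ * Only the elected cluster-manager publishes state through this implementation: {@link #setCurrentTerm(long)} is a no-op, and each
+ * accepted state is persisted as per-index metadata blobs plus a {@link ClusterMetadataManifest} describing them.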
+ */
+ public static class RemotePersistedState implements PersistedState {
+
+ private static final Logger logger = LogManager.getLogger(RemotePersistedState.class);
+
+ private ClusterState lastAcceptedState;
+ private ClusterMetadataManifest lastAcceptedManifest;
+ private final RemoteClusterStateService remoteClusterStateService;
+
+ public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService) {
+ this.remoteClusterStateService = remoteClusterStateService;
+ }
+
+ @Override
+ public long getCurrentTerm() {
+ return lastAcceptedState != null ? lastAcceptedState.term() : 0L;
+ }
+
+ @Override
+ public ClusterState getLastAcceptedState() {
+ return lastAcceptedState;
+ }
+
+ @Override
+ public void setCurrentTerm(long currentTerm) {
+ // no-op
+ // For LucenePersistedState, setCurrentTerm is used only while handling StartJoinRequest by all follower nodes.
+ // But for RemotePersistedState, the state is only pushed by the active cluster-manager. So this method is not required.
+ }
+
+ @Override
+ public void setLastAcceptedState(ClusterState clusterState) {
+ try {
+ if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
+ // On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
+ logger.info("Cluster is not yet ready to publish state to remote store");
+ lastAcceptedState = clusterState;
+ return;
+ }
+ final ClusterMetadataManifest manifest;
+ if (shouldWriteFullClusterState(clusterState)) {
+ manifest = remoteClusterStateService.writeFullMetadata(clusterState);
+ } else {
+ assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true
+ : "Previous manifest and previous ClusterState are not in sync";
+ manifest = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedManifest);
+ }
+ assert verifyManifestAndClusterState(manifest, clusterState) == true : "Manifest and ClusterState are not in sync";
+ lastAcceptedManifest = manifest;
+ lastAcceptedState = clusterState;
+ } catch (RepositoryMissingException e) {
+ // TODO This logic needs to be modified once PR for repo registration during bootstrap is pushed
+ // https://github.com/opensearch-project/OpenSearch/pull/9105/
+ // After the above PR is pushed, we can remove this silent failure and throw the exception instead.
+ logger.error("Remote repository is not yet registered");
+ lastAcceptedState = clusterState;
+ } catch (Exception e) {
+ handleExceptionOnWrite(e);
+ }
+ }
+
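+ // Invoked only from assert statements, so these consistency checks run only when JVM assertions are enabled.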
+ private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, ClusterState clusterState) {
+ assert manifest != null : "ClusterMetadataManifest is null";
+ assert clusterState != null : "ClusterState is null";
+ assert clusterState.metadata().indices().size() == manifest.getIndices().size()
+ : "Number of indices in last accepted state and manifest are different";
+ manifest.getIndices().stream().forEach(md -> {
+ assert clusterState.metadata().indices().containsKey(md.getIndexName())
+ : "Last accepted state does not contain the index : " + md.getIndexName();
+ assert clusterState.metadata().indices().get(md.getIndexName()).getIndexUUID().equals(md.getIndexUUID())
+ : "Last accepted state and manifest do not have same UUID for index : " + md.getIndexName();
+ });
+ return true;
+ }
+
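+ // A full write is required when there is no previous state or manifest to diff against, when the term has changed (i.e. after a new
+ // election), or when the previous manifest was produced by a different OpenSearch version; otherwise an incremental write suffices.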
+ private boolean shouldWriteFullClusterState(ClusterState clusterState) {
+ if (lastAcceptedState == null
+ || lastAcceptedManifest == null
+ || lastAcceptedState.term() != clusterState.term()
+ || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void markLastAcceptedStateAsCommitted() {
+ try {
+ if (lastAcceptedState == null
+ || lastAcceptedManifest == null
+ || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
+ // On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
+ logger.trace("Cluster is not yet ready to publish state to remote store");
+ return;
+ }
+ final ClusterMetadataManifest committedManifest = remoteClusterStateService.markLastStateAsCommitted(
+ lastAcceptedState,
+ lastAcceptedManifest
+ );
+ lastAcceptedManifest = committedManifest;
+ } catch (Exception e) {
+ handleExceptionOnWrite(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ remoteClusterStateService.close();
+ }
+
+ private void handleExceptionOnWrite(Exception e) {
+ throw ExceptionsHelper.convertToRuntime(e);
+ }
+ }
}
diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java
new file mode 100644
index 0000000000000..cac77f9996438
--- /dev/null
+++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java
@@ -0,0 +1,446 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.opensearch.Version;
+import org.opensearch.core.ParseField;
+import org.opensearch.core.common.Strings;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.common.io.stream.Writeable;
+import org.opensearch.core.xcontent.ConstructingObjectParser;
+import org.opensearch.core.xcontent.MediaTypeRegistry;
+import org.opensearch.core.xcontent.ToXContentFragment;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.core.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Manifest file which contains the details of the uploaded entity metadata
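+ * <p>
+ * For illustration (all values hypothetical), a manifest serialized via {@link #toXContent} looks roughly like:
+ * <pre>
+ * { "cluster_term": 1, "state_version": 7, "cluster_uuid": "...", "state_uuid": "...", "opensearch_version": 2100099,
+ * "node_id": "node-1", "committed": true, "indices": [ { "index_name": "my-index", "index_uuid": "...", "uploaded_filename": "..." } ] }
+ * </pre>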
+ *
+ * @opensearch.internal
+ */
+public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
+
+ private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term");
+ private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version");
+ private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid");
+ private static final ParseField STATE_UUID_FIELD = new ParseField("state_uuid");
+ private static final ParseField OPENSEARCH_VERSION_FIELD = new ParseField("opensearch_version");
+ private static final ParseField NODE_ID_FIELD = new ParseField("node_id");
+ private static final ParseField COMMITTED_FIELD = new ParseField("committed");
+ private static final ParseField INDICES_FIELD = new ParseField("indices");
+
+ private static long term(Object[] fields) {
+ return (long) fields[0];
+ }
+
+ private static long version(Object[] fields) {
+ return (long) fields[1];
+ }
+
+ private static String clusterUUID(Object[] fields) {
+ return (String) fields[2];
+ }
+
+ private static String stateUUID(Object[] fields) {
+ return (String) fields[3];
+ }
+
+ private static Version opensearchVersion(Object[] fields) {
+ return Version.fromId((int) fields[4]);
+ }
+
+ private static String nodeId(Object[] fields) {
+ return (String) fields[5];
+ }
+
+ private static boolean committed(Object[] fields) {
+ return (boolean) fields[6];
+ }
+
+ private static List<UploadedIndexMetadata> indices(Object[] fields) {
+ return (List<UploadedIndexMetadata>) fields[7];
+ }
+
+ private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER = new ConstructingObjectParser<>(
+ "cluster_metadata_manifest",
+ fields -> new ClusterMetadataManifest(
+ term(fields),
+ version(fields),
+ clusterUUID(fields),
+ stateUUID(fields),
+ opensearchVersion(fields),
+ nodeId(fields),
+ committed(fields),
+ indices(fields)
+ )
+ );
+
+ static {
+ PARSER.declareLong(ConstructingObjectParser.constructorArg(), CLUSTER_TERM_FIELD);
+ PARSER.declareLong(ConstructingObjectParser.constructorArg(), STATE_VERSION_FIELD);
+ PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD);
+ PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD);
+ PARSER.declareInt(ConstructingObjectParser.constructorArg(), OPENSEARCH_VERSION_FIELD);
+ PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_FIELD);
+ PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), COMMITTED_FIELD);
+ PARSER.declareObjectArray(
+ ConstructingObjectParser.constructorArg(),
+ (p, c) -> UploadedIndexMetadata.fromXContent(p),
+ INDICES_FIELD
+ );
+ }
+
+ private final List<UploadedIndexMetadata> indices;
+ private final long clusterTerm;
+ private final long stateVersion;
+ private final String clusterUUID;
+ private final String stateUUID;
+ private final Version opensearchVersion;
+ private final String nodeId;
+ private final boolean committed;
+
+ public List<UploadedIndexMetadata> getIndices() {
+ return indices;
+ }
+
+ public long getClusterTerm() {
+ return clusterTerm;
+ }
+
+ public long getStateVersion() {
+ return stateVersion;
+ }
+
+ public String getClusterUUID() {
+ return clusterUUID;
+ }
+
+ public String getStateUUID() {
+ return stateUUID;
+ }
+
+ public Version getOpensearchVersion() {
+ return opensearchVersion;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public boolean isCommitted() {
+ return committed;
+ }
+
+ public ClusterMetadataManifest(
+ long clusterTerm,
+ long version,
+ String clusterUUID,
+ String stateUUID,
+ Version opensearchVersion,
+ String nodeId,
+ boolean committed,
+ List<UploadedIndexMetadata> indices
+ ) {
+ this.clusterTerm = clusterTerm;
+ this.stateVersion = version;
+ this.clusterUUID = clusterUUID;
+ this.stateUUID = stateUUID;
+ this.opensearchVersion = opensearchVersion;
+ this.nodeId = nodeId;
+ this.committed = committed;
+ this.indices = Collections.unmodifiableList(indices);
+ }
+
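+ // The stream constructor below must read fields in exactly the order that writeTo(StreamOutput) writes them.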
+ public ClusterMetadataManifest(StreamInput in) throws IOException {
+ this.clusterTerm = in.readVLong();
+ this.stateVersion = in.readVLong();
+ this.clusterUUID = in.readString();
+ this.stateUUID = in.readString();
+ this.opensearchVersion = Version.fromId(in.readInt());
+ this.nodeId = in.readString();
+ this.committed = in.readBoolean();
+ this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new));
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static Builder builder(ClusterMetadataManifest manifest) {
+ return new Builder(manifest);
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.field(CLUSTER_TERM_FIELD.getPreferredName(), getClusterTerm())
+ .field(STATE_VERSION_FIELD.getPreferredName(), getStateVersion())
+ .field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID())
+ .field(STATE_UUID_FIELD.getPreferredName(), getStateUUID())
+ .field(OPENSEARCH_VERSION_FIELD.getPreferredName(), getOpensearchVersion().id)
+ .field(NODE_ID_FIELD.getPreferredName(), getNodeId())
+ .field(COMMITTED_FIELD.getPreferredName(), isCommitted());
+ builder.startArray(INDICES_FIELD.getPreferredName());
+ {
+ for (UploadedIndexMetadata uploadedIndexMetadata : indices) {
+ uploadedIndexMetadata.toXContent(builder, params);
+ }
+ }
+ builder.endArray();
+ return builder;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeVLong(clusterTerm);
+ out.writeVLong(stateVersion);
+ out.writeString(clusterUUID);
+ out.writeString(stateUUID);
+ out.writeInt(opensearchVersion.id);
+ out.writeString(nodeId);
+ out.writeBoolean(committed);
+ out.writeCollection(indices);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final ClusterMetadataManifest that = (ClusterMetadataManifest) o;
+ return Objects.equals(indices, that.indices)
+ && clusterTerm == that.clusterTerm
+ && stateVersion == that.stateVersion
+ && Objects.equals(clusterUUID, that.clusterUUID)
+ && Objects.equals(stateUUID, that.stateUUID)
+ && Objects.equals(opensearchVersion, that.opensearchVersion)
+ && Objects.equals(nodeId, that.nodeId)
+ && Objects.equals(committed, that.committed);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(indices, clusterTerm, stateVersion, clusterUUID, stateUUID, opensearchVersion, nodeId, committed);
+ }
+
+ @Override
+ public String toString() {
+ return Strings.toString(MediaTypeRegistry.JSON, this);
+ }
+
+ public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException {
+ return PARSER.parse(parser, null);
+ }
+
+ /**
+ * Builder for ClusterMetadataManifest
+ *
+ * @opensearch.internal
+ */
+ public static class Builder {
+
+ private List<UploadedIndexMetadata> indices;
+ private long clusterTerm;
+ private long stateVersion;
+ private String clusterUUID;
+ private String stateUUID;
+ private Version opensearchVersion;
+ private String nodeId;
+ private boolean committed;
+
+ public Builder indices(List<UploadedIndexMetadata> indices) {
+ this.indices = indices;
+ return this;
+ }
+
+ public Builder clusterTerm(long clusterTerm) {
+ this.clusterTerm = clusterTerm;
+ return this;
+ }
+
+ public Builder stateVersion(long stateVersion) {
+ this.stateVersion = stateVersion;
+ return this;
+ }
+
+ public Builder clusterUUID(String clusterUUID) {
+ this.clusterUUID = clusterUUID;
+ return this;
+ }
+
+ public Builder stateUUID(String stateUUID) {
+ this.stateUUID = stateUUID;
+ return this;
+ }
+
+ public Builder opensearchVersion(Version opensearchVersion) {
+ this.opensearchVersion = opensearchVersion;
+ return this;
+ }
+
+ public Builder nodeId(String nodeId) {
+ this.nodeId = nodeId;
+ return this;
+ }
+
+ public Builder committed(boolean committed) {
+ this.committed = committed;
+ return this;
+ }
+
+ public List<UploadedIndexMetadata> getIndices() {
+ return indices;
+ }
+
+ public Builder() {
+ indices = new ArrayList<>();
+ }
+
+ public Builder(ClusterMetadataManifest manifest) {
+ this.clusterTerm = manifest.clusterTerm;
+ this.stateVersion = manifest.stateVersion;
+ this.clusterUUID = manifest.clusterUUID;
+ this.stateUUID = manifest.stateUUID;
+ this.opensearchVersion = manifest.opensearchVersion;
+ this.nodeId = manifest.nodeId;
+ this.committed = manifest.committed;
+ this.indices = new ArrayList<>(manifest.indices);
+ }
+
+ public ClusterMetadataManifest build() {
+ return new ClusterMetadataManifest(
+ clusterTerm,
+ stateVersion,
+ clusterUUID,
+ stateUUID,
+ opensearchVersion,
+ nodeId,
+ committed,
+ indices
+ );
+ }
+
+ }
+
+ /**
+ * Metadata for uploaded index metadata
+ *
+ * @opensearch.internal
+ */
+ public static class UploadedIndexMetadata implements Writeable, ToXContentFragment {
+
+ private static final ParseField INDEX_NAME_FIELD = new ParseField("index_name");
+ private static final ParseField INDEX_UUID_FIELD = new ParseField("index_uuid");
+ private static final ParseField UPLOADED_FILENAME_FIELD = new ParseField("uploaded_filename");
+
+ private static String indexName(Object[] fields) {
+ return (String) fields[0];
+ }
+
+ private static String indexUUID(Object[] fields) {
+ return (String) fields[1];
+ }
+
+ private static String uploadedFilename(Object[] fields) {
+ return (String) fields[2];
+ }
+
+ private static final ConstructingObjectParser<UploadedIndexMetadata, Void> PARSER = new ConstructingObjectParser<>(
+ "uploaded_index_metadata",
+ fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields))
+ );
+
+ static {
+ PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_NAME_FIELD);
+ PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_UUID_FIELD);
+ PARSER.declareString(ConstructingObjectParser.constructorArg(), UPLOADED_FILENAME_FIELD);
+ }
+
+ private final String indexName;
+ private final String indexUUID;
+ private final String uploadedFilename;
+
+ public UploadedIndexMetadata(String indexName, String indexUUID, String uploadedFileName) {
+ this.indexName = indexName;
+ this.indexUUID = indexUUID;
+ this.uploadedFilename = uploadedFileName;
+ }
+
+ public UploadedIndexMetadata(StreamInput in) throws IOException {
+ this.indexName = in.readString();
+ this.indexUUID = in.readString();
+ this.uploadedFilename = in.readString();
+ }
+
+ public String getUploadedFilename() {
+ return uploadedFilename;
+ }
+
+ public String getIndexName() {
+ return indexName;
+ }
+
+ public String getIndexUUID() {
+ return indexUUID;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ return builder.startObject()
+ .field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
+ .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID())
+ .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename())
+ .endObject();
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeString(indexName);
+ out.writeString(indexUUID);
+ out.writeString(uploadedFilename);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final UploadedIndexMetadata that = (UploadedIndexMetadata) o;
+ return Objects.equals(indexName, that.indexName)
+ && Objects.equals(indexUUID, that.indexUUID)
+ && Objects.equals(uploadedFilename, that.uploadedFilename);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(indexName, indexUUID, uploadedFilename);
+ }
+
+ @Override
+ public String toString() {
+ return Strings.toString(MediaTypeRegistry.JSON, this);
+ }
+
+ public static UploadedIndexMetadata fromXContent(XContentParser parser) throws IOException {
+ return PARSER.parse(parser, null);
+ }
+ }
+}
diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
new file mode 100644
index 0000000000000..491c04bab3adb
--- /dev/null
+++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
@@ -0,0 +1,370 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.Version;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.Nullable;
+import org.opensearch.common.blobstore.BlobContainer;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Setting.Property;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.util.io.IOUtils;
+import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
+import org.opensearch.index.remote.RemoteStoreUtils;
+import org.opensearch.repositories.RepositoriesService;
+import org.opensearch.repositories.Repository;
+import org.opensearch.repositories.blobstore.BlobStoreRepository;
+import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
+
+/**
+ * A Service which provides APIs to upload and download cluster metadata from remote store.
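+ * <p>
+ * For illustration (shape inferred from the container and file-name helpers below), blobs are laid out in the configured repository as
+ * {@code base-path/base64(cluster-name)/cluster-state/cluster-uuid/index/index-uuid/metadata__version__timestamp} for index metadata
+ * and {@code base-path/base64(cluster-name)/cluster-state/cluster-uuid/manifest/manifest__term__version__timestamp} (with the numeric
+ * components stored in inverted form) for manifests.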
+ *
+ * @opensearch.internal
+ */
+public class RemoteClusterStateService implements Closeable {
+
+ public static final String METADATA_NAME_FORMAT = "%s.dat";
+
+ public static final String METADATA_MANIFEST_NAME_FORMAT = "%s";
+
+ public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
+ "index-metadata",
+ METADATA_NAME_FORMAT,
+ IndexMetadata::fromXContent
+ );
+
+ public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>(
+ "cluster-metadata-manifest",
+ METADATA_MANIFEST_NAME_FORMAT,
+ ClusterMetadataManifest::fromXContent
+ );
+ /**
+ * Used to specify if cluster state metadata should be published to remote store
+ */
+ // TODO The remote state enabled and repository settings should be read from node attributes.
+ // Dependent on https://github.com/opensearch-project/OpenSearch/pull/9105/
+ public static final Setting<Boolean> REMOTE_CLUSTER_STATE_ENABLED_SETTING = Setting.boolSetting(
+ "cluster.remote_store.state.enabled",
+ false,
+ Property.NodeScope,
+ Property.Final
+ );
+ /**
+ * Used to specify default repo to use for cluster state metadata upload
+ */
+ public static final Setting<String> REMOTE_CLUSTER_STATE_REPOSITORY_SETTING = Setting.simpleString(
+ "cluster.remote_store.state.repository",
+ "",
+ Property.NodeScope,
+ Property.Final
+ );
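+ // Illustrative opensearch.yml configuration (the repository name is an example and must match a registered repository):
+ // cluster.remote_store.state.enabled: true
+ // cluster.remote_store.state.repository: my-remote-state-repo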
+ private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);
+
+ private static final String DELIMITER = "__";
+
+ private final String nodeId;
+ private final Supplier<RepositoriesService> repositoriesService;
+ private final Settings settings;
+ private final LongSupplier relativeTimeMillisSupplier;
+ private BlobStoreRepository blobStoreRepository;
+ private volatile TimeValue slowWriteLoggingThreshold;
+
+ public RemoteClusterStateService(
+ String nodeId,
+ Supplier<RepositoriesService> repositoriesService,
+ Settings settings,
+ ClusterSettings clusterSettings,
+ LongSupplier relativeTimeMillisSupplier
+ ) {
+ assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
+ this.nodeId = nodeId;
+ this.repositoriesService = repositoriesService;
+ this.settings = settings;
+ this.relativeTimeMillisSupplier = relativeTimeMillisSupplier;
+ this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
+ clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
+ }
+
+ /**
+ * This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be
+ * invoked by the elected cluster manager when the remote cluster state is enabled.
+ *
+ * @return A manifest object which contains the details of uploaded entity metadata.
+ */
+ @Nullable
+ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) throws IOException {
+ final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
+ if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
+ logger.error("Local node is not elected cluster manager. Exiting");
+ return null;
+ }
+ ensureRepositorySet();
+
+ final List<UploadedIndexMetadata> allUploadedIndexMetadata = new ArrayList<>();
+ // todo parallel upload
+ // any validations before/after upload ?
+ for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
+ // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
+ final String indexMetadataKey = writeIndexMetadata(
+ clusterState.getClusterName().value(),
+ clusterState.getMetadata().clusterUUID(),
+ indexMetadata,
+ indexMetadataFileName(indexMetadata)
+ );
+ final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(
+ indexMetadata.getIndex().getName(),
+ indexMetadata.getIndexUUID(),
+ indexMetadataKey
+ );
+ allUploadedIndexMetadata.add(uploadedIndexMetadata);
+ }
+ final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, false);
+ final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
+ if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
+ logger.warn(
+ "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + "wrote full state with [{}] indices",
+ durationMillis,
+ slowWriteLoggingThreshold,
+ allUploadedIndexMetadata.size()
+ );
+ } else {
+ // todo change to debug
+ logger.info(
+ "writing cluster state took [{}ms]; " + "wrote full state with [{}] indices",
+ durationMillis,
+ allUploadedIndexMetadata.size()
+ );
+ }
+ return manifest;
+ }
+
+ /**
+ * This method uploads the diff between the previous cluster state and the current cluster state. The previous manifest file is needed to create the new
+ * manifest. The new manifest file is created by using the unchanged metadata from the previous manifest and the new metadata changes from the current cluster
+ * state.
+ *
+ * @return The uploaded ClusterMetadataManifest file
+ */
+ @Nullable
+ public ClusterMetadataManifest writeIncrementalMetadata(
+ ClusterState previousClusterState,
+ ClusterState clusterState,
+ ClusterMetadataManifest previousManifest
+ ) throws IOException {
+ final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
+ if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
+ logger.error("Local node is not elected cluster manager. Exiting");
+ return null;
+ }
+ assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term();
+ final Map<String, Long> previousStateIndexMetadataVersionByName = new HashMap<>();
+ for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) {
+ previousStateIndexMetadataVersionByName.put(indexMetadata.getIndex().getName(), indexMetadata.getVersion());
+ }
+
+ int numIndicesUpdated = 0;
+ int numIndicesUnchanged = 0;
+ final Map<String, UploadedIndexMetadata> allUploadedIndexMetadata = previousManifest.getIndices()
+ .stream()
+ .collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity()));
+ for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
+ final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName());
+ if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
+ logger.trace(
+ "updating metadata for [{}], changing version from [{}] to [{}]",
+ indexMetadata.getIndex(),
+ previousVersion,
+ indexMetadata.getVersion()
+ );
+ numIndicesUpdated++;
+ final String indexMetadataKey = writeIndexMetadata(
+ clusterState.getClusterName().value(),
+ clusterState.getMetadata().clusterUUID(),
+ indexMetadata,
+ indexMetadataFileName(indexMetadata)
+ );
+ final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(
+ indexMetadata.getIndex().getName(),
+ indexMetadata.getIndexUUID(),
+ indexMetadataKey
+ );
+ allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata);
+ } else {
+ numIndicesUnchanged++;
+ }
+ previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName());
+ }
+
+ for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) {
+ allUploadedIndexMetadata.remove(removedIndexName);
+ }
+ final ClusterMetadataManifest manifest = uploadManifest(
+ clusterState,
+ allUploadedIndexMetadata.values().stream().collect(Collectors.toList()),
+ false
+ );
+ final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
+ if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
+ logger.warn(
+ "writing cluster state took [{}ms] which is above the warn threshold of [{}]; "
+ + "wrote metadata for [{}] indices and skipped [{}] unchanged indices",
+ durationMillis,
+ slowWriteLoggingThreshold,
+ numIndicesUpdated,
+ numIndicesUnchanged
+ );
+ } else {
+ logger.trace(
+ "writing cluster state took [{}ms]; " + "wrote metadata for [{}] indices and skipped [{}] unchanged indices",
+ durationMillis,
+ numIndicesUpdated,
+ numIndicesUnchanged
+ );
+ }
+ return manifest;
+ }
+
+ @Nullable
+ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest)
+ throws IOException {
+ if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
+ logger.error("Local node is not elected cluster manager. Exiting");
+ return null;
+ }
+ assert clusterState != null : "Last accepted cluster state is not set";
+ assert previousManifest != null : "Last cluster metadata manifest is not set";
+ return uploadManifest(clusterState, previousManifest.getIndices(), true);
+ }
+
+ public ClusterState getLatestClusterState(String clusterUUID) {
+ // todo
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (blobStoreRepository != null) {
+ IOUtils.close(blobStoreRepository);
+ }
+ }
+
+ // Visible for testing
+ void ensureRepositorySet() {
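+ // Resolved lazily rather than in the constructor: the repository may not yet be registered when this service is created.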
+ if (blobStoreRepository != null) {
+ return;
+ }
+ final String remoteStoreRepo = REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.get(settings);
+ assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
+ final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
+ assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
+ blobStoreRepository = (BlobStoreRepository) repository;
+ }
+
+ private ClusterMetadataManifest uploadManifest(
+ ClusterState clusterState,
+ List<UploadedIndexMetadata> uploadedIndexMetadata,
+ boolean committed
+ ) throws IOException {
+ synchronized (this) {
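+ // Serializes manifest uploads, presumably so that concurrent callers cannot interleave manifest writes for the same term/version.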
+ final String manifestFileName = getManifestFileName(clusterState.term(), clusterState.version());
+ final ClusterMetadataManifest manifest = new ClusterMetadataManifest(
+ clusterState.term(),
+ clusterState.getVersion(),
+ clusterState.metadata().clusterUUID(),
+ clusterState.stateUUID(),
+ Version.CURRENT,
+ nodeId,
+ committed,
+ uploadedIndexMetadata
+ );
+ writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName);
+ return manifest;
+ }
+ }
+
+ private String writeIndexMetadata(String clusterName, String clusterUUID, IndexMetadata uploadIndexMetadata, String fileName)
+ throws IOException {
+ final BlobContainer indexMetadataContainer = indexMetadataContainer(clusterName, clusterUUID, uploadIndexMetadata.getIndexUUID());
+ INDEX_METADATA_FORMAT.write(uploadIndexMetadata, indexMetadataContainer, fileName, blobStoreRepository.getCompressor());
+ // returning full path
+ return indexMetadataContainer.path().buildAsString() + fileName;
+ }
+
+ private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName)
+ throws IOException {
+ final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID);
+ CLUSTER_METADATA_MANIFEST_FORMAT.write(uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor());
+ }
+
+ private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) {
+ // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
+ return blobStoreRepository.blobStore()
+ .blobContainer(
+ blobStoreRepository.basePath()
+ .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8)))
+ .add("cluster-state")
+ .add(clusterUUID)
+ .add("index")
+ .add(indexUUID)
+ );
+ }
+
+ private BlobContainer manifestContainer(String clusterName, String clusterUUID) {
+ // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest
+ return blobStoreRepository.blobStore()
+ .blobContainer(
+ blobStoreRepository.basePath()
+ .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8)))
+ .add("cluster-state")
+ .add(clusterUUID)
+ .add("manifest")
+ );
+ }
+
+ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {
+ this.slowWriteLoggingThreshold = slowWriteLoggingThreshold;
+ }
+
+ private static String getManifestFileName(long term, long version) {
+ // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest_2147483642_2147483637_456536447
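+ // Term, version and timestamp are passed through RemoteStoreUtils.invertLong, presumably so that a lexicographic listing of the
+ // manifest container returns the most recent manifest first.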
+ return String.join(
+ DELIMITER,
+ "manifest",
+ RemoteStoreUtils.invertLong(term),
+ RemoteStoreUtils.invertLong(version),
+ RemoteStoreUtils.invertLong(System.currentTimeMillis())
+ );
+ }
+
+ private static String indexMetadataFileName(IndexMetadata indexMetadata) {
+ return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis()));
+ }
+
+}
diff --git a/server/src/main/java/org/opensearch/gateway/remote/package-info.java b/server/src/main/java/org/opensearch/gateway/remote/package-info.java
new file mode 100644
index 0000000000000..286e739f66289
--- /dev/null
+++ b/server/src/main/java/org/opensearch/gateway/remote/package-info.java
@@ -0,0 +1,12 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/**
+ * Package containing classes to perform operations on remote cluster state
+ */
+package org.opensearch.gateway.remote;
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
index 108a022a2612b..ad8168f48558f 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -801,6 +801,10 @@ public RepositoryMetadata getMetadata() {
return metadata;
}
+ public Compressor getCompressor() {
+ return compressor;
+ }
+
@Override
public RepositoryStats stats() {
final BlobStore store = blobStore.get();
diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
index c6b44eaa9d364..47fea55242240 100644
--- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
+++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
@@ -35,6 +35,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.opensearch.ExceptionsHelper;
+import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
@@ -59,6 +60,9 @@
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.env.TestEnvironment;
+import org.opensearch.gateway.GatewayMetaState.RemotePersistedState;
+import org.opensearch.gateway.remote.ClusterMetadataManifest;
+import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
@@ -75,11 +79,15 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.mockito.Mockito;
+
import static org.opensearch.test.NodeRoles.nonClusterManagerNode;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
public class GatewayMetaStatePersistedStateTests extends OpenSearchTestCase {
@@ -647,6 +655,70 @@ Directory createDirectory(Path path) {
}
}
+ public void testRemotePersistedState() throws IOException {
+ final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class);
+ final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(1L).stateVersion(5L).build();
+ Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any())).thenReturn(manifest);
+
+ Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(manifest);
+ CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService);
+
+ assertThat(remotePersistedState.getLastAcceptedState(), nullValue());
+ assertThat(remotePersistedState.getCurrentTerm(), equalTo(0L));
+
+ final long clusterTerm = randomNonNegativeLong();
+ final ClusterState clusterState = createClusterState(
+ randomNonNegativeLong(),
+ Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build()
+ );
+
+ remotePersistedState.setLastAcceptedState(clusterState);
+ Mockito.verify(remoteClusterStateService, times(0)).writeFullMetadata(Mockito.any());
+
+ assertThat(remotePersistedState.getLastAcceptedState(), equalTo(clusterState));
+ assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm));
+
+ final ClusterState secondClusterState = createClusterState(
+ randomNonNegativeLong(),
+ Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build()
+ );
+
+ remotePersistedState.setLastAcceptedState(secondClusterState);
+ Mockito.verify(remoteClusterStateService, times(1)).writeFullMetadata(Mockito.any());
+
+ assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState));
+ assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm));
+
+ remotePersistedState.markLastAcceptedStateAsCommitted();
+ Mockito.verify(remoteClusterStateService, times(1)).markLastStateAsCommitted(Mockito.any(), Mockito.any());
+
+ assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState));
+ assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm));
+
+ }
+
+ public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOException {
+ final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class);
+ Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any());
+
+ CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService);
+
+ final long clusterTerm = randomNonNegativeLong();
+ final ClusterState clusterState = createClusterState(
+ randomNonNegativeLong(),
+ Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build()
+ );
+
+ remotePersistedState.setLastAcceptedState(clusterState);
+
+ final ClusterState secondClusterState = createClusterState(
+ randomNonNegativeLong(),
+ Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build()
+ );
+
+ assertThrows(OpenSearchException.class, () -> remotePersistedState.setLastAcceptedState(secondClusterState));
+ }
+
private static BigArrays getBigArrays() {
return usually()
? BigArrays.NON_RECYCLING_INSTANCE
diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java
new file mode 100644
index 0000000000000..eafa191581d65
--- /dev/null
+++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java
@@ -0,0 +1,149 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.opensearch.Version;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.core.common.bytes.BytesReference;
+import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
+import org.opensearch.core.xcontent.ToXContent;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.core.xcontent.XContentParser;
+import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
+import org.opensearch.test.EqualsHashCodeTestUtils;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.test.VersionUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class ClusterMetadataManifestTests extends OpenSearchTestCase {
+
+ public void testClusterMetadataManifestXContent() throws IOException {
+ UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path");
+ ClusterMetadataManifest originalManifest = new ClusterMetadataManifest(
+ 1L,
+ 1L,
+ "test-cluster-uuid",
+ "test-state-uuid",
+ Version.CURRENT,
+ "test-node-id",
+ false,
+ Collections.singletonList(uploadedIndexMetadata)
+ );
+ final XContentBuilder builder = JsonXContent.contentBuilder();
+ builder.startObject();
+ originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS);
+ builder.endObject();
+
+ try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
+ final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContent(parser);
+ assertEquals(originalManifest, fromXContentManifest);
+ }
+ }
+
+ public void testClusterMetadataManifestSerializationEqualsHashCode() {
+ ClusterMetadataManifest initialManifest = new ClusterMetadataManifest(
+ randomNonNegativeLong(),
+ randomNonNegativeLong(),
+ randomAlphaOfLength(10),
+ randomAlphaOfLength(10),
+ VersionUtils.randomOpenSearchVersion(random()),
+ randomAlphaOfLength(10),
+ randomBoolean(),
+ randomUploadedIndexMetadataList()
+ );
+ EqualsHashCodeTestUtils.checkEqualsAndHashCode(
+ initialManifest,
+ orig -> OpenSearchTestCase.copyWriteable(
+ orig,
+ new NamedWriteableRegistry(Collections.emptyList()),
+ ClusterMetadataManifest::new
+ ),
+ manifest -> {
+ ClusterMetadataManifest.Builder builder = ClusterMetadataManifest.builder(manifest);
+ switch (randomInt(7)) {
+ case 0:
+ builder.clusterTerm(randomNonNegativeLong());
+ break;
+ case 1:
+ builder.stateVersion(randomNonNegativeLong());
+ break;
+ case 2:
+ builder.clusterUUID(randomAlphaOfLength(10));
+ break;
+ case 3:
+ builder.stateUUID(randomAlphaOfLength(10));
+ break;
+ case 4:
+ builder.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()));
+ break;
+ case 5:
+ builder.nodeId(randomAlphaOfLength(10));
+ break;
+ case 6:
+ builder.committed(randomBoolean());
+ break;
+ case 7:
+ builder.indices(randomUploadedIndexMetadataList());
+ break;
+ }
+ return builder.build();
+ }
+ );
+ }
+
+ private List<UploadedIndexMetadata> randomUploadedIndexMetadataList() {
+ final int size = randomIntBetween(1, 10);
+ final List<UploadedIndexMetadata> uploadedIndexMetadataList = new ArrayList<>(size);
+ while (uploadedIndexMetadataList.size() < size) {
+ assertTrue(uploadedIndexMetadataList.add(randomUploadedIndexMetadata()));
+ }
+ return uploadedIndexMetadataList;
+ }
+
+ private UploadedIndexMetadata randomUploadedIndexMetadata() {
+ return new UploadedIndexMetadata(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
+ }
+
+ public void testUploadedIndexMetadataSerializationEqualsHashCode() {
+ UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path");
+ EqualsHashCodeTestUtils.checkEqualsAndHashCode(
+ uploadedIndexMetadata,
+ orig -> OpenSearchTestCase.copyWriteable(orig, new NamedWriteableRegistry(Collections.emptyList()), UploadedIndexMetadata::new),
+ metadata -> randomlyChangingUploadedIndexMetadata(uploadedIndexMetadata)
+ );
+ }
+
+ private UploadedIndexMetadata randomlyChangingUploadedIndexMetadata(UploadedIndexMetadata uploadedIndexMetadata) {
+ switch (randomInt(2)) {
+ case 0:
+ return new UploadedIndexMetadata(
+ randomAlphaOfLength(10),
+ uploadedIndexMetadata.getIndexUUID(),
+ uploadedIndexMetadata.getUploadedFilename()
+ );
+ case 1:
+ return new UploadedIndexMetadata(
+ uploadedIndexMetadata.getIndexName(),
+ randomAlphaOfLength(10),
+ uploadedIndexMetadata.getUploadedFilename()
+ );
+ case 2:
+ return new UploadedIndexMetadata(
+ uploadedIndexMetadata.getIndexName(),
+ uploadedIndexMetadata.getIndexUUID(),
+ randomAlphaOfLength(10)
+ );
+ }
+ return uploadedIndexMetadata;
+ }
+}
diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
new file mode 100644
index 0000000000000..215673642cce5
--- /dev/null
+++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
@@ -0,0 +1,254 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.opensearch.Version;
+import org.opensearch.cluster.ClusterName;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.coordination.CoordinationMetadata;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.metadata.Metadata;
+import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.common.blobstore.BlobContainer;
+import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.blobstore.BlobStore;
+import org.opensearch.common.compress.DeflateCompressor;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.core.index.Index;
+import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
+import org.opensearch.repositories.FilterRepository;
+import org.opensearch.repositories.RepositoriesService;
+import org.opensearch.repositories.RepositoryMissingException;
+import org.opensearch.repositories.blobstore.BlobStoreRepository;
+import org.opensearch.test.OpenSearchTestCase;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
+import org.mockito.ArgumentMatchers;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RemoteClusterStateServiceTests extends OpenSearchTestCase {
+
+ private RemoteClusterStateService remoteClusterStateService;
+ private Supplier<RepositoriesService> repositoriesServiceSupplier;
+ private RepositoriesService repositoriesService;
+ private BlobStoreRepository blobStoreRepository;
+
+ @Before
+ public void setup() {
+ repositoriesServiceSupplier = mock(Supplier.class);
+ repositoriesService = mock(RepositoriesService.class);
+ when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService);
+ final Settings settings = Settings.builder()
+ .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
+ .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.getKey(), "remote_store_repository")
+ .build();
+ blobStoreRepository = mock(BlobStoreRepository.class);
+ when(repositoriesService.repository("remote_store_repository")).thenReturn(blobStoreRepository);
+ remoteClusterStateService = new RemoteClusterStateService(
+ "test-node-id",
+ repositoriesServiceSupplier,
+ settings,
+ new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+ () -> 0L
+ );
+ }
+
+ public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().build();
+ final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState);
+ Assert.assertThat(manifest, nullValue());
+ }
+
+ public void testFailInitializationWhenRemoteStateDisabled() throws IOException {
+ final Settings settings = Settings.builder().build();
+ assertThrows(
+ AssertionError.class,
+ () -> new RemoteClusterStateService(
+ "test-node-id",
+ repositoriesServiceSupplier,
+ settings,
+ new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+ () -> 0L
+ )
+ );
+ }
+
+ public void testFailWriteFullMetadataWhenRepositoryNotSet() {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+ doThrow(new RepositoryMissingException("repository missing")).when(repositoriesService).repository("remote_store_repository");
+ assertThrows(RepositoryMissingException.class, () -> remoteClusterStateService.writeFullMetadata(clusterState));
+ }
+
+ public void testFailWriteFullMetadataWhenNotBlobRepository() {
+ final FilterRepository filterRepository = mock(FilterRepository.class);
+ when(repositoriesService.repository("remote_store_repository")).thenReturn(filterRepository);
+ final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+ assertThrows(AssertionError.class, () -> remoteClusterStateService.writeFullMetadata(clusterState));
+ }
+
+ public void testWriteFullMetadataSuccess() throws IOException {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+ mockBlobStoreObjects();
+ final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState);
+ final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename");
+ List<UploadedIndexMetadata> indices = List.of(uploadedIndexMetadata);
+
+ final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder()
+ .indices(indices)
+ .clusterTerm(1L)
+ .stateVersion(1L)
+ .stateUUID("state-uuid")
+ .clusterUUID("cluster-uuid")
+ .build();
+
+ assertThat(manifest.getIndices().size(), is(1));
+ assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName()));
+ assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID()));
+ assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue());
+ assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm()));
+ assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion()));
+ assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID()));
+ assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID()));
+ }
+
+ public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOException {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().build();
+ final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null);
+ Assert.assertThat(manifest, nullValue());
+ }
+
+ public void testFailWriteIncrementalMetadataWhenTermChanged() {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+ final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(2L).build();
+ final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT)
+ .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata))
+ .build();
+ assertThrows(
+ AssertionError.class,
+ () -> remoteClusterStateService.writeIncrementalMetadata(previousClusterState, clusterState, null)
+ );
+ }
+
+ public void testWriteIncrementalMetadataSuccess() throws IOException {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+ mockBlobStoreObjects();
+ final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
+ final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT)
+ .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata))
+ .build();
+
+ final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build();
+
+ remoteClusterStateService.ensureRepositorySet();
+ final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata(
+ previousClusterState,
+ clusterState,
+ previousManifest
+ );
+ final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename");
+ final List<UploadedIndexMetadata> indices = List.of(uploadedIndexMetadata);
+
+ final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder()
+ .indices(indices)
+ .clusterTerm(1L)
+ .stateVersion(1L)
+ .stateUUID("state-uuid")
+ .clusterUUID("cluster-uuid")
+ .build();
+
+ assertThat(manifest.getIndices().size(), is(1));
+ assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName()));
+ assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID()));
+ assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue());
+ assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm()));
+ assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion()));
+ assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID()));
+ assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID()));
+ }
+
+ public void testMarkLastStateAsCommittedSuccess() throws IOException {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+ mockBlobStoreObjects();
+ remoteClusterStateService.ensureRepositorySet();
+ final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename");
+ List<UploadedIndexMetadata> indices = List.of(uploadedIndexMetadata);
+ final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(indices).build();
+
+ final ClusterMetadataManifest manifest = remoteClusterStateService.markLastStateAsCommitted(clusterState, previousManifest);
+
+ final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder()
+ .indices(indices)
+ .clusterTerm(1L)
+ .stateVersion(1L)
+ .stateUUID("state-uuid")
+ .clusterUUID("cluster-uuid")
+ .build();
+
+ assertThat(manifest.getIndices().size(), is(1));
+ assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName()));
+ assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID()));
+ assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue());
+ assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm()));
+ assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion()));
+ assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID()));
+ assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID()));
+ }
+
+ private void mockBlobStoreObjects() {
+ final BlobStore blobStore = mock(BlobStore.class);
+ when(blobStoreRepository.blobStore()).thenReturn(blobStore);
+ final BlobPath blobPath = mock(BlobPath.class);
+ when((blobStoreRepository.basePath())).thenReturn(blobPath);
+ when(blobPath.add(anyString())).thenReturn(blobPath);
+ when(blobPath.buildAsString()).thenReturn("/blob/path/");
+ final BlobContainer blobContainer = mock(BlobContainer.class);
+ when(blobContainer.path()).thenReturn(blobPath);
+ when(blobStore.blobContainer(ArgumentMatchers.any())).thenReturn(blobContainer);
+ when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor());
+ }
+
+ private static ClusterState.Builder generateClusterStateWithOneIndex() {
+ final Index index = new Index("test-index", "index-uuid");
+ final Settings idxSettings = Settings.builder()
+ .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+ .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
+ .build();
+ final IndexMetadata indexMetadata = new IndexMetadata.Builder(index.getName()).settings(idxSettings)
+ .numberOfShards(1)
+ .numberOfReplicas(0)
+ .build();
+ final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
+
+ return ClusterState.builder(ClusterName.DEFAULT)
+ .version(1L)
+ .stateUUID("state-uuid")
+ .metadata(
+ Metadata.builder().put(indexMetadata, true).clusterUUID("cluster-uuid").coordinationMetadata(coordinationMetadata).build()
+ );
+ }
+
+ private static DiscoveryNodes nodesWithLocalNodeClusterManager() {
+ return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").build();
+ }
+
+}