Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote State] Create service to publish cluster state to remote store #9160

Merged
merged 11 commits into from
Aug 31, 2023
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Segment Replication] Support realtime reads for GET requests ([#9212](https://github.com/opensearch-project/OpenSearch/pull/9212))
- [Feature] Expose term frequency in Painless script score context ([#9081](https://github.com/opensearch-project/OpenSearch/pull/9081))
- Add support for reading partial files to HDFS repository ([#9513](https://github.com/opensearch-project/OpenSearch/issues/9513))
- Add support for extensions to search responses using SearchExtBuilder ([#9379](https://github.com/opensearch-project/OpenSearch/pull/9379))
- Add support for extensions to search responses using SearchExtBuilder ([#9379](https://github.com/opensearch-project/OpenSearch/pull/9379))
- [Remote State] Create service to publish cluster state to remote store ([#9160](https://github.com/opensearch-project/OpenSearch/pull/9160))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down Expand Up @@ -181,4 +182,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.opensearch.gateway.DanglingIndicesState;
import org.opensearch.gateway.GatewayService;
import org.opensearch.gateway.PersistedClusterStateService;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
Expand Down Expand Up @@ -660,7 +661,11 @@ public void apply(Settings value, Settings current, Settings previous) {

// Related to monitoring of task cancellation
TaskCancellationMonitoringSettings.IS_ENABLED_SETTING,
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING,

// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING
)
)
);
Expand Down
134 changes: 125 additions & 9 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@
import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.env.NodeMetadata;
import org.opensearch.gateway.remote.ClusterMetadataMarker;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -84,19 +87,19 @@
/**
* Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
*
* When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that
* the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link
* ClusterState#metadata()} because it might be stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and
* non-stale state, and cluster-manager-ineligible nodes receive the real cluster state from the elected cluster-manager after joining the cluster.
* When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that the state being
* loaded when constructing the instance of this class is not necessarily the state that will be used as {@link ClusterState#metadata()} because it might be
* stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and non-stale state, and cluster-manager-ineligible nodes
* receive the real cluster state from the elected cluster-manager after joining the cluster.
*
* @opensearch.internal
*/
public class GatewayMetaState implements Closeable {

/**
* Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially
* stale (since it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is
* restarted as a cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state.
* Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially stale (since
* it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is restarted as a
* cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state.
*/
public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG";

Expand Down Expand Up @@ -234,8 +237,8 @@ Metadata upgradeMetadataForNode(
}

/**
* This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current
* version. The MetadataIndexUpgradeService might also update obsolete settings if needed.
* This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current version. The MetadataIndexUpgradeService
* might also update obsolete settings if needed.
*
* @return input <code>metadata</code> if no upgrade is needed or an upgraded metadata
*/
Expand Down Expand Up @@ -599,4 +602,117 @@ public void close() throws IOException {
IOUtils.close(persistenceWriter.getAndSet(null));
}
}

/**
* Encapsulates the writing of metadata to a remote store using {@link RemoteClusterStateService}.
*/
public static class RemotePersistedState implements PersistedState {
soosinha marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger logger = LogManager.getLogger(RemotePersistedState.class);

private ClusterState lastAcceptedState;
private ClusterMetadataMarker lastAcceptedMarker;
private final RemoteClusterStateService remoteClusterStateService;

public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService) {
this.remoteClusterStateService = remoteClusterStateService;
}

@Override
public long getCurrentTerm() {
return lastAcceptedState != null ? lastAcceptedState.term() : 0L;
}

@Override
public ClusterState getLastAcceptedState() {
return lastAcceptedState;
soosinha marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void setCurrentTerm(long currentTerm) {
// no-op
soosinha marked this conversation as resolved.
Show resolved Hide resolved
// For LucenePersistedState, setCurrentTerm is used only while handling StartJoinRequest by all follower nodes.
// But for RemotePersistedState, the state is only pushed by the active cluster. So this method is not required.
}

@Override
public void setLastAcceptedState(ClusterState clusterState) {
try {
if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
logger.trace("Cluster is not yet ready to publish state to remote store");
soosinha marked this conversation as resolved.
Show resolved Hide resolved
lastAcceptedState = clusterState;
soosinha marked this conversation as resolved.
Show resolved Hide resolved
return;
}
final ClusterMetadataMarker marker;
if (shouldWriteFullClusterState(clusterState)) {
marker = remoteClusterStateService.writeFullMetadata(clusterState);
} else {
assert verifyMarkerAndClusterState(lastAcceptedMarker, lastAcceptedState) == true
: "Previous Marker and previous ClusterState are not in sync";
marker = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedMarker);
}
assert verifyMarkerAndClusterState(marker, clusterState) == true : "Marker and ClusterState are not in sync";
lastAcceptedMarker = marker;
soosinha marked this conversation as resolved.
Show resolved Hide resolved
lastAcceptedState = clusterState;
} catch (RepositoryMissingException e) {
logger.error("Remote repository is not yet registered");
lastAcceptedState = clusterState;
soosinha marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
handleExceptionOnWrite(e);
}
}

private boolean verifyMarkerAndClusterState(ClusterMetadataMarker marker, ClusterState clusterState) {
assert marker != null : "ClusterMetadataMarker is null";
assert clusterState != null : "ClusterState is null";
assert clusterState.metadata().indices().size() == marker.getIndices().size()
: "Number of indices in last accepted state and marker are different";
marker.getIndices().stream().forEach(md -> {
assert clusterState.metadata().indices().containsKey(md.getIndexName()) : "Last accepted state and marker are not in sync";
assert clusterState.metadata().indices().get(md.getIndexName()).getIndexUUID().equals(md.getIndexUUID())
: "Last accepted state and marker are not in sync";
soosinha marked this conversation as resolved.
Show resolved Hide resolved
});
return true;
}

private boolean shouldWriteFullClusterState(ClusterState clusterState) {
if (lastAcceptedState == null
|| lastAcceptedMarker == null
|| lastAcceptedState.term() != clusterState.term()
|| lastAcceptedMarker.getOpensearchVersion() != Version.CURRENT) {
return true;
}
return false;
}

@Override
public void markLastAcceptedStateAsCommitted() {
try {
if (lastAcceptedState == null
|| lastAcceptedMarker == null
|| lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
soosinha marked this conversation as resolved.
Show resolved Hide resolved
logger.trace("Cluster is not yet ready to publish state to remote store");
return;
}
final ClusterMetadataMarker committedMarker = remoteClusterStateService.markLastStateAsCommitted(
lastAcceptedState,
lastAcceptedMarker
);
lastAcceptedMarker = committedMarker;
} catch (Exception e) {
handleExceptionOnWrite(e);
}
}

@Override
public void close() throws IOException {
remoteClusterStateService.close();
}

private void handleExceptionOnWrite(Exception e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}
}
Loading
Loading