diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 0274073ddfdc7..b9f80ff589a8d 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -107,6 +107,7 @@ import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ID; import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned; +import static org.opensearch.cluster.store.RemoteClusterStateService.CLUSTER_REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered; import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY; @@ -148,6 +149,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final JoinHelper joinHelper; private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; private final Supplier persistedStateSupplier; + private final Supplier remotePersistedStateSupplier; private final NoClusterManagerBlockService noClusterManagerBlockService; final Object mutex = new Object(); // package-private to allow tests to call methods that assert that the mutex is held private final SetOnce coordinationState = new SetOnce<>(); // initialized on start-up (see doStart) @@ -201,7 +203,8 @@ public Coordinator( Random random, RerouteService rerouteService, ElectionStrategy electionStrategy, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + Supplier remotePersistedStateSupplier ) { this.settings = settings; this.transportService = transportService; @@ -226,6 +229,7 @@ public Coordinator( namedWriteableRegistry ); this.persistedStateSupplier = persistedStateSupplier; + this.remotePersistedStateSupplier = remotePersistedStateSupplier; this.noClusterManagerBlockService = new NoClusterManagerBlockService(settings, clusterSettings); this.lastKnownLeader = Optional.empty(); this.lastJoin = Optional.empty(); @@ -988,8 +992,8 @@ public boolean isInitialConfigurationSet() { } /** - * Sets the initial configuration to the given {@link VotingConfiguration}. This method is safe to call - * more than once, as long as the argument to each call is the same. + * Sets the initial configuration to the given {@link VotingConfiguration}. This method is safe to call more than once, as long as the argument to each call + * is the same. * * @param votingConfiguration The nodes that should form the initial configuration. * @return whether this call successfully set the initial configuration - if false, the cluster has already been bootstrapped. @@ -1103,9 +1107,9 @@ ClusterState improveConfiguration(ClusterState clusterState) { } /* - * Valid Voting Configuration Exclusion state criteria: - * 1. Every voting config exclusion with an ID of _absent_ should not match any nodes currently in the cluster by name - * 2. Every voting config exclusion with a name of _absent_ should not match any nodes currently in the cluster by ID + * Valid Voting Configuration Exclusion state criteria: + * 1. Every voting config exclusion with an ID of _absent_ should not match any nodes currently in the cluster by name + * 2. Every voting config exclusion with a name of _absent_ should not match any nodes currently in the cluster by ID */ static boolean validVotingConfigExclusionState(ClusterState clusterState) { Set votingConfigExclusions = clusterState.getVotingConfigExclusions(); @@ -1308,6 +1312,12 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) leaderChecker.setCurrentNodes(publishNodes); followersChecker.setCurrentNodes(publishNodes); lagDetector.setTrackedNodes(publishNodes); + if (clusterState.nodes().isLocalNodeElectedClusterManager() + && CLUSTER_REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) { + CoordinationState.PersistedState remotePersistedState = remotePersistedStateSupplier.get(); + assert remotePersistedState != null : "Remote state has not been initialized"; + remotePersistedState.setLastAcceptedState(clusterState); + } publication.start(followersChecker.getFaultyNodes()); } } catch (Exception e) { @@ -1511,8 +1521,8 @@ public Iterable getFoundPeers() { } /** - * If there is any current committed publication, this method cancels it. - * This method is used exclusively by tests. + * If there is any current committed publication, this method cancels it. This method is used exclusively by tests. + * * @return true if publication was cancelled, false if there is no current committed publication. */ boolean cancelCommittedPublication() { @@ -1760,7 +1770,16 @@ protected boolean isPublishQuorum(VoteCollection votes) { protected Optional handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert getCurrentTerm() >= publishResponse.getTerm(); - return coordinationState.get().handlePublishResponse(sourceNode, publishResponse); + final Optional optionalApplyCommitRequest = coordinationState.get() + .handlePublishResponse(sourceNode, publishResponse); + optionalApplyCommitRequest.ifPresent(applyCommitRequest -> { + if (CLUSTER_REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) { + final CoordinationState.PersistedState remotePersistedState = remotePersistedStateSupplier.get(); + assert remotePersistedState != null : "Remote state has not been initialized"; + remotePersistedState.markLastAcceptedStateAsCommitted(); + } + }); + return optionalApplyCommitRequest; } @Override diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index 68fce4d9b9bb4..31c69c1e1df6a 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -205,7 +205,8 @@ public DiscoveryModule( new Random(Randomness.get().nextLong()), rerouteService, electionStrategy, - nodeHealthService + nodeHealthService, + gatewayMetaState::getRemotePersistedState ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index bf1baf1b5dc39..7738ae37fb615 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -104,6 +104,7 @@ public class GatewayMetaState implements Closeable { // Set by calling start() private final SetOnce persistedState = new SetOnce<>(); + private final SetOnce remotePersistedState = new SetOnce<>(); public PersistedState getPersistedState() { final PersistedState persistedState = this.persistedState.get(); @@ -111,6 +112,12 @@ public PersistedState getPersistedState() { return persistedState; } + public PersistedState getRemotePersistedState() { + final PersistedState persistedState = this.remotePersistedState.get(); + assert persistedState != null : "not started"; + return persistedState; + } + public Metadata getMetadata() { return getPersistedState().getLastAcceptedState().metadata(); } @@ -122,7 +129,8 @@ public void start( MetaStateService metaStateService, MetadataIndexUpgradeService metadataIndexUpgradeService, MetadataUpgrader metadataUpgrader, - PersistedClusterStateService persistedClusterStateService + PersistedClusterStateService persistedClusterStateService, + RemoteClusterStateService remoteClusterStateService ) { assert persistedState.get() == null : "should only start once, but already have " + persistedState.get(); @@ -146,6 +154,7 @@ public void start( } PersistedState persistedState = null; + PersistedState remotePersistedState = null; boolean success = false; try { final ClusterState clusterState = prepareInitialClusterState( @@ -159,6 +168,7 @@ public void start( if (DiscoveryNode.isClusterManagerNode(settings)) { persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState); + remotePersistedState = new RemotePersistedState(remoteClusterStateService); } else { persistedState = new AsyncLucenePersistedState( settings, @@ -184,6 +194,7 @@ public void start( } this.persistedState.set(persistedState); + this.remotePersistedState.set(remotePersistedState); } catch (IOException e) { throw new OpenSearchException("failed to load metadata", e); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 51cc7c9007159..39ace9be05390 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -76,6 +76,7 @@ import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; import org.opensearch.cluster.routing.allocation.DiskThresholdMonitor; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.cluster.store.RemoteClusterStateService; import org.opensearch.common.SetOnce; import org.opensearch.common.StopWatch; import org.opensearch.common.inject.Injector; @@ -669,6 +670,10 @@ protected Node( clusterService.getClusterSettings(), threadPool::relativeTimeInMillis ); + final RemoteClusterStateService remoteClusterStateService = new RemoteClusterStateService( + repositoriesServiceReference::get, + settings + ); // collect engine factory providers from plugins final Collection enginePlugins = pluginsService.filterPlugins(EnginePlugin.class); @@ -1155,6 +1160,7 @@ protected Node( b.bind(SystemIndices.class).toInstance(systemIndices); b.bind(IdentityService.class).toInstance(identityService); b.bind(Tracer.class).toInstance(tracer); + b.bind(RemoteClusterStateService.class).toInstance(remoteClusterStateService); }); injector = modules.createInjector(); @@ -1302,7 +1308,8 @@ public Node start() throws NodeValidationException { injector.getInstance(MetaStateService.class), injector.getInstance(MetadataIndexUpgradeService.class), injector.getInstance(MetadataUpgrader.class), - injector.getInstance(PersistedClusterStateService.class) + injector.getInstance(PersistedClusterStateService.class), + injector.getInstance(RemoteClusterStateService.class) ); if (Assertions.ENABLED) { try { diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index ab91099cae11f..2134d1515444c 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -260,7 +260,8 @@ protected void onSendRequest( random, (s, p, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE, - nodeHealthService + nodeHealthService, + () -> new InMemoryPersistedState(term, initialState) // todo ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index c6b44eaa9d364..18c578e712e36 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.cluster.store.RemoteClusterStateService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.BigArrays; @@ -60,6 +61,7 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.env.TestEnvironment; import org.opensearch.node.Node; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -83,6 +85,7 @@ import static org.mockito.Mockito.when; public class GatewayMetaStatePersistedStateTests extends OpenSearchTestCase { + private NodeEnvironment nodeEnvironment; private ClusterName clusterName; private Settings settings; @@ -425,6 +428,17 @@ public void testDataOnlyNodePersistence() throws Exception { new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ); + final RemoteClusterStateService remoteClusterStateService = new RemoteClusterStateService( + () -> new RepositoriesService( + settings, + clusterService, + transportService, + Collections.emptyMap(), + Collections.emptyMap(), + transportService.getThreadPool() + ), + settings + ); gateway.start( settings, transportService, @@ -432,7 +446,8 @@ public void testDataOnlyNodePersistence() throws Exception { new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, - persistedClusterStateService + persistedClusterStateService, + remoteClusterStateService ); final CoordinationState.PersistedState persistedState = gateway.getPersistedState(); assertThat(persistedState, instanceOf(GatewayMetaState.AsyncLucenePersistedState.class)); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 4f7697660096e..a8edfc2753392 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2506,7 +2506,8 @@ public void start(ClusterState initialState) { random(), rerouteService, ElectionStrategy.DEFAULT_INSTANCE, - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + () -> persistedState // todo ); clusterManagerService.setClusterStatePublisher(coordinator); coordinator.start(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 8fac407547a9d..9d75f4926b409 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1144,7 +1144,9 @@ protected Optional getDisruptableMockTransport(Transpo Randomness.get(), (s, p, r) -> {}, getElectionStrategy(), - nodeHealthService + nodeHealthService, + this::getPersistedState //todo + ); clusterManagerService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService( diff --git a/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java index 6a3748e55394e..15ab00b8209f5 100644 --- a/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java @@ -32,12 +32,14 @@ package org.opensearch.gateway; +import java.util.Collections; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Manifest; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.MetadataIndexUpgradeService; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.cluster.store.RemoteClusterStateService; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -45,6 +47,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.NodeEnvironment; import org.opensearch.plugins.MetadataUpgrader; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -110,7 +113,10 @@ public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXCont bigArrays, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L - ) + ), + new RemoteClusterStateService( + () -> new RepositoriesService(settings, clusterService, transportService, Collections.emptyMap(), Collections.emptyMap(), + transportService.getThreadPool()), settings) ); } }