Skip to content

Commit

Permalink
Integrate remote cluster state in publish/commit flow
Browse files (browse the repository at this point in the history)
  • Loading branch information
soosinha committed Aug 22, 2023
1 parent a4f1137 commit c09471e
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@

import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ID;
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.cluster.store.RemoteClusterStateService.CLUSTER_REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;
Expand Down Expand Up @@ -148,6 +149,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final JoinHelper joinHelper;
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
private final Supplier<CoordinationState.PersistedState> persistedStateSupplier;
private final Supplier<CoordinationState.PersistedState> remotePersistedStateSupplier;
private final NoClusterManagerBlockService noClusterManagerBlockService;
final Object mutex = new Object(); // package-private to allow tests to call methods that assert that the mutex is held
private final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); // initialized on start-up (see doStart)
Expand Down Expand Up @@ -201,7 +203,8 @@ public Coordinator(
Random random,
RerouteService rerouteService,
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
Supplier<CoordinationState.PersistedState> remotePersistedStateSupplier
) {
this.settings = settings;
this.transportService = transportService;
Expand All @@ -226,6 +229,7 @@ public Coordinator(
namedWriteableRegistry
);
this.persistedStateSupplier = persistedStateSupplier;
this.remotePersistedStateSupplier = remotePersistedStateSupplier;
this.noClusterManagerBlockService = new NoClusterManagerBlockService(settings, clusterSettings);
this.lastKnownLeader = Optional.empty();
this.lastJoin = Optional.empty();
Expand Down Expand Up @@ -988,8 +992,8 @@ public boolean isInitialConfigurationSet() {
}

/**
* Sets the initial configuration to the given {@link VotingConfiguration}. This method is safe to call
* more than once, as long as the argument to each call is the same.
* Sets the initial configuration to the given {@link VotingConfiguration}. This method is safe to call more than once, as long as the argument to each call
* is the same.
*
* @param votingConfiguration The nodes that should form the initial configuration.
* @return whether this call successfully set the initial configuration - if false, the cluster has already been bootstrapped.
Expand Down Expand Up @@ -1103,9 +1107,9 @@ ClusterState improveConfiguration(ClusterState clusterState) {
}

/*
* Valid Voting Configuration Exclusion state criteria:
* 1. Every voting config exclusion with an ID of _absent_ should not match any nodes currently in the cluster by name
* 2. Every voting config exclusion with a name of _absent_ should not match any nodes currently in the cluster by ID
* Valid Voting Configuration Exclusion state criteria:
* 1. Every voting config exclusion with an ID of _absent_ should not match any nodes currently in the cluster by name
* 2. Every voting config exclusion with a name of _absent_ should not match any nodes currently in the cluster by ID
*/
static boolean validVotingConfigExclusionState(ClusterState clusterState) {
Set<VotingConfigExclusion> votingConfigExclusions = clusterState.getVotingConfigExclusions();
Expand Down Expand Up @@ -1308,6 +1312,12 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
leaderChecker.setCurrentNodes(publishNodes);
followersChecker.setCurrentNodes(publishNodes);
lagDetector.setTrackedNodes(publishNodes);
if (clusterState.nodes().isLocalNodeElectedClusterManager()
&& CLUSTER_REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) {
CoordinationState.PersistedState remotePersistedState = remotePersistedStateSupplier.get();
assert remotePersistedState != null : "Remote state has not been initialized";
remotePersistedState.setLastAcceptedState(clusterState);
}
publication.start(followersChecker.getFaultyNodes());
}
} catch (Exception e) {
Expand Down Expand Up @@ -1511,8 +1521,8 @@ public Iterable<DiscoveryNode> getFoundPeers() {
}

/**
* If there is any current committed publication, this method cancels it.
* This method is used exclusively by tests.
* If there is any current committed publication, this method cancels it. This method is used exclusively by tests.
*
* @return true if publication was cancelled, false if there is no current committed publication.
*/
boolean cancelCommittedPublication() {
Expand Down Expand Up @@ -1760,7 +1770,16 @@ protected boolean isPublishQuorum(VoteCollection votes) {
protected Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) {
    // Feeds a publish response from sourceNode into the coordination state and returns the
    // ApplyCommitRequest it produces, if any. When a commit request is produced and remote
    // cluster state is enabled, the remote persisted state's last-accepted state is marked
    // as committed before the request is returned to the caller.
    //
    // Preconditions (checked by assertions): the coordinator mutex is held and the response's
    // term does not exceed the current term.
    assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
    assert getCurrentTerm() >= publishResponse.getTerm();
    final Optional<ApplyCommitRequest> optionalApplyCommitRequest = coordinationState.get()
        .handlePublishResponse(sourceNode, publishResponse);
    // The setting is only consulted once a commit request actually exists.
    if (optionalApplyCommitRequest.isPresent() && CLUSTER_REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings)) {
        final CoordinationState.PersistedState remotePersistedState = remotePersistedStateSupplier.get();
        assert remotePersistedState != null : "Remote state has not been initialized";
        remotePersistedState.markLastAcceptedStateAsCommitted();
    }
    return optionalApplyCommitRequest;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ public DiscoveryModule(
new Random(Randomness.get().nextLong()),
rerouteService,
electionStrategy,
nodeHealthService
nodeHealthService,
gatewayMetaState::getRemotePersistedState
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,20 @@ public class GatewayMetaState implements Closeable {

// Set by calling start()
private final SetOnce<PersistedState> persistedState = new SetOnce<>();
// Also set by start(); this is the value handed out by getRemotePersistedState()
private final SetOnce<PersistedState> remotePersistedState = new SetOnce<>();

/**
 * Returns the persisted state held by this gateway.
 * With assertions enabled, fails with "not started" when no state has been set yet.
 *
 * @return the value currently stored in the {@code persistedState} holder
 */
public PersistedState getPersistedState() {
    final PersistedState current = persistedState.get();
    assert current != null : "not started";
    return current;
}

/**
 * Returns the remote persisted state held by this gateway.
 * With assertions enabled, fails with "not started" when the holder contains no value.
 *
 * @return the value currently stored in the {@code remotePersistedState} holder
 */
public PersistedState getRemotePersistedState() {
    final PersistedState remoteState = remotePersistedState.get();
    assert remoteState != null : "not started";
    return remoteState;
}

/**
 * Returns the metadata of the last-accepted cluster state of the local persisted state.
 *
 * @return metadata taken from {@code getPersistedState().getLastAcceptedState()}
 */
public Metadata getMetadata() {
    final PersistedState current = getPersistedState();
    return current.getLastAcceptedState().metadata();
}
Expand All @@ -122,7 +129,8 @@ public void start(
MetaStateService metaStateService,
MetadataIndexUpgradeService metadataIndexUpgradeService,
MetadataUpgrader metadataUpgrader,
PersistedClusterStateService persistedClusterStateService
PersistedClusterStateService persistedClusterStateService,
RemoteClusterStateService remoteClusterStateService
) {
assert persistedState.get() == null : "should only start once, but already have " + persistedState.get();

Expand All @@ -146,6 +154,7 @@ public void start(
}

PersistedState persistedState = null;
PersistedState remotePersistedState = null;
boolean success = false;
try {
final ClusterState clusterState = prepareInitialClusterState(
Expand All @@ -159,6 +168,7 @@ public void start(

if (DiscoveryNode.isClusterManagerNode(settings)) {
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
remotePersistedState = new RemotePersistedState(remoteClusterStateService);
} else {
persistedState = new AsyncLucenePersistedState(
settings,
Expand All @@ -184,6 +194,7 @@ public void start(
}

this.persistedState.set(persistedState);
this.remotePersistedState.set(remotePersistedState);
} catch (IOException e) {
throw new OpenSearchException("failed to load metadata", e);
}
Expand Down
9 changes: 8 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.cluster.routing.allocation.DiskThresholdMonitor;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.cluster.store.RemoteClusterStateService;
import org.opensearch.common.SetOnce;
import org.opensearch.common.StopWatch;
import org.opensearch.common.inject.Injector;
Expand Down Expand Up @@ -669,6 +670,10 @@ protected Node(
clusterService.getClusterSettings(),
threadPool::relativeTimeInMillis
);
final RemoteClusterStateService remoteClusterStateService = new RemoteClusterStateService(
repositoriesServiceReference::get,
settings
);

// collect engine factory providers from plugins
final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
Expand Down Expand Up @@ -1155,6 +1160,7 @@ protected Node(
b.bind(SystemIndices.class).toInstance(systemIndices);
b.bind(IdentityService.class).toInstance(identityService);
b.bind(Tracer.class).toInstance(tracer);
b.bind(RemoteClusterStateService.class).toInstance(remoteClusterStateService);
});
injector = modules.createInjector();

Expand Down Expand Up @@ -1302,7 +1308,8 @@ public Node start() throws NodeValidationException {
injector.getInstance(MetaStateService.class),
injector.getInstance(MetadataIndexUpgradeService.class),
injector.getInstance(MetadataUpgrader.class),
injector.getInstance(PersistedClusterStateService.class)
injector.getInstance(PersistedClusterStateService.class),
injector.getInstance(RemoteClusterStateService.class)
);
if (Assertions.ENABLED) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ protected void onSendRequest(
random,
(s, p, r) -> {},
ElectionStrategy.DEFAULT_INSTANCE,
nodeHealthService
nodeHealthService,
() -> new InMemoryPersistedState(term, initialState) // todo
);
transportService.start();
transportService.acceptIncomingRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.cluster.store.RemoteClusterStateService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays;
Expand All @@ -60,6 +61,7 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.env.TestEnvironment;
import org.opensearch.node.Node;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -83,6 +85,7 @@
import static org.mockito.Mockito.when;

public class GatewayMetaStatePersistedStateTests extends OpenSearchTestCase {

private NodeEnvironment nodeEnvironment;
private ClusterName clusterName;
private Settings settings;
Expand Down Expand Up @@ -425,14 +428,26 @@ public void testDataOnlyNodePersistence() throws Exception {
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
() -> 0L
);
final RemoteClusterStateService remoteClusterStateService = new RemoteClusterStateService(
() -> new RepositoriesService(
settings,
clusterService,
transportService,
Collections.emptyMap(),
Collections.emptyMap(),
transportService.getThreadPool()
),
settings
);
gateway.start(
settings,
transportService,
clusterService,
new MetaStateService(nodeEnvironment, xContentRegistry()),
null,
null,
persistedClusterStateService
persistedClusterStateService,
remoteClusterStateService
);
final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
assertThat(persistedState, instanceOf(GatewayMetaState.AsyncLucenePersistedState.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2506,7 +2506,8 @@ public void start(ClusterState initialState) {
random(),
rerouteService,
ElectionStrategy.DEFAULT_INSTANCE,
() -> new StatusInfo(HEALTHY, "healthy-info")
() -> new StatusInfo(HEALTHY, "healthy-info"),
() -> persistedState // todo
);
clusterManagerService.setClusterStatePublisher(coordinator);
coordinator.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,9 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
Randomness.get(),
(s, p, r) -> {},
getElectionStrategy(),
nodeHealthService
nodeHealthService,
this::getPersistedState //todo

);
clusterManagerService.setClusterStatePublisher(coordinator);
final GatewayService gatewayService = new GatewayService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,22 @@

package org.opensearch.gateway;

import java.util.Collections;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Manifest;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.MetadataIndexUpgradeService;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.cluster.store.RemoteClusterStateService;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -110,7 +113,10 @@ public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXCont
bigArrays,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
() -> 0L
)
),
new RemoteClusterStateService(
() -> new RepositoriesService(settings, clusterService, transportService, Collections.emptyMap(), Collections.emptyMap(),
transportService.getThreadPool()), settings)
);
}
}

0 comments on commit c09471e

Please sign in to comment.