Skip to content

Commit

Permalink
Make Remote Publication a dynamic setting
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Sep 16, 2024
1 parent 330b249 commit 68c358c
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
Expand All @@ -54,6 +55,7 @@

import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -87,8 +89,8 @@ public CoordinationState(
DiscoveryNode localNode,
PersistedStateRegistry persistedStateRegistry,
ElectionStrategy electionStrategy,
Settings settings
) {
Settings settings,
ClusterSettings clusterSettings) {
this.localNode = localNode;

// persisted state registry
Expand All @@ -105,10 +107,10 @@ public CoordinationState(
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
// ToDo: revisit this check while making the setting dynamic
this.isRemotePublicationEnabled = isRemoteStateEnabled
&& REMOTE_PUBLICATION_SETTING.get(settings)
&& localNode.isRemoteStatePublicationEnabled();
&& localNode.isRemoteStatePublicationConfigured();
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
}

public boolean isRemotePublicationEnabled() {
Expand Down Expand Up @@ -651,6 +653,15 @@ private boolean shouldCommitRemotePersistedState() {
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null;
}

private void setRemotePublicationSetting(boolean remotePublicationSetting) {
if (remotePublicationSetting == false) {
this.isRemotePublicationEnabled = false;
} else {
this.isRemotePublicationEnabled = isRemoteStateEnabled && localNode.isRemoteStatePublicationConfigured();
}

}

/**
* Pluggable persistence layer for {@link CoordinationState}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final NodeHealthService nodeHealthService;
private final PersistedStateRegistry persistedStateRegistry;
private final RemoteClusterStateService remoteClusterStateService;
private final RemoteStoreNodeService remoteStoreNodeService;
private final ClusterSettings clusterSettings;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand Down Expand Up @@ -310,6 +312,8 @@ public Coordinator(
this.persistedStateRegistry = persistedStateRegistry;
this.localNodeCommissioned = true;
this.remoteStoreNodeService = remoteStoreNodeService;
this.remoteClusterStateService = remoteClusterStateService;
this.clusterSettings = clusterSettings;
}

private ClusterFormationState getClusterFormationState() {
Expand Down Expand Up @@ -858,7 +862,7 @@ boolean publicationInProgress() {
@Override
protected void doStart() {
synchronized (mutex) {
coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings));
coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings, clusterSettings));
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
Expand Down Expand Up @@ -903,9 +907,9 @@ public DiscoveryStats stats() {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
if (coordinationState.get().isRemotePublicationEnabled()) {
stats.add(publicationHandler.getFullDownloadStats());
stats.add(publicationHandler.getDiffDownloadStats());
if (remoteClusterStateService != null) {
stats.add(remoteClusterStateService.getFullDownloadStats());
stats.add(remoteClusterStateService.getDiffDownloadStats());
}
clusterStateStats.setPersistenceStats(stats);
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,10 @@ private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joi

assert existingNodes.isEmpty() == false;
Optional<DiscoveryNode> remotePublicationNode = existingNodes.stream()
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
.filter(DiscoveryNode::isRemoteStatePublicationConfigured)
.findFirst();

if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) {
if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationConfigured()) {
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,6 @@ public PublishClusterStateStats stats() {
);
}

public PersistedStateStats getFullDownloadStats() {
return remoteClusterStateService.getFullDownloadStats();
}

public PersistedStateStats getDiffDownloadStats() {
return remoteClusterStateService.getDiffDownloadStats();
}

private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
ClusterState incomingState;
Expand Down Expand Up @@ -356,7 +348,7 @@ public PublicationContext newPublicationContext(
) {
if (isRemotePublicationEnabled == true) {
if (allNodesRemotePublicationEnabled.get() == false) {
if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
if (validateRemotePublicationConfiguredOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
allNodesRemotePublicationEnabled.set(true);
}
}
Expand All @@ -374,11 +366,11 @@ public PublicationContext newPublicationContext(
return publicationContext;
}

private boolean validateRemotePublicationOnAllNodes(DiscoveryNodes discoveryNodes) {
private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) {
assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0;
for (DiscoveryNode node : discoveryNodes.getNodes().values()) {
// if a node is non-remote then created local publication context
if (node.isRemoteStatePublicationEnabled() == false) {
if (node.isRemoteStatePublicationConfigured() == false) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ public boolean isRemoteStoreNode() {
* Returns whether remote cluster state publication is enabled on this node
* @return true if the node contains remote cluster state node attribute and remote routing table node attribute
*/
public boolean isRemoteStatePublicationEnabled() {
public boolean isRemoteStatePublicationConfigured() {
return this.getAttributes()
.keySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand All @@ -132,7 +133,7 @@ public class RemoteClusterStateService implements Closeable {
REMOTE_PUBLICATION_SETTING_KEY,
false,
Property.NodeScope,
Property.Final
Property.Dynamic
);

/**
Expand Down Expand Up @@ -232,7 +233,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) {
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
private final boolean isPublicationEnabled;
private boolean isPublicationEnabled;
private final String remotePathPrefix;

private final RemoteClusterStateCache remoteClusterStateCache;
Expand Down Expand Up @@ -273,9 +274,10 @@ public RemoteClusterStateService(
this.remoteStateStats = new RemotePersistenceStats();
this.namedWriteableRegistry = namedWriteableRegistry;
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.isPublicationEnabled = REMOTE_PUBLICATION_SETTING.get(settings)
this.isPublicationEnabled = clusterSettings.get(REMOTE_PUBLICATION_SETTING)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings);
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
Expand Down Expand Up @@ -1109,6 +1111,14 @@ private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteCl
this.remoteClusterStateValidationMode = remoteClusterStateValidationMode;
}

private void setRemotePublicationSetting(boolean remotePublicationSetting) {
if (remotePublicationSetting == false) {
this.isPublicationEnabled = false;
} else {
this.isPublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings);
}
}

// Package private for unit test
RemoteRoutingTableService getRemoteRoutingTableService() {
return this.remoteRoutingTableService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1283,7 +1283,7 @@ public static CoordinationState createCoordinationState(
DiscoveryNode localNode,
Settings settings
) {
return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings);
return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings, null);
}

public static ClusterState clusterState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ public void testPrevotingIndicatesElectionSuccess() {
localNode,
persistedStateRegistry,
ElectionStrategy.DEFAULT_INSTANCE,
Settings.EMPTY
);
Settings.EMPTY,
null);

final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class MockNode {
);
PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialState));
coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY);
coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY, null);
}

final DiscoveryNode localNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ static class ClusterNode {
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState);

this.electionStrategy = electionStrategy;
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY);
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null);
}

void reboot() {
Expand Down Expand Up @@ -189,7 +189,7 @@ void reboot() {
localNode.getVersion()
);

state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY);
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null);
}

void setInitialState(CoordinationMetadata.VotingConfiguration initialConfig, long initialValue) {
Expand Down

0 comments on commit 68c358c

Please sign in to comment.