Skip to content

Commit

Permalink
[Backport 2.x] Separate Remote State and Publication enabled and conf…
Browse files Browse the repository at this point in the history
…igured methods (#16249)

* Separate Remote State and Publication enabled and configured methods (#16080)

* Separate Remote State and Publication enabled and configured methods

Signed-off-by: Shivansh Arora <[email protected]>

* Revert remote publication method renaming in DiscoveryNode (#16250)

Signed-off-by: Sooraj Sinha <[email protected]>

---------

Signed-off-by: Shivansh Arora <[email protected]>
Signed-off-by: Sooraj Sinha <[email protected]>
Co-authored-by: Shivansh Arora <[email protected]>
  • Loading branch information
soosinha and shiv0408 authored Oct 10, 2024
1 parent c92eb41 commit d93c23e
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.junit.Assert;
import org.junit.Before;

import java.util.Collection;
Expand Down Expand Up @@ -118,6 +119,31 @@ public Settings.Builder remoteWithRoutingTableNodeSetting() {
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true);
}

public void testRemoteClusterStateServiceNotInitialized_WhenNodeAttributesNotPresent() {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);

ensureStableCluster(3);
ensureGreen();

internalCluster().getDataOrClusterManagerNodeInstances(RemoteClusterStateService.class).forEach(Assert::assertNull);
}

public void testServiceInitialized_WhenNodeAttributesPresent() {
internalCluster().startClusterManagerOnlyNode(
buildRemoteStateNodeAttributes(REPOSITORY_NAME, segmentRepoPath, ReloadableFsRepository.TYPE)
);
internalCluster().startDataOnlyNodes(
2,
buildRemoteStateNodeAttributes(REPOSITORY_NAME, segmentRepoPath, ReloadableFsRepository.TYPE)
);

ensureStableCluster(3);
ensureGreen();

internalCluster().getDataOrClusterManagerNodeInstances(RemoteClusterStateService.class).forEach(Assert::assertNotNull);
}

public void testRemotePublishConfigNodeJoinNonRemoteCluster() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,11 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
ensureStableCluster(5);
ensureGreen(INDEX_NAME);

assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
RemoteClusterStateService remoteClusterStateService = internalCluster().getCurrentClusterManagerNodeInstance(
RemoteClusterStateService.class
);

assertFalse(remoteClusterStateService.isRemotePublicationEnabled());
}

public void testRemotePublicationDownloadStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final PersistedStateRegistry persistedStateRegistry;
private final RemoteStoreNodeService remoteStoreNodeService;
private NodeConnectionsService nodeConnectionsService;
private final RemoteClusterStateService remoteClusterStateService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand Down Expand Up @@ -313,6 +314,7 @@ public Coordinator(
this.persistedStateRegistry = persistedStateRegistry;
this.localNodeCommissioned = true;
this.remoteStoreNodeService = remoteStoreNodeService;
this.remoteClusterStateService = remoteClusterStateService;
}

private ClusterFormationState getClusterFormationState() {
Expand Down Expand Up @@ -913,9 +915,9 @@ public DiscoveryStats stats() {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
if (coordinationState.get().isRemotePublicationEnabled()) {
stats.add(publicationHandler.getFullDownloadStats());
stats.add(publicationHandler.getDiffDownloadStats());
if (remoteClusterStateService != null) {
stats.add(remoteClusterStateService.getFullDownloadStats());
stats.add(remoteClusterStateService.getDiffDownloadStats());
}
clusterStateStats.setPersistenceStats(stats);
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,6 @@ public PublishClusterStateStats stats() {
);
}

public PersistedStateStats getFullDownloadStats() {
return remoteClusterStateService.getFullDownloadStats();
}

public PersistedStateStats getDiffDownloadStats() {
return remoteClusterStateService.getDiffDownloadStats();
}

private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
ClusterState incomingState;
Expand Down Expand Up @@ -356,7 +348,7 @@ public PublicationContext newPublicationContext(
) {
if (isRemotePublicationEnabled == true) {
if (allNodesRemotePublicationEnabled.get() == false) {
if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
if (validateRemotePublicationConfiguredOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
allNodesRemotePublicationEnabled.set(true);
}
}
Expand All @@ -374,7 +366,7 @@ public PublicationContext newPublicationContext(
return publicationContext;
}

private boolean validateRemotePublicationOnAllNodes(DiscoveryNodes discoveryNodes) {
private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) {
assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0;
for (DiscoveryNode node : discoveryNodes.getNodes().values()) {
// if a node is non-remote then created local publication context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ public boolean isRemoteStoreNode() {
}

/**
* Returns whether remote cluster state publication is enabled on this node
* Returns whether settings required for remote cluster state publication is configured
* @return true if the node contains remote cluster state node attribute and remote routing table node attribute
*/
public boolean isRemoteStatePublicationEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured;

/**
* A Service which provides APIs to upload and download cluster metadata from remote store.
Expand Down Expand Up @@ -256,7 +256,7 @@ public RemoteClusterStateService(
List<IndexMetadataUploadListener> indexMetadataUploadListeners,
NamedWriteableRegistry namedWriteableRegistry
) {
assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled";
assert isRemoteClusterStateConfigured(settings) : "Remote cluster state is not configured";
this.nodeId = nodeId;
this.repositoriesService = repositoriesService;
this.settings = settings;
Expand Down Expand Up @@ -1061,7 +1061,7 @@ public void close() throws IOException {
}

public void start() {
assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled";
assert isRemoteClusterStateConfigured(settings) == true : "Remote cluster state is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH;
import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH;
import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStorePathStrategy;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* Uploads the remote store path for all possible combinations of {@link org.opensearch.index.remote.RemoteStoreEnums.DataCategory}
Expand Down Expand Up @@ -235,7 +235,7 @@ private Repository validateAndGetRepository(String repoSetting) {
}

public void start() {
assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled";
assert isRemoteClusterStateConfigured(settings) == true : "Remote cluster state is not configured";
if (isRemoteDataAttributePresent == false) {
// If remote store data attributes are not present than we skip this.
return;
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,9 @@
import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath;
import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used
Expand Down Expand Up @@ -796,7 +796,7 @@ protected Node(
final RemoteClusterStateService remoteClusterStateService;
final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
final RemoteIndexPathUploader remoteIndexPathUploader;
if (isRemoteStoreClusterStateEnabled(settings)) {
if (isRemoteClusterStateConfigured(settings)) {
remoteIndexPathUploader = new RemoteIndexPathUploader(
threadPool,
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public static boolean isRemoteDataAttributePresent(Settings settings) {
|| settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false;
}

public static boolean isRemoteClusterStateAttributePresent(Settings settings) {
public static boolean isRemoteClusterStateConfigured(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)
.isEmpty() == false;
}
Expand All @@ -194,8 +194,7 @@ public static String getRemoteStoreTranslogRepo(Settings settings) {
}

public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings)
&& isRemoteClusterStateAttributePresent(settings);
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) && isRemoteClusterStateConfigured(settings);
}

private static boolean isRemoteRoutingTableAttributePresent(Settings settings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2838,6 +2838,26 @@ public static Settings buildRemoteStoreNodeAttributes(
);
}

public static Settings buildRemoteStateNodeAttributes(String stateRepoName, Path stateRepoPath, String stateRepoType) {
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
stateRepoName
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
stateRepoName
);
String prefixModeVerificationSuffix = BlobStoreRepository.PREFIX_MODE_VERIFICATION_SETTING.getKey();
Settings.Builder settings = Settings.builder()
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, stateRepoName)
.put(stateRepoTypeAttributeKey, stateRepoType)
.put(stateRepoSettingsAttributeKeyPrefix + "location", stateRepoPath)
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable);
return settings.build();
}

private static Settings buildRemoteStoreNodeAttributes(
String segmentRepoName,
Path segmentRepoPath,
Expand Down Expand Up @@ -2893,16 +2913,6 @@ private static Settings buildRemoteStoreNodeAttributes(
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
translogRepoName
);
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
segmentRepoName
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
segmentRepoName
);
String routingTableRepoAttributeKey = null, routingTableRepoSettingsAttributeKeyPrefix = null;
if (routingTableRepoName != null) {
routingTableRepoAttributeKey = String.format(
Expand All @@ -2928,10 +2938,7 @@ private static Settings buildRemoteStoreNodeAttributes(
.put(translogRepoTypeAttributeKey, translogRepoType)
.put(translogRepoSettingsAttributeKeyPrefix + "location", translogRepoPath)
.put(translogRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable)
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
.put(stateRepoTypeAttributeKey, segmentRepoType)
.put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable);
.put(buildRemoteStateNodeAttributes(segmentRepoName, segmentRepoPath, segmentRepoType));
if (routingTableRepoName != null) {
settings.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName)
.put(routingTableRepoAttributeKey, routingTableRepoType)
Expand Down

0 comments on commit d93c23e

Please sign in to comment.