Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate Remote State and Publication enabled and configured methods #16080

Merged
merged 2 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.junit.Assert;
import org.junit.Before;

import java.util.Collection;
Expand Down Expand Up @@ -118,6 +119,31 @@ public Settings.Builder remoteWithRoutingTableNodeSetting() {
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true);
}

public void testRemoteClusterStateServiceNotInitialized_WhenNodeAttributesNotPresent() {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);

ensureStableCluster(3);
ensureGreen();

internalCluster().getDataOrClusterManagerNodeInstances(RemoteClusterStateService.class).forEach(Assert::assertNull);
}

public void testServiceInitialized_WhenNodeAttributesPresent() {
internalCluster().startClusterManagerOnlyNode(
buildRemoteStateNodeAttributes(REPOSITORY_NAME, segmentRepoPath, ReloadableFsRepository.TYPE)
);
internalCluster().startDataOnlyNodes(
2,
buildRemoteStateNodeAttributes(REPOSITORY_NAME, segmentRepoPath, ReloadableFsRepository.TYPE)
);

ensureStableCluster(3);
ensureGreen();

internalCluster().getDataOrClusterManagerNodeInstances(RemoteClusterStateService.class).forEach(Assert::assertNotNull);
}

public void testRemotePublishConfigNodeJoinNonRemoteCluster() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,11 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
ensureStableCluster(5);
ensureGreen(INDEX_NAME);

assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
RemoteClusterStateService remoteClusterStateService = internalCluster().getCurrentClusterManagerNodeInstance(
shiv0408 marked this conversation as resolved.
Show resolved Hide resolved
RemoteClusterStateService.class
);

assertFalse(remoteClusterStateService.isRemotePublicationEnabled());
}

public void testRemotePublicationDownloadStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public CoordinationState(
// ToDo: revisit this check while making the setting dynamic
this.isRemotePublicationEnabled = isRemoteStateEnabled
&& REMOTE_PUBLICATION_SETTING.get(settings)
&& localNode.isRemoteStatePublicationEnabled();
&& localNode.isRemoteStatePublicationConfigured();
}

public boolean isRemotePublicationEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@
private final PersistedStateRegistry persistedStateRegistry;
private final RemoteStoreNodeService remoteStoreNodeService;
private NodeConnectionsService nodeConnectionsService;
private final RemoteClusterStateService remoteClusterStateService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand Down Expand Up @@ -312,6 +313,7 @@
this.persistedStateRegistry = persistedStateRegistry;
this.localNodeCommissioned = true;
this.remoteStoreNodeService = remoteStoreNodeService;
this.remoteClusterStateService = remoteClusterStateService;
}

private ClusterFormationState getClusterFormationState() {
Expand Down Expand Up @@ -912,9 +914,9 @@
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
if (coordinationState.get().isRemotePublicationEnabled()) {
stats.add(publicationHandler.getFullDownloadStats());
stats.add(publicationHandler.getDiffDownloadStats());
if (remoteClusterStateService != null) {
stats.add(remoteClusterStateService.getFullDownloadStats());
stats.add(remoteClusterStateService.getDiffDownloadStats());

Check warning on line 919 in server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java#L918-L919

Added lines #L918 - L919 were not covered by tests
}
clusterStateStats.setPersistenceStats(stats);
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,10 @@ private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joi

assert existingNodes.isEmpty() == false;
Optional<DiscoveryNode> remotePublicationNode = existingNodes.stream()
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
.filter(DiscoveryNode::isRemoteStatePublicationConfigured)
.findFirst();

if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) {
if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationConfigured()) {
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,6 @@ public PublishClusterStateStats stats() {
);
}

public PersistedStateStats getFullDownloadStats() {
return remoteClusterStateService.getFullDownloadStats();
}

public PersistedStateStats getDiffDownloadStats() {
return remoteClusterStateService.getDiffDownloadStats();
}

private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
ClusterState incomingState;
Expand Down Expand Up @@ -356,7 +348,7 @@ public PublicationContext newPublicationContext(
) {
if (isRemotePublicationEnabled == true) {
if (allNodesRemotePublicationEnabled.get() == false) {
if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
if (validateRemotePublicationConfiguredOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
allNodesRemotePublicationEnabled.set(true);
}
}
Expand All @@ -374,11 +366,11 @@ public PublicationContext newPublicationContext(
return publicationContext;
}

private boolean validateRemotePublicationOnAllNodes(DiscoveryNodes discoveryNodes) {
private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) {
assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0;
for (DiscoveryNode node : discoveryNodes.getNodes().values()) {
// if a node is non-remote then created local publication context
if (node.isRemoteStatePublicationEnabled() == false) {
if (node.isRemoteStatePublicationConfigured() == false) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,10 +515,10 @@ public boolean isRemoteStoreNode() {
}

/**
* Returns whether remote cluster state publication is enabled on this node
* Returns whether settings required for remote cluster state publication is configured
* @return true if the node contains remote cluster state node attribute and remote routing table node attribute
*/
public boolean isRemoteStatePublicationEnabled() {
public boolean isRemoteStatePublicationConfigured() {
return this.getAttributes()
.keySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured;

/**
* A Service which provides APIs to upload and download cluster metadata from remote store.
Expand Down Expand Up @@ -256,7 +256,7 @@ public RemoteClusterStateService(
List<IndexMetadataUploadListener> indexMetadataUploadListeners,
NamedWriteableRegistry namedWriteableRegistry
) {
assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled";
assert isRemoteClusterStateConfigured(settings) : "Remote cluster state is not configured";
this.nodeId = nodeId;
this.repositoriesService = repositoriesService;
this.settings = settings;
Expand Down Expand Up @@ -1055,7 +1055,7 @@ public void close() throws IOException {
}

public void start() {
assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled";
assert isRemoteClusterStateConfigured(settings) == true : "Remote cluster state is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH;
import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH;
import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStorePathStrategy;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* Uploads the remote store path for all possible combinations of {@link org.opensearch.index.remote.RemoteStoreEnums.DataCategory}
Expand Down Expand Up @@ -235,7 +235,7 @@ private Repository validateAndGetRepository(String repoSetting) {
}

public void start() {
assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled";
assert isRemoteClusterStateConfigured(settings) == true : "Remote cluster state is not configured";
if (isRemoteDataAttributePresent == false) {
// If remote store data attributes are not present than we skip this.
return;
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,9 @@
import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath;
import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used
Expand Down Expand Up @@ -796,7 +796,7 @@ protected Node(
final RemoteClusterStateService remoteClusterStateService;
final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
final RemoteIndexPathUploader remoteIndexPathUploader;
if (isRemoteStoreClusterStateEnabled(settings)) {
if (isRemoteClusterStateConfigured(settings)) {
shiv0408 marked this conversation as resolved.
Show resolved Hide resolved
remoteIndexPathUploader = new RemoteIndexPathUploader(
threadPool,
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public static boolean isRemoteDataAttributePresent(Settings settings) {
|| settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false;
}

public static boolean isRemoteClusterStateAttributePresent(Settings settings) {
public static boolean isRemoteClusterStateConfigured(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)
.isEmpty() == false;
}
Expand All @@ -194,8 +194,7 @@ public static String getRemoteStoreTranslogRepo(Settings settings) {
}

public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings)
&& isRemoteClusterStateAttributePresent(settings);
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) && isRemoteClusterStateConfigured(settings);
}

private static boolean isRemoteRoutingTableAttributePresent(Settings settings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2795,6 +2795,26 @@ public static Settings buildRemoteStoreNodeAttributes(
);
}

public static Settings buildRemoteStateNodeAttributes(String stateRepoName, Path stateRepoPath, String stateRepoType) {
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
stateRepoName
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
stateRepoName
);
String prefixModeVerificationSuffix = BlobStoreRepository.PREFIX_MODE_VERIFICATION_SETTING.getKey();
Settings.Builder settings = Settings.builder()
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, stateRepoName)
.put(stateRepoTypeAttributeKey, stateRepoType)
.put(stateRepoSettingsAttributeKeyPrefix + "location", stateRepoPath)
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable);
return settings.build();
}

private static Settings buildRemoteStoreNodeAttributes(
String segmentRepoName,
Path segmentRepoPath,
Expand Down Expand Up @@ -2850,16 +2870,6 @@ private static Settings buildRemoteStoreNodeAttributes(
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
translogRepoName
);
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
segmentRepoName
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
segmentRepoName
);
String routingTableRepoAttributeKey = null, routingTableRepoSettingsAttributeKeyPrefix = null;
if (routingTableRepoName != null) {
routingTableRepoAttributeKey = String.format(
Expand All @@ -2885,10 +2895,7 @@ private static Settings buildRemoteStoreNodeAttributes(
.put(translogRepoTypeAttributeKey, translogRepoType)
.put(translogRepoSettingsAttributeKeyPrefix + "location", translogRepoPath)
.put(translogRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable)
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
.put(stateRepoTypeAttributeKey, segmentRepoType)
.put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable);
.put(buildRemoteStateNodeAttributes(segmentRepoName, segmentRepoPath, segmentRepoType));
if (routingTableRepoName != null) {
settings.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName)
.put(routingTableRepoAttributeKey, routingTableRepoType)
Expand Down
Loading