Skip to content

Commit

Permalink
read cluster state custom from diff
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Jun 5, 2024
1 parent 7f2391c commit caf61a6
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,10 @@ public boolean isDiscoveryNodesUpdated() {
return discoveryNodesUpdated;
}

public boolean isHashesOfConsistentSettingsUpdated() {
return hashesOfConsistentSettingsUpdated;
}

public List<String> getIndicesRoutingUpdated() {
return indicesRoutingUpdated;
}
Expand All @@ -467,6 +471,14 @@ public List<String> getIndicesRoutingDeleted() {
return indicesRoutingDeleted;
}

public List<String> getClusterStateCustomUpdated() {
return clusterStateCustomUpdated;
}

public List<String> getClusterStateCustomDeleted() {
return clusterStateCustomDeleted;
}

public static Builder builder() {
return new Builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Set;

import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM;
import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER;

public class RemoteClusterStateAttributesManager {
public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute";
Expand Down Expand Up @@ -100,16 +101,17 @@ public CheckedRunnable<IOException> getAsyncMetadataReadAction(
String uploadedFilename,
LatchedActionListener<RemoteReadResult> listener
) {
ActionListener actionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)), listener::onFailure);
final ActionListener actionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)), listener::onFailure);
if (component.equals(RemoteDiscoveryNodes.DISCOVERY_NODES)) {
RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(uploadedFilename, clusterUUID, compressor, namedXContentRegistry);
return () -> discoveryNodesBlobStore.readAsync(remoteDiscoveryNodes, actionListener);
} else if (component.equals(RemoteClusterBlocks.CLUSTER_BLOCKS)) {
RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(uploadedFilename, clusterUUID, compressor, namedXContentRegistry);
return () -> clusterBlocksBlobStore.readAsync(remoteClusterBlocks, actionListener);
} else if (component.equals(CLUSTER_STATE_CUSTOM)) {
final ActionListener customActionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, String.join(CUSTOM_DELIMITER, component, componentName))), listener::onFailure);
RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms(uploadedFilename, componentName, clusterUUID, compressor, namedXContentRegistry);
return () -> customsBlobStore.readAsync(remoteClusterStateCustoms, actionListener);
return () -> customsBlobStore.readAsync(remoteClusterStateCustoms, customActionListener);
} else {
throw new RemoteStateTransferException("Remote object not found for "+ component);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,12 +637,11 @@ private UploadedMetadataResults writeMetadataInParallel(
);
});
clusterStateCustomToUpload.forEach((key, value) -> {
String customComponent = String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, key);
uploadTasks.put(
customComponent,
key,
remoteClusterStateAttributesManager.getAsyncMetadataWriteAction(
clusterState,
customComponent,
key,
value,
listener
)
Expand Down Expand Up @@ -977,11 +976,14 @@ private ClusterState readClusterStateInParallel(
boolean readTemplatesMetadata,
boolean readDiscoveryNodes,
boolean readClusterBlocks,
List<UploadedIndexMetadata> indicesRoutingToRead
List<UploadedIndexMetadata> indicesRoutingToRead,
boolean readHashesOfConsistentSettings,
Map<String, UploadedMetadataAttribute> clusterStateCustomToRead
) throws IOException {
int totalReadTasks =
indicesToRead.size() + customToRead.size() + indicesRoutingToRead.size() + (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata ? 1 : 0) + (
readTemplatesMetadata ? 1 : 0) + (readDiscoveryNodes ? 1 : 0) + (readClusterBlocks ? 1 : 0) + (readTransientSettingsMetadata ? 1 : 0);
readTemplatesMetadata ? 1 : 0) + (readDiscoveryNodes ? 1 : 0) + (readClusterBlocks ? 1 : 0) + (readTransientSettingsMetadata ? 1 : 0) + (readHashesOfConsistentSettings ? 1 : 0)
+ clusterStateCustomToRead.size();
CountDownLatch latch = new CountDownLatch(totalReadTasks);
List<CheckedRunnable<IOException>> asyncMetadataReadActions = new ArrayList<>();
List<RemoteReadResult> readResults = Collections.synchronizedList(new ArrayList<>());
Expand Down Expand Up @@ -1120,6 +1122,30 @@ private ClusterState readClusterStateInParallel(
);
}

if (readHashesOfConsistentSettings) {
asyncMetadataReadActions.add(
remoteGlobalMetadataManager.getAsyncMetadataReadAction(
clusterUUID,
HASHES_OF_CONSISTENT_SETTINGS,
HASHES_OF_CONSISTENT_SETTINGS,
manifest.getHashesOfConsistentSettings().getUploadedFilename(),
listener
)
);
}

for (Map.Entry<String, UploadedMetadataAttribute> entry : clusterStateCustomToRead.entrySet()) {
asyncMetadataReadActions.add(
remoteClusterStateAttributesManager.getAsyncMetadataReadAction(
clusterUUID,
CLUSTER_STATE_CUSTOM,
entry.getKey(),
entry.getValue().getUploadedFilename(),
listener
)
);
}

for (CheckedRunnable<IOException> asyncMetadataReadAction : asyncMetadataReadActions) {
asyncMetadataReadAction.run();
}
Expand Down Expand Up @@ -1175,11 +1201,17 @@ private ClusterState readClusterStateInParallel(
case TEMPLATES_METADATA:
metadataBuilder.templates((TemplatesMetadata) remoteReadResult.getObj());
break;
case HASHES_OF_CONSISTENT_SETTINGS:
metadataBuilder.hashesOfConsistentSettings((DiffableStringMap) remoteReadResult.getObj());
case CLUSTER_STATE_ATTRIBUTE:
if (remoteReadResult.getComponentName().equals(DISCOVERY_NODES)) {
discoveryNodesBuilder.set(DiscoveryNodes.builder((DiscoveryNodes) remoteReadResult.getObj()));
} else if (remoteReadResult.getComponentName().equals(CLUSTER_BLOCKS)) {
clusterStateBuilder.blocks((ClusterBlocks) remoteReadResult.getObj());
} else if (remoteReadResult.getComponentName().startsWith(CLUSTER_STATE_CUSTOM)) {
// component name for mat is "cluster-state-custom--custom_name"
String custom = remoteReadResult.getComponentName().split(CUSTOM_DELIMITER)[1];
clusterStateBuilder.putCustom(custom, (ClusterState.Custom) remoteReadResult.getObj());
}
break;
default:
Expand Down Expand Up @@ -1218,7 +1250,9 @@ public ClusterState getClusterStateForManifest(String clusterName, ClusterMetada
manifest.getTemplatesMetadata() != null,
includeEphemeral && manifest.getDiscoveryNodesMetadata() != null,
includeEphemeral && manifest.getClusterBlocksMetadata() != null,
includeEphemeral ? manifest.getIndicesRouting() : Collections.emptyList()
includeEphemeral ? manifest.getIndicesRouting() : Collections.emptyList(),
includeEphemeral && manifest.getHashesOfConsistentSettings() != null,
includeEphemeral ? manifest.getClusterStateCustomMap() : Collections.emptyMap()
);
}

Expand All @@ -1242,6 +1276,12 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata
updatedCustomMetadata.put(customType, manifest.getCustomMetadataMap().get(customType));
}
}
Map<String, UploadedMetadataAttribute> updatedClusterStateCustom = new HashMap<>();
if (diff.getClusterStateCustomUpdated() != null) {
for (String customType : diff.getClusterStateCustomUpdated()) {
updatedClusterStateCustom.put(customType, manifest.getClusterStateCustomMap().get(customType));
}
}
ClusterState updatedClusterState = readClusterStateInParallel(
previousState,
manifest,
Expand All @@ -1256,7 +1296,9 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata
diff.isTemplatesMetadataUpdated(),
diff.isDiscoveryNodesUpdated(),
diff.isClusterBlocksUpdated(),
updatedIndexRouting
updatedIndexRouting,
diff.isHashesOfConsistentSettingsUpdated(),
updatedClusterStateCustom
);
ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState);
Metadata.Builder metadataBuilder = Metadata.builder(updatedClusterState.metadata());
Expand All @@ -1271,6 +1313,13 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata
}
}

// remove the deleted cluster state customs from the metadata
if (diff.getClusterStateCustomDeleted() != null) {
for (String customType : diff.getClusterStateCustomDeleted()) {
clusterStateBuilder.removeCustom(customType);
}
}

HashMap<String, IndexRoutingTable> indexRoutingTables = new HashMap<>(updatedClusterState.getRoutingTable().getIndicesRouting());

for (String indexName : diff.getIndicesRoutingDeleted()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ ClusterMetadataManifest uploadManifest(
.routingTableVersion(clusterState.getRoutingTable().version())
.indicesRouting(uploadedMetadataResult.uploadedIndicesRoutingMetadata)
.metadataVersion(clusterState.metadata().version())
.transientSettingsMetadata(uploadedMetadataResult.uploadedTransientSettingsMetadata);
.transientSettingsMetadata(uploadedMetadataResult.uploadedTransientSettingsMetadata)
.clusterStateCustomMetadataMap(uploadedMetadataResult.uploadedClusterStateCustomMetadataMap)
.hashesOfConsistentSettings(uploadedMetadataResult.uploadedHashesOfConsistentSettings);
final ClusterMetadataManifest manifest = manifestBuilder.build();
writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest);
return manifest;
Expand Down

0 comments on commit caf61a6

Please sign in to comment.