Skip to content

Commit

Permalink
wip commit to read and write objects for remote publication
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Jun 8, 2024
1 parent 7c61fe0 commit 4671704
Show file tree
Hide file tree
Showing 7 changed files with 840 additions and 344 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,10 @@ public static boolean isTemplatesMetadataEqual(Metadata metadata1, Metadata meta
return metadata1.templates.equals(metadata2.templates);
}

public static boolean isHashesOfConsistentSettingsEqual(Metadata metadata1, Metadata metadata2) {
return metadata1.hashesOfConsistentSettings.equals(metadata2.hashesOfConsistentSettings);
}

public static boolean isCustomMetadataEqual(Metadata metadata1, Metadata metadata2) {
int customCount1 = 0;
for (Map.Entry<String, Custom> cursor : metadata1.customs.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,34 @@

package org.opensearch.gateway.remote;

import java.io.IOException;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterState.Custom;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.RemoteClusterStateUtils.RemoteStateTransferException;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms;
import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes;
import org.opensearch.gateway.remote.model.RemoteReadResult;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER;
import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM;
import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER;

/**
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
*
* @opensearch.internal
*/
public class RemoteClusterStateAttributesManager {
public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute";
public static final String DISCOVERY_NODES = "nodes";
Expand Down Expand Up @@ -75,29 +70,14 @@ public class RemoteClusterStateAttributesManager {
CheckedRunnable<IOException> getAsyncMetadataWriteAction(
ClusterState clusterState,
String component,
ToXContent componentData,
Object componentData,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
if (componentData instanceof DiscoveryNodes) {
RemoteDiscoveryNodes remoteObject = new RemoteDiscoveryNodes(
(DiscoveryNodes) componentData,
clusterState.version(),
clusterState.metadata().clusterUUID(),
compressor,
namedXContentRegistry
);
return () -> discoveryNodesBlobStore.writeAsync(
remoteObject,
getActionListener(component, remoteObject, latchedActionListener)
);
RemoteDiscoveryNodes remoteObject = new RemoteDiscoveryNodes((DiscoveryNodes)componentData, clusterState.version(), clusterState.metadata().clusterUUID(), compressor, namedXContentRegistry);
return () -> discoveryNodesBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener));
} else if (componentData instanceof ClusterBlocks) {
RemoteClusterBlocks remoteObject = new RemoteClusterBlocks(
(ClusterBlocks) componentData,
clusterState.version(),
clusterState.metadata().clusterUUID(),
compressor,
namedXContentRegistry
);
RemoteClusterBlocks remoteObject = new RemoteClusterBlocks((ClusterBlocks) componentData, clusterState.version(), clusterState.metadata().clusterUUID(), compressor, namedXContentRegistry);
return () -> clusterBlocksBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener));
} else if (componentData instanceof ClusterState.Custom) {
RemoteClusterStateCustoms remoteObject = new RemoteClusterStateCustoms(
Expand All @@ -111,17 +91,15 @@ CheckedRunnable<IOException> getAsyncMetadataWriteAction(
);
return () -> customsBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener));
} else {
throw new RemoteStateTransferException("Remote object not found for " + componentData.getClass());
throw new RemoteStateTransferException("Remote object not found for "+ componentData.getClass());
}
}

private ActionListener<Void> getActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
private ActionListener<Void> getActionListener(String component, AbstractRemoteWritableBlobEntity remoteObject, LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener) {
return ActionListener.wrap(
resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()),
resp -> latchedActionListener.onResponse(
remoteObject.getUploadedMetadata()
),
ex -> latchedActionListener.onFailure(new RemoteClusterStateUtils.RemoteStateTransferException(component, ex))
);
}
Expand All @@ -133,37 +111,15 @@ public CheckedRunnable<IOException> getAsyncMetadataReadAction(
String uploadedFilename,
LatchedActionListener<RemoteReadResult> listener
) {
final ActionListener actionListener = ActionListener.wrap(
response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)),
listener::onFailure
);
final ActionListener actionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)), listener::onFailure);
if (component.equals(RemoteDiscoveryNodes.DISCOVERY_NODES)) {
RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(
uploadedFilename,
clusterUUID,
compressor,
namedXContentRegistry
);
RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(uploadedFilename, clusterUUID, compressor, namedXContentRegistry);
return () -> discoveryNodesBlobStore.readAsync(remoteDiscoveryNodes, actionListener);
} else if (component.equals(RemoteClusterBlocks.CLUSTER_BLOCKS)) {
RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(
uploadedFilename,
clusterUUID,
compressor,
namedXContentRegistry
);
RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(uploadedFilename, clusterUUID, compressor, namedXContentRegistry);
return () -> clusterBlocksBlobStore.readAsync(remoteClusterBlocks, actionListener);
} else if (component.equals(CLUSTER_STATE_CUSTOM)) {
final ActionListener customActionListener = ActionListener.wrap(
response -> listener.onResponse(
new RemoteReadResult(
(ToXContent) response,
CLUSTER_STATE_ATTRIBUTE,
String.join(CUSTOM_DELIMITER, component, componentName)
)
),
listener::onFailure
);
final ActionListener customActionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, String.join(CUSTOM_DELIMITER, component, componentName))), listener::onFailure);
RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms(
uploadedFilename,
componentName,
Expand All @@ -174,7 +130,7 @@ public CheckedRunnable<IOException> getAsyncMetadataReadAction(
);
return () -> customsBlobStore.readAsync(remoteClusterStateCustoms, customActionListener);
} else {
throw new RemoteStateTransferException("Remote object not found for " + component);
throw new RemoteStateTransferException("Remote object not found for "+ component);
}
}

Expand Down
Loading

0 comments on commit 4671704

Please sign in to comment.