Skip to content

Commit

Permalink
Use InputStreams rather than XContent for serialization
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Jun 7, 2024
1 parent 87ade4c commit 5c3e28b
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
Expand Down Expand Up @@ -45,14 +46,22 @@ public class RemoteClusterStateAttributesManager {
private final RemoteClusterStateBlobStore<Custom, RemoteClusterStateCustoms> customsBlobStore;
private final Compressor compressor;
private final NamedXContentRegistry namedXContentRegistry;
private final NamedWriteableRegistry namedWriteableRegistry;

RemoteClusterStateAttributesManager(
RemoteClusterStateBlobStore<ClusterBlocks, RemoteClusterBlocks> clusterBlocksBlobStore, RemoteClusterStateBlobStore<DiscoveryNodes, RemoteDiscoveryNodes> discoveryNodesBlobStore, RemoteClusterStateBlobStore<Custom, RemoteClusterStateCustoms> customsBlobStore, Compressor compressor, NamedXContentRegistry namedXContentRegistry) {
RemoteClusterStateBlobStore<ClusterBlocks, RemoteClusterBlocks> clusterBlocksBlobStore,
RemoteClusterStateBlobStore<DiscoveryNodes, RemoteDiscoveryNodes> discoveryNodesBlobStore,
RemoteClusterStateBlobStore<Custom, RemoteClusterStateCustoms> customsBlobStore,
Compressor compressor,
NamedXContentRegistry namedXContentRegistry,
NamedWriteableRegistry namedWriteableRegistry
) {
this.clusterBlocksBlobStore = clusterBlocksBlobStore;
this.discoveryNodesBlobStore = discoveryNodesBlobStore;
this.customsBlobStore = customsBlobStore;
this.compressor = compressor;
this.namedXContentRegistry = namedXContentRegistry;
this.namedWriteableRegistry = namedWriteableRegistry;
}

/**
Expand All @@ -77,7 +86,8 @@ CheckedRunnable<IOException> getAsyncMetadataWriteAction(
clusterState.version(),
clusterState.metadata().clusterUUID(),
compressor,
namedXContentRegistry
namedXContentRegistry,
namedWriteableRegistry
);
return () -> customsBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener));
} else {
Expand Down Expand Up @@ -110,7 +120,14 @@ public CheckedRunnable<IOException> getAsyncMetadataReadAction(
return () -> clusterBlocksBlobStore.readAsync(remoteClusterBlocks, actionListener);
} else if (component.equals(CLUSTER_STATE_CUSTOM)) {
final ActionListener customActionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, String.join(CUSTOM_DELIMITER, component, componentName))), listener::onFailure);
RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms(uploadedFilename, componentName, clusterUUID, compressor, namedXContentRegistry);
RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms(
uploadedFilename,
componentName,
clusterUUID,
compressor,
namedXContentRegistry,
namedWriteableRegistry
);
return () -> customsBlobStore.readAsync(remoteClusterStateCustoms, customActionListener);
} else {
throw new RemoteStateTransferException("Remote object not found for "+ component);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
Expand Down Expand Up @@ -153,6 +154,7 @@ public class RemoteClusterStateService implements Closeable {
private RemoteClusterStateAttributesManager remoteClusterStateAttributesManager;
private RemoteManifestManager remoteManifestManager;
private ClusterSettings clusterSettings;
private NamedWriteableRegistry namedWriteableRegistry;
private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]";
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
Expand All @@ -169,19 +171,15 @@ public class RemoteClusterStateService implements Closeable {
FORMAT_PARAMS = new ToXContent.MapParams(params);
}

private String latestClusterName;
private String latestClusterUUID;
private long lastCleanupAttemptState;
private boolean isClusterManagerNode;

public RemoteClusterStateService(
String nodeId,
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterService clusterService,
LongSupplier relativeTimeNanosSupplier,
ThreadPool threadPool,
List<IndexMetadataUploadListener> indexMetadataUploadListeners
List<IndexMetadataUploadListener> indexMetadataUploadListeners,
NamedWriteableRegistry namedWriteableRegistry
) {
assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled";
logger.info("REMOTE STATE ENABLED");
Expand All @@ -197,12 +195,10 @@ public RemoteClusterStateService(
this.remoteStateReadTimeout = clusterSettings.get(REMOTE_STATE_READ_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_TIMEOUT_SETTING, this::setRemoteClusterStateEnabled);
this.remoteStateStats = new RemotePersistenceStats();

this.namedWriteableRegistry = namedWriteableRegistry;
this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings)
? Optional.of(new RemoteRoutingTableService(repositoriesService, settings, threadPool))
: Optional.empty();
this.lastCleanupAttemptState = 0;
this.isClusterManagerNode = DiscoveryNode.isClusterManagerNode(settings);
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService);
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
}
Expand Down Expand Up @@ -439,9 +435,6 @@ public ClusterMetadataManifest writeIncrementalMetadata(

logger.info("MANIFEST IN INC STATE {}", manifest);

this.latestClusterName = clusterState.getClusterName().value();
this.latestClusterUUID = clusterState.metadata().clusterUUID();

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateSucceeded();
remoteStateStats.stateTook(durationMillis);
Expand Down Expand Up @@ -904,8 +897,7 @@ public void start() {
RemoteClusterStateBlobStore<ClusterState.Custom, RemoteClusterStateCustoms> clusterStateCustomsBlobStore = new RemoteClusterStateBlobStore<>(
getBlobStoreTransferService(), blobStoreRepository, clusterName, threadpool, ThreadPool.Names.GENERIC);
remoteClusterStateAttributesManager = new RemoteClusterStateAttributesManager(clusterBlocksBlobStore, discoveryNodesBlobStore,
clusterStateCustomsBlobStore,
blobStoreRepository.getCompressor(), blobStoreRepository.getNamedXContentRegistry());
clusterStateCustomsBlobStore, blobStoreRepository.getCompressor(), blobStoreRepository.getNamedXContentRegistry(), namedWriteableRegistry);
RemoteClusterStateBlobStore<ClusterMetadataManifest, RemoteClusterMetadataManifest> manifestBlobStore = new RemoteClusterStateBlobStore<>(
getBlobStoreTransferService(), blobStoreRepository, clusterName, threadpool, ThreadPool.Names.GENERIC);
remoteManifestManager = new RemoteManifestManager(manifestBlobStore, clusterSettings, nodeId, blobStoreRepository.getCompressor(),
Expand Down Expand Up @@ -958,7 +950,6 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID
private ClusterState readClusterStateInParallel(
ClusterState previousState,
ClusterMetadataManifest manifest,
String clusterName,
String clusterUUID,
String localNodeId,
List<UploadedIndexMetadata> indicesToRead,
Expand Down Expand Up @@ -1233,7 +1224,6 @@ public ClusterState getClusterStateForManifest(String clusterName, ClusterMetada
return readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
clusterName,
manifest.getClusterUUID(),
localNodeId,
manifest.getIndices(),
Expand Down Expand Up @@ -1279,7 +1269,6 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata
ClusterState updatedClusterState = readClusterStateInParallel(
previousState,
manifest,
clusterName,
manifest.getClusterUUID(),
localNodeId,
updatedIndices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.gateway.remote.model;

import static org.opensearch.core.common.bytes.BytesReference.toBytes;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT;
Expand All @@ -17,8 +18,11 @@
import java.util.List;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.common.io.Streams;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
Expand All @@ -33,11 +37,6 @@
public class RemoteClusterBlocks extends AbstractRemoteWritableBlobEntity<ClusterBlocks> {

public static final String CLUSTER_BLOCKS = "blocks";
public static final ChecksumBlobStoreFormat<ClusterBlocks> CLUSTER_BLOCKS_FORMAT = new ChecksumBlobStoreFormat<>(
"blocks",
METADATA_NAME_FORMAT,
ClusterBlocks::fromXContent
);

private ClusterBlocks clusterBlocks;
private long stateVersion;
Expand Down Expand Up @@ -81,11 +80,14 @@ public UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput();
BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
clusterBlocks.writeTo(bytesStreamOutput);
return bytesStreamOutput.bytes().streamInput();
}

@Override
public ClusterBlocks deserialize(final InputStream inputStream) throws IOException {
return CLUSTER_BLOCKS_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream));
StreamInput in = new BytesStreamInput(toBytes(Streams.readFully(inputStream)));
return ClusterBlocks.readFrom(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterState.Custom;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.io.Streams;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
Expand All @@ -24,6 +30,8 @@
import java.io.InputStream;
import java.util.List;

import static org.opensearch.cluster.ClusterState.FeatureAware.shouldSerialize;
import static org.opensearch.core.common.bytes.BytesReference.toBytes;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
Expand All @@ -33,32 +41,39 @@
public class RemoteClusterStateCustoms extends AbstractRemoteWritableBlobEntity<Custom> {
public static final String CLUSTER_STATE_CUSTOM = "cluster-state-custom";

public final ChecksumBlobStoreFormat<ClusterState.Custom> clusterStateCustomBlobStoreFormat;
private long stateVersion;
private String customType;
private ClusterState.Custom custom;
private final NamedWriteableRegistry namedWriteableRegistry;

public RemoteClusterStateCustoms(final ClusterState.Custom custom, final String customType, final long stateVersion, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) {
public RemoteClusterStateCustoms(
final ClusterState.Custom custom,
final String customType,
final long stateVersion,
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry,
final NamedWriteableRegistry namedWriteableRegistry
) {
super(clusterUUID, compressor, namedXContentRegistry);
this.stateVersion = stateVersion;
this.customType = customType;
this.custom = custom;
this.clusterStateCustomBlobStoreFormat = new ChecksumBlobStoreFormat<>(
CLUSTER_STATE_CUSTOM,
METADATA_NAME_FORMAT,
parser -> ClusterState.Custom.fromXContent(parser, customType)
);
this.namedWriteableRegistry = namedWriteableRegistry;
}

public RemoteClusterStateCustoms(final String blobName, final String customType, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) {
public RemoteClusterStateCustoms(
final String blobName,
final String customType,
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry,
final NamedWriteableRegistry namedWriteableRegistry
) {
super(clusterUUID, compressor, namedXContentRegistry);
this.blobName = blobName;
this.customType = customType;
this.clusterStateCustomBlobStoreFormat = new ChecksumBlobStoreFormat<>(
CLUSTER_STATE_CUSTOM,
METADATA_NAME_FORMAT,
parser -> ClusterState.Custom.fromXContent(parser, customType)
);
this.namedWriteableRegistry = namedWriteableRegistry;
}

@Override
Expand Down Expand Up @@ -88,11 +103,19 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return clusterStateCustomBlobStoreFormat.serialize(custom, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput();
BytesStreamOutput outputStream = new BytesStreamOutput();
if (shouldSerialize(outputStream, custom)) {
outputStream.writeNamedWriteable(custom);
}
return outputStream.bytes().streamInput();
}

@Override
public ClusterState.Custom deserialize(final InputStream inputStream) throws IOException {
return clusterStateCustomBlobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream));
NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(
new BytesStreamInput(toBytes(Streams.readFully(inputStream))),
this.namedWriteableRegistry
);
return in.readNamedWriteable(Custom.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,23 @@

package org.opensearch.gateway.remote.model;

import static org.opensearch.core.common.bytes.BytesReference.toBytes;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.io.Streams;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
Expand All @@ -34,11 +39,6 @@
public class RemoteDiscoveryNodes extends AbstractRemoteWritableBlobEntity<DiscoveryNodes> {

public static final String DISCOVERY_NODES = "nodes";
public static final ChecksumBlobStoreFormat<DiscoveryNodes> DISCOVERY_NODES_FORMAT = new ChecksumBlobStoreFormat<>(
"nodes",
METADATA_NAME_FORMAT,
DiscoveryNodes::fromXContent
);

private DiscoveryNodes discoveryNodes;
private long stateVersion;
Expand Down Expand Up @@ -81,11 +81,13 @@ public UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput();
BytesStreamOutput outputStream = new BytesStreamOutput();
discoveryNodes.writeTo(outputStream);
return outputStream.bytes().streamInput();
}

@Override
public DiscoveryNodes deserialize(final InputStream inputStream) throws IOException {
return DISCOVERY_NODES_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream));
return DiscoveryNodes.readFrom(new BytesStreamInput(toBytes(Streams.readFully(inputStream))), null);
}
}
Loading

0 comments on commit 5c3e28b

Please sign in to comment.