From 5c3e28b9459f04f5d83c5d85a854889715311464 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Fri, 7 Jun 2024 14:44:39 +0530 Subject: [PATCH] Use InputStreams rather than XContent for serialization Signed-off-by: Shivansh Arora --- .../RemoteClusterStateAttributesManager.java | 23 ++++++-- .../remote/RemoteClusterStateService.java | 23 +++----- .../remote/model/RemoteClusterBlocks.java | 16 +++--- .../model/RemoteClusterStateCustoms.java | 53 +++++++++++++------ .../remote/model/RemoteDiscoveryNodes.java | 16 +++--- .../RemoteHashesOfConsistentSettings.java | 16 +++--- .../main/java/org/opensearch/node/Node.java | 3 +- 7 files changed, 93 insertions(+), 57 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 4a4b0c79b21a9..e7d497bf2ee83 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -16,6 +16,7 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.CheckedRunnable; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; @@ -45,14 +46,22 @@ public class RemoteClusterStateAttributesManager { private final RemoteClusterStateBlobStore customsBlobStore; private final Compressor compressor; private final NamedXContentRegistry namedXContentRegistry; + private final NamedWriteableRegistry namedWriteableRegistry; RemoteClusterStateAttributesManager( - RemoteClusterStateBlobStore clusterBlocksBlobStore, RemoteClusterStateBlobStore discoveryNodesBlobStore, RemoteClusterStateBlobStore customsBlobStore, Compressor compressor, NamedXContentRegistry namedXContentRegistry) { + RemoteClusterStateBlobStore clusterBlocksBlobStore, + RemoteClusterStateBlobStore discoveryNodesBlobStore, + RemoteClusterStateBlobStore customsBlobStore, + Compressor compressor, + NamedXContentRegistry namedXContentRegistry, + NamedWriteableRegistry namedWriteableRegistry + ) { this.clusterBlocksBlobStore = clusterBlocksBlobStore; this.discoveryNodesBlobStore = discoveryNodesBlobStore; this.customsBlobStore = customsBlobStore; this.compressor = compressor; this.namedXContentRegistry = namedXContentRegistry; + this.namedWriteableRegistry = namedWriteableRegistry; } /** @@ -77,7 +86,8 @@ CheckedRunnable getAsyncMetadataWriteAction( clusterState.version(), clusterState.metadata().clusterUUID(), compressor, - namedXContentRegistry + namedXContentRegistry, + namedWriteableRegistry ); return () -> customsBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener)); } else { @@ -110,7 +120,14 @@ public CheckedRunnable getAsyncMetadataReadAction( return () -> clusterBlocksBlobStore.readAsync(remoteClusterBlocks, actionListener); } else if (component.equals(CLUSTER_STATE_CUSTOM)) { final ActionListener customActionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, String.join(CUSTOM_DELIMITER, component, componentName))), listener::onFailure); - RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms(uploadedFilename, componentName, clusterUUID, compressor, namedXContentRegistry); + RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms( + uploadedFilename, + componentName, + clusterUUID, + compressor, + namedXContentRegistry, + namedWriteableRegistry + ); return () -> customsBlobStore.readAsync(remoteClusterStateCustoms, customActionListener); } else { throw new RemoteStateTransferException("Remote object not found for "+ component); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 26b1d0c4de98f..2918317622038 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -76,6 +76,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; @@ -153,6 +154,7 @@ public class RemoteClusterStateService implements Closeable { private RemoteClusterStateAttributesManager remoteClusterStateAttributesManager; private RemoteManifestManager remoteManifestManager; private ClusterSettings clusterSettings; + private NamedWriteableRegistry namedWriteableRegistry; private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]"; private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " @@ -169,11 +171,6 @@ public class RemoteClusterStateService implements Closeable { FORMAT_PARAMS = new ToXContent.MapParams(params); } - private String latestClusterName; - private String latestClusterUUID; - private long lastCleanupAttemptState; - private boolean isClusterManagerNode; - public RemoteClusterStateService( String nodeId, Supplier repositoriesService, @@ -181,7 +178,8 @@ public RemoteClusterStateService( ClusterService clusterService, LongSupplier relativeTimeNanosSupplier, ThreadPool threadPool, - List indexMetadataUploadListeners + List indexMetadataUploadListeners, + NamedWriteableRegistry namedWriteableRegistry ) { assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled"; logger.info("REMOTE STATE ENABLED"); @@ -197,12 +195,10 @@ public RemoteClusterStateService( this.remoteStateReadTimeout = clusterSettings.get(REMOTE_STATE_READ_TIMEOUT_SETTING); clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_TIMEOUT_SETTING, this::setRemoteClusterStateEnabled); this.remoteStateStats = new RemotePersistenceStats(); - + this.namedWriteableRegistry = namedWriteableRegistry; this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings) ? Optional.of(new RemoteRoutingTableService(repositoriesService, settings, threadPool)) : Optional.empty(); - this.lastCleanupAttemptState = 0; - this.isClusterManagerNode = DiscoveryNode.isClusterManagerNode(settings); this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService); this.indexMetadataUploadListeners = indexMetadataUploadListeners; } @@ -439,9 +435,6 @@ public ClusterMetadataManifest writeIncrementalMetadata( logger.info("MANIFEST IN INC STATE {}", manifest); - this.latestClusterName = clusterState.getClusterName().value(); - this.latestClusterUUID = clusterState.metadata().clusterUUID(); - final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); remoteStateStats.stateSucceeded(); remoteStateStats.stateTook(durationMillis); @@ -904,8 +897,7 @@ public void start() { RemoteClusterStateBlobStore clusterStateCustomsBlobStore = new RemoteClusterStateBlobStore<>( getBlobStoreTransferService(), blobStoreRepository, clusterName, threadpool, ThreadPool.Names.GENERIC); remoteClusterStateAttributesManager = new RemoteClusterStateAttributesManager(clusterBlocksBlobStore, discoveryNodesBlobStore, - clusterStateCustomsBlobStore, - blobStoreRepository.getCompressor(), blobStoreRepository.getNamedXContentRegistry()); + clusterStateCustomsBlobStore, blobStoreRepository.getCompressor(), blobStoreRepository.getNamedXContentRegistry(), namedWriteableRegistry); RemoteClusterStateBlobStore manifestBlobStore = new RemoteClusterStateBlobStore<>( getBlobStoreTransferService(), blobStoreRepository, clusterName, threadpool, ThreadPool.Names.GENERIC); remoteManifestManager = new RemoteManifestManager(manifestBlobStore, clusterSettings, nodeId, blobStoreRepository.getCompressor(), @@ -958,7 +950,6 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID private ClusterState readClusterStateInParallel( ClusterState previousState, ClusterMetadataManifest manifest, - String clusterName, String clusterUUID, String localNodeId, List indicesToRead, @@ -1233,7 +1224,6 @@ public ClusterState getClusterStateForManifest(String clusterName, ClusterMetada return readClusterStateInParallel( ClusterState.builder(new ClusterName(clusterName)).build(), manifest, - clusterName, manifest.getClusterUUID(), localNodeId, manifest.getIndices(), @@ -1279,7 +1269,6 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata ClusterState updatedClusterState = readClusterStateInParallel( previousState, manifest, - clusterName, manifest.getClusterUUID(), localNodeId, updatedIndices, diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java index c2fff1ee15439..707c4d54e04fa 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote.model; +import static org.opensearch.core.common.bytes.BytesReference.toBytes; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; @@ -17,8 +18,11 @@ import java.util.List; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.common.io.Streams; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; @@ -33,11 +37,6 @@ public class RemoteClusterBlocks extends AbstractRemoteWritableBlobEntity { public static final String CLUSTER_BLOCKS = "blocks"; - public static final ChecksumBlobStoreFormat CLUSTER_BLOCKS_FORMAT = new ChecksumBlobStoreFormat<>( - "blocks", - METADATA_NAME_FORMAT, - ClusterBlocks::fromXContent - ); private ClusterBlocks clusterBlocks; private long stateVersion; @@ -81,11 +80,14 @@ public UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + clusterBlocks.writeTo(bytesStreamOutput); + return bytesStreamOutput.bytes().streamInput(); } @Override public ClusterBlocks deserialize(final InputStream inputStream) throws IOException { - return CLUSTER_BLOCKS_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + StreamInput in = new BytesStreamInput(toBytes(Streams.readFully(inputStream))); + return ClusterBlocks.readFrom(in); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java index 58fbbdad3c4b5..b5af4969b7454 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java @@ -10,9 +10,15 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterState.Custom; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.io.Streams; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest; @@ -24,6 +30,8 @@ import java.io.InputStream; import java.util.List; +import static org.opensearch.cluster.ClusterState.FeatureAware.shouldSerialize; +import static org.opensearch.core.common.bytes.BytesReference.toBytes; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; @@ -33,32 +41,39 @@ public class RemoteClusterStateCustoms extends AbstractRemoteWritableBlobEntity { public static final String CLUSTER_STATE_CUSTOM = "cluster-state-custom"; - public final ChecksumBlobStoreFormat clusterStateCustomBlobStoreFormat; private long stateVersion; private String customType; private ClusterState.Custom custom; + private final NamedWriteableRegistry namedWriteableRegistry; - public RemoteClusterStateCustoms(final ClusterState.Custom custom, final String customType, final long stateVersion, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + public RemoteClusterStateCustoms( + final ClusterState.Custom custom, + final String customType, + final long stateVersion, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry, + final NamedWriteableRegistry namedWriteableRegistry + ) { super(clusterUUID, compressor, namedXContentRegistry); this.stateVersion = stateVersion; this.customType = customType; this.custom = custom; - this.clusterStateCustomBlobStoreFormat = new ChecksumBlobStoreFormat<>( - CLUSTER_STATE_CUSTOM, - METADATA_NAME_FORMAT, - parser -> ClusterState.Custom.fromXContent(parser, customType) - ); + this.namedWriteableRegistry = namedWriteableRegistry; } - public RemoteClusterStateCustoms(final String blobName, final String customType, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + public RemoteClusterStateCustoms( + final String blobName, + final String customType, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry, + final NamedWriteableRegistry namedWriteableRegistry + ) { super(clusterUUID, compressor, namedXContentRegistry); this.blobName = blobName; this.customType = customType; - this.clusterStateCustomBlobStoreFormat = new ChecksumBlobStoreFormat<>( - CLUSTER_STATE_CUSTOM, - METADATA_NAME_FORMAT, - parser -> ClusterState.Custom.fromXContent(parser, customType) - ); + this.namedWriteableRegistry = namedWriteableRegistry; } @Override @@ -88,11 +103,19 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return clusterStateCustomBlobStoreFormat.serialize(custom, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + BytesStreamOutput outputStream = new BytesStreamOutput(); + if (shouldSerialize(outputStream, custom)) { + outputStream.writeNamedWriteable(custom); + } + return outputStream.bytes().streamInput(); } @Override public ClusterState.Custom deserialize(final InputStream inputStream) throws IOException { - return clusterStateCustomBlobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput( + new BytesStreamInput(toBytes(Streams.readFully(inputStream))), + this.namedWriteableRegistry + ); + return in.readNamedWriteable(Custom.class); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java index e63e6ed63f6d1..d5f2bcac3f2a9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java @@ -8,18 +8,23 @@ package org.opensearch.gateway.remote.model; +import static org.opensearch.core.common.bytes.BytesReference.toBytes; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.List; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.io.Streams; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; @@ -34,11 +39,6 @@ public class RemoteDiscoveryNodes extends AbstractRemoteWritableBlobEntity { public static final String DISCOVERY_NODES = "nodes"; - public static final ChecksumBlobStoreFormat DISCOVERY_NODES_FORMAT = new ChecksumBlobStoreFormat<>( - "nodes", - METADATA_NAME_FORMAT, - DiscoveryNodes::fromXContent - ); private DiscoveryNodes discoveryNodes; private long stateVersion; @@ -81,11 +81,13 @@ public UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + BytesStreamOutput outputStream = new BytesStreamOutput(); + discoveryNodes.writeTo(outputStream); + return outputStream.bytes().streamInput(); } @Override public DiscoveryNodes deserialize(final InputStream inputStream) throws IOException { - return DISCOVERY_NODES_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + return DiscoveryNodes.readFrom(new BytesStreamInput(toBytes(Streams.readFully(inputStream))), null); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java index d7908ac6e8235..f8ef85616a578 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote.model; +import static org.opensearch.core.common.bytes.BytesReference.toBytes; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN; @@ -18,8 +19,11 @@ import java.util.List; import org.opensearch.cluster.metadata.DiffableStringMap; import org.opensearch.common.io.Streams; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest; @@ -29,11 +33,6 @@ public class RemoteHashesOfConsistentSettings extends AbstractRemoteWritableBlobEntity { public static final String HASHES_OF_CONSISTENT_SETTINGS = "hashes-of-consistent-settings"; - public static final ChecksumBlobStoreFormat HASHES_OF_CONSISTENT_SETTINGS_FORMAT = new ChecksumBlobStoreFormat<>( - HASHES_OF_CONSISTENT_SETTINGS, - METADATA_NAME_FORMAT, - DiffableStringMap::fromXContent - ); private DiffableStringMap hashesOfConsistentSettings; private long metadataVersion; @@ -74,11 +73,14 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + hashesOfConsistentSettings.writeTo(bytesStreamOutput); + return bytesStreamOutput.bytes().streamInput(); } @Override public DiffableStringMap deserialize(final InputStream inputStream) throws IOException { - return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + StreamInput in = new BytesStreamInput(toBytes(Streams.readFully(inputStream))); + return DiffableStringMap.readFrom(in); } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index c2866247fd723..6d5e47404a5b3 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -769,7 +769,8 @@ protected Node( clusterService, threadPool::preciseRelativeTimeInNanos, threadPool, - List.of(remoteIndexPathUploader) + List.of(remoteIndexPathUploader), + namedWriteableRegistry ); remoteClusterStateCleanupManager = remoteClusterStateService.getCleanupManager(); } else {