From a66aaafac02aebed7659b42dc0fcd69d64306b02 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Wed, 17 Apr 2024 14:07:08 +0530 Subject: [PATCH 1/7] Initial commit for index routing table manifest Signed-off-by: Bukhtawar Khan --- .../remote/ClusterMetadataManifest.java | 98 ++++++++++++++++++- 1 file changed, 93 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 4725f40076ce2..4e5891d154de0 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -35,6 +35,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { public static final int CODEC_V0 = 0; // Older codec version, where we haven't introduced codec versions for manifest. public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file. + public static final int CODEC_V2 = 2; // In Codec V2 we introduce index routing-metadata in manifest file. private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term"); private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version"); @@ -48,6 +49,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { private static final ParseField INDICES_FIELD = new ParseField("indices"); private static final ParseField PREVIOUS_CLUSTER_UUID = new ParseField("previous_cluster_uuid"); private static final ParseField CLUSTER_UUID_COMMITTED = new ParseField("cluster_uuid_committed"); + private static final ParseField INDICES_ROUTING_FIELD = new ParseField("indices_routing"); private static long term(Object[] fields) { return (long) fields[0]; @@ -97,6 +99,10 @@ private static String globalMetadataFileName(Object[] fields) { return (String) fields[11]; } + private static List indicesRouting(Object[] fields) { + return (List) fields[12]; + } + private static final ConstructingObjectParser PARSER_V0 = new ConstructingObjectParser<>( "cluster_metadata_manifest", fields -> new ClusterMetadataManifest( @@ -133,11 +139,31 @@ private static String globalMetadataFileName(Object[] fields) { ) ); - private static final ConstructingObjectParser CURRENT_PARSER = PARSER_V1; + private static final ConstructingObjectParser PARSER_V2 = new ConstructingObjectParser<>( + "cluster_metadata_manifest", + fields -> new ClusterMetadataManifest( + term(fields), + version(fields), + clusterUUID(fields), + stateUUID(fields), + opensearchVersion(fields), + nodeId(fields), + committed(fields), + codecVersion(fields), + globalMetadataFileName(fields), + indices(fields), + previousClusterUUID(fields), + clusterUUIDCommitted(fields), + indicesRouting(fields) + ) + ); + + private static final ConstructingObjectParser CURRENT_PARSER = PARSER_V2; static { declareParser(PARSER_V0, CODEC_V0); declareParser(PARSER_V1, CODEC_V1); + declareParser(PARSER_V2, CODEC_V2); } private static void declareParser(ConstructingObjectParser parser, long codec_version) { @@ -160,6 +186,13 @@ private static void declareParser(ConstructingObjectParser= CODEC_V2) { + parser.declareObjectArray( + ConstructingObjectParser.constructorArg(), + (p, c) -> UploadedIndexMetadata.fromXContent(p), + INDICES_ROUTING_FIELD + ); + } } private final int codecVersion; @@ -174,6 +207,7 @@ private static void declareParser(ConstructingObjectParser indicesRouting; public List getIndices() { return indices; @@ -223,6 +257,10 @@ public String getGlobalMetadataFileName() { return globalMetadataFileName; } + public List getIndicesRouting() { + return indicesRouting; + } + public ClusterMetadataManifest( long clusterTerm, long version, @@ -237,6 +275,25 @@ public ClusterMetadataManifest( String previousClusterUUID, boolean clusterUUIDCommitted ) { + this(clusterTerm, version, clusterUUID, stateUUID, opensearchVersion, nodeId, committed, codecVersion, + globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted, null); + } + + public ClusterMetadataManifest( + long clusterTerm, + long version, + String clusterUUID, + String stateUUID, + Version opensearchVersion, + String nodeId, + boolean committed, + int codecVersion, + String globalMetadataFileName, + List indices, + String previousClusterUUID, + boolean clusterUUIDCommitted, + List indicesRouting + ) { this.clusterTerm = clusterTerm; this.stateVersion = version; this.clusterUUID = clusterUUID; @@ -249,6 +306,7 @@ public ClusterMetadataManifest( this.indices = Collections.unmodifiableList(indices); this.previousClusterUUID = previousClusterUUID; this.clusterUUIDCommitted = clusterUUIDCommitted; + this.indicesRouting = Collections.unmodifiableList(indicesRouting); } public ClusterMetadataManifest(StreamInput in) throws IOException { @@ -262,12 +320,18 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new)); this.previousClusterUUID = in.readString(); this.clusterUUIDCommitted = in.readBoolean(); - if (in.getVersion().onOrAfter(Version.V_2_12_0)) { + if (in.getVersion().onOrAfter(Version.V_2_14_0)) { + this.codecVersion = in.readInt(); + this.globalMetadataFileName = in.readString(); + this.indicesRouting = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new)); + } else if (in.getVersion().onOrAfter(Version.V_2_12_0)) { this.codecVersion = in.readInt(); this.globalMetadataFileName = in.readString(); + this.indicesRouting = null; } else { this.codecVersion = CODEC_V0; // Default codec this.globalMetadataFileName = null; + this.indicesRouting = null; } } @@ -301,6 +365,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion()); builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName()); } + if (onOrAfterCodecVersion(CODEC_V2)) { + builder.startArray(INDICES_ROUTING_FIELD.getPreferredName()); + { + for (UploadedIndexMetadata uploadedIndexMetadata : indicesRouting) { + uploadedIndexMetadata.toXContent(builder, params); + } + } + } return builder; } @@ -319,6 +391,8 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeInt(codecVersion); out.writeString(globalMetadataFileName); + } else if (out.getVersion().onOrAfter(Version.V_2_14_0)) { + out.writeCollection(indicesRouting); } } @@ -342,7 +416,8 @@ public boolean equals(Object o) { && Objects.equals(previousClusterUUID, that.previousClusterUUID) && Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted) && Objects.equals(globalMetadataFileName, that.globalMetadataFileName) - && Objects.equals(codecVersion, that.codecVersion); + && Objects.equals(codecVersion, that.codecVersion) + && Objects.equals(indicesRouting, that.indicesRouting); } @Override @@ -359,7 +434,8 @@ public int hashCode() { nodeId, committed, previousClusterUUID, - clusterUUIDCommitted + clusterUUIDCommitted, + indicesRouting ); } @@ -399,12 +475,18 @@ public static class Builder { private String previousClusterUUID; private boolean committed; private boolean clusterUUIDCommitted; + private List indicesRouting; public Builder indices(List indices) { this.indices = indices; return this; } + public Builder indicesRouting(List indicesRouting) { + this.indicesRouting = indicesRouting; + return this; + } + public Builder codecVersion(int codecVersion) { this.codecVersion = codecVersion; return this; @@ -454,6 +536,10 @@ public List getIndices() { return indices; } + public List getIndicesRouting() { + return indicesRouting; + } + public Builder previousClusterUUID(String previousClusterUUID) { this.previousClusterUUID = previousClusterUUID; return this; @@ -481,6 +567,7 @@ public Builder(ClusterMetadataManifest manifest) { this.indices = new ArrayList<>(manifest.indices); this.previousClusterUUID = manifest.previousClusterUUID; this.clusterUUIDCommitted = manifest.clusterUUIDCommitted; + this.indicesRouting = new ArrayList<>(manifest.indicesRouting); } public ClusterMetadataManifest build() { @@ -496,7 +583,8 @@ public ClusterMetadataManifest build() { globalMetadataFileName, indices, previousClusterUUID, - clusterUUIDCommitted + clusterUUIDCommitted, + indicesRouting ); } From ad480ee208a8637920cba04d85c8fb9c426aa96c Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Sat, 20 Apr 2024 00:00:39 +0530 Subject: [PATCH 2/7] Changes for IndexRoutingTableHeader Signed-off-by: Bukhtawar Khan --- .../stream}/BufferedChecksumStreamInput.java | 2 +- .../stream}/BufferedChecksumStreamOutput.java | 2 +- .../org/opensearch/common/util/BigArrays.java | 2 +- .../routingtable/IndexRoutingTableHeader.java | 135 ++++++++++++++++++ .../index/translog/BaseTranslogReader.java | 1 + .../opensearch/index/translog/Translog.java | 2 + .../index/translog/TranslogHeader.java | 2 + .../index/translog/TranslogSnapshot.java | 1 + .../index/translog/TranslogWriter.java | 1 + .../coordination/CoordinationStateTests.java | 2 +- .../remote/ClusterMetadataManifestTests.java | 6 +- .../snapshots/BlobStoreFormatTests.java | 2 +- 12 files changed, 150 insertions(+), 8 deletions(-) rename server/src/main/java/org/opensearch/{index/translog => common/io/stream}/BufferedChecksumStreamInput.java (99%) rename server/src/main/java/org/opensearch/{index/translog => common/io/stream}/BufferedChecksumStreamOutput.java (98%) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java diff --git a/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java b/server/src/main/java/org/opensearch/common/io/stream/BufferedChecksumStreamInput.java similarity index 99% rename from server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java rename to server/src/main/java/org/opensearch/common/io/stream/BufferedChecksumStreamInput.java index f75f27b7bcb91..f3341712275f9 100644 --- a/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java +++ b/server/src/main/java/org/opensearch/common/io/stream/BufferedChecksumStreamInput.java @@ -30,7 +30,7 @@ * GitHub history for details. */ -package org.opensearch.index.translog; +package org.opensearch.common.io.stream; import org.apache.lucene.store.BufferedChecksum; import org.apache.lucene.util.BitUtil; diff --git a/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamOutput.java b/server/src/main/java/org/opensearch/common/io/stream/BufferedChecksumStreamOutput.java similarity index 98% rename from server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamOutput.java rename to server/src/main/java/org/opensearch/common/io/stream/BufferedChecksumStreamOutput.java index 9e96664c79cc5..254f228f1c739 100644 --- a/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamOutput.java +++ b/server/src/main/java/org/opensearch/common/io/stream/BufferedChecksumStreamOutput.java @@ -30,7 +30,7 @@ * GitHub history for details. */ -package org.opensearch.index.translog; +package org.opensearch.common.io.stream; import org.apache.lucene.store.BufferedChecksum; import org.opensearch.common.annotation.PublicApi; diff --git a/server/src/main/java/org/opensearch/common/util/BigArrays.java b/server/src/main/java/org/opensearch/common/util/BigArrays.java index 92371c2c77ef9..734e5535c3cf4 100644 --- a/server/src/main/java/org/opensearch/common/util/BigArrays.java +++ b/server/src/main/java/org/opensearch/common/util/BigArrays.java @@ -49,7 +49,7 @@ import java.util.Arrays; /** - * Utility class to work with arrays. + * Utility class to work with arrays.Ø * * @opensearch.api * */ diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java new file mode 100644 index 0000000000000..87d94dc1147d1 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java @@ -0,0 +1,135 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.routingtable; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.store.InputStreamDataInput; +import org.apache.lucene.store.OutputStreamDataOutput; +import org.opensearch.Version; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.BufferedChecksumStreamInput; +import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; + +import java.io.EOFException; +import java.io.IOException; + +/** + * The stored header information for the individual index routing table + */ +public class IndexRoutingTableHeader { + + private int routingTableVersion; + + private String indexName; + + private Version nodeVersion; + + public static final String INDEX_ROUTING_HEADER_CODEC = "index_routing_header_codec"; + + public static final int INITIAL_VERSION = 1; + + public static final int CURRENT_VERSION = INITIAL_VERSION; + + + public IndexRoutingTableHeader(int routingTableVersion, String indexName, Version nodeVersion) { + this.routingTableVersion = routingTableVersion; + this.indexName = indexName; + this.nodeVersion = nodeVersion; + } + + /** + * Returns the bytes reference for the {@link IndexRoutingTableHeader} + * @return the {@link BytesReference} + * @throws IOException + */ + public BytesReference write() throws IOException { + BytesReference bytesReference; + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput)) { + CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION); + // Write version + out.writeInt(routingTableVersion); + out.writeInt(nodeVersion.id); + out.writeString(indexName); + // Checksum header + out.writeInt((int) out.getChecksum()); + out.flush(); + bytesReference = bytesStreamOutput.bytes(); + } + return bytesReference; + } + + + /** + * Reads the contents on the byte array into the corresponding {@link IndexRoutingTableHeader} + * @param inBytes + * @param source + * @return + * @throws IOException + */ + public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOException { + try { + try(BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new BytesStreamInput(inBytes), source)) { + readHeaderVersion(in); + final int version = in.readInt(); + final int nodeVersion = in.readInt(); + final String name = in.readString(); + verifyChecksum(in); + assert version >= 0 : "Version must be non-negative [" + version + "]"; + assert in.readByte() == -1 : "Header is not fully read"; + return new IndexRoutingTableHeader(version, name, Version.fromId(nodeVersion)); + } + } catch (EOFException e) { + throw new IOException("index routing header truncated", e); + } + } + + + static void verifyChecksum(BufferedChecksumStreamInput in) throws IOException { + // This absolutely must come first, or else reading the checksum becomes part of the checksum + long expectedChecksum = in.getChecksum(); + long readChecksum = Integer.toUnsignedLong(in.readInt()); + if (readChecksum != expectedChecksum) { + throw new IOException( + "checksum verification failed - expected: 0x" + + Long.toHexString(expectedChecksum) + + ", got: 0x" + + Long.toHexString(readChecksum) + ); + } + } + + static int readHeaderVersion(final StreamInput in) throws IOException { + final int version; + try { + version = CodecUtil.checkHeader(new InputStreamDataInput(in), INDEX_ROUTING_HEADER_CODEC, INITIAL_VERSION, CURRENT_VERSION); + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { + throw new IOException("index routing table header corrupted", e); + } + return version; + } + + public int getRoutingTableVersion() { + return routingTableVersion; + } + + public String getIndexName() { + return indexName; + } + + public Version getNodeVersion() { + return nodeVersion; + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/BaseTranslogReader.java b/server/src/main/java/org/opensearch/index/translog/BaseTranslogReader.java index d6fa2a2e53de3..9088eb6b20fb8 100644 --- a/server/src/main/java/org/opensearch/index/translog/BaseTranslogReader.java +++ b/server/src/main/java/org/opensearch/index/translog/BaseTranslogReader.java @@ -32,6 +32,7 @@ package org.opensearch.index.translog; +import org.opensearch.common.io.stream.BufferedChecksumStreamInput; import org.opensearch.core.common.io.stream.ByteBufferStreamInput; import org.opensearch.index.seqno.SequenceNumbers; diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 842e9c77d2350..18b10cb886996 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -38,6 +38,8 @@ import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.io.stream.BufferedChecksumStreamInput; +import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.common.io.stream.ReleasableBytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java index 7b5be9505f27a..c622214a3a69d 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java @@ -40,6 +40,8 @@ import org.apache.lucene.store.OutputStreamDataOutput; import org.apache.lucene.util.BytesRef; import org.opensearch.common.io.Channels; +import org.opensearch.common.io.stream.BufferedChecksumStreamInput; +import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.core.common.io.stream.InputStreamStreamInput; import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogSnapshot.java b/server/src/main/java/org/opensearch/index/translog/TranslogSnapshot.java index 89718156cbbe8..a6e322bf58fff 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogSnapshot.java @@ -32,6 +32,7 @@ package org.opensearch.index.translog; import org.opensearch.common.io.Channels; +import org.opensearch.common.io.stream.BufferedChecksumStreamInput; import org.opensearch.index.seqno.SequenceNumbers; import java.io.EOFException; diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java index 86f7567f3333d..da3c7a8dee219 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java @@ -42,6 +42,7 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.Channels; import org.opensearch.common.io.DiskIoBufferPool; +import org.opensearch.common.io.stream.BufferedChecksumStreamInput; import org.opensearch.common.io.stream.ReleasableBytesStreamOutput; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index 1c0dc7fc1ca2d..0fe52d2e72d4b 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -942,7 +942,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep randomAlphaOfLength(10), Collections.emptyList(), randomAlphaOfLength(10), - true + true, ); Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID)).thenReturn(manifest); diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 6c9a3201656d7..647f4c913931f 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -40,7 +40,7 @@ public void testClusterMetadataManifestXContentV0() throws IOException { null, Collections.singletonList(uploadedIndexMetadata), "prev-cluster-uuid", - true + true, ); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -67,7 +67,7 @@ public void testClusterMetadataManifestXContent() throws IOException { "test-global-metadata-file", Collections.singletonList(uploadedIndexMetadata), "prev-cluster-uuid", - true + true, ); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -93,7 +93,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { "test-global-metadata-file", randomUploadedIndexMetadataList(), "yfObdx8KSMKKrXf8UyHhM", - true + true, ); { // Mutate Cluster Term EqualsHashCodeTestUtils.checkEqualsAndHashCode( diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java index c5f36fcc01983..8c64f9a3170b2 100644 --- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java @@ -55,7 +55,7 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.index.translog.BufferedChecksumStreamOutput; +import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; import org.opensearch.test.OpenSearchTestCase; From 2ad70c4fb9df43d99ce3714df1b4303f7589dc9e Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Sat, 20 Apr 2024 00:04:24 +0530 Subject: [PATCH 3/7] Revert unintentional changes for IndexRoutingTableHeader Signed-off-by: Bukhtawar Khan --- .../src/main/java/org/opensearch/common/util/BigArrays.java | 2 +- .../cluster/coordination/CoordinationStateTests.java | 2 +- .../gateway/remote/ClusterMetadataManifestTests.java | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/BigArrays.java b/server/src/main/java/org/opensearch/common/util/BigArrays.java index 734e5535c3cf4..92371c2c77ef9 100644 --- a/server/src/main/java/org/opensearch/common/util/BigArrays.java +++ b/server/src/main/java/org/opensearch/common/util/BigArrays.java @@ -49,7 +49,7 @@ import java.util.Arrays; /** - * Utility class to work with arrays.Ø + * Utility class to work with arrays. * * @opensearch.api * */ diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index 0fe52d2e72d4b..1c0dc7fc1ca2d 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -942,7 +942,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep randomAlphaOfLength(10), Collections.emptyList(), randomAlphaOfLength(10), - true, + true ); Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID)).thenReturn(manifest); diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 647f4c913931f..6c9a3201656d7 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -40,7 +40,7 @@ public void testClusterMetadataManifestXContentV0() throws IOException { null, Collections.singletonList(uploadedIndexMetadata), "prev-cluster-uuid", - true, + true ); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -67,7 +67,7 @@ public void testClusterMetadataManifestXContent() throws IOException { "test-global-metadata-file", Collections.singletonList(uploadedIndexMetadata), "prev-cluster-uuid", - true, + true ); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -93,7 +93,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { "test-global-metadata-file", randomUploadedIndexMetadataList(), "yfObdx8KSMKKrXf8UyHhM", - true, + true ); { // Mutate Cluster Term EqualsHashCodeTestUtils.checkEqualsAndHashCode( From acc172ea899fe2e0c5a75fe1a030b081af14aaca Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Sat, 20 Apr 2024 00:04:24 +0530 Subject: [PATCH 4/7] Revert unintentional changes for IndexRoutingTableHeader Signed-off-by: Bukhtawar Khan From f65b102996fd23e405ce4ea3bdfcdc54111cfe21 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Sun, 21 Apr 2024 00:15:47 +0530 Subject: [PATCH 5/7] Changes for IndexRoutingTableInputStream Signed-off-by: Bukhtawar Khan --- .../routingtable/IndexRoutingTableHeader.java | 27 ++- .../IndexRoutingTableInputStream.java | 157 ++++++++++++++++++ 2 files changed, 170 insertions(+), 14 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java index 87d94dc1147d1..04a0f1868b64f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java @@ -15,12 +15,12 @@ import org.apache.lucene.store.InputStreamDataInput; import org.apache.lucene.store.OutputStreamDataOutput; import org.opensearch.Version; +import org.opensearch.common.io.stream.BufferedChecksumStreamInput; +import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.BufferedChecksumStreamInput; -import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; import java.io.EOFException; import java.io.IOException; @@ -30,11 +30,11 @@ */ public class IndexRoutingTableHeader { - private int routingTableVersion; + private final long routingTableVersion; - private String indexName; + private final String indexName; - private Version nodeVersion; + private final Version nodeVersion; public static final String INDEX_ROUTING_HEADER_CODEC = "index_routing_header_codec"; @@ -42,8 +42,7 @@ public class IndexRoutingTableHeader { public static final int CURRENT_VERSION = INITIAL_VERSION; - - public IndexRoutingTableHeader(int routingTableVersion, String indexName, Version nodeVersion) { + public IndexRoutingTableHeader(long routingTableVersion, String indexName, Version nodeVersion) { this.routingTableVersion = routingTableVersion; this.indexName = indexName; this.nodeVersion = nodeVersion; @@ -56,11 +55,13 @@ public IndexRoutingTableHeader(int routingTableVersion, String indexName, Versio */ public BytesReference write() throws IOException { BytesReference bytesReference; - try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); - BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput)) { + try ( + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput) + ) { CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION); // Write version - out.writeInt(routingTableVersion); + out.writeLong(routingTableVersion); out.writeInt(nodeVersion.id); out.writeString(indexName); // Checksum header @@ -71,7 +72,6 @@ public BytesReference write() throws IOException { return bytesReference; } - /** * Reads the contents on the byte array into the corresponding {@link IndexRoutingTableHeader} * @param inBytes @@ -81,7 +81,7 @@ public BytesReference write() throws IOException { */ public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOException { try { - try(BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new BytesStreamInput(inBytes), source)) { + try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new BytesStreamInput(inBytes), source)) { readHeaderVersion(in); final int version = in.readInt(); final int nodeVersion = in.readInt(); @@ -96,7 +96,6 @@ public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOExce } } - static void verifyChecksum(BufferedChecksumStreamInput in) throws IOException { // This absolutely must come first, or else reading the checksum becomes part of the checksum long expectedChecksum = in.getChecksum(); @@ -121,7 +120,7 @@ static int readHeaderVersion(final StreamInput in) throws IOException { return version; } - public int getRoutingTableVersion() { + public long getRoutingTableVersion() { return routingTableVersion; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java new file mode 100644 index 0000000000000..ac65232e9a24d --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java @@ -0,0 +1,157 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.routingtable; + +import org.opensearch.Version; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; + +public class IndexRoutingTableInputStream extends InputStream { + + /** + * The buffer where data is stored. + */ + protected byte[] buf; + + /** + * The number of valid bytes in the buffer. + */ + protected int count; + + /** + * The buffer left over from the last fill + */ + protected byte[] leftOverBuf; + + /** + * The mark position + */ + protected int markPos = -1; + + /** + * The read limit + */ + protected int markLimit; + + /** + * The position + */ + protected int pos; + + private static final int BUFFER_SIZE = 8192; + + private final IndexRoutingTableHeader indexRoutingTableHeader; + + private final Iterator shardIter; + + public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long version, Version nodeVersion) throws IOException { + this(indexRoutingTable, version, nodeVersion, BUFFER_SIZE); + } + + public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long version, Version nodeVersion, int size) + throws IOException { + this.buf = new byte[size]; + this.shardIter = indexRoutingTable.iterator(); + this.indexRoutingTableHeader = new IndexRoutingTableHeader(version, indexRoutingTable.getIndex().getName(), nodeVersion); + initialFill(); + } + + @Override + public int read() throws IOException { + if (pos >= count) { + maybeResizeAndFill(); + if (pos >= count) return -1; + } + return buf[pos++] & 0xff; + } + + private void initialFill() throws IOException { + BytesReference bytesReference = indexRoutingTableHeader.write(); + buf = bytesReference.toBytesRef().bytes; + count = bytesReference.length(); + fill(buf); + } + + private void fill(byte[] buf) throws IOException { + if (leftOverBuf != null) { + System.arraycopy(leftOverBuf, 0, buf, count, leftOverBuf.length); + } + if (count < buf.length && shardIter.hasNext()) { + IndexShardRoutingTable next = shardIter.next(); + BytesReference bytesRef; + try ( + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput) + ) { + IndexShardRoutingTable.Builder.writeTo(next, out); + // Checksum header + out.writeInt((int) out.getChecksum()); + out.flush(); + bytesRef = bytesStreamOutput.bytes(); + } + if (bytesRef.length() < buf.length - count) { + System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, bytesRef.length()); + count += bytesRef.length(); + leftOverBuf = null; + } else { + System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, buf.length - count); + count += buf.length - count; + leftOverBuf = new byte[bytesRef.length() - count]; + System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count + 1, leftOverBuf, 0, bytesRef.length() - count); + } + } + } + + private void maybeResizeAndFill() throws IOException { + byte[] buffer = buf; + if (markPos == -1) pos = 0; /* no mark: throw away the buffer */ + else if (pos >= buffer.length) { /* no room left in buffer */ + if (markPos > 0) { /* can throw away early part of the buffer */ + int sz = pos - markPos; + System.arraycopy(buffer, markPos, buffer, 0, sz); + pos = sz; + markPos = 0; + } else if (buffer.length >= markLimit) { + markPos = -1; /* buffer got too big, invalidate mark */ + pos = 0; /* drop buffer contents */ + } else { /* grow buffer */ + int nsz = markLimit + 1; + byte[] nbuf = new byte[nsz]; + System.arraycopy(buffer, 0, nbuf, 0, pos); + buffer = nbuf; + } + } + count = pos; + fill(buffer); + } + + @Override + public void mark(int readlimit) { + markLimit = readlimit; + markPos = pos; + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public void reset() throws IOException { + if (markPos < 0) throw new IOException("Resetting to invalid mark"); + pos = markPos; + } +} From 441f52023d3757e1073fefb629fe145f19ff671f Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Tue, 7 May 2024 16:11:06 +0530 Subject: [PATCH 6/7] Fixing IndexRoutingTableInputStream and moving checksum to end to file Signed-off-by: Himshikha Gupta --- .../remote/ClusterMetadataManifest.java | 15 +++-- .../routingtable/IndexRoutingTableHeader.java | 32 ++--------- .../IndexRoutingTableInputStream.java | 57 ++++++++++++------- .../IndexRoutingTableHeaderTests.java | 35 ++++++++++++ 4 files changed, 86 insertions(+), 53 deletions(-) create mode 100644 server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 4e5891d154de0..0279f8e0fd805 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -276,7 +276,7 @@ public ClusterMetadataManifest( boolean clusterUUIDCommitted ) { this(clusterTerm, version, clusterUUID, stateUUID, opensearchVersion, nodeId, committed, codecVersion, - globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted, null); + globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted, new ArrayList<>()); } public ClusterMetadataManifest( @@ -355,7 +355,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startArray(INDICES_FIELD.getPreferredName()); { for (UploadedIndexMetadata uploadedIndexMetadata : indices) { + builder.startObject(); uploadedIndexMetadata.toXContent(builder, params); + builder.endObject(); } } builder.endArray(); @@ -369,9 +371,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startArray(INDICES_ROUTING_FIELD.getPreferredName()); { for (UploadedIndexMetadata uploadedIndexMetadata : indicesRouting) { + builder.startObject(); uploadedIndexMetadata.toXContent(builder, params); + builder.endObject(); } } + builder.endArray(); } return builder; } @@ -391,7 +396,8 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeInt(codecVersion); out.writeString(globalMetadataFileName); - } else if (out.getVersion().onOrAfter(Version.V_2_14_0)) { + } + if (out.getVersion().onOrAfter(Version.V_2_14_0)) { out.writeCollection(indicesRouting); } } @@ -659,11 +665,10 @@ public String getIndexUUID() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.startObject() + return builder .field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()) - .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()) - .endObject(); + .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java index 04a0f1868b64f..23ab700d5a34f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java @@ -21,6 +21,7 @@ import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; import java.io.EOFException; import java.io.IOException; @@ -50,26 +51,16 @@ public IndexRoutingTableHeader(long routingTableVersion, String indexName, Versi /** * Returns the bytes reference for the {@link IndexRoutingTableHeader} - * @return the {@link BytesReference} * @throws IOException */ - public BytesReference write() throws IOException { - BytesReference bytesReference; - try ( - BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); - BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput) - ) { + public void write(StreamOutput out) throws IOException { CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION); // Write version out.writeLong(routingTableVersion); out.writeInt(nodeVersion.id); out.writeString(indexName); - // Checksum header - out.writeInt((int) out.getChecksum()); + out.flush(); - bytesReference = bytesStreamOutput.bytes(); - } - return bytesReference; } /** @@ -83,10 +74,9 @@ public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOExce try { try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new BytesStreamInput(inBytes), source)) { readHeaderVersion(in); - final int version = in.readInt(); + final long version = in.readLong(); final int nodeVersion = in.readInt(); final String name = in.readString(); - verifyChecksum(in); assert version >= 0 : "Version must be non-negative [" + version + "]"; assert in.readByte() == -1 : "Header is not fully read"; return new IndexRoutingTableHeader(version, name, Version.fromId(nodeVersion)); @@ -96,20 +86,6 @@ public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOExce } } - static void verifyChecksum(BufferedChecksumStreamInput in) throws IOException { - // This absolutely must come first, or else reading the checksum becomes part of the checksum - long expectedChecksum = in.getChecksum(); - long readChecksum = Integer.toUnsignedLong(in.readInt()); - if (readChecksum != expectedChecksum) { - throw new IOException( - "checksum verification failed - expected: 0x" - + Long.toHexString(expectedChecksum) - + ", got: 0x" - + Long.toHexString(readChecksum) - ); - } - } - static int readHeaderVersion(final StreamInput in) throws IOException { final int version; try { diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java index ac65232e9a24d..40e5908d5c65c 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java @@ -54,8 +54,9 @@ public class IndexRoutingTableInputStream extends InputStream { private static final int BUFFER_SIZE = 8192; private final IndexRoutingTableHeader indexRoutingTableHeader; - private final Iterator shardIter; + private final BytesStreamOutput bytesStreamOutput; + private final BufferedChecksumStreamOutput out; public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long version, Version nodeVersion) throws IOException { this(indexRoutingTable, version, nodeVersion, BUFFER_SIZE); @@ -66,7 +67,10 @@ public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long ve this.buf = new byte[size]; this.shardIter = indexRoutingTable.iterator(); this.indexRoutingTableHeader = new IndexRoutingTableHeader(version, indexRoutingTable.getIndex().getName(), nodeVersion); - initialFill(); + this.bytesStreamOutput = new BytesStreamOutput(); + this.out = new BufferedChecksumStreamOutput(bytesStreamOutput); + + initialFill(indexRoutingTable.shards().size()); } @Override @@ -78,39 +82,52 @@ public int read() throws IOException { return buf[pos++] & 0xff; } - private void initialFill() throws IOException { - BytesReference bytesReference = indexRoutingTableHeader.write(); - buf = bytesReference.toBytesRef().bytes; - count = bytesReference.length(); + private void initialFill(int shardCount) throws IOException { + indexRoutingTableHeader.write(out); + out.writeVInt(shardCount); + + System.arraycopy(bytesStreamOutput.bytes().toBytesRef().bytes, 0 , buf, 0, bytesStreamOutput.bytes().length()); + count = bytesStreamOutput.bytes().length(); + bytesStreamOutput.reset(); fill(buf); } private void fill(byte[] buf) throws IOException { if (leftOverBuf != null) { - System.arraycopy(leftOverBuf, 0, buf, count, leftOverBuf.length); + if(leftOverBuf.length > buf.length - count) { + // leftOverBuf has more content than length of buf, so we need to copy only based on buf length and keep the remaining in leftOverBuf. + System.arraycopy(leftOverBuf, 0, buf, count, buf.length - count); + byte[] tempLeftOverBuffer = new byte[leftOverBuf.length - (buf.length - count)]; + System.arraycopy(leftOverBuf, buf.length - count , tempLeftOverBuffer, 0, leftOverBuf.length - (buf.length - count)); + leftOverBuf = tempLeftOverBuffer; + count = buf.length - count; + } else { + System.arraycopy(leftOverBuf, 0, buf, count, leftOverBuf.length); + count += leftOverBuf.length; + leftOverBuf = null; + } } + if (count < buf.length && shardIter.hasNext()) { IndexShardRoutingTable next = shardIter.next(); - BytesReference bytesRef; - try ( - BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); - BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput) - ) { - IndexShardRoutingTable.Builder.writeTo(next, out); - // Checksum header - out.writeInt((int) out.getChecksum()); - out.flush(); - bytesRef = bytesStreamOutput.bytes(); + IndexShardRoutingTable.Builder.writeTo(next, out); + //Add checksum for the file after all shards are done + if(!shardIter.hasNext()) { + out.writeLong(out.getChecksum()); } + out.flush(); + BytesReference bytesRef = bytesStreamOutput.bytes(); + bytesStreamOutput.reset(); + if (bytesRef.length() < buf.length - count) { System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, bytesRef.length()); count += bytesRef.length(); leftOverBuf = null; } else { System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, buf.length - count); - count += buf.length - count; - leftOverBuf = new byte[bytesRef.length() - count]; - System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count + 1, leftOverBuf, 0, bytesRef.length() - count); + leftOverBuf = new byte[bytesRef.length() - (buf.length - count)]; + System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count , leftOverBuf, 0, bytesRef.length() - (buf.length - count)); + count = buf.length; } } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java new file mode 100644 index 0000000000000..068db554b4226 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.routingtable; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.InputStreamDataInput; +import org.opensearch.Version; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class IndexRoutingTableHeaderTests extends OpenSearchTestCase { + + public void testWrite() throws IOException { + BytesStreamOutput out = new BytesStreamOutput(); + IndexRoutingTableHeader header = new IndexRoutingTableHeader(1, "dummyIndex", Version.V_3_0_0); + header.write(out); + + BytesStreamInput in = new BytesStreamInput(out.bytes().toBytesRef().bytes); + CodecUtil.checkHeader(new InputStreamDataInput(in),IndexRoutingTableHeader.INDEX_ROUTING_HEADER_CODEC, IndexRoutingTableHeader.INITIAL_VERSION, IndexRoutingTableHeader.CURRENT_VERSION ); + assertEquals(1, in.readLong()); + assertEquals(Version.V_3_0_0.id, in.readInt()); + assertEquals("dummyIndex", in.readString()); + } + +} From 270e00de184aac8bb157c9f0ab30b7e5024e6fda Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Wed, 8 May 2024 11:09:54 +0530 Subject: [PATCH 7/7] Add read flow for IndexRoutingTable Signed-off-by: Arpit Bandejiya --- .../routingtable/IndexRoutingTableHeader.java | 10 +-- .../IndexRoutingTableInputStreamReader.java | 77 +++++++++++++++++++ .../routingtable/IndexRoutingTableTests.java | 61 +++++++++++++++ 3 files changed, 141 insertions(+), 7 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamReader.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableTests.java diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java index 23ab700d5a34f..e29ce5a79dc02 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java @@ -65,22 +65,18 @@ public void write(StreamOutput out) throws IOException { /** * Reads the contents on the byte array into the corresponding {@link IndexRoutingTableHeader} - * @param inBytes - * @param source - * @return + * @param in + * @return IndexRoutingTableHeader * @throws IOException */ - public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOException { + public static IndexRoutingTableHeader read(BufferedChecksumStreamInput in) throws IOException { try { - try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new BytesStreamInput(inBytes), source)) { readHeaderVersion(in); final long version = in.readLong(); final int nodeVersion = in.readInt(); final String name = in.readString(); assert version >= 0 : "Version must be non-negative [" + version + "]"; - assert in.readByte() == -1 : "Header is not fully read"; return new IndexRoutingTableHeader(version, name, Version.fromId(nodeVersion)); - } } catch (EOFException e) { throw new IOException("index routing header truncated", e); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamReader.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamReader.java new file mode 100644 index 0000000000000..35ae9f287d7f2 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamReader.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.routingtable; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.common.io.stream.BufferedChecksumStreamInput; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.InputStreamStreamInput; +import org.opensearch.core.common.io.stream.StreamInput; + +import java.io.BufferedReader; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class IndexRoutingTableInputStreamReader { + + private final StreamInput streamInput; + + private static final Logger logger = LogManager.getLogger(IndexRoutingTableInputStreamReader.class); + + public IndexRoutingTableInputStreamReader(InputStream inputStream) throws IOException { + this.streamInput = new InputStreamStreamInput(inputStream); + } + + public Map read() throws IOException { + try { + try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(streamInput, "assertion")) { + // Read the Table Header first + IndexRoutingTableHeader.read(in); + int shards = in.readVInt(); + logger.info("Number of Index Routing Table {}", shards); + Map indicesRouting = new HashMap(Collections.EMPTY_MAP); + for(int i=0; i { + try { + logger.info("IndexShardRoutingTables: {}", indexShardRoutingTables); + InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables, + initialRoutingTable.version(), Version.CURRENT); + + IndexRoutingTableInputStreamReader reader = new IndexRoutingTableInputStreamReader(indexRoutingStream); + Map indexShardRoutingTableMap = reader.read(); + + logger.info("indexShardRoutingTableMap: {}", indexShardRoutingTableMap); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + +}