diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 5b7192c624fed..90ab4869ad1fd 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -161,7 +161,7 @@ private static List indicesRouting(Object[] fields) { clusterUUIDCommitted(fields), routingTableVersion(fields), indicesRouting(fields) - ) + ) ); private static final ConstructingObjectParser CURRENT_PARSER = PARSER_V2; @@ -287,8 +287,22 @@ public ClusterMetadataManifest( String previousClusterUUID, boolean clusterUUIDCommitted ) { - this(clusterTerm, version, clusterUUID, stateUUID, opensearchVersion, nodeId, committed, codecVersion, - globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted, -1, new ArrayList<>()); + this( + clusterTerm, + version, + clusterUUID, + stateUUID, + opensearchVersion, + nodeId, + committed, + codecVersion, + globalMetadataFileName, + indices, + previousClusterUUID, + clusterUUIDCommitted, + -1, + new ArrayList<>() + ); } public ClusterMetadataManifest( @@ -306,7 +320,7 @@ public ClusterMetadataManifest( boolean clusterUUIDCommitted, long routingTableVersion, List indicesRouting - ) { + ) { this.clusterTerm = clusterTerm; this.stateVersion = version; this.clusterUUID = clusterUUID; @@ -694,8 +708,7 @@ public String getIndexUUID() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder - .field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) + return builder.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()) .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java index 5332a8a87c9a4..d4e2594bf153c 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java @@ -61,8 +61,7 @@ public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable) throws this(indexRoutingTable, BUFFER_SIZE); } - public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, int size) - throws IOException { + public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, int size) throws IOException { this.buf = new byte[size]; this.shardIter = indexRoutingTable.iterator(); this.indexRoutingTableHeader = new IndexRoutingTableHeader(indexRoutingTable.getIndex().getName()); @@ -85,7 +84,7 @@ private void initialFill(int shardCount) throws IOException { indexRoutingTableHeader.write(out); out.writeVInt(shardCount); - System.arraycopy(bytesStreamOutput.bytes().toBytesRef().bytes, 0 , buf, 0, bytesStreamOutput.bytes().length()); + System.arraycopy(bytesStreamOutput.bytes().toBytesRef().bytes, 0, buf, 0, bytesStreamOutput.bytes().length()); count = bytesStreamOutput.bytes().length(); bytesStreamOutput.reset(); fill(buf); @@ -93,16 +92,17 @@ private void initialFill(int shardCount) throws IOException { private void fill(byte[] buf) throws IOException { if (leftOverBuf != null) { - if(leftOverBuf.length > buf.length - count) { - // leftOverBuf has more content than length of buf, so we need to copy only based on buf length and keep the remaining in leftOverBuf. + if (leftOverBuf.length > buf.length - count) { + // leftOverBuf has more content than length of buf, so we need to copy only based on buf length and keep the remaining in + // leftOverBuf. System.arraycopy(leftOverBuf, 0, buf, count, buf.length - count); - byte[] tempLeftOverBuffer = new byte[leftOverBuf.length - (buf.length - count)]; - System.arraycopy(leftOverBuf, buf.length - count , tempLeftOverBuffer, 0, leftOverBuf.length - (buf.length - count)); + byte[] tempLeftOverBuffer = new byte[leftOverBuf.length - (buf.length - count)]; + System.arraycopy(leftOverBuf, buf.length - count, tempLeftOverBuffer, 0, leftOverBuf.length - (buf.length - count)); leftOverBuf = tempLeftOverBuffer; count = buf.length - count; } else { System.arraycopy(leftOverBuf, 0, buf, count, leftOverBuf.length); - count += leftOverBuf.length; + count += leftOverBuf.length; leftOverBuf = null; } } @@ -110,8 +110,8 @@ private void fill(byte[] buf) throws IOException { if (count < buf.length && shardIter.hasNext()) { IndexShardRoutingTable next = shardIter.next(); IndexShardRoutingTable.Builder.writeTo(next, out); - //Add checksum for the file after all shards are done - if(!shardIter.hasNext()) { + // Add checksum for the file after all shards are done + if (!shardIter.hasNext()) { out.writeLong(out.getChecksum()); } out.flush(); @@ -125,7 +125,7 @@ private void fill(byte[] buf) throws IOException { } else { System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, buf.length - count); leftOverBuf = new byte[bytesRef.length() - (buf.length - count)]; - System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count , leftOverBuf, 0, bytesRef.length() - (buf.length - count)); + System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count, leftOverBuf, 0, bytesRef.length() - (buf.length - count)); count = buf.length; } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamReader.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamReader.java index 35ae9f287d7f2..e2b4f5da5f6e9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamReader.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamReader.java @@ -13,19 +13,13 @@ import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.common.io.stream.BufferedChecksumStreamInput; -import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.core.common.io.stream.InputStreamStreamInput; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.index.Index; -import java.io.BufferedReader; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; public class IndexRoutingTableInputStreamReader { @@ -34,32 +28,31 @@ public class IndexRoutingTableInputStreamReader { private static final Logger logger = LogManager.getLogger(IndexRoutingTableInputStreamReader.class); public IndexRoutingTableInputStreamReader(InputStream inputStream) throws IOException { - this.streamInput = new InputStreamStreamInput(inputStream); + streamInput = new InputStreamStreamInput(inputStream); } - public Map read() throws IOException { + public IndexRoutingTable readIndexRoutingTable(Index index) throws IOException { try { try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(streamInput, "assertion")) { - // Read the Table Header first - IndexRoutingTableHeader.read(in); - int shards = in.readVInt(); - logger.info("Number of Index Routing Table {}", shards); - Map indicesRouting = new HashMap(Collections.EMPTY_MAP); - for(int i=0; i indexShardRoutingTableMap = reader.read(); + IndexRoutingTable indexRoutingTable = reader.readIndexRoutingTable(metadata.index("test").getIndex()); - assertEquals(1, indexShardRoutingTableMap.size()); - assertNotNull(indexShardRoutingTableMap.get("test")); - assertEquals(2,indexShardRoutingTableMap.get("test").shards().size()); + assertEquals(1, indexRoutingTable.getShards().size()); + assertEquals(indexRoutingTable.getIndex(), metadata.index("test").getIndex()); + assertEquals(indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED).size(), 2); } catch (IOException e) { throw new RuntimeException(e); } }); } + public void testRoutingTableInputStreamWithInvalidIndex() { + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetadata.builder("invalid-index").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + AtomicInteger assertionError = new AtomicInteger(); + initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> { + try { + InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables); + + IndexRoutingTableInputStreamReader reader = new IndexRoutingTableInputStreamReader(indexRoutingStream); + reader.readIndexRoutingTable(metadata.index("invalid-index").getIndex()); + + } catch (AssertionError e) { + assertionError.getAndIncrement(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + assertEquals(1, assertionError.get()); + } + } diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java index 8c64f9a3170b2..8d9a444c6f9e2 100644 --- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java @@ -46,6 +46,7 @@ import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.io.Streams; +import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesArray; @@ -55,7 +56,6 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; import org.opensearch.test.OpenSearchTestCase;