From 7f0ff145ee3a9725833512967459f0a62a591d2b Mon Sep 17 00:00:00 2001 From: Sooraj Sinha <81695996+soosinha@users.noreply.github.com> Date: Mon, 10 Jun 2024 21:33:59 +0530 Subject: [PATCH] Create serde utility for Writable classes (#14095) * Create serde utility for Writable classes Signed-off-by: Sooraj Sinha --- .../core/compress/CompressorRegistry.java | 13 +++ .../ChecksumWritableBlobStoreFormat.java | 108 ++++++++++++++++++ .../ChecksumWritableBlobStoreFormatTests.java | 66 +++++++++++ 3 files changed, 187 insertions(+) create mode 100644 server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java create mode 100644 server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java diff --git a/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java b/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java index af09a7aebba79..711f56c9f3e3b 100644 --- a/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java +++ b/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java @@ -78,6 +78,19 @@ public static Compressor compressor(final BytesReference bytes) { return null; } + /** + * @param bytes The bytes to check the compression for + * @return The detected compressor. If no compressor detected then return NoneCompressor. + */ + public static Compressor compressorForWritable(final BytesReference bytes) { + for (Compressor compressor : registeredCompressors.values()) { + if (compressor.isCompressed(bytes) == true) { + return compressor; + } + } + return CompressorRegistry.none(); + } + /** Decompress the provided {@link BytesReference}. */ public static BytesReference uncompress(BytesReference bytes) throws IOException { Compressor compressor = compressor(bytes); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java new file mode 100644 index 0000000000000..0add86ab88a16 --- /dev/null +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.blobstore; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.ByteBuffersIndexInput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.OutputStreamIndexOutput; +import org.apache.lucene.util.BytesRef; +import org.opensearch.Version; +import org.opensearch.common.CheckedFunction; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.common.lucene.store.IndexOutputOutputStream; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.InputStreamStreamInput; +import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.CompressorRegistry; +import org.opensearch.gateway.CorruptStateException; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; + +/** + * Checksum File format used to serialize/deserialize {@link Writeable} objects + * + * @opensearch.internal + */ +public class ChecksumWritableBlobStoreFormat { + + public static final int VERSION = 1; + + private static final int BUFFER_SIZE = 4096; + + private final String codec; + private final CheckedFunction reader; + + public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction reader) { + this.codec = codec; + this.reader = reader; + } + + public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException { + try (BytesStreamOutput outputStream = new BytesStreamOutput()) { + try ( + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( + "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")", + blobName, + outputStream, + BUFFER_SIZE + ) + ) { + CodecUtil.writeHeader(indexOutput, codec, VERSION); + + try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) { + @Override + public void close() throws IOException { + // this is important since some of the XContentBuilders write bytes on close. + // in order to write the footer we need to prevent closing the actual index input. + } + }; StreamOutput stream = new OutputStreamStreamOutput(compressor.threadLocalOutputStream(indexOutputOutputStream));) { + // TODO The stream version should be configurable + stream.setVersion(Version.CURRENT); + obj.writeTo(stream); + } + CodecUtil.writeFooter(indexOutput); + } + return outputStream.bytes(); + } + } + + public T deserialize(String blobName, BytesReference bytes) throws IOException { + final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")"; + try { + final IndexInput indexInput = bytes.length() > 0 + ? new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc) + : new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES); + CodecUtil.checksumEntireFile(indexInput); + CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION); + long filePointer = indexInput.getFilePointer(); + long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer; + BytesReference bytesReference = bytes.slice((int) filePointer, (int) contentSize); + Compressor compressor = CompressorRegistry.compressorForWritable(bytesReference); + try (StreamInput in = new InputStreamStreamInput(compressor.threadLocalInputStream(bytesReference.streamInput()))) { + return reader.apply(in); + } + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + // we trick this into a dedicated exception with the original stacktrace + throw new CorruptStateException(ex); + } + } + +} diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java new file mode 100644 index 0000000000000..536df880b2597 --- /dev/null +++ b/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.blobstore; + +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.compress.CompressorRegistry; +import org.opensearch.core.index.Index; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.is; + +/** + * Tests for {@link ChecksumWritableBlobStoreFormat} + */ +public class ChecksumWritableBlobStoreFormatTests extends OpenSearchTestCase { + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final long VERSION = 5L; + + private final ChecksumWritableBlobStoreFormat clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>( + "index-metadata", + IndexMetadata::readFrom + ); + + public void testSerDe() throws IOException { + IndexMetadata indexMetadata = getIndexMetadata(); + BytesReference bytesReference = clusterBlocksFormat.serialize(indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.none()); + IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, bytesReference); + assertThat(readIndexMetadata, is(indexMetadata)); + } + + public void testSerDeForCompressed() throws IOException { + IndexMetadata indexMetadata = getIndexMetadata(); + BytesReference bytesReference = clusterBlocksFormat.serialize( + indexMetadata, + TEST_BLOB_FILE_NAME, + CompressorRegistry.getCompressor(DeflateCompressor.NAME) + ); + IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, bytesReference); + assertThat(readIndexMetadata, is(indexMetadata)); + } + + private IndexMetadata getIndexMetadata() { + final Index index = new Index("test-index", "index-uuid"); + final Settings idxSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build(); + return new IndexMetadata.Builder(index.getName()).settings(idxSettings) + .version(VERSION) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + } +}