diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java
index 2ba63a8100d6a..642de96436ce2 100644
--- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java
+++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java
@@ -72,6 +72,11 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
         blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
     }
 
+    @Override
+    public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
+        blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
+    }
+
     @Override
     public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         writeBlob(blobName, bytes, failIfAlreadyExists);
diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
index b5a22a09ed8ad..66be990e88bfa 100644
--- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
+++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
@@ -21,6 +21,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -29,7 +31,9 @@
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.MapBuilder;
+import org.elasticsearch.common.hash.MessageDigests;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -41,7 +45,9 @@
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.FileAlreadyExistsException;
+import java.security.MessageDigest;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -217,12 +223,39 @@ InputStream readBlob(String blobName, long position, long length) throws IOExcep
 
     /**
      * Writes a blob in the specific bucket
-     * @param inputStream content of the blob to be written
+     * @param bytes content of the blob to be written
+     * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
+     */
+    void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
+        if (bytes.length() > getLargeBlobThresholdInBytes()) {
+            // Compute md5 here so #writeBlobResumable forces the integrity check on the resumable upload.
+            // This is needed since we rely on atomic write behavior when writing BytesReferences in BlobStoreRepository which is not
+            // guaranteed for resumable uploads.
+            MessageDigest md5 = MessageDigests.md5();
+            final BytesRefIterator iterator = bytes.iterator();
+            BytesRef ref;
+            while ((ref = iterator.next()) != null) {
+                md5.update(ref.bytes, ref.offset, ref.length);
+            }
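+            // setting the md5 on the BlobInfo below is what makes #writeBlobResumable pick the md5Match() write
+            // option, so the service can verify the received bytes against this digest when the upload completes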
+            writeBlobResumable(
+                BlobInfo.newBuilder(bucketName, blobName).setMd5(Base64.getEncoder().encodeToString(md5.digest())).build(),
+                bytes.streamInput(), bytes.length(), failIfAlreadyExists);
+        } else {
+            writeBlob(bytes.streamInput(), bytes.length(), failIfAlreadyExists, BlobInfo.newBuilder(bucketName, blobName).build());
+        }
+    }
+
+    /**
+     * Writes a blob in the specific bucket
+     * @param inputStream content of the blob to be written
      * @param blobSize expected size of the blob to be written
      * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
      */
     void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
-        final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
+        writeBlob(inputStream, blobSize, failIfAlreadyExists, BlobInfo.newBuilder(bucketName, blobName).build());
+    }
+
+    private void writeBlob(InputStream inputStream, long blobSize, boolean failIfAlreadyExists, BlobInfo blobInfo) throws IOException {
         if (blobSize > getLargeBlobThresholdInBytes()) {
             writeBlobResumable(blobInfo, inputStream, blobSize, failIfAlreadyExists);
         } else {
@@ -235,6 +268,13 @@ long getLargeBlobThresholdInBytes() {
         return LARGE_BLOB_THRESHOLD_BYTE_SIZE;
     }
 
+    // possible options for #writeBlobResumable uploads
+    private static final Storage.BlobWriteOption[] NO_OVERWRITE_NO_MD5 = {Storage.BlobWriteOption.doesNotExist()};
+    private static final Storage.BlobWriteOption[] OVERWRITE_NO_MD5 = new Storage.BlobWriteOption[0];
+    private static final Storage.BlobWriteOption[] NO_OVERWRITE_CHECK_MD5 =
+        {Storage.BlobWriteOption.doesNotExist(), Storage.BlobWriteOption.md5Match()};
+    private static final Storage.BlobWriteOption[] OVERWRITE_CHECK_MD5 = {Storage.BlobWriteOption.md5Match()};
+
     /**
      * Uploads a blob using the "resumable upload" method (multiple requests, which
      * can be independently retried in case of failure, see
@@ -252,8 +292,14 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long
         inputStream.mark(Integer.MAX_VALUE);
         final byte[] buffer = new byte[size < bufferSize ? Math.toIntExact(size) : bufferSize];
         StorageException storageException = null;
-        final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
-            new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0];
+        final Storage.BlobWriteOption[] writeOptions;
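+        // the md5Match() variants enable GCS service-side checksum validation for the resumable upload; the
+        // four option arrays are precomputed constants since these are the only combinations ever used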
+        if (blobInfo.getMd5() == null) {
+            // no md5, use options without checksum validation
+            writeOptions = failIfAlreadyExists ? NO_OVERWRITE_NO_MD5 : OVERWRITE_NO_MD5;
+        } else {
+            // md5 value is set so we use it by enabling checksum validation
+            writeOptions = failIfAlreadyExists ? NO_OVERWRITE_CHECK_MD5 : OVERWRITE_CHECK_MD5;
+        }
         for (int retry = 0; retry < 3; ++retry) {
             try {
                 final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 78feb52dc9809..cf552ae79b5dc 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -175,7 +175,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s";
 
-    private static final String UPLOADED_DATA_BLOB_PREFIX = "__";
+    public static final String UPLOADED_DATA_BLOB_PREFIX = "__";
 
     // Expose a copy of URLRepository#TYPE here too, for a better error message until https://github.com/elastic/elasticsearch/issues/68918
     // is resolved.
@@ -273,7 +273,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private final NamedXContentRegistry namedXContentRegistry;
 
-    private final BigArrays bigArrays;
+    protected final BigArrays bigArrays;
 
     /**
      * Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java
index 732f777bd73b8..6c8ead01364f1 100644
--- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java
+++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java
@@ -21,6 +21,9 @@
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.regex.Regex;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.RestUtils;
 
@@ -29,6 +32,7 @@
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URLDecoder;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -202,6 +206,11 @@ public void handle(final HttpExchange exchange) throws IOException {
                 blobs.put(blobName, BytesArray.EMPTY);
 
                 byte[] response = requestBody.utf8ToString().getBytes(UTF_8);
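+                // only data blobs (prefixed "__") may start a resumable upload without a checksum; metadata blobs
+                // are written as BytesReferences and must carry the md5Hash set in GoogleCloudStorageBlobStore#writeBlob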
+ "uploadType=resumable" diff --git a/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java b/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java index 51c3a93bb5d59..e7cd1fb157dd0 100644 --- a/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java +++ b/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.cache.CacheBuilder; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.util.BigArrays; @@ -392,7 +393,7 @@ public boolean hasAtomicOverwrites() { } // pkg-private for tests - static final class EncryptedBlobStore implements BlobStore { + class EncryptedBlobStore implements BlobStore { private final BlobStore delegatedBlobStore; private final BlobPath delegatedBasePath; private final String repositoryName; @@ -530,7 +531,7 @@ public void close() { } } - private static final class EncryptedBlobContainer extends AbstractBlobContainer { + private final class EncryptedBlobContainer extends AbstractBlobContainer { private final String repositoryName; private final BlobContainer delegatedBlobContainer; // supplier for the DEK used for encryption (snapshot) @@ -631,6 +632,45 @@ public InputStream readBlob(String blobName, long position, long length) throws public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { // reuse, but possibly generate and store a new DEK final SingleUseKey singleUseNonceAndDEK = singleUseDEKSupplier.get(); + final BytesReference dekIdBytes = getDEKBytes(singleUseNonceAndDEK); + final long encryptedBlobSize = getEncryptedBlobByteLength(blobSize); + try (InputStream encryptedInputStream = encryptedInput(inputStream, singleUseNonceAndDEK, dekIdBytes)) { + delegatedBlobContainer.writeBlob(blobName, encryptedInputStream, encryptedBlobSize, failIfAlreadyExists); + } + } + + @Override + public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + // reuse, but possibly generate and store a new DEK + final SingleUseKey singleUseNonceAndDEK = singleUseDEKSupplier.get(); + final BytesReference dekIdBytes = getDEKBytes(singleUseNonceAndDEK); + try ( + ReleasableBytesStreamOutput tmp = new ReleasableBytesStreamOutput( + Math.toIntExact(getEncryptedBlobByteLength(bytes.length())), + bigArrays + ) + ) { + try (InputStream encryptedInputStream = encryptedInput(bytes.streamInput(), singleUseNonceAndDEK, dekIdBytes)) { + org.elasticsearch.core.internal.io.Streams.copy(encryptedInputStream, tmp, false); + } + delegatedBlobContainer.writeBlob(blobName, tmp.bytes(), failIfAlreadyExists); + } + } + + private ChainingInputStream encryptedInput(InputStream inputStream, SingleUseKey singleUseNonceAndDEK, BytesReference dekIdBytes) + throws IOException { + return ChainingInputStream.chain( + dekIdBytes.streamInput(), + new EncryptionPacketsInputStream( + inputStream, + singleUseNonceAndDEK.getKey(), + singleUseNonceAndDEK.getNonce(), + PACKET_LENGTH_IN_BYTES + ) + ); + } + + private BytesReference getDEKBytes(SingleUseKey 
+            try (
+                ReleasableBytesStreamOutput tmp = new ReleasableBytesStreamOutput(
+                    Math.toIntExact(getEncryptedBlobByteLength(bytes.length())),
+                    bigArrays
+                )
+            ) {
+                try (InputStream encryptedInputStream = encryptedInput(bytes.streamInput(), singleUseNonceAndDEK, dekIdBytes)) {
+                    org.elasticsearch.core.internal.io.Streams.copy(encryptedInputStream, tmp, false);
+                }
+                delegatedBlobContainer.writeBlob(blobName, tmp.bytes(), failIfAlreadyExists);
+            }
+        }
+
+        private ChainingInputStream encryptedInput(InputStream inputStream, SingleUseKey singleUseNonceAndDEK, BytesReference dekIdBytes)
+            throws IOException {
+            return ChainingInputStream.chain(
+                dekIdBytes.streamInput(),
+                new EncryptionPacketsInputStream(
+                    inputStream,
+                    singleUseNonceAndDEK.getKey(),
+                    singleUseNonceAndDEK.getNonce(),
+                    PACKET_LENGTH_IN_BYTES
+                )
+            );
+        }
+
+        private BytesReference getDEKBytes(SingleUseKey singleUseNonceAndDEK) {
             final BytesReference dekIdBytes = singleUseNonceAndDEK.getKeyId();
             if (dekIdBytes.length() != DEK_ID_LENGTH) {
                 throw new RepositoryException(
@@ -639,20 +679,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
                     new IllegalStateException("Unexpected DEK Id length [" + dekIdBytes.length() + "]")
                 );
             }
-            final long encryptedBlobSize = getEncryptedBlobByteLength(blobSize);
-            try (
-                InputStream encryptedInputStream = ChainingInputStream.chain(
-                    dekIdBytes.streamInput(),
-                    new EncryptionPacketsInputStream(
-                        inputStream,
-                        singleUseNonceAndDEK.getKey(),
-                        singleUseNonceAndDEK.getNonce(),
-                        PACKET_LENGTH_IN_BYTES
-                    )
-                )
-            ) {
-                delegatedBlobContainer.writeBlob(blobName, encryptedInputStream, encryptedBlobSize, failIfAlreadyExists);
-            }
+            return dekIdBytes;
         }
 
         @Override