Skip to content

Commit

Permalink
Ensure GCS Repository Metadata Blob Writes are Atomic (#72051)
Browse files Browse the repository at this point in the history
In the corner case of uploading a large (>5MB) metadata blob we did not set a content validation
requirement on the upload request (we automatically have it for smaller requests that are not resumable
uploads). This change sets the relevant request option to enforce an MD5 hash check when writing
`BytesReference` to GCS (as is the case with all but data blob writes).

closes #72018
  • Loading branch information
original-brownbear authored Apr 22, 2021
1 parent 146ba50 commit 5f69ee3
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
}

@Override
public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
    // Resolve the container-relative name to a full bucket key, then delegate to the store.
    final String key = buildKey(blobName);
    blobStore.writeBlob(key, bytes, failIfAlreadyExists);
}

@Override
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, bytes, failIfAlreadyExists);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
Expand All @@ -29,7 +31,9 @@
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand All @@ -41,7 +45,9 @@
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -217,12 +223,39 @@ InputStream readBlob(String blobName, long position, long length) throws IOExcep

/**
 * Writes a blob in the specific bucket
 * @param bytes content of the blob to be written
 * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
 */
void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
    if (bytes.length() <= getLargeBlobThresholdInBytes()) {
        // small enough for a single (non-resumable) upload, which already validates content
        writeBlob(bytes.streamInput(), bytes.length(), failIfAlreadyExists, BlobInfo.newBuilder(bucketName, blobName).build());
        return;
    }
    // Compute the MD5 up front so #writeBlobResumable forces the integrity check on the resumable upload.
    // This is needed since we rely on atomic write behavior when writing BytesReferences in BlobStoreRepository which is not
    // guaranteed for resumable uploads.
    final MessageDigest digest = MessageDigests.md5();
    final BytesRefIterator chunks = bytes.iterator();
    for (BytesRef chunk = chunks.next(); chunk != null; chunk = chunks.next()) {
        digest.update(chunk.bytes, chunk.offset, chunk.length);
    }
    final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName)
        .setMd5(Base64.getEncoder().encodeToString(digest.digest()))
        .build();
    writeBlobResumable(blobInfo, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
}

/**
 * Writes a blob in the specific bucket
 * @param blobName name of the blob to be written
 * @param inputStream content of the blob to be written
 * @param blobSize expected size of the blob to be written
 * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
 */
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
    // build the BlobInfo once and delegate; the previous version also built an unused duplicate BlobInfo
    writeBlob(inputStream, blobSize, failIfAlreadyExists, BlobInfo.newBuilder(bucketName, blobName).build());
}

private void writeBlob(InputStream inputStream, long blobSize, boolean failIfAlreadyExists, BlobInfo blobInfo) throws IOException {
if (blobSize > getLargeBlobThresholdInBytes()) {
writeBlobResumable(blobInfo, inputStream, blobSize, failIfAlreadyExists);
} else {
Expand All @@ -235,6 +268,13 @@ long getLargeBlobThresholdInBytes() {
return LARGE_BLOB_THRESHOLD_BYTE_SIZE;
}

// possible options for #writeBlobResumable uploads; precomputed so no array is allocated per upload.
// fail the upload if the blob already exists, no checksum validation
private static final Storage.BlobWriteOption[] NO_OVERWRITE_NO_MD5 = {Storage.BlobWriteOption.doesNotExist()};
// overwrite any existing blob, no checksum validation
private static final Storage.BlobWriteOption[] OVERWRITE_NO_MD5 = new Storage.BlobWriteOption[0];
// fail the upload if the blob already exists and have GCS validate the uploaded bytes against the BlobInfo's MD5
private static final Storage.BlobWriteOption[] NO_OVERWRITE_CHECK_MD5 =
{Storage.BlobWriteOption.doesNotExist(), Storage.BlobWriteOption.md5Match()};
// overwrite any existing blob and have GCS validate the uploaded bytes against the BlobInfo's MD5
private static final Storage.BlobWriteOption[] OVERWRITE_CHECK_MD5 = {Storage.BlobWriteOption.md5Match()};

/**
* Uploads a blob using the "resumable upload" method (multiple requests, which
* can be independently retried in case of failure, see
Expand All @@ -252,8 +292,14 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long
inputStream.mark(Integer.MAX_VALUE);
final byte[] buffer = new byte[size < bufferSize ? Math.toIntExact(size) : bufferSize];
StorageException storageException = null;
final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0];
final Storage.BlobWriteOption[] writeOptions;
if (blobInfo.getMd5() == null) {
// no md5, use options without checksum validation
writeOptions = failIfAlreadyExists ? NO_OVERWRITE_NO_MD5 : OVERWRITE_NO_MD5;
} else {
// md5 value is set so we use it by enabling checksum validation
writeOptions = failIfAlreadyExists ? NO_OVERWRITE_CHECK_MD5 : OVERWRITE_CHECK_MD5;
}
for (int retry = 0; retry < 3; ++retry) {
try {
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s";

private static final String UPLOADED_DATA_BLOB_PREFIX = "__";
public static final String UPLOADED_DATA_BLOB_PREFIX = "__";

// Expose a copy of URLRepository#TYPE here too, for a better error message until https://github.com/elastic/elasticsearch/issues/68918
// is resolved.
Expand Down Expand Up @@ -273,7 +273,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private final NamedXContentRegistry namedXContentRegistry;

private final BigArrays bigArrays;
protected final BigArrays bigArrays;

/**
* Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;

Expand All @@ -29,6 +32,7 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URLDecoder;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -202,6 +206,11 @@ public void handle(final HttpExchange exchange) throws IOException {
blobs.put(blobName, BytesArray.EMPTY);

byte[] response = requestBody.utf8ToString().getBytes(UTF_8);
if (Paths.get(blobName).getFileName().toString().startsWith(BlobStoreRepository.UPLOADED_DATA_BLOB_PREFIX) == false) {
final Map<String, Object> parsedBody = XContentHelper.convertToMap(requestBody, false, XContentType.JSON).v2();
assert parsedBody.get("md5Hash") != null :
"file [" + blobName + "] is not a data blob but did not come with a md5 checksum";
}
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.getResponseHeaders().add("Location", httpServerUrl(exchange) + "/upload/storage/v1/b/" + bucket + "/o?"
+ "uploadType=resumable"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -386,7 +387,7 @@ public boolean hasAtomicOverwrites() {
}

// pkg-private for tests
static final class EncryptedBlobStore implements BlobStore {
class EncryptedBlobStore implements BlobStore {
private final BlobStore delegatedBlobStore;
private final BlobPath delegatedBasePath;
private final String repositoryName;
Expand Down Expand Up @@ -524,7 +525,7 @@ public void close() {
}
}

private static final class EncryptedBlobContainer extends AbstractBlobContainer {
private final class EncryptedBlobContainer extends AbstractBlobContainer {
private final String repositoryName;
private final BlobContainer delegatedBlobContainer;
// supplier for the DEK used for encryption (snapshot)
Expand Down Expand Up @@ -625,6 +626,45 @@ public InputStream readBlob(String blobName, long position, long length) throws
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
    // reuse, but possibly generate and store a new DEK
    final SingleUseKey nonceAndDEK = singleUseDEKSupplier.get();
    final BytesReference dekId = getDEKBytes(nonceAndDEK);
    // the delegated write is told the post-encryption size, not the plaintext size
    try (InputStream encryptedStream = encryptedInput(inputStream, nonceAndDEK, dekId)) {
        delegatedBlobContainer.writeBlob(blobName, encryptedStream, getEncryptedBlobByteLength(blobSize), failIfAlreadyExists);
    }
}

@Override
public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
    // reuse, but possibly generate and store a new DEK
    final SingleUseKey nonceAndDEK = singleUseDEKSupplier.get();
    final BytesReference dekId = getDEKBytes(nonceAndDEK);
    // buffer the fully encrypted bytes so the delegate can write them with its BytesReference (atomic-capable) path
    final int encryptedLength = Math.toIntExact(getEncryptedBlobByteLength(bytes.length()));
    try (ReleasableBytesStreamOutput encryptedBytes = new ReleasableBytesStreamOutput(encryptedLength, bigArrays)) {
        try (InputStream encryptedStream = encryptedInput(bytes.streamInput(), nonceAndDEK, dekId)) {
            org.elasticsearch.core.internal.io.Streams.copy(encryptedStream, encryptedBytes, false);
        }
        delegatedBlobContainer.writeBlob(blobName, encryptedBytes.bytes(), failIfAlreadyExists);
    }
}

// Prefixes the encrypted packet stream with the DEK id so readers can locate the key used for decryption.
private ChainingInputStream encryptedInput(InputStream inputStream, SingleUseKey singleUseNonceAndDEK, BytesReference dekIdBytes)
    throws IOException {
    final InputStream packets = new EncryptionPacketsInputStream(
        inputStream,
        singleUseNonceAndDEK.getKey(),
        singleUseNonceAndDEK.getNonce(),
        PACKET_LENGTH_IN_BYTES
    );
    return ChainingInputStream.chain(dekIdBytes.streamInput(), packets);
}

private BytesReference getDEKBytes(SingleUseKey singleUseNonceAndDEK) {
final BytesReference dekIdBytes = singleUseNonceAndDEK.getKeyId();
if (dekIdBytes.length() != DEK_ID_LENGTH) {
throw new RepositoryException(
Expand All @@ -633,20 +673,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
new IllegalStateException("Unexpected DEK Id length [" + dekIdBytes.length() + "]")
);
}
final long encryptedBlobSize = getEncryptedBlobByteLength(blobSize);
try (
InputStream encryptedInputStream = ChainingInputStream.chain(
dekIdBytes.streamInput(),
new EncryptionPacketsInputStream(
inputStream,
singleUseNonceAndDEK.getKey(),
singleUseNonceAndDEK.getNonce(),
PACKET_LENGTH_IN_BYTES
)
)
) {
delegatedBlobContainer.writeBlob(blobName, encryptedInputStream, encryptedBlobSize, failIfAlreadyExists);
}
return dekIdBytes;
}

@Override
Expand Down

0 comments on commit 5f69ee3

Please sign in to comment.