Skip to content

Commit

Permalink
Ensure GCS Repository Metadata Blob Writes are Atomic
Browse files Browse the repository at this point in the history
In the corner case of uploading a large (>5MB) metadata blob we did not set a content-validation
requirement on the upload request (content validation happens automatically for smaller requests, which
are not resumable uploads). This change sets the relevant request option to enforce a CRC32C hash check when writing
`BytesReference` to GCS (as is the case with all but data blob writes). The custom CRC32C implementation
here can be removed after backporting to 7.x. I copied over the Guava version of CRC32C with slight
adjustments for consuming `BytesReference` instead of pulling the Guava dependency into the compile path
so that we can use the JDK's implementation of CRC32C in 8.x without having different dependencies in 8.x
and 7.x.

closes elastic#72018
  • Loading branch information
original-brownbear committed Apr 21, 2021
1 parent 3f2eb32 commit 13fdb9a
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.repositories.gcs;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.bytes.BytesReference;

import java.io.IOException;
import java.util.Base64;

/**
 * CRC32C implementation to compute a checksum for resumable GCS uploads.
 * TODO: Move to java.util.zip.CRC32C instead of this custom implementation once we no longer need to support JDK-8
 */
public final class Crc32C {

    // Lookup table for the CRC-32C (Castagnoli) polynomial, one entry per byte value.
    private static final int[] CRC_TABLE = {
        0x00000000, 0xf26b8303, 0xe13b70f7, 0x1350f3f4,
        0xc79a971f, 0x35f1141c, 0x26a1e7e8, 0xd4ca64eb,
        0x8ad958cf, 0x78b2dbcc, 0x6be22838, 0x9989ab3b,
        0x4d43cfd0, 0xbf284cd3, 0xac78bf27, 0x5e133c24,
        0x105ec76f, 0xe235446c, 0xf165b798, 0x030e349b,
        0xd7c45070, 0x25afd373, 0x36ff2087, 0xc494a384,
        0x9a879fa0, 0x68ec1ca3, 0x7bbcef57, 0x89d76c54,
        0x5d1d08bf, 0xaf768bbc, 0xbc267848, 0x4e4dfb4b,
        0x20bd8ede, 0xd2d60ddd, 0xc186fe29, 0x33ed7d2a,
        0xe72719c1, 0x154c9ac2, 0x061c6936, 0xf477ea35,
        0xaa64d611, 0x580f5512, 0x4b5fa6e6, 0xb93425e5,
        0x6dfe410e, 0x9f95c20d, 0x8cc531f9, 0x7eaeb2fa,
        0x30e349b1, 0xc288cab2, 0xd1d83946, 0x23b3ba45,
        0xf779deae, 0x05125dad, 0x1642ae59, 0xe4292d5a,
        0xba3a117e, 0x4851927d, 0x5b016189, 0xa96ae28a,
        0x7da08661, 0x8fcb0562, 0x9c9bf696, 0x6ef07595,
        0x417b1dbc, 0xb3109ebf, 0xa0406d4b, 0x522bee48,
        0x86e18aa3, 0x748a09a0, 0x67dafa54, 0x95b17957,
        0xcba24573, 0x39c9c670, 0x2a993584, 0xd8f2b687,
        0x0c38d26c, 0xfe53516f, 0xed03a29b, 0x1f682198,
        0x5125dad3, 0xa34e59d0, 0xb01eaa24, 0x42752927,
        0x96bf4dcc, 0x64d4cecf, 0x77843d3b, 0x85efbe38,
        0xdbfc821c, 0x2997011f, 0x3ac7f2eb, 0xc8ac71e8,
        0x1c661503, 0xee0d9600, 0xfd5d65f4, 0x0f36e6f7,
        0x61c69362, 0x93ad1061, 0x80fde395, 0x72966096,
        0xa65c047d, 0x5437877e, 0x4767748a, 0xb50cf789,
        0xeb1fcbad, 0x197448ae, 0x0a24bb5a, 0xf84f3859,
        0x2c855cb2, 0xdeeedfb1, 0xcdbe2c45, 0x3fd5af46,
        0x7198540d, 0x83f3d70e, 0x90a324fa, 0x62c8a7f9,
        0xb602c312, 0x44694011, 0x5739b3e5, 0xa55230e6,
        0xfb410cc2, 0x092a8fc1, 0x1a7a7c35, 0xe811ff36,
        0x3cdb9bdd, 0xceb018de, 0xdde0eb2a, 0x2f8b6829,
        0x82f63b78, 0x709db87b, 0x63cd4b8f, 0x91a6c88c,
        0x456cac67, 0xb7072f64, 0xa457dc90, 0x563c5f93,
        0x082f63b7, 0xfa44e0b4, 0xe9141340, 0x1b7f9043,
        0xcfb5f4a8, 0x3dde77ab, 0x2e8e845f, 0xdce5075c,
        0x92a8fc17, 0x60c37f14, 0x73938ce0, 0x81f80fe3,
        0x55326b08, 0xa759e80b, 0xb4091bff, 0x466298fc,
        0x1871a4d8, 0xea1a27db, 0xf94ad42f, 0x0b21572c,
        0xdfeb33c7, 0x2d80b0c4, 0x3ed04330, 0xccbbc033,
        0xa24bb5a6, 0x502036a5, 0x4370c551, 0xb11b4652,
        0x65d122b9, 0x97baa1ba, 0x84ea524e, 0x7681d14d,
        0x2892ed69, 0xdaf96e6a, 0xc9a99d9e, 0x3bc21e9d,
        0xef087a76, 0x1d63f975, 0x0e330a81, 0xfc588982,
        0xb21572c9, 0x407ef1ca, 0x532e023e, 0xa145813d,
        0x758fe5d6, 0x87e466d5, 0x94b49521, 0x66df1622,
        0x38cc2a06, 0xcaa7a905, 0xd9f75af1, 0x2b9cd9f2,
        0xff56bd19, 0x0d3d3e1a, 0x1e6dcdee, 0xec064eed,
        0xc38d26c4, 0x31e6a5c7, 0x22b65633, 0xd0ddd530,
        0x0417b1db, 0xf67c32d8, 0xe52cc12c, 0x1747422f,
        0x49547e0b, 0xbb3ffd08, 0xa86f0efc, 0x5a048dff,
        0x8ecee914, 0x7ca56a17, 0x6ff599e3, 0x9d9e1ae0,
        0xd3d3e1ab, 0x21b862a8, 0x32e8915c, 0xc083125f,
        0x144976b4, 0xe622f5b7, 0xf5720643, 0x07198540,
        0x590ab964, 0xab613a67, 0xb831c993, 0x4a5a4a90,
        0x9e902e7b, 0x6cfbad78, 0x7fab5e8c, 0x8dc0dd8f,
        0xe330a81a, 0x115b2b19, 0x020bd8ed, 0xf0605bee,
        0x24aa3f05, 0xd6c1bc06, 0xc5914ff2, 0x37faccf1,
        0x69e9f0d5, 0x9b8273d6, 0x88d28022, 0x7ab90321,
        0xae7367ca, 0x5c18e4c9, 0x4f48173d, 0xbd23943e,
        0xf36e6f75, 0x0105ec76, 0x12551f82, 0xe03e9c81,
        0x34f4f86a, 0xc69f7b69, 0xd5cf889d, 0x27a40b9e,
        0x79b737ba, 0x8bdcb4b9, 0x988c474d, 0x6ae7c44e,
        0xbe2da0a5, 0x4c4623a6, 0x5f16d052, 0xad7d5351
    };

    /**
     * Computes the CRC-32C checksum of the given bytes and returns it as a
     * base64-encoded big-endian string, the representation GCS expects for
     * the {@code crc32c} field on resumable uploads.
     */
    static String checksum(BytesReference bytes) {
        // Standard table-driven CRC: start the register at all ones, fold in one
        // byte per table lookup, and invert the register once at the end. This is
        // arithmetically identical to the Guava formulation that keeps the register
        // stored inverted and flips it around every byte.
        int register = 0xffffffff;
        final BytesRefIterator chunks = bytes.iterator();
        try {
            BytesRef chunk;
            while ((chunk = chunks.next()) != null) {
                final byte[] data = chunk.bytes;
                final int end = chunk.offset + chunk.length;
                for (int pos = chunk.offset; pos < end; pos++) {
                    register = (register >>> 8) ^ CRC_TABLE[(register ^ data[pos]) & 0xff];
                }
            }
        } catch (IOException e) {
            // BytesRefIterator declares IOException but iterating an in-memory
            // BytesReference never performs actual I/O.
            throw new AssertionError("impossible actual IO happens here", e);
        }
        return Base64.getEncoder().encodeToString(Numbers.intToBytes(~register));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
}

@Override
public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
// Delegate to the store-level BytesReference overload so GCS-specific handling
// (CRC32C validation for large blobs) applies; buildKey presumably prefixes the
// container's path onto the blob name — confirm against buildKey's definition.
blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
}

@Override
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, bytes, failIfAlreadyExists);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand Down Expand Up @@ -217,12 +218,33 @@ InputStream readBlob(String blobName, long position, long length) throws IOExcep

/**
 * Writes a blob in the specific bucket
 * @param blobName name of the blob to write
 * @param bytes content of the blob to be written
 * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
 */
void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
if (bytes.length() > getLargeBlobThresholdInBytes()) {
// Compute crc32c here so #writeBlobResumable forces the integrity check on the resumable upload.
// This is needed since we rely on atomic write behavior when writing BytesReferences in BlobStoreRepository which is not
// guaranteed for resumable uploads.
writeBlobResumable(
BlobInfo.newBuilder(bucketName, blobName).setCrc32c(Crc32C.checksum(bytes)).build(),
bytes.streamInput(), bytes.length(), failIfAlreadyExists);
} else {
// Small enough for a single-shot upload; per the change description, non-resumable
// uploads get content validation from the client automatically.
writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
}
}

/**
 * Writes a blob in the specific bucket
 * @param blobName name of the blob to write
 * @param inputStream content of the blob to be written
 * @param blobSize expected size of the blob to be written
 * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
 */
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
// Build the BlobInfo once and pass it to the size-dispatching overload; the
// previous code declared this local but then rebuilt an identical BlobInfo
// inline, leaving the variable unused.
final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
writeBlob(inputStream, blobSize, failIfAlreadyExists, blobInfo);
}

private void writeBlob(InputStream inputStream, long blobSize, boolean failIfAlreadyExists, BlobInfo blobInfo) throws IOException {
if (blobSize > getLargeBlobThresholdInBytes()) {
writeBlobResumable(blobInfo, inputStream, blobSize, failIfAlreadyExists);
} else {
Expand All @@ -235,6 +257,13 @@ long getLargeBlobThresholdInBytes() {
return LARGE_BLOB_THRESHOLD_BYTE_SIZE;
}

// possible options for #writeBlobResumable uploads, precomputed so every resumable
// write can select the right combination without allocating a fresh array
// fail if the blob already exists; no checksum validation
private static final Storage.BlobWriteOption[] NO_OVERWRITE_NO_CRC = {Storage.BlobWriteOption.doesNotExist()};
// overwrite allowed; no checksum validation
private static final Storage.BlobWriteOption[] OVERWRITE_NO_CRC = new Storage.BlobWriteOption[0];
// fail if the blob already exists and validate the CRC32C checksum on upload
private static final Storage.BlobWriteOption[] NO_OVERWRITE_CHECK_CRC =
{Storage.BlobWriteOption.doesNotExist(), Storage.BlobWriteOption.crc32cMatch()};
// overwrite allowed and validate the CRC32C checksum on upload
private static final Storage.BlobWriteOption[] OVERWRITE_CHECK_CRC = {Storage.BlobWriteOption.crc32cMatch()};

/**
* Uploads a blob using the "resumable upload" method (multiple requests, which
* can be independently retried in case of failure, see
Expand All @@ -252,8 +281,14 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long
inputStream.mark(Integer.MAX_VALUE);
final byte[] buffer = new byte[size < bufferSize ? Math.toIntExact(size) : bufferSize];
StorageException storageException = null;
final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0];
final Storage.BlobWriteOption[] writeOptions;
if (blobInfo.getCrc32c() == null) {
// no crc32c, use options without checksum validation
writeOptions = failIfAlreadyExists ? NO_OVERWRITE_NO_CRC : OVERWRITE_NO_CRC;
} else {
// crc32c value is set so we use it by enabling checksum validation
writeOptions = failIfAlreadyExists ? NO_OVERWRITE_CHECK_CRC : OVERWRITE_CHECK_CRC;
}
for (int retry = 0; retry < 3; ++retry) {
try {
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.repositories.gcs;

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.test.ESTestCase;

public class Crc32CTests extends ESTestCase {

    public void testChecksum() {
        // Known-answer vectors: empty input, 100 zero bytes, and the bytes 0..31.
        assertEquals("AAAAAA==", Crc32C.checksum(BytesArray.EMPTY));
        assertEquals("B8uf9g==", Crc32C.checksum(new BytesArray(new byte[100])));
        final byte[] ascending = new byte[32];
        for (byte value = 0; value < 32; value++) {
            ascending[value] = value;
        }
        assertEquals("Rt15Tg==", Crc32C.checksum(new BytesArray(ascending)));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s";

private static final String UPLOADED_DATA_BLOB_PREFIX = "__";
public static final String UPLOADED_DATA_BLOB_PREFIX = "__";

// Expose a copy of URLRepository#TYPE here too, for a better error message until https://github.com/elastic/elasticsearch/issues/68918
// is resolved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;

Expand All @@ -29,6 +32,7 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URLDecoder;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -202,6 +206,11 @@ public void handle(final HttpExchange exchange) throws IOException {
blobs.put(blobName, BytesArray.EMPTY);

byte[] response = requestBody.utf8ToString().getBytes(UTF_8);
if (Paths.get(blobName).getFileName().toString().startsWith(BlobStoreRepository.UPLOADED_DATA_BLOB_PREFIX) == false) {
final Map<String, Object> parsedBody = XContentHelper.convertToMap(requestBody, false, XContentType.JSON).v2();
assert parsedBody.get("crc32c") != null :
"file [" + blobName + "] is not a data blob but did not come with a crc32c checksum";
}
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.getResponseHeaders().add("Location", httpServerUrl(exchange) + "/upload/storage/v1/b/" + bucket + "/o?"
+ "uploadType=resumable"
Expand Down

0 comments on commit 13fdb9a

Please sign in to comment.