Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure single shot upload #6950

Merged
merged 4 commits into from
Dec 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/storage/azure-storage-blob/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Release History

## 12.2.0-beta.2 (Unreleased)
- Added a field to ParallelTransferOptions that allows customers to configure the maximum size to upload in a single PUT. Data sizes larger than this value will be chunked and parallelized.

## 12.2.0-beta.1 (2019-12-17)
- Added SAS generation methods on clients to improve discoverability and convenience of sas. Deprecated setContainerName, setBlobName, setSnapshotId, generateSasQueryParameters methods on BlobServiceSasSignatureValues to direct users to using the methods added on clients.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@
* Docs</a> for more information.
*/
public class BlobAsyncClient extends BlobAsyncClientBase {
private static final int CHUNKED_UPLOAD_REQUIREMENT = 4 * Constants.MB;

/**
* The block size to use if none is specified in parallel operations.
*/
Expand Down Expand Up @@ -325,7 +323,8 @@ public Mono<Response<BlockBlobItem>> uploadWithResponse(Flux<ByteBuffer> data,
.addProgressReporting(stream, validatedParallelTransferOptions.getProgressReceiver()),
length, headers, metadata, tier, null, validatedRequestConditions);

return determineUploadFullOrChunked(data, uploadInChunksFunction, uploadFullBlobMethod);
return determineUploadFullOrChunked(data, validatedParallelTransferOptions, uploadInChunksFunction,
uploadFullBlobMethod);
} catch (RuntimeException ex) {
return monoError(logger, ex);
}
Expand Down Expand Up @@ -392,6 +391,7 @@ as we can guarantee we only need at most two buffers for any call to write (two
}

private Mono<Response<BlockBlobItem>> determineUploadFullOrChunked(final Flux<ByteBuffer> data,
ParallelTransferOptions parallelTransferOptions,
final Function<Flux<ByteBuffer>, Mono<Response<BlockBlobItem>>> uploadInChunks,
final BiFunction<Flux<ByteBuffer>, Long, Mono<Response<BlockBlobItem>>> uploadFullBlob) {
final long[] bufferedDataSize = {0};
Expand All @@ -408,12 +408,12 @@ private Mono<Response<BlockBlobItem>> determineUploadFullOrChunked(final Flux<By
return data
.filter(ByteBuffer::hasRemaining)
.windowUntil(buffer -> {
if (bufferedDataSize[0] > CHUNKED_UPLOAD_REQUIREMENT) {
if (bufferedDataSize[0] > parallelTransferOptions.getMaxSingleUploadSize()) {
return false;
} else {
bufferedDataSize[0] += buffer.remaining();

if (bufferedDataSize[0] > CHUNKED_UPLOAD_REQUIREMENT) {
if (bufferedDataSize[0] > parallelTransferOptions.getMaxSingleUploadSize()) {
return true;
} else {
/*
Expand All @@ -428,13 +428,13 @@ private Mono<Response<BlockBlobItem>> determineUploadFullOrChunked(final Flux<By
return false;
}
}
/*
* Use cutBefore = true as we want to window all data under 4MB into one window.
* Set the prefetch to CHUNKED_UPLOAD_REQUIREMENT in the case that there are numerous tiny buffers,
* windowUntil uses a default limit of 256 and once that is hit it will trigger onComplete which causes
* downstream issues.
*/
}, true, CHUNKED_UPLOAD_REQUIREMENT)
/*
* Use cutBefore = true as we want to window all data under 4MB into one window.
* Set the prefetch to the maxSingleUploadSize in the case that there are numerous tiny buffers,
* windowUntil uses a default limit of 256 and once that is hit it will trigger onComplete which causes
* downstream issues.
*/
}, true, parallelTransferOptions.getMaxSingleUploadSize())
.buffer(2)
.next()
.flatMap(fluxes -> {
Expand Down Expand Up @@ -500,7 +500,7 @@ public Mono<Void> uploadFromFile(String filePath, boolean overwrite) {

// Note that if the file will be uploaded using a putBlob, we also can skip the exists check.
if (!overwrite) {
if (uploadInBlocks(filePath)) {
if (uploadInBlocks(filePath, BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES)) {
overwriteCheck = exists().flatMap(exists -> exists
? monoError(logger, new IllegalArgumentException(Constants.BLOB_ALREADY_EXISTS))
: Mono.empty());
Expand Down Expand Up @@ -538,6 +538,8 @@ public Mono<Void> uploadFromFile(String filePath, boolean overwrite) {
public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parallelTransferOptions,
BlobHttpHeaders headers, Map<String, String> metadata, AccessTier tier,
BlobRequestConditions requestConditions) {
final ParallelTransferOptions finalParallelTransferOptions =
ModelHelper.populateAndApplyDefaults(parallelTransferOptions);
try {
return Mono.using(() -> uploadFileResourceSupplier(filePath),
channel -> {
Expand All @@ -546,8 +548,8 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
long fileSize = channel.size();

// If the file is larger than 256MB chunk it and stage it as blocks.
if (uploadInBlocks(filePath)) {
return uploadBlocks(fileSize, parallelTransferOptions, headers, metadata, tier,
if (uploadInBlocks(filePath, finalParallelTransferOptions.getMaxSingleUploadSize())) {
return uploadBlocks(fileSize, finalParallelTransferOptions, headers, metadata, tier,
requestConditions, channel, blockBlobAsyncClient);
} else {
// Otherwise we know it can be sent in a single request reducing network overhead.
Expand All @@ -564,11 +566,11 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
}
}

boolean uploadInBlocks(String filePath) {
boolean uploadInBlocks(String filePath, Integer maxSingleUploadSize) {
AsynchronousFileChannel channel = uploadFileResourceSupplier(filePath);
boolean retVal;
try {
retVal = channel.size() > 256 * Constants.MB;
retVal = channel.size() > maxSingleUploadSize;
} catch (IOException e) {
throw logger.logExceptionAsError(new UncheckedIOException(e));
} finally {
Expand All @@ -581,25 +583,24 @@ boolean uploadInBlocks(String filePath) {
private Mono<Void> uploadBlocks(long fileSize, ParallelTransferOptions parallelTransferOptions,
BlobHttpHeaders headers, Map<String, String> metadata, AccessTier tier,
BlobRequestConditions requestConditions, AsynchronousFileChannel channel, BlockBlobAsyncClient client) {
final ParallelTransferOptions finalParallelTransferOptions =
ModelHelper.populateAndApplyDefaults(parallelTransferOptions);
final BlobRequestConditions finalRequestConditions = (requestConditions == null)
? new BlobRequestConditions() : requestConditions;
// parallelTransferOptions are finalized in the calling method.

// See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
AtomicLong totalProgress = new AtomicLong();
Lock progressLock = new ReentrantLock();

final SortedMap<Long, String> blockIds = new TreeMap<>();
return Flux.fromIterable(sliceFile(fileSize, finalParallelTransferOptions.getBlockSize(),
return Flux.fromIterable(sliceFile(fileSize, parallelTransferOptions.getBlockSize(),
parallelTransferOptions))
.flatMap(chunk -> {
String blockId = getBlockID();
blockIds.put(chunk.getOffset(), blockId);

Flux<ByteBuffer> progressData = ProgressReporter.addParallelProgressReporting(
FluxUtil.readFile(channel, chunk.getOffset(), chunk.getCount()),
finalParallelTransferOptions.getProgressReceiver(), progressLock, totalProgress);
parallelTransferOptions.getProgressReceiver(), progressLock, totalProgress);

return client.stageBlockWithResponse(blockId, progressData, chunk.getCount(), null,
finalRequestConditions.getLeaseId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void uploadFromFile(String filePath, boolean overwrite) {

if (!overwrite) {
// Note we only want to make the exists call if we will be uploading in stages. Otherwise it is superfluous.
if (client.uploadInBlocks(filePath) && exists()) {
if (client.uploadInBlocks(filePath, BlockBlobClient.MAX_UPLOAD_BLOB_BYTES) && exists()) {
throw logger.logExceptionAsError(new IllegalArgumentException(Constants.BLOB_ALREADY_EXISTS));
}
requestConditions = new BlobRequestConditions().setIfNoneMatch(Constants.HeaderConstants.ETAG_WILDCARD);
Expand All @@ -157,7 +157,7 @@ public void uploadFromFile(String filePath, boolean overwrite) {
* Creates a new block blob, or updates the content of an existing block blob.
* <p>
* To avoid overwriting, pass "*" to {@link BlobRequestConditions#setIfNoneMatch(String)}.
*
*
* <p><strong>Code Samples</strong></p>
*
* {@codesnippet com.azure.storage.blob.BlobClient.uploadFromFile#String-ParallelTransferOptions-BlobHttpHeaders-Map-AccessTier-BlobRequestConditions-Duration}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;

import java.util.regex.Pattern;

Expand All @@ -31,6 +32,8 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO
: other.getBlockSize(),
other.getNumBuffers() == null ? Integer.valueOf(BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS)
: other.getNumBuffers(),
other.getProgressReceiver());
other.getProgressReceiver(),
other.getMaxSingleUploadSize() == null ? Integer.valueOf(BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES)
: other.getMaxSingleUploadSize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public final class ParallelTransferOptions {
private final Integer blockSize;
private final Integer numBuffers;
private final ProgressReceiver progressReceiver;
private final Integer maxSingleUploadSize;

/**
* Creates a new {@link ParallelTransferOptions} with default parameters applied.
Expand All @@ -35,6 +36,33 @@ public final class ParallelTransferOptions {
* @param progressReceiver {@link ProgressReceiver}
*/
// Delegates to the four-argument constructor, passing null for maxSingleUploadSize.
// A null value signals that the default single-shot threshold should be applied later
// (the visible defaulting logic substitutes BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES
// when this field is null), preserving the behavior of pre-existing callers.
public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver) {
this(blockSize, numBuffers, progressReceiver, null);
}

/**
* Creates a new {@link ParallelTransferOptions} with default parameters applied.
*
* @param blockSize The block size.
* For upload, The block size is the size of each block that will be staged. This value also determines the number
* of requests that need to be made. If block size is large, upload will make fewer network calls, but each
* individual call will send more data and will therefore take longer. This parameter also determines the size
* that each buffer uses when buffering is required and consequently amount of memory consumed by such methods may
* be up to blockSize * numBuffers.
* @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method
* should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the
* number of buffers, the more parallel, and thus faster, the upload portion of this operation will be.
* The amount of memory consumed by methods using this value may be up to blockSize * numBuffers.
* @param progressReceiver {@link ProgressReceiver}
* @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a
* single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be
* ignored. Some constraints to consider are that more requests cost more, but several small or mid-sized requests
* may sometimes perform better. In the case of buffered upload, up to this amount of data may be buffered before
* any data is sent. Must be greater than 0. May be null to accept default behavior, which is the maximum value the
* service accepts for uploading in a single requests and is represented by
* {@link BlockBlobAsyncClient#MAX_UPLOAD_BLOB_BYTES}.
*/
public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver,
Integer maxSingleUploadSize) {
if (blockSize != null) {
StorageImplUtils.assertInBounds("blockSize", blockSize, 1, BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES);
}
Expand All @@ -45,6 +73,12 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe
}
this.numBuffers = numBuffers;
this.progressReceiver = progressReceiver;

if (maxSingleUploadSize != null) {
StorageImplUtils.assertInBounds("maxSingleUploadSize", maxSingleUploadSize, 1,
BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES);
}
this.maxSingleUploadSize = maxSingleUploadSize;
}

/**
Expand All @@ -65,9 +99,17 @@ public Integer getNumBuffers() {

/**
 * Gets the {@link ProgressReceiver} used to report progress for parallel transfer operations.
 *
 * @return The progress receiver, or {@code null} if none was configured.
 */
public ProgressReceiver getProgressReceiver() {
return this.progressReceiver;
}

/**
 * Gets the maximum data size that may be uploaded in a single request; data larger than
 * this value will be broken into blocks and uploaded in parallel.
 *
 * @return The single-shot upload size threshold, or {@code null} if it was not set
 * (callers are expected to substitute the service default in that case).
 */
public Integer getMaxSingleUploadSize() {
return this.maxSingleUploadSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,5 @@ class ProgressReporterTest extends APISpec {
0 * mockReceiver.reportProgress({ it > 60 })
}

// See TransferManagerTest for network tests of the parallel ProgressReporter.
// TODO (rickle-msft): See TransferManagerTest for network tests of the parallel ProgressReporter.
}
Loading