Skip to content

Commit

Permalink
Storage InputStream and OutputStream (Azure#5455)
Browse files Browse the repository at this point in the history
  • Loading branch information
sima-zhu authored Oct 4, 2019
1 parent 3de3bbc commit 3c37147
Show file tree
Hide file tree
Showing 16 changed files with 982 additions and 506 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@
<Match>
<Or>
<Class name="com.azure.storage.blob.specialized.BlobInputStream"/>
<Class name="com.azure.storage.file.StorageFileInputStream"/>
<Class name="com.azure.storage.blob.specialized.BlobOutputStream$AppendBlobOutputStream"/>
<Class name="com.azure.storage.queue.QueueServiceClient"/>
</Or>
Expand Down Expand Up @@ -623,4 +624,11 @@
RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE,
UPM_UNCALLED_PRIVATE_METHOD"/>
</Match>


<!-- The field is read by parent class StorageOutputStrem methods. -->
<Match>
<Class name="com.azure.storage.file.StorageFileOutputStream"/>
<Bug pattern="URF_UNREAD_FIELD"/>
</Match>
</FindBugsFilter>

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT License.
package com.azure.storage.blob.specialized;

import com.azure.storage.blob.BlobAsyncClient;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.models.AppendBlobAccessConditions;
import com.azure.storage.blob.models.AppendPositionAccessConditions;
import com.azure.storage.blob.models.BlobAccessConditions;
Expand All @@ -11,153 +11,42 @@
import com.azure.storage.blob.models.PageRange;
import com.azure.storage.blob.models.StorageException;
import com.azure.storage.common.SR;
import com.azure.storage.common.StorageOutputStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.NonNull;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.UUID;

public abstract class BlobOutputStream extends OutputStream {
/*
* Holds the write threshold of number of bytes to buffer prior to dispatching a write. For block blob this is the
* block size, for page blob this is the Page commit size.
*/
int writeThreshold;
public abstract class BlobOutputStream extends StorageOutputStream {

/*
* Holds the last exception this stream encountered.
*/
volatile IOException lastError;
BlobOutputStream(final int writeThreshold) {
super(writeThreshold);
}

static BlobOutputStream appendBlobOutputStream(final AppendBlobAsyncClient client,
final AppendBlobAccessConditions appendBlobAccessConditions) {
final AppendBlobAccessConditions appendBlobAccessConditions) {
return new AppendBlobOutputStream(client, appendBlobAccessConditions);
}

static BlobOutputStream blockBlobOutputStream(final BlockBlobAsyncClient client,
final BlobAccessConditions accessConditions) {
final BlobAccessConditions accessConditions) {
return new BlockBlobOutputStream(client, accessConditions);
}

static BlobOutputStream pageBlobOutputStream(final PageBlobAsyncClient client, final long length,
final BlobAccessConditions accessConditions) {
return new PageBlobOutputStream(client, length, accessConditions);
static BlobOutputStream pageBlobOutputStream(final PageBlobAsyncClient client, final PageRange pageRange,
final BlobAccessConditions accessConditions) {
return new PageBlobOutputStream(client, pageRange, accessConditions);
}

abstract Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset);

abstract void commit();

/**
* Writes the data to the buffer and triggers writes to the service as needed.
*
* @param data A <code>byte</code> array which represents the data to write.
* @param offset An <code>int</code> which represents the start offset in the data.
* @param length An <code>int</code> which represents the number of bytes to write.
* @throws IOException If an I/O error occurs. In particular, an IOException may be thrown if the output stream has
* been closed.
*/
private void writeInternal(final byte[] data, int offset, int length) {
int chunks = (int) (Math.ceil((double) length / (double) this.writeThreshold));
Flux.range(0, chunks).map(c -> offset + c * this.writeThreshold)
.concatMap(pos -> processChunk(data, pos, offset, length))
.then()
.block();
}

private Mono<Void> processChunk(byte[] data, int position, int offset, int length) {
int chunkLength = this.writeThreshold;

if (position + chunkLength > offset + length) {
chunkLength = offset + length - position;
}

// Flux<ByteBuffer> chunkData = new ByteBufferStreamFromByteArray(data, writeThreshold, position, chunkLength);
return dispatchWrite(data, chunkLength, position - offset)
.doOnError(t -> {
if (t instanceof IOException) {
lastError = (IOException) t;
} else {
lastError = new IOException(t);
}
});
}

/**
* Helper function to check if the stream is faulted, if it is it surfaces the exception.
*
* @throws IOException If an I/O error occurs. In particular, an IOException may be thrown if the output stream has
* been closed.
*/
private void checkStreamState() throws IOException {
if (this.lastError != null) {
throw this.lastError;
}
}

/**
* Flushes this output stream and forces any buffered output bytes to be written out. If any data remains in the
* buffer it is committed to the service.
*
* @throws IOException If an I/O error occurs.
*/
@Override
public void flush() throws IOException {
this.checkStreamState();
}

/**
* Writes <code>b.length</code> bytes from the specified byte array to this output stream.
* <p>
*
* @param data A <code>byte</code> array which represents the data to write.
*/
@Override
public void write(@NonNull final byte[] data) {
this.write(data, 0, data.length);
}

/**
* Writes length bytes from the specified byte array starting at offset to this output stream.
* <p>
*
* @param data A <code>byte</code> array which represents the data to write.
* @param offset An <code>int</code> which represents the start offset in the data.
* @param length An <code>int</code> which represents the number of bytes to write.
* @throws IndexOutOfBoundsException If {@code offset} or {@code length} are less than {@code 0} or {@code offset}
* plus {@code length} is greater than the {@code data} length.
*/
@Override
public void write(@NonNull final byte[] data, final int offset, final int length) {
if (offset < 0 || length < 0 || length > data.length - offset) {
throw new IndexOutOfBoundsException();
}

this.writeInternal(data, offset, length);
}

/**
* Writes the specified byte to this output stream. The general contract for write is that one byte is written to
* the output stream. The byte to be written is the eight low-order bits of the argument b. The 24 high-order bits
* of b are ignored.
* <p>
* <code>true</code> is acceptable for you.
*
* @param byteVal An <code>int</code> which represents the bye value to write.
*/
@Override
public void write(final int byteVal) {
this.write(new byte[]{(byte) (byteVal & 0xFF)});
}

/**
* Closes this output stream and releases any system resources associated with this stream. If any data remains in
* the buffer it is committed to the service.
Expand Down Expand Up @@ -193,9 +82,9 @@ private static final class AppendBlobOutputStream extends BlobOutputStream {
private final AppendBlobAsyncClient client;

private AppendBlobOutputStream(final AppendBlobAsyncClient client,
final AppendBlobAccessConditions appendBlobAccessConditions) {
final AppendBlobAccessConditions appendBlobAccessConditions) {
super(AppendBlobClient.MAX_APPEND_BLOCK_BYTES);
this.client = client;
this.writeThreshold = BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
this.appendBlobAccessConditions = appendBlobAccessConditions;

if (appendBlobAccessConditions != null) {
Expand Down Expand Up @@ -226,7 +115,7 @@ private Mono<Void> appendBlock(Flux<ByteBuffer> blockData, long offset, long wri
}

@Override
Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
protected Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
if (writeLength == 0) {
return Mono.empty();
}
Expand All @@ -244,7 +133,7 @@ Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
Flux<ByteBuffer> fbb = Flux.range(0, 1)
.concatMap(pos -> Mono.fromCallable(() -> ByteBuffer.wrap(data, (int) offset, writeLength)));

return this.appendBlock(fbb.subscribeOn(Schedulers.elastic()), offset, writeLength);
return this.appendBlock(fbb.subscribeOn(Schedulers.elastic()), this.initialBlobOffset, writeLength);
}

@Override
Expand All @@ -260,11 +149,11 @@ private static final class BlockBlobOutputStream extends BlobOutputStream {
private final BlockBlobAsyncClient client;

private BlockBlobOutputStream(final BlockBlobAsyncClient client, final BlobAccessConditions accessConditions) {
super(BlockBlobClient.MAX_STAGE_BLOCK_BYTES);
this.client = client;
this.accessConditions = accessConditions;
this.blockIdPrefix = UUID.randomUUID().toString() + '-';
this.blockList = new ArrayList<>();
this.writeThreshold = BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
}

/**
Expand All @@ -291,7 +180,7 @@ private Mono<Void> writeBlock(Flux<ByteBuffer> blockData, String blockId, long w
}

@Override
Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
protected Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
if (writeLength == 0) {
return Mono.empty();
}
Expand All @@ -315,13 +204,16 @@ synchronized void commit() {
}

private static final class PageBlobOutputStream extends BlobOutputStream {
private final ClientLogger logger = new ClientLogger(PageBlobOutputStream.class);
private final PageBlobAsyncClient client;
private final PageBlobAccessConditions pageBlobAccessConditions;
private final PageRange pageRange;

private PageBlobOutputStream(final PageBlobAsyncClient client, final long length,
final BlobAccessConditions blobAccessConditions) {
private PageBlobOutputStream(final PageBlobAsyncClient client, final PageRange pageRange,
final BlobAccessConditions blobAccessConditions) {
super(PageBlobClient.MAX_PUT_PAGES_BYTES);
this.client = client;
this.writeThreshold = (int) Math.min(BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE, length);
this.pageRange = pageRange;

if (blobAccessConditions != null) {
this.pageBlobAccessConditions = new PageBlobAccessConditions()
Expand All @@ -332,8 +224,8 @@ private PageBlobOutputStream(final PageBlobAsyncClient client, final long length
}
}

private Mono<Void> writePages(Flux<ByteBuffer> pageData, long offset, long writeLength) {
return client.uploadPagesWithResponse(new PageRange().setStart(offset).setEnd(offset + writeLength - 1),
private Mono<Void> writePages(Flux<ByteBuffer> pageData, int length, long offset) {
return client.uploadPagesWithResponse(new PageRange().setStart(offset).setEnd(offset + length - 1),
pageData, pageBlobAccessConditions)
.then()
.onErrorResume(t -> t instanceof StorageException, e -> {
Expand All @@ -343,7 +235,7 @@ private Mono<Void> writePages(Flux<ByteBuffer> pageData, long offset, long write
}

@Override
Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
protected Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
if (writeLength == 0) {
return Mono.empty();
}
Expand All @@ -356,7 +248,13 @@ Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
Flux<ByteBuffer> fbb = Flux.range(0, 1)
.concatMap(pos -> Mono.fromCallable(() -> ByteBuffer.wrap(data, (int) offset, writeLength)));

return this.writePages(fbb.subscribeOn(Schedulers.elastic()), offset, writeLength);
long pageOffset = pageRange.getStart();
if (pageOffset + writeLength - 1 > pageRange.getEnd()) {
throw logger.logExceptionAsError(
new RuntimeException("The input data length is larger than the page range."));
}
pageRange.setStart(pageRange.getStart() + writeLength);
return this.writePages(fbb.subscribeOn(Schedulers.elastic()), writeLength, pageOffset);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,28 +69,30 @@ public final class PageBlobClient extends BlobClientBase {
* Creates and opens an output stream to write data to the page blob. If the blob already exists on the service, it
* will be overwritten.
*
* @param length A <code>long</code> which represents the length, in bytes, of the stream to create. This value must
* be a multiple of 512.
* @param pageRange A {@link PageRange} object. Given that pages must be aligned with 512-byte boundaries, the start
* offset must be a modulus of 512 and the end offset must be a modulus of 512 - 1. Examples of valid byte ranges
* are 0-511, 512-1023, etc.
* @return A {@link BlobOutputStream} object used to write data to the blob.
* @throws StorageException If a storage service error occurred.
*/
public BlobOutputStream getBlobOutputStream(long length) {
return getBlobOutputStream(length, null);
public BlobOutputStream getBlobOutputStream(PageRange pageRange) {
return getBlobOutputStream(pageRange, null);
}

/**
* Creates and opens an output stream to write data to the page blob. If the blob already exists on the service, it
* will be overwritten.
*
* @param length A <code>long</code> which represents the length, in bytes, of the stream to create. This value must
* be a multiple of 512.
* @param pageRange A {@link PageRange} object. Given that pages must be aligned with 512-byte boundaries, the start
* offset must be a modulus of 512 and the end offset must be a modulus of 512 - 1. Examples of valid byte ranges
* are 0-511, 512-1023, etc.
* @param accessConditions A {@link BlobAccessConditions} object that represents the access conditions for the
* blob.
* @return A {@link BlobOutputStream} object used to write data to the blob.
* @throws StorageException If a storage service error occurred.
*/
public BlobOutputStream getBlobOutputStream(long length, BlobAccessConditions accessConditions) {
return BlobOutputStream.pageBlobOutputStream(pageBlobAsyncClient, length, accessConditions);
public BlobOutputStream getBlobOutputStream(PageRange pageRange, BlobAccessConditions accessConditions) {
return BlobOutputStream.pageBlobOutputStream(pageBlobAsyncClient, pageRange, accessConditions);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.azure.storage.blob


import com.azure.storage.blob.models.PageRange
import com.azure.storage.common.Constants
import spock.lang.Ignore
import spock.lang.Requires

class BlobOutputStreamTest extends APISpec {
Expand Down Expand Up @@ -33,7 +32,7 @@ class BlobOutputStreamTest extends APISpec {


when:
def outputStream = pageBlobClient.getBlobOutputStream(data.length)
def outputStream = pageBlobClient.getBlobOutputStream(new PageRange().setStart(0).setEnd(16 * Constants.MB - 1))
outputStream.write(data)
outputStream.close()

Expand All @@ -42,7 +41,7 @@ class BlobOutputStreamTest extends APISpec {
}

// Test is failing, need to investigate.
@Ignore
@Requires({ liveMode() })
def "AppendBlob output stream"() {
setup:
def data = getRandomByteArray(4 * FOUR_MB)
Expand All @@ -52,7 +51,7 @@ class BlobOutputStreamTest extends APISpec {
when:
def outputStream = appendBlobClient.getBlobOutputStream()
for (int i = 0; i != 4; i++) {
outputStream.write(Arrays.copyOfRange(data, i * FOUR_MB, ((i + 1) * FOUR_MB) - 1))
outputStream.write(Arrays.copyOfRange(data, i * FOUR_MB, ((i + 1) * FOUR_MB)))
}
outputStream.close()

Expand Down
Loading

0 comments on commit 3c37147

Please sign in to comment.