Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use chunked upload for all put object. #595

Merged
merged 1 commit into from
Aug 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 212 additions & 0 deletions api/src/main/java/io/minio/ChunkedInputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Minio Java SDK for Amazon S3 Compatible Cloud Storage,
* (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.minio;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;

import org.joda.time.DateTime;

import io.minio.errors.InternalException;
import io.minio.errors.InsufficientDataException;


/**
 * Input stream that wraps a source stream of known size and re-emits it as a sequence of signed
 * chunks, used for chunked upload of PUT object.
 *
 * <p>Every chunk produced has the form
 * {@code HEX_SIZE + ";chunk-signature=" + SIGNATURE + "\r\n" + DATA + "\r\n"}. Each chunk's
 * signature is computed from the previous chunk's signature, starting from the seed signature
 * passed to the constructor. After all data chunks, one zero-length chunk is emitted to
 * terminate the upload.
 */
class ChunkedInputStream extends InputStream {
  // Chunk size in chunked upload for PUT object is 64KiB
  private static final int CHUNK_SIZE = 64 * 1024;
  // Each chunk body should be like
  // CHUNK_SIZE_IN_HEX_STRING + ";chunk-signature=" + SIGNATURE + "\r\n" + CHUNK_DATA + "\r\n"
  // e.g. for 64KiB of chunk
  // 10000;chunk-signature=ad80c730a21e5b8d04586a2213dd63b9a0e99e0e2307b0ade35a65485a288648\r\n<65536-bytes>\r\n
  // From the above value, a full chunk size 65626 is by
  // len(hex string of 64KiB) = 5 (+)
  // len(;chunk-signature=ad80c730a21e5b8d04586a2213dd63b9a0e99e0e2307b0ade35a65485a288648\r\n) = 83 (+)
  // <65536 bytes> = 65536 (+)
  // len(\r\n) = 2
  private static final int FULL_CHUNK_LEN = 65626;
  // Data in last chunk might be less than 64KiB. In this case, ignoring variable length of chunk
  // body components (hex size string and data), the remaining length is constant:
  // len(";chunk-signature=") + len(signature) + len("\r\n") + len("\r\n") = 17 + 64 + 2 + 2
  private static final int CHUNK_SIGNATURE_METADATA_LEN = 85;
  // As final additional chunk must be like
  // 0;chunk-signature=b6c6ea8a5354eaf15b3cb7646744f4275b71ea724fed81ceb9323e279d449df9\r\n\r\n
  // the length is 86
  private static final int FINAL_ADDITIONAL_CHUNK_LEN = 1 + CHUNK_SIGNATURE_METADATA_LEN;
  // Line terminator used after the chunk metadata and after the chunk data.
  private static final byte[] CRLF = "\r\n".getBytes(StandardCharsets.UTF_8);

  private final InputStream inputStream;
  private final int streamSize;
  private final int length;
  private final DateTime date;
  private final String region;
  private final String secretKey;
  // Signature of the most recently created chunk; starts as the seed signature.
  private String prevSignature;

  // Counter denotes how many bytes read from given input stream.
  private int streamBytesRead = 0;
  // Initialize to avoid findbugs warning.
  private byte[] chunkBody = new byte[0];
  private int chunkPos = 0;
  private boolean isEof = false;
  // Counter denotes how many bytes the consumer read from this stream.
  private int bytesRead = 0;


  /**
   * Create new ChunkedInputStream for given input stream.
   *
   * @param inputStream   source stream supplying the raw object data.
   * @param streamSize    number of bytes to consume from {@code inputStream}.
   * @param date          date passed to the chunk signer.
   * @param region        region passed to the chunk signer.
   * @param secretKey     secret key passed to the chunk signer.
   * @param seedSignature signature used as the "previous signature" of the first chunk.
   */
  public ChunkedInputStream(InputStream inputStream, int streamSize, DateTime date, String region, String secretKey,
                            String seedSignature) throws IOException {
    this.inputStream = inputStream;
    this.streamSize = streamSize;
    this.date = date;
    this.region = region;
    this.secretKey = secretKey;
    this.prevSignature = seedSignature;

    // Calculate the total number of bytes this stream will produce:
    // all full chunks + optional shorter last chunk + final zero-length chunk.
    int fullChunks = streamSize / CHUNK_SIZE;
    int totalLength = fullChunks * FULL_CHUNK_LEN;
    int lastChunkLen = streamSize % CHUNK_SIZE;
    if (lastChunkLen > 0) {
      totalLength += Integer.toHexString(lastChunkLen).getBytes(StandardCharsets.UTF_8).length;
      totalLength += CHUNK_SIGNATURE_METADATA_LEN;
      totalLength += lastChunkLen;
    }
    totalLength += FINAL_ADDITIONAL_CHUNK_LEN;
    this.length = totalLength;
  }


  /**
   * Fill {@code buf} from the source stream, looping over short reads.
   *
   * @return number of bytes stored in {@code buf} (less than {@code buf.length} only when the
   *         source reached end of stream), or -1 if end of stream was already seen earlier.
   */
  private int readData(byte[] buf) throws IOException {
    if (this.isEof) {
      return -1;
    }

    int totalBytesRead = 0;
    while (totalBytesRead < buf.length) {
      // Always continue writing right after the bytes gathered so far; the offset must be the
      // running total, not the size of the last individual read.
      int n = this.inputStream.read(buf, totalBytesRead, buf.length - totalBytesRead);
      if (n < 0) {
        this.isEof = true;
        break;
      }

      totalBytesRead += n;
    }

    return totalBytesRead;
  }


  /**
   * Build the signed chunk body for {@code chunk}, make it the current chunk and remember its
   * signature for signing the next chunk.
   */
  private void createChunkBody(byte[] chunk)
    throws IOException, NoSuchAlgorithmException, InvalidKeyException, InsufficientDataException, InternalException {
    String chunkSha256 = Digest.sha256Hash(chunk, chunk.length);
    String signature = Signer.getChunkSignature(chunkSha256, this.date, this.region, this.secretKey,
                                                this.prevSignature);

    // Presize for data + constant metadata + a few bytes for the hex size string.
    ByteArrayOutputStream os = new ByteArrayOutputStream(chunk.length + CHUNK_SIGNATURE_METADATA_LEN + 8);
    // Add metadata.
    os.write(Integer.toHexString(chunk.length).getBytes(StandardCharsets.UTF_8));
    os.write(";chunk-signature=".getBytes(StandardCharsets.UTF_8));
    os.write(signature.getBytes(StandardCharsets.UTF_8));
    os.write(CRLF);
    // Add chunk data.
    os.write(chunk, 0, chunk.length);
    os.write(CRLF);

    this.chunkBody = os.toByteArray();
    this.chunkPos = 0;
    this.prevSignature = signature;
  }


  /**
   * Read exactly {@code chunkSize} bytes from the source stream and turn them into the current
   * chunk body.
   *
   * @return length of the created chunk body, or -1 if the source stream had already ended.
   * @throws InsufficientDataException if the source supplied fewer than {@code chunkSize} bytes.
   */
  private int readChunk(int chunkSize)
    throws IOException, NoSuchAlgorithmException, InvalidKeyException, InsufficientDataException, InternalException {
    byte[] chunk = new byte[chunkSize];
    int len = readData(chunk);
    if (len < 0) {
      return -1;
    }

    if (len != chunkSize) {
      throw new InsufficientDataException("Insufficient data. read = " + len + " expected = " + chunkSize);
    }

    createChunkBody(chunk);
    return this.chunkBody.length;
  }


  /**
   * Read a single byte of the chunked output.
   *
   * @return the next byte as a value in 0..255, or -1 once all chunks (including the final
   *         zero-length chunk) have been consumed.
   * @throws IOException if reading the source fails, or wrapping any signing/digest failure or
   *                     a shortage of source data.
   */
  @Override
  public int read() throws IOException {
    if (this.bytesRead == this.length) {
      // All chunks and final additional chunk are read.
      // This means we have reached EOF.
      return -1;
    }

    try {
      // Read a chunk from given input stream when
      // it is first chunk or all bytes in chunk body is read
      if (this.streamBytesRead == 0 || this.chunkPos == this.chunkBody.length) {
        // Check if there are data available to read from given input stream.
        if (this.streamBytesRead != this.streamSize) {
          // Send all data chunks; the last one may be shorter than CHUNK_SIZE.
          int chunkSize = CHUNK_SIZE;
          if (this.streamBytesRead + chunkSize > this.streamSize) {
            chunkSize = this.streamSize - this.streamBytesRead;
          }

          if (readChunk(chunkSize) < 0) {
            return -1;
          }

          this.streamBytesRead += chunkSize;
        } else {
          // Send final additional chunk to complete chunk upload.
          byte[] chunk = new byte[0];
          createChunkBody(chunk);
        }
      }

      this.bytesRead++;
      // Value must be between 0 to 255.
      int value = this.chunkBody[this.chunkPos] & 0xFF;
      this.chunkPos++;
      return value;
    } catch (NoSuchAlgorithmException | InvalidKeyException | InsufficientDataException | InternalException e) {
      // Wrap the exception itself (not its cause, which is usually null) so that the message
      // and stack trace of the real failure are preserved.
      throw new IOException(e);
    }
  }


  /**
   * Return the total number of bytes this ChunkedInputStream is expected to produce, i.e. the
   * size of all chunk bodies including the final zero-length chunk.
   */
  public int length() {
    return this.length;
  }
}
133 changes: 78 additions & 55 deletions api/src/main/java/io/minio/MinioClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -847,9 +847,9 @@ public boolean verify(String hostname, SSLSession session) {
private Request createRequest(Method method, String bucketName, String objectName,
String region, Map<String,String> headerMap,
Map<String,String> queryParamMap, final String contentType,
final Object body, final int length)
throws InvalidBucketNameException, NoSuchAlgorithmException, InsufficientDataException, IOException,
InternalException {
Object body, int length)
throws InvalidBucketNameException, NoSuchAlgorithmException, InvalidKeyException, InsufficientDataException,
IOException, InternalException {
if (bucketName == null && objectName != null) {
throw new InvalidBucketNameException(NULL_STRING, "null bucket name for object '" + objectName + "'");
}
Expand Down Expand Up @@ -904,57 +904,8 @@ private Request createRequest(Method method, String bucketName, String objectNam

HttpUrl url = urlBuilder.build();

RequestBody requestBody = null;
if (body != null) {
requestBody = new RequestBody() {
@Override
public MediaType contentType() {
MediaType mediaType = null;

if (contentType != null) {
mediaType = MediaType.parse(contentType);
}
if (mediaType == null) {
mediaType = MediaType.parse("application/octet-stream");
}

return mediaType;
}

@Override
public long contentLength() {
if (body instanceof InputStream || body instanceof RandomAccessFile || body instanceof byte[]) {
return length;
}

if (length == 0) {
return -1;
} else {
return length;
}
}

@Override
public void writeTo(BufferedSink sink) throws IOException {
if (body instanceof InputStream) {
InputStream stream = (InputStream) body;
sink.write(Okio.source(stream), length);
} else if (body instanceof RandomAccessFile) {
RandomAccessFile file = (RandomAccessFile) body;
sink.write(Okio.source(Channels.newInputStream(file.getChannel())), length);
} else if (body instanceof byte[]) {
byte[] data = (byte[]) body;
sink.write(data, 0, length);
} else {
sink.writeUtf8(body.toString());
}
}
};
}

Request.Builder requestBuilder = new Request.Builder();
requestBuilder.url(url);
requestBuilder.method(method.toString(), requestBody);
if (headerMap != null) {
for (Map.Entry<String,String> entry : headerMap.entrySet()) {
requestBuilder.header(entry.getKey(), entry.getValue());
Expand All @@ -963,9 +914,16 @@ public void writeTo(BufferedSink sink) throws IOException {

String sha256Hash = null;
String md5Hash = null;
boolean chunkedUpload = false;
if (this.accessKey != null && this.secretKey != null) {
// Fix issue #415: No need to compute sha256 if endpoint scheme is HTTPS.
if (url.isHttps()) {
// Handle putobject specially to use chunked upload.
if (method == Method.PUT && objectName != null && body != null && body instanceof InputStream && length > 0) {
sha256Hash = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD";
requestBuilder.header("Content-Encoding", "aws-chunked");
requestBuilder.header("x-amz-decoded-content-length", Integer.toString(length));
chunkedUpload = true;
} else if (url.isHttps()) {
// Fix issue #415: No need to compute sha256 if endpoint scheme is HTTPS.
sha256Hash = "UNSIGNED-PAYLOAD";
if (body != null) {
md5Hash = Digest.md5Hash(body, length);
Expand All @@ -989,7 +947,7 @@ public void writeTo(BufferedSink sink) throws IOException {
}
}
} else {
// Fix issue #567: Compute MD5 hash only of anonymous access.
// Fix issue #567: Compute MD5 hash only for anonymous access.
if (body != null) {
md5Hash = Digest.md5Hash(body, length);
}
Expand All @@ -1010,6 +968,71 @@ public void writeTo(BufferedSink sink) throws IOException {
DateTime date = new DateTime();
requestBuilder.header("x-amz-date", date.toString(DateFormat.AMZ_DATE_FORMAT));

if (chunkedUpload) {
// Add empty request body for calculating seed signature.
// The actual request body is properly set below.
requestBuilder.method(method.toString(), RequestBody.create(null, ""));
Request request = requestBuilder.build();
String seedSignature = Signer.getChunkSeedSignature(request, region, secretKey);
requestBuilder = request.newBuilder();

ChunkedInputStream cis = new ChunkedInputStream((InputStream) body, length, date, region, this.secretKey,
seedSignature);
body = cis;
length = cis.length();
}

RequestBody requestBody = null;
if (body != null) {
final Object data = body;
final int len = length;
requestBody = new RequestBody() {
@Override
public MediaType contentType() {
MediaType mediaType = null;

if (contentType != null) {
mediaType = MediaType.parse(contentType);
}
if (mediaType == null) {
mediaType = MediaType.parse("application/octet-stream");
}

return mediaType;
}

@Override
public long contentLength() {
if (data instanceof InputStream || data instanceof RandomAccessFile || data instanceof byte[]) {
return len;
}

if (len == 0) {
return -1;
} else {
return len;
}
}

@Override
public void writeTo(BufferedSink sink) throws IOException {
if (data instanceof InputStream) {
InputStream stream = (InputStream) data;
sink.write(Okio.source(stream), len);
} else if (data instanceof RandomAccessFile) {
RandomAccessFile file = (RandomAccessFile) data;
sink.write(Okio.source(Channels.newInputStream(file.getChannel())), len);
} else if (data instanceof byte[]) {
byte[] bytes = (byte[]) data;
sink.write(bytes, 0, len);
} else {
sink.writeUtf8(data.toString());
}
}
};
}

requestBuilder.method(method.toString(), requestBody);
return requestBuilder.build();
}

Expand Down
Loading