Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Initial Preview of Transfer Manager #2105

Merged
merged 29 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
57fa6d5
feat: Initial Classes and Interface for Transfer Manager (#1874)
sydney-munro Feb 2, 2023
233579d
chore: Add null check based of TransferStatus to Download/Upload Resu…
sydney-munro Feb 4, 2023
0fa8d57
refactor: Move TransferManagerConfig to own file and Add Parallel Upl…
sydney-munro Feb 9, 2023
3e89b1f
chore: merge main into transfer-manager branch (#1930)
sydney-munro Mar 7, 2023
375dd7b
Revert "chore: merge main into transfer-manager branch (#1930)"
sydney-munro Mar 7, 2023
fa57e3a
Merge branch 'main' into feat/transfer-manager
sydney-munro Mar 7, 2023
01d7de9
feat: First Pass Implementation of UploadMany (#1922)
sydney-munro Mar 15, 2023
fedbd9c
feat: First pass downloadMany (#1945)
sydney-munro Mar 23, 2023
47dad11
feat: Apply BlobWriteOptions to UploadMany operations (#1950)
sydney-munro Mar 28, 2023
abc6ec4
fix: Review session updates (#1964)
sydney-munro Apr 5, 2023
63e6d52
feat: Change Prefix to Download Directory (#1975)
sydney-munro Apr 12, 2023
b99a67d
feat: Implement chunkedDownload and refactor direct downloads. (#1990)
sydney-munro May 4, 2023
a656793
Merge branch 'main' into feat/transfer-manager
sydney-munro May 5, 2023
cbade06
feat: Error Handling and Transfer Status assignment FAILED_TO_START (…
sydney-munro May 24, 2023
ad72bc8
Merge branch 'main' into feat/transfer-manager
sydney-munro May 30, 2023
cf10576
feat: Error handling for Downloads (#2043)
sydney-munro Jun 6, 2023
b34b05e
fix: validateBlob failure fallsback to direct download (#2055)
sydney-munro Jun 9, 2023
12a7a6f
feat: Exceptions for Chunked Downloads and Precondition Failure tests…
sydney-munro Jun 13, 2023
c19049d
feat: Resolve API futures on getDownload/UploadResults calls (#2064)
sydney-munro Jun 14, 2023
437385d
test: Success and failure combined tests (#2066)
sydney-munro Jun 14, 2023
d503e34
chore: Add BetaApi Annotation to public classes and methods (#2067)
sydney-munro Jun 16, 2023
4ea18d9
Merge branch 'main' into feat/transfer-manager
sydney-munro Jun 20, 2023
45d142a
feat: Add validation around bytes received vs bytes expected (#2078)
sydney-munro Jun 21, 2023
6769a2b
fix: update UploadCallable to use createFrom to avoid NPE trying to r…
sydney-munro Jun 23, 2023
63d8ed3
docs: Javadocs for TransferManager interface and ParallelUploadConfig…
sydney-munro Jun 26, 2023
0362e80
docs: Javadocs for remainder of Transfer Manager (#2097)
sydney-munro Jun 29, 2023
e57e633
chore: add gcloud-tm/ entry to useragent for TransferManager (#2104)
BenWhitehead Jul 5, 2023
97e5d18
chore: move PackagePrivateMethodWorkarounds.java back to test folder
sydney-munro Jul 6, 2023
09ac019
revert copyright update
sydney-munro Jul 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage.transfermanager;

import com.google.cloud.ReadChannel;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.StorageException;
import com.google.common.io.ByteStreams;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Callable;

final class ChunkedDownloadCallable implements Callable<DownloadSegment> {

private final BlobInfo originalBlob;

private final Storage storage;

private final Storage.BlobSourceOption[] opts;

private final long startPosition;

private final long endPosition;
private final Path destPath;

ChunkedDownloadCallable(
Storage storage,
BlobInfo originalBlob,
BlobSourceOption[] opts,
Path destPath,
long startPosition,
long endPosition) {
this.originalBlob = originalBlob;
this.storage = storage;
this.opts = opts;
this.startPosition = startPosition;
this.endPosition = endPosition;
this.destPath = destPath;
}

@Override
public DownloadSegment call() {
long bytesCopied = -1L;
try (ReadChannel rc = storage.reader(originalBlob.getBlobId(), opts)) {
FileChannel wc =
FileChannel.open(destPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
rc.seek(startPosition);
rc.limit(endPosition);
wc.position(startPosition);
bytesCopied = ByteStreams.copy(rc, wc);
long bytesExpected = endPosition - startPosition;
if (bytesCopied != bytesExpected) {
return DownloadSegment.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH)
.setException(
new StorageException(
0,
"Unexpected end of stream, read "
+ bytesCopied
+ " expected "
+ bytesExpected
+ " from object "
+ originalBlob.getBlobId().toGsUtilUriWithGeneration()))
.build();
}
} catch (Exception e) {
if (bytesCopied == -1) {
return DownloadSegment.newBuilder(originalBlob, TransferStatus.FAILED_TO_START)
.setException(e)
.build();
}
return DownloadSegment.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH)
.setException(e)
.build();
}
DownloadSegment result =
DownloadSegment.newBuilder(originalBlob, TransferStatus.SUCCESS)
.setOutputDestination(destPath)
.build();
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage.transfermanager;

final class DefaultQos implements Qos {

private final long divideAndConquerThreshold;

private DefaultQos(long divideAndConquerThreshold) {
this.divideAndConquerThreshold = divideAndConquerThreshold;
}

@Override
public boolean divideAndConquer(long objectSize) {
return objectSize > divideAndConquerThreshold;
}

static DefaultQos of() {
return of(128L * 1024 * 1024);
}

static DefaultQos of(long divideAndConquerThreshold) {
return new DefaultQos(divideAndConquerThreshold);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage.transfermanager;

import com.google.cloud.ReadChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.StorageException;
import com.google.common.io.ByteStreams;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Callable;

final class DirectDownloadCallable implements Callable<DownloadResult> {
private final BlobInfo originalBlob;

private final ParallelDownloadConfig parallelDownloadConfig;
private final Storage storage;

private final Storage.BlobSourceOption[] opts;

DirectDownloadCallable(
Storage storage,
BlobInfo originalBlob,
ParallelDownloadConfig parallelDownloadConfig,
BlobSourceOption[] opts) {
this.originalBlob = originalBlob;
this.parallelDownloadConfig = parallelDownloadConfig;
this.storage = storage;
this.opts = opts;
}

@Override
public DownloadResult call() {
Path path = TransferManagerUtils.createDestPath(parallelDownloadConfig, originalBlob);
long bytesCopied = -1L;
try (ReadChannel rc =
storage.reader(
BlobId.of(parallelDownloadConfig.getBucketName(), originalBlob.getName()), opts)) {
FileChannel wc =
FileChannel.open(
path,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING);
bytesCopied = ByteStreams.copy(rc, wc);
if (originalBlob.getSize() != null) {
if (bytesCopied != originalBlob.getSize()) {
return DownloadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH)
.setException(
new StorageException(
0,
"Unexpected end of stream, read "
+ bytesCopied
+ " expected "
+ originalBlob.getSize()
+ " from object "
+ originalBlob.getBlobId().toGsUtilUriWithGeneration()))
.build();
}
}
} catch (Exception e) {
if (bytesCopied == -1) {
return DownloadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_START)
.setException(e)
.build();
}
return DownloadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH)
.setException(e)
.build();
}
DownloadResult result =
DownloadResult.newBuilder(originalBlob, TransferStatus.SUCCESS)
.setOutputDestination(path.toAbsolutePath())
.build();
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage.transfermanager;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.gax.rpc.ApiExceptions;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
* A parallel download job sent to Transfer Manager.
*
* @see Builder
*/
@BetaApi
public final class DownloadJob {

@NonNull private final List<ApiFuture<DownloadResult>> downloadResults;

@NonNull private final ParallelDownloadConfig parallelDownloadConfig;

private DownloadJob(
@NonNull List<ApiFuture<DownloadResult>> downloadResults,
@NonNull ParallelDownloadConfig parallelDownloadConfig) {
this.downloadResults = downloadResults;
this.parallelDownloadConfig = parallelDownloadConfig;
}

/**
* The list of {@link DownloadResult DownloadResults} for each download request Transfer Manager
* executed for this job. Note calling this method will block the invoking thread until all
* download requests are complete.
*
* @see Builder#setDownloadResults(List)
*/
@BetaApi
public @NonNull List<DownloadResult> getDownloadResults() {
return ApiExceptions.callAndTranslateApiException(ApiFutures.allAsList(downloadResults));
}

/**
* The {@link ParallelDownloadConfig} used for this DownloadJob.
*
* @see Builder#setParallelDownloadConfig(ParallelDownloadConfig)
*/
@BetaApi
public @NonNull ParallelDownloadConfig getParallelDownloadConfig() {
return parallelDownloadConfig;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DownloadJob)) {
return false;
}
DownloadJob that = (DownloadJob) o;
return downloadResults.equals(that.downloadResults)
&& parallelDownloadConfig.equals(that.parallelDownloadConfig);
}

@Override
public int hashCode() {
return Objects.hash(downloadResults, parallelDownloadConfig);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("downloadResults", downloadResults)
.add("parallelDownloadConfig", parallelDownloadConfig)
.toString();
}

@BetaApi
public static Builder newBuilder() {
return new Builder();
}

/**
* Builds an instance of DownloadJob
*
* @see DownloadJob
*/
@BetaApi
public static final class Builder {

private @NonNull List<ApiFuture<DownloadResult>> downloadResults;
private @MonotonicNonNull ParallelDownloadConfig parallelDownloadConfig;

private Builder() {
this.downloadResults = ImmutableList.of();
}

/**
* Sets the results for a DownloadJob being performed by Transfer Manager.
*
* @return the instance of the Builder with DownloadResults modified.
* @see DownloadJob#getDownloadResults()
*/
@BetaApi
public Builder setDownloadResults(@NonNull List<ApiFuture<DownloadResult>> downloadResults) {
this.downloadResults = ImmutableList.copyOf(downloadResults);
return this;
}

/**
* Sets the {@link ParallelDownloadConfig} used for this DownloadJob.
*
* @return the instance of the Builder with ParallelDownloadConfig modified.
* @see DownloadJob#getParallelDownloadConfig()
*/
@BetaApi
public Builder setParallelDownloadConfig(
@NonNull ParallelDownloadConfig parallelDownloadConfig) {
this.parallelDownloadConfig = parallelDownloadConfig;
return this;
}

/**
* Creates a DownloadJob object.
*
* @return {@link DownloadJob}
*/
@BetaApi
public DownloadJob build() {
checkNotNull(downloadResults);
checkNotNull(parallelDownloadConfig);
return new DownloadJob(downloadResults, parallelDownloadConfig);
}
}
}
Loading