diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadCallable.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadCallable.java index 7a73d99b5..286ea8775 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadCallable.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadCallable.java @@ -19,27 +19,34 @@ import com.google.cloud.ReadChannel; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobSourceOption; import com.google.cloud.storage.StorageException; import com.google.common.io.ByteStreams; +import java.io.File; import java.io.IOException; import java.nio.channels.FileChannel; -import java.nio.file.Paths; +import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.concurrent.Callable; -public class DownloadCallable implements Callable { - private final TransferManagerConfig transferManagerConfig; +final class DownloadCallable implements Callable { private final BlobInfo originalBlob; private final ParallelDownloadConfig parallelDownloadConfig; + private final Storage storage; - public DownloadCallable( - TransferManagerConfig transferManagerConfig, + private final Storage.BlobSourceOption[] opts; + + DownloadCallable( + Storage storage, BlobInfo originalBlob, - ParallelDownloadConfig parallelDownloadConfig) { - this.transferManagerConfig = transferManagerConfig; + ParallelDownloadConfig parallelDownloadConfig, + BlobSourceOption[] opts) { this.originalBlob = originalBlob; this.parallelDownloadConfig = parallelDownloadConfig; + this.storage = storage; + this.opts = opts; } @Override @@ -49,31 +56,39 @@ public DownloadResult call() throws Exception { } private DownloadResult downloadWithoutChunking() { - try (ReadChannel rc = - transferManagerConfig - .getStorageOptions() - .getService() - .reader( - originalBlob.getBlobId(), - parallelDownloadConfig - .getOptionsPerRequest() - .toArray(new Storage.BlobSourceOption[0]))) { + Path path = createDestPath(); + try (ReadChannel rc = storage.reader(originalBlob.getBlobId(), opts)) { FileChannel destFile = - FileChannel.open(Paths.get(createDestPath()), StandardOpenOption.WRITE); + FileChannel.open( + path, + StandardOpenOption.WRITE, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING); ByteStreams.copy(rc, destFile); } catch (IOException e) { throw new StorageException(e); } DownloadResult result = DownloadResult.newBuilder(originalBlob, TransferStatus.SUCCESS) - .setOutputDestination(Paths.get(createDestPath())) + .setOutputDestination(path) .build(); return result; } - private String createDestPath() { - return originalBlob - .getName() - .replaceFirst(parallelDownloadConfig.getStripPrefix(), parallelDownloadConfig.getPrefix()); + private Path createDestPath() { + File newFile = + new File( + originalBlob + .getName() + .replaceFirst( + parallelDownloadConfig.getStripPrefix(), parallelDownloadConfig.getPrefix())); + // Check to make sure the parent directories exist + if (Files.exists(newFile.getParentFile().toPath())) { + return newFile.toPath(); + } else { + // Make parent directories if they do not exist + newFile.getParentFile().mkdirs(); + return newFile.toPath(); + } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadJob.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadJob.java index ca67eb849..af2d8c703 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadJob.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadJob.java @@ -18,28 +18,28 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.api.core.ApiFuture; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import java.util.List; import java.util.Objects; -import java.util.concurrent.Future; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.NonNull; public final class DownloadJob { - @NonNull private final List> downloadResults; + @NonNull private final List> downloadResults; @NonNull private final ParallelDownloadConfig parallelDownloadConfig; private DownloadJob( - @NonNull List> successResponses, + @NonNull List> successResponses, @NonNull ParallelDownloadConfig parallelDownloadConfig) { this.downloadResults = successResponses; this.parallelDownloadConfig = parallelDownloadConfig; } - public List> getDownloadResults() { + public List> getDownloadResults() { return downloadResults; } @@ -79,14 +79,14 @@ public static Builder newBuilder() { public static final class Builder { - private @NonNull List> downloadResults; + private @NonNull List> downloadResults; private @MonotonicNonNull ParallelDownloadConfig parallelDownloadConfig; private Builder() { this.downloadResults = ImmutableList.of(); } - public Builder setDownloadResults(@NonNull List> downloadResults) { + public Builder setDownloadResults(@NonNull List> downloadResults) { this.downloadResults = ImmutableList.copyOf(downloadResults); return this; } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelDownloadConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelDownloadConfig.java index c3135e3b7..0418498ef 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelDownloadConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelDownloadConfig.java @@ -43,18 +43,27 @@ private ParallelDownloadConfig( this.optionsPerRequest = optionsPerRequest; } + /** + * A common prefix that is removed from downloaded object's name before written to the filesystem + */ public @NonNull String getStripPrefix() { return stripPrefix; } + /** + * A common prefix that is applied to downloaded objects before they are written to the + * filesystem. + */ public @NonNull String getPrefix() { return prefix; } + /** The bucket objects are being downloaded from */ public @NonNull String getBucketName() { return bucketName; } + /** A list of common BlobSourceOptions that are used for each download request */ public @NonNull List getOptionsPerRequest() { return optionsPerRequest; } @@ -103,6 +112,7 @@ public static final class Builder { private Builder() { this.stripPrefix = ""; this.prefix = ""; + this.bucketName = ""; this.optionsPerRequest = ImmutableList.of(); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelUploadConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelUploadConfig.java index 421a9e689..ce30ee393 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelUploadConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelUploadConfig.java @@ -48,22 +48,27 @@ private ParallelUploadConfig( this.writeOptsPerRequest = writeOptsPerRequest; } + /** If a corresponding object already exists skip uploading the object */ public boolean isSkipIfExists() { return skipIfExists; } + /** A common prefix that will be applied to all object paths in the destination bucket */ public @NonNull String getPrefix() { return prefix; } + /** The bucket objects are being uploaded from */ public @NonNull String getBucketName() { return bucketName; } + /** A list of common BlobTargetOptions that are used for each upload request */ public @NonNull List getTargetOptsPerRequest() { return targetOptsPerRequest; } + /** A list of common BlobWriteOptions that are used for each upload request */ public @NonNull List getWriteOptsPerRequest() { return writeOptsPerRequest; } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerConfig.java index b3f6a6b4c..086f4f1cd 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerConfig.java @@ -38,22 +38,33 @@ public final class TransferManagerConfig { this.storageOptions = storageOptions; } + /** Maximum amount of workers to be allocated to perform work in Transfer Manager */ public int getMaxWorkers() { return maxWorkers; } + /** Buffer size allowed to each worker */ public int getPerWorkerBufferSize() { return perWorkerBufferSize; } + /** + * Whether to allow Transfer Manager to performing chunked Uploads/Downloads if it determines + * chunking will be beneficial + */ public boolean isAllowChunking() { return allowChunking; } + /** Storage options that Transfer Manager will use to interact with GCS */ public StorageOptions getStorageOptions() { return storageOptions; } + public TransferManager getService() { + return new TransferManagerImpl(this); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -97,8 +108,6 @@ public static class Builder { private StorageOptions storageOptions; private Builder() { - // TODO: add default values - // bufferSize tbd? this.perWorkerBufferSize = 16 * 1024 * 1024; this.maxWorkers = 2 * Runtime.getRuntime().availableProcessors(); this.allowChunking = false; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java index 1c44d00ba..a90646222 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java @@ -16,53 +16,72 @@ package com.google.cloud.storage.transfermanager; +import com.google.api.core.ApiFuture; +import com.google.api.core.ListenableFutureToApiFuture; import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import org.checkerframework.checker.nullness.qual.NonNull; -public final class TransferManagerImpl implements TransferManager { +final class TransferManagerImpl implements TransferManager { private final TransferManagerConfig transferManagerConfig; - private final ExecutorService executor; + private final ListeningExecutorService executor; + private final Storage storage; - public TransferManagerImpl(TransferManagerConfig transferManagerConfig) { + TransferManagerImpl(TransferManagerConfig transferManagerConfig) { this.transferManagerConfig = transferManagerConfig; - this.executor = Executors.newFixedThreadPool(transferManagerConfig.getMaxWorkers()); + this.executor = + MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(transferManagerConfig.getMaxWorkers())); + + this.storage = transferManagerConfig.getStorageOptions().getService(); } @Override - public @NonNull UploadJob uploadFiles(List files, ParallelUploadConfig opts) { - List> uploadTasks = new ArrayList<>(); + public @NonNull UploadJob uploadFiles(List files, ParallelUploadConfig config) { + Storage.BlobWriteOption[] opts = + config.getWriteOptsPerRequest().toArray(new BlobWriteOption[0]); + List> uploadTasks = new ArrayList<>(); for (Path file : files) { if (Files.isDirectory(file)) throw new IllegalStateException("Directories are not supported"); - String blobName = TransferManagerUtils.createBlobName(opts, file); - BlobInfo blobInfo = BlobInfo.newBuilder(opts.getBucketName(), blobName).build(); - UploadCallable callable = new UploadCallable(transferManagerConfig, blobInfo, file, opts); - uploadTasks.add(executor.submit(callable)); + String blobName = TransferManagerUtils.createBlobName(config, file); + BlobInfo blobInfo = BlobInfo.newBuilder(config.getBucketName(), blobName).build(); + UploadCallable callable = + new UploadCallable(transferManagerConfig, storage, blobInfo, file, config, opts); + uploadTasks.add(convert(executor.submit(callable))); } return UploadJob.newBuilder() - .setParallelUploadConfig(opts) + .setParallelUploadConfig(config) .setUploadResponses(ImmutableList.copyOf(uploadTasks)) .build(); } @Override - public @NonNull DownloadJob downloadBlobs(List blobs, ParallelDownloadConfig opts) { - List> downloadTasks = new ArrayList<>(); + public @NonNull DownloadJob downloadBlobs(List blobs, ParallelDownloadConfig config) { + Storage.BlobSourceOption[] opts = + config.getOptionsPerRequest().toArray(new Storage.BlobSourceOption[0]); + List> downloadTasks = new ArrayList<>(); for (BlobInfo blob : blobs) { - DownloadCallable callable = new DownloadCallable(transferManagerConfig, blob, opts); - downloadTasks.add(executor.submit(callable)); + DownloadCallable callable = new DownloadCallable(storage, blob, config, opts); + downloadTasks.add(convert(executor.submit(callable))); } return DownloadJob.newBuilder() .setDownloadResults(downloadTasks) - .setParallelDownloadConfig(opts) + .setParallelDownloadConfig(config) .build(); } + + private static ApiFuture convert(ListenableFuture lf) { + return new ListenableFutureToApiFuture<>(lf); + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerUtils.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerUtils.java index 4f55fbb94..071e5fc3b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerUtils.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerUtils.java @@ -18,9 +18,11 @@ import java.nio.file.Path; -public class TransferManagerUtils { +final class TransferManagerUtils { - public static String createBlobName(ParallelUploadConfig config, Path file) { + private TransferManagerUtils() {} + + static String createBlobName(ParallelUploadConfig config, Path file) { if (config.getPrefix().isEmpty()) { return file.toString(); } else { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java index 82610523c..ebbb8d090 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java @@ -20,19 +20,17 @@ import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.PackagePrivateMethodWorkarounds; import com.google.cloud.storage.Storage; -import com.google.cloud.storage.StorageException; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.nio.file.Files; +import com.google.cloud.storage.Storage.BlobWriteOption; +import com.google.common.io.ByteStreams; +import java.nio.channels.FileChannel; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.Optional; import java.util.concurrent.Callable; -public class UploadCallable implements Callable { +final class UploadCallable implements Callable { private final TransferManagerConfig transferManagerConfig; + private final Storage storage; private final BlobInfo originalBlob; @@ -40,15 +38,21 @@ public class UploadCallable implements Callable { private final ParallelUploadConfig parallelUploadConfig; + private final Storage.BlobWriteOption[] opts; + public UploadCallable( TransferManagerConfig transferManagerConfig, + Storage storage, BlobInfo originalBlob, Path sourceFile, - ParallelUploadConfig parallelUploadConfig) { + ParallelUploadConfig parallelUploadConfig, + BlobWriteOption[] opts) { this.transferManagerConfig = transferManagerConfig; + this.storage = storage; this.originalBlob = originalBlob; this.sourceFile = sourceFile; this.parallelUploadConfig = parallelUploadConfig; + this.opts = opts; } public UploadResult call() throws Exception { @@ -56,41 +60,22 @@ public UploadResult call() throws Exception { return uploadWithoutChunking(); } - private UploadResult uploadWithoutChunking() throws IOException { - Optional newBlob; - WriteChannel wc = - transferManagerConfig - .getStorageOptions() - .getService() - .writer( - originalBlob, - parallelUploadConfig - .getWriteOptsPerRequest() - .toArray(new Storage.BlobWriteOption[0])); + private UploadResult uploadWithoutChunking() { try { - InputStream inputStream = Files.newInputStream(sourceFile); - uploadHelper(Channels.newChannel(inputStream), wc); - } catch (IOException e) { - throw new StorageException(e); - } finally { - wc.close(); - } - newBlob = PackagePrivateMethodWorkarounds.maybeGetBlobInfoFunction().apply(wc); - UploadResult result = - UploadResult.newBuilder(originalBlob, TransferStatus.SUCCESS) - .setUploadedBlob(newBlob.get()) - .build(); - return result; - } - - private void uploadHelper(ReadableByteChannel reader, WriteChannel writer) throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(transferManagerConfig.getPerWorkerBufferSize()); - writer.setChunkSize(transferManagerConfig.getPerWorkerBufferSize()); - - while (reader.read(buffer) >= 0) { - buffer.flip(); - writer.write(buffer); - buffer.clear(); + Optional newBlob; + try (FileChannel r = FileChannel.open(sourceFile, StandardOpenOption.READ); + WriteChannel w = storage.writer(originalBlob, opts)) { + w.setChunkSize(transferManagerConfig.getPerWorkerBufferSize()); + ByteStreams.copy(r, w); + newBlob = PackagePrivateMethodWorkarounds.maybeGetBlobInfoFunction().apply(w); + } + return UploadResult.newBuilder(originalBlob, TransferStatus.SUCCESS) + .setUploadedBlob(newBlob.get()) + .build(); + } catch (Exception e) { + return UploadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH) + .setException(e) + .build(); } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadJob.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadJob.java index 009b0bc55..fb9cbc73b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadJob.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadJob.java @@ -18,28 +18,28 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.api.core.ApiFuture; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import java.util.List; import java.util.Objects; -import java.util.concurrent.Future; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.NonNull; public final class UploadJob { - @NonNull private final List> uploadResponses; + @NonNull private final List> uploadResponses; @NonNull private final ParallelUploadConfig parallelUploadConfig; private UploadJob( - @NonNull List> successResponses, + @NonNull List> successResponses, @NonNull ParallelUploadConfig parallelUploadConfig) { this.uploadResponses = successResponses; this.parallelUploadConfig = parallelUploadConfig; } - public List> getUploadResponses() { + public List> getUploadResponses() { return uploadResponses; } @@ -79,7 +79,7 @@ public static Builder newBuilder() { public static final class Builder { - private @NonNull List> uploadResponses; + private @NonNull List> uploadResponses; private @MonotonicNonNull ParallelUploadConfig parallelUploadConfig; @@ -87,7 +87,7 @@ private Builder() { this.uploadResponses = ImmutableList.of(); } - public Builder setUploadResponses(@NonNull List> uploadResponses) { + public Builder setUploadResponses(@NonNull List> uploadResponses) { this.uploadResponses = ImmutableList.copyOf(uploadResponses); return this; } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadResult.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadResult.java index 2f8b91869..f53c0eb9a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadResult.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadResult.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkState; import com.google.cloud.storage.BlobInfo; -import com.google.cloud.storage.StorageException; import com.google.common.base.MoreObjects; import java.util.Objects; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -31,13 +30,13 @@ public final class UploadResult { @NonNull private final BlobInfo input; @NonNull private final TransferStatus status; @MonotonicNonNull private final BlobInfo uploadedBlob; - @MonotonicNonNull private final StorageException exception; + @MonotonicNonNull private final Exception exception; private UploadResult( @NonNull BlobInfo input, @NonNull TransferStatus status, BlobInfo uploadedBlob, - StorageException exception) { + Exception exception) { this.input = input; this.status = status; this.uploadedBlob = uploadedBlob; @@ -60,7 +59,7 @@ private UploadResult( return uploadedBlob; } - public @NonNull StorageException getException() { + public @NonNull Exception getException() { checkState( status == TransferStatus.FAILED_TO_START || status == TransferStatus.FAILED_TO_FINISH, "getException() is only valid when an unexpected error has occurred but status was %s", @@ -107,7 +106,7 @@ public static final class Builder { private @NonNull BlobInfo input; private @NonNull TransferStatus status; private @MonotonicNonNull BlobInfo uploadedBlob; - private @MonotonicNonNull StorageException exception; + private @MonotonicNonNull Exception exception; private Builder(@NonNull BlobInfo input, @NonNull TransferStatus status) { this.input = input; @@ -129,7 +128,7 @@ public Builder setUploadedBlob(@NonNull BlobInfo uploadedBlob) { return this; } - public Builder setException(@NonNull StorageException exception) { + public Builder setException(@NonNull Exception exception) { this.exception = exception; return this; } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java index 092be7f8e..a16f02daf 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; +import com.google.api.core.ApiFutures; import com.google.cloud.WriteChannel; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; @@ -33,19 +34,22 @@ import com.google.cloud.storage.it.runner.annotations.Inject; import com.google.cloud.storage.it.runner.registry.Generator; import com.google.cloud.storage.transfermanager.DownloadJob; +import com.google.cloud.storage.transfermanager.DownloadResult; import com.google.cloud.storage.transfermanager.ParallelDownloadConfig; import com.google.cloud.storage.transfermanager.ParallelUploadConfig; import com.google.cloud.storage.transfermanager.TransferManager; import com.google.cloud.storage.transfermanager.TransferManagerConfig; -import com.google.cloud.storage.transfermanager.TransferManagerImpl; import com.google.cloud.storage.transfermanager.UploadJob; +import com.google.cloud.storage.transfermanager.UploadResult; import com.google.common.collect.ImmutableList; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutionException; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -92,9 +96,9 @@ public void setUp() throws Exception { } @Test - public void uploadFiles() throws IOException { + public void uploadFiles() throws IOException, ExecutionException, InterruptedException { TransferManagerConfig config = TransferManagerConfig.newBuilder().setMaxWorkers(1).build(); - TransferManager transferManager = new TransferManagerImpl(config); + TransferManager transferManager = config.getService(); try (TmpFile tmpFile = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize); TmpFile tmpFile1 = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize); TmpFile tmpFile2 = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize)) { @@ -104,14 +108,15 @@ public void uploadFiles() throws IOException { ParallelUploadConfig parallelUploadConfig = ParallelUploadConfig.newBuilder().setBucketName(bucketName).build(); UploadJob job = transferManager.uploadFiles(files, parallelUploadConfig); - assertThat(job.getUploadResponses()).hasSize(3); + List uploadResults = ApiFutures.allAsList(job.getUploadResponses()).get(); + assertThat(uploadResults).hasSize(3); } } @Test - public void uploadFilesWithOpts() throws IOException { + public void uploadFilesWithOpts() throws IOException, ExecutionException, InterruptedException { TransferManagerConfig config = TransferManagerConfig.newBuilder().setMaxWorkers(1).build(); - TransferManager transferManager = new TransferManagerImpl(config); + TransferManager transferManager = config.getService(); try (TmpFile tmpFile = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize); TmpFile tmpFile1 = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize); TmpFile tmpFile2 = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize)) { @@ -124,18 +129,29 @@ public void uploadFilesWithOpts() throws IOException { .setWriteOptsPerRequest(Collections.singletonList(BlobWriteOption.doesNotExist())) .build(); UploadJob job = transferManager.uploadFiles(files, parallelUploadConfig); - assertThat(job.getUploadResponses()).hasSize(3); + List uploadResults = ApiFutures.allAsList(job.getUploadResponses()).get(); + assertThat(uploadResults).hasSize(3); } } @Test - public void downloadBlobs() throws IOException { + public void downloadBlobs() throws IOException, ExecutionException, InterruptedException { TransferManagerConfig config = TransferManagerConfig.newBuilder().setMaxWorkers(1).build(); - TransferManager transferManager = new TransferManagerImpl(config); + TransferManager transferManager = config.getService(); String bucketName = bucket.getName(); ParallelDownloadConfig parallelDownloadConfig = ParallelDownloadConfig.newBuilder().setBucketName(bucketName).build(); DownloadJob job = transferManager.downloadBlobs(blobs, parallelDownloadConfig); - assertThat(job.getDownloadResults()).hasSize(3); + List downloadResults = ApiFutures.allAsList(job.getDownloadResults()).get(); + assertThat(downloadResults).hasSize(3); + cleanUpFiles(downloadResults); + } + + private void cleanUpFiles(List results) throws IOException { + // Cleanup downloaded blobs and the parent directory + for (DownloadResult res : results) { + Files.delete(res.getOutputDestination()); + Files.delete(res.getOutputDestination().getParent()); + } } }