Skip to content

Commit

Permalink
fix: Review session updates (#1964)
Browse files Browse the repository at this point in the history
* Adding documentation to Parallel Upload/Download configs

* Add documentation to Transfer Manager Config

* Review Session Comments Addressed

* Fix directory issue with downloadBlobs test

---------

Co-authored-by: BenWhitehead <[email protected]>
  • Loading branch information
sydney-munro and BenWhitehead committed Apr 5, 2023
1 parent 47dad11 commit abc6ec4
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,34 @@
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.StorageException;
import com.google.common.io.ByteStreams;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Callable;

public class DownloadCallable implements Callable<DownloadResult> {
private final TransferManagerConfig transferManagerConfig;
final class DownloadCallable implements Callable<DownloadResult> {
private final BlobInfo originalBlob;

private final ParallelDownloadConfig parallelDownloadConfig;
private final Storage storage;

public DownloadCallable(
TransferManagerConfig transferManagerConfig,
private final Storage.BlobSourceOption[] opts;

DownloadCallable(
Storage storage,
BlobInfo originalBlob,
ParallelDownloadConfig parallelDownloadConfig) {
this.transferManagerConfig = transferManagerConfig;
ParallelDownloadConfig parallelDownloadConfig,
BlobSourceOption[] opts) {
this.originalBlob = originalBlob;
this.parallelDownloadConfig = parallelDownloadConfig;
this.storage = storage;
this.opts = opts;
}

@Override
Expand All @@ -49,31 +56,39 @@ public DownloadResult call() throws Exception {
}

private DownloadResult downloadWithoutChunking() {
try (ReadChannel rc =
transferManagerConfig
.getStorageOptions()
.getService()
.reader(
originalBlob.getBlobId(),
parallelDownloadConfig
.getOptionsPerRequest()
.toArray(new Storage.BlobSourceOption[0]))) {
Path path = createDestPath();
try (ReadChannel rc = storage.reader(originalBlob.getBlobId(), opts)) {
FileChannel destFile =
FileChannel.open(Paths.get(createDestPath()), StandardOpenOption.WRITE);
FileChannel.open(
path,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING);
ByteStreams.copy(rc, destFile);
} catch (IOException e) {
throw new StorageException(e);
}
DownloadResult result =
DownloadResult.newBuilder(originalBlob, TransferStatus.SUCCESS)
.setOutputDestination(Paths.get(createDestPath()))
.setOutputDestination(path)
.build();
return result;
}

private String createDestPath() {
return originalBlob
.getName()
.replaceFirst(parallelDownloadConfig.getStripPrefix(), parallelDownloadConfig.getPrefix());
private Path createDestPath() {
File newFile =
new File(
originalBlob
.getName()
.replaceFirst(
parallelDownloadConfig.getStripPrefix(), parallelDownloadConfig.getPrefix()));
// Check to make sure the parent directories exist
if (Files.exists(newFile.getParentFile().toPath())) {
return newFile.toPath();
} else {
// Make parent directories if they do not exist
newFile.getParentFile().mkdirs();
return newFile.toPath();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,28 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.core.ApiFuture;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Future;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;

public final class DownloadJob {

@NonNull private final List<Future<DownloadResult>> downloadResults;
@NonNull private final List<ApiFuture<DownloadResult>> downloadResults;

@NonNull private final ParallelDownloadConfig parallelDownloadConfig;

private DownloadJob(
@NonNull List<Future<DownloadResult>> successResponses,
@NonNull List<ApiFuture<DownloadResult>> successResponses,
@NonNull ParallelDownloadConfig parallelDownloadConfig) {
this.downloadResults = successResponses;
this.parallelDownloadConfig = parallelDownloadConfig;
}

public List<Future<DownloadResult>> getDownloadResults() {
public List<ApiFuture<DownloadResult>> getDownloadResults() {
return downloadResults;
}

Expand Down Expand Up @@ -79,14 +79,14 @@ public static Builder newBuilder() {

public static final class Builder {

private @NonNull List<Future<DownloadResult>> downloadResults;
private @NonNull List<ApiFuture<DownloadResult>> downloadResults;
private @MonotonicNonNull ParallelDownloadConfig parallelDownloadConfig;

private Builder() {
this.downloadResults = ImmutableList.of();
}

public Builder setDownloadResults(@NonNull List<Future<DownloadResult>> downloadResults) {
public Builder setDownloadResults(@NonNull List<ApiFuture<DownloadResult>> downloadResults) {
this.downloadResults = ImmutableList.copyOf(downloadResults);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,27 @@ private ParallelDownloadConfig(
this.optionsPerRequest = optionsPerRequest;
}

/**
* A common prefix that is removed from downloaded object's name before written to the filesystem
*/
public @NonNull String getStripPrefix() {
return stripPrefix;
}

/**
* A common prefix that is applied to downloaded objects before they are written to the
* filesystem.
*/
public @NonNull String getPrefix() {
return prefix;
}

/** The bucket objects are being downloaded from */
public @NonNull String getBucketName() {
return bucketName;
}

/** A list of common BlobSourceOptions that are used for each download request */
public @NonNull List<BlobSourceOption> getOptionsPerRequest() {
return optionsPerRequest;
}
Expand Down Expand Up @@ -103,6 +112,7 @@ public static final class Builder {
private Builder() {
this.stripPrefix = "";
this.prefix = "";
this.bucketName = "";
this.optionsPerRequest = ImmutableList.of();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,27 @@ private ParallelUploadConfig(
this.writeOptsPerRequest = writeOptsPerRequest;
}

/** If a corresponding object already exists skip uploading the object */
public boolean isSkipIfExists() {
return skipIfExists;
}

/** A common prefix that will be applied to all object paths in the destination bucket */
public @NonNull String getPrefix() {
return prefix;
}

/** The bucket objects are being uploaded from */
public @NonNull String getBucketName() {
return bucketName;
}

/** A list of common BlobTargetOptions that are used for each upload request */
public @NonNull List<BlobTargetOption> getTargetOptsPerRequest() {
return targetOptsPerRequest;
}

/** A list of common BlobWriteOptions that are used for each upload request */
public @NonNull List<BlobWriteOption> getWriteOptsPerRequest() {
return writeOptsPerRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,33 @@ public final class TransferManagerConfig {
this.storageOptions = storageOptions;
}

/** Maximum amount of workers to be allocated to perform work in Transfer Manager */
public int getMaxWorkers() {
return maxWorkers;
}

/** Buffer size allowed to each worker */
public int getPerWorkerBufferSize() {
return perWorkerBufferSize;
}

/**
* Whether to allow Transfer Manager to performing chunked Uploads/Downloads if it determines
* chunking will be beneficial
*/
public boolean isAllowChunking() {
return allowChunking;
}

/** Storage options that Transfer Manager will use to interact with GCS */
public StorageOptions getStorageOptions() {
return storageOptions;
}

public TransferManager getService() {
return new TransferManagerImpl(this);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -97,8 +108,6 @@ public static class Builder {
private StorageOptions storageOptions;

private Builder() {
// TODO: add default values
// bufferSize tbd?
this.perWorkerBufferSize = 16 * 1024 * 1024;
this.maxWorkers = 2 * Runtime.getRuntime().availableProcessors();
this.allowChunking = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,72 @@

package com.google.cloud.storage.transfermanager;

import com.google.api.core.ApiFuture;
import com.google.api.core.ListenableFutureToApiFuture;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.checkerframework.checker.nullness.qual.NonNull;

public final class TransferManagerImpl implements TransferManager {
final class TransferManagerImpl implements TransferManager {

private final TransferManagerConfig transferManagerConfig;
private final ExecutorService executor;
private final ListeningExecutorService executor;
private final Storage storage;

public TransferManagerImpl(TransferManagerConfig transferManagerConfig) {
TransferManagerImpl(TransferManagerConfig transferManagerConfig) {
this.transferManagerConfig = transferManagerConfig;
this.executor = Executors.newFixedThreadPool(transferManagerConfig.getMaxWorkers());
this.executor =
MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(transferManagerConfig.getMaxWorkers()));

this.storage = transferManagerConfig.getStorageOptions().getService();
}

@Override
public @NonNull UploadJob uploadFiles(List<Path> files, ParallelUploadConfig opts) {
List<Future<UploadResult>> uploadTasks = new ArrayList<>();
public @NonNull UploadJob uploadFiles(List<Path> files, ParallelUploadConfig config) {
Storage.BlobWriteOption[] opts =
config.getWriteOptsPerRequest().toArray(new BlobWriteOption[0]);
List<ApiFuture<UploadResult>> uploadTasks = new ArrayList<>();
for (Path file : files) {
if (Files.isDirectory(file)) throw new IllegalStateException("Directories are not supported");
String blobName = TransferManagerUtils.createBlobName(opts, file);
BlobInfo blobInfo = BlobInfo.newBuilder(opts.getBucketName(), blobName).build();
UploadCallable callable = new UploadCallable(transferManagerConfig, blobInfo, file, opts);
uploadTasks.add(executor.submit(callable));
String blobName = TransferManagerUtils.createBlobName(config, file);
BlobInfo blobInfo = BlobInfo.newBuilder(config.getBucketName(), blobName).build();
UploadCallable callable =
new UploadCallable(transferManagerConfig, storage, blobInfo, file, config, opts);
uploadTasks.add(convert(executor.submit(callable)));
}
return UploadJob.newBuilder()
.setParallelUploadConfig(opts)
.setParallelUploadConfig(config)
.setUploadResponses(ImmutableList.copyOf(uploadTasks))
.build();
}

@Override
public @NonNull DownloadJob downloadBlobs(List<BlobInfo> blobs, ParallelDownloadConfig opts) {
List<Future<DownloadResult>> downloadTasks = new ArrayList<>();
public @NonNull DownloadJob downloadBlobs(List<BlobInfo> blobs, ParallelDownloadConfig config) {
Storage.BlobSourceOption[] opts =
config.getOptionsPerRequest().toArray(new Storage.BlobSourceOption[0]);
List<ApiFuture<DownloadResult>> downloadTasks = new ArrayList<>();
for (BlobInfo blob : blobs) {
DownloadCallable callable = new DownloadCallable(transferManagerConfig, blob, opts);
downloadTasks.add(executor.submit(callable));
DownloadCallable callable = new DownloadCallable(storage, blob, config, opts);
downloadTasks.add(convert(executor.submit(callable)));
}
return DownloadJob.newBuilder()
.setDownloadResults(downloadTasks)
.setParallelDownloadConfig(opts)
.setParallelDownloadConfig(config)
.build();
}

private static <T> ApiFuture<T> convert(ListenableFuture<T> lf) {
return new ListenableFutureToApiFuture<>(lf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import java.nio.file.Path;

public class TransferManagerUtils {
final class TransferManagerUtils {

public static String createBlobName(ParallelUploadConfig config, Path file) {
private TransferManagerUtils() {}

static String createBlobName(ParallelUploadConfig config, Path file) {
if (config.getPrefix().isEmpty()) {
return file.toString();
} else {
Expand Down
Loading

0 comments on commit abc6ec4

Please sign in to comment.