Skip to content

Commit

Permalink
GCP: Add prefix and bulk operations to GCSFileIO (#8168)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck authored Aug 5, 2023
1 parent 4f401a7 commit 3f94949
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 3 deletions.
19 changes: 19 additions & 0 deletions gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.util.PropertyUtil;

public class GCPProperties implements Serializable {
// Service Options
Expand All @@ -40,6 +41,14 @@ public class GCPProperties implements Serializable {
public static final String GCS_OAUTH2_TOKEN = "gcs.oauth2.token";
public static final String GCS_OAUTH2_TOKEN_EXPIRES_AT = "gcs.oauth2.token-expires-at";

/** Configure the batch size used when deleting multiple files from a given GCS bucket */
public static final String GCS_DELETE_BATCH_SIZE = "gcs.delete.batch-size";
/**
* Default batch size for deletion. Currently, a max of 100 calls per batch request is advised, so
* we default to a number below that. https://cloud.google.com/storage/docs/batch
*/
public static final int GCS_DELETE_BATCH_SIZE_DEFAULT = 50;

private String projectId;
private String clientLibToken;
private String serviceHost;
Expand All @@ -54,6 +63,8 @@ public class GCPProperties implements Serializable {
private String gcsOAuth2Token;
private Date gcsOAuth2TokenExpiresAt;

private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;

public GCPProperties() {}

public GCPProperties(Map<String, String> properties) {
Expand All @@ -78,6 +89,10 @@ public GCPProperties(Map<String, String> properties) {
gcsOAuth2TokenExpiresAt =
new Date(Long.parseLong(properties.get(GCS_OAUTH2_TOKEN_EXPIRES_AT)));
}

gcsDeleteBatchSize =
PropertyUtil.propertyAsInt(
properties, GCS_DELETE_BATCH_SIZE, GCS_DELETE_BATCH_SIZE_DEFAULT);
}

public Optional<Integer> channelReadChunkSize() {
Expand Down Expand Up @@ -119,4 +134,8 @@ public Optional<String> oauth2Token() {
/**
 * Returns the configured expiry time of the GCS OAuth2 token.
 *
 * @return the expiry wrapped in an {@link Optional}, or an empty Optional when none was set
 */
public Optional<Date> oauth2TokenExpiresAt() {
  return Optional.ofNullable(this.gcsOAuth2TokenExpiresAt);
}

/**
 * Returns the number of objects sent per batch when deleting multiple files.
 *
 * @return the value configured under {@link #GCS_DELETE_BATCH_SIZE}, or {@link
 *     #GCS_DELETE_BATCH_SIZE_DEFAULT} when the property is not set
 */
public int deleteBatchSize() {
  return this.gcsDeleteBatchSize;
}
}
50 changes: 49 additions & 1 deletion gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,25 @@

import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
Expand All @@ -48,7 +56,7 @@
* <p>See <a href="https://cloud.google.com/storage/docs/folders#overview">Cloud Storage
* Overview</a>
*/
public class GCSFileIO implements FileIO {
public class GCSFileIO implements FileIO, SupportsBulkOperations, SupportsPrefixOperations {
private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
private static final String DEFAULT_METRICS_IMPL =
"org.apache.iceberg.hadoop.HadoopMetricsContext";
Expand Down Expand Up @@ -174,4 +182,44 @@ public void close() {
}
}
}

/**
 * Lists the objects whose names start with the given prefix.
 *
 * <p>The prefix URI is parsed immediately, so an invalid URI is rejected when this method is
 * called. The listing itself is only issued when the returned iterable is iterated, and it is
 * issued again for every new iterator.
 *
 * @param prefix a gs:// URI; a URI with only a bucket and no path is accepted
 * @return an iterable of {@link FileInfo} carrying each object's gs:// location, size and
 *     creation time in epoch milliseconds
 */
@Override
public Iterable<FileInfo> listPrefix(String prefix) {
  GCSLocation parsed = new GCSLocation(prefix);
  String bucketName = parsed.bucket();
  String objectPrefix = parsed.prefix();

  return () ->
      client()
          .list(bucketName, Storage.BlobListOption.prefix(objectPrefix))
          .streamAll()
          .map(
              listed -> {
                String uri = String.format("gs://%s/%s", listed.getBucket(), listed.getName());
                return new FileInfo(uri, listed.getSize(), createTimeMillis(listed));
              })
          .iterator();
}

/**
 * Converts a blob's creation timestamp to epoch milliseconds.
 *
 * @param blob the blob to inspect
 * @return the creation time in epoch milliseconds, or 0 when the blob reports no creation time
 */
private long createTimeMillis(Blob blob) {
  return blob.getCreateTimeOffsetDateTime() != null
      ? blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli()
      : 0L;
}

/**
 * Deletes every object returned by {@link #listPrefix(String)} for the given prefix.
 *
 * <p>Each listed location is converted back into a {@link BlobId} and handed to the shared
 * batched delete routine.
 *
 * @param prefix a gs:// URI identifying the objects to delete
 */
@Override
public void deletePrefix(String prefix) {
  Stream<BlobId> listedBlobIds =
      Streams.stream(listPrefix(prefix)).map(FileInfo::location).map(BlobId::fromGsUtilUri);
  internalDeleteFiles(listedBlobIds);
}

/**
 * Deletes the given objects using batched requests.
 *
 * @param pathsToDelete gs:// URIs of the objects to delete; each one is parsed with {@link
 *     BlobId#fromGsUtilUri(String)}
 */
@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
  Stream<BlobId> requestedBlobIds =
      Streams.stream(pathsToDelete).map(path -> BlobId.fromGsUtilUri(path));
  internalDeleteFiles(requestedBlobIds);
}

/**
 * Deletes the given blobs in batches of at most {@code gcpProperties.deleteBatchSize()} ids.
 *
 * <p>The stream is consumed lazily, one batch at a time. The per-blob results returned by the
 * storage client are inspected rather than discarded: any blob that is not reported as deleted
 * is counted and surfaced through a warning, so partial failures are no longer silent. The
 * method stays best-effort and does not throw on such results.
 *
 * @param blobIdsToDelete ids of the blobs to delete
 */
private void internalDeleteFiles(Stream<BlobId> blobIdsToDelete) {
  Streams.stream(Iterators.partition(blobIdsToDelete.iterator(), gcpProperties.deleteBatchSize()))
      .forEach(
          batch -> {
            // Null-safe comparison: anything other than an explicit TRUE counts as not deleted
            long notDeleted =
                client().delete(batch).stream()
                    .filter(deleted -> !Boolean.TRUE.equals(deleted))
                    .count();
            if (notDeleted > 0) {
              LOG.warn(
                  "{} of {} objects in delete batch were not deleted "
                      + "(not found, access denied, or deletion failed)",
                  notDeleted,
                  batch.size());
            }
          });
}
}
76 changes: 76 additions & 0 deletions gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSLocation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.gcp.gcs;

import com.google.cloud.storage.BlobId;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
 * A fully qualified GCS location parsed from a URI string. Unlike {@link
 * BlobId#fromGsUtilUri(String)}, a URI that names only a bucket and no path is accepted; in that
 * case the prefix is the empty string.
 */
class GCSLocation {
  private static final String SCHEME_DELIM = "://";
  private static final String PATH_DELIM = "/";
  private static final String QUERY_DELIM = "\\?";
  private static final String FRAGMENT_DELIM = "#";

  private static final String EXPECTED_SCHEME = "gs";

  private final String bucket;
  private final String prefix;

  /**
   * Parses a location of the form scheme://bucket/path?query#fragment.
   *
   * <p>A null location is rejected through {@code Preconditions.checkArgument}. A location that
   * does not contain exactly one scheme delimiter, or whose scheme is not "gs", is rejected
   * through {@code ValidationException.check}.
   *
   * @param location fully qualified URI
   */
  GCSLocation(String location) {
    Preconditions.checkArgument(location != null, "Invalid location: null");

    String[] schemeAndRemainder = location.split(SCHEME_DELIM, -1);
    ValidationException.check(
        schemeAndRemainder.length == 2, "Invalid GCS URI, cannot determine scheme: %s", location);

    String uriScheme = schemeAndRemainder[0];
    ValidationException.check(
        EXPECTED_SCHEME.equals(uriScheme), "Invalid GCS URI, invalid scheme: %s", uriScheme);

    // Everything up to the first path delimiter is the bucket; the rest, if any, is the path
    String[] bucketAndPath = schemeAndRemainder[1].split(PATH_DELIM, 2);
    this.bucket = bucketAndPath[0];

    String rawPath;
    if (bucketAndPath.length > 1) {
      rawPath = bucketAndPath[1];
    } else {
      rawPath = "";
    }

    // Drop the query first, then the fragment, keeping only the object name prefix
    this.prefix = beforeFirst(beforeFirst(rawPath, QUERY_DELIM), FRAGMENT_DELIM);
  }

  /** Returns the part of {@code value} that precedes the first match of the delimiter regex. */
  private static String beforeFirst(String value, String delimiterRegex) {
    return value.split(delimiterRegex, -1)[0];
  }

  /** Returns GCS bucket name. */
  public String bucket() {
    return this.bucket;
  }

  /** Returns GCS object name prefix, with any query and fragment removed. */
  public String prefix() {
    return this.prefix;
  }
}
89 changes: 87 additions & 2 deletions gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@

import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import java.util.stream.StreamSupport;
import org.apache.iceberg.TestHelpers;
Expand All @@ -36,19 +41,35 @@
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class GCSFileIOTest {
private static final String TEST_BUCKET = "TEST_BUCKET";
private final Random random = new Random(1);

private final Storage storage = LocalStorageHelper.getOptions().getService();
private final Storage storage = spy(LocalStorageHelper.getOptions().getService());
private GCSFileIO io;

/**
 * Stubs the batch delete on the storage spy and creates the {@link GCSFileIO} under test.
 */
@BeforeEach
public void before() {
  // LocalStorageHelper doesn't support batch operations, so answer the batch delete by issuing
  // one single-blob delete per id and collecting the individual results in order
  doAnswer(
          invocation -> {
            Iterable<BlobId> requestedIds = invocation.getArgument(0);
            List<Boolean> results = Lists.newArrayList();
            for (BlobId requestedId : requestedIds) {
              results.add(storage.delete(requestedId));
            }
            return results;
          })
      .when(storage)
      .delete(any(Iterable.class));

  io = new GCSFileIO(() -> storage, new GCPProperties());
}

Expand Down Expand Up @@ -91,7 +112,7 @@ public void testDelete() {
.count())
.isEqualTo(1);

io.deleteFile(format("gs://%s/%s", TEST_BUCKET, path));
io.deleteFile(gsUri(path));

// The bucket should now be empty
assertThat(
Expand All @@ -100,6 +121,70 @@ public void testDelete() {
.isZero();
}

/** Builds the gs:// URI of the given object path inside the test bucket. */
private String gsUri(String path) {
  String uri = String.format("gs://%s/%s", TEST_BUCKET, path);
  return uri;
}

/**
 * Verifies that listing returns only the objects whose names start with the requested prefix:
 * a shared parent, a sub-directory, and a full object name.
 */
@Test
public void testListPrefix() {
  String prefix = "list/path/";
  String path1 = prefix + "data1.dat";
  String path2 = prefix + "data2.dat";
  String path3 = "list/skip/data3.dat";
  for (String objectPath : ImmutableList.of(path1, path2, path3)) {
    storage.create(BlobInfo.newBuilder(TEST_BUCKET, objectPath).build());
  }

  // Parent prefix matches all three objects
  long underParent =
      StreamSupport.stream(io.listPrefix(gsUri("list/")).spliterator(), false).count();
  assertThat(underParent).isEqualTo(3);

  // Sub-directory prefix skips the object under list/skip/
  long underPrefix =
      StreamSupport.stream(io.listPrefix(gsUri(prefix)).spliterator(), false).count();
  assertThat(underPrefix).isEqualTo(2);

  // A full object name used as prefix matches just that object
  long exactMatch =
      StreamSupport.stream(io.listPrefix(gsUri(path1)).spliterator(), false).count();
  assertThat(exactMatch).isEqualTo(1);
}

/**
 * Verifies that a bulk delete removes exactly the requested objects and leaves the rest in place.
 */
@Test
public void testDeleteFiles() {
  String prefix = "del/path/";
  String path1 = prefix + "data1.dat";
  String path2 = prefix + "data2.dat";
  String path3 = "del/skip/data3.dat";
  for (String objectPath : ImmutableList.of(path1, path2, path3)) {
    storage.create(BlobInfo.newBuilder(TEST_BUCKET, objectPath).build());
  }

  long beforeDelete =
      StreamSupport.stream(io.listPrefix(gsUri("del/")).spliterator(), false).count();
  assertThat(beforeDelete).isEqualTo(3);

  // Delete two of the three objects; path2 must survive
  List<String> deletes = ImmutableList.of(gsUri(path1), gsUri(path3));
  io.deleteFiles(deletes);

  long afterDelete =
      StreamSupport.stream(io.listPrefix(gsUri("del/")).spliterator(), false).count();
  assertThat(afterDelete).isEqualTo(1);
}

/**
 * Verifies that deleting by prefix removes the objects under that prefix and leaves objects
 * under a sibling prefix in place.
 */
@Test
public void testDeletePrefix() {
  String prefix = "del/path/";
  String path1 = prefix + "data1.dat";
  String path2 = prefix + "data2.dat";
  String path3 = "del/skip/data3.dat";
  for (String objectPath : ImmutableList.of(path1, path2, path3)) {
    storage.create(BlobInfo.newBuilder(TEST_BUCKET, objectPath).build());
  }

  long beforeDelete =
      StreamSupport.stream(io.listPrefix(gsUri("del/")).spliterator(), false).count();
  assertThat(beforeDelete).isEqualTo(3);

  io.deletePrefix(gsUri(prefix));

  // Only the object under del/skip/ should remain
  long afterDelete =
      StreamSupport.stream(io.listPrefix(gsUri("del/")).spliterator(), false).count();
  assertThat(afterDelete).isEqualTo(1);
}

@Test
public void testGCSFileIOKryoSerialization() throws IOException {
FileIO testGCSFileIO = new GCSFileIO();
Expand Down
Loading

0 comments on commit 3f94949

Please sign in to comment.