From 999e0a707d67fdc6c690b1047192b00e5524750f Mon Sep 17 00:00:00 2001 From: Stefano Fornari Date: Sun, 9 Jul 2023 07:52:01 +0200 Subject: [PATCH] merging fix for aws-java-nio-spi-for-s3#66 (directory stream provides all keys (but broken) when listing a directory) and aws-java-nio-spi-for-s3#35 (S3FileSystem.getRootDirectories() returning an empty list) --- THIRD-PARTY | 1 + nbactions.xml | 13 +++ pom.xml | 7 +- src/main/java/examples/ListPrefix.java | 2 +- .../amazon/nio/spi/s3/S3FileSystem.java | 2 +- .../nio/spi/s3/S3FileSystemProvider.java | 107 +++++++++--------- .../nio/spi/s3/S3FileSystemProviderTest.java | 12 +- .../amazon/nio/spi/s3/S3FileSystemTest.java | 9 +- 8 files changed, 81 insertions(+), 72 deletions(-) create mode 100644 nbactions.xml diff --git a/THIRD-PARTY b/THIRD-PARTY index 780f3ec6..c3103c93 100644 --- a/THIRD-PARTY +++ b/THIRD-PARTY @@ -94,6 +94,7 @@ Everyone is permitted to copy and distribute copies of this Agreement, but in or This Agreement is governed by the laws of the State of New York and the intellectual property laws of the United States of America. No party to this Agreement will bring a legal action under this Agreement more than one year after the cause of action arose. Each party waives its rights to a jury trial in any resulting litigation. ** Caffeine -- https://github.com/ben-manes/caffeine -- Copyright 2014 Ben Manes. All Rights Reserved. +** RxJava -- https://github.com/ReactiveX/RxJava -- Copyright (c) 2016-present, RxJava Contributors. Apache License diff --git a/nbactions.xml b/nbactions.xml new file mode 100644 index 00000000..3a688be4 --- /dev/null +++ b/nbactions.xml @@ -0,0 +1,13 @@ + + + + CUSTOM-quick build + quick build + + install + + + true + + + diff --git a/pom.xml b/pom.xml index e1a45932..b1a88f0e 100644 --- a/pom.xml +++ b/pom.xml @@ -137,7 +137,12 @@ 1.2.1 test - + + io.reactivex.rxjava3 + rxjava + 3.1.6 + jar + diff --git a/src/main/java/examples/ListPrefix.java b/src/main/java/examples/ListPrefix.java index ac387b1c..e445dddf 100644 --- a/src/main/java/examples/ListPrefix.java +++ b/src/main/java/examples/ListPrefix.java @@ -9,7 +9,7 @@ public class ListPrefix { public static void main(String[] args) throws IOException, URISyntaxException { - if(args.length == 0){ + if (args.length == 0) { System.out.println("Provide an s3 prefix to list."); System.exit(1); } diff --git a/src/main/java/software/amazon/nio/spi/s3/S3FileSystem.java b/src/main/java/software/amazon/nio/spi/s3/S3FileSystem.java index 4c95c4d4..4f35399c 100644 --- a/src/main/java/software/amazon/nio/spi/s3/S3FileSystem.java +++ b/src/main/java/software/amazon/nio/spi/s3/S3FileSystem.java @@ -269,7 +269,7 @@ public String getSeparator() { */ @Override public Iterable getRootDirectories() { - return S3Path.getPath(this, "/"); + return Collections.singleton(S3Path.getPath(this, "/")); } /** diff --git a/src/main/java/software/amazon/nio/spi/s3/S3FileSystemProvider.java b/src/main/java/software/amazon/nio/spi/s3/S3FileSystemProvider.java index a4e7d977..3ebe8cad 100644 --- a/src/main/java/software/amazon/nio/spi/s3/S3FileSystemProvider.java +++ b/src/main/java/software/amazon/nio/spi/s3/S3FileSystemProvider.java @@ -5,12 +5,14 @@ package software.amazon.nio.spi.s3; +import io.reactivex.rxjava3.core.Flowable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.*; +import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher; import software.amazon.awssdk.transfer.s3.S3TransferManager; import software.amazon.awssdk.transfer.s3.model.CopyRequest; import software.amazon.nio.spi.s3.util.TimeOutUtils; @@ -312,7 +314,6 @@ protected SeekableByteChannel newByteChannel(S3AsyncClient client, Path path, Se @Override public DirectoryStream newDirectoryStream(Path dir, DirectoryStream.Filter filter) throws IOException { S3Path s3Path = (S3Path) dir; - String pathString = s3Path.getKey(); if (!pathString.endsWith(S3Path.PATH_SEPARATOR) && !pathString.isEmpty()) { pathString = pathString + S3Path.PATH_SEPARATOR; @@ -320,64 +321,58 @@ public DirectoryStream newDirectoryStream(Path dir, DirectoryStream.Filter final String bucketName = s3Path.bucketName(); - long timeOut = TIMEOUT_TIME_LENGTH_1; - final TimeUnit unit = MINUTES; - final S3FileSystem fs = s3Path.getFileSystem(); - try { - List s3Paths = new ArrayList<>(); - String finalPathString = pathString; + S3FileSystem fs = s3Path.getFileSystem(); + final String finalPathString = pathString; + + final ListObjectsV2Publisher listObjectsV2Publisher = fs.client().listObjectsV2Paginator(req -> req + .bucket(bucketName) + .prefix(finalPathString) + .delimiter(S3Path.PATH_SEPARATOR)); + + final Iterator iterator = Flowable.fromPublisher(listObjectsV2Publisher) + .flatMapIterable(response -> { + + //add common prefixes from this page + List items = response + .commonPrefixes().stream() + .map(CommonPrefix::prefix) + .collect(Collectors.toList()); + + //add s3 objects from this page + items.addAll(response + .contents().stream() + .map(S3Object::key) + .collect(Collectors.toList())); + + // trim keys, convert to S3Path and apply directory stream filter + return items.stream() + //.map(item -> item.replaceFirst("^"+finalPathString, "")) + .map(fs::getPath) + .filter(path -> { + try { + return filter.accept(path); + } catch (IOException e) { + e.printStackTrace(); + return false; + } + }).collect(Collectors.toList()); + }) + .blockingStream() + .map(Path.class::cast) + .iterator(); + + return new DirectoryStream() { + @Override + public void close() throws IOException { - fs.client().listObjectsV2Paginator(req -> req - .bucket(bucketName) - .prefix(finalPathString) - .delimiter(S3Path.PATH_SEPARATOR)) - .subscribe(response -> { - System.out.println("CHECK! " + response.commonPrefixes()); - // add common prefixes (essentially directories) - response.commonPrefixes().forEach(commonPrefix -> { - // remove the path from the start of the dir name - String dirName = commonPrefix.prefix().replaceFirst("^"+finalPathString, ""); - s3Paths.add(fs.getPath(dirName)); - }); - // add objects (files) from response contents to s3Paths - response.contents().forEach(s3Object -> { - String objectName = s3Object.key().replaceFirst("^"+finalPathString, ""); - s3Paths.add(S3Path.getPath(fs, objectName)); - }); - }).get(timeOut, unit); - - final Iterator filteredDirectoryContents = s3Paths.stream().filter(path -> { - try { - return filter.accept(path); - } catch (IOException e) { - e.printStackTrace(); - return false; - } - }).iterator(); - - return new DirectoryStream() { - final Iterator iterator = filteredDirectoryContents; - - @Override - @SuppressWarnings("unchecked") - public Iterator iterator() { - return (Iterator) iterator; - } + } - @Override - public void close() { - // nothing to close - } - }; - } catch (ExecutionException e) { - throw new IOException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (TimeoutException e) { - throw TimeOutUtils.logAndGenerateExceptionOnTimeOut(logger, "ListObjectsV2Paginator", timeOut, unit); - } + @Override + public Iterator iterator() { + return iterator; + } + }; } /** diff --git a/src/test/java/software/amazon/nio/spi/s3/S3FileSystemProviderTest.java b/src/test/java/software/amazon/nio/spi/s3/S3FileSystemProviderTest.java index aa97acd1..f38d9105 100644 --- a/src/test/java/software/amazon/nio/spi/s3/S3FileSystemProviderTest.java +++ b/src/test/java/software/amazon/nio/spi/s3/S3FileSystemProviderTest.java @@ -15,17 +15,7 @@ import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.model.CopyObjectRequest; -import software.amazon.awssdk.services.s3.model.CopyObjectResponse; -import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; -import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; -import software.amazon.awssdk.services.s3.model.HeadObjectRequest; -import software.amazon.awssdk.services.s3.model.HeadObjectResponse; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.services.s3.model.PutObjectResponse; -import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.s3.model.*; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher; import java.net.URI; diff --git a/src/test/java/software/amazon/nio/spi/s3/S3FileSystemTest.java b/src/test/java/software/amazon/nio/spi/s3/S3FileSystemTest.java index fae2a88b..005a4148 100644 --- a/src/test/java/software/amazon/nio/spi/s3/S3FileSystemTest.java +++ b/src/test/java/software/amazon/nio/spi/s3/S3FileSystemTest.java @@ -10,6 +10,7 @@ import java.nio.file.FileSystems; import java.nio.file.Path; import java.util.Collections; +import java.util.Iterator; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import org.junit.jupiter.api.AfterEach; @@ -79,8 +80,12 @@ public void isReadOnly() { public void getRootDirectories() { final Iterable rootDirectories = s3FileSystem.getRootDirectories(); assertNotNull(rootDirectories); - assertEquals(S3Path.PATH_SEPARATOR, rootDirectories.toString()); - assertFalse(s3FileSystem.getRootDirectories().iterator().hasNext()); + + final Iterator rootDirectoriesIterator = rootDirectories.iterator(); + + assertTrue(rootDirectoriesIterator.hasNext()); + assertEquals(S3Path.PATH_SEPARATOR, rootDirectoriesIterator.next().toString()); + assertFalse(rootDirectoriesIterator.hasNext()); } @Test