Skip to content

Commit

Permalink
merging fix for aws-java-nio-spi-for-s3#66 (directory stream provides…
Browse files Browse the repository at this point in the history
… all keys (but broken) when listing a directory) and aws-java-nio-spi-for-s3#35 (S3FileSystem.getRootDirectories() returning an empty list)
  • Loading branch information
stefanofornari committed Jul 9, 2023
1 parent 82f9339 commit 999e0a7
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 72 deletions.
1 change: 1 addition & 0 deletions THIRD-PARTY
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ Everyone is permitted to copy and distribute copies of this Agreement, but in or
This Agreement is governed by the laws of the State of New York and the intellectual property laws of the United States of America. No party to this Agreement will bring a legal action under this Agreement more than one year after the cause of action arose. Each party waives its rights to a jury trial in any resulting litigation.

** Caffeine -- https://github.com/ben-manes/caffeine -- Copyright 2014 Ben Manes. All Rights Reserved.
** RxJava -- https://github.com/ReactiveX/RxJava -- Copyright (c) 2016-present, RxJava Contributors.


Apache License
Expand Down
13 changes: 13 additions & 0 deletions nbactions.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<actions>
<action>
<actionName>CUSTOM-quick build</actionName>
<displayName>quick build</displayName>
<goals>
<goal>install</goal>
</goals>
<properties>
<skipTests>true</skipTests>
</properties>
</action>
</actions>
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,12 @@
<version>1.2.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.1.6</version>
<type>jar</type>
</dependency>
</dependencies>

<profiles>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/examples/ListPrefix.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

public class ListPrefix {
public static void main(String[] args) throws IOException, URISyntaxException {
if(args.length == 0){
if (args.length == 0) {
System.out.println("Provide an s3 prefix to list.");
System.exit(1);
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/software/amazon/nio/spi/s3/S3FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public String getSeparator() {
*/
@Override
public Iterable<Path> getRootDirectories() {
return S3Path.getPath(this, "/");
return Collections.singleton(S3Path.getPath(this, "/"));
}

/**
Expand Down
107 changes: 51 additions & 56 deletions src/main/java/software/amazon/nio/spi/s3/S3FileSystemProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@

package software.amazon.nio.spi.s3;

import io.reactivex.rxjava3.core.Flowable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.*;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CopyRequest;
import software.amazon.nio.spi.s3.util.TimeOutUtils;
Expand Down Expand Up @@ -312,72 +314,65 @@ protected SeekableByteChannel newByteChannel(S3AsyncClient client, Path path, Se
@Override
public DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter<? super Path> filter) throws IOException {
S3Path s3Path = (S3Path) dir;

String pathString = s3Path.getKey();
if (!pathString.endsWith(S3Path.PATH_SEPARATOR) && !pathString.isEmpty()) {
pathString = pathString + S3Path.PATH_SEPARATOR;
}

final String bucketName = s3Path.bucketName();

long timeOut = TIMEOUT_TIME_LENGTH_1;
final TimeUnit unit = MINUTES;
final S3FileSystem fs = s3Path.getFileSystem();

try {
List<S3Path> s3Paths = new ArrayList<>();
String finalPathString = pathString;
S3FileSystem fs = s3Path.getFileSystem();
final String finalPathString = pathString;

final ListObjectsV2Publisher listObjectsV2Publisher = fs.client().listObjectsV2Paginator(req -> req
.bucket(bucketName)
.prefix(finalPathString)
.delimiter(S3Path.PATH_SEPARATOR));

final Iterator<Path> iterator = Flowable.fromPublisher(listObjectsV2Publisher)
.flatMapIterable(response -> {

//add common prefixes from this page
List<String> items = response
.commonPrefixes().stream()
.map(CommonPrefix::prefix)
.collect(Collectors.toList());

//add s3 objects from this page
items.addAll(response
.contents().stream()
.map(S3Object::key)
.collect(Collectors.toList()));

// trim keys, convert to S3Path and apply directory stream filter
return items.stream()
//.map(item -> item.replaceFirst("^"+finalPathString, ""))
.map(fs::getPath)
.filter(path -> {
try {
return filter.accept(path);
} catch (IOException e) {
e.printStackTrace();
return false;
}
}).collect(Collectors.toList());
})
.blockingStream()
.map(Path.class::cast)
.iterator();

return new DirectoryStream<Path>() {
@Override
public void close() throws IOException {

fs.client().listObjectsV2Paginator(req -> req
.bucket(bucketName)
.prefix(finalPathString)
.delimiter(S3Path.PATH_SEPARATOR))
.subscribe(response -> {
System.out.println("CHECK! " + response.commonPrefixes());
// add common prefixes (essentially directories)
response.commonPrefixes().forEach(commonPrefix -> {
// remove the path from the start of the dir name
String dirName = commonPrefix.prefix().replaceFirst("^"+finalPathString, "");
s3Paths.add(fs.getPath(dirName));
});
// add objects (files) from response contents to s3Paths
response.contents().forEach(s3Object -> {
String objectName = s3Object.key().replaceFirst("^"+finalPathString, "");
s3Paths.add(S3Path.getPath(fs, objectName));
});
}).get(timeOut, unit);

final Iterator<S3Path> filteredDirectoryContents = s3Paths.stream().filter(path -> {
try {
return filter.accept(path);
} catch (IOException e) {
e.printStackTrace();
return false;
}
}).iterator();

return new DirectoryStream<Path>() {
final Iterator<? extends Path> iterator = filteredDirectoryContents;

@Override
@SuppressWarnings("unchecked")
public Iterator<Path> iterator() {
return (Iterator<Path>) iterator;
}
}

@Override
public void close() {
// nothing to close
}
};
} catch (ExecutionException e) {
throw new IOException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw TimeOutUtils.logAndGenerateExceptionOnTimeOut(logger, "ListObjectsV2Paginator", timeOut, unit);
}
@Override
public Iterator<Path> iterator() {
return iterator;
}
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,7 @@
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.*;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;

import java.net.URI;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -79,8 +80,12 @@ public void isReadOnly() {
public void getRootDirectories() {
final Iterable<Path> rootDirectories = s3FileSystem.getRootDirectories();
assertNotNull(rootDirectories);
assertEquals(S3Path.PATH_SEPARATOR, rootDirectories.toString());
assertFalse(s3FileSystem.getRootDirectories().iterator().hasNext());

final Iterator<Path> rootDirectoriesIterator = rootDirectories.iterator();

assertTrue(rootDirectoriesIterator.hasNext());
assertEquals(S3Path.PATH_SEPARATOR, rootDirectoriesIterator.next().toString());
assertFalse(rootDirectoriesIterator.hasNext());
}

@Test
Expand Down

0 comments on commit 999e0a7

Please sign in to comment.