Skip to content

Commit

Permalink
Revert "Merge remote-tracking branch 'upstream/markjschreiber/issue66…
Browse files Browse the repository at this point in the history
…' into awslabs#11-awslabs#30-awslabs#61"

This reverts commit 19ae58e, reversing
changes made to 45666f0.
  • Loading branch information
stefanofornari committed Jul 1, 2023
1 parent 189c4ba commit 66c3a2d
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 228 deletions.
23 changes: 0 additions & 23 deletions src/main/java/examples/ListPrefix.java

This file was deleted.

141 changes: 7 additions & 134 deletions src/main/java/software/amazon/nio/spi/s3/S3FileSystemProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,20 +269,6 @@ public S3Path getPath(URI uri) {
*/
@Override
public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
return this.newByteChannel(null, path, options, attrs);
}

/**
* Construct a byte channel for the path with the specified client. A more composable and testable (by using a Mock Client)
* version of the public method
* @param client a client that will make data requests for the channel
* @param path the path to read from. Must not be null.
* @param options a set of zero or more open options. May be null.
* @param attrs optional file attributes to set.
* @return An {@link S3SeekableByteChannel}
* @throws IOException if the channel creation fails
*/
protected SeekableByteChannel newByteChannel(S3AsyncClient client, Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
if (Objects.isNull(options)) {
options = Collections.emptySet();
}
Expand All @@ -307,7 +293,6 @@ protected SeekableByteChannel newByteChannel(S3AsyncClient client, Path path, Se
* @param dir the path to the directory
* @param filter the directory stream filter
* @return a new and open {@code DirectoryStream} object
* @throws IOException if the stream cannot be created or has a streaming problem.
*/
@Override
public DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter<? super Path> filter) throws IOException {
Expand Down Expand Up @@ -369,82 +354,16 @@ public void close() {
}

/**
* Get a new directory stream that will use the specified client. A composable and testable version of the public
* version of {@code newDirectoryStream}
* @param s3Client the client to use for calls to S3
* @param dir the path to the directory
* @param filter the directory stream filter
* @return a new and open {@code DirectoryStream} object
* @throws ExecutionException if the async operation cannot be executed.
* @throws InterruptedException if the async operation is interrupted.
* truncate objects whose key after the prefix contains a "/" to the first "/" after the prefix
*/
protected DirectoryStream<Path> newDirectoryStream(S3AsyncClient s3Client, Path dir, DirectoryStream.Filter<? super Path> filter) throws ExecutionException, InterruptedException {
S3Path s3Path = (S3Path) dir;

if (s3Client == null) {
s3Client = getClientStore().getAsyncClientForBucketName(s3Path.bucketName());
}

String pathString = s3Path.getKey();
if (!pathString.endsWith(S3Path.PATH_SEPARATOR) && !pathString.isEmpty()) {
pathString = pathString + S3Path.PATH_SEPARATOR;
private S3Path truncateByPrefix(final S3FileSystem fs, final String prefix, final S3Object object) {
if (object.key().indexOf(prefix) != 0 || object.key().equals(prefix)) {
return S3Path.getPath(fs, object);
}

final String bucketName = s3Path.bucketName();

long timeOut = TIMEOUT_TIME_LENGTH_1;
final TimeUnit unit = MINUTES;

try (final S3FileSystem fs = new S3FileSystem(bucketName)) {
List<S3Path> s3Paths = new ArrayList<>();
String finalPathString = pathString;

s3Client.listObjectsV2Paginator(req -> req
.bucket(bucketName)
.prefix(finalPathString)
.delimiter(S3Path.PATH_SEPARATOR))
.subscribe(response -> {
// add common prefixes (essentially directories)
response.commonPrefixes().forEach(commonPrefix -> {
// remove the path from the start of the dir name
String dirName = commonPrefix.prefix().replaceFirst("^"+finalPathString, "");
s3Paths.add(fs.getPath(dirName));
});
// add objects (files) from response contents to s3Paths
response.contents().forEach(s3Object -> {
String objectName = s3Object.key().replaceFirst("^"+finalPathString, "");
s3Paths.add(S3Path.getPath(fs, objectName));
});
}).get(timeOut, unit);

final Iterator<S3Path> filteredDirectoryContents = s3Paths.stream().filter(path -> {
try {
return filter.accept(path);
} catch (IOException e) {
e.printStackTrace();
return false;
}
}).iterator();

return new DirectoryStream<Path>() {
final Iterator<? extends Path> iterator = filteredDirectoryContents;

@Override
@SuppressWarnings("unchecked")
public Iterator<Path> iterator() {
return (Iterator<Path>) iterator;
}

@Override
public void close() {
// nothing to close
}
};
} catch (TimeoutException e) {
throw TimeOutUtils.logAndGenerateExceptionOnTimeOut(logger, "ListObjectsV2Paginator", timeOut, unit);
} catch (IOException e){
throw new RuntimeException("Cannot construct filesystem for path "+pathString, e);
}
int indexOfNextSeparator = object.key().indexOf(S3Path.PATH_SEPARATOR, prefix.length());
String truncated = indexOfNextSeparator == -1 ? object.key() : object.key().substring(0, indexOfNextSeparator+1);
return fs.getPath(truncated);
}

/**
Expand Down Expand Up @@ -773,52 +692,6 @@ public void checkAccess(Path path, AccessMode... modes) throws IOException {
}
}

/**
* Composable and testable version of {@code checkAccess} that uses the provided client to check access
* @param s3Client the client to use for S3 operations
* @param path the path to the file to check
* @param modes The access modes to check; may have zero elements
* @throws IOException if an IO error occurs when trying to check access
* @throws ExecutionException if the async operation execution fails
* @throws InterruptedException if the async operation is interrupted
*/
protected void checkAccess(S3AsyncClient s3Client, Path path, AccessMode... modes) throws IOException, ExecutionException, InterruptedException {
assert path instanceof S3Path;
S3Path s3Path = (S3Path) path.toRealPath(NOFOLLOW_LINKS);
final String bucketName = s3Path.getFileSystem().bucketName();

if (s3Client == null) {
s3Client = getClientStore().getAsyncClientForBucketName(bucketName);
}

final CompletableFuture<? extends S3Response> response;
if (s3Path.equals(s3Path.getRoot())) {
response = s3Client.headBucket(request -> request.bucket(bucketName));
} else {
response = s3Client.headObject(req -> req.bucket(bucketName).key(s3Path.getKey()));
}

long timeOut = TimeOutUtils.TIMEOUT_TIME_LENGTH_1;
TimeUnit unit = MINUTES;

try {
SdkHttpResponse httpResponse = response.get(timeOut, unit).sdkHttpResponse();
if (httpResponse.isSuccessful()) return;

if (httpResponse.statusCode() == FORBIDDEN)
throw new AccessDeniedException(s3Path.toString());

if (httpResponse.statusCode() == NOT_FOUND)
throw new NoSuchFileException(s3Path.toString());

throw new IOException(String.format("exception occurred while checking access, response code was '%d'",
httpResponse.statusCode()));

} catch (TimeoutException e) {
throw logAndGenerateExceptionOnTimeOut(logger, "checkAccess", timeOut, unit);
}
}

/**
* Returns a file attribute view of a given type. This method works in
* exactly the manner specified by the {@link Files#getFileAttributeView}
Expand Down
11 changes: 2 additions & 9 deletions src/main/java/software/amazon/nio/spi/s3/S3Path.java
Original file line number Diff line number Diff line change
Expand Up @@ -778,19 +778,12 @@ public String toString() {
}

/**
* The key of the object for S3. Essentially the "real path" with the "/" prefix and bucket name removed.
* The key of the object for S3. Essentially the "real path" with the "/" prefix removed.
* @return the key
*/
public String getKey(){
if(isEmpty()) return "";
String s = toRealPath(NOFOLLOW_LINKS).toString();
if(s.startsWith(S3Path.PATH_SEPARATOR+bucketName())) {
s = s.replaceFirst(S3Path.PATH_SEPARATOR+bucketName(), "");
}
while(s.startsWith(S3Path.PATH_SEPARATOR)){
s = s.substring(1);
}
return s;
return toRealPath(NOFOLLOW_LINKS).toString().substring(1);
}

private final class S3PathIterator implements Iterator<Path> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ public class S3ReadAheadByteChannel implements ReadableByteChannel {
* @param maxNumberFragments the maximum number of read ahead fragments to hold. Must be {@code >= 2}.
* @param client the client used to read from the {@code path}
* @param delegator the {@code S3SeekableByteChannel} that delegates reading to this object.
* @param timeout the amount of time after which the operation will timeout
* @param timeUnit the unit of time for the timeout
* @throws IOException if a problem occurs initializing the cached fragments
*/
public S3ReadAheadByteChannel(S3Path path, int maxFragmentSize, int maxNumberFragments, S3AsyncClient client, S3SeekableByteChannel delegator, Long timeout, TimeUnit timeUnit) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ protected S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client) throws IO
/**
* @deprecated startAt is only a valid parameter for the read mode and is
* therefore discouraged to be used during creation of the channel
* @param s3Client the client to use for S3 operations
* @param s3Path the path to open a byte channel for
* @param startAt the byte offset to start at. Implicitly for a read channel.
* @throws IOException if the channel cannot be created.
*/
@Deprecated
protected S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, long startAt) throws IOException {
Expand All @@ -56,9 +52,6 @@ protected S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, long star
/**
* @deprecated startAt is only a valid parameter for the read mode and is
* therefore discouraged to be used during creation of the channel
* @param s3Path the path to open a byte channel for
* @param startAt the byte offset to start at. Implicitly for a read channel.
* @throws IOException if the channel cannot be created.
*/
@Deprecated
protected S3SeekableByteChannel(S3Path s3Path, long startAt) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,16 @@

package software.amazon.nio.spi.s3;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.services.s3.model.*;

import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.AccessDeniedException;
import java.nio.file.AccessMode;
import java.nio.file.DirectoryStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileSystem;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileAttributeView;
Expand All @@ -55,11 +31,16 @@
import org.junit.jupiter.api.AfterEach;

import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.mockito.junit.jupiter.MockitoExtension;
import static software.amazon.nio.spi.s3.config.S3NioSpiConfiguration.AWS_REGION_PROPERTY;

@SuppressWarnings("unchecked")
@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -294,24 +275,22 @@ public void newByteChannel() throws Exception {
}

@Test
public void newDirectoryStream() throws ExecutionException, InterruptedException, IOException {
S3Object object1 = S3Object.builder().key("key1").build();
S3Object object2 = S3Object.builder().key("key2").build();

public void newDirectoryStream() throws Exception {

when(mockClient.listObjectsV2Paginator(any(Consumer.class))).thenReturn(new ListObjectsV2Publisher(mockClient,
ListObjectsV2Request.builder()
.bucket(fs.bucketName())
.prefix("")
.build())
);
S3Object object1 = S3Object.builder().key("key1").build();
S3Object object2 = S3Object.builder().key("foo/key2").build();

when(mockClient.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(CompletableFuture.supplyAsync(() ->
when(mockClient.listObjectsV2(any(Consumer.class))).thenReturn(CompletableFuture.supplyAsync(() ->
ListObjectsV2Response.builder().contents(object1, object2).build()));

final DirectoryStream<Path> stream = provider.newDirectoryStream(Paths.get(URI.create(pathUri)), entry -> true);
assertNotNull(stream);
assertEquals(2, countDirStreamItems(stream));

final DirectoryStream<Path> filteredStream = provider.newDirectoryStream(Paths.get(URI.create(pathUri)),
entry -> entry.endsWith("key2"));
assertNotNull(filteredStream);
assertEquals(1, countDirStreamItems(filteredStream));
}

@Test
Expand Down Expand Up @@ -431,6 +410,7 @@ public void isSameFile() throws Exception {
assertTrue(provider.isSameFile(foo, alsoFoo2));
}


@Test
public void isHidden() {
S3Path foo = fileSystem.getPath("/foo");
Expand Down
21 changes: 5 additions & 16 deletions src/test/java/software/amazon/nio/spi/s3/S3PathTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,7 @@ public void getFileSystem() {

@Test
public void bucketName() {
String b = "mybucket";
assertEquals(b, root.bucketName());
assertEquals(b, absoluteDirectory.bucketName());
assertEquals(b, absoluteObject.bucketName());
assertEquals(b, relativeObject.bucketName());
assertEquals(b, relativeDirectory.bucketName());
assertEquals("mybucket", root.bucketName());
}

@Test
Expand Down Expand Up @@ -452,6 +447,8 @@ public void compareTo() {

@Test
public void testEquals() {
assertEquals(absoluteObject, absoluteObject);

// true because the equals contract requires the use of realPath which uses an absolute path which is relative to
// the working directory, which is always "/" for a bucket.
assertEquals(S3Path.getPath(fileSystem, "dir1/"), S3Path.getPath(fileSystem, "/dir1/"));
Expand All @@ -465,6 +462,8 @@ public void testEquals() {

@Test
public void testHashCode() {
assertEquals(root.hashCode(), root.hashCode());

final S3Path rootAbc = fileSystem.getPath("/a/b/c");
final S3Path abc = fileSystem.getPath("a/b/c");
assertEquals(rootAbc.hashCode(), abc.hashCode());
Expand All @@ -476,15 +475,5 @@ public void testToString() {
assertEquals("/dir1/dir2/", absoluteDirectory.toString());
assertEquals("/dir1/dir2/object", absoluteObject.toString());
assertEquals("../dir3/", relativeDirectory.toString());
assertEquals("dir2/object", relativeObject.toString());
}

@Test
public void testGetKey() {
assertEquals("", root.getKey());
assertEquals("dir1/dir2/", absoluteDirectory.getKey());
assertEquals("dir1/dir2/object", absoluteObject.getKey());
assertEquals("dir3/", relativeDirectory.getKey());
assertEquals("dir1/dir2/object", relativeObject.getKey());
}
}

0 comments on commit 66c3a2d

Please sign in to comment.