Skip to content

Commit

Permalink
chore: Make newFileSystem work for s3x provider (#420)
Browse files Browse the repository at this point in the history
* chore: Test `when bucket is empty, directory stream should be empty

* chore: Test `FileSystems$newFileSystem(URI, Map) should create bucket when name is valid` [skip ci]

* chore: Make `newFileSystem(URI uri, Map<String, ?> env)` support s3x provider

* chore: Delete FileSystemsTest unit test - noop

* fix: Avoid closing the client on readaheadbytechannel close
  • Loading branch information
guicamest authored Mar 19, 2024
1 parent c4d41bf commit acf3aab
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package software.amazon.nio.spi.s3;

import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystems;
import java.util.Map;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

@DisplayName("FileSystems$newFileSystem(URI, Map) should")
public class FileSystemsNewFSTest {

@Test
@DisplayName("create bucket when name is valid")
public void createBucket() throws IOException {
String bucketLocation = Containers.localStackConnectionEndpoint() + "/fresh-bucket";
FileSystems.newFileSystem(
URI.create(bucketLocation),
Map.of("locationConstraint", "us-east-1")
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package software.amazon.nio.spi.s3;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static software.amazon.nio.spi.s3.Containers.localStackConnectionEndpoint;

Expand All @@ -20,10 +21,10 @@
public class FilesDirectoryStreamTest {

@Nested
@DisplayName("should throw")
@DisplayName("when bucket does not exist")
class DirectoryDoesNotExist {

@DisplayName("when bucket does not exist")
@DisplayName("should throw")
@Test
@SuppressWarnings("resource")
public void whenBucketNotFound() {
Expand All @@ -33,4 +34,20 @@ public void whenBucketNotFound() {
}
}

@Nested
@DisplayName("when bucket exists")
class DirectoryExists {

@DisplayName("and is empty, list should be empty")
@Test
public void listShouldBeEmpty() throws IOException {
Containers.createBucket("new-directory-stream");
final var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/new-directory-stream/"));

try(var stream = Files.newDirectoryStream(path, p -> true)) {
assertThat(stream).isEmpty();
}
}
}

}
15 changes: 8 additions & 7 deletions src/main/java/software/amazon/nio/spi/s3/S3ClientProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,7 @@ private String getBucketRegionFromResponse(SdkHttpResponse response) {
);
}

private S3AsyncClient configureCrtClientForRegion(String regionName) {
var region = getRegionFromRegionName(regionName);
logger.debug("bucket region is: '{}'", region);

S3CrtAsyncClientBuilder configureCrtClient() {
var endpointUri = configuration.endpointUri();
if (endpointUri != null) {
asyncClientBuilder.endpointOverride(endpointUri);
Expand All @@ -195,9 +192,13 @@ private S3AsyncClient configureCrtClientForRegion(String regionName) {
asyncClientBuilder.credentialsProvider(() -> credentials);
}

return asyncClientBuilder.forcePathStyle(configuration.getForcePathStyle())
.region(region)
.build();
return asyncClientBuilder.forcePathStyle(configuration.getForcePathStyle());
}

private S3AsyncClient configureCrtClientForRegion(String regionName) {
var region = getRegionFromRegionName(regionName);
logger.debug("bucket region is: '{}'", region);
return configureCrtClient().region(region).build();
}

private static Region getRegionFromRegionName(String regionName) {
Expand Down
18 changes: 10 additions & 8 deletions src/main/java/software/amazon/nio/spi/s3/S3FileSystemProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,21 @@ public String getScheme() {
*/
@Override
public FileSystem newFileSystem(final URI uri, final Map<String, ?> env) throws IOException {
if (! uri.getScheme().equals(SCHEME)) {
throw new IllegalArgumentException("URI scheme must be s3");
if (!uri.getScheme().equals(getScheme())) {
throw new IllegalArgumentException("URI scheme must be " + getScheme());
}

@SuppressWarnings("unchecked")
var envMap = (Map<String, Object>) env;

var bucketName = uri.getAuthority();
try (var client = S3AsyncClient.create()) {
var info = fileSystemInfo(uri);
var config = new S3NioSpiConfiguration().withEndpoint(info.endpoint()).withBucketName(info.bucket());
if (info.accessKey() != null) {
config.withCredentials(info.accessKey(), info.accessSecret());
}
var bucketName = config.getBucketName();

try (var client = new S3ClientProvider(config).configureCrtClient().build()) {
var createBucketResponse = client.createBucket(
bucketBuilder -> bucketBuilder.bucket(bucketName)
.acl(envMap.getOrDefault("acl", "").toString())
Expand All @@ -172,11 +178,7 @@ public FileSystem newFileSystem(final URI uri, final Map<String, ?> env) throws
} catch (InterruptedException | TimeoutException | SdkException e) {
throw new IOException(e.getMessage(), e);
}


return getFileSystem(uri, true);


}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ public void close() {
open = false;
readAheadBuffersCache.invalidateAll();
readAheadBuffersCache.cleanUp();
client.close();
}

private void clearPriorFragments(int currentFragIndx) {
Expand Down
32 changes: 0 additions & 32 deletions src/test/java/software/amazon/nio/spi/s3/FileSystemsTest.java

This file was deleted.

0 comments on commit acf3aab

Please sign in to comment.