Skip to content

Commit

Permalink
making endpoint protocol configurable (see awslabs#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
stefanofornari committed Jun 25, 2023
1 parent 2957afb commit b115b67
Show file tree
Hide file tree
Showing 14 changed files with 265 additions and 58 deletions.
21 changes: 17 additions & 4 deletions src/main/java/software/amazon/nio/spi/s3/S3ClientProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.nio.spi.s3.config.S3NioSpiConfiguration;

/**
*
* TODO: remove credentials as they can now be constructed from configuration
*/
public class S3ClientProvider {

Expand All @@ -46,6 +47,11 @@ public class S3ClientProvider {
*/
protected S3CrtAsyncClientBuilder asyncClientBuilder = S3AsyncClient.crtBuilder();

/**
* Configuration
*/
final protected S3NioSpiConfiguration configuration;


/**
* Default client using the "https://s3.us-east-1.amazonaws.com" endpoint
Expand Down Expand Up @@ -94,6 +100,14 @@ public class S3ClientProvider {

Logger logger = LoggerFactory.getLogger("S3ClientStoreProvider");

public S3ClientProvider(S3NioSpiConfiguration c) {
this.configuration = (c == null) ? new S3NioSpiConfiguration() : c;
}

public S3ClientProvider() {
this(null);
}


/**
* This method returns a universal client (i.e. not bound to any region)
Expand Down Expand Up @@ -287,8 +301,7 @@ private S3Client clientForRegion(String endpoint, String bucket, String region,
}

if ((endpoint != null) && (endpoint.length() > 0)) {
// TODO: shall we have the protocol in the endpoint already?
asyncClientBuilder.endpointOverride(URI.create("https://" + endpoint));
asyncClientBuilder.endpointOverride(URI.create(configuration.getEndpointProtocol() + "://" + endpoint));
}

if (credentials != null) {
Expand Down Expand Up @@ -317,7 +330,7 @@ private S3AsyncClient asyncClientForRegion(String endpoint, String bucket, Strin

if ((endpoint != null) && (endpoint.length() > 0)) {
// TODO: shall we have the protocol in the endpoint already?
asyncClientBuilder.endpointOverride(URI.create("https://" + endpoint));
asyncClientBuilder.endpointOverride(URI.create(configuration.getEndpointProtocol() + "://" + endpoint));
}

if (credentials != null) {
Expand Down
25 changes: 22 additions & 3 deletions src/main/java/software/amazon/nio/spi/s3/S3FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.nio.spi.s3.config.S3NioSpiConfiguration;

/**
* A Java NIO FileSystem for an S3 bucket as seen through the lens of the AWS Principal calling the class.
*
* TODO: replace uriString in constructors with endpoint, bucket and credentials
* TODO: make it closeable so that it is removed from the cache once not needed any more
*/
public class S3FileSystem extends FileSystem {
Logger logger = LoggerFactory.getLogger(this.getClass());
Expand All @@ -38,15 +40,17 @@ public class S3FileSystem extends FileSystem {
*/
public static final String BASIC_FILE_ATTRIBUTE_VIEW = "basic";

protected S3ClientProvider clientProvider;

private final String bucketName, endpoint;
private final S3FileSystemProvider provider;
private boolean open = true;
private final Set<S3SeekableByteChannel> openChannels = new HashSet<>();

private final AwsCredentials credentials;

private final S3AsyncClient client; // TODO: do we want this final?
// TODO: do we need to support S3Client too?
private S3AsyncClient client;

/**
* Create a filesystem that represents the bucket specified by the URI
* @param uriString a valid S3 URI to a bucket, e.g "s3://mybucket"
Expand All @@ -62,9 +66,21 @@ protected S3FileSystem(String uriString, S3FileSystemProvider s3FileSystemProvid
* @param s3FileSystemProvider the provider to be used with this fileSystem
*/
protected S3FileSystem(URI uri, S3FileSystemProvider s3FileSystemProvider) {
this(uri, s3FileSystemProvider, null);
}

/**
* Create a filesystem that represents the bucket specified by the URI
* @param uri a valid S3 URI to a bucket, e.g <code>URI.create("s3://mybucket")</code>
* @param s3FileSystemProvider the provider to be used with this fileSystem
* @param config the configuration to use; can be null to use a default configuration
*/
protected S3FileSystem(URI uri, S3FileSystemProvider s3FileSystemProvider, S3NioSpiConfiguration config) {
super();
assert uri.getScheme().equals(S3FileSystemProvider.SCHEME);

clientProvider = new S3ClientProvider(config);

//
// TODO: move this logic in S3FileSystemProvider and provide constructors
// that accept endpoint, bucket and credentials
Expand All @@ -82,7 +98,6 @@ protected S3FileSystem(URI uri, S3FileSystemProvider s3FileSystemProvider) {

logger.debug("creating FileSystem for 's3://{}'", this.bucketName);
this.provider = s3FileSystemProvider;
this.client = s3FileSystemProvider.clientProvider.generateAsyncClient(endpoint, bucketName, credentials); // provide the endpoint too
}

/**
Expand All @@ -105,6 +120,10 @@ public FileSystemProvider provider() {
}

public S3AsyncClient client() {
if (client == null) {
client = clientProvider.generateAsyncClient(endpoint, bucketName, credentials);
}

return client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static software.amazon.awssdk.http.HttpStatusCode.FORBIDDEN;
import static software.amazon.awssdk.http.HttpStatusCode.NOT_FOUND;
import software.amazon.nio.spi.s3.config.S3NioSpiConfiguration;
import static software.amazon.nio.spi.s3.util.TimeOutUtils.TIMEOUT_TIME_LENGTH_1;
import static software.amazon.nio.spi.s3.util.TimeOutUtils.logAndGenerateExceptionOnTimeOut;

Expand All @@ -55,8 +56,6 @@ public class S3FileSystemProvider extends FileSystemProvider {

private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());

protected S3ClientProvider clientProvider = new S3ClientProvider();

private static Map<String, S3FileSystem> cache = new HashMap<>();


Expand Down Expand Up @@ -120,7 +119,7 @@ public S3FileSystem newFileSystem(URI uri, Map<String, ?> env)
if (cache.containsKey(key)) {
throw new FileSystemAlreadyExistsException("a file system already exists for '" + key + "', use getFileSystem() instead");
}
cache.put(key, fs = new S3FileSystem(uri, this));
cache.put(key, fs = new S3FileSystem(uri, this, new S3NioSpiConfiguration(env)));

return fs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import software.amazon.awssdk.utils.Pair;

import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
Expand All @@ -36,6 +37,14 @@ public class S3NioSpiConfiguration {
* The default value of the maximum fragment size property
*/
public static final String S3_SPI_READ_MAX_FRAGMENT_NUMBER_DEFAULT = "50";
/**
* The name of the endpoint protocol property
*/
public static final String S3_SPI_ENDPOINT_PROTOCOL_PROPERTY = "s3.spi.endpoint-protocol";
/**
* The default value of the endpoint protocolproperty
*/
public static final String S3_SPI_ENDPOINT_PROTOCOL_DEFAULT = "https";

private final Properties properties;
private final Logger logger = LoggerFactory.getLogger(this.getClass());
Expand All @@ -44,6 +53,7 @@ public class S3NioSpiConfiguration {
final Properties defaults = new Properties();
defaults.put(S3_SPI_READ_MAX_FRAGMENT_NUMBER_PROPERTY, S3_SPI_READ_MAX_FRAGMENT_NUMBER_DEFAULT);
defaults.put(S3_SPI_READ_MAX_FRAGMENT_SIZE_PROPERTY, S3_SPI_READ_MAX_FRAGMENT_SIZE_DEFAULT);
defaults.put(S3_SPI_ENDPOINT_PROTOCOL_PROPERTY, S3_SPI_ENDPOINT_PROTOCOL_DEFAULT);

//setup defaults
properties = new Properties(defaults);
Expand All @@ -68,14 +78,23 @@ public S3NioSpiConfiguration(){
this(new Properties());
}

/**
* Create a new, empty configuration
*/
public S3NioSpiConfiguration(Map<String, ?> overrides){
Objects.requireNonNull(overrides);
overrides.keySet()
.forEach(key -> properties.setProperty(key, String.valueOf(overrides.get(key))));
}

/**
* Create a new configuration with overrides
* @param overrides the overrides
*/
protected S3NioSpiConfiguration(Properties overrides) {
Objects.requireNonNull(overrides);
overrides.stringPropertyNames()
.forEach(key -> properties.setProperty(key, overrides.getProperty(key)));
.forEach(key -> properties.setProperty(key, overrides.getProperty(key)));
}

/**
Expand All @@ -96,17 +115,22 @@ public int getMaxFragmentNumber(){
Integer.parseInt(S3_SPI_READ_MAX_FRAGMENT_NUMBER_DEFAULT));
}

private int parseIntProperty(String propName, int defaultVal){
String propertyVal = properties.getProperty(propName);
try{
return Integer.parseInt(propertyVal);
} catch (NumberFormatException e){
logger.warn("the value of '{}' for '{}' is not an integer, using default value of '{}'",
propertyVal, propName, defaultVal);
return defaultVal;
/**
* Get the value of the endpoint protocol
* @return the configured value or the default if not overridden
*/
public String getEndpointProtocol() {
String protocol = properties.getProperty(S3_SPI_ENDPOINT_PROTOCOL_PROPERTY, S3_SPI_ENDPOINT_PROTOCOL_DEFAULT);
if ("http".equalsIgnoreCase(protocol) || "https".equalsIgnoreCase(protocol)) {
return protocol;
}
logger.warn("the value of '{}' for '{}' is not an integer, using default value of '{}'",
protocol, S3_SPI_ENDPOINT_PROTOCOL_PROPERTY, S3_SPI_ENDPOINT_PROTOCOL_DEFAULT);
return S3_SPI_ENDPOINT_PROTOCOL_DEFAULT;
}

// ------------------------------------------------------- protected methods

/**
* Generates an environment variable name from a property name. E.g 'some.property' becomes 'SOME_PROPERTY'
* @param propertyName the name to convert
Expand All @@ -120,4 +144,17 @@ protected String convertPropertyNameToEnvVar(String propertyName){
.replace('.', '_').replace('-', '_')
.toUpperCase(Locale.ROOT);
}

// --------------------------------------------------------- private methods

private int parseIntProperty(String propName, int defaultVal){
String propertyVal = properties.getProperty(propName);
try{
return Integer.parseInt(propertyVal);
} catch (NumberFormatException e){
logger.warn("the value of '{}' for '{}' is not an integer, using default value of '{}'",
propertyVal, propName, defaultVal);
return defaultVal;
}
}
}
50 changes: 50 additions & 0 deletions src/test/java/software/amazon/nio/spi/s3/FakeS3ClientProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2023 ste.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.nio.spi.s3;

import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.awscore.AwsClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;

/**
*
*/
public class FakeS3ClientProvider extends S3ClientProvider {

final public AwsClient client;

public FakeS3ClientProvider(S3AsyncClient client) {
this.client = client;
}

@Override
public S3Client universalClient() {
return (S3Client)client;
}

@Override
protected S3AsyncClient generateAsyncClient(String endpoint, String bucketName, AwsCredentials credentials) {
return (S3AsyncClient)client;
}

@Override
protected S3Client generateClient (String endpoint, String bucket, AwsCredentials credentials) {
return (S3Client)client;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import software.amazon.awssdk.services.s3.model.GetBucketLocationResponse;
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.nio.spi.s3.config.S3NioSpiConfiguration;

@ExtendWith(MockitoExtension.class)
public class S3ClientProviderTest {
Expand All @@ -41,13 +42,18 @@ public void before() throws Exception {
@Test
public void initialization() {
final S3ClientProvider P = new S3ClientProvider();

assertNotNull(P.configuration);

assertTrue(P.universalClient() instanceof S3Client);
assertNotNull(P.universalClient());

assertTrue(P.universalClient(true) instanceof S3AsyncClient);
assertNotNull(P.universalClient());
}

S3NioSpiConfiguration config = new S3NioSpiConfiguration();
assertSame(config, new S3ClientProvider(config).configuration);
}

@Test
public void testGenerateAsyncClientWithNoErrors() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class S3FileSystemEndpointTest {
@BeforeEach
public void init() {
provider = new S3FileSystemProvider();
provider.clientProvider.asyncClientBuilder = BUILDER;
}

@Test
Expand All @@ -45,12 +44,16 @@ public void clientWithProvidedEndpoint() throws Exception {
System.setProperty("aws.region", "aws-east-1");

S3FileSystem fs = new S3FileSystem(URI.create(URI1), provider);
fs.clientProvider.asyncClientBuilder = BUILDER;

S3AsyncClient client = fs.client();
assertEquals(URI.create("https://endpoint1.io"), BUILDER.endpointOverride);
assertNull(BUILDER.credentialsProvider);

fs = new S3FileSystem(URI.create(URI2), provider); fs.client();
fs = new S3FileSystem(URI.create(URI2), provider);
fs.clientProvider.asyncClientBuilder = BUILDER;

fs.client();
assertEquals(URI.create("https://endpoint2.io:8080"), BUILDER.endpointOverride);
assertNull(BUILDER.credentialsProvider);
});
Expand All @@ -65,6 +68,7 @@ public void clientWithProvidedEndpointAndCredentials() throws Exception {
System.setProperty("aws.region", "aws-east-1");

S3FileSystem fs = new S3FileSystem(URI.create(URI1), provider);
fs.clientProvider.asyncClientBuilder = BUILDER;

//
// For non AWS S3 buckets, backet's region is not discovered runtime and it
Expand All @@ -77,8 +81,10 @@ public void clientWithProvidedEndpointAndCredentials() throws Exception {
assertEquals("key1", BUILDER.credentialsProvider.resolveCredentials().accessKeyId());
assertEquals("secret1", BUILDER.credentialsProvider.resolveCredentials().secretAccessKey());

fs = new S3FileSystem(URI.create(URI2), provider); fs.client();
fs = new S3FileSystem(URI.create(URI2), provider);
fs.clientProvider.asyncClientBuilder = BUILDER;

fs.client();
assertEquals(URI.create("https://endpoint2.io:8080"), BUILDER.endpointOverride);
assertNotNull(BUILDER.credentialsProvider);
assertEquals("key2", BUILDER.credentialsProvider.resolveCredentials().accessKeyId());
Expand Down
Loading

0 comments on commit b115b67

Please sign in to comment.