From 4d9e1e63c0c7bd097fb685096340db5b79b7ffc9 Mon Sep 17 00:00:00 2001 From: Mark Schreiber Date: Mon, 11 Mar 2024 09:51:28 -0400 Subject: [PATCH] fix: use environment configuration when creating a byte channel --- .../nio/spi/s3/S3ReadAheadByteChannel.java | 8 +++ .../nio/spi/s3/S3SeekableByteChannel.java | 5 +- .../nio/spi/s3/S3SeekableByteChannelTest.java | 55 +++++++++++++------ 3 files changed, 46 insertions(+), 22 deletions(-) diff --git a/src/main/java/software/amazon/nio/spi/s3/S3ReadAheadByteChannel.java b/src/main/java/software/amazon/nio/spi/s3/S3ReadAheadByteChannel.java index 79c3ecc2..178c4ac4 100644 --- a/src/main/java/software/amazon/nio/spi/s3/S3ReadAheadByteChannel.java +++ b/src/main/java/software/amazon/nio/spi/s3/S3ReadAheadByteChannel.java @@ -239,4 +239,12 @@ private CompletableFuture computeFragmentFuture(int fragmentIndex) { Integer fragmentIndexForByteNumber(long byteNumber) { return Math.toIntExact(Math.floorDiv(byteNumber, (long) maxFragmentSize)); } + + public int getMaxFragmentSize() { + return maxFragmentSize; + } + + public int getMaxNumberFragments() { + return maxNumberFragments; + } } diff --git a/src/main/java/software/amazon/nio/spi/s3/S3SeekableByteChannel.java b/src/main/java/software/amazon/nio/spi/s3/S3SeekableByteChannel.java index 9e936f86..a6d8b301 100644 --- a/src/main/java/software/amazon/nio/spi/s3/S3SeekableByteChannel.java +++ b/src/main/java/software/amazon/nio/spi/s3/S3SeekableByteChannel.java @@ -21,7 +21,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.nio.spi.s3.config.S3NioSpiConfiguration; import software.amazon.nio.spi.s3.util.TimeOutUtils; class S3SeekableByteChannel implements SeekableByteChannel { @@ -54,9 +53,7 @@ private S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, long startA throw new IOException("The SYNC/DSYNC options is not supported"); } - // later we will add a constructor that allows providing delegates for composition - - var config = new S3NioSpiConfiguration(); + var config = s3Path.getFileSystem().configuration(); if (options.contains(StandardOpenOption.WRITE)) { LOGGER.debug("using S3WritableByteChannel as write delegate for path '{}'", s3Path.toUri()); readDelegate = null; diff --git a/src/test/java/software/amazon/nio/spi/s3/S3SeekableByteChannelTest.java b/src/test/java/software/amazon/nio/spi/s3/S3SeekableByteChannelTest.java index 73ace54d..021263cb 100644 --- a/src/test/java/software/amazon/nio/spi/s3/S3SeekableByteChannelTest.java +++ b/src/test/java/software/amazon/nio/spi/s3/S3SeekableByteChannelTest.java @@ -5,38 +5,46 @@ package software.amazon.nio.spi.s3; -import java.time.Instant; -import org.mockito.Mock; -import software.amazon.awssdk.core.ResponseBytes; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.async.AsyncResponseTransformer; -import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.model.*; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.WRITE; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.when; +import static software.amazon.nio.spi.s3.S3Matchers.anyConsumer; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.OpenOption; +import java.time.Instant; import java.util.Collections; import java.util.Set; import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.AfterEach; - -import static java.nio.file.StandardOpenOption.CREATE; -import static java.nio.file.StandardOpenOption.READ; -import static java.nio.file.StandardOpenOption.WRITE; -import static org.junit.jupiter.api.Assertions.*; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; - -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.when; -import static software.amazon.nio.spi.s3.S3Matchers.anyConsumer; - +import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; @ExtendWith(MockitoExtension.class) @SuppressWarnings("unchecked") @@ -149,4 +157,15 @@ private S3SeekableByteChannel seekableByteChannelForRead() throws IOException { return new S3SeekableByteChannel(path, mockClient, Collections.singleton(READ)); } + // test that the S3SeekableByteChannel uses the buffer size from the configuration set for the FileSystem + @Test + public void testBufferSize() throws IOException { + fs.configuration().withMaxFragmentSize(10000); + fs.configuration().withMaxFragmentNumber(10); + try(var channel = (S3SeekableByteChannel) fs.provider().newByteChannel(path, Set.of(READ))) { + assertEquals(10000, ((S3ReadAheadByteChannel) channel.getReadDelegate()).getMaxFragmentSize()); + assertEquals(10, ((S3ReadAheadByteChannel) channel.getReadDelegate()).getMaxNumberFragments()); + } + } + }