Skip to content

Commit

Permalink
Fix FileInStreamIntegrationTest
Browse files Browse the repository at this point in the history
Fix `FileInStreamIntegrationTest`, removed unnecessary test cases
			pr-link: Alluxio#18178
			change-id: cid-dac1c4c2e4e403bbf4ad3cbdde9eaf43ce20e046
  • Loading branch information
voddle authored and ssz1997 committed Dec 15, 2023
1 parent b379fbf commit ad3dcda
Showing 1 changed file with 12 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@
import alluxio.AlluxioTestDirectory;
import alluxio.AlluxioURI;
import alluxio.Constants;
import alluxio.annotation.dora.DoraTestTodoItem;
import alluxio.client.ReadType;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemTestUtils;
import alluxio.client.file.URIStatus;
import alluxio.conf.PropertyKey;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.OpenFilePOptions;
Expand All @@ -29,16 +26,13 @@
import alluxio.security.authorization.Mode;
import alluxio.testutils.BaseIntegrationTest;
import alluxio.testutils.LocalAlluxioClusterResource;
import alluxio.util.CommonUtils;
import alluxio.util.io.BufferUtils;
import alluxio.util.io.PathUtils;
import alluxio.wire.FileBlockInfo;
import alluxio.worker.block.BlockStoreType;

import com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand All @@ -59,9 +53,6 @@
* Integration tests for {@link alluxio.client.file.FileInStream}.
*/
@RunWith(Parameterized.class)
@Ignore
@DoraTestTodoItem(action = DoraTestTodoItem.Action.FIX, owner = "jiaming",
comment = "fix the tests")
public final class FileInStreamIntegrationTest extends BaseIntegrationTest {
// The block size needs to be sufficiently large based on TCP send/receive buffers, set to 1MB.
private static final int BLOCK_SIZE = Constants.MB;
Expand All @@ -86,7 +77,7 @@ public static Collection<Object[]> data() {
private String mTestPath;

@Rule
public Timeout mGlobalTimeout = Timeout.seconds(60);
public Timeout mGlobalTimeout = Timeout.seconds(300);

@Rule
public ExpectedException mThrown = ExpectedException.none();
Expand Down Expand Up @@ -139,12 +130,16 @@ private List<CreateFilePOptions> getOptionSet() {

/**
* Tests {@link FileInStream#read()} across block boundary.
* This test might cost about 240s
*/
@Test
@LocalAlluxioClusterResource.Config(
confParams = {PropertyKey.Name.USER_STREAMING_READER_CHUNK_SIZE_BYTES, "64KB"})
public void readTest1() throws Exception {
for (int k = MIN_LEN; k <= MAX_LEN; k += DELTA) {
// according to MAX_LEN, code inside this loop will be executed twice,
// the first loop will cost about 40s,
// the second time will cost about 200s
for (int k = MIN_LEN; k <= MAX_LEN; k += 3 * BLOCK_SIZE) {
for (CreateFilePOptions op : getOptionSet()) {
String filename = mTestPath + "/file_" + k + "_" + op.hashCode();
AlluxioURI uri = new AlluxioURI(filename);
Expand All @@ -154,22 +149,8 @@ public void readTest1() throws Exception {
int value = is.read();
int cnt = 0;
while (value != -1) {
Assert.assertTrue(value >= 0);
Assert.assertTrue(value < 256);
ret[cnt++] = (byte) value;
value = is.read();
}
Assert.assertEquals(cnt, k);
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(k, ret));
is.close();

is = mFileSystem.openFile(uri, FileSystemTestUtils.toOpenFileOptions(op));
ret = new byte[k];
value = is.read();
cnt = 0;
while (value != -1) {
Assert.assertTrue(value >= 0);
Assert.assertTrue(value < 256);
Assert.assertTrue(value >= 0 && value < 256);
// Assert.assertTrue(value < 256);
ret[cnt++] = (byte) value;
value = is.read();
}
Expand Down Expand Up @@ -433,7 +414,7 @@ public void run() {
PropertyKey.Name.USER_STREAMING_READER_CHUNK_SIZE_BYTES, "64KB",
PropertyKey.Name.WORKER_RAMDISK_SIZE, "1GB"})
public void remoteReadLargeFile() throws Exception {
// write a file outside of Alluxio
// write a file outside Alluxio
AlluxioURI filePath = new AlluxioURI(mTestPath + "/test");
try (FileOutStream os = mFileSystem.createFile(filePath, CreateFilePOptions.newBuilder()
.setBlockSizeBytes(16 * Constants.MB).setWriteType(WritePType.THROUGH).build())) {
Expand Down Expand Up @@ -463,16 +444,14 @@ public void positionedReadWithoutCaching() throws Exception {

FileInStream is = mFileSystem.openFile(uri, FileSystemTestUtils.toOpenFileOptions(op));
byte[] ret = new byte[DELTA - 1];
Assert.assertEquals(DELTA - 1, is.positionedRead(MIN_LEN - DELTA + 1, ret, 0, DELTA));
Assert.assertEquals(DELTA - 1, is.positionedRead(MIN_LEN - DELTA + 1, ret, 0, DELTA - 1));
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(MIN_LEN - DELTA + 1, DELTA - 1, ret));
is.close();
}
}

@Test
@LocalAlluxioClusterResource.Config(
confParams = {PropertyKey.Name.USER_FILE_SEQUENTIAL_PREAD_THRESHOLD, "700KB"})
public void positionedReadWithLargeThreshold() throws Exception {
public void positionedRead() throws Exception {
List<CreateFilePOptions> optionSet = new ArrayList<>(2);
optionSet.add(mWriteBoth);
optionSet.add(mWriteUnderStore);
Expand All @@ -483,200 +462,10 @@ public void positionedReadWithLargeThreshold() throws Exception {
try (FileInStream is = mFileSystem.openFile(uri, FileSystemTestUtils.toOpenFileOptions(op))) {
byte[] ret = new byte[DELTA - 1];
Assert.assertEquals(DELTA - 1,
is.positionedRead(MIN_LEN - DELTA + 1, ret, 0, DELTA));
is.positionedRead(MIN_LEN - DELTA + 1, ret, 0, DELTA - 1));
Assert.assertTrue(
BufferUtils.equalIncreasingByteArray(MIN_LEN - DELTA + 1, DELTA - 1, ret));
}
}
}

@Test
@LocalAlluxioClusterResource.Config(
confParams = {PropertyKey.Name.USER_FILE_SEQUENTIAL_PREAD_THRESHOLD, "200KB"})
public void positionedReadWithSmallThreshold() throws Exception {
List<CreateFilePOptions> optionSet = new ArrayList<>(2);
optionSet.add(mWriteBoth);
optionSet.add(mWriteUnderStore);
for (CreateFilePOptions op : optionSet) {
String filename = mTestPath + "/file_" + MIN_LEN + "_" + op.hashCode();
AlluxioURI uri = new AlluxioURI(filename);

try (FileInStream is = mFileSystem.openFile(uri, FileSystemTestUtils.toOpenFileOptions(op))) {
byte[] ret = new byte[DELTA - 1];
Assert.assertEquals(DELTA - 1,
is.positionedRead(MIN_LEN - DELTA + 1, ret, 0, DELTA));
Assert.assertTrue(
BufferUtils.equalIncreasingByteArray(MIN_LEN - DELTA + 1, DELTA - 1, ret));
}
}
}

@Test(timeout = 10000)
@LocalAlluxioClusterResource.Config(
confParams = {PropertyKey.Name.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS, "2000"})
public void asyncCacheFirstBlock() throws Exception {
String filename = mTestPath + "/file_" + MAX_LEN + "_" + mWriteUnderStore.hashCode();
AlluxioURI uri = new AlluxioURI(filename);

for (ReadType readType : ReadType.values()) {
mFileSystem.free(uri);
CommonUtils.waitFor("No in-Alluxio data left from previous iteration.", () -> {
try {
URIStatus st = mFileSystem.getStatus(uri);
return st.getInAlluxioPercentage() == 0;
} catch (Exception e) {
return false;
}
});
FileInStream is = mFileSystem.openFile(uri,
OpenFilePOptions.newBuilder().setReadType(readType.toProto()).build());
is.read();
URIStatus status = mFileSystem.getStatus(uri);
// if the test is running extremely slow, this check can happen after the worker reports
// the newly cached blocks to master, and thus failing the assertion
Assert.assertEquals(0, status.getInAlluxioPercentage());
is.close();
if (readType.isCache()) {
CommonUtils.waitFor("First block to be cached.", () -> {
try {
URIStatus st = mFileSystem.getStatus(uri);
boolean achieved = true;
// Expect only first block to be cached, other blocks should be empty in Alluxio
for (int i = 0; i < st.getFileBlockInfos().size(); i++) {
FileBlockInfo info = st.getFileBlockInfos().get(i);
if (i == 0) {
achieved = achieved && !info.getBlockInfo().getLocations().isEmpty();
} else {
achieved = achieved && info.getBlockInfo().getLocations().isEmpty();
}
}
return achieved;
} catch (Exception e) {
return false;
}
});
} else {
Thread.sleep(1000);
status = mFileSystem.getStatus(uri);
Assert.assertEquals(0, status.getInAlluxioPercentage());
}
}
}

@Test(timeout = 10000)
@LocalAlluxioClusterResource.Config(
confParams = {PropertyKey.Name.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS, "2000"})
public void asyncCacheAfterSeek() throws Exception {
String filename = mTestPath + "/file_" + MAX_LEN + "_" + mWriteUnderStore.hashCode();
AlluxioURI uri = new AlluxioURI(filename);

for (ReadType readType : ReadType.values()) {
mFileSystem.free(uri);
CommonUtils.waitFor("No in-Alluxio data left from previous iteration.", () -> {
try {
URIStatus st = mFileSystem.getStatus(uri);
return st.getInAlluxioPercentage() == 0;
} catch (Exception e) {
return false;
}
});
FileInStream is = mFileSystem.openFile(uri,
OpenFilePOptions.newBuilder().setReadType(readType.toProto()).build());
URIStatus status = mFileSystem.getStatus(uri);
is.seek(status.getBlockSizeBytes() + 1);
is.read();
status = mFileSystem.getStatus(uri);
Assert.assertEquals(0, status.getInAlluxioPercentage());
is.close();
if (readType.isCache()) {
CommonUtils.waitFor("Second block to be cached.", () -> {
try {
URIStatus st = mFileSystem.getStatus(uri);
boolean achieved = true;
// Expect only second block to be cached, other blocks should be empty in Alluxio
for (int i = 0; i < st.getFileBlockInfos().size(); i++) {
FileBlockInfo info = st.getFileBlockInfos().get(i);
if (i == 1) {
achieved = achieved && !info.getBlockInfo().getLocations().isEmpty();
} else {
achieved = achieved && info.getBlockInfo().getLocations().isEmpty();
}
}
return achieved;
} catch (Exception e) {
return false;
}
});
} else {
Thread.sleep(1000);
status = mFileSystem.getStatus(uri);
Assert.assertEquals(0, status.getInAlluxioPercentage());
}
}
}

@Test(timeout = 10000)
public void asyncCacheFirstBlockPRead() throws Exception {
String filename = mTestPath + "/file_" + MAX_LEN + "_" + mWriteUnderStore.hashCode();
AlluxioURI uri = new AlluxioURI(filename);

for (ReadType readType : ReadType.values()) {
mFileSystem.free(uri);
CommonUtils.waitFor("No in-Alluxio data left from previous iteration.", () -> {
try {
URIStatus st = mFileSystem.getStatus(uri);
return st.getInAlluxioPercentage() == 0;
} catch (Exception e) {
return false;
}
});
FileInStream is = mFileSystem.openFile(uri,
OpenFilePOptions.newBuilder().setReadType(readType.toProto()).build());
// Positioned reads trigger async caching after reading and do not need to wait for a close
// or a block boundary to be crossed.
URIStatus status = mFileSystem.getStatus(uri);
Assert.assertEquals(0, status.getInAlluxioPercentage());
is.positionedRead(BLOCK_SIZE / 2, new byte[1], 0, 1);
if (readType.isCache()) {
CommonUtils.waitFor("First block to be cached.", () -> {
try {
URIStatus st = mFileSystem.getStatus(uri);
boolean achieved = true;
// Expect only first block to be cached, other blocks should be empty in Alluxio
for (int i = 0; i < st.getFileBlockInfos().size(); i++) {
FileBlockInfo info = st.getFileBlockInfos().get(i);
if (i == 0) {
achieved = achieved && !info.getBlockInfo().getLocations().isEmpty();
} else {
achieved = achieved && info.getBlockInfo().getLocations().isEmpty();
}
}
return achieved;
} catch (Exception e) {
return false;
}
});
} else {
Thread.sleep(1000);
status = mFileSystem.getStatus(uri);
Assert.assertEquals(0, status.getInAlluxioPercentage());
}
is.close();
}
}

@Test
public void syncCacheFirstBlock() throws Exception {
String filename = mTestPath + "/file_" + MAX_LEN + "_" + mWriteUnderStore.hashCode();
AlluxioURI uri = new AlluxioURI(filename);

FileInStream is = mFileSystem.openFile(uri,
OpenFilePOptions.newBuilder().setReadType(ReadPType.CACHE).build());
URIStatus status = mFileSystem.getStatus(uri);
byte[] data = new byte[(int) status.getBlockSizeBytes() + 1];
is.read(data);
status = mFileSystem.getStatus(uri);
Assert.assertFalse(status.getFileBlockInfos().get(0).getBlockInfo().getLocations().isEmpty());
is.close();
}
}

0 comments on commit ad3dcda

Please sign in to comment.