diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/UfsStatusIterable.java b/dora/core/server/master/src/main/java/alluxio/master/job/UfsStatusIterable.java index d9f30c8fd8b3..34f56126410b 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/job/UfsStatusIterable.java +++ b/dora/core/server/master/src/main/java/alluxio/master/job/UfsStatusIterable.java @@ -62,7 +62,9 @@ public Iterator iterator() { AuthenticatedClientUser.set(mUser.orElse(null)); UfsStatus rootUfsStatus = mUfs.getStatus(mPath); if (rootUfsStatus != null && rootUfsStatus.isFile()) { - rootUfsStatus.setUfsFullPath(mRootUri); + if (rootUfsStatus.getUfsFullPath() == null) { + rootUfsStatus.setUfsFullPath(mRootUri); + } return Iterators.filter(Iterators.singletonIterator(rootUfsStatus), mFilter::test); } Iterator statuses = @@ -72,7 +74,9 @@ public Iterator iterator() { } else { return Iterators.transform(Iterators.filter(statuses, mFilter::test), (it) -> { - it.setUfsFullPath(mRootUri.join(it.getName())); + if (it.getUfsFullPath() == null) { + it.setUfsFullPath(mRootUri.join(it.getName())); + } return it; }); } diff --git a/dora/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUfsStatusIterator.java b/dora/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUfsStatusIterator.java index 9e4e5ce541f8..941dac2944e1 100644 --- a/dora/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUfsStatusIterator.java +++ b/dora/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUfsStatusIterator.java @@ -39,6 +39,10 @@ public class HdfsUfsStatusIterator implements Iterator { private final FileSystem mFs; + private final String mUfsSchemaUri; + + private final String mPathToList; + /** * Each element is a pair of (full path, UfsStatus). */ @@ -48,12 +52,14 @@ public class HdfsUfsStatusIterator implements Iterator { /** * HDFS under file system status iterator. - * @param path the path for listing + * @param pathToList the path for listing * @param fs the hdfs file system */ - public HdfsUfsStatusIterator(String path, FileSystem fs) { + public HdfsUfsStatusIterator(String pathToList, FileSystem fs) { mFs = fs; - initQueue(path); + mUfsSchemaUri = mFs.getUri().toString(); + mPathToList = pathToList; + initQueue(pathToList); } private void initQueue(String path) { @@ -68,6 +74,9 @@ private void initQueue(String path) { @Override public boolean hasNext() { try { + if (mHdfsRemoteIterator == null) { + return false; + } if (mHdfsRemoteIterator.hasNext()) { return true; } @@ -97,24 +106,49 @@ public UfsStatus next() { FileStatus fileStatus = mHdfsRemoteIterator.next(); UfsStatus ufsStatus; Path path = fileStatus.getPath(); - AlluxioURI alluxioUri = new AlluxioURI(path.toString()); + + AlluxioURI rootUri = new AlluxioURI(mUfsSchemaUri); if (fileStatus.isDirectory()) { - ufsStatus = new UfsDirectoryStatus(path.getName(), fileStatus.getOwner(), + String relativePath = extractRelativePath(path.toUri().getPath()); + ufsStatus = new UfsDirectoryStatus(relativePath, fileStatus.getOwner(), fileStatus.getGroup(), fileStatus.getPermission().toShort(), fileStatus.getModificationTime()); + ufsStatus.setUfsFullPath(rootUri.join(ufsStatus.getName())); mDirPathsToProcess.addLast(new Pair<>(path.toString(), ufsStatus)); } else { String contentHash = UnderFileSystemUtils.approximateContentHash( fileStatus.getLen(), fileStatus.getModificationTime()); - ufsStatus = new UfsFileStatus(path.getName(), contentHash, fileStatus.getLen(), + String relativePath = extractRelativePath(path.toUri().getPath()); + ufsStatus = new UfsFileStatus(relativePath, contentHash, fileStatus.getLen(), fileStatus.getModificationTime(), fileStatus.getOwner(), fileStatus.getGroup(), fileStatus.getPermission().toShort(), fileStatus.getBlockSize()); + ufsStatus.setUfsFullPath(rootUri.join(ufsStatus.getName())); } - ufsStatus.setUfsFullPath(alluxioUri); return ufsStatus; } catch (IOException e) { throw new RuntimeException(e); } } + + private String extractRelativePath(String fullPath) { + String fullPathWithoutSchema = trimPathPrefix(mUfsSchemaUri, fullPath); + String pathToListWithoutSchema = trimPathPrefix(mUfsSchemaUri, mPathToList); + if (!pathToListWithoutSchema.startsWith("/")) { + pathToListWithoutSchema = "/" + pathToListWithoutSchema; + } + return trimPathPrefix(pathToListWithoutSchema, fullPathWithoutSchema); + } + + private String trimPathPrefix(String prefix, String path) { + if (path.startsWith(prefix)) { + if (prefix.endsWith("/")) { + return path.substring(prefix.length()); + } else { + return path.substring(prefix.length() + 1); + } + } else { + return path; + } + } } diff --git a/dora/underfs/hdfs/src/test/java/alluxio/underfs/hdfs/hdfs3/HdfsUnderFileSystemIntegrationTest.java b/dora/underfs/hdfs/src/test/java/alluxio/underfs/hdfs/hdfs3/HdfsUnderFileSystemIntegrationTest.java index 5afa0467e269..c8f3386c08e7 100644 --- a/dora/underfs/hdfs/src/test/java/alluxio/underfs/hdfs/hdfs3/HdfsUnderFileSystemIntegrationTest.java +++ b/dora/underfs/hdfs/src/test/java/alluxio/underfs/hdfs/hdfs3/HdfsUnderFileSystemIntegrationTest.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -122,22 +124,71 @@ public void testListUfsStatusIterator() throws Exception { writeMultiBlockFileTest("/testRoot/testDirectory2/testFileD"); writeMultiBlockFileTest("/testRoot/testDirectory2/testDirectory3/testFileE"); + String ufsSchema = mUfs.getFs().getUri().toString(); + + testListPath(ufsSchema + "/testRoot"); + testListPath(ufsSchema + "/testRoot/"); + testListPath(ufsSchema + "/testRoot/testDirectory2"); + testListPath(ufsSchema + "/testRoot/testDirectory2/"); + testListPath(ufsSchema + "/testRoot/testDirectory2/testFileB"); + testListPath(ufsSchema + "/"); + + testListPath("/testRoot"); + testListPath("/testRoot/"); + testListPath("/testRoot/testDirectory2"); + testListPath("/testRoot/testDirectory2/"); + testListPath("/testRoot/testDirectory2/testFileB"); + testListPath("/"); + Iterator iterator = mUfs.listStatusIterable("/testRoot", ListOptions.defaults(), null, 1000); - List listResult = new ArrayList<>(); + List iteratorListRes = new ArrayList<>(); while (iterator.hasNext()) { UfsStatus ufsStatus = iterator.next(); - listResult.add(ufsStatus); + iteratorListRes.add(ufsStatus); + } + + assertEquals(8, iteratorListRes.size()); + assertEquals("testDirectory1", iteratorListRes.get(0).getName()); + assertEquals("testDirectory2", iteratorListRes.get(1).getName()); + assertEquals("testFileA", iteratorListRes.get(2).getName()); + assertEquals("testFileC", iteratorListRes.get(3).getName()); + assertEquals("testDirectory1/testFileB", iteratorListRes.get(4).getName()); + assertEquals("testDirectory2/testDirectory3", iteratorListRes.get(5).getName()); + assertEquals("testDirectory2/testFileD", iteratorListRes.get(6).getName()); + assertEquals("testDirectory2/testDirectory3/testFileE", iteratorListRes.get(7).getName()); + } + + private void testListPath(String path) throws IOException { + Iterator iterator = mUfs.listStatusIterable(path, + ListOptions.defaults(), null, 1000); + + List fullListRes = null; + UfsStatus[] fullListResArray = mUfs.listStatus(path, + ListOptions.defaults().setRecursive(true)); + if (fullListResArray == null) { + fullListRes = Collections.emptyList(); + } else { + fullListRes = Arrays.asList(fullListResArray); + } + + List iteratorListRes = new ArrayList<>(); + while (iterator.hasNext()) { + UfsStatus ufsStatus = iterator.next(); + iteratorListRes.add(ufsStatus); + } + + assertEquals(fullListRes.size(), iteratorListRes.size()); + for (int i = 0; i < fullListRes.size(); i++) { + UfsStatus fullListUfsStatus = fullListRes.get(i); + UfsStatus iteratorListUfsStatus = iteratorListRes.get(i); + assertEquals(fullListUfsStatus.getName(), iteratorListUfsStatus.getName()); + assertEquals(fullListUfsStatus.getGroup(), iteratorListUfsStatus.getGroup()); + assertEquals(fullListUfsStatus.getOwner(), iteratorListUfsStatus.getOwner()); + assertEquals(fullListUfsStatus.getMode(), iteratorListUfsStatus.getMode()); + assertEquals(fullListUfsStatus.isDirectory(), iteratorListUfsStatus.isDirectory()); + assertEquals(fullListUfsStatus.isFile(), iteratorListUfsStatus.isFile()); } - assertEquals(8, listResult.size()); - assertEquals("testDirectory1", listResult.get(0).getName()); - assertEquals("testDirectory2", listResult.get(1).getName()); - assertEquals("testFileA", listResult.get(2).getName()); - assertEquals("testFileC", listResult.get(3).getName()); - assertEquals("testFileB", listResult.get(4).getName()); - assertEquals("testDirectory3", listResult.get(5).getName()); - assertEquals("testFileD", listResult.get(6).getName()); - assertEquals("testFileE", listResult.get(7).getName()); } }