From 60bdf142b671e3ac501fe0b414e946042fe67b5d Mon Sep 17 00:00:00 2001
From: Jonathan Vexler <=>
Date: Mon, 27 Nov 2023 13:02:16 -0500
Subject: [PATCH] Try to fix CI: sort log files, fix the partition path, and cast for Spark 2.4

---
 .../hudi/SparkFileFormatInternalRowReaderContext.scala    | 7 ++++---
 .../hudi/common/table/read/HoodieFileGroupReader.java     | 7 +++++--
 .../common/table/read/TestHoodieFileGroupReaderBase.java  | 3 +--
 3 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 99e1b60e9e41..88a0499deea6 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -152,8 +152,9 @@ class SparkFileFormatInternalRowReaderContext(readerMaps: mutable.Map[Long, Part
   }
 
   override def projectRecord(from: Schema, to: Schema): UnaryOperator[InternalRow] = {
-    val projection = HoodieCatalystExpressionUtils.generateUnsafeProjection(AvroConversionUtils.convertAvroSchemaToStructType(from),
-      AvroConversionUtils.convertAvroSchemaToStructType(to))
-    u: InternalRow => projection(u)
+    val projection = HoodieCatalystExpressionUtils.generateUnsafeProjection(AvroConversionUtils.convertAvroSchemaToStructType(from),
+      AvroConversionUtils.convertAvroSchemaToStructType(to))
+    // the explicit cast to InternalRow is needed to compile against Spark 2.4
+    u: InternalRow => projection(u).asInstanceOf[InternalRow]
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 21762e7a138f..52c56d832f7a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -41,6 +41,7 @@
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -52,6 +53,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
@@ -106,7 +108,7 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
     this.hoodieBaseFileOption = fileSlice.getBaseFile();
-    this.logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
+    this.logFiles = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
@@ -272,7 +274,8 @@ private void scanLogFiles() {
         .withReadBlocksLazily(getBooleanWithAltKeys(props, HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE))
         .withReverseReader(false)
         .withBufferSize(getIntWithAltKeys(props, HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE))
-        .withPartition("needtofix")
+        .withPartition(getRelativePartitionPath(
+            new Path(readerState.tablePath), logFiles.get(0).getPath().getParent()))
         .withRecordMerger(recordMerger)
         .withRecordBuffer(recordBuffer)
         .build();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index c9b8ea40c379..53710357e4f2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -44,7 +44,6 @@
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -157,7 +156,7 @@ private void validateOutputFromFileGroupReader(Configuration hadoopConf,
     SyncableFileSystemView fsView = viewManager.getFileSystemView(metaClient);
     FileSlice fileSlice = fsView.getAllFileSlices(partitionPaths[0]).findFirst().get();
     List<String> logFilePathList = getLogFileListFromFileSlice(fileSlice);
-    Collections.sort(logFilePathList);
+    //Collections.sort(logFilePathList);
     assertEquals(expectedLogFileNum, logFilePathList.size());
 
     List<T> actualRecordList = new ArrayList<>();
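-- 

Note on the .withPartition() change above (not part of the applied patch): the reader previously passed the placeholder string "needtofix" and now derives the partition path from the first log file's parent directory, relative to the table base path. The sketch below illustrates the idea with plain java.net.URI. It is a minimal illustration under assumed sample paths, not Hudi's FSUtils.getRelativePartitionPath implementation; the class name and paths are hypothetical.

    import java.net.URI;

    public class RelativePartitionPathSketch {
      // Returns the path of partitionDir relative to tablePath, e.g.
      // ("/tmp/hudi_table", "/tmp/hudi_table/2023/11/27") -> "2023/11/27".
      static String relativePartitionPath(String tablePath, String partitionDir) {
        // Ensure the base ends with "/" so URI.relativize treats it as a directory.
        URI base = URI.create(tablePath.endsWith("/") ? tablePath : tablePath + "/");
        // If partitionDir is not under tablePath, relativize returns it unchanged.
        String relative = base.relativize(URI.create(partitionDir)).getPath();
        // Strip any trailing slash; an empty result means a non-partitioned table.
        return relative.endsWith("/") ? relative.substring(0, relative.length() - 1) : relative;
      }

      public static void main(String[] args) {
        // The log file's parent directory is the partition directory, which is
        // what the patch resolves instead of the old "needtofix" placeholder.
        System.out.println(relativePartitionPath("/tmp/hudi_table", "/tmp/hudi_table/2023/11/27"));
        // prints: 2023/11/27
      }
    }

Relatedly, sorting the log files with HoodieLogFile.getLogFileComparator() makes the reader's scan order deterministic regardless of filesystem listing order, which appears to be why the test no longer re-sorts the list it asserts against.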