diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java
index b2967e345448..9a56db4cea6a 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java
@@ -140,8 +140,8 @@ public org.apache.iceberg.io.CloseableIterator<ColumnarBatch> iterator() {
         new Path(input.location()), clippedBlocks, fileReadSchema, caseSensitive,
         partReaderSparkSchema, debugDumpPrefix, debugDumpAlways, maxBatchSizeRows,
         maxBatchSizeBytes, targetBatchSizeBytes, useChunkedReader, metrics,
-        DateTimeRebaseLegacy$.MODULE$.toString(), // dateRebaseMode
-        DateTimeRebaseLegacy$.MODULE$.toString(), // timestampRebaseMode
+        DateTimeRebaseLegacy$.MODULE$, // dateRebaseMode
+        DateTimeRebaseLegacy$.MODULE$, // timestampRebaseMode
         true, // hasInt96Timestamps
         false // useFieldId
     );
diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
index 0170300d68c5..52c373259588 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
@@ -220,7 +220,8 @@ static class IcebergParquetExtraInfo extends ParquetExtraInfo {
     private final Schema expectedSchema;
     private final PartitionSpec partitionSpec;
 
-    IcebergParquetExtraInfo(String dateRebaseMode, String timestampRebaseMode,
+    IcebergParquetExtraInfo(DateTimeRebaseMode dateRebaseMode,
+        DateTimeRebaseMode timestampRebaseMode,
         boolean hasInt96Timestamps, Map<Integer, ?> idToConstant,
         Schema expectedSchema, PartitionSpec partitionSpec) {
@@ -310,8 +311,8 @@ protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst) {
       ParquetFileInfoWithBlockMeta parquetBlockMeta = ParquetFileInfoWithBlockMeta.apply(
           new Path(new URI(fst.file().path().toString())), clippedBlocks, InternalRow.empty(),
           fileReadSchema, partReaderSparkSchema,
-          "CORRECTED", // dateRebaseMode
-          "CORRECTED", // timestampRebaseMode
+          DateTimeRebaseCorrected$.MODULE$, // dateRebaseMode
+          DateTimeRebaseCorrected$.MODULE$, // timestampRebaseMode
          true // hasInt96Timestamps
       );
       return new FilteredParquetFileInfo(parquetBlockMeta, updatedConstants, updatedSchema);
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index f5449b641798..7a8676858e21 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -68,7 +68,6 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
@@ -157,15 +156,15 @@ object GpuParquetScan {
     tagSupport(scan.sparkSession, schema, scanMeta)
   }
 
-  def throwIfRebaseNeededInExceptionMode(table: Table, dateRebaseMode: String,
-      timestampRebaseMode: String): Unit = {
+  def throwIfRebaseNeededInExceptionMode(table: Table, dateRebaseMode: DateTimeRebaseMode,
+      timestampRebaseMode: DateTimeRebaseMode): Unit = {
     (0 until table.getNumberOfColumns).foreach { i =>
       val col = table.getColumn(i)
-      if (dateRebaseMode.equals("EXCEPTION") &&
+      if (dateRebaseMode == DateTimeRebaseException &&
         DateTimeRebaseUtils.isDateRebaseNeededInRead(col)) {
         throw DataSourceUtils.newRebaseExceptionInRead("Parquet")
       }
-      else if (timestampRebaseMode.equals("EXCEPTION") &&
+      else if (timestampRebaseMode == DateTimeRebaseException &&
         DateTimeRebaseUtils.isTimeRebaseNeededInRead(col)) {
         throw DataSourceUtils.newRebaseExceptionInRead("Parquet")
       }
@@ -310,7 +309,8 @@ object GpuParquetScan {
 // contains meta about all the blocks in a file
 case class ParquetFileInfoWithBlockMeta(filePath: Path, blocks: collection.Seq[BlockMetaData],
     partValues: InternalRow, schema: MessageType, readSchema: StructType,
-    dateRebaseMode: String, timestampRebaseMode: String, hasInt96Timestamps: Boolean)
+    dateRebaseMode: DateTimeRebaseMode, timestampRebaseMode: DateTimeRebaseMode,
+    hasInt96Timestamps: Boolean)
 
 private case class BlockMetaWithPartFile(meta: ParquetFileInfoWithBlockMeta,
     file: PartitionedFile)
@@ -674,8 +674,8 @@ private case class GpuParquetFileFilterHandler(
         block.setRowCount(numRows)
         val schema = new MessageType("root")
         return ParquetFileInfoWithBlockMeta(filePath, Seq(block), file.partitionValues,
-          schema, readDataSchema, LegacyBehaviorPolicy.LEGACY.toString,
-          LegacyBehaviorPolicy.LEGACY.toString, hasInt96Timestamps = false)
+          schema, readDataSchema, DateTimeRebaseLegacy, DateTimeRebaseLegacy,
+          hasInt96Timestamps = false)
       }
 
       tableFooter.serializeThriftFile()
@@ -750,8 +750,8 @@ private case class GpuParquetFileFilterHandler(
       }
 
       ParquetFileInfoWithBlockMeta(filePath, clipped, file.partitionValues,
-        clippedSchema, readDataSchema, dateRebaseModeForThisFile.toString,
-        timestampRebaseModeForThisFile.toString, hasInt96Timestamps)
+        clippedSchema, readDataSchema, dateRebaseModeForThisFile,
+        timestampRebaseModeForThisFile, hasInt96Timestamps)
     }
   }
 
@@ -1147,8 +1147,8 @@ case class GpuParquetMultiFilePartitionReaderFactory(
           logWarning(s"Skipped missing file: ${file.filePath}", e)
           val meta = ParquetFileInfoWithBlockMeta(
             new Path(new URI(file.filePath.toString())), Seq.empty,
-            file.partitionValues, null, null, LegacyBehaviorPolicy.LEGACY.toString,
-            LegacyBehaviorPolicy.LEGACY.toString, hasInt96Timestamps = false)
+            file.partitionValues, null, null, DateTimeRebaseLegacy, DateTimeRebaseLegacy,
+            hasInt96Timestamps = false)
           BlockMetaWithPartFile(meta, file)
         // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
         case e: FileNotFoundException if !ignoreMissingFiles => throw e
@@ -1159,8 +1159,8 @@ case class GpuParquetMultiFilePartitionReaderFactory(
             s"Skipped the rest of the content in the corrupted file: ${file.filePath}", e)
           val meta = ParquetFileInfoWithBlockMeta(
             new Path(new URI(file.filePath.toString())), Seq.empty,
-            file.partitionValues, null, null, LegacyBehaviorPolicy.LEGACY.toString,
-            LegacyBehaviorPolicy.LEGACY.toString, hasInt96Timestamps = false)
+            file.partitionValues, null, null, DateTimeRebaseLegacy, DateTimeRebaseLegacy,
+            hasInt96Timestamps = false)
           BlockMetaWithPartFile(meta, file)
       }
     }
@@ -1324,10 +1324,10 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
 
     val copyBufferSize = conf.getInt("parquet.read.allocation.size", 8 * 1024 * 1024)
 
-    def checkIfNeedToSplitBlocks(currentDateRebaseMode: String,
-        nextDateRebaseMode: String,
-        currentTimestampRebaseMode: String,
-        nextTimestampRebaseMode: String,
+    def checkIfNeedToSplitBlocks(currentDateRebaseMode: DateTimeRebaseMode,
+        nextDateRebaseMode: DateTimeRebaseMode,
+        currentTimestampRebaseMode: DateTimeRebaseMode,
+        nextTimestampRebaseMode: DateTimeRebaseMode,
         currentSchema: SchemaBase,
         nextSchema: SchemaBase,
         currentFilePath: String,
@@ -1786,7 +1786,8 @@ private case class ParquetDataBlock(dataBlock: BlockMetaData) extends DataBlockB
 }
 
 /** Parquet extra information containing rebase modes and whether there is int96 timestamp */
-class ParquetExtraInfo(val dateRebaseMode: String, val timestampRebaseMode: String,
+class ParquetExtraInfo(val dateRebaseMode: DateTimeRebaseMode,
+    val timestampRebaseMode: DateTimeRebaseMode,
     val hasInt96Timestamps: Boolean) extends ExtraInfo
 
 // contains meta about a single block in a file
@@ -2283,8 +2284,8 @@ class MultiFileCloudParquetPartitionReader(
       override val origPartitionedFile: Option[PartitionedFile],
       bufferSize: Long,
       override val bytesRead: Long,
-      dateRebaseMode: String,
-      timestampRebaseMode: String,
+      dateRebaseMode: DateTimeRebaseMode,
+      timestampRebaseMode: DateTimeRebaseMode,
       hasInt96Timestamps: Boolean,
       clippedSchema: MessageType,
       readSchema: StructType,
@@ -2300,8 +2301,8 @@ class MultiFileCloudParquetPartitionReader(
       override val origPartitionedFile: Option[PartitionedFile],
       override val memBuffersAndSizes: Array[SingleHMBAndMeta],
       override val bytesRead: Long,
-      dateRebaseMode: String,
-      timestampRebaseMode: String,
+      dateRebaseMode: DateTimeRebaseMode,
+      timestampRebaseMode: DateTimeRebaseMode,
       hasInt96Timestamps: Boolean,
       clippedSchema: MessageType,
       readSchema: StructType,
@@ -2332,7 +2333,7 @@ class MultiFileCloudParquetPartitionReader(
         case e: FileNotFoundException if ignoreMissingFiles =>
           logWarning(s"Skipped missing file: ${file.filePath}", e)
           HostMemoryEmptyMetaData(file, origPartitionedFile, 0, 0,
-            LegacyBehaviorPolicy.LEGACY.toString, LegacyBehaviorPolicy.LEGACY.toString,
+            DateTimeRebaseLegacy, DateTimeRebaseLegacy,
             hasInt96Timestamps = false, null, null, 0)
         // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
         case e: FileNotFoundException if !ignoreMissingFiles => throw e
@@ -2340,7 +2341,7 @@ class MultiFileCloudParquetPartitionReader(
           logWarning(
             s"Skipped the rest of the content in the corrupted file: ${file.filePath}", e)
           HostMemoryEmptyMetaData(file, origPartitionedFile, 0, 0,
-            LegacyBehaviorPolicy.LEGACY.toString, LegacyBehaviorPolicy.LEGACY.toString,
+            DateTimeRebaseLegacy, DateTimeRebaseLegacy,
             hasInt96Timestamps = false, null, null, 0)
       } finally {
         TrampolineUtil.unsetTaskContext()
@@ -2495,8 +2496,8 @@ class MultiFileCloudParquetPartitionReader(
   }
 
   private def readBufferToBatches(
-      dateRebaseMode: String,
-      timestampRebaseMode: String,
+      dateRebaseMode: DateTimeRebaseMode,
+      timestampRebaseMode: DateTimeRebaseMode,
       hasInt96Timestamps: Boolean,
       clippedSchema: MessageType,
       readDataSchema: StructType,
@@ -2562,8 +2563,8 @@ object MakeParquetTableProducer extends Logging {
      offset: Long,
      len: Long,
      metrics : Map[String, GpuMetric],
-      dateRebaseMode: String,
-      timestampRebaseMode: String,
+      dateRebaseMode: DateTimeRebaseMode,
+      timestampRebaseMode: DateTimeRebaseMode,
      hasInt96Timestamps: Boolean,
      isSchemaCaseSensitive: Boolean,
      useFieldId: Boolean,
@@ -2626,8 +2627,8 @@ case class ParquetTableReader(
     offset: Long,
     len: Long,
     metrics : Map[String, GpuMetric],
-    dateRebaseMode: String,
-    timestampRebaseMode: String,
+    dateRebaseMode: DateTimeRebaseMode,
+    timestampRebaseMode: DateTimeRebaseMode,
     hasInt96Timestamps: Boolean,
     isSchemaCaseSensitive: Boolean,
     useFieldId: Boolean,
@@ -2714,8 +2715,8 @@ class ParquetPartitionReader(
     targetBatchSizeBytes: Long,
     useChunkedReader: Boolean,
     override val execMetrics: Map[String, GpuMetric],
-    dateRebaseMode: String,
-    timestampRebaseMode: String,
+    dateRebaseMode: DateTimeRebaseMode,
+    timestampRebaseMode: DateTimeRebaseMode,
     hasInt96Timestamps: Boolean,
     useFieldId: Boolean)
   extends FilePartitionReaderBase(conf, execMetrics) with ParquetPartitionReaderBase {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/datetimeRebaseUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/datetimeRebaseUtils.scala
index c691af0ac395..72e2666f29cd 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/datetimeRebaseUtils.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/datetimeRebaseUtils.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
 
 /**
- * Mirror of Spark's LegaclBehaviorPolicy
+ * Mirror of Spark's LegacyBehaviorPolicy
  */
 sealed abstract class DateTimeRebaseMode
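
Taken together, these hunks swap the raw rebase-mode strings ("LEGACY", "CORRECTED", "EXCEPTION") for values of the sealed DateTimeRebaseMode type, so an invalid mode becomes a compile-time error and call sites compare singletons with == instead of string equality. The section ends just as the definition in datetimeRebaseUtils.scala begins, so the following is a minimal sketch of the shape the updated call sites assume; the value field, the Serializable mix-in, and the fromName helper are illustrative assumptions rather than the verbatim implementation:

package com.nvidia.spark.rapids

// Sketch of the sealed hierarchy mirroring Spark's LegacyBehaviorPolicy.
// Case objects are singletons, so call sites can safely compare with `==`.
sealed abstract class DateTimeRebaseMode(val value: String) extends Serializable

case object DateTimeRebaseException extends DateTimeRebaseMode("EXCEPTION")
case object DateTimeRebaseLegacy extends DateTimeRebaseMode("LEGACY")
case object DateTimeRebaseCorrected extends DateTimeRebaseMode("CORRECTED")

object DateTimeRebaseMode {
  // Illustrative helper (assumed, not shown in the diff): map a mode name,
  // e.g. one read from a Spark conf, back onto the sealed hierarchy.
  def fromName(name: String): DateTimeRebaseMode = name match {
    case DateTimeRebaseException.value => DateTimeRebaseException
    case DateTimeRebaseLegacy.value => DateTimeRebaseLegacy
    case DateTimeRebaseCorrected.value => DateTimeRebaseCorrected
    case other =>
      throw new IllegalArgumentException(s"Invalid rebase mode: $other")
  }
}

Each case object compiles to a singleton reachable from Java as Name$.MODULE$, which is why the Java hunks in the Iceberg readers can pass DateTimeRebaseLegacy$.MODULE$ and DateTimeRebaseCorrected$.MODULE$ directly rather than calling toString() on them.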