Commit 293e27c

Adopt new enum type
ttnghia committed Nov 4, 2023
1 parent 1b5112d commit 293e27c
Showing 4 changed files with 41 additions and 39 deletions.
File 1 of 4
@@ -140,8 +140,8 @@ public org.apache.iceberg.io.CloseableIterator<ColumnarBatch> iterator() {
           new Path(input.location()), clippedBlocks, fileReadSchema, caseSensitive,
           partReaderSparkSchema, debugDumpPrefix, debugDumpAlways,
           maxBatchSizeRows, maxBatchSizeBytes, targetBatchSizeBytes, useChunkedReader, metrics,
-          DateTimeRebaseLegacy$.MODULE$.toString(), // dateRebaseMode
-          DateTimeRebaseLegacy$.MODULE$.toString(), // timestampRebaseMode
+          DateTimeRebaseLegacy$.MODULE$, // dateRebaseMode
+          DateTimeRebaseLegacy$.MODULE$, // timestampRebaseMode
           true, // hasInt96Timestamps
           false // useFieldId
       );
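
A note on the `$.MODULE$` spelling in the Java hunks of this commit: the rebase-mode arguments now pass Scala singleton objects instead of strings. Scala compiles an `object` to a JVM class whose name ends in `$` and whose sole instance is exposed as a static `MODULE$` field, so `DateTimeRebaseLegacy$.MODULE$` is simply the Java syntax for the Scala object `DateTimeRebaseLegacy`. A minimal sketch of the pattern, with hypothetical names rather than the repository's real ones:

    // Hypothetical stand-ins for the real DateTimeRebaseMode values.
    sealed abstract class RebaseMode
    case object LegacyMode extends RebaseMode

    object InteropDemo {
      // Scala call sites name the object directly:
      val mode: RebaseMode = LegacyMode
      // Java call sites go through the compiler-generated singleton field,
      // e.g. `RebaseMode mode = LegacyMode$.MODULE$;`, which is exactly the
      // shape of `DateTimeRebaseLegacy$.MODULE$` in the hunks here.
    }
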
File 2 of 4
@@ -220,7 +220,8 @@ static class IcebergParquetExtraInfo extends ParquetExtraInfo {
     private final Schema expectedSchema;
     private final PartitionSpec partitionSpec;
 
-    IcebergParquetExtraInfo(String dateRebaseMode, String timestampRebaseMode,
+    IcebergParquetExtraInfo(DateTimeRebaseMode dateRebaseMode,
+        DateTimeRebaseMode timestampRebaseMode,
         boolean hasInt96Timestamps,
         Map<Integer, ?> idToConstant, Schema expectedSchema,
         PartitionSpec partitionSpec) {
@@ -310,8 +311,8 @@ protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst) {
       ParquetFileInfoWithBlockMeta parquetBlockMeta = ParquetFileInfoWithBlockMeta.apply(
           new Path(new URI(fst.file().path().toString())), clippedBlocks,
           InternalRow.empty(), fileReadSchema, partReaderSparkSchema,
-          "CORRECTED", // dateRebaseMode
-          "CORRECTED", // timestampRebaseMode
+          DateTimeRebaseLegacy$.MODULE$, // dateRebaseMode
+          DateTimeRebaseLegacy$.MODULE$, // timestampRebaseMode
           true // hasInt96Timestamps
       );
       return new FilteredParquetFileInfo(parquetBlockMeta, updatedConstants, updatedSchema);
File 3 of 4
@@ -68,7 +68,6 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
@@ -157,15 +156,15 @@ object GpuParquetScan {
     tagSupport(scan.sparkSession, schema, scanMeta)
   }
 
-  def throwIfRebaseNeededInExceptionMode(table: Table, dateRebaseMode: String,
-      timestampRebaseMode: String): Unit = {
+  def throwIfRebaseNeededInExceptionMode(table: Table, dateRebaseMode: DateTimeRebaseMode,
+      timestampRebaseMode: DateTimeRebaseMode): Unit = {
     (0 until table.getNumberOfColumns).foreach { i =>
       val col = table.getColumn(i)
-      if (dateRebaseMode.equals("EXCEPTION") &&
+      if (dateRebaseMode == DateTimeRebaseException &&
         DateTimeRebaseUtils.isDateRebaseNeededInRead(col)) {
         throw DataSourceUtils.newRebaseExceptionInRead("Parquet")
       }
-      else if (timestampRebaseMode.equals("EXCEPTION") &&
+      else if (timestampRebaseMode == DateTimeRebaseException &&
         DateTimeRebaseUtils.isTimeRebaseNeededInRead(col)) {
         throw DataSourceUtils.newRebaseExceptionInRead("Parquet")
       }
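
This hunk is where the new enum type earns its keep: the old code compared the mode against the bare string "EXCEPTION" with `.equals`, which the compiler cannot verify, while the new code compares against the `DateTimeRebaseException` case object with `==`. A small sketch of the difference, again with hypothetical names standing in for the repository's definitions:

    sealed abstract class RebaseMode
    case object CorrectedMode extends RebaseMode
    case object ExceptionMode extends RebaseMode

    def failIfRebaseNeeded(mode: RebaseMode, rebaseNeeded: Boolean): Unit = {
      // With strings, a typo such as "EXCEPTON" slips through and the check
      // silently never fires; a misspelled case object is a compile error.
      if (mode == ExceptionMode && rebaseNeeded) {
        throw new IllegalStateException("datetime rebase required for this file")
      }
    }

Because the class is sealed, a `match` over the modes is also checked for exhaustiveness, which string constants cannot offer.
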
@@ -310,7 +309,8 @@ object GpuParquetScan {
 // contains meta about all the blocks in a file
 case class ParquetFileInfoWithBlockMeta(filePath: Path, blocks: collection.Seq[BlockMetaData],
     partValues: InternalRow, schema: MessageType, readSchema: StructType,
-    dateRebaseMode: String, timestampRebaseMode: String, hasInt96Timestamps: Boolean)
+    dateRebaseMode: DateTimeRebaseMode, timestampRebaseMode: DateTimeRebaseMode,
+    hasInt96Timestamps: Boolean)
 
 private case class BlockMetaWithPartFile(meta: ParquetFileInfoWithBlockMeta, file: PartitionedFile)
 
@@ -674,8 +674,8 @@ private case class GpuParquetFileFilterHandler(
         block.setRowCount(numRows)
         val schema = new MessageType("root")
         return ParquetFileInfoWithBlockMeta(filePath, Seq(block), file.partitionValues,
-          schema, readDataSchema, LegacyBehaviorPolicy.LEGACY.toString,
-          LegacyBehaviorPolicy.LEGACY.toString, hasInt96Timestamps = false)
+          schema, readDataSchema, DateTimeRebaseLegacy, DateTimeRebaseLegacy,
+          hasInt96Timestamps = false)
       }
 
       tableFooter.serializeThriftFile()
@@ -750,8 +750,8 @@ private case class GpuParquetFileFilterHandler(
       }
 
       ParquetFileInfoWithBlockMeta(filePath, clipped, file.partitionValues,
-        clippedSchema, readDataSchema, dateRebaseModeForThisFile.toString,
-        timestampRebaseModeForThisFile.toString, hasInt96Timestamps)
+        clippedSchema, readDataSchema, dateRebaseModeForThisFile,
+        timestampRebaseModeForThisFile, hasInt96Timestamps)
     }
   }
 
@@ -1147,8 +1147,8 @@ case class GpuParquetMultiFilePartitionReaderFactory(
           logWarning(s"Skipped missing file: ${file.filePath}", e)
           val meta = ParquetFileInfoWithBlockMeta(
             new Path(new URI(file.filePath.toString())), Seq.empty,
-            file.partitionValues, null, null, LegacyBehaviorPolicy.LEGACY.toString,
-            LegacyBehaviorPolicy.LEGACY.toString, hasInt96Timestamps = false)
+            file.partitionValues, null, null, DateTimeRebaseLegacy, DateTimeRebaseLegacy,
+            hasInt96Timestamps = false)
           BlockMetaWithPartFile(meta, file)
         // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
         case e: FileNotFoundException if !ignoreMissingFiles => throw e
@@ -1159,8 +1159,8 @@
             s"Skipped the rest of the content in the corrupted file: ${file.filePath}", e)
           val meta = ParquetFileInfoWithBlockMeta(
             new Path(new URI(file.filePath.toString())), Seq.empty,
-            file.partitionValues, null, null, LegacyBehaviorPolicy.LEGACY.toString,
-            LegacyBehaviorPolicy.LEGACY.toString, hasInt96Timestamps = false)
+            file.partitionValues, null, null, DateTimeRebaseLegacy, DateTimeRebaseLegacy,
+            hasInt96Timestamps = false)
           BlockMetaWithPartFile(meta, file)
       }
     }
@@ -1324,10 +1324,10 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
 
   val copyBufferSize = conf.getInt("parquet.read.allocation.size", 8 * 1024 * 1024)
 
-  def checkIfNeedToSplitBlocks(currentDateRebaseMode: String,
-      nextDateRebaseMode: String,
-      currentTimestampRebaseMode: String,
-      nextTimestampRebaseMode: String,
+  def checkIfNeedToSplitBlocks(currentDateRebaseMode: DateTimeRebaseMode,
+      nextDateRebaseMode: DateTimeRebaseMode,
+      currentTimestampRebaseMode: DateTimeRebaseMode,
+      nextTimestampRebaseMode: DateTimeRebaseMode,
       currentSchema: SchemaBase,
       nextSchema: SchemaBase,
       currentFilePath: String,
@@ -1786,7 +1786,8 @@ private case class ParquetDataBlock(dataBlock: BlockMetaData) extends DataBlockB
 }
 
 /** Parquet extra information containing rebase modes and whether there is int96 timestamp */
-class ParquetExtraInfo(val dateRebaseMode: String, val timestampRebaseMode: String,
+class ParquetExtraInfo(val dateRebaseMode: DateTimeRebaseMode,
+    val timestampRebaseMode: DateTimeRebaseMode,
     val hasInt96Timestamps: Boolean) extends ExtraInfo
 
 // contains meta about a single block in a file
@@ -2283,8 +2284,8 @@ class MultiFileCloudParquetPartitionReader(
       override val origPartitionedFile: Option[PartitionedFile],
       bufferSize: Long,
       override val bytesRead: Long,
-      dateRebaseMode: String,
-      timestampRebaseMode: String,
+      dateRebaseMode: DateTimeRebaseMode,
+      timestampRebaseMode: DateTimeRebaseMode,
       hasInt96Timestamps: Boolean,
       clippedSchema: MessageType,
       readSchema: StructType,
@@ -2300,8 +2301,8 @@
       override val origPartitionedFile: Option[PartitionedFile],
       override val memBuffersAndSizes: Array[SingleHMBAndMeta],
       override val bytesRead: Long,
-      dateRebaseMode: String,
-      timestampRebaseMode: String,
+      dateRebaseMode: DateTimeRebaseMode,
+      timestampRebaseMode: DateTimeRebaseMode,
       hasInt96Timestamps: Boolean,
       clippedSchema: MessageType,
       readSchema: StructType,
@@ -2332,15 +2333,15 @@ class MultiFileCloudParquetPartitionReader(
       case e: FileNotFoundException if ignoreMissingFiles =>
         logWarning(s"Skipped missing file: ${file.filePath}", e)
         HostMemoryEmptyMetaData(file, origPartitionedFile, 0, 0,
-          LegacyBehaviorPolicy.LEGACY.toString, LegacyBehaviorPolicy.LEGACY.toString,
+          DateTimeRebaseLegacy, DateTimeRebaseLegacy,
           hasInt96Timestamps = false, null, null, 0)
       // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
       case e: FileNotFoundException if !ignoreMissingFiles => throw e
       case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
         logWarning(
           s"Skipped the rest of the content in the corrupted file: ${file.filePath}", e)
         HostMemoryEmptyMetaData(file, origPartitionedFile, 0, 0,
-          LegacyBehaviorPolicy.LEGACY.toString, LegacyBehaviorPolicy.LEGACY.toString,
+          DateTimeRebaseLegacy, DateTimeRebaseLegacy,
           hasInt96Timestamps = false, null, null, 0)
     } finally {
       TrampolineUtil.unsetTaskContext()
@@ -2495,8 +2496,8 @@ class MultiFileCloudParquetPartitionReader(
   }
 
   private def readBufferToBatches(
-      dateRebaseMode: String,
-      timestampRebaseMode: String,
+      dateRebaseMode: DateTimeRebaseMode,
+      timestampRebaseMode: DateTimeRebaseMode,
       hasInt96Timestamps: Boolean,
       clippedSchema: MessageType,
       readDataSchema: StructType,
@@ -2562,8 +2563,8 @@ object MakeParquetTableProducer extends Logging {
       offset: Long,
       len: Long,
       metrics : Map[String, GpuMetric],
-      dateRebaseMode: String,
-      timestampRebaseMode: String,
+      dateRebaseMode: DateTimeRebaseMode,
+      timestampRebaseMode: DateTimeRebaseMode,
       hasInt96Timestamps: Boolean,
       isSchemaCaseSensitive: Boolean,
       useFieldId: Boolean,
@@ -2626,8 +2627,8 @@ case class ParquetTableReader(
     offset: Long,
     len: Long,
     metrics : Map[String, GpuMetric],
-    dateRebaseMode: String,
-    timestampRebaseMode: String,
+    dateRebaseMode: DateTimeRebaseMode,
+    timestampRebaseMode: DateTimeRebaseMode,
     hasInt96Timestamps: Boolean,
     isSchemaCaseSensitive: Boolean,
     useFieldId: Boolean,
@@ -2714,8 +2715,8 @@ class ParquetPartitionReader(
     targetBatchSizeBytes: Long,
     useChunkedReader: Boolean,
     override val execMetrics: Map[String, GpuMetric],
-    dateRebaseMode: String,
-    timestampRebaseMode: String,
+    dateRebaseMode: DateTimeRebaseMode,
+    timestampRebaseMode: DateTimeRebaseMode,
     hasInt96Timestamps: Boolean,
     useFieldId: Boolean) extends FilePartitionReaderBase(conf, execMetrics)
   with ParquetPartitionReaderBase {
File 4 of 4
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
 
 /**
- * Mirror of Spark's LegaclBehaviorPolicy
+ * Mirror of Spark's LegacyBehaviorPolicy
  */
 sealed abstract class DateTimeRebaseMode
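
The diff shows only the `sealed abstract class DateTimeRebaseMode` declaration itself. From the values referenced elsewhere in the commit (`DateTimeRebaseLegacy`, `DateTimeRebaseException`, plus a corrected mode replacing the old "CORRECTED" string), the full hierarchy presumably looks roughly like the sketch below; the corrected member's name and any companion helpers are assumptions, not copied from the file:

    // Presumed shape of the mirror of Spark's LegacyBehaviorPolicy
    // (EXCEPTION / LEGACY / CORRECTED). Only DateTimeRebaseMode,
    // DateTimeRebaseLegacy, and DateTimeRebaseException appear verbatim
    // in this commit; DateTimeRebaseCorrected is a guess.
    sealed abstract class DateTimeRebaseMode

    case object DateTimeRebaseException extends DateTimeRebaseMode
    case object DateTimeRebaseLegacy extends DateTimeRebaseMode
    case object DateTimeRebaseCorrected extends DateTimeRebaseMode
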

Expand Down
