[auto-merge] branch-24.10 to branch-24.12 [skip ci] [bot] #11568

Merged
merged 2 commits into branch-24.12 from branch-24.10 on Oct 9, 2024
2 changes: 1 addition & 1 deletion docs/compatibility.md
@@ -661,7 +661,7 @@ LEGACY timeParserPolicy support has the following limitations when running on th
- The proleptic Gregorian calendar is used instead of the hybrid Julian+Gregorian calendar
that Spark uses in legacy mode
- When format is `yyyyMMdd`, GPU only supports 8 digit strings. Spark supports like 7 digit
-`2024101` string while GPU does not support.
+`2024101` string while GPU does not support. Only tested `UTC` and `Asia/Shanghai` timezones.

## Formatting dates and timestamps as strings

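For reference, a minimal Scala sketch of the behavior the updated compatibility note describes. It assumes a SparkSession named `spark` with the RAPIDS Accelerator enabled; the comments only restate what the documentation above claims and are not verified here.

```scala
// Sketch of the documented LEGACY timeParserPolicy limitation
// (assumes a SparkSession `spark` with the RAPIDS Accelerator enabled).
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.conf.set("spark.sql.session.timeZone", "UTC") // per the note, only UTC and Asia/Shanghai are tested

// 8-digit input: supported on the GPU with the yyyyMMdd format.
spark.sql("SELECT to_date('20241011', 'yyyyMMdd')").show()

// 7-digit input such as '2024101': accepted by Spark's legacy CPU parser,
// but not supported by the GPU path.
spark.sql("SELECT to_date('2024101', 'yyyyMMdd')").show()
```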
1 change: 1 addition & 0 deletions integration_tests/src/main/python/date_time_test.py
@@ -461,6 +461,7 @@ def test_to_timestamp(parser_policy):

@pytest.mark.skipif(not is_supported_time_zone(), reason="not all time zones are supported now, refer to https://github.com/NVIDIA/spark-rapids/issues/6839, please update after all time zones are supported")
# Test years after 1900, refer to issues: https://github.com/NVIDIA/spark-rapids/issues/11543, https://github.com/NVIDIA/spark-rapids/issues/11539
+@pytest.mark.skipif(get_test_tz() != "Asia/Shanghai" and get_test_tz() != "UTC", reason="https://github.com/NVIDIA/spark-rapids/issues/11562")
def test_yyyyMMdd_format_for_legacy_mode():
gen = StringGen('(19[0-9]{2}|[2-9][0-9]{3})([0-9]{4})')
assert_gpu_and_cpu_are_equal_sql(
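The new skip marker restricts this test to the two time zones the legacy `yyyyMMdd` path currently covers (issue #11562). A rough Scala equivalent of that guard on the application side, assuming nothing beyond standard Spark configuration; the helper name `isTestedTimeZone` is made up for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical guard mirroring the test's skip condition: only UTC and Asia/Shanghai
// are exercised for the legacy yyyyMMdd path at the moment.
def isTestedTimeZone(spark: SparkSession): Boolean = {
  val tz = spark.conf.get("spark.sql.session.timeZone")
  tz == "UTC" || tz == "Asia/Shanghai"
}
```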
34 changes: 18 additions & 16 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
@@ -2822,6 +2822,12 @@ object MakeOrcTableProducer extends Logging {
debugDumpPrefix: Option[String],
debugDumpAlways: Boolean
): GpuDataProducer[Table] = {
+debugDumpPrefix.foreach { prefix =>
+if (debugDumpAlways) {
+val p = DumpUtils.dumpBuffer(conf, buffer, offset, bufferSize, prefix, ".orc")
+logWarning(s"Wrote data for ${splits.mkString(", ")} to $p")
+}
+}
if (useChunkedReader) {
OrcTableReader(conf, chunkSizeByteLimit, maxChunkedReaderMemoryUsageSizeBytes,
parseOpts, buffer, offset, bufferSize, metrics, isSchemaCaseSensitive, readDataSchema,
@@ -2838,19 +2844,17 @@
} catch {
case e: Exception =>
val dumpMsg = debugDumpPrefix.map { prefix =>
-val p = DumpUtils.dumpBuffer(conf, buffer, offset, bufferSize, prefix, ".orc")
-s", data dumped to $p"
+if (!debugDumpAlways) {
+val p = DumpUtils.dumpBuffer(conf, buffer, offset, bufferSize, prefix, ".orc")
+s", data dumped to $p"
+} else {
+""
+}
}.getOrElse("")
throw new IOException(s"Error when processing ${splits.mkString("; ")}$dumpMsg", e)
}
}
closeOnExcept(table) { _ =>
-debugDumpPrefix.foreach { prefix =>
-if (debugDumpAlways) {
-val p = DumpUtils.dumpBuffer(conf, buffer, offset, bufferSize, prefix, ".orc")
-logWarning(s"Wrote data for ${splits.mkString(", ")} to $p")
-}
-}
if (readDataSchema.length < table.getNumberOfColumns) {
throw new QueryExecutionException(s"Expected ${readDataSchema.length} columns " +
s"but read ${table.getNumberOfColumns} from ${splits.mkString("; ")}")
@@ -2895,8 +2899,12 @@ case class OrcTableReader(
} catch {
case e: Exception =>
val dumpMsg = debugDumpPrefix.map { prefix =>
-val p = DumpUtils.dumpBuffer(conf, buffer, offset, bufferSize, prefix, ".orc")
-s", data dumped to $p"
+if (!debugDumpAlways) {
+val p = DumpUtils.dumpBuffer(conf, buffer, offset, bufferSize, prefix, ".orc")
+s", data dumped to $p"
+} else {
+""
+}
}.getOrElse("")
throw new IOException(s"Error when processing $splitsString$dumpMsg", e)
}
@@ -2914,12 +2922,6 @@
}

override def close(): Unit = {
-debugDumpPrefix.foreach { prefix =>
-if (debugDumpAlways) {
-val p = DumpUtils.dumpBuffer(conf, buffer, offset, bufferSize, prefix, ".orc")
-logWarning(s"Wrote data for $splitsString to $p")
-}
-}
reader.close()
buffer.close()
}
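The same restructuring is applied to the Parquet and Avro readers below: when dump-always is enabled, the input buffer is now written out once before the read is attempted (rather than after it succeeds or on close), and the on-error dump is skipped so the same buffer is not written twice. A simplified, self-contained Scala sketch of that control flow; `dump` and `parse` are stand-ins, not the plugin's actual API:

```scala
import java.io.IOException

object DebugDumpSketch {
  // Stand-ins for DumpUtils.dumpBuffer and the cuDF table read.
  private def dump(prefix: String): String = s"$prefix-dump.bin" // pretend to write the buffer, return its path
  private def parse(fail: Boolean): String = if (fail) throw new RuntimeException("boom") else "table"

  def read(dumpPrefix: Option[String], dumpAlways: Boolean, fail: Boolean): String = {
    // Dump up front when dump-always is on, so the data is captured even if the read fails.
    dumpPrefix.foreach { prefix =>
      if (dumpAlways) println(s"Wrote data to ${dump(prefix)}")
    }
    try {
      parse(fail)
    } catch {
      case e: Exception =>
        // Only dump on error if we did not already dump above.
        val dumpMsg = dumpPrefix.map { prefix =>
          if (!dumpAlways) s", data dumped to ${dump(prefix)}" else ""
        }.getOrElse("")
        throw new IOException(s"Error when processing input$dumpMsg", e)
    }
  }
}
```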
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -2613,6 +2613,12 @@ object MakeParquetTableProducer extends Logging {
debugDumpPrefix: Option[String],
debugDumpAlways: Boolean
): GpuDataProducer[Table] = {
+debugDumpPrefix.foreach { prefix =>
+if (debugDumpAlways) {
+val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
+logWarning(s"Wrote data for ${splits.mkString(", ")} to $p")
+}
+}
if (useChunkedReader) {
ParquetTableReader(conf, chunkSizeByteLimit, maxChunkedReaderMemoryUsageSizeBytes,
opts, buffer, offset,
@@ -2631,19 +2637,17 @@
} catch {
case e: Exception =>
val dumpMsg = debugDumpPrefix.map { prefix =>
-val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
-s", data dumped to $p"
+if (!debugDumpAlways) {
+val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
+s", data dumped to $p"
+} else {
+""
+}
}.getOrElse("")
throw new IOException(s"Error when processing ${splits.mkString("; ")}$dumpMsg", e)
}
}
closeOnExcept(table) { _ =>
-debugDumpPrefix.foreach { prefix =>
-if (debugDumpAlways) {
-val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
-logWarning(s"Wrote data for ${splits.mkString(", ")} to $p")
-}
-}
GpuParquetScan.throwIfRebaseNeededInExceptionMode(table, dateRebaseMode,
timestampRebaseMode)
if (readDataSchema.length < table.getNumberOfColumns) {
@@ -2695,8 +2699,12 @@ case class ParquetTableReader(
} catch {
case e: Exception =>
val dumpMsg = debugDumpPrefix.map { prefix =>
-val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
-s", data dumped to $p"
+if (!debugDumpAlways) {
+val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
+s", data dumped to $p"
+} else {
+""
+}
}.getOrElse("")
throw new IOException(s"Error when processing $splitsString$dumpMsg", e)
}
@@ -2716,12 +2724,6 @@
}

override def close(): Unit = {
-debugDumpPrefix.foreach { prefix =>
-if (debugDumpAlways) {
-val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
-logWarning(s"Wrote data for $splitsString to $p")
-}
-}
reader.close()
buffer.close()
}
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAvroScan.scala
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022-2023, NVIDIA CORPORATION.
+* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -325,6 +325,12 @@ trait GpuAvroReaderBase extends Logging { self: FilePartitionReaderBase =>
hostBuf: HostMemoryBuffer,
bufSize: Long,
splits: Array[PartitionedFile]): Table = {
+debugDumpPrefix.foreach { prefix =>
+if (debugDumpAlways) {
+val p = DumpUtils.dumpBuffer(conf, hostBuf, 0, bufSize, prefix, ".avro")
+logWarning(s"Wrote data for ${splits.mkString("; ")} to $p")
+}
+}
val readOpts = CudfAvroOptions.builder()
.includeColumn(readDataSchema.fieldNames.toSeq: _*)
.build()
@@ -341,20 +347,16 @@ trait GpuAvroReaderBase extends Logging { self: FilePartitionReaderBase =>
} catch {
case e: Exception =>
val dumpMsg = debugDumpPrefix.map { prefix =>
-val p = DumpUtils.dumpBuffer(conf, hostBuf, 0, bufSize, prefix, ".avro")
-s", data dumped to $p"
+if (!debugDumpAlways) {
+val p = DumpUtils.dumpBuffer(conf, hostBuf, 0, bufSize, prefix, ".avro")
+s", data dumped to $p"
+} else {
+""
+}
}.getOrElse("")
throw new IOException(
s"Error when processing file splits [${splits.mkString("; ")}]$dumpMsg", e)
}
-closeOnExcept(table) { _ =>
-debugDumpPrefix.foreach { prefix =>
-if (debugDumpAlways) {
-val p = DumpUtils.dumpBuffer(conf, hostBuf, 0, bufSize, prefix, ".avro")
-logWarning(s"Wrote data for ${splits.mkString("; ")} to $p")
-}
-}
-}
table
}

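For completeness, a hedged sketch of turning these dumps on from the application side. The exact configuration keys are assumptions based on the plugin's usual `spark.rapids.sql.debug.*` naming and should be checked against the configs documentation for the release in use:

```scala
import org.apache.spark.sql.SparkSession

// Assumed config keys (verify against docs/configs.md for your plugin version):
// a dump path prefix per format, plus a flag that switches from dump-on-error to dump-always.
val spark = SparkSession.builder()
  .appName("rapids-debug-dump-example")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.debug.orc.dumpPrefix", "/tmp/rapids-dump/orc") // assumed key
  .config("spark.rapids.sql.debug.orc.dumpAlways", "true")                 // assumed key
  .getOrCreate()
```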