[SPARK-49082][SQL] Widening type promotions in AvroDeserializer
### What changes were proposed in this pull request?

This PR widens the type promotions supported by `AvroDeserializer`. The following promotions are now supported (Avro type -> Spark type):

- Int -> Long
- Int -> Double
- Float -> Double
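
As a rough illustration of why these three promotions are safe, the sketch below mirrors them with plain JVM numeric widening. It is a standalone example, not the `AvroDeserializer` code: the object name `WideningSketch` and its helper methods are hypothetical and exist only for this illustration.

```scala
// Standalone sketch (no Spark or Avro dependency). The names here are
// hypothetical and are not part of AvroDeserializer.
object WideningSketch {
  // Int -> Long: every 32-bit integer is representable as a 64-bit integer.
  def intToLong(v: Int): Long = v.toLong

  // Int -> Double: a Double has a 53-bit significand, so every Int is exact.
  def intToDouble(v: Int): Double = v.toDouble

  // Float -> Double: every Float value, including NaN and the infinities,
  // has an exact Double counterpart.
  def floatToDouble(v: Float): Double = v.toDouble

  def main(args: Array[String]): Unit = {
    // Round-tripping back to the narrower type recovers the original value,
    // which is what makes these promotions lossless.
    val ints = Seq(1, Int.MinValue, Int.MaxValue)
    ints.foreach { i =>
      assert(intToLong(i).toInt == i)
      assert(intToDouble(i).toInt == i)
    }
    val floats = Seq(1F, Float.MinValue, Float.MinPositiveValue, Float.MaxValue)
    floats.foreach(f => assert(floatToDouble(f).toFloat == f))
    assert(floatToDouble(Float.NaN).isNaN)
    println("all widening round-trips succeeded")
  }
}
```

Note that Float -> Double preserves the binary value rather than the decimal literal, so a value written as `0.1F` reads back as a `Double` close to, but not equal to, `0.1D`.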

### Why are the changes needed?

Similar to PR apache#44368 for the `Parquet` reader, we should enable type promotion/widening in the `Avro` deserializer.

### Does this PR introduce _any_ user-facing change?

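Yes. Users can now read Avro `int` data with a Spark `Long` or `Double` read schema, and Avro `float` data with a `Double` read schema.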

### How was this patch tested?

Passed GA and added a new test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47582 from wayneguow/SPARK-49082.

Authored-by: Wei Guo <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
wayneguow authored and cloud-fan committed Aug 7, 2024
1 parent ce4f185 commit 7769ef1
Showing 2 changed files with 42 additions and 0 deletions.
@@ -141,6 +141,12 @@ private[sql] class AvroDeserializer(
      case (INT, IntegerType) => (updater, ordinal, value) =>
        updater.setInt(ordinal, value.asInstanceOf[Int])

      case (INT, LongType) => (updater, ordinal, value) =>
        updater.setLong(ordinal, value.asInstanceOf[Int])

      case (INT, DoubleType) => (updater, ordinal, value) =>
        updater.setDouble(ordinal, value.asInstanceOf[Int])

      case (INT, dt: DatetimeType)
        if preventReadingIncorrectType && realDataType.isInstanceOf[YearMonthIntervalType] =>
        throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
@@ -194,6 +200,9 @@ private[sql] class AvroDeserializer(
      case (FLOAT, FloatType) => (updater, ordinal, value) =>
        updater.setFloat(ordinal, value.asInstanceOf[Float])

      case (FLOAT, DoubleType) => (updater, ordinal, value) =>
        updater.setDouble(ordinal, value.asInstanceOf[Float])

      case (DOUBLE, DoubleType) => (updater, ordinal, value) =>
        updater.setDouble(ordinal, value.asInstanceOf[Double])

@@ -921,6 +921,39 @@ abstract class AvroSuite
    }
  }

  test("SPARK-49082: Widening type promotions in AvroDeserializer") {
    withTempPath { tempPath =>
      // Int -> Long
      val intPath = s"$tempPath/int_data"
      val intDf = Seq(1, Int.MinValue, Int.MaxValue).toDF("col")
      intDf.write.format("avro").save(intPath)
      checkAnswer(
        spark.read.schema("col Long").format("avro").load(intPath),
        Seq(Row(1L), Row(-2147483648L), Row(2147483647L))
      )

      // Int -> Double
      checkAnswer(
        spark.read.schema("col Double").format("avro").load(intPath),
        Seq(Row(1D), Row(-2147483648D), Row(2147483647D))
      )

      // Float -> Double
      val floatPath = s"$tempPath/float_data"
      val floatDf = Seq(1F,
        Float.MinValue, Float.MinPositiveValue, Float.MaxValue,
        Float.NaN, Float.NegativeInfinity, Float.PositiveInfinity
      ).toDF("col")
      floatDf.write.format("avro").save(floatPath)
      checkAnswer(
        spark.read.schema("col Double").format("avro").load(floatPath),
        Seq(Row(1D),
          Row(-3.4028234663852886E38D), Row(1.401298464324817E-45D), Row(3.4028234663852886E38D),
          Row(Double.NaN), Row(Double.NegativeInfinity), Row(Double.PositiveInfinity))
      )
    }
  }

  test("SPARK-43380: Fix Avro data type conversion" +
    " of DayTimeIntervalType to avoid producing incorrect results") {
    withTempPath { path =>