Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
wayneguow committed Aug 2, 2024
1 parent 5a5390c commit f431aa4
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,7 @@ private[sql] class AvroDeserializer(

case (FIXED, dt: DecimalType) =>
val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
if (preventReadingIncorrectType &&
d.getPrecision - d.getScale > dt.precision - dt.scale) {
if (preventReadingIncorrectType && !isDecimalTypeCompatible(d, dt)) {
throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
}
Expand All @@ -242,8 +241,7 @@ private[sql] class AvroDeserializer(

case (BYTES, dt: DecimalType) =>
val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
if (preventReadingIncorrectType &&
d.getPrecision - d.getScale > dt.precision - dt.scale) {
if (preventReadingIncorrectType && !isDecimalTypeCompatible(d, dt)) {
throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
}
Expand Down Expand Up @@ -385,6 +383,12 @@ private[sql] class AvroDeserializer(
}
}

private def isDecimalTypeCompatible(d: LogicalTypes.Decimal, dt: DecimalType): Boolean = {
val precisionIncrease = dt.precision - d.getPrecision
val scaleIncrease = dt.scale - d.getScale
scaleIncrease >= 0 && precisionIncrease >= scaleIncrease
}

// TODO: move the following method in Decimal object on creating Decimal from BigDecimal?
private def createDecimal(decimal: BigDecimal, precision: Int, scale: Int): Decimal = {
if (precision <= Decimal.MAX_LONG_DIGITS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -888,25 +888,43 @@ abstract class AvroSuite
sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString)
// With the flag disabled, we will throw an exception if there is a mismatch
withSQLConf(confKey -> "false") {
val ex = intercept[SparkException] {
// scale and length of integer part both not match
val ex1 = intercept[SparkException] {
spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString).collect()
}
assert(ex.getErrorClass.startsWith("FAILED_READ_FILE"))
assert(ex1.getErrorClass.startsWith("FAILED_READ_FILE"))
checkError(
exception = ex.getCause.asInstanceOf[AnalysisException],
exception = ex1.getCause.asInstanceOf[AnalysisException],
errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
parameters = Map("avroPath" -> "field 'a'",
"sqlPath" -> "field 'a'",
"avroType" -> "decimal\\(12,10\\)",
"sqlType" -> "\"DECIMAL\\(4,3\\)\""),
matchPVals = true
)

// scale part match, but length of integer part not match
val ex2 = intercept[SparkException] {
spark.read.schema("a DECIMAL(11, 10)").format("avro").load(path.toString).collect()
}
assert(ex2.getErrorClass.startsWith("FAILED_READ_FILE"))
checkError(
exception = ex2.getCause.asInstanceOf[AnalysisException],
errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
parameters = Map("avroPath" -> "field 'a'",
"sqlPath" -> "field 'a'",
"avroType" -> "decimal\\(12,10\\)",
"sqlType" -> "\"DECIMAL\\(11,10\\)\""),
matchPVals = true
)
}
// The following used to work, so it should still work with the flag enabled

// all compatible
checkAnswer(
spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
Row(new java.math.BigDecimal("13.123"))
spark.read.schema("a DECIMAL(14, 10)").format("avro").load(path.toString),
Row(new java.math.BigDecimal("13.1234567890"))
)

withSQLConf(confKey -> "true") {
// With the flag enabled, we return a null silently, which isn't great
checkAnswer(
Expand Down

0 comments on commit f431aa4

Please sign in to comment.