[SPARK-41096][SQL] Support reading parquet FIXED_LEN_BYTE_ARRAY type
### What changes were proposed in this pull request?
Parquet supports the FIXED_LEN_BYTE_ARRAY (FLBA) data type. However, the Spark Parquet reader currently cannot handle FLBA.
This PR proposes to read FLBA columns as BinaryType data in Spark.
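
For illustration, a minimal usage sketch of what this change enables (the path and column layout below are placeholders, not part of this patch): a Parquet file whose columns are declared as FIXED_LEN_BYTE_ARRAY can now be read directly, with each FLBA column surfaced as BinaryType.

    // Hypothetical file: its columns are declared FIXED_LEN_BYTE_ARRAY in the Parquet schema.
    val df = spark.read.parquet("/tmp/flba-data")
    df.printSchema() // the FLBA columns now show up as `binary`
    df.show()        // values come back as fixed-length byte arrays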

### Why are the changes needed?
The Iceberg Parquet reader, for example, can handle FLBA. This PR narrows the implementation gap between the Spark and Iceberg Parquet readers.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test added

Closes apache#38628 from kazuyukitanimura/SPARK-41096.

Authored-by: Kazuyuki Tanimura <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
kazuyukitanimura committed Nov 14, 2022
1 parent b1562d7 commit feec791
Showing 3 changed files with 39 additions and 0 deletions.
@@ -170,6 +170,8 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType sparkType)
         return new FixedLenByteArrayAsLongUpdater(arrayLen);
       } else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
         return new FixedLenByteArrayUpdater(arrayLen);
+      } else if (sparkType == DataTypes.BinaryType) {
+        return new FixedLenByteArrayUpdater(arrayLen);
       }
       break;
     default:
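
For context, a small standalone sketch (not Spark's internal code; `fixedLenOf` is a hypothetical helper) of how the fixed byte width that `FixedLenByteArrayUpdater` is constructed with can be read off the parquet-mr ColumnDescriptor:

    import org.apache.parquet.column.ColumnDescriptor
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY

    // Returns the declared byte width of a FIXED_LEN_BYTE_ARRAY column, if the column is one.
    def fixedLenOf(descriptor: ColumnDescriptor): Option[Int] = {
      val primitive = descriptor.getPrimitiveType
      if (primitive.getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY) {
        Some(primitive.getTypeLength) // e.g. 3 for FIXED_LEN_BYTE_ARRAY(3)
      } else {
        None
      }
    }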
@@ -269,6 +269,7 @@ class ParquetToSparkSchemaConverter(
       case _: DecimalLogicalTypeAnnotation =>
         makeDecimalType(Decimal.maxPrecisionForBytes(parquetType.getTypeLength))
       case _: IntervalLogicalTypeAnnotation => typeNotImplemented()
+      case null => BinaryType
       case _ => illegalType()
     }

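In other words, a FIXED_LEN_BYTE_ARRAY field that carries no logical type annotation (the new `case null` branch) now maps to Catalyst BinaryType, alongside the existing decimal mapping. A small standalone sketch mirroring that mapping (`flbaToCatalyst` is a hypothetical helper, not the converter's actual method):

    import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation
    import org.apache.parquet.schema.PrimitiveType
    import org.apache.spark.sql.types.{BinaryType, DataType, DecimalType}

    def flbaToCatalyst(field: PrimitiveType): DataType =
      field.getLogicalTypeAnnotation match {
        case d: DecimalLogicalTypeAnnotation => DecimalType(d.getPrecision, d.getScale)
        case null => BinaryType // new in SPARK-41096: plain FLBA reads as binary
        case other => throw new UnsupportedOperationException(s"Unsupported annotation: $other")
      }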
@@ -35,6 +35,7 @@ import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.example.ExampleParquetWriter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
+import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}

 import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
@@ -110,6 +111,41 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }

+  test("SPARK-41096: FIXED_LEN_BYTE_ARRAY support") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      def makeRawParquetFile(path: Path): Unit = {
+        val schemaStr =
+          """message root {
+            | required FIXED_LEN_BYTE_ARRAY(1) a;
+            | required FIXED_LEN_BYTE_ARRAY(3) b;
+            |}
+          """.stripMargin
+        val schema = MessageTypeParser.parseMessageType(schemaStr)
+
+        val writer = createParquetWriter(schema, path, dictionaryEnabled)
+
+        (0 until 10).map(_.toString).foreach { n =>
+          val record = new SimpleGroup(schema)
+          record.add(0, Binary.fromString(n))
+          record.add(1, Binary.fromString(n + n + n))
+          writer.write(record)
+        }
+        writer.close()
+      }
+
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "part-r-0.parquet")
+        makeRawParquetFile(path)
+        Seq(true, false).foreach { vectorizedReaderEnabled =>
+          readParquetFile(path.toString, vectorizedReaderEnabled) { df =>
+            checkAnswer(df, (48 until 58).map(n => // char '0' is 48 in ascii
+              Row(Array(n), Array(n, n, n))))
+          }
+        }
+      }
+    }
+  }

test("string") {
val data = (1 to 4).map(i => Tuple1(i.toString))
// Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL
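
As a quick sanity check on the expected rows in the new SPARK-41096 test above (a standalone snippet, not part of this commit): Binary.fromString stores the string's UTF-8 bytes, and the digits '0' through '9' are ASCII 48 through 57, which is why checkAnswer compares against (48 until 58).

    import org.apache.parquet.io.api.Binary

    // The record written for "5" holds the single byte 53 in column `a`
    // and the bytes 53, 53, 53 in column `b`.
    assert(Binary.fromString("5").getBytes.sameElements(Array[Byte](53)))
    assert(Binary.fromString("555").getBytes.sameElements(Array[Byte](53, 53, 53)))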
