
[Kernel] Add widening type conversions to Kernel default parquet reader #3541

Conversation

johanl-db
Collaborator

Which Delta project/connector is this regarding?

  - [ ] Spark
  - [ ] Standalone
  - [ ] Flink
  - [x] Kernel
  - [ ] Other (fill in here)

Description

Add a set of conversions to the default Parquet reader provided by Kernel to allow reading columns using a wider type than the type actually stored in the Parquet file.
This is needed to support the type widening table feature; see https://github.com/delta-io/delta/blob/master/protocol_rfcs/type-widening.md.

Conversions added:

  • INT32 -> long
  • FLOAT -> double
  • decimal precision/scale increase
  • DATE -> timestamp_ntz
  • INT32 -> double
  • integers -> decimal
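For illustration, a minimal sketch of what a widened read looks like through the default reader, reusing the Kernel types and the `readParquetFilesUsingKernel` test helper that appears in the review discussion below; the file path and column name are placeholders:

```scala
import io.delta.kernel.types.{LongType, StructType}

// The Parquet file stores the column as INT32; the read schema requests LONG,
// so the default reader widens each value to a 64-bit integer.
// Path and column name are placeholders; `readParquetFilesUsingKernel` is the
// helper used in the test suites.
val readSchema = new StructType().add("id", LongType.LONG)
val result = readParquetFilesUsingKernel("/path/to/parquet-files", readSchema)
```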

How was this patch tested?

Added tests covering all conversions in ParquetColumnReaderSuite

Does this PR introduce any user-facing changes?

This change alone doesn't allow reading Delta tables that use the type widening table feature; that feature is still unsupported.
It does allow reading Delta tables that somehow have Parquet files containing types different from the table schema, but that should never happen for tables that don't support type widening.

@johanl-db johanl-db changed the title [Kernel] Add widening type conversions to Kernel default parquet reader [WIP][Kernel] Add widening type conversions to Kernel default parquet reader Aug 13, 2024
@johanl-db johanl-db self-assigned this Aug 13, 2024
@johanl-db johanl-db force-pushed the type-widening-kernel-default-parquet-handler branch 2 times, most recently from 4627545 to 58f8c86 on August 15, 2024 06:54
@johanl-db johanl-db changed the title [WIP][Kernel] Add widening type conversions to Kernel default parquet reader [Kernel] Add widening type conversions to Kernel default parquet reader Aug 15, 2024
@johanl-db johanl-db force-pushed the type-widening-kernel-default-parquet-handler branch from 58f8c86 to ea6dcb2 on August 15, 2024 07:02
Collaborator

@vkorukanti left a comment

Looks great!

/**
* Suite covering reading Parquet columns with different types.
*/
class ParquetColumnReaderSuite extends AnyFunSuite with ParquetSuiteBase {
Collaborator

rename to ParquetTypeWideningSuite or merge these tests into ParquetFileReaderSuite.scala?

Collaborator Author

I moved the tests to ParquetFileReaderSuite

private val wideningTestCases: Seq[TestCase] = Seq(
TestCase("ByteType", ShortType.SHORT, i => if (i % 72 != 0) i.toByte.toShort else null),
TestCase("ByteType", IntegerType.INTEGER, i => if (i % 72 != 0) i.toByte.toInt else null),
TestCase("ByteType", LongType.LONG, i => if (i % 72 != 0) i.toByte.toLong else null),
Collaborator

is byte to float or double not allowed?

Collaborator Author

Byte to double is allowed, byte to float isn't. I added more test cases
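For reference, an added case in the same style as the `TestCase` entries above might look like the following (a sketch, not necessarily the exact line added in the PR; the null interval mirrors the byte cases above):

```scala
// Byte widens to short/int/long and to double, but byte -> float is not a
// supported widening, so no float case is added for ByteType.
TestCase("ByteType", DoubleType.DOUBLE, i => if (i % 72 != 0) i.toByte.toDouble else null),
```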

test(s"parquet widening conversion - ${testCase.columnName} -> ${testCase.toType.toString}") {
val inputLocation = goldenTablePath("parquet-all-types")
val readSchema = new StructType().add(testCase.columnName, testCase.toType)
val result = readParquetFilesUsingKernel(inputLocation, readSchema)
Collaborator

Can we also read using Spark and verify? Or is that not possible because we depend on Spark 3.5.x and not Spark 4.0.0?

Collaborator Author

parquet-mr supports most of these conversions in 3.5 (the vectorized reader supports almost none, though).

I'm adding a check against results produced by Spark + parquet-mr
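A sketch of what such a cross-check could look like, reusing `result` and `inputLocation` from the test above and the `TestRow`/`checkAnswer` helpers shown in the surrounding code; the column name and Spark schema here are placeholders rather than the exact code added:

```scala
import org.apache.spark.sql.types.{LongType => SparkLongType, StructType => SparkStructType}

// Force Spark onto the row-based parquet-mr reader (the vectorized reader
// supports almost none of these conversions in 3.5).
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

// Read the same files with an equivalent widened Spark schema and compare
// against the Kernel result. Column name "id" is a placeholder.
val sparkSchema = new SparkStructType().add("id", SparkLongType)
val expected = spark.read.schema(sparkSchema).parquet(inputLocation)
  .collect().toSeq.map(row => TestRow(row.get(0)))
checkAnswer(result, expected)
```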

checkAnswer(result, Seq(TestRow(1577836800000000L)))
}
}
}
Collaborator

Also, can we add negative tests where the type change is not valid, e.g. a long read as int? I am not sure if we throw any errors at the moment.

Collaborator Author

I added a relatively exhaustive list of tests to ensure we fail for unsupported conversions. Found a few where we don't properly fail and documented them in a separate list of test cases.

Not fixing them here though; that goes beyond the scope of this PR.
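A sketch of what one of those negative tests might look like, assuming the column naming follows the widening cases above and that the unsupported conversion surfaces as an exception (the exact exception type is not asserted here):

```scala
test("parquet unsupported conversion - LongType -> integer") {
  val inputLocation = goldenTablePath("parquet-all-types")
  // Reading a 64-bit column with a narrower 32-bit read schema is not a valid
  // widening and should be rejected by the default reader.
  val readSchema = new StructType().add("LongType", IntegerType.INTEGER)
  assertThrows[Throwable] {
    readParquetFilesUsingKernel(inputLocation, readSchema)
  }
}
```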

@johanl-db johanl-db force-pushed the type-widening-kernel-default-parquet-handler branch from 4fad52e to d6fa06c on August 21, 2024 17:32
Collaborator

@vkorukanti left a comment

lgtm

@johanl-db
Collaborator Author

@vkorukanti I addressed your remaining comments, you can merge the PR once the tests finish running

@vkorukanti vkorukanti merged commit dcf9ea9 into delta-io:master Aug 26, 2024
16 checks passed
longvu-db pushed a commit to longvu-db/delta that referenced this pull request Aug 28, 2024
[Kernel] Add widening type conversions to Kernel default parquet reader (delta-io#3541)

vkorukanti pushed a commit that referenced this pull request Sep 26, 2024
## Description
Allow reading and writing to tables that have the type widening table
feature enabled (both the preview and the stable table feature).

Reading:
- The default kernel parquet reader supports widening conversions since
#3541. Engines may also choose to
implement type widening natively in their parquet reader if they wish.
Writing:
- Nothing to do, type widening doesn't impact the write path - writing
data always uses the latest data schema.

## How was this patch tested?
Added read integration tests.

Tests are based on golden tables. Generating the tables requires Spark
4.0; since Spark master cross-compilation is broken, the table
generation code is not included here.
The following steps were used to generate the tables (sketched below):
1. Create a table with initial data types and insert initial data
2. Enable type widening and schema evolution
3. Insert data with wider type for each column. Column types are
automatically widened during schema evolution.
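For context, a minimal sketch of those three steps on Spark 4.0 with Delta; the table name, column, and values are placeholders, and schema evolution is enabled per-write via `mergeSchema`:

```scala
// 1. Create a table with the initial (narrow) type and insert initial data.
spark.sql("CREATE TABLE type_widening (int_long INT) USING delta")
spark.sql("INSERT INTO type_widening VALUES (1), (2)")

// 2. Enable the type widening table feature on the table.
spark.sql(
  "ALTER TABLE type_widening SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")

// 3. Append data with a wider type; with schema evolution (mergeSchema) the
//    column type is widened automatically from INT to LONG.
spark.range(3, 5).selectExpr("id AS int_long") // `id` is already LONG
  .write.format("delta").mode("append")
  .option("mergeSchema", "true")
  .saveAsTable("type_widening")
```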

`type-widening` table:
| Column | Initial type | Widened Type |
| - | - | - |
| byte_long | byte | long |
| int_long | int | long |
| float_double | float | double |
| byte_double | byte | double |
| short_double | short | double |
| int_double | int | double |
| decimal_decimal_same_scale | decimal(10, 2) | decimal(20, 2) |
| decimal_decimal_greater_scale | decimal(10, 2) | decimal(20, 5) |
| byte_decimal | byte | decimal(11, 1) |
| short_decimal | short | decimal(11, 1) |
| int_decimal | int | decimal(11, 1) |
| long_decimal | long | decimal(21, 1) |
| date_timestamp_ntz | date | timestamp_ntz |

`type-widening-nested` table:
| Column | Initial type | Widened Type |
| - | - | - |
| struct | struct<a: int> | struct<a: long> |
| map | map<int, int> | map<long, long> |
| array | array<int> | array<long> |
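A sketch of how the widened read schema for the nested table might be expressed with Kernel types when reading through the default Parquet handler (the constructor shapes and nullability flags here are assumptions about the Kernel types API, not code from this PR):

```scala
import io.delta.kernel.types.{ArrayType, LongType, MapType, StructType}

// Read schema requesting the widened nested types:
// struct<a: long>, map<long, long>, array<long>.
val readSchema = new StructType()
  .add("struct", new StructType().add("a", LongType.LONG))
  .add("map", new MapType(LongType.LONG, LongType.LONG, true))
  .add("array", new ArrayType(LongType.LONG, true))
```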


## Does this PR introduce _any_ user-facing changes?
Yes, it's now possible to read from and write to Delta tables with type
widening enabled using Kernel.