Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: extract floats for parquet data pages #10972

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,22 @@ make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min, Index::INT32,
make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max, Index::INT32, i32);
make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, Index::INT64, i64);
make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max, Index::INT64, i64);
make_data_page_stats_iterator!(
Copy link
Contributor

@marvinlanhenke marvinlanhenke Jun 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tshauck
It seems like you are not using those in the maching down below? that might cause your error on f16. Instead you are using an Float32 iterator which won't match on Index::FIXED_LEN_BYTE_ARRAY and thus return None.

I think you need to switch to the f16 iterator; but perhaps might need to construct an f16 from the bytes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the confusion. It pushed this as a WIP. I still need to work through the f16 case. I think I will need to construct an f16 from the bytes, but not sure yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's an example of what I mean...

image

Fixed length byte array is collected as parquet::data_type::FixedLenByteArray, which then means this doesn't match. Alternatively, using that data type in the macro causes later move issues when interacting with the calls to min() and max().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we take a friendly look at how f16 for row groups work:

DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(
[<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| x.and_then(|x| {
from_bytes_to_f16(x)
})),
))),

I suspect the statistics in data pages will be encodd the same

It is also possible that since f16 is a less common type, the parquet writer might write data page statistics for it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi there, I haven't noticed this PR and accidentally addressed the same issue in here #10982 . Apologies. I think the f32 and f64 cases we solve the ~same. I got the f16 working, though not in a way I'm particularly happy with (ie, the move issues you mention)

I don't have any other commits in that PR in my queue -- so happy to close it if you come up with any other f16 solution...

Copy link
Contributor Author

@tshauck tshauck Jun 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oooff, sorry @tmi, I should've done take on the issue. Taking @alamb's pointer to look at row groups, I might be wrong, but I think there's a couple of differences between it and the data page stats that complicate things...

  • max_bytes/min_bytes, which is used for row groups, does not exist on PageIndex in the parquet crate but does on Statistics, so using it in the macro isn't possible at this point.
  • Using [u8] as the stat_value_type is problematic when combined with the iterator macro in that type Item = Vec<Option<$stat_value_type>>; can't work with [u8] because it's unsized. Changing the item def to type Item = Vec<Option<&'a $stat_value_type>>; works and is similar to the item definition of row group (type Item = Option<&'a $stat_value_type>;).

f16 is less common, so maybe to have a solution that's similar to Statistics and hopefully avoids cloning, we could:

  1. Update this PR to do f32 and f64, get it in to support higher priority types and is mostly innocuous
  2. Update the parquet crate to support max_bytes/min_bytes on PageIndex
  3. Update the data page stats to use a &'a $stat_value_type and add f16 when that's merged

@tmi, maybe you and I could collaborate on 2 & 3 if that seems reasonable? Otherwise, certainly happy to go another route, if there's a better approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like a good plan to me

I think it is perfectly reasonable to leave f16 support as a 'TODO' in datafusion as we add the needed support upstream

Let me know if I can help (e.g. file some tickets, etc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think #10982 is good to go FWIW

Copy link
Contributor Author

@tshauck tshauck Jun 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, sounds good to me. Maybe a further improvement down the line to avoid the clone and simplify to use a method vs a function. I'll close this one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid the clone

I think the ideal sequence would be

  1. Tracking Issue for f16 and f128 float types rust-lang/rust#116909 gets merged
  2. parquet replaces FixedLengthByteArray with genuine f16 type
  3. we drop the special handling here and elsewhere

and until then the f16 support in datafusion would be as is... Or alternatively, introduce the half crate to parquet rs first. Because dealing with FixedLengthByteArray on the user side is always going to be messy

MinFloat16DataPageStatsIterator,
min,
Index::FIXED_LEN_BYTE_ARRAY,
f16
);
make_data_page_stats_iterator!(
MaxFloat16DataPageStatsIterator,
max,
Index::FIXED_LEN_BYTE_ARRAY,
f16
);
make_data_page_stats_iterator!(MinFloat32DataPageStatsIterator, min, Index::FLOAT, f32);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Float (f32) and Double (f64) would be enough, in terms of naming. This would also be consistent with the row group stats iterators.

make_data_page_stats_iterator!(MaxFloat32DataPageStatsIterator, max, Index::FLOAT, f32);
make_data_page_stats_iterator!(MinFloat64DataPageStatsIterator, min, Index::DOUBLE, f64);
make_data_page_stats_iterator!(MaxFloat64DataPageStatsIterator, max, Index::DOUBLE, f64);

macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
Expand Down Expand Up @@ -581,7 +597,20 @@ macro_rules! get_data_page_statistics {
)),
Some(DataType::Int32) => Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
_ => unimplemented!()
Some(DataType::Float16) => Ok(Arc::new(
Float16Array::from_iter(
[<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter().filter_map(|x| {
x.and_then(|x| Some(f16::from_f32(x)))
})
})
.flatten()
)
)),
Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
_ => unimplemented!("Data type not supported for data page statistics"),
}
}
}
Expand Down Expand Up @@ -677,6 +706,21 @@ where
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
Index::FLOAT(native_index) => native_index
.indexes
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
Index::DOUBLE(native_index) => native_index
.indexes
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
.indexes
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
_ => unimplemented!(),
});

Expand Down
28 changes: 25 additions & 3 deletions datafusion/core/tests/parquet/arrow_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ async fn test_multiple_data_pages_nulls_and_negatives() {

/////////////// MORE GENERAL TESTS //////////////////////
// . Many columns in a file
// . Differnet data types
// . Different data types
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive by cleanup 🚀

// . Different row group sizes

// Four different integer types
Expand Down Expand Up @@ -1533,7 +1533,29 @@ async fn test_float64() {
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
column_name: "f",
check: Check::RowGroup,
check: Check::Both,
}
.run();
}

#[tokio::test]
async fn test_float32() {
// This creates a parquet files of 3 columns named "f16", "f32", "f64"
let reader = TestReader {
scenario: Scenario::Float32,
row_per_group: 5,
}
.build()
.await;

Test {
reader: &reader,
expected_min: Arc::new(Float32Array::from(vec![-5.0, -4.0, -0.0, 5.0])),
expected_max: Arc::new(Float32Array::from(vec![-1.0, 0.0, 4.0, 9.0])),
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
column_name: "f",
check: Check::Both,
}
.run();
}
Expand Down Expand Up @@ -1566,7 +1588,7 @@ async fn test_float16() {
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
column_name: "f",
check: Check::RowGroup,
check: Check::Both,
}
.run();
}
Expand Down
18 changes: 17 additions & 1 deletion datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ enum Scenario {
/// -MIN, -100, -1, 0, 1, 100, MAX
NumericLimits,
Float16,
Float32,
Float64,
Decimal,
Decimal256,
Expand Down Expand Up @@ -586,8 +587,15 @@ fn make_f64_batch(v: Vec<f64>) -> RecordBatch {
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
}

fn make_f32_batch(v: Vec<f32>) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float32, true)]));
let array = Arc::new(Float32Array::from(v)) as ArrayRef;
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
}

fn make_f16_batch(v: Vec<f16>) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float16, true)]));
let schema: Arc<Schema> =
Arc::new(Schema::new(vec![Field::new("f", DataType::Float16, true)]));
let array = Arc::new(Float16Array::from(v)) as ArrayRef;
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
}
Expand Down Expand Up @@ -1003,6 +1011,14 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
),
]
}
Scenario::Float32 => {
vec![
make_f32_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
make_f32_batch(vec![-4.0, -3.0, -2.0, -1.0, 0.0]),
make_f32_batch(vec![0.0, 1.0, 2.0, 3.0, 4.0]),
make_f32_batch(vec![5.0, 6.0, 7.0, 8.0, 9.0]),
]
}
Scenario::Float64 => {
vec![
make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
Expand Down
Loading