Add configuration option to StatisticsConverter to control interpretation of missing null counts in Parquet statistics #6485

Merged
merged 5 commits on Oct 1, 2024
34 changes: 32 additions & 2 deletions parquet/src/arrow/arrow_reader/statistics.rs
@@ -1159,7 +1159,7 @@ where
///
/// # Schemas
///
/// The converter ues the schema of the Parquet file and the Arrow schema to
/// The converter uses the schema of the Parquet file and the Arrow schema to
/// convert the underlying statistics value (stored as a parquet value) into the
/// corresponding Arrow value. For example, Decimals are stored as binary in
/// parquet files and this structure handles mapping them to the `i128`
@@ -1175,6 +1175,8 @@ pub struct StatisticsConverter<'a> {
parquet_column_index: Option<usize>,
/// The field (with data type) of the column in the Arrow schema
arrow_field: &'a Field,
/// if `true`, treat missing null_counts as 0 nulls
missing_null_counts_as_zero: bool,
}

impl<'a> StatisticsConverter<'a> {
@@ -1191,6 +1193,23 @@ impl<'a> StatisticsConverter<'a> {
self.arrow_field
}

/// Specify whether the converter interprets missing null counts as zero or as unknown
///
/// By default, the converter will treat missing null counts as though
/// the null count is known to be `0`.
///
/// Note that parquet files written by parquet-rs currently do not store
/// null counts even when it is known there are zero nulls, and the reader
/// will return 0 for the null counts in that instance. This behavior may
/// change in a future release.
///
/// Both parquet-java and parquet-cpp store null counts as 0 when there are
/// no nulls, and don't write unknown values to the null count field.
pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self {
self.missing_null_counts_as_zero = missing_null_counts_as_zero;
self
}

/// Returns a [`UInt64Array`] with row counts for each row group
///
/// # Return Value
@@ -1284,6 +1303,7 @@ impl<'a> StatisticsConverter<'a> {
Ok(Self {
parquet_column_index: parquet_index,
arrow_field,
missing_null_counts_as_zero: true,
})
}

@@ -1382,7 +1402,15 @@ impl<'a> StatisticsConverter<'a> {
let null_counts = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics())
.map(|s| s.and_then(|s| s.null_count_opt()));
.map(|s| {
s.and_then(|s| {
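// a missing null count is interpreted as 0 by default,
// or preserved as None when the flag is disabled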
if self.missing_null_counts_as_zero {
Some(s.null_count_opt().unwrap_or(0))
} else {
s.null_count_opt()
}
})
});
Ok(UInt64Array::from_iter(null_counts))
}

@@ -1593,3 +1621,5 @@ impl<'a> StatisticsConverter<'a> {
new_null_array(data_type, num_row_groups)
}
}

// See tests in parquet/tests/arrow_reader/statistics.rs
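
For context, here is a minimal sketch of how a caller might use the new option. It is not part of this PR: the file path is hypothetical, and it relies on the existing ParquetRecordBatchReaderBuilder accessors for the Arrow and Parquet schemas.

use std::fs::File;

use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn row_group_null_counts_example() -> Result<(), Box<dyn std::error::Error>> {
    // open a parquet file (hypothetical path) and load its metadata
    let file = File::open("data.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    let metadata = builder.metadata().clone();

    // build a converter for column "b" from the Arrow and Parquet schemas
    let converter =
        StatisticsConverter::try_new("b", builder.schema(), builder.parquet_schema())?
            // opt out of the default and report missing null counts as None
            .with_missing_null_counts_as_zero(false);

    // one entry per row group; None where the writer omitted the count
    let null_counts = converter.row_group_null_counts(metadata.row_groups().iter())?;
    println!("{null_counts:?}");
    Ok(())
}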
67 changes: 64 additions & 3 deletions parquet/tests/arrow_reader/statistics.rs
@@ -22,6 +22,7 @@ use std::default::Default;
use std::fs::File;
use std::sync::Arc;

use super::make_test_file_rg;
use super::{struct_array, Scenario};
use arrow::compute::kernels::cast_utils::Parser;
use arrow::datatypes::{
@@ -37,16 +38,17 @@ use arrow_array::{
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use half::f16;
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::{
ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
use parquet::arrow::ArrowWriter;
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet::file::properties::{EnabledStatistics, WriterProperties};

use super::make_test_file_rg;
use parquet::file::statistics::{Statistics, ValueStatistics};
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};

#[derive(Debug, Default, Clone)]
struct Int64Case {
@@ -2139,6 +2141,65 @@ async fn test_missing_statistics() {
.run();
}

#[test]
fn missing_null_counts_as_zero() {
let min = None;
let max = None;
let distinct_count = None;
let null_count = None; // NB: no null count
let is_min_max_deprecated = false;
let stats = Statistics::Boolean(ValueStatistics::new(
min,
max,
distinct_count,
null_count,
is_min_max_deprecated,
));
let (arrow_schema, parquet_schema) = bool_arrow_and_parquet_schema();

let column_chunk = ColumnChunkMetaData::builder(parquet_schema.column(0))
.set_statistics(stats)
.build()
.unwrap();
let metadata = RowGroupMetaData::builder(parquet_schema.clone())
.set_column_metadata(vec![column_chunk])
.build()
.unwrap();

let converter = StatisticsConverter::try_new("b", &arrow_schema, &parquet_schema).unwrap();

// by default, a missing null count is interpreted as 0
assert_eq!(
converter.row_group_null_counts([&metadata]).unwrap(),
UInt64Array::from_iter(vec![Some(0)])
);

// with missing_null_counts_as_zero disabled, the null count is reported as None
let converter = converter.with_missing_null_counts_as_zero(false);
assert_eq!(
converter.row_group_null_counts([&metadata]).unwrap(),
UInt64Array::from_iter(vec![None])
);
}

/// return an Arrow schema and corresponding Parquet SchemaDescriptor for
/// a schema with a single boolean column "b"
fn bool_arrow_and_parquet_schema() -> (SchemaRef, SchemaDescPtr) {
let arrow_schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Boolean, true)]));
use parquet::schema::types::Type as ParquetType;
let parquet_schema = ParquetType::group_type_builder("schema")
.with_fields(vec![Arc::new(
ParquetType::primitive_type_builder("a", parquet::basic::Type::INT32)
.build()
.unwrap(),
)])
.build()
.unwrap();

let parquet_schema = Arc::new(SchemaDescriptor::new(Arc::new(parquet_schema)));
(arrow_schema, parquet_schema)
}

/////// NEGATIVE TESTS ///////
// column not found
#[tokio::test]