Skip to content

Commit

Permalink
Minor: Return `Option` from `row_group_row_counts` (#10973)
Browse files Browse the repository at this point in the history
* refactor: return Option from row_group_row_count

* fix: doctest
  • Loading branch information
marvinlanhenke authored Jun 18, 2024
1 parent a873f51 commit a2c9d1a
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 30 deletions.
6 changes: 5 additions & 1 deletion datafusion-examples/examples/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,11 @@ impl ParquetMetadataIndexBuilder {
reader.schema(),
reader.parquet_schema(),
)?;
let row_counts = StatisticsConverter::row_group_row_counts(row_groups.iter())?;
let row_counts = converter
.row_group_row_counts(row_groups.iter())?
.ok_or_else(|| {
internal_datafusion_err!("Row group row counts are missing")
})?;
let value_column_mins = converter.row_group_mins(row_groups.iter())?;
let value_column_maxes = converter.row_group_maxes(row_groups.iter())?;

Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/benches/parquet_statistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let _ = converter.row_group_mins(row_groups.iter()).unwrap();
let _ = converter.row_group_maxes(row_groups.iter()).unwrap();
let _ = converter.row_group_null_counts(row_groups.iter()).unwrap();
let _ = StatisticsConverter::row_group_row_counts(row_groups.iter())
.unwrap();
let _ = converter.row_group_row_counts(row_groups.iter()).unwrap();
})
},
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,10 +384,12 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
.map(|counts| Arc::new(counts) as ArrayRef)
}

fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
/// Returns the number of rows in each row group as a `UInt64Array`,
/// or `None` if the statistics cannot be extracted for `column`.
///
/// NOTE(review): this is a diff fragment — `statistics_converter` and
/// `metadata_iter` are defined elsewhere in this file; presumably the
/// former builds a `StatisticsConverter` for `column` and the latter
/// iterates the file's `RowGroupMetaData`. Confirm against the full file.
fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
// row counts are the same for all columns in a row group
// (a converter is still built per column so that an unknown
// column yields `None` instead of an error)
self.statistics_converter(column)
// `row_group_row_counts` returns `Result<Option<UInt64Array>>`
.and_then(|c| c.row_group_row_counts(self.metadata_iter()))
// treat extraction errors as "statistics unavailable"
.ok()
// collapse the `Ok(None)` case: Option<Option<_>> -> Option<_>
.flatten()
.map(|counts| Arc::new(counts) as ArrayRef)
}

Expand Down
20 changes: 16 additions & 4 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,21 +718,33 @@ impl<'a> StatisticsConverter<'a> {
///
/// # Example
/// ```no_run
/// # use arrow::datatypes::Schema;
/// # use arrow_array::ArrayRef;
/// # use parquet::file::metadata::ParquetMetaData;
/// # use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
/// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() }
/// // Given the metadata for a parquet file
/// # fn get_arrow_schema() -> Schema { unimplemented!() }
/// // Given the metadata for a parquet file and the arrow schema
/// let metadata: ParquetMetaData = get_parquet_metadata();
/// let arrow_schema: Schema = get_arrow_schema();
/// let parquet_schema = metadata.file_metadata().schema_descr();
/// // create a converter
/// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema)
/// .unwrap();
/// // get the row counts for each row group
/// let row_counts = StatisticsConverter::row_group_row_counts(metadata
/// let row_counts = converter.row_group_row_counts(metadata
/// .row_groups()
/// .iter()
/// );
/// ```
pub fn row_group_row_counts<I>(metadatas: I) -> Result<UInt64Array>
pub fn row_group_row_counts<I>(&self, metadatas: I) -> Result<Option<UInt64Array>>
where
I: IntoIterator<Item = &'a RowGroupMetaData>,
{
let Some(_) = self.parquet_index else {
return Ok(None);
};

let mut builder = UInt64Array::builder(10);
for metadata in metadatas.into_iter() {
let row_count = metadata.num_rows();
Expand All @@ -743,7 +755,7 @@ impl<'a> StatisticsConverter<'a> {
})?;
builder.append_value(row_count);
}
Ok(builder.finish())
Ok(Some(builder.finish()))
}

/// Create a new `StatisticsConverter` to extract statistics for a column
Expand Down
25 changes: 4 additions & 21 deletions datafusion/core/tests/parquet/arrow_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,9 @@ impl<'a> Test<'a> {
Actual: {null_counts:?}. Expected: {expected_null_counts:?}"
);

let row_counts = StatisticsConverter::row_group_row_counts(
reader.metadata().row_groups().iter(),
)
.unwrap();
let row_counts = Some(row_counts);
let row_counts = converter
.row_group_row_counts(reader.metadata().row_groups().iter())
.unwrap();
assert_eq!(
row_counts, expected_row_counts,
"{column_name}: Mismatch with expected row counts. \
Expand Down Expand Up @@ -2001,21 +1999,6 @@ async fn test_column_non_existent() {
.build()
.await;

Test {
reader: &reader,
// mins are [-5, -4, 0, 5]
expected_min: Arc::new(Int64Array::from(vec![None, None, None, None])),
// maxes are [-1, 0, 4, 9]
expected_max: Arc::new(Int64Array::from(vec![None, None, None, None])),
// nulls are [0, 0, 0, 0]
expected_null_counts: UInt64Array::from(vec![None, None, None, None]),
// row counts are [5, 5, 5, 5]
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
column_name: "i_do_not_exist",
check: Check::RowGroup,
}
.run_with_schema(&schema);

Test {
reader: &reader,
// mins are [-5, -4, 0, 5]
Expand All @@ -2027,7 +2010,7 @@ async fn test_column_non_existent() {
// row counts are [5, 5, 5, 5]
expected_row_counts: None,
column_name: "i_do_not_exist",
check: Check::DataPage,
check: Check::Both,
}
.run_with_schema(&schema);
}

0 comments on commit a2c9d1a

Please sign in to comment.