Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve stats convert performance for Binary/String/Boolean arrays #11319

Merged
merged 15 commits into from
Jul 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 130 additions & 85 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@

// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328

use arrow::array::builder::FixedSizeBinaryBuilder;
use arrow::array::{
BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
};
use arrow::datatypes::i256;
use arrow::{array::ArrayRef, datatypes::DataType};
use arrow_array::{
new_empty_array, new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
LargeStringArray, StringArray, Time32MillisecondArray, Time32SecondArray,
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
UInt64Array, UInt8Array,
};
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
Expand Down Expand Up @@ -393,51 +395,73 @@ macro_rules! get_statistics {
})
},
DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x| x.to_vec())),
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
))),
DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x|x.to_vec())),
))),
DataType::Utf8 => Ok(Arc::new(StringArray::from_iter(
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x).map(|s| s.to_string()).ok();
if res.is_none() {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
}
res
})
}),
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
))),
DataType::Utf8 => {
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
let mut builder = StringBuilder::new();
for x in iterator {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x) else {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
builder.append_null();
continue;
};

builder.append_value(x);
}
Ok(Arc::new(builder.finish()))
},
DataType::LargeUtf8 => {
Ok(Arc::new(LargeStringArray::from_iter(
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x).map(|s| s.to_string()).ok();
if res.is_none() {
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
}
res
})
}),
)))
}
DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from(
[<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| {
if x.len().try_into() == Ok(*size) {
Some(x)
} else {
log::debug!(
"FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
size,
x.len(),
);
None
}
})
}).collect::<Vec<_>>(),
))),
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
let mut builder = LargeStringBuilder::new();
for x in iterator {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x) else {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
builder.append_null();
continue;
};

builder.append_value(x);
}
Ok(Arc::new(builder.finish()))
},
DataType::FixedSizeBinary(size) => {
let iterator = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator);
let mut builder = FixedSizeBinaryBuilder::new(*size);
for x in iterator {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

// ignore invalid values
if x.len().try_into() != Ok(*size){
log::debug!(
"FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
size,
x.len(),
);
builder.append_null();
continue;
}

builder.append_value(x).expect("ensure to append successfully here, because size have been checked before");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than using `expect`, it is likely possible to check the result of `builder.append_value` and, if it is an error, append a null. But perhaps that would be overly confusing. This formulation is quite clear and explicit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than using `expect`, it is likely possible to check the result of `builder.append_value` and, if it is an error, append a null. But perhaps that would be overly confusing. This formulation is quite clear and explicit.

🤔 Yes, I used `append_null` there before. Then I read the code of `append_value` and found that the only reason it returns `Err` is an attempt to append a value whose width differs from the builder's, and that has already been checked and ruled out at this point.

}
Ok(Arc::new(builder.finish()))
},
DataType::Decimal128(precision, scale) => {
let arr = Decimal128Array::from_iter(
[<$stat_type_prefix Decimal128StatsIterator>]::new($iterator)
Expand Down Expand Up @@ -740,15 +764,20 @@ macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
paste! {
match $data_type {
Some(DataType::Boolean) => Ok(Arc::new(
BooleanArray::from_iter(
[<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator)
.flatten()
// BooleanArray::from_iter required a sized iterator, so collect into Vec first
.collect::<Vec<_>>()
.into_iter()
)
)),
Some(DataType::Boolean) => {
let iterator = [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator);
let mut builder = BooleanBuilder::new();
for x in iterator {
for x in x.into_iter() {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};
builder.append_value(x);
}
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::UInt8) => Ok(Arc::new(
UInt8Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
Expand Down Expand Up @@ -830,32 +859,48 @@ macro_rules! get_data_page_statistics {
Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Utf8) => Ok(Arc::new(StringArray::from(
[<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| {
x.into_iter().map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok();
if res.is_none() {
Some(DataType::Utf8) => {
let mut builder = StringBuilder::new();
let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
for x in iterator {
for x in x.into_iter() {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x.data()) else {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
}
res
})
})
}).flatten().collect::<Vec<_>>(),
))),
Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from(
[<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| {
x.into_iter().map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok();
if res.is_none() {
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
}
res
})
})
}).flatten().collect::<Vec<_>>(),
))),
builder.append_null();
continue;
};

builder.append_value(x);
}
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::LargeUtf8) => {
let mut builder = LargeStringBuilder::new();
let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
for x in iterator {
for x in x.into_iter() {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x.data()) else {
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
builder.append_null();
continue;
};

builder.append_value(x);
}
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::Dictionary(_, value_type)) => {
[<$stat_type_prefix:lower _ page_statistics>](Some(value_type), $iterator)
},
Expand All @@ -871,14 +916,14 @@ macro_rules! get_data_page_statistics {
Some(DataType::Date32) => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Date64) => Ok(
Arc::new(
Date64Array::from([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter()
.map(|x| {
alamb marked this conversation as resolved.
Show resolved Hide resolved
x.and_then(|x| i64::try_from(x).ok())
.map(|x| x * 24 * 60 * 60 * 1000)
})
}).flatten().collect::<Vec<_>>()
.map(|x| x.map(|x| x * 24 * 60 * 60 * 1000))
}).flatten()
)
)
),
Expand Down