Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support nested schema projection (#5148) #5149

Merged
merged 3 commits into from
Nov 29, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 193 additions & 1 deletion arrow-schema/src/fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::{ArrowError, Field, FieldRef, SchemaBuilder};
use std::ops::Deref;
use std::sync::Arc;

use crate::{ArrowError, DataType, Field, FieldRef, SchemaBuilder};

/// A cheaply cloneable, owned slice of [`FieldRef`]
///
/// Similar to `Arc<Vec<FieldRef>>` or `Arc<[FieldRef]>`
Expand Down Expand Up @@ -99,6 +100,93 @@ impl Fields {
.all(|(a, b)| Arc::ptr_eq(a, b) || a.contains(b))
}

/// Performs a depth-first scan of [`Fields`] filtering the [`FieldRef`] with no children
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a proposal of how to better phrase what this is doing (as well as give a usecase that might not be obvious to the casual reader)

Suggested change
/// Performs a depth-first scan of [`Fields`] filtering the [`FieldRef`] with no children
/// Returns a copy of this [`Fields`], with only those non nested (leaf) [`FieldRef`]s that
/// pass a predicate.
///
/// This can be used to select only a subset of fields in nested types such as
/// [`DataType::Struct`].
/// Leaf [`FieldRef`]s `DataType`s have no nested `FieldRef`s`. For example
/// a field with [`DatatType::Int32`] would be a leaf.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I incoporated this into 8d97950. I tweaked the wording a little as the depth-first part is critical to understanding what the leaf ordering means. I also added some comments to the doctest to highlight

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you -- the comments are most helpful

///
/// Invokes `filter` with each leaf [`FieldRef`], i.e. one containing no children, and a
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure this could be expressed better, but I struggled to come up with something 😅

/// count of the number of previous calls to `filter` - i.e. the leaf's index.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Invokes `filter` with each leaf [`FieldRef`], i.e. one containing no children, and a
/// count of the number of previous calls to `filter` - i.e. the leaf's index.
/// Invokes `filter` with each leaf [`FieldRef`] and a
/// count of the depth to the `field` (i.e. the number of previous calls to `filter`, and the leaf's index.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't correct, it is the count of leaves encountered, not the depth

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to clarify this in 8d97950

///
/// Returns a new [`Fields`] comprising the [`FieldRef`] for which `filter` returned `true`
///
/// ```
/// # use arrow_schema::{DataType, Field, Fields};
/// let fields = Fields::from(vec![
/// Field::new("a", DataType::Int32, true),
/// Field::new("b", DataType::Struct(Fields::from(vec![
/// Field::new("c", DataType::Float32, false),
/// Field::new("d", DataType::Float64, false),
/// ])), false)
/// ]);
/// let filtered = fields.filter_leaves(|idx, _| idx == 0 || idx == 2);
/// let expected = Fields::from(vec![
/// Field::new("a", DataType::Int32, true),
/// Field::new("b", DataType::Struct(Fields::from(vec![
/// Field::new("d", DataType::Float64, false),
/// ])), false)
/// ]);
/// assert_eq!(filtered, expected);
/// ```
pub fn filter_leaves<F: FnMut(usize, &FieldRef) -> bool>(&self, mut filter: F) -> Self {
Copy link
Contributor Author

@tustvold tustvold Nov 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally I had two methods, one which was called for all Fields, however, it wasn't clear to me how such an API could be used - as it would lack any context as to where in the schema a given field appeared. I therefore just stuck with filter_leaves which has immediate concrete utility for #5135. A case could be made for a fully-fledged visitor pattern, but I'm not aware of any immediate use-cases for this, and therefore what the requirements might be for such an API.

FYI @Jefffrey

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder how we would support selecting a subfield that is itself a field?

For example:

{ 
  a : {
    b: {
      c: 1, 
    },
    d: 2
}

Could I use this API to support only selecting a.b?

{ 
  a : {
    b: {
      c: 1, 
    },
}

Maybe we could if the parent FieldRef was also passed

    /// calls filter(depth, parent_field, child_field)` 
    pub fn filter_leaves<F: FnMut(usize, &FieldRef, &FieldRef) -> bool>(&self, mut filter: F) -> Self {
...
}

🤔

Copy link
Contributor Author

@tustvold tustvold Nov 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You would select b by selecting all its children, if you call filter with intermediate fields you end up in a position of how much context do you provide, just the parent, what about the parent's parent - reasoning in terms of leaves is the only coherent mechanism I can think of

fn filter_field<F: FnMut(&FieldRef) -> bool>(
f: &FieldRef,
filter: &mut F,
) -> Option<FieldRef> {
use DataType::*;

let (k, v) = match f.data_type() {
Dictionary(k, v) => (Some(k.clone()), v.as_ref()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this presumes the key itself doesn't have any leaves which I think is reasonable, but figured I would point it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments in 8d97950

d => (None, d),
};
let d = match v {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map and RunEndEncoded appear to have embedded fields too. It seems like they might also need to be handled 🤔

The usecase might be "I want only the values of the map? Or maybe Arrow can't express that concept (Apply the filter to only the values 🤔 )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map is there, I missed RunEndEncoded 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RunEndEncoded support added in 8d97950

List(child) => List(filter_field(child, filter)?),
LargeList(child) => LargeList(filter_field(child, filter)?),
Map(child, ordered) => Map(filter_field(child, filter)?, *ordered),
FixedSizeList(child, size) => FixedSizeList(filter_field(child, filter)?, *size),
Struct(fields) => {
let filtered: Fields = fields
.iter()
.filter_map(|f| filter_field(f, filter))
.collect();

if filtered.is_empty() {
return None;
}

Struct(filtered)
}
Union(fields, mode) => {
let filtered: UnionFields = fields
.iter()
.filter_map(|(id, f)| Some((id, filter_field(f, filter)?)))
.collect();

if filtered.is_empty() {
return None;
}

Union(filtered, *mode)
}
_ => return filter(f).then(|| f.clone()),
};
let d = match k {
Some(k) => Dictionary(k, Box::new(d)),
None => d,
};
Some(Arc::new(f.as_ref().clone().with_data_type(d)))
}

let mut leaf_idx = 0;
let mut filter = |f: &FieldRef| {
let t = filter(leaf_idx, f);
leaf_idx += 1;
t
};

self.0
.iter()
.filter_map(|f| filter_field(f, &mut filter))
.collect()
}

/// Remove a field by index and return it.
///
/// # Panic
Expand Down Expand Up @@ -307,3 +395,107 @@ impl FromIterator<(i8, FieldRef)> for UnionFields {
Self(iter.into_iter().collect())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::UnionMode;

#[test]
fn test_filter() {
let floats = Fields::from(vec![
Field::new("a", DataType::Float32, false),
Field::new("b", DataType::Float32, false),
]);
let fields = Fields::from(vec![
Field::new("a", DataType::Int32, true),
Field::new("floats", DataType::Struct(floats.clone()), true),
Field::new("b", DataType::Int16, true),
Field::new(
"c",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
false,
),
Field::new(
"d",
DataType::Dictionary(
Box::new(DataType::Int32),
Box::new(DataType::Struct(floats.clone())),
),
false,
),
Field::new_list(
"e",
Field::new("floats", DataType::Struct(floats.clone()), true),
true,
),
Field::new(
"f",
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, false)), 3),
false,
),
Field::new_map(
"g",
"entries",
Field::new("keys", DataType::LargeUtf8, false),
Field::new("values", DataType::Int32, true),
false,
false,
),
Field::new(
"h",
DataType::Union(
UnionFields::new(
vec![1, 3],
vec![
Field::new("field1", DataType::UInt8, false),
Field::new("field3", DataType::Utf8, false),
],
),
UnionMode::Dense,
),
true,
),
]);

let floats_a = DataType::Struct(vec![floats[0].clone()].into());

let r = fields.filter_leaves(|idx, _| idx == 0 || idx == 1);
assert_eq!(r.len(), 2);
assert_eq!(r[0], fields[0]);
assert_eq!(r[1].data_type(), &floats_a);

let r = fields.filter_leaves(|_, f| f.name() == "a");
assert_eq!(r.len(), 4);
assert_eq!(r[0], fields[0]);
assert_eq!(r[1].data_type(), &floats_a);
assert_eq!(
r[2].data_type(),
&DataType::Dictionary(Box::new(DataType::Int32), Box::new(floats_a.clone()))
);
assert_eq!(
r[3].as_ref(),
&Field::new_list("e", Field::new("floats", floats_a.clone(), true), true)
);

let r = fields.filter_leaves(|_, f| f.name() == "floats");
assert_eq!(r.len(), 0);

let r = fields.filter_leaves(|idx, _| idx == 9);
assert_eq!(r.len(), 1);
assert_eq!(r[0], fields[6]);

let r = fields.filter_leaves(|idx, _| idx == 10 || idx == 11);
assert_eq!(r.len(), 1);
assert_eq!(r[0], fields[7]);

let union = DataType::Union(
UnionFields::new(vec![1], vec![Field::new("field1", DataType::UInt8, false)]),
UnionMode::Dense,
);

let r = fields.filter_leaves(|idx, _| idx == 12);
assert_eq!(r.len(), 1);
assert_eq!(r[0].data_type(), &union);
}
}
Loading