Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve field metadata across expressions in logical plans #6920

Merged
merged 3 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,9 @@ pub trait ExprSchema: std::fmt::Debug {

/// What is the datatype of this column?
fn data_type(&self, col: &Column) -> Result<&DataType>;

/// Returns the column's optional metadata.
///
/// Implementations return the metadata map of the field that `col`
/// resolves to; an empty map means no metadata was attached.
fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>>;
}

// Implement `ExprSchema` for `Arc<DFSchema>`
Expand All @@ -592,6 +595,10 @@ impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
fn data_type(&self, col: &Column) -> Result<&DataType> {
self.as_ref().data_type(col)
}

/// Delegate to the inner `DFSchema`'s trait implementation.
///
/// NOTE: the fully-qualified `ExprSchema::metadata(...)` form is used
/// deliberately — plain `self.as_ref().metadata(col)` could resolve to a
/// different (inherent) `metadata` method on `DFSchema` instead of this
/// trait method.
fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
    ExprSchema::metadata(self.as_ref(), col)
}
}

impl ExprSchema for DFSchema {
Expand All @@ -602,6 +609,10 @@ impl ExprSchema for DFSchema {
fn data_type(&self, col: &Column) -> Result<&DataType> {
Ok(self.field_from_column(col)?.data_type())
}

/// Look up `col`'s field in this schema and expose that field's metadata.
/// Fails with the same error as `field_from_column` when the column does
/// not exist in the schema.
fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
    self.field_from_column(col).map(|field| field.metadata())
}
}

/// DFField wraps an Arrow field and adds an optional qualifier
Expand Down Expand Up @@ -661,6 +672,10 @@ impl DFField {
self.field.is_nullable()
}

/// Returns the metadata of this `DFField`, delegating to the wrapped
/// Arrow field's metadata map.
pub fn metadata(&self) -> &HashMap<String, String> {
    self.field.metadata()
}

/// Returns a string to the `DFField`'s qualified name
pub fn qualified_name(&self) -> String {
if let Some(qualifier) = &self.qualifier {
Expand Down Expand Up @@ -708,6 +723,13 @@ impl DFField {
self.field = f.into();
self
}

/// Return field with new metadata
pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
let f = self.field().as_ref().clone().with_metadata(metadata);
self.field = f.into();
self
}
}

impl From<FieldRef> for DFField {
Expand Down
63 changes: 61 additions & 2 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{LogicalPlan, Projection, Subquery};
use arrow::compute::can_cast_types;
use arrow::datatypes::DataType;
use datafusion_common::{Column, DFField, DFSchema, DataFusionError, ExprSchema, Result};
use std::collections::HashMap;
use std::sync::Arc;

/// trait to allow expr to typable with respect to a schema
Expand All @@ -36,6 +37,9 @@ pub trait ExprSchemable {
/// given a schema, return the nullability of the expr
fn nullable<S: ExprSchema>(&self, input_schema: &S) -> Result<bool>;

/// given a schema, return the expr's optional metadata
fn metadata<S: ExprSchema>(&self, schema: &S) -> Result<HashMap<String, String>>;

/// convert to a field with respect to a schema
fn to_field(&self, input_schema: &DFSchema) -> Result<DFField>;

Expand Down Expand Up @@ -275,6 +279,14 @@ impl ExprSchemable for Expr {
}
}

fn metadata<S: ExprSchema>(&self, schema: &S) -> Result<HashMap<String, String>> {
    match self {
        // An alias is transparent: report the metadata of the wrapped expression.
        Expr::Alias(Alias { expr, .. }) => expr.metadata(schema),
        // A bare column carries whatever metadata its schema field declares.
        Expr::Column(c) => schema.metadata(c).map(|m| m.clone()),
        // Any other (computed) expression yields a fresh value with no metadata.
        _ => Ok(HashMap::new()),
    }
}

/// Returns a [arrow::datatypes::Field] compatible with this expression.
///
/// So for example, a projected expression `col(c1) + col(c2)` is
Expand All @@ -286,12 +298,14 @@ impl ExprSchemable for Expr {
&c.name,
self.get_type(input_schema)?,
self.nullable(input_schema)?,
)),
)
.with_metadata(self.metadata(input_schema)?)),
_ => Ok(DFField::new_unqualified(
&self.display_name()?,
self.get_type(input_schema)?,
self.nullable(input_schema)?,
)),
)
.with_metadata(self.metadata(input_schema)?)),
}
}

Expand Down Expand Up @@ -465,11 +479,46 @@ mod tests {
);
}

#[test]
fn test_expr_metadata() {
    let mut expected = HashMap::new();
    expected.insert("bar".to_string(), "buzz".to_string());
    let expr = col("foo");
    let schema = MockExprSchema::new()
        .with_data_type(DataType::Int32)
        .with_metadata(expected.clone());

    // Metadata flows through bare columns and through aliases unchanged.
    assert_eq!(expected, expr.metadata(&schema).unwrap());
    assert_eq!(expected, expr.clone().alias("bar").metadata(&schema).unwrap());

    // A cast produces a value of a different type, so input metadata is dropped.
    let casted = expr.clone().cast_to(&DataType::Int64, &schema).unwrap();
    assert_eq!(HashMap::new(), casted.metadata(&schema).unwrap());

    // verify to_field method populates metadata
    let schema = DFSchema::new_with_metadata(
        vec![DFField::new_unqualified("foo", DataType::Int32, true)
            .with_metadata(expected.clone())],
        HashMap::new(),
    )
    .unwrap();
    assert_eq!(&expected, expr.to_field(&schema).unwrap().metadata());
}

/// Minimal `ExprSchema` stand-in for exercising `ExprSchemable` in tests
/// without constructing a full `DFSchema`.
#[derive(Debug)]
struct MockExprSchema {
    // Nullability reported for every column lookup.
    nullable: bool,
    // Data type reported for every column lookup.
    data_type: DataType,
    // NOTE(review): presumably makes `nullable()` return an error for
    // error-path tests — the `nullable()` impl is not visible here; confirm.
    error_on_nullable: bool,
    // Metadata map reported for every column lookup.
    metadata: HashMap<String, String>,
}

impl MockExprSchema {
Expand All @@ -478,6 +527,7 @@ mod tests {
nullable: false,
data_type: DataType::Null,
error_on_nullable: false,
metadata: HashMap::new(),
}
}

Expand All @@ -495,6 +545,11 @@ mod tests {
self.error_on_nullable = error_on_nullable;
self
}

fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
self.metadata = metadata;
self
}
}

impl ExprSchema for MockExprSchema {
Expand All @@ -509,5 +564,9 @@ mod tests {
// Every column resolves to the single configured data type.
fn data_type(&self, _col: &Column) -> Result<&DataType> {
    Ok(&self.data_type)
}

// Every column resolves to the single configured metadata map.
fn metadata(&self, _col: &Column) -> Result<&HashMap<String, String>> {
    Ok(&self.metadata)
}
}
}