
Merge branch 'main' into feature/12648-udwf-rank-percentrank-denserank
jatin committed Oct 3, 2024
2 parents 00e9044 + 5740774 commit ceba9d4
Showing 62 changed files with 1,744 additions and 1,097 deletions.
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

12 changes: 10 additions & 2 deletions datafusion/common/src/dfschema.rs
@@ -226,7 +226,12 @@ impl DFSchema {

for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
if let Some(qualifier) = qualifier {
qualified_names.insert((qualifier, field.name()));
if !qualified_names.insert((qualifier, field.name())) {
return _schema_err!(SchemaError::DuplicateQualifiedField {
qualifier: Box::new(qualifier.clone()),
name: field.name().to_string(),
});
}
} else if !unqualified_names.insert(field.name()) {
return _schema_err!(SchemaError::DuplicateUnqualifiedField {
name: field.name().to_string()
@@ -1165,7 +1170,10 @@ mod tests {
let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let join = left.join(&right);
assert!(join.err().is_none());
assert_eq!(
join.unwrap_err().strip_backtrace(),
"Schema error: Schema contains duplicate qualified field name t1.c0",
);
Ok(())
}
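For context (not part of the diff): a minimal sketch of how the stricter check now surfaces to callers, using only the APIs exercised in this hunk; the single-column schema stands in for `test_schema_1()`.

```rust
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{DFSchema, Result};

fn main() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c0", DataType::Boolean, true)]);
    // Two schemas carrying the same qualifier now collide on every shared field
    // instead of silently producing a schema with duplicate qualified names.
    let left = DFSchema::try_from_qualified_schema("t1", &schema)?;
    let right = DFSchema::try_from_qualified_schema("t1", &schema)?;
    let err = left.join(&right).unwrap_err();
    assert_eq!(
        err.strip_backtrace(),
        "Schema error: Schema contains duplicate qualified field name t1.c0"
    );
    Ok(())
}
```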

4 changes: 1 addition & 3 deletions datafusion/core/src/catalog_common/mod.rs
@@ -185,9 +185,7 @@ pub fn resolve_table_references(
let _ = s.as_ref().visit(visitor);
}
DFStatement::CreateExternalTable(table) => {
visitor
.relations
.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
visitor.relations.insert(table.name.clone());
}
DFStatement::CopyTo(CopyToStatement { source, .. }) => match source {
CopyToSource::Relation(table_name) => {
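Why cloning the original `ObjectName` matters (a hypothetical illustration, not part of the diff): rebuilding the name from its string form flattens a multi-part identifier into a single `Ident`, losing the catalog/schema structure the visitor needs.

```rust
use sqlparser::ast::{Ident, ObjectName};

fn main() {
    // One Ident that merely contains dots...
    let flattened = ObjectName(vec![Ident::from("catalog.schema.t")]);
    // ...is not the same name as three separate parts.
    let structured = ObjectName(vec![
        Ident::from("catalog"),
        Ident::from("schema"),
        Ident::from("t"),
    ]);
    assert_ne!(flattened, structured);
}
```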
46 changes: 0 additions & 46 deletions datafusion/core/src/dataframe/mod.rs
@@ -3380,52 +3380,6 @@ mod tests {
Ok(())
}

// Table 't1' self join
// Supplementary test of issue: https://github.com/apache/datafusion/issues/7790
#[tokio::test]
async fn with_column_self_join() -> Result<()> {
let df = test_table().await?.select_columns(&["c1"])?;
let ctx = SessionContext::new();

ctx.register_table("t1", df.into_view())?;

let df = ctx
.table("t1")
.await?
.join(
ctx.table("t1").await?,
JoinType::Inner,
&["c1"],
&["c1"],
None,
)?
.sort(vec![
// make the test deterministic
col("t1.c1").sort(true, true),
])?
.limit(0, Some(1))?;

let df_results = df.clone().collect().await?;
assert_batches_sorted_eq!(
[
"+----+----+",
"| c1 | c1 |",
"+----+----+",
"| a | a |",
"+----+----+",
],
&df_results
);

let actual_err = df.clone().with_column("new_column", lit(true)).unwrap_err();
let expected_err = "Error during planning: Projections require unique expression names \
but the expression \"t1.c1\" at position 0 and \"t1.c1\" at position 1 have the same name. \
Consider aliasing (\"AS\") one of them.";
assert_eq!(actual_err.strip_backtrace(), expected_err);

Ok(())
}

#[tokio::test]
async fn with_column_renamed() -> Result<()> {
let df = test_table()
89 changes: 88 additions & 1 deletion datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs
@@ -573,7 +573,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
// extract list values, with non-lists converted to Value::Null
let array_item_count = rows
.iter()
.map(|row| match row {
.map(|row| match maybe_resolve_union(row) {
Value::Array(values) => values.len(),
_ => 1,
})
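Why resolving the union first matters (a standalone sketch, not part of the diff): Avro encodes a nullable array as a union wrapping the array branch, so matching the raw value against `Value::Array` would fall through to the `_ => 1` arm above and miscount the items.

```rust
use apache_avro::types::Value;

fn main() {
    // A value of type ["null", {"type": "array", ...}] decodes its non-null
    // branch as a Union wrapping the actual array.
    let value = Value::Union(
        1,
        Box::new(Value::Array(vec![Value::Long(1), Value::Long(2)])),
    );

    // Roughly what maybe_resolve_union does: peel the union layer first.
    let resolved = match &value {
        Value::Union(_, inner) => inner.as_ref(),
        other => other,
    };
    assert!(matches!(resolved, Value::Array(items) if items.len() == 2));
}
```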
@@ -1643,6 +1643,93 @@ mod test {
assert_batches_eq!(expected, &[batch]);
}

#[test]
fn test_avro_nullable_struct_array() {
let schema = apache_avro::Schema::parse_str(
r#"
{
"type": "record",
"name": "r1",
"fields": [
{
"name": "col1",
"type": [
"null",
{
"type": "array",
"items": {
"type": [
"null",
{
"type": "record",
"name": "Item",
"fields": [
{
"name": "id",
"type": "long"
}
]
}
]
}
}
],
"default": null
}
]
}"#,
)
.unwrap();
let jv1 = serde_json::json!({
"col1": [
{
"id": 234
},
{
"id": 345
}
]
});
let r1 = apache_avro::to_value(jv1)
.unwrap()
.resolve(&schema)
.unwrap();
let r2 = apache_avro::to_value(serde_json::json!({ "col1": null }))
.unwrap()
.resolve(&schema)
.unwrap();

let mut w = apache_avro::Writer::new(&schema, vec![]);
for _i in 0..5 {
w.append(r1.clone()).unwrap();
}
w.append(r2).unwrap();
let bytes = w.into_inner().unwrap();

let mut reader = ReaderBuilder::new()
.read_schema()
.with_batch_size(20)
.build(std::io::Cursor::new(bytes))
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(batch.num_rows(), 6);
assert_eq!(batch.num_columns(), 1);

let expected = [
"+------------------------+",
"| col1 |",
"+------------------------+",
"| [{id: 234}, {id: 345}] |",
"| [{id: 234}, {id: 345}] |",
"| [{id: 234}, {id: 345}] |",
"| [{id: 234}, {id: 345}] |",
"| [{id: 234}, {id: 345}] |",
"| |",
"+------------------------+",
];
assert_batches_eq!(expected, &[batch]);
}

#[test]
fn test_avro_iterator() {
let reader = build_reader("alltypes_plain.avro", 5);
13 changes: 10 additions & 3 deletions datafusion/core/src/datasource/dynamic_file.rs
@@ -69,11 +69,18 @@ impl UrlTableFactory for DynamicListTableFactory {
.ok_or_else(|| plan_datafusion_err!("get current SessionStore error"))?;

match ListingTableConfig::new(table_url.clone())
.infer(state)
.infer_options(state)
.await
{
Ok(cfg) => ListingTable::try_new(cfg)
.map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>)),
Ok(cfg) => {
let cfg = cfg
.infer_partitions_from_path(state)
.await?
.infer_schema(state)
.await?;
ListingTable::try_new(cfg)
.map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>))
}
Err(_) => Ok(None),
}
}
36 changes: 34 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
@@ -33,7 +33,7 @@ use crate::datasource::{
};
use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{DataFusionError, Result};
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
@@ -192,6 +192,38 @@ impl ListingTableConfig {
pub async fn infer(self, state: &SessionState) -> Result<Self> {
self.infer_options(state).await?.infer_schema(state).await
}

/// Infer the partition columns from the path. Requires `self.options` to be set beforehand.
pub async fn infer_partitions_from_path(self, state: &SessionState) -> Result<Self> {
match self.options {
Some(options) => {
let Some(url) = self.table_paths.first() else {
return config_err!("No table path found");
};
let partitions = options
.infer_partitions(state, url)
.await?
.into_iter()
.map(|col_name| {
(
col_name,
DataType::Dictionary(
Box::new(DataType::UInt16),
Box::new(DataType::Utf8),
),
)
})
.collect::<Vec<_>>();
let options = options.with_table_partition_cols(partitions);
Ok(Self {
table_paths: self.table_paths,
file_schema: self.file_schema,
options: Some(options),
})
}
None => config_err!("No `ListingOptions` set for inferring partitions"),
}
}
}

/// Options for creating a [`ListingTable`]
@@ -505,7 +537,7 @@ impl ListingOptions {
/// Infer the partitioning at the given path on the provided object store.
/// For performance reasons, it doesn't read all the files on disk
/// and therefore may fail to detect invalid partitioning.
async fn infer_partitions(
pub(crate) async fn infer_partitions(
&self,
state: &SessionState,
table_path: &ListingTableUrl,
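A minimal sketch (not part of the diff) of the split inference steps, mirroring the call chain in dynamic_file.rs above; the file URL is a hypothetical example.

```rust
use std::sync::Arc;

use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::prelude::SessionContext;
use datafusion_common::Result;

async fn build_table(ctx: &SessionContext) -> Result<Arc<ListingTable>> {
    let url = ListingTableUrl::parse("file:///data/year=2024/")?;
    let state = ctx.state();
    let config = ListingTableConfig::new(url)
        .infer_options(&state) // detect the file format and its options
        .await?
        .infer_partitions_from_path(&state) // `year` becomes Dictionary(UInt16, Utf8)
        .await?
        .infer_schema(&state) // read file metadata for the data columns
        .await?;
    Ok(Arc::new(ListingTable::try_new(config)?))
}
```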
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_planner.rs
@@ -2557,6 +2557,10 @@ mod tests {
) -> Result<Self> {
unimplemented!("NoOp");
}

fn supports_limit_pushdown(&self) -> bool {
false // Disallow limit push-down by default
}
}

#[derive(Debug)]
4 changes: 4 additions & 0 deletions datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -443,6 +443,10 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
expr: replace_sort_expression(self.expr.clone(), exprs.swap_remove(0)),
})
}

fn supports_limit_pushdown(&self) -> bool {
false // Disallow limit push-down by default
}
}

/// Physical planner for TopK nodes
1 change: 1 addition & 0 deletions datafusion/expr/Cargo.toml
@@ -48,6 +48,7 @@ datafusion-expr-common = { workspace = true }
datafusion-functions-aggregate-common = { workspace = true }
datafusion-functions-window-common = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
indexmap = { workspace = true }
paste = "^1.0"
serde_json = { workspace = true }
sqlparser = { workspace = true }
2 changes: 1 addition & 1 deletion datafusion/expr/src/lib.rs
@@ -90,7 +90,7 @@ pub use logical_plan::*;
pub use partition_evaluator::PartitionEvaluator;
pub use sqlparser;
pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
pub use udaf::{AggregateUDF, AggregateUDFImpl, ReversedUDAF};
pub use udaf::{AggregateUDF, AggregateUDFImpl, ReversedUDAF, StatisticsArgs};
pub use udf::{ScalarUDF, ScalarUDFImpl};
pub use udwf::{ReversedUDWF, WindowUDF, WindowUDFImpl};
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
24 changes: 24 additions & 0 deletions datafusion/expr/src/logical_plan/extension.rs
@@ -195,6 +195,16 @@ pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync {
/// directly because it must remain object safe.
fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool;
fn dyn_ord(&self, other: &dyn UserDefinedLogicalNode) -> Option<Ordering>;

/// Returns `true` if a limit can be safely pushed down through this
/// `UserDefinedLogicalNode`.
///
/// If this method returns `true`, and the query plan contains a limit at
/// the output of this node, DataFusion will push the limit to the input
/// of this node.
fn supports_limit_pushdown(&self) -> bool {
false
}
}

impl Hash for dyn UserDefinedLogicalNode {
@@ -295,6 +305,16 @@ pub trait UserDefinedLogicalNodeCore:
) -> Option<Vec<Vec<usize>>> {
None
}

/// Returns `true` if a limit can be safely pushed down through this
/// `UserDefinedLogicalNode`.
///
/// If this method returns `true`, and the query plan contains a limit at
/// the output of this node, DataFusion will push the limit to the input
/// of this node.
fn supports_limit_pushdown(&self) -> bool {
false // Disallow limit push-down by default
}
}

/// Automatically implement `UserDefinedLogicalNode` for any type that implements `UserDefinedLogicalNodeCore`
@@ -361,6 +381,10 @@ impl<T: UserDefinedLogicalNodeCore> UserDefinedLogicalNode for T {
.downcast_ref::<Self>()
.and_then(|other| self.partial_cmp(other))
}

fn supports_limit_pushdown(&self) -> bool {
self.supports_limit_pushdown()
}
}

fn get_all_columns_from_schema(schema: &DFSchema) -> HashSet<String> {
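A minimal sketch (not part of the diff) of a custom node opting in to the new hook. The pass-through node is hypothetical, and the trait-method signatures follow the `TopKPlanNode` test above; a node should return `true` only if it neither filters, reorders, nor multiplies rows.

```rust
use std::fmt;

use datafusion_common::{DFSchemaRef, Result};
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};

// Hypothetical node that forwards its input unchanged.
#[derive(Debug, PartialEq, Eq, PartialOrd, Hash)]
struct PassThroughNode {
    input: LogicalPlan,
}

impl UserDefinedLogicalNodeCore for PassThroughNode {
    fn name(&self) -> &str {
        "PassThrough"
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "PassThrough")
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        mut inputs: Vec<LogicalPlan>,
    ) -> Result<Self> {
        Ok(Self {
            input: inputs.swap_remove(0),
        })
    }

    // Opt in: a LIMIT above this node may be pushed down to its input,
    // because the node does not change which rows flow through it.
    fn supports_limit_pushdown(&self) -> bool {
        true
    }
}
```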
3 changes: 3 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
@@ -51,6 +51,7 @@ use datafusion_common::{
DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
FunctionalDependencies, ParamValues, Result, TableReference, UnnestOptions,
};
use indexmap::IndexSet;

// backwards compatibility
use crate::display::PgJsonVisitor;
@@ -3071,6 +3072,8 @@ fn calc_func_dependencies_for_aggregate(
let group_by_expr_names = group_expr
.iter()
.map(|item| item.schema_name().to_string())
.collect::<IndexSet<_>>()
.into_iter()
.collect::<Vec<_>>();
let aggregate_func_dependencies = aggregate_functional_dependencies(
input.schema(),
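The `IndexSet` detour above deduplicates group-by expression names while keeping their first-seen order, which a plain `HashSet` would not guarantee. A standalone sketch of that behavior:

```rust
use indexmap::IndexSet;

fn main() {
    let group_by_names = ["b", "a", "b", "c"];
    // Collecting through IndexSet drops duplicates but, unlike HashSet,
    // preserves the insertion order of the first occurrences.
    let deduped: Vec<&str> = group_by_names
        .into_iter()
        .collect::<IndexSet<_>>()
        .into_iter()
        .collect();
    assert_eq!(deduped, ["b", "a", "c"]);
}
```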
