Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mismatch in MemTable of Select Into when projecting on aggregate window functions #6566

Merged
merged 10 commits into from
Jul 5, 2023
Merged

Mismatch in MemTable of Select Into when projecting on aggregate window functions #6566

merged 10 commits into from
Jul 5, 2023

Conversation

berkaysynnada
Copy link
Contributor

Which issue does this PR close?

Closes #6492.

Rationale for this change

When writing query result to MemTable using SELECT .. INTO syntax, window aggregate projections without alias give error because of schema mismatch. As an example,

SELECT SUM(c1) OVER(ORDER BY c1) as sum1 INTO new_table FROM annotated_data_infinite
has no problem but
SELECT SUM(c1) OVER(ORDER BY c1) INTO new_table FROM annotated_data_infinite
gives an error:
Plan("Mismatch between schema and batches").

This is because of the schema, which is created from the input LogicalPlan, has fields whose names are the result of display_name() (It writes the whole expression, func + window specs). However, the RecordBatch's fields of partitions are the result of physical_name(). (It writes only the function part of the expr).

What changes are included in this PR?

In create_memory_table() function, there is a match arm that handles the case which the table does not exist. In that case, we initialize the MemTable with try_new(), comparing the fields one-to-one. For these not registered and newly created tables, we don't need to check LogicalPlan schema and the schema coming from partitions. By implementing a MemTable::new_not_registered() function, we can directly adopt the schema coming from partitions. In case of empty batches (Create Table statements without values inserted), we can use input plan's schema.

Are these changes tested?

Yes, the erroneous example above is tested.

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Jun 6, 2023
Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We touched this window problem here #5695 (comment)

so to fix the issue completely we need to realias correctly instead of shorten the name in physical plan
https://github.com/apache/arrow-datafusion/blob/26e1b20ea3362ea62cb713004a0636b8af6a16d7/datafusion/core/src/physical_plan/planner.rs#L1630

@@ -73,6 +75,26 @@ impl MemTable {
}
}

/// Create a new in-memory table from the record batches.
/// In case of empty table, the schema is inferred from the input plan.
pub fn new_not_registered(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this code effectively ignores the schema argument if any of partitions has a RecordBatch and uses that RecordBatch's schema instead.

I think this is a surprising behavior and might mask errors in the future

@@ -518,7 +518,7 @@ impl SessionContext {
let physical = DataFrame::new(self.state(), input);

let batches: Vec<_> = physical.collect_partitioned().await?;
let table = Arc::new(MemTable::try_new(schema, batches)?);
let table = Arc::new(MemTable::new_not_registered(schema, batches));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like the core problem here is schema (aka the schema of input) does not actually match the schema of the batches that are produced.

As I think @comphead is suggesting in #6566 (review), this PR seems to be trying to workaround the deeper problem of the window exec producing an output schema (different column names) than the LogicalPlan says.

Have you looked into making the names consistent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, my first suggestion is to use longer version of the name in planner (what is done in the LogicalPlan). However, there need to be many changes in tests, and the BoundedWindowAggExec lines may become too long. If it is not a problem, we can solve the issue like that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using the same names in physical and logical plans is preferable because the rest of the parts of the code expects this and sometimes makes assumptions that it is the case (because it mostly is).

If we don't make the logical and physical plans match up, I predict we will continue to hit a long tail of bugs related to schema mismatches, only when using window functions related to the discrepancy.

If the long display name is a problem (and I can see how it would be) perhaps we can figure out how to make display_name produce something shorter for window functions other than serializing the entire window definition

Here is what postgres does:

postgres=# select first_value(x) over (order by x) from foo;
 first_value
-------------
           1
(1 row)

We probably need to do something more sophisticated as DataFusion needs distinct column names.

@berkaysynnada
Copy link
Contributor Author

We touched this window problem here #5695 (comment)

so to fix the issue completely we need to realias correctly instead of shorten the name in physical plan

https://github.com/apache/arrow-datafusion/blob/26e1b20ea3362ea62cb713004a0636b8af6a16d7/datafusion/core/src/physical_plan/planner.rs#L1630

In planner, modifying the lines

    let (name, e) = match e {
        Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
        _ => (physical_name(e)?, e),
    };

to that

    let (name, e) = match e {
        Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
        _ => (e.canonical_name(), e), 
    };

solves my issue. Is this what you mean by realiasing?

@alamb
Copy link
Contributor

alamb commented Jun 7, 2023

solves my issue. Is this what you mean by realiasing?

Yes, that is what I was referring to

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/apache/arrow-datafusion/pull/6373/files
This is realias for one of the similar cases in the plan builder. After that schema and batches are in sync.

Sorry I should have attached this PR earlier to save @berkaysynnada time

@github-actions github-actions bot added the sql SQL Planner label Jun 8, 2023
@berkaysynnada
Copy link
Contributor Author

I have updated the code and took out the old stuff. When the final window plan is built, we realias the aggregate window expressions. It is worked for SELECT INTO case. I'd like to know what you think about these changes. Thank you.

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @berkaysynnada. Great work. The realias way is good.
And also its good to have a generic function to shorten the column name.

I will let @alamb to look into this again. Initial vision was to move the window shortening name logic to the builder and realias instead of changing the name forcibly in physical plan.

With introduced logic the PR becomes more breaking.

  • columns names not synced in the SELECT and SELECT INTO which might be unexpected.
  • columns are our contract with users, so new col names in the memtable need to be tested for every function and documented

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @berkaysynnada and @comphead -- I am sorry for the delay in response. I don't think this is quite the right approach yet

let right = create_physical_name(right, false)?;
Ok(format!("{left} {op} {right}"))
}
Expr::Case(case) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, this appears to the same code as https://github.com/apache/arrow-datafusion/blob/1af846bd8de387ce7a6e61a2008917a7610b9a7b/datafusion/physical-expr/src/expressions/case.rs#L66-L77

If we ever changed the code in phsical-expr and did not change this code, would that cause problems?

@@ -555,3 +565,288 @@ fn match_window_definitions(
}
Ok(())
}

fn create_function_physical_name(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this functionality need to remain in sync with the creation of physical names?

Comment on lines 201 to 207
if select.into.is_some() {
for expr in select_exprs_post_aggr.iter_mut() {
if let Expr::Column(_) = expr.clone() {
*expr = expr.clone().alias(physical_name(expr)?);
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am sorry if my past comments have been confusing. Here is what I was trying to say earlier in #6566 (comment):

I ran this command to get some logs (with the extra debug in PR #6626):

RUST_LOG=debug cargo test --test sqllogictests -- ddl 2>&1 | tee /tmp/debug.log

Here is the content of debug.log: debug.log

From the log, here is the LogialPlan that shows the WindowAggr declares it makes a column named SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW (yes that whole thing!)

    Projection: SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, test_table.c2, test_table.c3
      WindowAggr: windowExpr=[[SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
        TableScan: test_table projection=[c1, c2, c3]

Here is the final ExecutionPlan, also showing the same giant column as the declared output name:

[2023-06-10T11:43:29Z DEBUG datafusion::physical_plan::planner] Optimized physical plan:
    ProjectionExec: expr=[SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(test_table.c1), c2@1 as c2, c3@2 as c3]
      BoundedWindowAggExec: wdw=[SUM(test_table.c1): Ok(Field { name: "SUM(test_table.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Float32(NULL)), end_bound: CurrentRow }], mode=[Sorted]
        SortPreservingMergeExec: [c2@1 ASC NULLS LAST]
          SortExec: expr=[c2@1 ASC NULLS LAST]
            MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]

However, looking at the logs, what the execution plan actually produces a column named SUM(test_table.c1):

[2023-06-10T11:43:29Z DEBUG datafusion::datasource::memory] mem schema does not contain batches schema.

Target_schema: Schema { fields: [
  Field { name: "SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
  Field { name: "c2", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
  Field { name: "c3", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {}
}.


Batches Schema: Schema { fields: [
  Field { name: "SUM(test_table.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
  Field { name: "c2", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
  Field { name: "c3", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {}
}

Thus, what I was trying to say earlier is that I think the root of the problem is the mismatch between what the plans say the field name of the output is and what the field name that the WindowExec is actually producing.

So I think we should fix this bug by resolving the mismatch. Either:

  1. Update the Logical/Physical plans so the field names of WindowAgg matches what the BoundedWindowAggExec actually produces
  2. OR Update BoundedWindowAggExec to produce the field names declared by the `WindowAggExec

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @alamb @berkaysynnada
Would you mind if I also open a small PR for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind if I also open a small PR for this?

I would be very much appreciative, personally

Copy link
Contributor Author

@berkaysynnada berkaysynnada Jun 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late reply. I have tried both suggestions @alamb:

  1. We need to modify 3 parts of the code:
    a- In the project() function in select.rs, the final schema will be constructed with a shortened form of the window function.
    fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
        match self {
            Expr::Column(c) => Ok(DFField::new(
                c.relation.clone(),
                &c.name,
                self.get_type(input_schema)?,
                self.nullable(input_schema)?,
            )),
            _ => {
                Ok(DFField::new_unqualified(
                    &self.display_name()?,
                    self.get_type(input_schema)?,
                    self.nullable(input_schema)?,
                ))
            }
        }
    }

is expanded with that arm:

          Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
                Ok(DFField::new_unqualified(
                    &vec![create_function_name(&fun.to_string(), false, args)?].join(" "),
                    self.get_type(input_schema)?,
                    self.nullable(input_schema)?,
                ))
            }

b- In the project() function again, qualified wildcard columns are normalized. However, the column name is in the longer form, and the schema of the plan is in the shorter form. Therefore, we also change the expr_as_column_expr() function so that window function expressions are converted to column expressions with the shortened column name format, which can be copied from the schema.
c- PushDownProjection rule again creates new column expressions with display_name() function (which returns the long format) in the window handling arm. These column names also need to be shortened to satisfy subsequent control.

My opinion: Can we directly change the display_name() function for window functions such that only function name and arguments are returned? Thus we don't need to change any of what I mentioned above.

        Expr::WindowFunction(WindowFunction {
            fun, args, window_frame, partition_by, order_by,
        }) => {
            let mut parts: Vec<String> =
                vec![create_function_name(&fun.to_string(), false, args)?];
            if !partition_by.is_empty() {
                parts.push(format!("PARTITION BY {partition_by:?}"));
            }
            if !order_by.is_empty() {
                parts.push(format!("ORDER BY {order_by:?}"));
            }
            parts.push(format!("{window_frame}"));
            Ok(parts.join(" "))
        }

new version:

Expr::WindowFunction(WindowFunction {
            fun, args, ..
        }) => {
            let mut parts: Vec<String> =
                vec![create_function_name(&fun.to_string(), false, args)?];
            Ok(parts.join(" "))
        }
  1. We can only change create_window_expr() such that it creates the window name with display_name() rather than physical_name(), but it makes the plans longer, also lots of test change burden.

I would like to wrap up this PR and any thoughts you have would be really helpful. Can you review the alternatives above when you get a chance? Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@berkaysynnada thanks for checking that.

I was also working on that.
Changing display_name was the one I started with but in this case other scenarios will fail. When window plan created the DFS schema check name uniqueness from display_name not considering aliases. So this query will fail

SELECT
  first_value(c9) OVER (PARTITION BY c2 ORDER BY c9) first_c9,
  first_value(c9) OVER (PARTITION BY c2 ORDER BY c9 DESC) first_c9_desc
FROM aggregate_test_100

I'm still thinking how to overcome that without breaking changes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb, @comphead: What do you think? Should we move forward with this approach?

Yes, that is what I think we should do. If the overly verbose column (at the output) names are a problem, perhaps we can look into updating the planner to automatically add more reasonable aliases

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, we need to get back on column naming convention as currently long names are not user friendly and not useful without aliases in nested queries

Copy link
Contributor Author

@berkaysynnada berkaysynnada Jun 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should also be considered that we cannot support more than one column with the same alias, if we intend to shorten the names at the output by realiasing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. We will go forward with that approach and @berkaysynnada will update you guys of the progress.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related issues: #6543 and #6758

@alamb
Copy link
Contributor

alamb commented Jun 20, 2023

Marking as draft as we work through feedback

@alamb alamb marked this pull request as draft June 20, 2023 19:54
@github-actions github-actions bot added the logical-expr Logical plan and expressions label Jun 21, 2023
@github-actions github-actions bot removed the logical-expr Logical plan and expressions label Jul 3, 2023
@berkaysynnada
Copy link
Contributor Author

berkaysynnada commented Jul 3, 2023

We have observed that the mismatch problem is not only related to SELECT INTO's. Table creation with CREATE TABLE AS has the same problem while matching the schemas.

With my last commits, I have changed how the window expressions are named to be used in window executors. Now, window function columns of the batches have the same name as the plans.

.slt and unit tests are edited accordingly. We now have longer plans and column names, but the problem is solved. To make progress, I think applying this PR and opening another issue that shortens the window names would work better.

@comphead
Copy link
Contributor

comphead commented Jul 3, 2023

We have observed that the mismatch problem is not only related to SELECT INTO's. Table creation with CREATE TABLE AS has the same problem while matching the schemas.

With my last commits, I have changed how the window expressions are named to be used in window executors. Now, window function columns of the batches have the same name as the plans.

.slt and unit tests are edited accordingly. We now have longer plans and column names, but the problem is solved. To make progress, I think applying this PR and opening another issue that shortens the window names would work better.

Thanks @berkaysynnada I will check this today. You right, the problem is related to create mem table and can be reproduced with

let sql = "create table t as SELECT SUM(c1) OVER(ORDER BY c1) FROM (select 1 c1)"

To make it window aliases shorten this is good idea, I can create a followup issue. I was looking into this for last couple of days and there is mess in select.rs for window processing, actually leading DFSchema to fail on shortened names even if they aliased

@berkaysynnada berkaysynnada marked this pull request as ready for review July 3, 2023 20:48
@ozankabak
Copy link
Contributor

@comphead, I agree that it is better to get correct behavior first in this PR and fix the bug, and make name shortening the subject of a follow-on PR.

@alamb, does this look good to you?

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm. Thankss @berkaysynnada I will create a followup issue

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you everyone. 👍

@alamb alamb merged commit da63d34 into apache:main Jul 5, 2023
20 checks passed
2010YOUY01 pushed a commit to 2010YOUY01/arrow-datafusion that referenced this pull request Jul 5, 2023
…ow functions (apache#6566)

* Schema check of partitions and input plan is removed for newly registered tables.

* minor changes

* In Select Into queries, aggregate windows are realiased with physical_name()

* debugging

* display_name() output is simplified for window functions

* Windows are displayed in long format

* Window names in tests are edited

* Create table as test is added

---------

Co-authored-by: Mustafa Akur <[email protected]>
@berkaysynnada berkaysynnada deleted the feature/select-into-mismatch branch July 6, 2023 08:36
alamb pushed a commit to alamb/datafusion that referenced this pull request Jul 6, 2023
…ow functions (apache#6566)

* Schema check of partitions and input plan is removed for newly registered tables.

* minor changes

* In Select Into queries, aggregate windows are realiased with physical_name()

* debugging

* display_name() output is simplified for window functions

* Windows are displayed in long format

* Window names in tests are edited

* Create table as test is added

---------

Co-authored-by: Mustafa Akur <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Mismatch in MemTable (Select Into with aggregate window functions having no alias)
5 participants