
Mismatch in MemTable (Select Into with aggregate window functions having no alias) #6492

Closed
berkaysynnada opened this issue May 30, 2023 · 6 comments · Fixed by #6566
Labels
bug Something isn't working

Comments

@berkaysynnada
Contributor

berkaysynnada commented May 30, 2023

Describe the bug

When writing a query result to a MemTable using the SELECT .. INTO syntax, some queries give an error because of a schema mismatch. For example,

SELECT SUM(c1) OVER(ORDER BY c1) as sum1 INTO new_table FROM annotated_data_infinite
runs without a problem, but
SELECT SUM(c1) OVER(ORDER BY c1) INTO new_table FROM annotated_data_infinite
fails with the error: Plan("Mismatch between schema and batches").

The reason is that in MemTable::try_new(), the schema and the partitions' schemas don't match. I have tracked this down: the schema, which is created from the input LogicalPlan, has field names produced by display_name() (it renders the whole expression, function plus window specs). However, when there is no alias, the fields of the partitions' RecordBatches are named by physical_name() (it renders only the function part of the expression).
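
Concretely, for the failing query above the two names look roughly like this (paraphrased for illustration, not verified output):

schema field name (display_name()):   SUM(annotated_data_infinite.c1) ORDER BY [annotated_data_infinite.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
batch field name (physical_name()):   SUM(annotated_data_infinite.c1)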

To Reproduce

SELECT SUM(c1) OVER(ORDER BY c1) INTO new_table FROM annotated_data_infinite

Expected behavior

I have two candidate approaches:

  1. Have create_window_expr() produce the name with display_name() while constructing the window expr. However, this requires many changes in tests, and the exec lines become very long, like:
    ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]
    Maybe the Display implementations could be changed?

  2. The contains() function implemented for Schema is used in MemTable::try_new() to match fields, and it in turn uses contains() implemented for Field, which checks one-to-one equality of every element in the Field struct. For the name element alone, we could relax the equality to something like schema_field.name().starts_with(partition_field.name()). If we prefer not to change contains(), we could write a specialized function like:

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;

/// Returns true if every batch in `partitions` is compatible with `schema`:
/// same number of fields, matching data types, and the schema's (possibly
/// longer) display name starting with the partition's physical name.
fn validate_partitions_with_schema(
    schema: &SchemaRef,
    partitions: &[Vec<RecordBatch>],
) -> bool {
    partitions.iter().flatten().all(|batch| {
        let batch_schema = batch.schema();
        batch_schema.fields().len() == schema.fields().len()
            && schema
                .fields()
                .iter()
                .zip(batch_schema.fields().iter())
                .all(|(schema_field, partition_field)| {
                    schema_field.name().starts_with(partition_field.name())
                        && schema_field.data_type() == partition_field.data_type()
                })
    })
}

But this approach does not seem solid to me either.

Additional context

Any advice is welcome. I will work on the fix once we reach common ground.

@berkaysynnada berkaysynnada added the bug Something isn't working label May 30, 2023
@berkaysynnada berkaysynnada changed the title Mismatch in MemTable (Select Into with no alias) Mismatch in MemTable (Select Into with aggregate window functions having no alias) May 30, 2023
@berkaysynnada
Contributor Author

@alamb, when you have some time, could you please review my suggestions? I would like to hear your thoughts on how to solve this issue.

@alamb
Contributor

alamb commented Jun 1, 2023

Hi @berkaysynnada --

Here is a small reproducer:

❯ create table foo (x int) as values (1);
0 rows in set. Query took 0.001 seconds.
❯ explain select first_value(x) OVER () INTO bar from foo ;
+--------------+------------------------------------------------------------------------------------------------------------+
| plan_type    | plan                                                                                                       |
+--------------+------------------------------------------------------------------------------------------------------------+
| logical_plan | CreateMemoryTable: Bare { table: "bar" }                                                                   |
|              |   Projection: FIRST_VALUE(foo.x) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING                  |
|              |     WindowAggr: windowExpr=[[FIRST_VALUE(foo.x) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] |
|              |       TableScan: foo projection=[x]                                                                        |
+--------------+------------------------------------------------------------------------------------------------------------+
1 row in set. Query took 0.001 seconds.
❯  select first_value(x) OVER () INTO bar from foo ;
Error during planning: Mismatch between schema and batches

I wonder if you can solve this with an explicit alias rather than changing how the display names work.

So when the CreateMemoryTable is planned, add explicit aliases, so the plan behaves as if the query had been written select first_value(x) OVER () AS "first_value(x)" INTO bar from foo:

+--------------+------------------------------------------------------------------------------------------------------------+
| plan_type    | plan                                                                                                       |
+--------------+------------------------------------------------------------------------------------------------------------+
| logical_plan | CreateMemoryTable: Bare { table: "bar" }                                                                   |
|              |   Projection: FIRST_VALUE(foo.x) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS "first_value(x)"   <---- add this alias?? |
|              |     WindowAggr: windowExpr=[[FIRST_VALUE(foo.x) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] |
|              |       TableScan: foo projection=[x]                                                                        |
+--------------+------------------------------------------------------------------------------------------------------------+
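
A minimal sketch of what that aliasing step could look like, assuming it runs over the projected expressions before the CreateMemoryTable plan is built (alias_unaliased_window_exprs and physical_style_name are hypothetical helpers, not existing DataFusion APIs):

use datafusion_expr::Expr;

// Hypothetical rewrite pass: give every unaliased window expression an
// explicit alias so the logical and physical field names agree.
fn alias_unaliased_window_exprs(exprs: Vec<Expr>) -> Vec<Expr> {
    exprs
        .into_iter()
        .map(|expr| match expr {
            // Already aliased by the user: leave it untouched.
            aliased @ Expr::Alias(..) => aliased,
            // Alias the window expression with its short (function-only) name.
            e @ Expr::WindowFunction(_) => {
                let short = physical_style_name(&e);
                e.alias(short)
            }
            other => other,
        })
        .collect()
}

// Hypothetical stand-in for physical_name(): it should render only the
// function part of the expression, e.g. "first_value(x)".
fn physical_style_name(expr: &Expr) -> String {
    // Placeholder; the real helper would mirror physical_name().
    format!("{expr}")
}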

For the record, here is what postgres does:

postgres=# select first_value(x) OVER () INTO bar from foo ;
SELECT 1
postgres=# select * from bar;
 first_value
-------------
           1
(1 row)

@jonahgao
Member

jonahgao commented Jun 2, 2023

Another similar case of failure:

DataFusion CLI v25.0.0
❯ create table t (a int not null);
0 rows in set. Query took 0.004 seconds.

❯ insert into t values(1);
Error during planning: Inserting query must have the same schema with the table.

The two non-matching schemas are:
input schema: Schema { fields: [Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }
table schema: Schema { fields: [Field { name: "a", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }

Can we ignore the schema attached to the batches and focus only on the actual data, using RecordBatch::try_new to check whether the data in each RecordBatch matches the schema of the target table?

impl MemTable {
    /// Create a new in-memory table from the provided schema and record batches.
    pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> Result<Self> {
        let mut batches = Vec::with_capacity(partitions.len());
        for partition in partitions {
            // Rebuild every batch against the table schema. RecordBatch::try_new
            // validates the actual columns (count, data types, and that no
            // non-nullable field contains NULLs), so the data itself is checked
            // rather than the batch's own schema object.
            let new_partition = partition
                .iter()
                .map(|batch| {
                    RecordBatch::try_new(schema.clone(), batch.columns().to_vec())
                        .map_err(DataFusionError::ArrowError)
                })
                .collect::<Result<Vec<_>>>()?;
            batches.push(Arc::new(RwLock::new(new_partition)));
        }
        Ok(Self { schema, batches })
    }
}
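
With this approach, a batch whose fields were named by physical_name() but whose columns have the expected types would be accepted, while a genuine mismatch (wrong column count, wrong data type, or a NULL in a column the schema declares non-nullable) would still be rejected by RecordBatch::try_new.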

@alamb
Contributor

alamb commented Jun 2, 2023

I think the proper solution is to update the planner code to project (e.g. with a LogicalPlan::Projection) the output of the query into the types of the target table.

In the example #6492 (comment) from @byteink, the table's schema says "it can't have nulls" (the column is marked as non-nullable), so in my opinion DataFusion is correctly rejecting this plan.

I think this was not previously a problem because MemoryTables could not be updated: if the initial contents didn't contain nulls, they never would contain nulls (and that could be reflected in the schema).

Now that we can add new data to MemTables, what do you think about ensuring that the schema of a MemTable created via INSERT ... is nullable?
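
A minimal sketch of that relaxation, assuming it is applied to the target schema when the MemTable is created as an INSERT/SELECT INTO destination (nullable_schema is a hypothetical helper, not an existing DataFusion API):

use std::sync::Arc;
use arrow::datatypes::{Field, Schema, SchemaRef};

// Hypothetical helper: relax every field of the target schema to nullable
// so later inserts that carry NULLs are accepted. Field-level metadata is
// dropped here for brevity.
fn nullable_schema(schema: &SchemaRef) -> SchemaRef {
    let fields: Vec<Field> = schema
        .fields()
        .iter()
        .map(|f| Field::new(f.name(), f.data_type().clone(), true))
        .collect();
    Arc::new(Schema::new(fields))
}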

@jonahgao
Member

jonahgao commented Jun 5, 2023

I think there are two issues that need to be considered.

1. What is the schema of a table created with initial contents (in particular, how should nullability behave)?

MySQL

Query expressions and column definitions can appear together in the create table statement, and MySQL will check if there is a conflict between the two.

mysql> create table t1(a int not null) as select 1 as a;
Query OK, 1 row affected (0.06 sec)
Records: 1  Duplicates: 0  Warnings: 0

mysql> describe t1;
+-------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------+------+-----+---------+-------+
| a     | int  | NO   |     | NULL    |       |
+-------+------+------+-----+---------+-------+
1 row in set (0.00 sec)


mysql> create table t2(a int not null) as select null as a;
ERROR 1048 (23000): Column 'a' cannot be null

PostgreSQL

SELECT INTO and CREATE TABLE AS statements do not support specifying column constraints.
It seems that the schema of the table created through these statements is always nullable.

postgres => create table t1(a int not null);
CREATE TABLE
postgres => create table t2 as select * from t1;
SELECT 0
postgres => \d t2
                 Table "public.t2"
 Column |  Type   | Collation | Nullable | Default
--------+---------+-----------+----------+---------
 a      | integer |           |          |

postgres=> select * into t3 from t1;
SELECT 0
postgres=> \d t3
                 Table "public.t3"
 Column |  Type   | Collation | Nullable | Default
--------+---------+-----------+----------+---------
 a      | integer |           |          |

DataFusion

Now that we can add new data to MemTables, what do you think about ensuring that the schema of a MemTable created via INSERT ... is nullable?

I think that is OK, just like PostgreSQL.

But we should reject the following statement:

DataFusion CLI v25.0.0
❯ create table t(a int not null) as select null as a;
0 rows in set. Query took 0.005 seconds.
❯ select * from t;
+---+
| a |
+---+
|   |
+---+
1 row in set. Query took 0.005 seconds.

The user specified that the column should not be nullable, but this specification did not take effect.

2. What kind of data can be inserted into MemoryTable

I think comparing the two schemas for exact equality may be too strict.
A schema declared nullable doesn't necessarily mean the actual data contains NULLs.

The following example runs successfully in PostgreSQL, but not in DataFusion.

postgres=> create table foo(a int, b int);
CREATE TABLE
postgres=> insert into foo values(1,10), (2, NULL);
INSERT 0 2
postgres=> create table bar(b int not null);
CREATE TABLE

postgres=> insert into bar select b from foo where a=1;
INSERT 0 1
postgres=> insert into bar select b from foo where a=2;
ERROR:  null value in column "b" of relation "bar" violates not-null constraint
DETAIL:  Failing row contains (null).

We should verify the resulting data after executing the input's execution plan; a sketch of such a check follows below.
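
A minimal sketch of that data-level check (check_not_null is an assumed helper written for this discussion, not existing DataFusion code):

use arrow::array::Array;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result};

// Assumed helper: after executing the input plan, reject a batch only if a
// column that the target table declares non-nullable actually contains
// NULLs, instead of comparing the two schemas field by field.
fn check_not_null(target: &Schema, batch: &RecordBatch) -> Result<()> {
    for (field, column) in target.fields().iter().zip(batch.columns()) {
        if !field.is_nullable() && column.null_count() > 0 {
            return Err(DataFusionError::Execution(format!(
                "null value in non-nullable column \"{}\"",
                field.name()
            )));
        }
    }
    Ok(())
}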

@berkaysynnada
Contributor Author

berkaysynnada commented Jun 5, 2023

In the create_memory_table() function, there is a match arm that handles the case where the table does not exist. In that case, we initialize the MemTable with try_new(), comparing the fields one-to-one. For these unregistered, newly created tables, do we need to check the LogicalPlan schema against the schema coming from the partitions at all? By implementing a MemTable::new_not_registered() function, could we directly adopt the schema coming from the partitions? In the case of empty batches (CREATE TABLE statements without values inserted), we could fall back to the input plan's schema; a sketch follows below.
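
A minimal sketch of the schema choice such a constructor could make (schema_for_new_table is part of this proposal, not an existing DataFusion API):

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;

// Proposed behavior for a not-yet-registered table: trust the schema
// carried by the batches themselves, and fall back to the input plan's
// schema only when there is no data at all (e.g. CREATE TABLE without
// values).
fn schema_for_new_table(
    plan_schema: SchemaRef,
    partitions: &[Vec<RecordBatch>],
) -> SchemaRef {
    partitions
        .iter()
        .flatten()
        .next()
        .map(|batch| batch.schema())
        .unwrap_or(plan_schema)
}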
