Fix predicate pushdown for outer joins #1618

Merged: 13 commits, Jan 24, 2022

james727
Contributor

Which issue does this PR close?

Closes #1586

What changes are included in this PR?

This change does a couple of things:

  1. Does not attempt to push down any predicates to the nullable (non-preserved) side of a join.
  2. Skips the optimization that duplicates a post-join filter on a join-equality column onto the other side of the join when that other side is nullable.

I'll elaborate on point 2 here as I think the above is a bit difficult to parse. Consider the following query:

SELECT *
FROM t1 JOIN t2 
  on t1.id = t2.uid 
WHERE t1.id > 1

Right now this is being rewritten as:

SELECT *
FROM t1 JOIN t2 
  on t1.id = t2.uid 
WHERE t1.id > 1 AND t2.uid > 1 -- Duplicate filter to t2.uid, so it can be pushed down

This is correct in the case of certain joins. However, I believe that in cases where the other join side is non-preserved, we should not apply this optimization. For example, if we change the above join to a LEFT join, the query rewrite becomes:

SELECT *
FROM t1 LEFT JOIN t2 
  on t1.id = t2.uid 
WHERE t1.id > 1 AND t2.uid > 1

The additional filter on t2.uid is wrong: it removes the null-padded rows (t1 rows with no match in t2) that the original query would have returned.
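
To make the difference concrete, here is a minimal Rust sketch that simulates the two LEFT JOIN queries above on tiny in-memory tables. The helper names (`left_join`, `gt_one`, `original`, `rewritten`) and the sample data are illustrative assumptions, not DataFusion code; `None` stands in for SQL NULL.

```rust
/// LEFT JOIN t1(id) with t2(uid) ON t1.id = t2.uid.
/// The right column is `None` when no t2 row matched (the NULL padding).
fn left_join(t1: &[i64], t2: &[i64]) -> Vec<(i64, Option<i64>)> {
    let mut out = Vec::new();
    for &id in t1 {
        let matches: Vec<i64> = t2.iter().copied().filter(|&uid| uid == id).collect();
        if matches.is_empty() {
            // Unmatched left rows are kept and padded with NULL.
            out.push((id, None));
        } else {
            for uid in matches {
                out.push((id, Some(uid)));
            }
        }
    }
    out
}

/// SQL-style `x > 1`: a NULL input yields NULL, which WHERE treats as "not true".
fn gt_one(x: Option<i64>) -> bool {
    matches!(x, Some(v) if v > 1)
}

/// Original query: WHERE t1.id > 1
fn original(t1: &[i64], t2: &[i64]) -> Vec<(i64, Option<i64>)> {
    left_join(t1, t2)
        .into_iter()
        .filter(|&(id, _)| id > 1)
        .collect()
}

/// Rewritten query: WHERE t1.id > 1 AND t2.uid > 1
fn rewritten(t1: &[i64], t2: &[i64]) -> Vec<(i64, Option<i64>)> {
    left_join(t1, t2)
        .into_iter()
        .filter(|&(id, uid)| id > 1 && gt_one(uid))
        .collect()
}
```

With t1 = [1, 2, 3] and t2 = [2], `original` keeps the null-padded row for id 3, while `rewritten` drops it, so the two queries are not equivalent.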

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Jan 20, 2022
@xudong963
Member

Oh, thanks @james727 ! -- I'll review it later.

@alamb
Contributor

alamb commented Jan 20, 2022

I also hope to review this tomorrow

@houqp houqp added the performance Make DataFusion faster label Jan 21, 2022
@liukun4515
Contributor

great fix.

@@ -210,6 +210,26 @@ async fn left_join_unbalanced() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn left_join_filter_pushdown() -> Result<()> {
// Since t2 is the non-preserved side of the join, we cannot push down a NULL filter.

Member

I think after you finish #1585, the annotation will be stale. So we can add a todo to update it.

Contributor Author

I think in this case, we can't even rewrite the join, since IS NULL by definition does not remove nulls. So this should still be valid? I'll try and clarify a bit in the comment though.

Contributor

@alamb alamb left a comment

First of all @james727 thank you for looking into this -- NULL semantics are some of the trickiest parts of SQL and I salute you and @xudong963 for working on this. ❤️

I think there are still some subtle bugs here (though I admit it is tricky), as I have highlighted. The code is really nice, and I think it is very close.

I think we can add some additional tests based on the examples in #1321 (expected postgres behavior) as well as in #1339 (comment) and #1339 (comment). I recommend we port those tests to datafusion as part of this PR.

let expected = "\
Filter: #test2.a <= Int64(1)\
\n Join: Using #test.a = #test2.a\
\n Filter: #test.a <= Int64(1)\

Contributor

I don't think this is correct -- the predicate test2.a <= 1 can be pushed down to the non preserved (aka test2) side of the join. It can't be pushed down to the test side. The rationale is that each row of test (NOT test2) must be preserved through the LEFT JOIN: if there are no matching rows in test2, the row in test is still returned, padded with nulls.

Here is an example of the discrepancy with postgres:

create table t1 as select * from (values (1), (null), (2)) as sq;
create table t2 as select * from (values (2), (null), (3)) as sq;

Postgres:

alamb=# select * from t1 LEFT JOIN t2 ON t1.column1=t2.column1 WHERE t2.column1 IS NULL;
 column1 | column1 
---------+---------
       1 |        
         |        
(2 rows)

This branch incorrectly filters out the row where t1.column1 is 1:

select * from t1 LEFT JOIN t2 ON t1.column1=t2.column1 WHERE t2.column1 IS NULL;
+---------+---------+
| column1 | column1 |
+---------+---------+
|         |         |
+---------+---------+
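
The discrepancy can be reproduced with a small Rust simulation of the two tables. This is only a sketch: it assumes the single-row result comes from the `IS NULL` filter being copied onto `t1.column1` and applied before the join, which is consistent with the output above but is not confirmed by it. All function names are hypothetical, and `None` models SQL NULL.

```rust
/// A nullable integer column value; `None` models SQL NULL.
type Val = Option<i64>;

/// LEFT JOIN t1 and t2 on column1 using SQL equality, under which
/// NULL never matches anything (including another NULL).
fn left_join(t1: &[Val], t2: &[Val]) -> Vec<(Val, Val)> {
    let mut out = Vec::new();
    for &l in t1 {
        let mut matched = false;
        for &r in t2 {
            if let (Some(a), Some(b)) = (l, r) {
                if a == b {
                    out.push((l, r));
                    matched = true;
                }
            }
        }
        if !matched {
            // Preserve the unmatched left row, padded with NULL.
            out.push((l, None));
        }
    }
    out
}

/// Correct plan: evaluate `t2.column1 IS NULL` after the join.
fn filter_after_join(t1: &[Val], t2: &[Val]) -> Vec<(Val, Val)> {
    left_join(t1, t2)
        .into_iter()
        .filter(|&(_, r)| r.is_none())
        .collect()
}

/// Hypothetical faulty plan: the IS NULL filter is also copied to
/// t1.column1 and applied to t1 before the join.
fn filter_copied_to_t1(t1: &[Val], t2: &[Val]) -> Vec<(Val, Val)> {
    let t1_filtered: Vec<Val> = t1.iter().copied().filter(|l| l.is_none()).collect();
    filter_after_join(&t1_filtered, t2)
}
```

On t1 = (1, NULL, 2) and t2 = (2, NULL, 3), `filter_after_join` returns the two rows Postgres reports, while `filter_copied_to_t1` returns only the (NULL, NULL) row.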

Contributor Author

@james727 james727 Jan 21, 2022

Oh man, great catch! I think this gets back to some previous discussion around filters that preserve nulls vs filters that do not preserve nulls. It seems this filter duplication logic is valid for filters that do not preserve nulls (e.g. - in this test case), but breaks when the filter preserves nulls (as shown in your example above).

E.g., correct me if I'm wrong, but I think that duplicating the filter is correct in the following query:

SELECT *
FROM t1 LEFT JOIN t2
ON t1.x = t2.y
WHERE t2.y > 5 -- Can duplicate filter for t1.x > 5

What do you think about going back to the logic in this commit that just skips this optimization for non-inner joins (before I got fancy with it)? If a filter does not preserve nulls, we will be able to rewrite the join as an INNER join anyway, so once that optimization is implemented we'll get the correct duplication logic with another pass of this optimizer.
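
As a sanity check on the `t2.y > 5` query above, the following Rust sketch compares the original plan with one where `t1.x > 5` is duplicated and applied to t1 before the join. The function names and sample data are assumptions made for illustration; `None` models SQL NULL.

```rust
/// LEFT JOIN t1(x) with t2(y) ON t1.x = t2.y; `None` is the NULL padding.
fn left_join(t1: &[i64], t2: &[i64]) -> Vec<(i64, Option<i64>)> {
    let mut out = Vec::new();
    for &x in t1 {
        let mut matched = false;
        for &y in t2 {
            if x == y {
                out.push((x, Some(y)));
                matched = true;
            }
        }
        if !matched {
            out.push((x, None));
        }
    }
    out
}

/// SQL-style `y > 5`: NULL compares as "not true", so padded rows are removed.
fn y_gt_5(y: Option<i64>) -> bool {
    matches!(y, Some(v) if v > 5)
}

/// Original plan: LEFT JOIN, then WHERE t2.y > 5.
fn filter_after_join(t1: &[i64], t2: &[i64]) -> Vec<(i64, Option<i64>)> {
    left_join(t1, t2)
        .into_iter()
        .filter(|&(_, y)| y_gt_5(y))
        .collect()
}

/// Plan with `t1.x > 5` duplicated and applied to t1 before the join.
fn with_duplicated_filter(t1: &[i64], t2: &[i64]) -> Vec<(i64, Option<i64>)> {
    let t1_filtered: Vec<i64> = t1.iter().copied().filter(|&x| x > 5).collect();
    filter_after_join(&t1_filtered, t2)
}
```

Both plans agree here because every row that survives `t2.y > 5` had a real match, so `t1.x = t2.y` holds and `t1.x > 5` is implied. The same reasoning is why such a filter lets the LEFT JOIN be treated as an INNER join.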

async fn left_join_filter_pushdown() -> Result<()> {
// Since t2 is the non-preserved side of the join, we cannot push down a NULL filter.
let mut ctx = create_join_context_with_nulls()?;
let sql = "SELECT t1_id, t2_id, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id WHERE t2_name IS NULL ORDER BY t1_id";

Contributor

👍 this does match postgres:

alamb=# create table t1(t1_id, t1_name) as select * from (values (11, 'a'), (22, 'b'), (33, 'c'), (44, 'd'), (77, 'e')) as sq;
SELECT 5
alamb=# create table t2(t2_id, t2_name) as select * from (values (11, 'z'), (22, null), (44, 'x'), (55, 'w')) as sq;
SELECT 4
alamb=# SELECT t1_id, t2_id, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id WHERE t2_name IS NULL ORDER BY t1_id;
 t1_id | t2_id | t2_name 
-------+-------+---------
    22 |    22 | 
    33 |       | 
    77 |       | 
(3 rows)

I think the key, however, would be to test adding a filter on t2_id (as that is the type of predicate that is created).

JoinType::Inner => (true, true),
JoinType::Left => (true, false),
JoinType::Right => (false, true),
JoinType::Full => (false, false),

Contributor

Both sides are preserved in a FULL join (as in, this should probably be (true, true)).

Contributor Author

Ah - my impression was that FULL was a FULL OUTER join - i.e. both sides are nullable/non-preserved? Is this not right? I might have mixed it up with CrossJoin.

Contributor

Full is FULL OUTER join... I think in that case the inputs are both preserved (as in you can't push filters down through the join if it isn't for that column)

Though that doesn't seem consistent with JoinType::Inner

Maybe

            JoinType::Inner => (false, false),

as predicates can be pushed down both sides there 🤔

Contributor Author

I think we have the opposite definition of preserved 😭 - I've been using preserved to refer to the side that cannot provide nulls - e.g. the left side of a left join, right side of a right join, both sides of an inner join.

Contributor Author

Maybe whether a side is nullable is the better distinction here? I see your point about both sides of the FULL join being preserved, but you still can't push filters because they can provide extra null rows.

Contributor

I think there are two different things making this confusing:

  1. for predicates that are null preserving (like t1 <= 5) in the WHERE clause, it is actually fine to create a predicate like t2 <= 5 even for outer joins (try it with postgres: you'll find that the predicate filters out any rows that didn't have matches on either side, so it is ok)
  2. For predicates that are not null preserving (like t1 IS NULL), you CAN'T push predicates down (which I think is the core of this PR)

To make things even more confusing, the rules for when you can push / not push are different if the predicate is in the ON clause.

Also, I don't think DataFusion currently tracks the null preserving property for Exprs -- so the safe (correct) thing is to treat all predicates as if they may not be null preserving and not create/push them for outer joins. A follow-on exercise would be to allow more predicates to be pushed down by checking whether they are null preserving.
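
The conservative rule discussed in this thread can be sketched in a few lines of Rust. This is a standalone illustration, not the code in the PR: the `JoinType` enum is redefined locally, the mapping follows the snippet under review with "preserved" meaning "cannot be padded with NULLs" (the PR author's definition), and `can_push_where_filter` and `can_duplicate_join_key_filter` are hypothetical helper names.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

/// Returns (left_preserved, right_preserved), where a side counts as
/// "preserved" if the join can never pad its columns with NULLs.
fn lr_is_preserved(join_type: JoinType) -> (bool, bool) {
    match join_type {
        JoinType::Inner => (true, true),
        JoinType::Left => (true, false),
        JoinType::Right => (false, true),
        JoinType::Full => (false, false),
    }
}

/// Conservative rule: a WHERE predicate that references only one side
/// may be pushed below the join only if that side is preserved.
fn can_push_where_filter(join_type: JoinType, filter_on_left: bool) -> bool {
    let (left, right) = lr_is_preserved(join_type);
    if filter_on_left {
        left
    } else {
        right
    }
}

/// Conservative rule: only copy a filter on a join key to the other
/// side of the join when the join is an INNER join.
fn can_duplicate_join_key_filter(join_type: JoinType) -> bool {
    join_type == JoinType::Inner
}
```

Because the sketch does not know whether a predicate rejects NULLs, it refuses some pushdowns that would be valid, which matches the "safe first, smarter later" approach described above.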

schema: &DFSchema,
preserved: bool,
) -> Predicates<'a> {
if !preserved {

Contributor

I think this check is wrong -- you can't push predicates to the preserved side

Suggested change
- if !preserved {
+ if preserved {

Contributor Author

I thought the idea was that for the nullable side of the join, you cannot push down filters (but you may be able to rewrite the join to an INNER and then push down filters), but for the preserved/non-nullable side you can. This was my takeaway from #1586 and from reading around online.

E.g. - it seems safe to push down the filter in this query:

SELECT *
FROM t1 LEFT JOIN t2
ON t1.x = t2.y
WHERE t1.z > 5

While if the filter was on a column of t2, we could not push it down.

There is a good chance I have this totally wrong but wanted to double check before proceeding with the suggested change.
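
One way to check the `t1.z > 5` example is to simulate it. The Rust sketch below compares evaluating the filter above the LEFT JOIN with applying it to t1 first; the function names and sample rows are made up for illustration, and `None` models the NULL padding.

```rust
/// LEFT JOIN t1(x, z) with t2(y) ON t1.x = t2.y; `None` is the NULL padding.
fn left_join(t1: &[(i64, i64)], t2: &[i64]) -> Vec<(i64, i64, Option<i64>)> {
    let mut out = Vec::new();
    for &(x, z) in t1 {
        let mut matched = false;
        for &y in t2 {
            if x == y {
                out.push((x, z, Some(y)));
                matched = true;
            }
        }
        if !matched {
            out.push((x, z, None));
        }
    }
    out
}

/// WHERE t1.z > 5 evaluated above the join.
fn filter_above_join(t1: &[(i64, i64)], t2: &[i64]) -> Vec<(i64, i64, Option<i64>)> {
    left_join(t1, t2)
        .into_iter()
        .filter(|&(_, z, _)| z > 5)
        .collect()
}

/// The same filter pushed below the join, onto the t1 input.
fn filter_pushed_to_t1(t1: &[(i64, i64)], t2: &[i64]) -> Vec<(i64, i64, Option<i64>)> {
    let t1_filtered: Vec<(i64, i64)> = t1.iter().copied().filter(|&(_, z)| z > 5).collect();
    left_join(&t1_filtered, t2)
}
```

The two plans return the same rows because t1 columns pass through a LEFT JOIN unchanged: every output row comes from exactly one t1 row, so filtering t1 before or after the join selects the same rows.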

@james727
Contributor Author

james727 commented Jan 21, 2022

Thanks for the review @alamb! We already have some active discussion on this PR but I'd propose the following next steps on my end:

  1. Add the suggested tests (and fix any associated breakage)
  2. Roll back the filter duplication optimization to only apply for INNER joins (as discussed in Fix predicate pushdown for outer joins #1618 (comment))

I think the other main open thread is around the term preserved and its definition; it seems there is some misalignment (I probably just used the wrong definition). What are your thoughts on the best way to decide on the terminology/definitions to use here (and document it going forward)?

@alamb
Contributor

alamb commented Jan 21, 2022

Roll back the filter duplication optimization to only apply for INNER joins (as discussed in Fix predicate pushdown for outer joins #1618 (comment))

I think this is a good (safe) outcome. Pushing predicates down through outer joins can be treated as a follow on, more sophisticated optimization 👍

@james727
Contributor Author

@alamb I think this is ready for another look. I've updated the logic as discussed (do not duplicate filters on the join column unless it's an inner join) and added a bunch of tests. I also extended the comment on lr_is_preserved to hopefully be clear about how I've defined "preserved" in this PR. I think the way I have defined it, the logic is correct, but I'm curious about your thoughts on whether this is the best concept to move forward with.

@alamb
Contributor

alamb commented Jan 24, 2022

Sorry for the delay @james727 -- this type of PR takes a non trivial amount of time for me to review ;) I need to ensure I have focus time to do so. Going in now

} else if col == r {
join_cols_to_replace.insert(col, l);
break;
if *join_type == JoinType::Inner {

Contributor

this makes sense (to only create synthetic predicates for inner joins for now).

I will try and capture some of the potential follow on improvements in follow on tickets

Contributor

@alamb alamb left a comment

Thanks @james727 I went through this carefully and the code now makes a lot of sense to me.

I will try and file some fun follow on tickets about more join tricks that can be done.

@alamb
Contributor

alamb commented Jan 24, 2022

Epic work @james727 -- thanks again. Kudos to @xudong963 for the help reviewing

@alamb alamb merged commit 992624a into apache:master Jan 24, 2022
@alamb
Contributor

alamb commented Jan 24, 2022

Follow on #1670

@xudong963
Member

Thanks @james727 !

paveltiunov pushed a commit to cube-js/arrow-datafusion that referenced this pull request Nov 29, 2022
* Add failing test for outer join with null filter

* Fix typos in filter_push_down comment

* Don't push predicates to unpreserved join sides

* Do not duplicate filters for joined columns for non inner joins

* Add some more tests

* Only skip filter duplication for non-preserved side

* Only duplicate filters on the join column for inner joins

* Clarify test comment

* Add some sql tests

* Add some data to test tables

* Add more sql tests

* Clarify definition of preserved

* Remove redundant logic

Labels: datafusion (Changes in the datafusion crate), performance (Make DataFusion faster)

Successfully merging this pull request may close these issues:
datafusion doesn't process predicate pushdown correctly when there is outer join

5 participants