Bug in HashJoin output_partition cause the input from left input not fully executed #5738

Closed
duongcongtoai opened this issue Mar 25, 2023 · 7 comments · Fixed by #5768
Labels
bug Something isn't working

Comments

@duongcongtoai
Contributor

Describe the bug

The HashJoinExec decides output_partition based on this function: https://github.com/apache/arrow-datafusion/blob/b7a33317c2abf265f4ab6b3fe636f87c4d01334c/datafusion/core/src/physical_plan/joins/utils.rs#L90

When PartitionMode is Partitioned and join_type is RIGHT, the join's output partitioning is taken from the right child. If the left child has more partitions than the right child, the extra left partitions are never executed: https://github.com/apache/arrow-datafusion/blob/e87754cfe3afa4c358a8ca9c21c3c4acd020dfe5/datafusion/core/src/physical_plan/joins/hash_join.rs#L413

To Reproduce

Code in this gist

Create two ExecutionPlan inputs from CSV files, each with a single field "id", and build a HashJoinExec from them. During execution, some partitions of the left input are never executed, so their rows are never matched against the corresponding rows of the right input. The result is an incorrect join:

+----+----+
| id | id |
+----+----+
|    | 2  |
|    | 3  |
|    | 6  |
|    | 7  |
|    | 9  |
|    | 1  |
|    | 4  |
|    | 5  |
|    | 8  |
+----+----+
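The failure mode can be illustrated without DataFusion. The sketch below is a toy model, not the HashJoinExec code: `right_join_partition` and `right_join` are hypothetical names, and the partition layout (three left partitions, one right partition) is an assumption chosen to make the effect visible. Output partition `i` builds from left partition `i` only, and the number of output partitions follows the right side.

```rust
use std::collections::HashSet;

/// Toy model of one output partition of a partitioned RIGHT join on `id`.
/// It builds a lookup set from left partition `i` and probes it with right partition `i`.
fn right_join_partition(
    left: &[Vec<i32>],
    right: &[Vec<i32>],
    i: usize,
) -> Vec<(Option<i32>, i32)> {
    let build: HashSet<i32> = match left.get(i) {
        Some(partition) => partition.iter().copied().collect(),
        None => HashSet::new(),
    };
    right[i]
        .iter()
        .map(|&id| (if build.contains(&id) { Some(id) } else { None }, id))
        .collect()
}

/// The output partition count follows the right child, as described in the
/// issue for RIGHT joins. Left partitions with index >= right.len() are never read.
fn right_join(left: &[Vec<i32>], right: &[Vec<i32>]) -> Vec<(Option<i32>, i32)> {
    (0..right.len())
        .flat_map(|i| right_join_partition(left, right, i))
        .collect()
}

fn main() {
    // Assumed layout: the left side has 3 partitions, the right side has 1.
    let left = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]];
    let right = vec![vec![1, 2, 3, 4, 5, 6, 7, 8, 9]];

    let rows = right_join(&left, &right);
    let unmatched = rows.iter().filter(|(l, _)| l.is_none()).count();
    println!("{} of {} right rows have no left match", unmatched, rows.len());
}
```

With this layout only left partition 0 is ever read, so ids 4 through 9 come out with a null left side even though matching rows exist in the unread partitions.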

Expected behavior

HashJoin executes every left partition and produces the correct result:

+----+----+
| id | id |
+----+----+
| 1  | 1  |
| 9  | 9  |
| 5  | 5  |
| 8  | 8  |
| 6  | 6  |
| 7  | 7  |
| 4  | 4  |
| 2  | 2  |
| 3  | 3  |
+----+----+

Additional context

No response

duongcongtoai added the bug label on Mar 25, 2023
@mingmwang
Contributor

I will take a look.

@mingmwang
Contributor

@duongcongtoai
By "more partitions", do you mean that the left and right inputs have different partition counts?
Currently DataFusion inserts a RepartitionExec for both the left and the right input; after the RepartitionExec,
the partition counts should always be the same.
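A minimal sketch of why repartitioning both inputs on the join key removes the problem. This is not RepartitionExec: `repartition` is a hypothetical helper, and `id mod n` stands in for a real hash function. Both sides end up with the same partition count, and equal keys land at the same partition index on each side.

```rust
/// Hypothetical stand-in for hash repartitioning: route each id to
/// partition `id mod n`. A real engine would hash the join key instead.
fn repartition(ids: &[i32], n: usize) -> Vec<Vec<i32>> {
    let mut parts: Vec<Vec<i32>> = vec![Vec::new(); n];
    for &id in ids {
        parts[id.rem_euclid(n as i32) as usize].push(id);
    }
    parts
}

fn main() {
    let n = 3;
    let left = repartition(&[1, 2, 3, 4, 5, 6, 7, 8, 9], n);
    let right = repartition(&[9, 8, 7, 6, 5, 4, 3, 2, 1], n);

    // Both sides now have the same number of partitions.
    assert_eq!(left.len(), right.len());

    // Every right id finds its match in the left partition with the same index.
    for (l, r) in left.iter().zip(&right) {
        assert!(r.iter().all(|id| l.contains(id)));
    }
    println!("partition counts match: {}", left.len());
}
```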

@duongcongtoai
Contributor Author

Thank you. Should we have a small validation in the new function to notify users about this constraint?

@mingmwang
Contributor

@duongcongtoai Agree. We should add a validation to enforce this constraint.

@duongcongtoai
Contributor Author

Okay, let me open my first PR :D

@mingmwang
Contributor

> Okay, let me open my first PR :D

Sure. I think you can add the input partition count check at execution time (not plan time), in the execute() method.
I think both HashJoinExec and SortMergeJoinExec will need this check. I'm not sure whether SymmetricHashJoinExec requires it.
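A sketch of what such a check might look like, independent of where it is called from. `check_partition_counts` and its error text are hypothetical and are not taken from the DataFusion code base or from the fix; the real check would return DataFusion's own error type rather than a `String`.

```rust
/// Hypothetical validation: a partitioned join requires both inputs to
/// expose the same number of partitions.
fn check_partition_counts(left: usize, right: usize) -> Result<(), String> {
    if left != right {
        return Err(format!(
            "partition count mismatch: left has {left}, right has {right}; \
             repartition both inputs to the same count"
        ));
    }
    Ok(())
}

fn main() {
    // Matching counts pass the check.
    assert!(check_partition_counts(4, 4).is_ok());

    // Mismatched counts are rejected before any partition is executed.
    match check_partition_counts(4, 2) {
        Ok(()) => println!("ok"),
        Err(msg) => println!("rejected: {msg}"),
    }
}
```

The same function could be called at plan time or at the start of execution; the difference is only how early the mismatch is reported.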

@duongcongtoai
Contributor Author

@mingmwang is there a particular reason to validate at execution time instead of plan time? If the constraint is violated, validating at plan time would avoid starting unnecessary execution in the partitioned tasks, right?
