Skip to content

Commit

Permalink
Protobuf implementations with roundrobin tests (#193)
Browse files Browse the repository at this point in the history
* Protobuf implementations with roundrobin

* Proto

* Update mod.rs
  • Loading branch information
metesynnada committed Dec 15, 2023
1 parent 425d340 commit 9022c06
Show file tree
Hide file tree
Showing 4 changed files with 338 additions and 19 deletions.
48 changes: 46 additions & 2 deletions datafusion/physical-plan/src/joins/sliding_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ pub struct SlidingHashJoinExec {
/// If null_equals_null is true, null == null else null != null
pub(crate) null_equals_null: bool,
/// Left side sort expression(s)
left_sort_exprs: Vec<PhysicalSortExpr>,
pub(crate) left_sort_exprs: Vec<PhysicalSortExpr>,
/// Right side sort expression(s)
right_sort_exprs: Vec<PhysicalSortExpr>,
pub(crate) right_sort_exprs: Vec<PhysicalSortExpr>,
/// The output ordering
output_ordering: Option<Vec<PhysicalSortExpr>>,
/// Partition mode
Expand Down Expand Up @@ -309,6 +309,50 @@ impl SlidingHashJoinExec {
),
]
}
/// left (build) side which gets hashed
pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
&self.left
}

/// right (probe) side which are filtered by the hash table
pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
&self.right
}

/// Set of common columns used to join on
pub fn on(&self) -> &[(Column, Column)] {
&self.on
}

/// Filters applied before join output
pub fn filter(&self) -> &JoinFilter {
&self.filter
}

/// How the join is performed
pub fn join_type(&self) -> &JoinType {
&self.join_type
}

/// The partitioning mode of this hash join
pub fn partition_mode(&self) -> &StreamJoinPartitionMode {
&self.mode
}

/// Get null_equals_null
pub fn null_equals_null(&self) -> bool {
self.null_equals_null
}

/// Get left_sort_exprs
pub fn left_sort_exprs(&self) -> &Vec<PhysicalSortExpr> {
&self.left_sort_exprs
}

/// Get right_sort_exprs
pub fn right_sort_exprs(&self) -> &Vec<PhysicalSortExpr> {
&self.right_sort_exprs
}
}

impl DisplayAs for SlidingHashJoinExec {
Expand Down
14 changes: 12 additions & 2 deletions datafusion/physical-plan/src/joins/sliding_nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,12 @@ impl SlidingNestedLoopJoinExec {
})
}

/// left (build) side
/// left (build) side which gets hashed
pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
&self.left
}

/// right (probe) side
/// right (probe) side which are filtered by the hash table
pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
&self.right
}
Expand All @@ -250,6 +250,16 @@ impl SlidingNestedLoopJoinExec {
&self.join_type
}

/// Get left_sort_exprs
pub fn left_sort_exprs(&self) -> &Vec<PhysicalSortExpr> {
&self.left_sort_exprs
}

/// Get right_sort_exprs
pub fn right_sort_exprs(&self) -> &Vec<PhysicalSortExpr> {
&self.right_sort_exprs
}

/// Calculate order preservation flags for this join.
fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
vec![
Expand Down
33 changes: 28 additions & 5 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,8 @@ message PhysicalPlanNode {
SymmetricHashJoinExecNode symmetric_hash_join = 25;
InterleaveExecNode interleave = 26;
PlaceholderRowExecNode placeholder_row = 27;
SlidingHashJoinExecNode sliding_hash_join = 2400;
SlidingNestedLoopJoinExecNode sliding_nested_loop_join = 2500;
}
}

Expand Down Expand Up @@ -1434,6 +1436,11 @@ enum PartitionMode {
AUTO = 2;
}

enum StreamPartitionMode {
SINGLE_PARTITION = 0;
PARTITIONED_EXEC = 1;
}

message HashJoinExecNode {
PhysicalPlanNode left = 1;
PhysicalPlanNode right = 2;
Expand All @@ -1444,11 +1451,6 @@ message HashJoinExecNode {
JoinFilter filter = 8;
}

enum StreamPartitionMode {
SINGLE_PARTITION = 0;
PARTITIONED_EXEC = 1;
}

message SymmetricHashJoinExecNode {
PhysicalPlanNode left = 1;
PhysicalPlanNode right = 2;
Expand All @@ -1463,6 +1465,27 @@ message InterleaveExecNode {
repeated PhysicalPlanNode inputs = 1;
}

message SlidingHashJoinExecNode {
PhysicalPlanNode left = 1;
PhysicalPlanNode right = 2;
repeated JoinOn on = 3;
JoinType join_type = 4;
StreamPartitionMode partition_mode = 6;
bool null_equals_null = 7;
JoinFilter filter = 8;
repeated PhysicalExprNode left_sort_exprs = 9;
repeated PhysicalExprNode right_sort_exprs = 10;
}

message SlidingNestedLoopJoinExecNode {
PhysicalPlanNode left = 1;
PhysicalPlanNode right = 2;
JoinType join_type = 3;
JoinFilter filter = 4;
repeated PhysicalExprNode left_sort_exprs = 5;
repeated PhysicalExprNode right_sort_exprs = 6;
}

message UnionExecNode {
repeated PhysicalPlanNode inputs = 1;
}
Expand Down
Loading

0 comments on commit 9022c06

Please sign in to comment.