Skip to content

Commit

Permalink
improve hashjoin execution metrics (#4394)
Browse files Browse the repository at this point in the history
* improve hashjoin execution metrics

* improve hashjoin execution metrics
  • Loading branch information
HuSen8891 authored Nov 29, 2022
1 parent fa4bea8 commit 66c95e7
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions datafusion/core/src/physical_plan/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ pub struct HashJoinExec {
#[derive(Debug)]
struct HashJoinMetrics {
/// Total time for joining probe-side batches to the build-side batches
join_time: metrics::Time,
probe_time: metrics::Time,
/// Total time for building hashmap
build_time: metrics::Time,
/// Number of batches consumed by this operator
input_batches: metrics::Count,
/// Number of rows consumed by this operator
Expand All @@ -162,7 +164,9 @@ struct HashJoinMetrics {

impl HashJoinMetrics {
pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition);
let probe_time = MetricBuilder::new(metrics).subset_time("probe_time", partition);

let build_time = MetricBuilder::new(metrics).subset_time("build_time", partition);

let input_batches =
MetricBuilder::new(metrics).counter("input_batches", partition);
Expand All @@ -175,7 +179,8 @@ impl HashJoinMetrics {
let output_rows = MetricBuilder::new(metrics).output_rows(partition);

Self {
join_time,
probe_time,
build_time,
input_batches,
input_rows,
output_batches,
Expand Down Expand Up @@ -1487,10 +1492,12 @@ impl HashJoinStream {
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<ArrowResult<RecordBatch>>> {
let build_timer = self.join_metrics.build_time.timer();
let left_data = match ready!(self.left_fut.get(cx)) {
Ok(left_data) => left_data,
Err(e) => return Poll::Ready(Some(Err(e))),
};
build_timer.done();

let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
let num_rows = left_data.1.num_rows();
Expand All @@ -1516,7 +1523,7 @@ impl HashJoinStream {
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
Some(Ok(batch)) => {
let timer = self.join_metrics.join_time.timer();
let timer = self.join_metrics.probe_time.timer();
let result = build_batch(
&batch,
left_data,
Expand All @@ -1532,7 +1539,6 @@ impl HashJoinStream {
self.join_metrics.input_batches.add(1);
self.join_metrics.input_rows.add(batch.num_rows());
if let Ok((ref batch, ref left_side)) = result {
timer.done();
self.join_metrics.output_batches.add(1);
self.join_metrics.output_rows.add(batch.num_rows());

Expand All @@ -1551,11 +1557,13 @@ impl HashJoinStream {
| JoinType::RightAnti => {}
}
}
Some(result.map(|x| x.0))
let final_result = Some(result.map(|x| x.0));
timer.done();
final_result
}
Some(err) => Some(err),
None => {
let timer = self.join_metrics.join_time.timer();
let timer = self.join_metrics.probe_time.timer();
// For the left join, produce rows for unmatched rows
match self.join_type {
JoinType::Left
Expand Down

0 comments on commit 66c95e7

Please sign in to comment.