Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pipeline wait time of minTSO and pipeline breaker in execution summary #9566

Merged
merged 33 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1a723d8
Save work
yibin87 Oct 30, 2024
5b2046f
Fix compile issue
yibin87 Oct 30, 2024
af956b6
Fix compilation issue
yibin87 Oct 30, 2024
a966dc1
Little update
yibin87 Oct 30, 2024
7b2cefd
Little update
yibin87 Oct 30, 2024
f47dd7d
Little update
yibin87 Oct 30, 2024
0a5417b
Add minTSO wait time
yibin87 Oct 30, 2024
60904dd
Save minTSO wait time in exec context
yibin87 Oct 30, 2024
ceecb8b
Little refact
yibin87 Oct 30, 2024
adca744
Fix format issue
yibin87 Oct 30, 2024
bc43951
Little update
yibin87 Oct 30, 2024
257a3a9
Renaming and set pipeline flag in physicalJoin
yibin87 Oct 30, 2024
5ccb2ff
fix little issue
yibin87 Oct 30, 2024
876bb76
Format issue
yibin87 Oct 30, 2024
c7e099b
Little update
yibin87 Oct 30, 2024
caedffb
Update tipb protocol
yibin87 Oct 30, 2024
fe1770e
Define some constants
yibin87 Oct 31, 2024
20bc224
Update comments and several naming issue
yibin87 Oct 31, 2024
bded55b
Fix format issue
yibin87 Oct 31, 2024
9bb1b5a
Add failpoints to fake pipeline wait time
yibin87 Nov 1, 2024
c1849dd
Address comments
yibin87 Nov 5, 2024
5e96c43
Address comments
yibin87 Nov 6, 2024
9df82a1
Little fix
yibin87 Nov 6, 2024
6f3e063
Format files
yibin87 Nov 6, 2024
aed1705
Address comments
yibin87 Nov 6, 2024
1534670
Little update
yibin87 Nov 6, 2024
857600c
Fix compilation issue
yibin87 Nov 6, 2024
9500087
Remove useless pipeline breaker wait time in operator profile info
yibin87 Nov 6, 2024
95684ef
Fix format issue
yibin87 Nov 6, 2024
a531b62
Address comment to add todo comment
yibin87 Nov 11, 2024
7396209
Little update
yibin87 Nov 11, 2024
a40ddc6
Adjust schedule duration in Events
yibin87 Nov 12, 2024
45feb07
Merge branch 'master' into add_pipeline_wait_time
ti-chi-bot[bot] Nov 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions dbms/src/Common/Stopwatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
#include <common/apple_rt.h>
#endif

/// Conversion factors: the number of nanoseconds in one second, one millisecond and one microsecond.
static constexpr UInt64 SECOND_TO_NANO = 1000000000ULL;
static constexpr UInt64 MILLISECOND_TO_NANO = 1000000UL;
static constexpr UInt64 MICROSECOND_TO_NANO = 1000UL;

inline UInt64 clock_gettime_ns(clockid_t clock_type = CLOCK_MONOTONIC)
{
struct timespec ts
{
};
clock_gettime(clock_type, &ts);
return static_cast<UInt64>(ts.tv_sec * 1000000000ULL + ts.tv_nsec);
return static_cast<UInt64>(ts.tv_sec * SECOND_TO_NANO + ts.tv_nsec);
}

/// Sometimes monotonic clock may not be monotonic (due to bug in kernel?).
Expand All @@ -43,7 +47,7 @@ inline UInt64 clock_gettime_ns_adjusted(UInt64 prev_time, clockid_t clock_type =
return current_time;

/// Something probably went completely wrong if time stepped back for more than 1 second.
assert(prev_time - current_time <= 1000000000ULL);
assert(prev_time - current_time <= SECOND_TO_NANO);
return prev_time;
}

Expand Down Expand Up @@ -86,8 +90,8 @@ class Stopwatch
void restart() { start(); }

UInt64 elapsed() const { return is_running ? nanosecondsWithBound(start_ns) - start_ns : stop_ns - start_ns; }
UInt64 elapsedMilliseconds() const { return elapsed() / 1000000UL; }
double elapsedSeconds() const { return static_cast<double>(elapsed()) / 1000000000ULL; }
UInt64 elapsedMilliseconds() const { return elapsed() / MILLISECOND_TO_NANO; }
double elapsedSeconds() const { return static_cast<double>(elapsed()) / SECOND_TO_NANO; }

UInt64 elapsedFromLastTime()
{
Expand All @@ -104,8 +108,8 @@ class Stopwatch
}
}

UInt64 elapsedMillisecondsFromLastTime() { return elapsedFromLastTime() / 1000000UL; }
double elapsedSecondsFromLastTime() { return static_cast<double>(elapsedFromLastTime()) / 1000000000ULL; }
UInt64 elapsedMillisecondsFromLastTime() { return elapsedFromLastTime() / MILLISECOND_TO_NANO; }
double elapsedSecondsFromLastTime() { return static_cast<double>(elapsedFromLastTime()) / SECOND_TO_NANO; }

private:
UInt64 start_ns = 0;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ class DAGContext
String dummy_query_string;
ASTPtr dummy_ast;
Int64 compile_time_ns = 0;
Int64 minTSO_wait_time_ns = 0;
size_t final_concurrency = 1;
size_t initialize_concurrency = 1;
bool has_read_wait_index = false;
Expand Down
23 changes: 21 additions & 2 deletions dbms/src/Flash/Coprocessor/ExecutionSummary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@ ExecutionSummary::ExecutionSummary()

void ExecutionSummary::merge(const ExecutionSummary & other)
{
time_processed_ns = std::max(time_processed_ns, other.time_processed_ns);
if (time_processed_ns < other.time_processed_ns)
{
time_processed_ns = other.time_processed_ns;
time_minTSO_wait_ns = other.time_minTSO_wait_ns;
time_pipeline_breaker_wait_ns = other.time_pipeline_breaker_wait_ns;
time_pipeline_queue_ns = other.time_pipeline_queue_ns;
}

num_produced_rows += other.num_produced_rows;
num_iterations += other.num_iterations;
concurrency += other.concurrency;
Expand All @@ -36,7 +43,13 @@ void ExecutionSummary::merge(const ExecutionSummary & other)

void ExecutionSummary::merge(const tipb::ExecutorExecutionSummary & other)
{
time_processed_ns = std::max(time_processed_ns, other.time_processed_ns());
if (time_processed_ns < other.time_processed_ns())
{
time_processed_ns = other.time_processed_ns();
time_minTSO_wait_ns = other.tiflash_wait_summary().mintso_wait_ns();
time_pipeline_breaker_wait_ns = other.tiflash_wait_summary().pipeline_breaker_wait_ns();
time_pipeline_queue_ns = other.tiflash_wait_summary().pipeline_queue_wait_ns();
}
num_produced_rows += other.num_produced_rows();
num_iterations += other.num_iterations();
concurrency += other.concurrency();
Expand All @@ -47,6 +60,9 @@ void ExecutionSummary::merge(const tipb::ExecutorExecutionSummary & other)
void ExecutionSummary::fill(const BaseRuntimeStatistics & other)
{
time_processed_ns = other.execution_time_ns;
time_minTSO_wait_ns = other.minTSO_wait_time_ns;
time_pipeline_breaker_wait_ns = other.pipeline_breaker_wait_time_ns;
time_pipeline_queue_ns = other.queue_wait_time_ns;
num_produced_rows = other.rows;
num_iterations = other.blocks;
concurrency = other.concurrency;
Expand All @@ -55,6 +71,9 @@ void ExecutionSummary::fill(const BaseRuntimeStatistics & other)
void ExecutionSummary::init(const tipb::ExecutorExecutionSummary & other)
{
time_processed_ns = other.time_processed_ns();
time_minTSO_wait_ns = other.tiflash_wait_summary().mintso_wait_ns();
time_pipeline_breaker_wait_ns = other.tiflash_wait_summary().pipeline_breaker_wait_ns();
time_pipeline_queue_ns = other.tiflash_wait_summary().pipeline_queue_wait_ns();
num_produced_rows = other.num_produced_rows();
num_iterations = other.num_iterations();
concurrency = other.concurrency();
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/ExecutionSummary.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ struct BaseRuntimeStatistics;
struct ExecutionSummary
{
UInt64 time_processed_ns = 0;
UInt64 time_minTSO_wait_ns = 0;
UInt64 time_pipeline_breaker_wait_ns = 0;
UInt64 time_pipeline_queue_ns = 0;
UInt64 num_produced_rows = 0;
UInt64 num_iterations = 0;
UInt64 concurrency = 0;
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,8 @@ void MPPTask::runImpl()
{
LOG_DEBUG(log, "task starts preprocessing");
preprocess();
auto time_cost_in_preprocess_ms = stopwatch.elapsedMilliseconds();
auto time_cost_in_preprocess_ns = stopwatch.elapsed();
auto time_cost_in_preprocess_ms = time_cost_in_preprocess_ns / MILLISECOND_TO_NANO;
LOG_DEBUG(log, "task preprocess done");
schedule_entry.setNeededThreads(estimateCountOfNewThreads());
LOG_DEBUG(
Expand All @@ -539,7 +540,9 @@ void MPPTask::runImpl()

scheduleOrWait();

auto time_cost_in_schedule_ms = stopwatch.elapsedMilliseconds() - time_cost_in_preprocess_ms;
auto time_cost_in_schedule_ns = stopwatch.elapsed() - time_cost_in_preprocess_ns;
dag_context->minTSO_wait_time_ns = time_cost_in_schedule_ns;
auto time_cost_in_schedule_ms = time_cost_in_schedule_ns / MILLISECOND_TO_NANO;
LOG_INFO(
log,
"task starts running, time cost in schedule: {} ms, time cost in preprocess: {} ms",
Expand Down Expand Up @@ -593,7 +596,7 @@ void MPPTask::runImpl()
LOG_DEBUG(
log,
"finish with {} seconds, {} rows, {} blocks, {} bytes",
runtime_statistics.execution_time_ns / static_cast<double>(1000000000),
runtime_statistics.execution_time_ns / static_cast<double>(SECOND_TO_NANO),
runtime_statistics.rows,
runtime_statistics.blocks,
runtime_statistics.bytes);
Expand Down
37 changes: 22 additions & 15 deletions dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,15 @@ extern const char random_pipeline_model_execute_suffix_failpoint[];
return (op_status); \
}

PipelineExec::PipelineExec(SourceOpPtr && source_op_, TransformOps && transform_ops_, SinkOpPtr && sink_op_)
/// Builds a pipeline executor from its source, transform and sink operators, all of which are moved into the new object.
/// `has_pipeline_breaker_wait_time_` is stored as-is; finalizeProfileInfo checks it before recording the
/// pipeline breaker wait time on the source operator's profile info.
PipelineExec::PipelineExec(
SourceOpPtr && source_op_,
TransformOps && transform_ops_,
SinkOpPtr && sink_op_,
bool has_pipeline_breaker_wait_time_)
: source_op(std::move(source_op_))
, transform_ops(std::move(transform_ops_))
, sink_op(std::move(sink_op_))
, has_pipeline_breaker_wait_time(has_pipeline_breaker_wait_time_)
{
// NOTE(review): presumably a test hook that can inject an exception during construction — confirm against the failpoint definition.
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_execute_prefix_failpoint);
}
Expand Down Expand Up @@ -240,28 +245,30 @@ void PipelineExec::finalizeProfileInfo(UInt64 queuing_time, UInt64 pipeline_brea
// However, if there are multiple pipeline breaker operators within a single pipeline, it can become very complex.
// Therefore, to simplify matters, we will include the pipeline schedule duration in the execution time of the source operator.
//
// For the queuing_time, it should be evenly distributed across all operators.
// Currently pipeline_breaker_wait_time only works for join probe pipelines. We put the wait time in the source op
// instead of the join operator because of cases with multiple concurrent builds:
// A(build) -> [B(build) -> C(probe)]: A and B run concurrently to reduce total latency, so the pipeline_breaker_wait_time
// recorded in C's pipeline_exec is actually max(A_build, B_build). Thus we can't easily calculate the exact build time for each
// join operator.
// TODO: We can record pipeline_breaker_wait_time with source pipeline ids in the future to map join build time to the exact join executor
//
// TODO: Refine the execution summary to exclude extra time from the execution time.
// For example: [total_time:6s, execution_time:1s, queuing_time:2s, pipeline_breaker_wait_time:3s]

// The execution time of operator[i] = self_time_from_profile_info + sum(self_time_from_profile_info[i-1, .., 0]) + (i + 1) * extra_time / operator_num.

// The queuing_time is also added to the source operator's execution time.
// These times are also kept separately to provide more info.
source_op->getProfileInfo()->execution_time += pipeline_breaker_wait_time;
source_op->getProfileInfo()->execution_time += queuing_time;
if (has_pipeline_breaker_wait_time)
{
source_op->getProfileInfo()->pipeline_breaker_wait_time = pipeline_breaker_wait_time;
}
source_op->getProfileInfo()->task_wait_time = queuing_time;

UInt64 operator_num = 2 + transform_ops.size();
UInt64 per_operator_queuing_time = queuing_time / operator_num;

source_op->getProfileInfo()->execution_time += per_operator_queuing_time;
// Compensate for the values missing due to rounding.
source_op->getProfileInfo()->execution_time += (queuing_time - (per_operator_queuing_time * operator_num));
UInt64 time_for_prev_op = source_op->getProfileInfo()->execution_time;
for (const auto & transform_op : transform_ops)
{
transform_op->getProfileInfo()->execution_time += (per_operator_queuing_time + time_for_prev_op);
transform_op->getProfileInfo()->execution_time += time_for_prev_op;
time_for_prev_op = transform_op->getProfileInfo()->execution_time;
}
sink_op->getProfileInfo()->execution_time += (per_operator_queuing_time + time_for_prev_op);
sink_op->getProfileInfo()->execution_time += time_for_prev_op;
}

} // namespace DB
7 changes: 6 additions & 1 deletion dbms/src/Flash/Pipeline/Exec/PipelineExec.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ namespace DB
class PipelineExec : private boost::noncopyable
{
public:
PipelineExec(SourceOpPtr && source_op_, TransformOps && transform_ops_, SinkOpPtr && sink_op_);
PipelineExec(
SourceOpPtr && source_op_,
TransformOps && transform_ops_,
SinkOpPtr && sink_op_,
bool has_pipeline_breaker_wait_time_);

void executePrefix();
void executeSuffix();
Expand Down Expand Up @@ -75,6 +79,7 @@ class PipelineExec : private boost::noncopyable
SourceOpPtr source_op;
TransformOps transform_ops;
SinkOpPtr sink_op;
bool has_pipeline_breaker_wait_time = false;

// hold the operator which is ready for executing await.
Operator * awaitable = nullptr;
Expand Down
12 changes: 8 additions & 4 deletions dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ void PipelineExecBuilder::setSinkOp(SinkOpPtr && sink_op_)
sink_op = std::move(sink_op_);
}

PipelineExecPtr PipelineExecBuilder::build()
PipelineExecPtr PipelineExecBuilder::build(bool has_pipeline_breaker_wait_time)
{
RUNTIME_CHECK(source_op && sink_op);
return std::make_unique<PipelineExec>(std::move(source_op), std::move(transform_ops), std::move(sink_op));
return std::make_unique<PipelineExec>(
std::move(source_op),
std::move(transform_ops),
std::move(sink_op),
has_pipeline_breaker_wait_time);
}

OperatorProfileInfoPtr PipelineExecBuilder::getCurProfileInfo() const
Expand Down Expand Up @@ -113,15 +117,15 @@ void PipelineExecGroupBuilder::merge(PipelineExecGroupBuilder && other)
std::make_move_iterator(other.groups[i].end()));
}

PipelineExecGroup PipelineExecGroupBuilder::build()
PipelineExecGroup PipelineExecGroupBuilder::build(bool has_pipeline_breaker_wait_time)
{
RUNTIME_CHECK(!groups.empty());
PipelineExecGroup pipeline_exec_group;
for (auto & group : groups)
{
RUNTIME_CHECK(!group.empty());
for (auto & builder : group)
pipeline_exec_group.push_back(builder.build());
pipeline_exec_group.push_back(builder.build(has_pipeline_breaker_wait_time));
}
return pipeline_exec_group;
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct PipelineExecBuilder

Block getCurrentHeader() const;

PipelineExecPtr build();
PipelineExecPtr build(bool has_pipeline_breaker_wait_time);

OperatorProfileInfoPtr getCurProfileInfo() const;

Expand Down Expand Up @@ -77,7 +77,7 @@ class PipelineExecGroupBuilder
}
}

PipelineExecGroup build();
PipelineExecGroup build(bool has_pipeline_breaker_wait_time);

Block getCurrentHeader();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class SimpleOperatorTestRunner : public DB::tests::ExecutorTest
group_builder.transform([&](auto & builder) {
builder.setSinkOp(std::make_unique<SimpleGetResultSinkOp>(exec_context, "", result_handler));
});
auto result = group_builder.build();
auto result = group_builder.build(false);
assert(result.size() == 1);
return std::move(result.back());
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ PipelineExecGroup Pipeline::buildExecGroup(
{
plan_node->buildPipelineExecGroup(exec_context, builder, context, concurrency);
}
return builder.build();
return builder.build(has_pipeline_breaker_wait_time);
}

/**
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class Pipeline : public std::enable_shared_from_this<Pipeline>

String getFinalPlanExecId() const;

/// Sets the flag that buildExecGroup later passes to the exec group builder's build().
void setHasPipelineBreakerWaitTime(bool value) { has_pipeline_breaker_wait_time = value; }
yibin87 marked this conversation as resolved.
Show resolved Hide resolved

private:
void toTreeStringImpl(FmtBuffer & buffer, size_t level) const;
void toSelfString(FmtBuffer & buffer, size_t level) const;
Expand All @@ -113,5 +115,6 @@ class Pipeline : public std::enable_shared_from_this<Pipeline>
std::vector<PipelinePtr> children;

mutable String tree_string;
bool has_pipeline_breaker_wait_time = false;
};
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Pipeline/PipelineBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class PipelineBuilder
return PipelineBuilder(id_generator, PipelineBreaker{pipeline, breaker_node}, log->identifier());
}

/// Forwards `value` to the pipeline currently held by this builder.
void setHasPipelineBreakerWaitTime(bool value) { pipeline->setHasPipelineBreakerWaitTime(value); }

PipelinePtr build()
{
RUNTIME_CHECK(pipeline);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Schedule/Events/Event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ void Event::schedule()
log,
"unfinished_inputs must be 0 in `schedule`, but actual value is {}",
unfinished_inputs);
schedule_duration = stopwatch.elapsed();
if (is_source)
{
assertStatus(EventStatus::SCHEDULED);
Expand All @@ -136,7 +137,6 @@ void Event::schedule()
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_event_schedule_failpoint);
}
CATCH
schedule_duration = stopwatch.elapsed();
scheduleTasks();
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ void PhysicalJoin::buildPipeline(PipelineBuilder & builder, Context & context, P
join_build_builder.build();

// Join probe pipeline.
builder.setHasPipelineBreakerWaitTime(true);
yibin87 marked this conversation as resolved.
Show resolved Hide resolved
probe()->buildPipeline(builder, context, exec_context);
auto join_probe = std::make_shared<PhysicalJoinProbe>(
executor_id,
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Flash/Statistics/BaseRuntimeStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ void BaseRuntimeStatistics::append(const OperatorProfileInfo & profile_info)
blocks += profile_info.blocks;
bytes += profile_info.bytes;
allocated_bytes += profile_info.allocated_bytes;
execution_time_ns = std::max(execution_time_ns, profile_info.execution_time);
if (execution_time_ns < profile_info.execution_time)
{
execution_time_ns = profile_info.execution_time;
queue_wait_time_ns = profile_info.task_wait_time;
pipeline_breaker_wait_time_ns = profile_info.pipeline_breaker_wait_time;
}
++concurrency;
}
} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Flash/Statistics/BaseRuntimeStatistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ struct BaseRuntimeStatistics
size_t allocated_bytes = 0;
size_t concurrency = 0;
UInt64 execution_time_ns = 0;
UInt64 minTSO_wait_time_ns = 0;
UInt64 queue_wait_time_ns = 0;
UInt64 pipeline_breaker_wait_time_ns = 0;

void append(const BlockStreamProfileInfo &);
void append(const OperatorProfileInfo &);
Expand Down
Loading