Skip to content

Commit

Permalink
add min tso scheduler for each resource group (pingcap#8072)
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Sep 20, 2023
1 parent 6dce05b commit 939a18d
Show file tree
Hide file tree
Showing 14 changed files with 304 additions and 156 deletions.
6 changes: 4 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,10 +482,12 @@ namespace DB
F(type_active_queries_count, {"type", "active_queries_count"}), \
F(type_waiting_tasks_count, {"type", "waiting_tasks_count"}), \
F(type_active_tasks_count, {"type", "active_tasks_count"}), \
F(type_global_estimated_thread_usage, {"type", "global_estimated_thread_usage"}), \
F(type_estimated_thread_usage, {"type", "estimated_thread_usage"}), \
F(type_thread_soft_limit, {"type", "thread_soft_limit"}), \
F(type_thread_hard_limit, {"type", "thread_hard_limit"}), \
F(type_hard_limit_exceeded_count, {"type", "hard_limit_exceeded_count"})) \
F(type_hard_limit_exceeded_count, {"type", "hard_limit_exceeded_count"}), \
F(type_group_entry_count, {"type", "group_entry_count"})) \
M(tiflash_task_scheduler_waiting_duration_seconds, \
"Bucketed histogram of task waiting for scheduling duration", \
Histogram, \
Expand Down Expand Up @@ -705,7 +707,7 @@ namespace DB
F(type_fsync, {{"type", "fsync"}}, ExpBuckets{0.0001, 2, 20})) \
M(tiflash_storage_delta_index_cache, "", Counter, F(type_hit, {"type", "hit"}), F(type_miss, {"type", "miss"})) \
M(tiflash_resource_group, \
"meta infos of resource groups", \
"meta info of resource group", \
Gauge, \
F(type_remaining_tokens, {"type", "remaining_tokens"}), \
F(type_avg_speed, {"type", "avg_speed"}), \
Expand Down
5 changes: 0 additions & 5 deletions dbms/src/Flash/Executor/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,6 @@ ExecutionResult PipelineExecutor::execute(ResultHandler && result_handler)
}
LOG_TRACE(log, "query finish with {}", exec_context.getQueryProfileInfo().toJson());

// For read_ru, only report it to GAC for now.
LocalAdmissionController::global_instance->consumeResource(
exec_context.getResourceGroupName(),
dagContext().getReadRU(),
0);
return exec_context.toExecutionResult();
}

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,9 @@ bool MPPTaskManager::tryToScheduleTask(MPPTaskScheduleEntry & schedule_entry)
return scheduler->tryToSchedule(schedule_entry, *this);
}

void MPPTaskManager::releaseThreadsFromScheduler(const int needed_threads)
void MPPTaskManager::releaseThreadsFromScheduler(const String & resource_group_name, const int needed_threads)
{
std::lock_guard lock(mu);
scheduler->releaseThreadsThenSchedule(needed_threads, *this);
scheduler->releaseThreadsThenSchedule(resource_group_name, needed_threads, *this);
}
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class MPPTaskManager : private boost::noncopyable

bool tryToScheduleTask(MPPTaskScheduleEntry & schedule_entry);

void releaseThreadsFromScheduler(int needed_threads);
void releaseThreadsFromScheduler(const String & resource_group_name, int needed_threads);

std::pair<MPPTunnelPtr, String> findTunnelWithTimeout(
const ::mpp::EstablishMPPConnectionRequest * request,
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskScheduleEntry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ MPPTaskScheduleEntry::~MPPTaskScheduleEntry()
{
if (schedule_state == ScheduleState::SCHEDULED)
{
manager->releaseThreadsFromScheduler(needed_threads);
manager->releaseThreadsFromScheduler(getResourceGroupName(), needed_threads);
schedule_state = ScheduleState::COMPLETED;
}
}
Expand Down Expand Up @@ -63,7 +63,8 @@ void MPPTaskScheduleEntry::waitForSchedule()
std::unique_lock lock(schedule_mu);
schedule_cv.wait(lock, [&] { return schedule_state != ScheduleState::WAITING; });
time_cost = stopwatch.elapsedSeconds();
GET_METRIC(tiflash_task_scheduler_waiting_duration_seconds).Observe(time_cost);
GET_RESOURCE_GROUP_METRIC(tiflash_task_scheduler_waiting_duration_seconds, getResourceGroupName())
.Observe(time_cost);

if (schedule_state == ScheduleState::EXCEEDED)
{
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskScheduleEntry.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class MPPTaskScheduleEntry

const MPPTaskId & getMPPTaskId() const;

const String & getResourceGroupName() const { return id.gather_id.query_id.resource_group_name; }

~MPPTaskScheduleEntry();

MPPTaskScheduleEntry(MPPTaskManager * manager_, const MPPTaskId & id_);
Expand Down
Loading

0 comments on commit 939a18d

Please sign in to comment.