Skip to content

Commit

Permalink
WorkerThreadPool: Refactor running and exit-requested as runlevels
Browse files Browse the repository at this point in the history
  • Loading branch information
RandomShaper committed Sep 13, 2024
1 parent e2fd88e commit 8b086f7
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 23 deletions.
62 changes: 40 additions & 22 deletions core/object/worker_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,16 @@ void WorkerThreadPool::_process_task(Task *p_task) {

void WorkerThreadPool::_thread_function(void *p_user) {
ThreadData *thread_data = (ThreadData *)p_user;

#define RUNLEVEL_HANDLING_NEEDED (singleton->runlevel == RUNLEVEL_EXIT)

while (true) {
Task *task_to_process = nullptr;
{
MutexLock lock(singleton->task_mutex);
if (singleton->exit_threads) {
return;
if (unlikely(RUNLEVEL_HANDLING_NEEDED)) {
DEV_ASSERT(singleton->runlevel == RUNLEVEL_EXIT);
break;
}
thread_data->signaled = false;

Expand All @@ -194,14 +198,16 @@ void WorkerThreadPool::_thread_function(void *p_user) {
singleton->task_queue.remove(singleton->task_queue.first());
} else {
thread_data->cond_var.wait(lock);
DEV_ASSERT(singleton->exit_threads || thread_data->signaled);
DEV_ASSERT(RUNLEVEL_HANDLING_NEEDED || thread_data->signaled);
}
}

if (task_to_process) {
singleton->_process_task(task_to_process);
}
}

#undef RUNLEVEL_HANDLING_NEEDED
}

void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) {
Expand Down Expand Up @@ -442,6 +448,7 @@ void WorkerThreadPool::_unlock_unlockable_mutexes() {
void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
// Keep processing tasks until the condition to stop waiting is met.

#define RUNLEVEL_HANDLING_NEEDED (runlevel == RUNLEVEL_EXIT)
#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed)

while (true) {
Expand All @@ -452,12 +459,17 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
bool was_signaled = p_caller_pool_thread->signaled;
p_caller_pool_thread->signaled = false;

if (unlikely(RUNLEVEL_HANDLING_NEEDED)) {
DEV_ASSERT(singleton->runlevel == RUNLEVEL_EXIT);
break;
}

if (IS_WAIT_OVER) {
if (unlikely(p_task == ThreadData::YIELDING)) {
p_caller_pool_thread->yield_is_over = false;
}

if (!exit_threads && was_signaled) {
if (was_signaled) {
// This thread was awaken for some additional reason, but it's about to exit.
// Let's find out what may be pending and forward the requests.
uint32_t to_process = task_queue.first() ? 1 : 0;
Expand All @@ -472,28 +484,26 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
break;
}

if (!exit_threads) {
if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
if (_try_promote_low_priority_task()) {
_notify_threads(p_caller_pool_thread, 1, 0);
}
if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
if (_try_promote_low_priority_task()) {
_notify_threads(p_caller_pool_thread, 1, 0);
}
}

if (singleton->task_queue.first()) {
task_to_process = task_queue.first()->self();
task_queue.remove(task_queue.first());
}
if (singleton->task_queue.first()) {
task_to_process = task_queue.first()->self();
task_queue.remove(task_queue.first());
}

if (!task_to_process) {
p_caller_pool_thread->awaited_task = p_task;
if (!task_to_process) {
p_caller_pool_thread->awaited_task = p_task;

_unlock_unlockable_mutexes();
relock_unlockables = true;
p_caller_pool_thread->cond_var.wait(lock);
_unlock_unlockable_mutexes();
relock_unlockables = true;
p_caller_pool_thread->cond_var.wait(lock);

DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER);
p_caller_pool_thread->awaited_task = nullptr;
}
DEV_ASSERT(RUNLEVEL_HANDLING_NEEDED || IS_WAIT_OVER || p_caller_pool_thread->signaled);
p_caller_pool_thread->awaited_task = nullptr;
}
}

Expand All @@ -505,6 +515,9 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
_process_task(task_to_process);
}
}

#undef RUNLEVEL_HANDLING_NEEDED
#undef IS_WAIT_OVER
}

void WorkerThreadPool::yield() {
Expand Down Expand Up @@ -684,6 +697,9 @@ void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {

void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {
ERR_FAIL_COND(threads.size() > 0);

runlevel = RUNLEVEL_NORMAL;

if (p_thread_count < 0) {
p_thread_count = OS::get_singleton()->get_default_thread_pool_size();
}
Expand All @@ -706,6 +722,8 @@ void WorkerThreadPool::finish() {
return;
}

ERR_FAIL_COND(runlevel >= RUNLEVEL_EXIT);

{
MutexLock lock(task_mutex);
SelfList<Task> *E = low_priority_task_queue.first();
Expand All @@ -717,7 +735,7 @@ void WorkerThreadPool::finish() {

{
MutexLock lock(task_mutex);
exit_threads = true;
runlevel = RUNLEVEL_EXIT;
}
for (ThreadData &data : threads) {
data.cond_var.notify_one();
Expand Down
5 changes: 4 additions & 1 deletion core/object/worker_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ class WorkerThreadPool : public Object {
};

TightLocalVector<ThreadData> threads;
bool exit_threads = false;
enum {
RUNLEVEL_NORMAL,
RUNLEVEL_EXIT,
} runlevel = RUNLEVEL_NORMAL;

HashMap<Thread::ID, int> thread_ids;
HashMap<
Expand Down

0 comments on commit 8b086f7

Please sign in to comment.