Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WorkerThreadPool: Revamp interaction with ScriptServer #96959

Merged
merged 4 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 130 additions & 48 deletions core/object/worker_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,21 +180,24 @@ void WorkerThreadPool::_process_task(Task *p_task) {

void WorkerThreadPool::_thread_function(void *p_user) {
ThreadData *thread_data = (ThreadData *)p_user;

while (true) {
Task *task_to_process = nullptr;
{
MutexLock lock(singleton->task_mutex);
if (singleton->exit_threads) {
return;

bool exit = singleton->_handle_runlevel(thread_data, lock);
if (unlikely(exit)) {
break;
}

thread_data->signaled = false;

if (singleton->task_queue.first()) {
task_to_process = singleton->task_queue.first()->self();
singleton->task_queue.remove(singleton->task_queue.first());
} else {
thread_data->cond_var.wait(lock);
DEV_ASSERT(singleton->exit_threads || thread_data->signaled);
}
}

Expand All @@ -204,19 +207,24 @@ void WorkerThreadPool::_thread_function(void *p_user) {
}
}

void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) {
void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock) {
// Fall back to processing on the calling thread if there are no worker threads.
// Separated into its own variable to make it easier to extend this logic
// in custom builds.
bool process_on_calling_thread = threads.size() == 0;
if (process_on_calling_thread) {
task_mutex.unlock();
p_lock.temp_unlock();
for (uint32_t i = 0; i < p_count; i++) {
_process_task(p_tasks[i]);
}
p_lock.temp_relock();
return;
}

while (runlevel == RUNLEVEL_EXIT_LANGUAGES) {
control_cond_var.wait(p_lock);
}

uint32_t to_process = 0;
uint32_t to_promote = 0;

Expand All @@ -238,8 +246,6 @@ void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count,
}

_notify_threads(caller_pool_thread, to_process, to_promote);

task_mutex.unlock();
}

void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) {
Expand Down Expand Up @@ -323,9 +329,8 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *
}

WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
ERR_FAIL_COND_V_MSG(threads.is_empty(), INVALID_TASK_ID, "Can't add a task because the WorkerThreadPool is either not initialized yet or already terminated.");
MutexLock<BinaryMutex> lock(task_mutex);

task_mutex.lock();
// Get a free task
Task *task = task_allocator.alloc();
TaskID id = last_task++;
Expand All @@ -337,7 +342,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable,
task->template_userdata = p_template_userdata;
tasks.insert(id, task);

_post_tasks_and_unlock(&task, 1, p_high_priority);
_post_tasks(&task, 1, p_high_priority, lock);

return id;
}
Expand Down Expand Up @@ -444,22 +449,34 @@ void WorkerThreadPool::_unlock_unlockable_mutexes() {
void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
// Keep processing tasks until the condition to stop waiting is met.

#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed)

while (true) {
Task *task_to_process = nullptr;
bool relock_unlockables = false;
{
MutexLock lock(task_mutex);

bool was_signaled = p_caller_pool_thread->signaled;
p_caller_pool_thread->signaled = false;

if (IS_WAIT_OVER) {
if (unlikely(p_task == ThreadData::YIELDING)) {
bool exit = _handle_runlevel(p_caller_pool_thread, lock);
if (unlikely(exit)) {
break;
}

bool wait_is_over = false;
if (unlikely(p_task == ThreadData::YIELDING)) {
if (p_caller_pool_thread->yield_is_over) {
p_caller_pool_thread->yield_is_over = false;
wait_is_over = true;
}
} else {
if (p_task->completed) {
wait_is_over = true;
}
}

if (!exit_threads && was_signaled) {
if (wait_is_over) {
if (was_signaled) {
// This thread was awaken for some additional reason, but it's about to exit.
// Let's find out what may be pending and forward the requests.
uint32_t to_process = task_queue.first() ? 1 : 0;
Expand All @@ -474,28 +491,26 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
break;
}

if (!exit_threads) {
if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
if (_try_promote_low_priority_task()) {
_notify_threads(p_caller_pool_thread, 1, 0);
}
if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
if (_try_promote_low_priority_task()) {
_notify_threads(p_caller_pool_thread, 1, 0);
}
}

if (singleton->task_queue.first()) {
task_to_process = task_queue.first()->self();
task_queue.remove(task_queue.first());
}
if (singleton->task_queue.first()) {
task_to_process = task_queue.first()->self();
task_queue.remove(task_queue.first());
}

if (!task_to_process) {
p_caller_pool_thread->awaited_task = p_task;
if (!task_to_process) {
p_caller_pool_thread->awaited_task = p_task;

_unlock_unlockable_mutexes();
relock_unlockables = true;
p_caller_pool_thread->cond_var.wait(lock);
_unlock_unlockable_mutexes();
relock_unlockables = true;

DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER);
p_caller_pool_thread->awaited_task = nullptr;
}
p_caller_pool_thread->cond_var.wait(lock);

p_caller_pool_thread->awaited_task = nullptr;
}
}

Expand All @@ -509,16 +524,65 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
}
}

void WorkerThreadPool::_switch_runlevel(Runlevel p_runlevel) {
DEV_ASSERT(p_runlevel > runlevel);
runlevel = p_runlevel;
memset(&runlevel_data, 0, sizeof(runlevel_data));
for (uint32_t i = 0; i < threads.size(); i++) {
threads[i].cond_var.notify_one();
threads[i].signaled = true;
}
control_cond_var.notify_all();
}

// Returns whether threads have to exit. This may perform the check about handling needed.
bool WorkerThreadPool::_handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock) {
bool exit = false;
switch (runlevel) {
case RUNLEVEL_NORMAL: {
} break;
case RUNLEVEL_PRE_EXIT_LANGUAGES: {
if (!p_thread_data->pre_exited_languages) {
if (!task_queue.first() && !low_priority_task_queue.first()) {
p_thread_data->pre_exited_languages = true;
runlevel_data.pre_exit_languages.num_idle_threads++;
control_cond_var.notify_all();
}
}
} break;
case RUNLEVEL_EXIT_LANGUAGES: {
if (!p_thread_data->exited_languages) {
p_lock.temp_unlock();
ScriptServer::thread_exit();
p_lock.temp_relock();
p_thread_data->exited_languages = true;
runlevel_data.exit_languages.num_exited_threads++;
control_cond_var.notify_all();
}
} break;
case RUNLEVEL_EXIT: {
exit = true;
} break;
}
return exit;
}

void WorkerThreadPool::yield() {
int th_index = get_thread_index();
ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");
_wait_collaboratively(&threads[th_index], ThreadData::YIELDING);

// If this long-lived task started before the scripting server was initialized,
// now is a good time to have scripting languages ready for the current thread.
// Otherwise, such a piece of setup won't happen unless another task has been
// run during the collaborative wait.
ScriptServer::thread_enter();
task_mutex.lock();
if (runlevel < RUNLEVEL_EXIT_LANGUAGES) {
// If this long-lived task started before the scripting server was initialized,
// now is a good time to have scripting languages ready for the current thread.
// Otherwise, such a piece of setup won't happen unless another task has been
// run during the collaborative wait.
task_mutex.unlock();
ScriptServer::thread_enter();
} else {
task_mutex.unlock();
}
}

void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
Expand All @@ -543,13 +607,13 @@ void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
}

WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
ERR_FAIL_COND_V_MSG(threads.is_empty(), INVALID_TASK_ID, "Can't add a group task because the WorkerThreadPool is either not initialized yet or already terminated.");
ERR_FAIL_COND_V(p_elements < 0, INVALID_TASK_ID);
if (p_tasks < 0) {
p_tasks = MAX(1u, threads.size());
}

task_mutex.lock();
MutexLock<BinaryMutex> lock(task_mutex);

Group *group = group_allocator.alloc();
GroupID id = last_task++;
group->max = p_elements;
Expand Down Expand Up @@ -584,7 +648,7 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca

groups[id] = group;

_post_tasks_and_unlock(tasks_posted, p_tasks, p_high_priority);
_post_tasks(tasks_posted, p_tasks, p_high_priority, lock);

return id;
}
Expand Down Expand Up @@ -687,6 +751,9 @@ void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {

void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {
ERR_FAIL_COND(threads.size() > 0);

runlevel = RUNLEVEL_NORMAL;

if (p_thread_count < 0) {
p_thread_count = OS::get_singleton()->get_default_thread_pool_size();
}
Expand All @@ -704,6 +771,26 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio)
}
}

void WorkerThreadPool::exit_languages_threads() {
if (threads.size() == 0) {
return;
}

MutexLock lock(task_mutex);

// Wait until all threads are idle.
_switch_runlevel(RUNLEVEL_PRE_EXIT_LANGUAGES);
while (runlevel_data.pre_exit_languages.num_idle_threads != threads.size()) {
control_cond_var.wait(lock);
}

// Wait until all threads have detached from scripting languages.
_switch_runlevel(RUNLEVEL_EXIT_LANGUAGES);
while (runlevel_data.exit_languages.num_exited_threads != threads.size()) {
control_cond_var.wait(lock);
}
}

void WorkerThreadPool::finish() {
if (threads.size() == 0) {
return;
Expand All @@ -716,15 +803,10 @@ void WorkerThreadPool::finish() {
print_error("Task waiting was never re-claimed: " + E->self()->description);
E = E->next();
}
}

{
MutexLock lock(task_mutex);
exit_threads = true;
}
for (ThreadData &data : threads) {
data.cond_var.notify_one();
_switch_runlevel(RUNLEVEL_EXIT);
}

for (ThreadData &data : threads) {
data.thread.wait_to_finish();
}
Expand Down Expand Up @@ -755,5 +837,5 @@ WorkerThreadPool::WorkerThreadPool() {
}

WorkerThreadPool::~WorkerThreadPool() {
DEV_ASSERT(threads.size() == 0 && "finish() hasn't been called!");
finish();
}
28 changes: 25 additions & 3 deletions core/object/worker_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,35 @@ class WorkerThreadPool : public Object {
Thread thread;
bool signaled : 1;
bool yield_is_over : 1;
bool pre_exited_languages : 1;
bool exited_languages : 1;
Task *current_task = nullptr;
Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING).
ConditionVariable cond_var;

ThreadData() :
signaled(false),
yield_is_over(false) {}
yield_is_over(false),
pre_exited_languages(false),
exited_languages(false) {}
};

TightLocalVector<ThreadData> threads;
bool exit_threads = false;
enum Runlevel {
RUNLEVEL_NORMAL,
RUNLEVEL_PRE_EXIT_LANGUAGES, // Block adding new tasks
RUNLEVEL_EXIT_LANGUAGES, // All threads detach from scripting threads.
RUNLEVEL_EXIT,
} runlevel = RUNLEVEL_NORMAL;
union { // Cleared on every runlevel change.
struct {
uint32_t num_idle_threads;
} pre_exit_languages;
struct {
uint32_t num_exited_threads;
} exit_languages;
} runlevel_data;
ConditionVariable control_cond_var;

HashMap<Thread::ID, int> thread_ids;
HashMap<
Expand Down Expand Up @@ -152,7 +170,7 @@ class WorkerThreadPool : public Object {

void _process_task(Task *task);

void _post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority);
void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock);
void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count);

bool _try_promote_low_priority_task();
Expand Down Expand Up @@ -193,6 +211,9 @@ class WorkerThreadPool : public Object {

void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task);

void _switch_runlevel(Runlevel p_runlevel);
bool _handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock);

#ifdef THREADS_ENABLED
static uint32_t _thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &p_ulock);
#endif
Expand Down Expand Up @@ -256,6 +277,7 @@ class WorkerThreadPool : public Object {
#endif

void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
void exit_languages_threads();
void finish();
WorkerThreadPool();
~WorkerThreadPool();
Expand Down
Loading
Loading