Skip to content

Commit

Permalink
WorkerThreadPool: Add a runlevel between languages finished and termi…
Browse files Browse the repository at this point in the history
…nation
  • Loading branch information
RandomShaper committed Sep 13, 2024
1 parent 8b086f7 commit c30931a
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 12 deletions.
72 changes: 61 additions & 11 deletions core/object/worker_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,20 @@ void WorkerThreadPool::_process_task(Task *p_task) {
void WorkerThreadPool::_thread_function(void *p_user) {
ThreadData *thread_data = (ThreadData *)p_user;

#define RUNLEVEL_HANDLING_NEEDED (singleton->runlevel == RUNLEVEL_EXIT)
#define RUNLEVEL_HANDLING_NEEDED ((singleton->runlevel == RUNLEVEL_EXIT_LANGUAGES && !thread_data->exited_languages) || singleton->runlevel == RUNLEVEL_EXIT)

while (true) {
Task *task_to_process = nullptr;
{
MutexLock lock(singleton->task_mutex);
if (unlikely(RUNLEVEL_HANDLING_NEEDED)) {
DEV_ASSERT(singleton->runlevel == RUNLEVEL_EXIT);
break;
if (singleton->runlevel == RUNLEVEL_EXIT_LANGUAGES) {
ScriptServer::thread_exit();
thread_data->exited_languages = true;
} else {
DEV_ASSERT(singleton->runlevel == RUNLEVEL_EXIT);
break;
}
}
thread_data->signaled = false;

Expand Down Expand Up @@ -448,7 +453,7 @@ void WorkerThreadPool::_unlock_unlockable_mutexes() {
void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
// Keep processing tasks until the condition to stop waiting is met.

#define RUNLEVEL_HANDLING_NEEDED (runlevel == RUNLEVEL_EXIT)
#define RUNLEVEL_HANDLING_NEEDED ((runlevel == RUNLEVEL_EXIT_LANGUAGES && !p_caller_pool_thread->exited_languages) || runlevel == RUNLEVEL_EXIT)
#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed)

while (true) {
Expand All @@ -460,8 +465,15 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
p_caller_pool_thread->signaled = false;

if (unlikely(RUNLEVEL_HANDLING_NEEDED)) {
DEV_ASSERT(singleton->runlevel == RUNLEVEL_EXIT);
break;
if (singleton->runlevel == RUNLEVEL_EXIT_LANGUAGES) {
lock.temp_unlock();
ScriptServer::thread_exit();
lock.temp_relock();
p_caller_pool_thread->exited_languages = true;
} else {
DEV_ASSERT(singleton->runlevel == RUNLEVEL_EXIT);
break;
}
}

if (IS_WAIT_OVER) {
Expand Down Expand Up @@ -525,11 +537,17 @@ void WorkerThreadPool::yield() {
ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");
_wait_collaboratively(&threads[th_index], ThreadData::YIELDING);

// If this long-lived task started before the scripting server was initialized,
// now is a good time to have scripting languages ready for the current thread.
// Otherwise, such a piece of setup won't happen unless another task has been
// run during the collaborative wait.
ScriptServer::thread_enter();
task_mutex.lock();
if (runlevel < RUNLEVEL_EXIT_LANGUAGES) {
// If this long-lived task started before the scripting server was initialized,
// now is a good time to have scripting languages ready for the current thread.
// Otherwise, such a piece of setup won't happen unless another task has been
// run during the collaborative wait.
task_mutex.unlock();
ScriptServer::thread_enter();
} else {
task_mutex.unlock();
}
}

void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
Expand Down Expand Up @@ -717,6 +735,38 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio)
}
}

void WorkerThreadPool::exit_languages_threads() {
if (threads.size() == 0) {
return;
}

ERR_FAIL_COND(runlevel >= RUNLEVEL_EXIT_LANGUAGES);

{
MutexLock lock(task_mutex);
runlevel = RUNLEVEL_EXIT_LANGUAGES;
for (ThreadData &data : threads) {
data.cond_var.notify_one();
}
}

// Wait until all threeads have detached from scripting languages.
while (true) {
OS::get_singleton()->delay_usec(1000);

MutexLock lock(task_mutex);
uint32_t n = 0;
for (ThreadData &data : threads) {
if (data.exited_languages) {
n++;
}
}
if (n == threads.size()) {
break;
}
}
}

void WorkerThreadPool::finish() {
if (threads.size() == 0) {
return;
Expand Down
6 changes: 5 additions & 1 deletion core/object/worker_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,21 @@ class WorkerThreadPool : public Object {
Thread thread;
bool signaled : 1;
bool yield_is_over : 1;
bool exited_languages : 1;
Task *current_task = nullptr;
Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING).
ConditionVariable cond_var;

ThreadData() :
signaled(false),
yield_is_over(false) {}
yield_is_over(false),
exited_languages(false) {}
};

TightLocalVector<ThreadData> threads;
enum {
RUNLEVEL_NORMAL,
RUNLEVEL_EXIT_LANGUAGES,
RUNLEVEL_EXIT,
} runlevel = RUNLEVEL_NORMAL;

Expand Down Expand Up @@ -259,6 +262,7 @@ class WorkerThreadPool : public Object {
#endif

void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
void exit_languages_threads();
void finish();
WorkerThreadPool();
~WorkerThreadPool();
Expand Down
2 changes: 2 additions & 0 deletions main/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4501,6 +4501,8 @@ void Main::cleanup(bool p_force) {
ResourceLoader::clear_translation_remaps();
ResourceLoader::clear_path_remaps();

WorkerThreadPool::get_singleton()->exit_languages_threads();

ScriptServer::finish_languages();

// Sync pending commands that may have been queued from a different thread during ScriptServer finalization
Expand Down

0 comments on commit c30931a

Please sign in to comment.