Skip to content

Commit

Permalink
Modernize Thread
Browse files Browse the repository at this point in the history
- Based on C++11's `thread` and `thread_local`
- No more need to allocate-deallocate or check for null
- No pointer anymore, just a member variable
- Platform-specific implementations no longer needed (except for the few cases of non-portable functions)
- Simpler for `NO_THREADS`
- Thread ids are now the same across platforms (main is 1; others follow)
  • Loading branch information
RandomShaper committed Jan 29, 2021
1 parent 6ddfc8e commit 99fe462
Show file tree
Hide file tree
Showing 87 changed files with 381 additions and 1,052 deletions.
26 changes: 3 additions & 23 deletions core/core_bind.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1990,43 +1990,27 @@ Error _Thread::start(Object *p_instance, const StringName &p_method, const Varia

Thread::Settings s;
s.priority = (Thread::Priority)p_priority;
thread = Thread::create(_start_func, ud, s);
if (!thread) {
active = false;
target_method = StringName();
target_instance = nullptr;
userdata = Variant();
return ERR_CANT_CREATE;
}
thread.start(_start_func, ud, s);

return OK;
}

String _Thread::get_id() const {
if (!thread) {
return String();
}

return itos(thread->get_id());
return itos(thread.get_id());
}

bool _Thread::is_active() const {
return active;
}

Variant _Thread::wait_to_finish() {
ERR_FAIL_COND_V_MSG(!thread, Variant(), "Thread must exist to wait for its completion.");
ERR_FAIL_COND_V_MSG(!active, Variant(), "Thread must be active to wait for its completion.");
Thread::wait_to_finish(thread);
thread.wait_to_finish();
Variant r = ret;
active = false;
target_method = StringName();
target_instance = nullptr;
userdata = Variant();
if (thread) {
memdelete(thread);
}
thread = nullptr;

return r;
}
Expand All @@ -2042,10 +2026,6 @@ void _Thread::_bind_methods() {
BIND_ENUM_CONSTANT(PRIORITY_HIGH);
}

_Thread::~_Thread() {
ERR_FAIL_COND_MSG(active, "Reference to a Thread object was lost while the thread is still running...");
}

////// _ClassDB //////

PackedStringArray _ClassDB::get_class_list() const {
Expand Down
5 changes: 1 addition & 4 deletions core/core_bind.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ class _Thread : public Reference {
volatile bool active = false;
Object *target_instance = nullptr;
StringName target_method;
Thread *thread = nullptr;
Thread thread;
static void _bind_methods();
static void _start_func(void *ud);

Expand All @@ -569,9 +569,6 @@ class _Thread : public Reference {
String get_id() const;
bool is_active() const;
Variant wait_to_finish();

_Thread() {}
~_Thread();
};

VARIANT_ENUM_CAST(_Thread::Priority);
Expand Down
12 changes: 4 additions & 8 deletions core/debugger/remote_debugger_peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,8 @@ int RemoteDebuggerPeerTCP::get_max_message_size() const {
}

void RemoteDebuggerPeerTCP::close() {
if (thread) {
running = false;
Thread::wait_to_finish(thread);
memdelete(thread);
thread = nullptr;
}
running = false;
thread.wait_to_finish();
tcp_client->disconnect_from_host();
out_buf.resize(0);
in_buf.resize(0);
Expand All @@ -85,7 +81,7 @@ RemoteDebuggerPeerTCP::RemoteDebuggerPeerTCP(Ref<StreamPeerTCP> p_tcp) {
connected = true;
#ifndef NO_THREADS
running = true;
thread = Thread::create(_thread_func, this);
thread.start(_thread_func, this);
#endif
} else {
tcp_client.instance();
Expand Down Expand Up @@ -188,7 +184,7 @@ Error RemoteDebuggerPeerTCP::connect_to_host(const String &p_host, uint16_t p_po
connected = true;
#ifndef NO_THREADS
running = true;
thread = Thread::create(_thread_func, this);
thread.start(_thread_func, this);
#endif
return OK;
}
Expand Down
2 changes: 1 addition & 1 deletion core/debugger/remote_debugger_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class RemoteDebuggerPeerTCP : public RemoteDebuggerPeer {
private:
Ref<StreamPeerTCP> tcp_client;
Mutex mutex;
Thread *thread = nullptr;
Thread thread;
List<Array> in_queue;
List<Array> out_queue;
int out_left = 0;
Expand Down
11 changes: 4 additions & 7 deletions core/io/file_access_network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ Error FileAccessNetworkClient::connect(const String &p_host, int p_port, const S
return ERR_INVALID_PARAMETER;
}

thread = Thread::create(_thread_func, this);
thread.start(_thread_func, this);

return OK;
}
Expand All @@ -214,12 +214,9 @@ FileAccessNetworkClient::FileAccessNetworkClient() {
}

FileAccessNetworkClient::~FileAccessNetworkClient() {
if (thread) {
quit = true;
sem.post();
Thread::wait_to_finish(thread);
memdelete(thread);
}
quit = true;
sem.post();
thread.wait_to_finish();
}

void FileAccessNetwork::_set_block(int p_offset, const Vector<uint8_t> &p_block) {
Expand Down
2 changes: 1 addition & 1 deletion core/io/file_access_network.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class FileAccessNetworkClient {
List<BlockRequest> block_requests;

Semaphore sem;
Thread *thread = nullptr;
Thread thread;
bool quit = false;
Mutex mutex;
Mutex blockrequest_mutex;
Expand Down
24 changes: 6 additions & 18 deletions core/io/ip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct _IP_ResolverPrivate {
Mutex mutex;
Semaphore sem;

Thread *thread;
Thread thread;
//Semaphore* semaphore;
bool thread_abort;

Expand Down Expand Up @@ -141,7 +141,7 @@ IP::ResolverID IP::resolve_hostname_queue_item(const String &p_hostname, IP::Typ
} else {
resolver->queue[id].response = IP_Address();
resolver->queue[id].status = IP::RESOLVER_STATUS_WAITING;
if (resolver->thread) {
if (resolver->thread.is_started()) {
resolver->sem.post();
} else {
resolver->resolve_queues();
Expand Down Expand Up @@ -285,26 +285,14 @@ IP::IP() {
singleton = this;
resolver = memnew(_IP_ResolverPrivate);

#ifndef NO_THREADS

resolver->thread_abort = false;

resolver->thread = Thread::create(_IP_ResolverPrivate::_thread_function, resolver);
#else
resolver->thread = nullptr;
#endif
resolver->thread.start(_IP_ResolverPrivate::_thread_function, resolver);
}

IP::~IP() {
#ifndef NO_THREADS
if (resolver->thread) {
resolver->thread_abort = true;
resolver->sem.post();
Thread::wait_to_finish(resolver->thread);
memdelete(resolver->thread);
}

#endif
resolver->thread_abort = true;
resolver->sem.post();
resolver->thread.wait_to_finish();

memdelete(resolver);
}
5 changes: 3 additions & 2 deletions core/io/resource_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ Error ResourceLoader::load_threaded_request(const String &p_path, const String &

print_lt("REQUEST: load count: " + itos(thread_loading_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_loading_count - thread_suspended_count));

load_task.thread = Thread::create(_thread_load_function, &thread_load_tasks[local_path]);
load_task.thread = memnew(Thread);
load_task.thread->start(_thread_load_function, &thread_load_tasks[local_path]);
load_task.loader_id = load_task.thread->get_id();
}

Expand Down Expand Up @@ -489,7 +490,7 @@ RES ResourceLoader::load_threaded_get(const String &p_path, Error *r_error) {

if (load_task.requests == 0) {
if (load_task.thread) { //thread may not have been used
Thread::wait_to_finish(load_task.thread);
load_task.thread->wait_to_finish();
memdelete(load_task.thread);
}
thread_load_tasks.erase(local_path);
Expand Down
81 changes: 66 additions & 15 deletions core/os/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,70 @@

#include "thread.h"

Thread *(*Thread::create_func)(ThreadCreateCallback, void *, const Settings &) = nullptr;
Thread::ID (*Thread::get_thread_id_func)() = nullptr;
void (*Thread::wait_to_finish_func)(Thread *) = nullptr;
#include "core/object/script_language.h"

#if !defined(NO_THREADS)

Error (*Thread::set_name_func)(const String &) = nullptr;
void (*Thread::set_priority_func)(Thread::Priority) = nullptr;
void (*Thread::init_func)() = nullptr;
void (*Thread::term_func)() = nullptr;

Thread::ID Thread::main_thread_id = 1;
Thread::ID Thread::last_thread_id = 1;
thread_local Thread::ID Thread::caller_id = 1;

Thread::ID Thread::_main_thread_id = 0;
void Thread::_set_platform_funcs(
Error (*p_set_name_func)(const String &),
void (*p_set_priority_func)(Thread::Priority),
void (*p_init_func)(),
void (*p_term_func)()) {
Thread::set_name_func = p_set_name_func;
Thread::set_priority_func = p_set_priority_func;
Thread::init_func = p_init_func;
Thread::term_func = p_term_func;
}

Thread::ID Thread::get_caller_id() {
if (get_thread_id_func) {
return get_thread_id_func();
void Thread::callback(Thread *p_self, const Settings &p_settings, Callback p_callback, void *p_userdata) {
Thread::caller_id = p_self->id;
if (set_priority_func) {
set_priority_func(p_settings.priority);
}
if (init_func) {
init_func();
}
ScriptServer::thread_enter(); //scripts may need to attach a stack
p_callback(p_userdata);
ScriptServer::thread_exit();
if (term_func) {
term_func();
}
return 0;
}

Thread *Thread::create(ThreadCreateCallback p_callback, void *p_user, const Settings &p_settings) {
if (create_func) {
return create_func(p_callback, p_user, p_settings);
void Thread::start(Thread::Callback p_callback, void *p_user, const Settings &p_settings) {
if (id != 0) {
#ifdef DEBUG_ENABLED
WARN_PRINT("A Thread object has been re-started without wait_to_finish() having been called on it. Please do so to ensure correct cleanup of the thread.");
#endif
thread.detach();
std::thread empty_thread;
thread.swap(empty_thread);
}
return nullptr;
id = atomic_increment(&last_thread_id);
std::thread new_thread(&Thread::callback, this, p_settings, p_callback, p_user);
thread.swap(new_thread);
}

void Thread::wait_to_finish(Thread *p_thread) {
if (wait_to_finish_func) {
wait_to_finish_func(p_thread);
bool Thread::is_started() const {
return id != 0;
}

void Thread::wait_to_finish() {
if (id != 0) {
thread.join();
std::thread empty_thread;
thread.swap(empty_thread);
id = 0;
}
}

Expand All @@ -64,3 +104,14 @@ Error Thread::set_name(const String &p_name) {

return ERR_UNAVAILABLE;
}

Thread::~Thread() {
if (id != 0) {
#ifdef DEBUG_ENABLED
WARN_PRINT("A Thread object has been destroyed without wait_to_finish() having been called on it. Please do so to ensure correct cleanup of the thread.");
#endif
thread.detach();
}
}

#endif
Loading

0 comments on commit 99fe462

Please sign in to comment.