Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run one worker on main thread #437

Merged
merged 3 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 31 additions & 18 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -982,18 +982,6 @@ void lf_print_snapshot(environment_t* env) {
}
#endif // NDEBUG

// Start threads in the thread pool.
void start_threads(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);

LF_PRINT_LOG("Starting %u worker threads in environment", env->num_workers);
for (int i = 0; i < env->num_workers; i++) {
if (lf_thread_create(&env->thread_ids[i], worker, env) != 0) {
lf_print_error_and_exit("Could not start thread-%u", i);
}
}
}

/**
* @brief Determine the number of workers.
*/
Expand Down Expand Up @@ -1124,23 +1112,48 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
_lf_initialize_start_tag(env);

lf_print("Environment %u: ---- Spawning %d workers.", env->id, env->num_workers);
start_threads(env);

for (int j = 0; j < env->num_workers; j++) {
if (i == 0 && j == 0) {
// The first worker thread of the first environment will be
// run on the main thread, rather than creating a new thread.
// This is important for bare-metal platforms, who can't
// afford to have the main thread sit idle.
continue;
}
if (lf_thread_create(&env->thread_ids[j], worker, env) != 0) {
lf_print_error_and_exit("Could not start thread-%u", j);
}
}

// Unlock mutex and allow threads proceed
LF_MUTEX_UNLOCK(&env->mutex);
}

// main thread worker (first worker thread of first environment)
void* main_thread_exit_status = NULL;
if (num_envs > 0 && envs[0].num_workers > 0) {
environment_t* env = &envs[0];
main_thread_exit_status = worker(env);
}

for (int i = 0; i < num_envs; i++) {
// Wait for the worker threads to exit.
environment_t* env = &envs[i];
void* worker_thread_exit_status = NULL;
int ret = 0;
for (int i = 0; i < env->num_workers; i++) {
int failure = lf_thread_join(env->thread_ids[i], &worker_thread_exit_status);
if (failure) {
lf_print_error("Failed to join thread listening for incoming messages: %s", strerror(failure));
for (int j = 0; j < env->num_workers; j++) {
if (i == 0 && j == 0) {
// main thread worker
worker_thread_exit_status = main_thread_exit_status;
} else {
int failure = lf_thread_join(env->thread_ids[j], &worker_thread_exit_status);
if (failure) {
lf_print_error("Failed to join thread listening for incoming messages: %s", strerror(failure));
}
}
if (worker_thread_exit_status != NULL) {
lf_print_error("---- Worker %d reports error code %p", i, worker_thread_exit_status);
lf_print_error("---- Worker %d reports error code %p", j, worker_thread_exit_status);
ret = 1;
}
}
Expand Down
4 changes: 3 additions & 1 deletion low_level_platform/impl/src/lf_zephyr_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ int lf_enable_interrupts_nested() {
#define NUMBER_OF_WATCHDOGS 0
#endif

#define NUMBER_OF_THREADS (NUMBER_OF_WORKERS + USER_THREADS + NUMBER_OF_WATCHDOGS)
// Number of additional threads that will be created
// One worker will run on the main thread, so for N workers, only (N - 1) worker threads should be created
#define NUMBER_OF_THREADS ((NUMBER_OF_WORKERS - 1) + USER_THREADS + NUMBER_OF_WATCHDOGS)

K_MUTEX_DEFINE(thread_mutex);

Expand Down
Loading