diff --git a/src/lib/work-stealing.c b/src/lib/work-stealing.c
index 05fa93866..3a79fcd50 100644
--- a/src/lib/work-stealing.c
+++ b/src/lib/work-stealing.c
@@ -18,14 +18,20 @@
 
 struct Work;
 
-// A Task is a function pointer that consumes a Work* and returns a Work*
-// The input is the `Work` Always passed a pointer to the containing Work struct
+// A Task is a function pointer that consumes a Work* and returns a Work*.
+// The input `Work` is always a pointer to the Work struct containing that
+// Task, which it accepts in order to be able to deallocate it.
 // Question: Do we also want to tell the task the thread id of the worker
 // that's running it? Maybe to support thread-local accumulators for
 // commutative reductions?
 // Oh yeah, also to know which worker's queue to put more stuff onto.
-// Trampoline: returns the next work to do, if ready, or NULL if not.
-typedef struct Work* (*Task)(struct Work*);
+//
+// The return value is a trampoline: a `Task` returns the next work to do, if
+// it's runnable, or NULL if there isn't one.
+//
+// `Task`s are internal to the work-stealing system; the client does not
+// provide or consume `Task`s.
+typedef struct Work* (*Task)(int thread_id, struct Work*);
 
 typedef struct Work {
   Task code;
@@ -33,8 +39,8 @@ typedef struct Work {
   void* args[];
 } Work;
 
-Work* EMPTY = -1;
-Work* ABORT = -2;
+Work* EMPTY = (Work*)-1;
+Work* ABORT = (Work*)-2;
 
 /////////////////////////
 // Work-stealing deque //
@@ -56,8 +62,13 @@ typedef struct {
 void init(Deque* q, int size_hint) {
   // This does not appear in https://fzn.fr/readings/ppopp13.pdf; I am imputing
   // it.
-  atomic_init(&q->top, 0);
-  atomic_init(&q->bottom, 0);
+  // Initialize the buffer indices at 1 to prevent underflow. The buffer
+  // indices are of type `size_t`; the top index never decreases, and the bottom
+  // index is never less than the top index at rest. The smallest intermediate
+  // value ever used is `bottom-1`, inside `take`. Initializing `top` and
+  // `bottom` at 1 suffices to prevent this computation from underflowing.
+  atomic_init(&q->top, 1);
+  atomic_init(&q->bottom, 1);
   Array* a = (Array*) malloc(sizeof(Array) + sizeof(Work*) * size_hint);
   atomic_init(&a->size, size_hint);
   atomic_init(&q->array, a);
@@ -147,7 +158,7 @@ Work* steal(Deque *q) {
 // Worker loop //
 /////////////////
 
-#define nthreads 24
+int thread_count;
 
 Deque* thread_queues;
 
@@ -156,7 +167,7 @@ atomic_bool done;
 // Trampoline: Returns the next item to work on, or NULL if there aren't any.
 Work* do_one_work(int id, Work* work) {
   printf("Worker %d running item %p\n", id, work);
-  return (*(work->code))(work);
+  return (*(work->code))(id, work);
 }
 
 void do_work(int id, Work* work) {
@@ -185,7 +196,7 @@ void* thread(void* payload) {
     } else {
       // No work in my own queue
       Work* stolen = EMPTY;
-      for (int i = 0; i < nthreads; ++i) {
+      for (int i = 0; i < thread_count; ++i) {
         if (i == id) continue;
         stolen = steal(&thread_queues[i]);
         if (stolen == ABORT) {
@@ -198,7 +209,7 @@ void* thread(void* payload) {
       }
     }
     if (stolen == EMPTY) {
-      // Even though the queues we all empty when I tried them, somebody
+      // Even though the queues were all empty when I tried them, somebody
      // might have added some more work since. Busy-wait until the global
       // "done" flag is set.
       if (atomic_load(&done)) {
@@ -215,63 +226,222 @@ void* thread(void* payload) {
   return NULL;
 }
 
-////////////////////
-// Client program //
-////////////////////
+///////////////////////////
+// Dex codegen interface //
+///////////////////////////
 
-Work* print_task(Work* w) {
-  int* payload = (int*)w->args[0];
-  int item = *payload;
-  printf("Did item %p with payload %d\n", w, item);
-  Work* cont = (Work*)w->args[1];
-  free(payload);
-  free(w);
-  return join_work(cont);
+// A (pointer to a) code-generated function.
+// This should either return the result of calling `begin_pure_loop` or return `NULL`.
+typedef Work* (*GenBlock)(int thread_id, void** env);
+
+// A (pointer to a) code-generated function that is a loop body.
+// This should either return the result of calling `begin_pure_loop` or return `NULL`.
+typedef Work* (*GenLoopBody)(int thread_id, int iteration, void** env);
+
+// Call this from Haskell once at the start of the process.
+// The integer is the number of OS threads to spawn to run work-stealing.
+void initialize_work_stealing(int nthreads);
+
+// Call this from Haskell to run a top block with work-stealing. When this
+// exits, the work-stealing system is stopped, and results are written to their
+// proper `Dest`s.
+void execute_top_block(GenBlock body, void** env);
+
+// Call this from code-gen at the end of each top-level block.
+void finish_work_stealing();
+
+// Call this from code-gen to start a loop that you want work-stealing to
+// parallelize.
+// This assumes that the environment frame for the loop body and for the
+// continuation is the same. That assumption isn't hard to change.
+Work* begin_pure_loop(int thread_id, GenLoopBody body, GenBlock cont, void** env, int trip_count);
+
+/////////////////////////
+// Dex codegen support //
+/////////////////////////
+
+Work* run_gen_block(int thread_id, Work* self) {
+  GenBlock body = (GenBlock)self->args[0];
+  void** env = (void**)self->args[1];
+  free(self);
+  return body(thread_id, env);
 }
 
-Work* done_task(Work* w) {
-  free(w);
-  atomic_store(&done, true);
-  return NULL;
+// Return a `Work*` such that joining it `joiners` times is equivalent to joining
+// the argument `cont` once.
+// - `joiners` >= 1.
+// - Do not use `cont` directly afterward, as this is allowed to mutate it.
+Work* increase_cont_capacity(Work* cont, int joiners) {
+  // One way to achieve the goal is to just atomically increase the `join_count`
+  // of `cont` by `joiners - 1` and reuse it:
+  atomic_fetch_add(&cont->join_count, joiners - 1);
+  return cont;
+  // An alternative would be to allocate a new `Work` with `join_count` equal to
+  // `joiners` whose task is to `join` the current `cont`. The advantage of this
+  // alternative is avoiding the atomic increment (on a potentially contended
+  // variable if `cont` has many joiners already); the disadvantage is the
+  // allocation (which presumably entails some synchronization of its own), and
+  // an extra indirection at the end due to executing that mini-task.
 }
 
-int main(int argc, char **argv) {
+Work* execute_pure_loop_task(int id, Work* self);
+
+Work* execute_pure_loop(int thread_id, Work* cont, GenLoopBody body, void** env, int start_iter, int end_iter) {
+  if (end_iter - start_iter <= 1) {
+    // Few enough iterations; just do them.
+    for (int i = start_iter; i < end_iter; i++) {
+      do_work(thread_id, body(thread_id, i, env));
+    }
+    return join_work(cont);
+  } else {
+    // Create Works that represent schedulable pieces of the loop.
+    int branching_factor = 2;
+    div_t iters_per_branch = div(end_iter - start_iter, branching_factor);
+    int this_iter = start_iter;
+    Work* subcont = increase_cont_capacity(cont, branching_factor);
+    // Queue up all but one chunk of the loop
+    for (int i = 0; i < branching_factor - 1; i++) {
+      int next_iter = this_iter + iters_per_branch.quot;
+      if (i < iters_per_branch.rem) {
+        next_iter++;
+      }
+      Work* section = (Work*) malloc(sizeof(Work) + 5 * sizeof(int*));
+      section->code = &execute_pure_loop_task;
+      section->join_count = 0;
+      section->args[0] = subcont;
+      section->args[1] = body;
+      section->args[2] = env;
+      // TODO Is just casting ok here, or do I have to heap-allocate these ints?
+      // gcc complains about the integer and the pointer having different sizes.
+      section->args[3] = (void*)this_iter;
+      section->args[4] = (void*)next_iter;
+      push(&thread_queues[thread_id], section);
+      this_iter = next_iter;
+    }
+    // Do the last chunk directly yourself
+    return execute_pure_loop(thread_id, subcont, body, env, this_iter, end_iter);
+  }
+}
+
+Work* execute_pure_loop_task(int id, Work* self) {
+  Work* cont = self->args[0];
+  GenLoopBody body = self->args[1];
+  void** env = self->args[2];
+  int start_iter = (int)self->args[3];
+  int end_iter = (int)self->args[4];
+  // The task owns its Work, so free it before trampolining on.
+  free(self);
+  return execute_pure_loop(id, cont, body, env, start_iter, end_iter);
+}
+
+Work* begin_pure_loop(int thread_id, GenLoopBody body, GenBlock cont, void** env, int trip_count) {
+  // TODO: If the whole loop is smaller than the grain size for
+  // execute_pure_loop, I can avoid allocating the `Work` for the continuation
+  // too by just executing the iterations inline here.
+  Work* k = (Work*) malloc(sizeof(Work) + 5 * sizeof(int*));
+  k->code = &run_gen_block;
+  k->join_count = 1;
+  k->args[0] = cont;
+  k->args[1] = env;
+  return execute_pure_loop(thread_id, k, body, env, 0, trip_count);
+}
+
+pthread_t* the_threads;
+int* tids;
+
+void initialize_work_stealing(int nthreads) {
   // Check that top and bottom are 64-bit so they never overflow
   assert(sizeof(atomic_size_t) == 8);
-  pthread_t threads[nthreads];
-  int tids[nthreads];
+  the_threads = (pthread_t*) malloc(nthreads * sizeof(pthread_t));
+  tids = (int*) malloc(nthreads * sizeof(int));
   thread_queues = (Deque*) malloc(nthreads * sizeof(Deque));
-  int nprints = 10;
-  atomic_store(&done, false);
-  Work* done_work = (Work*) malloc(sizeof(Work));
-  done_work->code = &done_task;
-  done_work->join_count = nthreads * nprints;
   for (int i = 0; i < nthreads; ++i) {
     tids[i] = i;
     init(&thread_queues[i], 8);
-    for (int j = 0; j < nprints; ++j) {
-      Work* work = (Work*) malloc(sizeof(Work) + 2 * sizeof(int*));
-      work->code = &print_task;
-      work->join_count = 0;
-      int* payload = malloc(sizeof(int));
-      *payload = 1000 * i + j;
-      work->args[0] = payload;
-      work->args[1] = done_work;
-      push(&thread_queues[i], work);
-    }
   }
-  for (int i = 0; i < nthreads; ++i) {
-    if (pthread_create(&threads[i], NULL, thread, &tids[i]) != 0) {
+  thread_count = nthreads;
+}
+
+void execute_top_block(GenBlock body, void** env) {
+  Work* job = (Work*) malloc(sizeof(Work) + 2 * sizeof(int*));
+  job->code = &run_gen_block;
+  job->join_count = 0;
+  job->args[0] = body;
+  job->args[1] = env;
+  atomic_store(&done, false);
+  push(&thread_queues[0], job);
+  // TODO: Do we really want to start and kill all the threads for every top
+  // level block, or is there a way to suspend and reuse them?
+  for (int i = 0; i < thread_count; ++i) {
+    if (pthread_create(&the_threads[i], NULL, thread, &tids[i]) != 0) {
       perror("failed to start the thread");
       exit(EXIT_FAILURE);
     }
   }
-  for (int i = 0; i < nthreads; ++i) {
-    if (pthread_join(threads[i], NULL) != 0) {
+  for (int i = 0; i < thread_count; ++i) {
+    if (pthread_join(the_threads[i], NULL) != 0) {
       perror("failed to join the thread");
       exit(EXIT_FAILURE);
     }
   }
-  printf("Expect %d lines of output (including this one)\n", 2 * nthreads * nprints + nthreads + 2);
+  // We expect all the queues to be empty at this point. TODO: Check?
+}
+
+void finish_work_stealing() {
+  atomic_store(&done, true);
+}
+
+////////////////////
+// Client program //
+////////////////////
+
+// A slightly silly program that iterates a single loop a dynamic number of
+// times, and has each loop iteration (and the coda) echo the trip count + 1,
+// just to show that data can be mutated.
+
+Work* gen_loop_body(int thread_id, int iteration, void** env) {
+  int* payload = (int*)env[0];
+  int item = *payload;
+  printf("Loop iteration %d on worker %d, payload %d\n",
+         iteration, thread_id, item);
+  return NULL;
+}
+
+Work* end_gen_block(int thread_id, void** env) {
+  int* payload = (int*)env[0];
+  int item = *payload;
+  printf("Finishing on worker %d, payload %d\n", thread_id, item);
+  free(payload);
+  free(env);
+  finish_work_stealing();
+  return NULL;
+}
+
+Work* start_gen_block(int thread_id, void** env) {
+  int* payload = (int*)env[0];
+  int item = *payload;
+  printf("Starting on worker %d, payload %d\n", thread_id, item);
+  *payload = item + 1;
+  return begin_pure_loop(thread_id, gen_loop_body, end_gen_block, env, item);
+}
+
+int main(int argc, char **argv) {
+  initialize_work_stealing(24);
+  void** env = malloc(sizeof(int*));
+  int* payload = malloc(sizeof(int));
+  int num_iters = 200;
+  *payload = num_iters;
+  env[0] = payload;
+  execute_top_block(&start_gen_block, env);
+  int expected_output_lines =
+      1 // "Starting"
+      + 1 // "Finishing"
+      + thread_count // "Worker n finished"
+      + 1 // Expected line report
+      + num_iters // Each loop iteration
+      + 1 // "Worker running item" for the entry point
+      + 1 // "Worker running item" for the end
+      + (num_iters - 1) // "Worker running item" for intermediates in the loop tree
+      ;
+  printf("Expect %d lines of output (including this one)\n",
+         expected_output_lines);
   return 0;
 }
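The comment inside `increase_cont_capacity` describes an allocation-based alternative to the atomic increment. Below is a minimal sketch of that alternative, assuming `join_work` decrements a `Work`'s `join_count` and returns the `Work` (or NULL) as a trampoline, which is how the rest of this file uses it; the names `forward_join_task` and `increase_cont_capacity_alloc` are hypothetical and not part of the patch.

// Hypothetical forwarding task: once `joiners` joins have released this Work,
// it performs the single join on the continuation stored in args[0].
Work* forward_join_task(int thread_id, Work* self) {
  (void)thread_id;  // unused
  Work* inner = (Work*)self->args[0];
  free(self);
  return join_work(inner);  // trampoline: `inner` if now runnable, else NULL
}

// Allocation-based variant: avoids the atomic increment on a potentially
// contended `cont->join_count`, at the cost of a malloc and one extra
// trampoline step when the forwarding Work finally fires.
Work* increase_cont_capacity_alloc(Work* cont, int joiners) {
  Work* fwd = (Work*) malloc(sizeof(Work) + 1 * sizeof(void*));
  fwd->code = &forward_join_task;
  fwd->join_count = joiners;
  fwd->args[0] = cont;
  return fwd;
}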
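The TODO in `execute_pure_loop` asks whether it is acceptable to stash the loop bounds directly in the `void*` argument slots. One conventional way to do that without heap-allocating the integers is to round-trip them through `intptr_t`; this is only a sketch of that option (the helpers `int_to_arg` and `arg_to_int` are hypothetical, and `<stdint.h>` is assumed), not something the patch itself does.

#include <stdint.h>

// Hypothetical helpers for storing small integers in Work args slots.
// Any value that fits in an int survives the round trip through intptr_t,
// and the explicit casts silence gcc's pointer/integer size warning.
static inline void* int_to_arg(int x)   { return (void*)(intptr_t)x; }
static inline int   arg_to_int(void* p) { return (int)(intptr_t)p; }

// Usage, mirroring execute_pure_loop / execute_pure_loop_task:
//   section->args[3] = int_to_arg(this_iter);
//   int start_iter = arg_to_int(self->args[3]);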