Skip to content

Commit

Permalink
Merge pull request #1341 from axch/work-stealing-4
Browse files Browse the repository at this point in the history
Slightly refactor the work-stealing runtime, hopefully more legible now.
  • Loading branch information
axch committed Nov 20, 2023
2 parents 7996c76 + 499d0ef commit 438d957
Showing 1 changed file with 44 additions and 35 deletions.
79 changes: 44 additions & 35 deletions src/lib/work-stealing.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct Work;
// A task function: runs one unit of work on the given worker thread and
// returns the next Work item to execute, or NULL if there is none.
typedef struct Work* (*Task)(int thread_id, struct Work*);

// A unit of schedulable work: the code to run plus its environment.
typedef struct Work {
  Task task;              // Code to execute for this work item.
  atomic_int join_count;  // Pending joins; see join_work (not shown here) for
                          // the exact scheduling semantics — confirm there.
  void* args[];           // C99 flexible array of environment pointers.
} Work;
Expand All @@ -59,7 +59,7 @@ typedef struct {
_Atomic(Array *) array;
} Deque;

void init(Deque* q, int size_hint) {
void init(Deque* q, size_t size_hint) {
// This does not appear in https://fzn.fr/readings/ppopp13.pdf; I am imputing
// it.
// Initialize the buffer indices at 1 to prevent underflow. The buffer
Expand Down Expand Up @@ -167,7 +167,7 @@ atomic_bool done;
// Trampoline: run one work item and return the next item to work on,
// or NULL if there aren't any.
Work* do_one_work(int id, Work* work) {
  // %p requires a void* argument; passing Work* directly is undefined
  // behavior, so cast explicitly.
  printf("Worker %d running item %p\n", id, (void*)work);
  return (*(work->task))(id, work);
}

void do_work(int id, Work* work) {
Expand Down Expand Up @@ -236,7 +236,7 @@ typedef Work* (*GenBlock)(int thread_id, void** env);

// A (pointer to a) code-generated function that is one loop-body iteration.
// This should either return the result of calling `begin_pure_loop` (to start
// a nested loop) or return `NULL`.
typedef Work* (*GenLoopBody)(int thread_id, size_t iteration, void** env);

// Call this from Haskell once at the start of the process.
// The integer is the number of OS threads to spawn to run work-stealing.
Expand All @@ -254,19 +254,28 @@ void finish_work_stealing();
// parallelize.
// This assumes that the environment frame for the loop body and for the
// continuation is the same. That assumption isn't hard to change.
Work* begin_pure_loop(int thread_id, GenLoopBody body, GenBlock cont, void** env, size_t trip_count);

/////////////////////////
// Dex codegen support //
/////////////////////////

// Work task that runs a code-generated block: unpacks the block function and
// its environment from `self`, releases the Work item, and tail-calls the
// block, returning whatever work it produces next (or NULL).
Work* run_gen_block_task(int thread_id, Work* self) {
  GenBlock body = (GenBlock)self->args[0];
  void** env = (void**)self->args[1];
  // `self` is owned by its runner; free it before entering the block.
  free(self);
  return body(thread_id, env);
}

// Allocate a Work item that will run the generated block `cont` with
// environment `env` once it has been joined `join_count` times. The caller
// receives ownership; run_gen_block_task frees the item when it executes.
Work* allocate_run_gen_block_work(int join_count, GenBlock cont, void** env) {
  // Two trailing args slots (the block and its environment). The slots are
  // void*, so size them as such rather than as int* — the two sizes only
  // coincide by accident of platform.
  Work* work = (Work*) malloc(sizeof(Work) + 2 * sizeof(void*));
  // NOTE(review): allocation failure is not handled anywhere in this runtime;
  // assuming an abort-on-OOM policy — confirm.
  work->task = &run_gen_block_task;
  work->join_count = join_count;
  // Explicit cast: function-pointer-to-void* is a POSIX extension; make the
  // conversion visible instead of relying on an implicit one.
  work->args[0] = (void*)cont;
  work->args[1] = env;
  return work;
}

// Return a `Work*` such that joining it `joiners` times is equivalent to joining
// the argument `cont` once.
// - `joiners` >= 1.
Expand All @@ -286,42 +295,50 @@ Work* increase_cont_capacity(Work* cont, int joiners) {

Work* execute_pure_loop_task(int id, Work* self);

// Allocate a Work item that executes iterations [start_iter, end_iter) of a
// pure loop (via execute_pure_loop_task), joining `cont` when done. The join
// count starts at 0 because the chunk is immediately runnable.
Work* allocate_execute_pure_loop_work(
    Work* cont, GenLoopBody body, void** env,
    size_t start_iter, size_t end_iter) {
  // Five trailing args slots: cont, body, env, and the two loop bounds.
  // Size them as void* (the slots' actual type), not int*.
  Work* chunk = (Work*) malloc(sizeof(Work) + 5 * sizeof(void*));
  chunk->task = &execute_pure_loop_task;
  chunk->join_count = 0;
  chunk->args[0] = cont;
  // Explicit cast makes the function-pointer-to-void* conversion visible.
  chunk->args[1] = (void*)body;
  chunk->args[2] = env;
  // The loop bounds ride through the void* slots; size_t round-trips through
  // void* on the supported platforms (uintptr_t would be stricter).
  chunk->args[3] = (void*)start_iter;
  chunk->args[4] = (void*)end_iter;
  return chunk;
}

// The recursive workhorse for running a pure loop.
Work* execute_pure_loop(int thread_id, Work* cont, GenLoopBody body, void** env, int start_iter, int end_iter) {
int grain_size = 1
Work* execute_pure_loop(
int thread_id, Work* cont, GenLoopBody body, void** env,
size_t start_iter, size_t end_iter) {
int grain_size = 1;
if (end_iter - start_iter <= grain_size) {
// Few enough iterations; just do them.
for (int i = start_iter; i < end_iter; i++) {
for (size_t i = start_iter; i < end_iter; i++) {
do_work(thread_id, body(thread_id, i, env));
}
return join_work(cont);
} else {
// Break the loop up into chunks of iterations, and schedule those.
int branching_factor = 2;
div_t iters_per_branch = div(end_iter - start_iter, branching_factor);
int this_iter = start_iter;
size_t this_iter = start_iter;
// We are increasing the number of `Work`s that are permitted to join our
// continuation `cont`. Account for that.
Work* subcont = increase_cont_capacity(cont, branching_factor);
// Queue up all but one chunk of the loop for idle workers to potentially
// steal.
for (int i = 0; i < branching_factor - 1; i++) {
int next_iter = this_iter + iters_per_branch.quot;
size_t next_iter = this_iter + iters_per_branch.quot;
if (i < iters_per_branch.rem) {
// The chunks may be slightly uneven if the trip count is not evenly
// divisible by the branching factor.
next_iter++;
}
Work* chunk = (Work*) malloc(sizeof(Work) + 5 * sizeof(int*));
chunk->code = &execute_pure_loop_task;
chunk->join_count = 0;
chunk->args[0] = subcont;
chunk->args[1] = body;
chunk->args[2] = env;
// TODO Is just casting ok here, or do I have to heap-allocate these ints?
// gcc complains about the integer and the pointer having different sizes.
chunk->args[3] = (void*)this_iter;
chunk->args[4] = (void*)next_iter;
Work* chunk = allocate_execute_pure_loop_work(
subcont, body, env, this_iter, next_iter);
push(&thread_queues[thread_id], chunk);
this_iter = next_iter;
}
Expand All @@ -334,20 +351,16 @@ Work* execute_pure_loop_task(int id, Work* self) {
Work* cont = self->args[0];
GenLoopBody body = self->args[1];
void** env = self->args[2];
int start_iter = (int)self->args[3];
int end_iter = (int)self->args[4];
size_t start_iter = (size_t)self->args[3];
size_t end_iter = (size_t)self->args[4];
return execute_pure_loop(id, cont, body, env, start_iter, end_iter);
}

// Entry point called from generated code: run a pure (data-parallel) loop of
// `trip_count` iterations of `body`, then run the generated block `cont` as
// the continuation once every chunk of the loop has joined.
Work* begin_pure_loop(int thread_id, GenLoopBody body, GenBlock cont, void** env, size_t trip_count) {
  // TODO: If the whole loop is smaller than the grain size for
  // execute_pure_loop, I can avoid allocating the `Work` for the continuation
  // too by just executing the iterations inline here.
  // Join count 1: execute_pure_loop issues one join for the whole loop (it
  // raises the capacity via increase_cont_capacity before splitting).
  Work* k = allocate_run_gen_block_work(1, cont, env);
  return execute_pure_loop(thread_id, k, body, env, 0, trip_count);
}

Expand All @@ -368,11 +381,7 @@ void initialize_work_stealing(int nthreads) {
}

void execute_top_block(GenBlock body, void** env) {
Work* job = (Work*) malloc(sizeof(Work) + 2 * sizeof(int*));
job->code = &run_gen_block;
job->join_count = 0;
job->args[0] = body;
job->args[1] = env;
Work* job = allocate_run_gen_block_work(0, body, env);
atomic_store(&done, false);
push(&thread_queues[0], job);
// TODO: Do we really want to start and kill all the threads for every top
Expand Down Expand Up @@ -404,7 +413,7 @@ void finish_work_stealing() {
// times, and has each loop iteration (and the coda) echo the trip count + 1,
// just to show that data can be mutated.

Work* gen_loop_body(int thread_id, int iteration, void** env) {
Work* gen_loop_body(int thread_id, size_t iteration, void** env) {
int* payload = (int*)env[0];
int item = *payload;
printf("Loop iteration %d on worker %d, payload %d\n",
Expand Down

0 comments on commit 438d957

Please sign in to comment.