Skip to content

Commit

Permalink
futex thread pool
Browse files Browse the repository at this point in the history
Signed-off-by: Pantelis Antoniou <[email protected]>
  • Loading branch information
pantoniou committed Sep 6, 2023
1 parent 9178104 commit b4f1d16
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 4 deletions.
112 changes: 112 additions & 0 deletions src/blake3/blake3_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,121 @@
#include <unistd.h>
#include <errno.h>

#if defined(__linux__)
#include <sys/syscall.h>
#include <linux/futex.h>
#endif

#include "blake3_impl.h"
#include "blake3_thread.h"

#define B3WORK_SHUTDOWN ((const blake3_thread_work *)(void *)-1)

#if defined(__linux__) && !defined(BLAKE3_THREAD_PORTABLE)

/* linux pedal to the metal implementation */
static inline int futex(uint32_t *uaddr, int futex_op, uint32_t val, const struct timespec *timeout, uint32_t *uaddr2, uint32_t val3)
{
return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
}

static inline int fwait(uint32_t *futexp)
{
long s;
const uint32_t one = 1;

while (!atomic_compare_exchange_strong(futexp, &one, 0)) {
s = futex(futexp, FUTEX_WAIT, 0, NULL, NULL, 0);
if (s == -1 && errno != EAGAIN)
return -1;
}
return 0;
}

static inline int fpost(uint32_t *futexp)
{
long s;
const uint32_t zero = 0;

if (atomic_compare_exchange_strong(futexp, &zero, 1)) {
s = futex(futexp, FUTEX_WAKE, 1, NULL, NULL, 0);
if (s == -1)
return -1;
}
return 0;
}

static inline void blake3_thread_init_sync(blake3_thread *t)
{
/* nothing more needed for futexes */
atomic_store(&t->submit, 0);
atomic_store(&t->done, 0);
}

static inline const blake3_thread_work *blake3_worker_wait_for_work(blake3_thread *t)
{
if (fwait(&t->submit)) {
fprintf(stderr, "%s:%d\n", __FILE__, __LINE__);
abort();
return NULL;
}
return t->work;
}

static inline void blake3_worker_signal_work_done(blake3_thread *t, const blake3_thread_work *work)
{
const blake3_thread_work *exp_work;

/* note that the work won't be replaced if it's a shutdown */
exp_work = work;
if (!atomic_compare_exchange_strong(&t->work, &exp_work, NULL)) {
assert(exp_work == B3WORK_SHUTDOWN);
fprintf(stderr, "%s:%d\n", __FILE__, __LINE__);
abort();
return;
}

(void)fpost(&t->done);
}

int blake3_thread_submit_work(blake3_thread *t, const blake3_thread_work *work)
{
const blake3_thread_work *exp_work;

/* atomically update the work */
exp_work = NULL;
if (!atomic_compare_exchange_strong(&t->work, &exp_work, work)) {
fprintf(stderr, "%s:%d - %p\n", __FILE__, __LINE__, exp_work);
assert(exp_work == B3WORK_SHUTDOWN);
abort();
return -1;
}

return fpost(&t->submit);
}

int blake3_thread_wait_work(blake3_thread *t)
{
const blake3_thread_work *work;

while ((work = atomic_load(&t->work)) != NULL)
fwait(&t->done);

atomic_store(&t->done, 0);

return 0;
}

void blake3_worker_thread_shutdown(blake3_thread *t)
{
atomic_store(&t->work, B3WORK_SHUTDOWN);
fpost(&t->submit);
}

#else

/* portable pthread implementation */

static inline void blake3_thread_init_sync(blake3_thread *t)
{
pthread_mutex_init(&t->lock, NULL);
Expand Down Expand Up @@ -95,6 +205,8 @@ void blake3_worker_thread_shutdown(blake3_thread *t)
pthread_join(t->tid, NULL);
}

#endif

void *blake3_worker_thread(void *arg)
{
blake3_thread *t = arg;
Expand Down
10 changes: 8 additions & 2 deletions src/blake3/blake3_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,22 @@ typedef struct _blake3_thread_work {
void *arg;
} blake3_thread_work;

//#define BLAKE3_THREAD_PORTABLE
typedef struct _blake3_thread {
struct _blake3_thread_pool *tp;
unsigned int id;
pthread_t tid;
pthread_mutex_t lock;
pthread_cond_t cond;
void *arg;
const blake3_thread_work *work;
#if defined(__linux__) && !defined(BLAKE3_THREAD_PORTABLE)
uint32_t submit;
uint32_t done;
#else
pthread_mutex_t lock;
pthread_cond_t cond;
pthread_mutex_t wait_lock;
pthread_cond_t wait_cond;
#endif
} blake3_thread;

typedef struct _blake3_thread_pool {
Expand Down
4 changes: 2 additions & 2 deletions src/internal/fy-b3sum.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ void test_worker_threads(void)
}

fprintf(stderr, "%s: test_count=%d\n", __func__, test_count);
assert(test_count == num_cpus);
assert(test_count == (int)num_cpus);

for (i = 0; i < count; i++) {
t = threads[i];
Expand Down Expand Up @@ -132,7 +132,7 @@ void test_thread_join(void)
blake3_thread_arg_join(tp, test_worker_thread_fn, NULL, &test_count, count);

fprintf(stderr, "%s: test_count=%d\n", __func__, test_count);
assert(test_count == num_cpus);
assert(test_count == (int)num_cpus);

fprintf(stderr, "calling: blake3_thread_pool_destroy()\n");
blake3_thread_pool_destroy(tp);
Expand Down

0 comments on commit b4f1d16

Please sign in to comment.