Skip to content

Commit

Permalink
atomics
Browse files Browse the repository at this point in the history
Signed-off-by: Pantelis Antoniou <[email protected]>
  • Loading branch information
pantoniou committed Sep 5, 2023
1 parent b14d5d5 commit 27ae08c
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 36 deletions.
139 changes: 104 additions & 35 deletions src/blake3/blake3.c
Original file line number Diff line number Diff line change
Expand Up @@ -398,16 +398,16 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t
}

if (left_thread) {
blake3_thread_wait_result(left_thread);
left_n = left_state.n;
blake3_thread_release(left_thread);
blake3_thread_wait_result_and_release(left_thread);
blake3_thread_pool_unreserve(hs, left_thread);

left_n = left_state.n;
}
if (right_thread) {
blake3_thread_wait_result(right_thread);
right_n = right_state.n;
blake3_thread_release(right_thread);
blake3_thread_wait_result_and_release(right_thread);
blake3_thread_pool_unreserve(hs, right_thread);

right_n = right_state.n;
}

assert(left_n);
Expand Down Expand Up @@ -675,6 +675,7 @@ void *blake3_host_state_thread(void *arg)
{
blake3_thread *t = arg;
blake3_host_state *hs;
unsigned int state;
int rc;

(void)rc;
Expand All @@ -683,39 +684,38 @@ void *blake3_host_state_thread(void *arg)
hs = t->hs;
assert(hs);

assert(t->state == B3TS_STARTUP);
assert(atomic_load(&t->state) == B3TS_STARTUP);

/* verify that thread state is sane */
assert(t >= hs->threads && t < hs->threads + hs->num_threads);
assert(t->id < hs->num_threads);

// fprintf(stderr, "%s: starting #%d\n", __func__, t->id);

/* note that the lock is long held, while the operation takes place */
pthread_mutex_lock(&t->lock);

/* change state and signal the user */
t->state = B3TS_WAITING;
atomic_store(&t->state, B3TS_WAITING);
pthread_cond_signal(&t->cond);

for (;;) {

/* wait here until we get handed a function to execute */
while (t->state == B3TS_WAITING) {
while ((state = atomic_load(&t->state)) == B3TS_WAITING) {
rc = pthread_cond_wait(&t->cond, &t->lock);
assert(!rc);
}

/* shutting down? */
if (t->state == B3TS_SHUTTINGDOWN)
if (state == B3TS_SHUTTINGDOWN)
break;
submitted:
/* only submitted allowed here */
assert(t->state == B3TS_SUBMITTED);
assert(state == B3TS_SUBMITTED);
assert(t->fn);

/* we're now processing */
t->state = B3TS_PROCESSING;
atomic_store(&t->state, B3TS_PROCESSING);
pthread_mutex_unlock(&t->lock);

/* execute out of the lock */
Expand All @@ -724,41 +724,41 @@ void *blake3_host_state_thread(void *arg)
pthread_mutex_lock(&t->lock);

/* shutting down? */
if (t->state == B3TS_SHUTTINGDOWN)
if (atomic_load(&t->state) == B3TS_SHUTTINGDOWN)
break;

assert(t->state == B3TS_PROCESSING);
assert(atomic_load(&t->state) == B3TS_PROCESSING);

/* we're done... change state and signal all */
t->state = B3TS_DONE;
atomic_store(&t->state, B3TS_DONE);
pthread_cond_signal(&t->cond);

/* wait here until the results are consumed */
while (t->state == B3TS_DONE) {
while ((state = atomic_load(&t->state)) == B3TS_DONE) {
rc = pthread_cond_wait(&t->cond, &t->lock);
assert(!rc);
}

/* shutting down? */
if (t->state == B3TS_SHUTTINGDOWN)
if (state == B3TS_SHUTTINGDOWN)
break;

/* user must consume the data */
assert(t->state == B3TS_RELEASE || t->state == B3TS_SUBMITTED);
assert(state == B3TS_RELEASE || state == B3TS_SUBMITTED);

/* we went from done to submitted, it's cool */
if (t->state == B3TS_SUBMITTED)
if (state == B3TS_SUBMITTED)
goto submitted;

t->fn = NULL;

/* and now we're back to waiting */
t->state = B3TS_WAITING;
atomic_store(&t->state, B3TS_WAITING);
rc = pthread_cond_signal(&t->cond);
assert(rc == 0);
}

t->state = B3TS_SHUTDOWN;
atomic_store(&t->state, B3TS_SHUTDOWN);
pthread_mutex_unlock(&t->lock);

return NULL;
Expand All @@ -778,7 +778,7 @@ int blake3_thread_init(blake3_host_state *hs, blake3_thread *t)
rc = pthread_cond_init(&t->cond, NULL);
assert(!rc);

t->state = B3TS_STARTUP;
atomic_store(&t->state, B3TS_STARTUP);

rc = pthread_create(&t->tid, NULL, blake3_host_state_thread, t);
assert(rc == 0);
Expand Down Expand Up @@ -819,7 +819,7 @@ void blake3_thread_shutdown(blake3_thread *t)
assert(!rc);
}

t->state = B3TS_SHUTTINGDOWN;
atomic_store(&t->state, B3TS_SHUTTINGDOWN);

rc = pthread_cond_signal(&t->cond);
assert(rc == 0);
Expand All @@ -831,13 +831,14 @@ void blake3_thread_shutdown(blake3_thread *t)
assert(rc == 0);
}

blake3_thread *blake3_thread_pool_reserve(blake3_host_state *hs, bool wait)
blake3_thread *blake3_thread_pool_reserve_portable(blake3_host_state *hs, bool wait)
{
blake3_thread *t = NULL;
unsigned int slot;
unsigned int slot, state;
int rc;

(void)rc;
(void)state;

assert(hs);

Expand All @@ -853,7 +854,8 @@ blake3_thread *blake3_thread_pool_reserve(blake3_host_state *hs, bool wait)
slot = highest_one(hs->thread_free);
assert(slot < hs->num_threads);
t = hs->threads + slot;
assert(t->state == B3TS_WAITING || t->state == B3TS_RELEASE);

assert((state = atomic_load(&t->state)) == B3TS_WAITING || state == B3TS_RELEASE);

/* clear the slot */
hs->thread_free &= ~((uint64_t)1 << slot);
Expand All @@ -864,7 +866,7 @@ blake3_thread *blake3_thread_pool_reserve(blake3_host_state *hs, bool wait)
return t;
}

void blake3_thread_pool_unreserve(blake3_host_state *hs, blake3_thread *t)
void blake3_thread_pool_unreserve_portable(blake3_host_state *hs, blake3_thread *t)
{
unsigned int slot;
int rc;
Expand All @@ -881,6 +883,56 @@ void blake3_thread_pool_unreserve(blake3_host_state *hs, blake3_thread *t)
pthread_mutex_unlock(&hs->lock_thread_free);
}

#if defined(__GNUC__) || defined(__clang__)

/*
 * Reserve a worker thread from the pool using the atomic free bitmap.
 *
 * hs->thread_free holds one bit per free thread. The slot selected by
 * highest_one() is claimed by clearing its bit with a compare-and-swap;
 * if another thread changed the bitmap in the meantime, the CAS refreshes
 * our snapshot and the selection is retried.
 *
 * Returns the reserved thread, or NULL when no thread is free and
 * wait is false. When wait is true and the bitmap is empty, the request
 * is handed to blake3_thread_pool_reserve_portable().
 */
blake3_thread *blake3_thread_pool_reserve(blake3_host_state *hs, bool wait)
{
	uint64_t snapshot, desired;
	unsigned int idx, st;
	blake3_thread *thr;

	(void)st;

	snapshot = atomic_load(&hs->thread_free);
	for (;;) {
		/* nothing free in the snapshot; stop trying the fast path */
		if (!snapshot)
			break;

		idx = highest_one(snapshot);
		desired = snapshot & ~((uint64_t)1 << idx);	/* bitmap with our slot claimed */

		/* on failure 'snapshot' is reloaded with the current bitmap */
		if (!atomic_compare_exchange_strong(&hs->thread_free, &snapshot, desired))
			continue;

		thr = hs->threads + idx;
		assert((st = atomic_load(&thr->state)) == B3TS_WAITING || st == B3TS_RELEASE);
		return thr;
	}

	/* no free thread: either give up or fall back to the portable case */
	return wait ? blake3_thread_pool_reserve_portable(hs, wait) : NULL;
}

/*
 * Return a previously reserved thread to the pool (atomic bitmap variant).
 *
 * Sets the thread's bit in hs->thread_free and wakes every thread blocked
 * on hs->cond_thread_free.
 *
 * The broadcast is issued while holding hs->lock_thread_free. Without the
 * mutex, a reserver in the blocking fallback could test the bitmap, find it
 * empty, and only then reach pthread_cond_wait() after our broadcast was
 * already delivered, sleeping forever on a lost wakeup.
 * NOTE(review): this assumes the blocking path in
 * blake3_thread_pool_reserve_portable() checks thread_free under
 * lock_thread_free before waiting; its loop is not fully visible here.
 *
 * hs: the host state owning the pool; t: the thread being returned.
 */
void blake3_thread_pool_unreserve(blake3_host_state *hs, blake3_thread *t)
{
	int rc;

	(void)rc;

	assert(hs);
	assert(t);
	assert(t->id < hs->num_threads);

	/* publish the free slot first so a woken waiter is guaranteed to see it */
	atomic_fetch_or(&hs->thread_free, (uint64_t)1 << t->id);

	/*
	 * Serialize the wakeup against waiters: either the waiter has not yet
	 * taken the lock (and will observe the bit), or it is already parked
	 * in pthread_cond_wait() (and will receive the broadcast).
	 */
	rc = pthread_mutex_lock(&hs->lock_thread_free);
	assert(rc == 0);

	rc = pthread_cond_broadcast(&hs->cond_thread_free);
	assert(rc == 0);

	rc = pthread_mutex_unlock(&hs->lock_thread_free);
	assert(rc == 0);
}

#else

/*
 * Reserve a worker thread from the pool.
 *
 * This is the variant built when neither __GNUC__ nor __clang__ is defined;
 * it simply forwards to blake3_thread_pool_reserve_portable() with the same
 * arguments and returns its result.
 */
blake3_thread *blake3_thread_pool_reserve(blake3_host_state *hs, bool wait)
{
return blake3_thread_pool_reserve_portable(hs, wait);
}

/*
 * Return a previously reserved thread to the pool.
 *
 * This is the variant built when neither __GNUC__ nor __clang__ is defined;
 * it simply forwards to blake3_thread_pool_unreserve_portable().
 */
void blake3_thread_pool_unreserve(blake3_host_state *hs, blake3_thread *t)
{
blake3_thread_pool_unreserve_portable(hs, t);
}

#endif

int blake3_thread_submit(blake3_thread *t, blake3_thread_fn fn, void *arg)
{
blake3_host_state *hs;
Expand All @@ -906,7 +958,7 @@ int blake3_thread_submit(blake3_thread *t, blake3_thread_fn fn, void *arg)
assert(!rc);
}

t->state = B3TS_SUBMITTED;
atomic_store(&t->state, B3TS_SUBMITTED);
t->fn = fn;
t->arg = arg;

Expand All @@ -922,6 +974,7 @@ int blake3_thread_submit(blake3_thread *t, blake3_thread_fn fn, void *arg)
int blake3_thread_wait_result(blake3_thread *t)
{
blake3_host_state *hs;
unsigned int state;
int rc;

(void)rc;
Expand All @@ -939,12 +992,12 @@ int blake3_thread_wait_result(blake3_thread *t)

rc = pthread_mutex_lock(&t->lock);
assert(rc == 0);
while (t->state == B3TS_SUBMITTED || t->state == B3TS_PROCESSING) {
while ((state = atomic_load(&t->state)) == B3TS_SUBMITTED || state == B3TS_PROCESSING) {
rc = pthread_cond_wait(&t->cond, &t->lock);
assert(!rc);
}

assert(t->state == B3TS_DONE);
assert(state == B3TS_DONE);

rc = pthread_mutex_unlock(&t->lock);
assert(rc == 0);
Expand Down Expand Up @@ -975,9 +1028,9 @@ void blake3_thread_release(blake3_thread *t)
assert(rc == 0);
#endif

assert(t->state == B3TS_DONE);
assert(atomic_load(&t->state) == B3TS_DONE);

t->state = B3TS_RELEASE;
atomic_store(&t->state, B3TS_RELEASE);
rc = pthread_cond_signal(&t->cond);
assert(rc == 0);

Expand All @@ -987,6 +1040,20 @@ void blake3_thread_release(blake3_thread *t)
#endif
}

/*
 * Convenience helper: wait for the thread's result with
 * blake3_thread_wait_result(), then hand the thread back with
 * blake3_thread_release().
 *
 * A non-zero result from the wait step is only caught by an assert.
 * Always returns 0.
 */
int blake3_thread_wait_result_and_release(blake3_thread *t)
{
	const int wait_rc = blake3_thread_wait_result(t);

	/* keep the variable "used" when asserts are compiled out */
	(void)wait_rc;
	assert(wait_rc == 0);

	blake3_thread_release(t);

	return 0;
}

#if 0
static void test_thread_fn(struct _blake3_thread *t, void *arg)
{
Expand Down Expand Up @@ -1071,7 +1138,8 @@ static inline void cpuid(uint32_t out[4], uint32_t id)
#if defined(_MSC_VER)
__cpuid((int *)out, id);
#elif defined(__i386__) || defined(_M_IX86)
__asm__ __volatile__("movl %%ebx, %1\n"
__asm__ __volatile__(
"movl %%ebx, %1\n"
"cpuid\n"
"xchgl %1, %%ebx\n"
: "=a"(out[0]), "=r"(out[1]), "=c"(out[2]), "=d"(out[3])
Expand All @@ -1088,7 +1156,8 @@ static inline void cpuidex(uint32_t out[4], uint32_t id, uint32_t sid)
#if defined(_MSC_VER)
__cpuidex((int *)out, id, sid);
#elif defined(__i386__) || defined(_M_IX86)
__asm__ __volatile__("movl %%ebx, %1\n"
__asm__ __volatile__(
"movl %%ebx, %1\n"
"cpuid\n"
"xchgl %1, %%ebx\n"
: "=a"(out[0]), "=r"(out[1]), "=c"(out[2]), "=d"(out[3])
Expand Down
1 change: 1 addition & 0 deletions src/blake3/blake3_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <stdatomic.h>

#include "blake3.h"
#include "blake3_internal.h"
Expand Down
8 changes: 7 additions & 1 deletion src/blake3/blake3_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <stddef.h>
#include <stdint.h>
#include <pthread.h>
#include <stdatomic.h>

#include "blake3.h"

Expand Down Expand Up @@ -144,7 +145,11 @@ typedef struct _blake3_host_state {
struct _blake3_thread *threads;
pthread_mutex_t lock_thread_free; // should not be needed at the end if we use atomics
pthread_cond_t cond_thread_free;
#if defined(__GNUC__) || defined(__clang__)
_Atomic uint64_t thread_free;
#else
uint64_t thread_free;
#endif
} blake3_host_state;

typedef struct _blake3_hasher {
Expand Down Expand Up @@ -263,14 +268,15 @@ typedef struct _blake3_thread {
pthread_t tid;
pthread_mutex_t lock;
pthread_cond_t cond;
blake3_thread_state state;
_Atomic unsigned int state;
blake3_thread_fn fn;
void *arg;
} blake3_thread;

int blake3_thread_submit(blake3_thread *t, blake3_thread_fn fn, void *arg);
int blake3_thread_wait_result(blake3_thread *t);
void blake3_thread_release(blake3_thread *t);
int blake3_thread_wait_result_and_release(blake3_thread *t);

blake3_thread *blake3_thread_pool_reserve(blake3_host_state *hs, bool wait);
void blake3_thread_pool_unreserve(blake3_host_state *hs, blake3_thread *t);
Expand Down

0 comments on commit 27ae08c

Please sign in to comment.