Skip to content

Commit

Permalink
working thread-pool-ng
Browse files Browse the repository at this point in the history
Signed-off-by: Pantelis Antoniou <[email protected]>
  • Loading branch information
pantoniou committed Sep 5, 2023
1 parent fdc08ec commit 6977fe5
Show file tree
Hide file tree
Showing 3 changed files with 477 additions and 129 deletions.
216 changes: 108 additions & 108 deletions src/blake3/blake3.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@ static inline size_t blake3_left_len(size_t content_len)
return round_down_to_power_of_2(full_chunks) * BLAKE3_CHUNK_LEN;
}

/* Report the compile-time BLAKE3 version string. */
const char *blake3_version(void)
{
	static const char *const version = BLAKE3_VERSION_STRING;

	return version;
}

static inline void blake3_chunk_state_update(blake3_host_state *hs, blake3_chunk_state *self, const uint8_t *input, size_t input_len)
{
size_t take;
Expand Down Expand Up @@ -68,7 +63,7 @@ static inline void blake3_chunk_state_update(blake3_host_state *hs, blake3_chunk
// interface handles both user input and parent node blocks, so it accepts
// bytes. For that reason, chaining values in the CV stack are represented as
// bytes.
static inline void blake3_output_chaining_value(blake3_host_state *hs, const blake3_output_t *self, uint8_t cv[32])
static void blake3_output_chaining_value(blake3_host_state *hs, const blake3_output_t *self, uint8_t cv[32])
{
uint32_t cv_words[8];

Expand All @@ -77,7 +72,7 @@ static inline void blake3_output_chaining_value(blake3_host_state *hs, const bla
store_cv_words(cv, cv_words);
}

static inline void blake3_output_root_bytes(blake3_host_state *hs, const blake3_output_t *self, uint64_t seek, uint8_t *out, size_t out_len)
static void blake3_output_root_bytes(blake3_host_state *hs, const blake3_output_t *self, uint64_t seek, uint8_t *out, size_t out_len)
{
uint64_t output_block_counter;
size_t offset_within_block;
Expand Down Expand Up @@ -247,7 +242,7 @@ static size_t blake3_compress_parents_parallel(blake3_host_state *hs, const uint

static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t *input, size_t input_len, const uint32_t key[8], uint64_t chunk_counter, uint8_t flags, uint8_t *out);

static inline void blake3_compress_subtree_wide_thread(struct _blake3_thread *t, void *arg)
static void blake3_compress_subtree_wide_thread(struct _blake3_thread *t, void *arg)
{
blake3_compress_subtree_state *s = arg;

Expand Down Expand Up @@ -349,14 +344,14 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t

// get a thread (if possible)
if (left_mt) {
left_thread = blake3_thread_pool_reserve(hs, false);
left_thread = blake3_host_state_thread_pool_reserve(hs, false);
// fprintf(stderr, "%s: left_thread #%d ctr=%lu chunks=%lu\n", __func__, left_thread ? (int)left_thread->id : -1,
// left_chunk_counter, left_input_len / BLAKE3_CHUNK_LEN);
} else
left_thread = NULL;

if (right_mt) {
right_thread = blake3_thread_pool_reserve(hs, false);
right_thread = blake3_host_state_thread_pool_reserve(hs, false);
// fprintf(stderr, "%s: right_thread #%d ctr=%lu chunks=%lu\n", __func__, right_thread ? (int)right_thread->id : -1,
// right_chunk_counter, right_input_len / BLAKE3_CHUNK_LEN);
} else
Expand Down Expand Up @@ -400,13 +395,13 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t

if (left_thread) {
blake3_thread_wait_result_and_release(left_thread);
blake3_thread_pool_unreserve(hs, left_thread);
blake3_host_state_thread_pool_unreserve(hs, left_thread);

left_n = left_state.n;
}
if (right_thread) {
blake3_thread_wait_result_and_release(right_thread);
blake3_thread_pool_unreserve(hs, right_thread);
blake3_host_state_thread_pool_unreserve(hs, right_thread);

right_n = right_state.n;
}
Expand All @@ -426,101 +421,6 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t
return blake3_compress_parents_parallel(hs, cv_array, left_n + right_n, key, flags, out);
}

/*
 * Common initializer shared by every blake3_hasher_init* variant:
 * record the host state, copy the 256-bit key, prime the first chunk
 * state with the mode flags, and start with an empty CV stack.
 */
static inline void hasher_init_base(blake3_host_state *hs, blake3_hasher *self, const uint32_t key[8], uint8_t flags)
{
	self->cv_stack_len = 0;
	self->hs = hs;
	blake3_chunk_state_init(&self->chunk, key, flags);
	memcpy(self->key, key, BLAKE3_KEY_LEN);
}

/* Initialize a hasher for plain (unkeyed) hashing. */
void blake3_hasher_init(blake3_host_state *hs, blake3_hasher *self)
{
	/* Standard IV, no mode flags. */
	const uint8_t no_flags = 0;

	hasher_init_base(hs, self, IV, no_flags);
}

/* Initialize a hasher in keyed mode from a raw 32-byte key. */
void blake3_hasher_init_keyed(blake3_host_state *hs, blake3_hasher *self, const uint8_t key[BLAKE3_KEY_LEN])
{
	uint32_t kw[8];

	/* The compressor works on 32-bit words, not raw key bytes. */
	load_key_words(key, kw);
	hasher_init_base(hs, self, kw, KEYED_HASH);
}

/*
 * Key-derivation mode: hash the raw context bytes into a 32-byte context
 * key, then initialize *self keyed by that context key in
 * DERIVE_KEY_MATERIAL mode.
 */
void blake3_hasher_init_derive_key_raw(blake3_host_state *hs, blake3_hasher *self, const void *context, size_t context_len)
{
	blake3_hasher ctx_hasher;
	uint8_t ctx_key[BLAKE3_KEY_LEN];
	uint32_t ctx_key_words[8];

	/* First pass: derive the context key from the context bytes. */
	hasher_init_base(hs, &ctx_hasher, IV, DERIVE_KEY_CONTEXT);
	blake3_hasher_update(&ctx_hasher, context, context_len);
	blake3_hasher_finalize(&ctx_hasher, ctx_key, BLAKE3_KEY_LEN);

	/* Second pass: re-key *self with the derived context key. */
	load_key_words(ctx_key, ctx_key_words);
	hasher_init_base(hs, self, ctx_key_words, DERIVE_KEY_MATERIAL);
}

/* Convenience wrapper: derive-key mode with a NUL-terminated context string. */
void blake3_hasher_init_derive_key(blake3_host_state *hs, blake3_hasher *self, const char *context)
{
	size_t context_len = strlen(context);

	blake3_hasher_init_derive_key_raw(hs, self, context, context_len);
}

/* Return the hasher to its freshly-initialized state, keeping its key. */
void blake3_hasher_reset(blake3_hasher *self)
{
	self->cv_stack_len = 0;
	blake3_chunk_state_reset(&self->chunk, self->key, 0);
}

/*
 * Produce out_len bytes of output starting at byte offset `seek` of the
 * extended output stream.  The hasher is read-only here, so callers may
 * invoke this repeatedly with different seek offsets.
 */
void blake3_hasher_finalize_seek(const blake3_hasher *self, uint64_t seek, uint8_t *out, size_t out_len)
{
blake3_output_t output;
size_t cvs_remaining;
uint8_t parent_block[BLAKE3_BLOCK_LEN];

// Explicitly checking for zero avoids causing UB by passing a null pointer
// to memcpy. This comes up in practice with things like:
// std::vector<uint8_t> v;
// blake3_hasher_finalize(&hasher, v.data(), v.size());
if (out_len == 0)
return;

// If the subtree stack is empty, then the current chunk is the root.
if (self->cv_stack_len == 0) {
output = blake3_chunk_state_output(&self->chunk);
blake3_output_root_bytes(self->hs, &output, seek, out, out_len);
return;
}

// If there are any bytes in the chunk state, finalize that chunk and do a
// roll-up merge between that chunk hash and every subtree in the stack. In
// this case, the extra merge loop at the end of blake3_hasher_update
// guarantees that none of the subtrees in the stack need to be merged with
// each other first. Otherwise, if there are no bytes in the chunk state,
// then the top of the stack is a chunk hash, and we start the merge from
// that.
if (blake3_chunk_state_len(&self->chunk) > 0) {
cvs_remaining = self->cv_stack_len;
output = blake3_chunk_state_output(&self->chunk);
} else {
// There are always at least 2 CVs in the stack in this case.
cvs_remaining = self->cv_stack_len - 2;
output = blake3_parent_output(&self->cv_stack[cvs_remaining * 32], self->key, self->chunk.flags);
}

// Roll up: repeatedly pair the current output's CV (right child) with the
// next CV popped off the stack (left child) into a parent block.
while (cvs_remaining > 0) {
cvs_remaining--;
memcpy(parent_block, &self->cv_stack[cvs_remaining * 32], 32);
blake3_output_chaining_value(self->hs, &output, &parent_block[32]);
output = blake3_parent_output(parent_block, self->key, self->chunk.flags);
}
// The last remaining output is the root; expand it into the caller's buffer.
blake3_output_root_bytes(self->hs, &output, seek, out, out_len);
}

/* Standard finalize: emit out_len bytes from the start of the output stream. */
void blake3_hasher_finalize(const blake3_hasher *self, uint8_t *out, size_t out_len)
{
	const uint64_t seek = 0;

	blake3_hasher_finalize_seek(self, seek, out, out_len);
}

// Hash a subtree with compress_subtree_wide(), and then condense the resulting
// list of chaining values down to a single parent node. Don't compress that
// last parent node, however. Instead, return its message bytes (the
Expand All @@ -531,7 +431,7 @@ void blake3_hasher_finalize(const blake3_hasher *self, uint8_t *out, size_t out_
//
// As with compress_subtree_wide(), this function is not used on inputs of 1
// chunk or less. That's a different codepath.
static inline void blake3_compress_subtree_to_parent_node(blake3_host_state *hs, const uint8_t *input, size_t input_len, const uint32_t key[8], uint64_t chunk_counter, uint8_t flags, uint8_t out[2 * BLAKE3_OUT_LEN])
static void blake3_compress_subtree_to_parent_node(blake3_host_state *hs, const uint8_t *input, size_t input_len, const uint32_t key[8], uint64_t chunk_counter, uint8_t flags, uint8_t out[2 * BLAKE3_OUT_LEN])
{
uint8_t *cv_array, *out_array;
size_t simd_degree_or_2, num_cvs;
Expand Down Expand Up @@ -563,6 +463,11 @@ static inline void blake3_compress_subtree_to_parent_node(blake3_host_state *hs,
memcpy(out, cv_array, 2 * BLAKE3_OUT_LEN);
}

/* Return the BLAKE3 version string this library was built from. */
const char *blake3_version(void)
{
return BLAKE3_VERSION_STRING;
}

void blake3_hasher_update(blake3_hasher *self, const void *input, size_t input_len)
{
const uint8_t *input_bytes = input;
Expand Down Expand Up @@ -671,3 +576,98 @@ void blake3_hasher_update(blake3_hasher *self, const void *input, size_t input_l
blake3_hasher_merge_cv_stack(self, self->chunk.chunk_counter);
}
}

// Common initializer shared by all blake3_hasher_init* variants: stores the
// host state, copies the key, primes the chunk state, and empties the CV stack.
static void hasher_init_base(blake3_host_state *hs, blake3_hasher *self, const uint32_t key[8], uint8_t flags)
{
self->hs = hs;
memcpy(self->key, key, BLAKE3_KEY_LEN);
blake3_chunk_state_init(&self->chunk, key, flags);
self->cv_stack_len = 0;
}

// Initialize a hasher for plain (unkeyed) hashing: standard IV, no flags.
void blake3_hasher_init(blake3_host_state *hs, blake3_hasher *self)
{
hasher_init_base(hs, self, IV, 0);
}

// Initialize a hasher in keyed mode from a raw 32-byte key.
void blake3_hasher_init_keyed(blake3_host_state *hs, blake3_hasher *self, const uint8_t key[BLAKE3_KEY_LEN])
{
uint32_t key_words[8];

// The compressor works on 32-bit words, not raw key bytes.
load_key_words(key, key_words);
hasher_init_base(hs, self, key_words, KEYED_HASH);
}

// Key-derivation mode: hash the raw context bytes into a 32-byte context key,
// then initialize *self keyed by that context key in DERIVE_KEY_MATERIAL mode.
void blake3_hasher_init_derive_key_raw(blake3_host_state *hs, blake3_hasher *self, const void *context, size_t context_len)
{
uint8_t context_key[BLAKE3_KEY_LEN];
uint32_t context_key_words[8];
blake3_hasher context_hasher;

// First pass: derive the context key from the context bytes.
hasher_init_base(hs, &context_hasher, IV, DERIVE_KEY_CONTEXT);
blake3_hasher_update(&context_hasher, context, context_len);
blake3_hasher_finalize(&context_hasher, context_key, BLAKE3_KEY_LEN);
// Second pass: re-key *self with the derived context key.
load_key_words(context_key, context_key_words);
hasher_init_base(hs, self, context_key_words, DERIVE_KEY_MATERIAL);
}

// Convenience wrapper: derive-key mode with a NUL-terminated context string.
void blake3_hasher_init_derive_key(blake3_host_state *hs, blake3_hasher *self, const char *context)
{
blake3_hasher_init_derive_key_raw(hs, self, context, strlen(context));
}

// Return the hasher to its freshly-initialized state, keeping its key.
void blake3_hasher_reset(blake3_hasher *self)
{
blake3_chunk_state_reset(&self->chunk, self->key, 0);
self->cv_stack_len = 0;
}

/*
 * Produce out_len bytes of output starting at byte offset `seek` of the
 * extended output stream.  The hasher is read-only here, so callers may
 * invoke this repeatedly with different seek offsets.
 */
void blake3_hasher_finalize_seek(const blake3_hasher *self, uint64_t seek, uint8_t *out, size_t out_len)
{
blake3_output_t output;
size_t cvs_remaining;
uint8_t parent_block[BLAKE3_BLOCK_LEN];

// Explicitly checking for zero avoids causing UB by passing a null pointer
// to memcpy. This comes up in practice with things like:
// std::vector<uint8_t> v;
// blake3_hasher_finalize(&hasher, v.data(), v.size());
if (out_len == 0)
return;

// If the subtree stack is empty, then the current chunk is the root.
if (self->cv_stack_len == 0) {
output = blake3_chunk_state_output(&self->chunk);
blake3_output_root_bytes(self->hs, &output, seek, out, out_len);
return;
}

// If there are any bytes in the chunk state, finalize that chunk and do a
// roll-up merge between that chunk hash and every subtree in the stack. In
// this case, the extra merge loop at the end of blake3_hasher_update
// guarantees that none of the subtrees in the stack need to be merged with
// each other first. Otherwise, if there are no bytes in the chunk state,
// then the top of the stack is a chunk hash, and we start the merge from
// that.
if (blake3_chunk_state_len(&self->chunk) > 0) {
cvs_remaining = self->cv_stack_len;
output = blake3_chunk_state_output(&self->chunk);
} else {
// There are always at least 2 CVs in the stack in this case.
cvs_remaining = self->cv_stack_len - 2;
output = blake3_parent_output(&self->cv_stack[cvs_remaining * 32], self->key, self->chunk.flags);
}

// Roll up: repeatedly pair the current output's CV (right child) with the
// next CV popped off the stack (left child) into a parent block.
while (cvs_remaining > 0) {
cvs_remaining--;
memcpy(parent_block, &self->cv_stack[cvs_remaining * 32], 32);
blake3_output_chaining_value(self->hs, &output, &parent_block[32]);
output = blake3_parent_output(parent_block, self->key, self->chunk.flags);
}
// The last remaining output is the root; expand it into the caller's buffer.
blake3_output_root_bytes(self->hs, &output, seek, out, out_len);
}

// Standard finalize: emit out_len bytes from the start of the output stream.
void blake3_hasher_finalize(const blake3_hasher *self, uint8_t *out, size_t out_len)
{
blake3_hasher_finalize_seek(self, 0, out, out_len);
}
24 changes: 15 additions & 9 deletions src/blake3/blake3_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ typedef struct {
typedef void (*blake3_thread_fn)(struct _blake3_thread *t, void *arg);

typedef enum {
B3TS_DEAD,
B3TS_STARTUP,
B3TS_WAITING,
B3TS_SUBMITTED,
Expand All @@ -230,28 +231,33 @@ typedef enum {

struct _blake3_thread_pool;

// A unit of work handed to a pool thread: a callback and its opaque argument.
typedef struct _blake3_thread_work {
void (*fn)(void *arg);
void *arg;
} blake3_thread_work;

typedef struct _blake3_thread {
struct _blake3_host_state *hs;
struct _blake3_thread_pool *pool;
struct _blake3_thread_pool *tp;
unsigned int id;
pthread_t tid;
pthread_mutex_t lock;
pthread_cond_t cond;
_Atomic unsigned int state;
unsigned int state;
blake3_thread_fn fn;
void *arg;
const blake3_thread_work *work;
pthread_mutex_t wait_lock;
pthread_cond_t wait_cond;
} blake3_thread;

typedef struct _blake3_thread_pool {
unsigned int num_threads;
struct _blake3_thread *threads;
pthread_mutex_t lock_thread_free; // should not be needed at the end if we use atomics
pthread_cond_t cond_thread_free;
#if defined(__GNUC__) || defined(__clang__)
_Atomic uint64_t thread_free;
#else
uint64_t thread_free;
#endif
uint64_t free;
uint64_t ready;
} blake3_thread_pool;

typedef struct _blake3_host_state {
Expand Down Expand Up @@ -296,8 +302,8 @@ int blake3_thread_wait_result(blake3_thread *t);
void blake3_thread_release(blake3_thread *t);
int blake3_thread_wait_result_and_release(blake3_thread *t);

blake3_thread *blake3_thread_pool_reserve(blake3_host_state *hs, bool wait);
void blake3_thread_pool_unreserve(blake3_host_state *hs, blake3_thread *t);
blake3_thread *blake3_host_state_thread_pool_reserve(blake3_host_state *hs, bool wait);
void blake3_host_state_thread_pool_unreserve(blake3_host_state *hs, blake3_thread *t);

int blake3_host_state_init(blake3_host_state *hs, const blake3_host_config *cfg);
void blake3_host_state_cleanup(blake3_host_state *hs);
Expand Down
Loading

0 comments on commit 6977fe5

Please sign in to comment.