Skip to content

Commit

Permalink
more than 64 threads
Browse files: browse the repository at this point in the history
Signed-off-by: Pantelis Antoniou <[email protected]>
  • Loading branch information
pantoniou committed Sep 6, 2023
1 parent b4f1d16 commit 79a903d
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 30 deletions.
59 changes: 42 additions & 17 deletions src/blake3/blake3_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ int blake3_thread_submit_work(blake3_thread *t, const blake3_thread_work *work)

/* atomically update the work */

assert(t);
assert(work);

pthread_mutex_lock(&t->lock);
exp_work = NULL;
if (!atomic_compare_exchange_strong(&t->work, &exp_work, work)) {
Expand Down Expand Up @@ -224,29 +227,36 @@ blake3_thread *blake3_thread_pool_reserve(blake3_thread_pool *tp)
{
blake3_thread *t;
unsigned int slot;
uint64_t v, exp;
uint64_t *free, v, exp;
unsigned int i;

t = NULL;
v = atomic_load(&tp->free);
while (v) {
slot = lowest_one(v);
assert(v & BIT64(slot));
exp = v; /* expecting the previous value */
v &= ~BIT64(slot); /* clear this bit */
if (atomic_compare_exchange_strong(&tp->free, &exp, v)) {
t = tp->threads + slot;
assert(slot == t->id);
return t;
for (i = 0, free = tp->freep; i < tp->free_count; i++, free++) {
v = atomic_load(free);
while (v) {
slot = lowest_one(v);
assert(v & BIT64(slot));
exp = v; /* expecting the previous value */
v &= ~BIT64(slot); /* clear this bit */
if (atomic_compare_exchange_strong(free, &exp, v)) {
slot += i * 64;
t = tp->threads + slot;
assert(slot == t->id);
return t;
}
v = exp;
}
v = exp;
}

return NULL;
}

/*
 * blake3_thread_pool_unreserve() - hand thread @t back to pool @tp.
 *
 * Marks the thread as available again by atomically OR-ing its bit into
 * the pool's free bitmap. In the multi-word bitmap (tp->freep) the thread
 * id is split into a word index (id / 64) and a bit index (id & 63).
 *
 * NOTE(review): this listing looks like a diff rendered without +/- markers.
 * The first atomic_fetch_or() below (on the single word tp->free, with an
 * unmasked t->id) is presumably the removed pre-change line, and the
 * remaining statements its replacement. Confirm against the real file
 * before relying on both updates being present.
 */
void blake3_thread_pool_unreserve(blake3_thread_pool *tp, blake3_thread *t)
{
/* NOTE(review): t->id is not masked here; presumably BIT64(n) is a 64-bit
 * shift, which would be out of range for ids >= 64 - TODO confirm */
atomic_fetch_or(&tp->free, BIT64(t->id));
/* NOTE: this local is named like the C library free(), which it shadows
 * for the rest of the function body */
uint64_t *free;

/* pick the 64-bit bitmap word that holds this thread's bit */
free = tp->freep + (unsigned int)(t->id / 64);
/* set the thread's bit within that word */
atomic_fetch_or(free, BIT64(t->id & 63));
}

void blake3_thread_pool_cleanup(blake3_thread_pool *tp)
Expand All @@ -267,6 +277,9 @@ void blake3_thread_pool_cleanup(blake3_thread_pool *tp)
free(tp->threads);
}

if (tp->freep)
free(tp->freep);

memset(tp, 0, sizeof(*tp));
}

Expand All @@ -279,19 +292,31 @@ int blake3_thread_pool_init(blake3_thread_pool *tp, unsigned int num_threads)
(void)rc;

assert(tp);


if (!num_threads)
return -1;

memset(tp, 0, sizeof(*tp));

tp->num_threads = num_threads >= 64 ? 64 : num_threads;
tp->num_threads = num_threads;

tp->free_count = (tp->num_threads / 64) + ((tp->num_threads & 63) ? 1 : 0);

tp->freep = malloc(tp->free_count * sizeof(uint64_t));
if (!tp->freep)
goto err_out;

for (i = 0; i < tp->free_count; i++)
tp->freep[i] = (uint64_t)-1;
if (tp->num_threads & 63)
tp->freep[tp->free_count - 1] = BIT64(tp->num_threads & 63) - 1;

tp->threads = malloc(sizeof(*tp->threads) * tp->num_threads);
if (!tp->threads)
goto err_out;

memset(tp->threads, 0, sizeof(*tp->threads) * tp->num_threads);

tp->free = BIT64(tp->num_threads & 63) - 1;

for (i = 0, t = tp->threads; i < tp->num_threads; i++, t++) {

t->tp = tp;
Expand Down
4 changes: 3 additions & 1 deletion src/blake3/blake3_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ typedef struct _blake3_thread_work {
void *arg;
} blake3_thread_work;

//#define BLAKE3_THREAD_PORTABLE
#define BLAKE3_THREAD_PORTABLE
typedef struct _blake3_thread {
struct _blake3_thread_pool *tp;
unsigned int id;
Expand All @@ -41,7 +41,9 @@ typedef struct _blake3_thread {
/*
 * Thread pool state: the thread descriptors plus a bitmap that tracks
 * which of them are currently free to be reserved.
 */
typedef struct _blake3_thread_pool {
/* number of thread descriptors in the threads array */
unsigned int num_threads;
/* array of num_threads thread descriptors; a thread's id is its index */
struct _blake3_thread *threads;
/* free bitmap: array of free_count 64-bit words; a set bit marks a free
 * thread (bit b of word w corresponds to thread id w * 64 + b) */
uint64_t *freep;
/* single-word free bitmap.
 * NOTE(review): appears to be superseded by freep, which lifts the
 * 64-thread limit - confirm whether anything still uses this field */
uint64_t free;
/* number of 64-bit words in freep, i.e. num_threads / 64 rounded up */
unsigned int free_count;
} blake3_thread_pool;

blake3_thread_pool *blake3_thread_pool_create(unsigned int num_threads);
Expand Down
30 changes: 18 additions & 12 deletions src/internal/fy-b3sum.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ static void test_worker_thread_fn(void *arg)
}
}

void test_worker_threads(void)
void test_worker_threads(unsigned int num_threads)
{
blake3_thread_pool *tp;
blake3_thread **threads, *t;
Expand All @@ -55,9 +55,12 @@ void test_worker_threads(void)

(void)rc;

scval = sysconf(_SC_NPROCESSORS_ONLN);
assert(scval > 0);
num_cpus = (unsigned int)scval;
if (num_threads == 0) {
scval = sysconf(_SC_NPROCESSORS_ONLN);
assert(scval > 0);
num_cpus = (unsigned int)scval;
} else
num_cpus = num_threads;

fprintf(stderr, "calling: blake3_thread_pool_init()\n");
tp = blake3_thread_pool_create(num_cpus);
Expand Down Expand Up @@ -106,7 +109,7 @@ void test_worker_threads(void)
blake3_thread_pool_destroy(tp);
}

void test_thread_join(void)
void test_thread_join(unsigned int num_threads)
{
blake3_thread_pool *tp;
void **args;
Expand All @@ -116,9 +119,12 @@ void test_thread_join(void)

(void)rc;

scval = sysconf(_SC_NPROCESSORS_ONLN);
assert(scval > 0);
num_cpus = (unsigned int)scval;
if (num_threads == 0) {
scval = sysconf(_SC_NPROCESSORS_ONLN);
assert(scval > 0);
num_cpus = (unsigned int)scval;
} else
num_cpus = num_threads;

fprintf(stderr, "calling: blake3_thread_pool_create()\n");
tp = blake3_thread_pool_create(num_cpus);
Expand Down Expand Up @@ -229,8 +235,8 @@ int main(int argc, char *argv[])
break;
case OPT_NUM_THREADS:
num_threads = atoi(optarg);
if (num_threads < 0 || num_threads > 64) {
fprintf(stderr, "bad num_threads=%d (must be >= 0 and < 64)\n", num_threads);
if (num_threads < 0) {
fprintf(stderr, "bad num_threads=%d (must be >= 0)\n", num_threads);
display_usage(opt == 'h' ? stdout : stderr, argv[0]);
return EXIT_FAILURE;
}
Expand Down Expand Up @@ -275,8 +281,8 @@ int main(int argc, char *argv[])
}

if (do_test) {
test_worker_threads();
test_thread_join();
test_worker_threads(num_threads);
test_thread_join(num_threads);
return EXIT_SUCCESS;
}

Expand Down

0 comments on commit 79a903d

Please sign in to comment.