Skip to content

Commit

Permalink
simulate more realistic usage + add coverage markers
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Nov 14, 2024
1 parent 02d5033 commit b13ac0d
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 73 deletions.
151 changes: 78 additions & 73 deletions src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ SEXP rnng_messenger_thread_create(SEXP args) {

// threaded functions ----------------------------------------------------------

// # nocov start
// tested interactively

static void thread_aio_finalizer(SEXP xptr) {

if (NANO_PTR(xptr) == NULL) return;
Expand All @@ -209,6 +212,81 @@ static void thread_aio_finalizer(SEXP xptr) {

}

static void rnng_wait_thread_single(void *args) {

nano_thread_aio *taio = (nano_thread_aio *) args;
nano_cv *ncv = taio->cv;
nng_mtx *mtx = ncv->mtx;
nng_cv *cv = ncv->cv;

nng_aio_wait(taio->aio);

nng_mtx_lock(mtx);
ncv->condition = 1;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);

}

void single_wait_thread_create(SEXP x) {

nano_aio *aiop = (nano_aio *) NANO_PTR(x);
nano_thread_aio *taio = R_Calloc(1, nano_thread_aio);
nano_cv *ncv = R_Calloc(1, nano_cv);
taio->aio = aiop->aio;
taio->cv = ncv;
nng_mtx *mtx;
nng_cv *cv;
int xc, signalled;

if ((xc = nng_mtx_alloc(&mtx)))
goto exitlevel1;

if ((xc = nng_cv_alloc(&cv, mtx)))
goto exitlevel2;

ncv->mtx = mtx;
ncv->cv = cv;

if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread_single, taio)))
goto exitlevel3;

SEXP xptr;
PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue));
R_RegisterCFinalizerEx(xptr, thread_aio_finalizer, TRUE);
R_MakeWeakRef(x, xptr, R_NilValue, TRUE);
UNPROTECT(1);

nng_time time = nng_clock();

while (1) {
time = time + 400;
signalled = 1;
nng_mtx_lock(mtx);
while (ncv->condition == 0) {
if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) {
signalled = 0;
break;
}
}
nng_mtx_unlock(mtx);
if (signalled) break;
R_CheckUserInterrupt();
}

return;

exitlevel3:
nng_cv_free(cv);
exitlevel2:
nng_mtx_free(mtx);
exitlevel1:
ERROR_OUT(xc);

}

// # nocov end

static void thread_duo_finalizer(SEXP xptr) {

if (NANO_PTR(xptr) == NULL) return;
Expand Down Expand Up @@ -286,79 +364,6 @@ static void rnng_wait_thread(void *args) {

}

static void rnng_wait_thread_single(void *args) {

nano_thread_aio *taio = (nano_thread_aio *) args;
nano_cv *ncv = taio->cv;
nng_mtx *mtx = ncv->mtx;
nng_cv *cv = ncv->cv;

nng_aio_wait(taio->aio);

nng_mtx_lock(mtx);
ncv->condition = 1;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);

}

void single_wait_thread_create(SEXP x) {

nano_aio *aiop = (nano_aio *) NANO_PTR(x);
nano_thread_aio *taio = R_Calloc(1, nano_thread_aio);
nano_cv *ncv = R_Calloc(1, nano_cv);
taio->aio = aiop->aio;
taio->cv = ncv;
nng_mtx *mtx;
nng_cv *cv;
int xc, signalled;

if ((xc = nng_mtx_alloc(&mtx)))
goto exitlevel1;

if ((xc = nng_cv_alloc(&cv, mtx)))
goto exitlevel2;

ncv->mtx = mtx;
ncv->cv = cv;

if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread_single, taio)))
goto exitlevel3;

SEXP xptr;
PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue));
R_RegisterCFinalizerEx(xptr, thread_aio_finalizer, TRUE);
R_MakeWeakRef(x, xptr, R_NilValue, TRUE);
UNPROTECT(1);

nng_time time = nng_clock();

while (1) {
time = time + 400;
signalled = 1;
nng_mtx_lock(mtx);
while (ncv->condition == 0) {
if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) {
signalled = 0;
break;
}
}
nng_mtx_unlock(mtx);
if (signalled) break;
R_CheckUserInterrupt();
}

return;

exitlevel3:
nng_cv_free(cv);
exitlevel2:
nng_mtx_free(mtx);
exitlevel1:
ERROR_OUT(xc);

}

SEXP rnng_wait_thread_create(SEXP x) {

const SEXPTYPE typ = TYPEOF(x);
Expand Down
1 change: 1 addition & 0 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ test_true(recv(disp, block = 500L))
test_zero(send(disp, NULL, block = 500L))
test_null(recv(s, block = 500L))
test_zero(reap(s))
test_null(msleep(100L))
test_class("nanoSocket", s <- socket(protocol = "rep"))
test_zero(pipe_notify(s, cv, add = TRUE))
test_zero(dial(s, url = "inproc://disp/1"))
Expand Down

0 comments on commit b13ac0d

Please sign in to comment.