Skip to content

Commit

Permalink
fix some paths
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Nov 10, 2024
1 parent c456e41 commit bdc2bbb
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ static void nano_write_bytes(R_outpstream_t stream, void *src, int len) {
size_t req = buf->cur + (size_t) len;
if (req > buf->len) {
if (req > R_XLEN_T_MAX) {
if (buf->len) R_Free(buf->buf);
R_Free(buf->buf);
Rf_error("serialization exceeds max length of raw vector");
}
do {
Expand Down
9 changes: 4 additions & 5 deletions src/sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -415,15 +415,15 @@ SEXP rnng_request(SEXP con, SEXP data, SEXP sendmode, SEXP recvmode, SEXP timeou
nng_ctx *ctx = (nng_ctx *) NANO_PTR(con);
nano_cv *ncv = signal ? (nano_cv *) NANO_PTR(cvar) : NULL;

SEXP aio, env, fun;
nano_buf buf;

nano_encodes(sendmode, &buf, data, NANO_PROT(con));

int xc;
nano_saio *saio = R_Calloc(1, nano_saio);
nano_aio *raio = R_Calloc(1, nano_aio);

nng_msg *msg = NULL;
SEXP aio, env, fun;
int xc;

if ((xc = nng_msg_alloc(&msg, 0)) ||
(xc = nng_msg_append(msg, buf.buf, buf.cur)) ||
Expand All @@ -433,7 +433,6 @@ SEXP rnng_request(SEXP con, SEXP data, SEXP sendmode, SEXP recvmode, SEXP timeou
nng_aio_set_msg(saio->aio, msg);
nng_ctx_send(*ctx, saio->aio);

nano_aio *raio = R_Calloc(1, nano_aio);
raio->type = signal ? REQAIOS : REQAIO;
raio->mode = mod;
raio->cb = saio;
Expand All @@ -460,9 +459,9 @@ SEXP rnng_request(SEXP con, SEXP data, SEXP sendmode, SEXP recvmode, SEXP timeou
return env;

fail:
R_Free(raio);
if (saio->aio) nng_aio_free(saio->aio);
if (msg) nng_msg_free(msg);
R_Free(raio);
R_Free(saio);
NANO_FREE(buf);
return mk_error_data(xc);
Expand Down
39 changes: 21 additions & 18 deletions src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,9 @@ static void rnng_dispatch_thread(void *args) {
0x72, 0x6f, 0x72, 0xfe, 0x00, 0x00, 0x00
};

if (nng_rep0_open(&hsock))
goto fail;

for (R_xlen_t i = 0; i < n; i++) {

signal[i].cv = ncv;
Expand All @@ -620,17 +623,17 @@ static void rnng_dispatch_thread(void *args) {
nng_socket_set_ms(sock[i], "req:resend-time", 0) ||
nng_pipe_notify(sock[i], NNG_PIPE_EV_ADD_POST, nano_record_pipe, &signal[i]) ||
nng_pipe_notify(sock[i], NNG_PIPE_EV_REM_POST, nano_record_pipe, &signal[i]))
goto fail;
goto fail2;

if (disp->tls != NULL) {
if (nng_listener_create(&list[i], sock[i], url[i]) ||
nng_tls_config_server_name(disp->tls, disp->up->u_hostname) ||
nng_listener_set_ptr(list[i], NNG_OPT_TLS_CONFIG, disp->tls) ||
nng_listener_start(list[i], 0))
goto fail;
goto fail2;
} else {
if (nng_listen(sock[i], url[i], &list[i], 0))
goto fail;
goto fail2;
}

raio[i]->next = ncv;
Expand All @@ -640,28 +643,27 @@ static void rnng_dispatch_thread(void *args) {
if (nng_aio_alloc(&saio[i]->aio, sendaio_complete, saio[i]) ||
nng_aio_alloc(&raio[i]->aio, raio_complete_signal, raio[i]) ||
nng_aio_alloc(&haio[i]->aio, raio_complete_signal, haio[i]))
goto fail;
goto fail2;
}

if (nng_rep0_open(&hsock) ||
nng_dial(hsock, disp->host, &hdial, 0))
goto fail;
if (nng_dial(hsock, disp->host, &hdial, 0))
goto fail2;

for (R_xlen_t i = 0; i < n; i++) {
nng_mtx_lock(mtx);
while (ncv->condition == 0)
nng_cv_wait(cv);
if (ncv->condition < 0) {
nng_mtx_unlock(mtx);
goto fail;
goto fail2;
}
ncv->condition--;
nng_mtx_unlock(mtx);
}

for (R_xlen_t i = 0; i < n; i++) {
if (nng_ctx_open(&rctx[i], hsock))
goto fail;
goto fail2;
nng_ctx_recv(rctx[i], haio[i]->aio);
}

Expand All @@ -672,7 +674,7 @@ static void rnng_dispatch_thread(void *args) {
nng_cv_wait(cv);
if (ncv->condition < 0) {
nng_mtx_unlock(mtx);
goto fail;
goto fail2;
}
ncv->condition--;
memcpy(active, online, n * sizeof(int));
Expand All @@ -681,7 +683,7 @@ static void rnng_dispatch_thread(void *args) {
for (R_xlen_t i = 0; i < n; i++) {
if (active[i] > store[i]) {
if (nng_ctx_open(&rctx[i], hsock))
goto fail;
goto fail2;
nng_ctx_recv(rctx[i], haio[i]->aio);
}
}
Expand All @@ -699,7 +701,7 @@ static void rnng_dispatch_thread(void *args) {
buf = nng_msg_body((nng_msg *) raio[i]->data);
if (buf[3] == 0x1) {
if (nng_msg_alloc(&msg, 0))
goto fail;
goto fail2;
if (nng_ctx_sendmsg(ctx[i], msg, 0))
nng_msg_free(msg);
end = 1;
Expand All @@ -710,7 +712,7 @@ static void rnng_dispatch_thread(void *args) {
if (nng_msg_alloc(&msg, 0) ||
(xc == 19 ? nng_msg_append(msg, errnt, sizeof(errnt)) :
nng_msg_append(msg, &xc, sizeof(int))))
goto fail;
goto fail2;
if (nng_ctx_sendmsg(rctx[i], msg, 0))
nng_msg_free(msg);
end = 1;
Expand All @@ -722,7 +724,7 @@ static void rnng_dispatch_thread(void *args) {
end = 0;
} else {
if (nng_ctx_open(&rctx[i], hsock))
goto fail;
goto fail2;
nng_ctx_recv(rctx[i], haio[i]->aio);
}
break;
Expand All @@ -738,13 +740,13 @@ static void rnng_dispatch_thread(void *args) {
if (xc < 0) {
busy[i] = 1;
if (nng_ctx_open(&ctx[i], sock[i]))
goto fail;
goto fail2;
nng_aio_set_msg(saio[i]->aio, (nng_msg *) haio[i]->data);
nng_ctx_send(ctx[i], saio[i]->aio);
nng_ctx_recv(ctx[i], raio[i]->aio);
} else {
// exit if reaches here
goto fail;
goto fail2;
}
break;
}
Expand All @@ -754,10 +756,11 @@ static void rnng_dispatch_thread(void *args) {

}

fail:
nng_close(hsock);
fail2:
for (R_xlen_t i = 0; i < n; i++)
nng_close(sock[i]);
fail:
nng_close(hsock);

}

Expand Down

0 comments on commit bdc2bbb

Please sign in to comment.