Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

qs2 proof of concept #57

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
5 changes: 4 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 1.3.0.9004
Version: 1.3.0.9005
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library implementing 'Scalability Protocols', a reliable,
high-performance standard for common communications patterns including
Expand Down Expand Up @@ -30,6 +30,8 @@ SystemRequirements: 'libnng' >= 1.6 and 'libmbedtls' >= 2.5, or 'cmake' and 'xz'
to compile NNG and/or Mbed TLS included in package sources
Depends:
R (>= 3.6)
Imports:
qs2
Enhances:
promises
Suggests:
Expand All @@ -38,3 +40,4 @@ Suggests:
VignetteBuilder: litedown
RoxygenNote: 7.3.2
Config/build/compilation-database: true
Remotes: qsbase/qs2
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,5 @@ export(until_)
export(wait)
export(wait_)
export(write_cert)
importFrom(qs2,qs_serialize)
useDynLib(nanonext, .registration = TRUE)
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# nanonext 1.3.0.9004 (development)
# nanonext 1.3.0.9005 (development)

#### Updates

* Performs interruptible 'aio' waits from a single dedicated thread, only launching new threads if required, for higher performance and efficiency.
* Performance enhancements for 'ncurlAio' and 'recvAio' promises methods.
* Updates bundled 'libnng' to v1.9.0 stable release.
* The package has a shiny new hex logo.
Expand Down
1 change: 1 addition & 0 deletions R/nanonext-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,6 @@
#' (\href{https://orcid.org/0000-0002-0750-061X}{ORCID})
#'
#' @useDynLib nanonext, .registration = TRUE
#' @importFrom qs2 qs_serialize
#'
"_PACKAGE"
12 changes: 12 additions & 0 deletions R/sendrecv.R
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ send <- function(con, data, mode = c("serial", "raw"), block = NULL)
#' close(req)
#' close(rep)
#'
#' # using qs2 format (specify mode = 0L)
#'
#' s <- socket(listen = "inproc://qs2")
#' s1 <- socket(dial = "inproc://qs2")
#' q <- list(a = 1, b = "test", c = data.frame())
#' send(s, q, mode = 0L)
#' r <- recv(s1, mode = 0L)
#' identical(q, r)
#'
#' close(s)
#' close(s1)
#'
#' @export
#'
recv <- function(con,
Expand Down
1 change: 1 addition & 0 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ serial_config <- function(class, sfunc, ufunc, vec = FALSE)
#' if (Sys.info()[["sysname"]] == "Linux") {
#' rm(list = ls())
#' gc()
#' .Call(nanonext:::rnng_thread_shutdown)
#' Sys.sleep(1L)
#' .Call(nanonext:::rnng_fini)
#' }
Expand Down
12 changes: 12 additions & 0 deletions man/recv.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions man/zzz.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions src/comms.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,22 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) {

const int flags = block == R_NilValue ? NNG_DURATION_DEFAULT : TYPEOF(block) == LGLSXP ? 0 : nano_integer(block);
nano_buf buf;
int xc;
int xc, mod;

const SEXP ptrtag = NANO_TAG(con);
if (ptrtag == nano_SocketSymbol) {

nano_encodes(mode) == 2 ? nano_encode(&buf, data) : nano_serialize(&buf, data, NANO_PROT(con));
mod = nano_encodes(mode);
switch (mod) {
case 2:
nano_encode(&buf, data);
break;
case 0:
buf.buf = qs2_serialize(data, &buf.cur, 1, 0, 1);
break;
default:
nano_serialize(&buf, data, NANO_PROT(con));
}
nng_socket *sock = (nng_socket *) NANO_PTR(con);

if (flags <= 0) {
Expand Down Expand Up @@ -347,6 +357,10 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) {

}


if (mod == 0)
free(buf.buf);

} else if (ptrtag == nano_ContextSymbol) {

nano_encodes(mode) == 2 ? nano_encode(&buf, data) : nano_serialize(&buf, data, NANO_PROT(con));
Expand Down
5 changes: 4 additions & 1 deletion src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -557,9 +557,12 @@ SEXP nano_decode(unsigned char *buf, const size_t sz, const uint8_t mod, SEXP ho
case 9:
data = rawToChar(buf, sz);
return data;
case 0:
data = qs2_deserialize(buf, sz, 0, 1);
return data;
default:
data = nano_unserialize(buf, sz, hook);
return data;
return data;
}

memcpy(NANO_DATAPTR(data), buf, sz);
Expand Down
16 changes: 15 additions & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,18 @@

void (*eln2)(void (*)(void *), void *, double, int) = NULL;

unsigned char *(*qs2_serialize)(SEXP, size_t *, const int, const bool, const int);
SEXP (*qs2_deserialize)(const unsigned char *, const size_t, const bool, const int);

uint8_t special_bit = 0;

extern int nano_wait_thread_created;
extern nng_thread *nano_wait_thr;
extern nng_aio *nano_shared_aio;
extern nng_mtx *nano_wait_mtx;
extern nng_cv *nano_wait_cv;
extern int nano_wait_condition;

SEXP nano_AioSymbol;
SEXP nano_ContextSymbol;
SEXP nano_CvSymbol;
Expand Down Expand Up @@ -184,8 +194,9 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_stream_listen", (DL_FUNC) &rnng_stream_listen, 3},
{"rnng_strerror", (DL_FUNC) &rnng_strerror, 1},
{"rnng_subscribe", (DL_FUNC) &rnng_subscribe, 3},
{"rnng_traverse_precious", (DL_FUNC) &rnng_traverse_precious, 0},
{"rnng_thread_shutdown", (DL_FUNC) &rnng_thread_shutdown, 0},
{"rnng_tls_config", (DL_FUNC) &rnng_tls_config, 4},
{"rnng_traverse_precious", (DL_FUNC) &rnng_traverse_precious, 0},
{"rnng_unresolved", (DL_FUNC) &rnng_unresolved, 1},
{"rnng_unresolved2", (DL_FUNC) &rnng_unresolved2, 1},
{"rnng_url_parse", (DL_FUNC) &rnng_url_parse, 1},
Expand All @@ -203,13 +214,16 @@ static const R_ExternalMethodDef externalMethods[] = {
void attribute_visible R_init_nanonext(DllInfo* dll) {
RegisterSymbols();
PreserveObjects();
qs2_serialize = (unsigned char *(*)(SEXP, size_t *, const int, const bool, const int)) R_GetCCallable("qs2", "c_qs_serialize");
qs2_deserialize = (SEXP (*)(const unsigned char *, const size_t, const bool, const int)) R_GetCCallable("qs2", "c_qs_deserialize");
R_registerRoutines(dll, NULL, callMethods, NULL, externalMethods);
R_useDynamicSymbols(dll, FALSE);
R_forceSymbols(dll, TRUE);
}

// # nocov start
void attribute_visible R_unload_nanonext(DllInfo *info) {
rnng_thread_shutdown();
ReleaseObjects();
}
// # nocov end
7 changes: 6 additions & 1 deletion src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ typedef struct nano_buf_s {
} nano_buf;

extern void (*eln2)(void (*)(void *), void *, double, int);

extern unsigned char *(*qs2_serialize)(SEXP, size_t *, const int, const bool, const int);
extern SEXP (*qs2_deserialize)(const unsigned char *, const size_t, const bool, const int);

extern uint8_t special_bit;

extern SEXP nano_AioSymbol;
Expand Down Expand Up @@ -364,8 +368,9 @@ SEXP rnng_stream_dial(SEXP, SEXP, SEXP);
SEXP rnng_stream_listen(SEXP, SEXP, SEXP);
SEXP rnng_strerror(SEXP);
SEXP rnng_subscribe(SEXP, SEXP, SEXP);
SEXP rnng_traverse_precious(void);
SEXP rnng_thread_shutdown(void);
SEXP rnng_tls_config(SEXP, SEXP, SEXP, SEXP);
SEXP rnng_traverse_precious(void);
SEXP rnng_unresolved(SEXP);
SEXP rnng_unresolved2(SEXP);
SEXP rnng_url_parse(SEXP);
Expand Down
Loading