From 20693e3623260ca4ca35eaf79b3e0b2827453afa Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Wed, 26 Jul 2023 12:36:41 +0200 Subject: [PATCH 1/4] first commit refactoring library to use a background thread --- examples/cbindings/base64.c | 58 ++++ examples/cbindings/base64.h | 11 + examples/cbindings/waku_example.c | 24 +- library/libwaku.h | 10 +- library/libwaku.nim | 325 +++++++----------- library/node/protocols/relay.nim | 0 .../working_thread/inter_thread_msg_types.nim | 15 + library/working_thread/request_manager.nim | 12 + library/working_thread/waku_node_thread.nim | 233 +++++++++++++ 9 files changed, 475 insertions(+), 213 deletions(-) create mode 100644 examples/cbindings/base64.c create mode 100644 examples/cbindings/base64.h create mode 100644 library/node/protocols/relay.nim create mode 100644 library/working_thread/inter_thread_msg_types.nim create mode 100644 library/working_thread/request_manager.nim create mode 100644 library/working_thread/waku_node_thread.nim diff --git a/examples/cbindings/base64.c b/examples/cbindings/base64.c new file mode 100644 index 0000000000..0f9acdf942 --- /dev/null +++ b/examples/cbindings/base64.c @@ -0,0 +1,58 @@ + +#include "base64.h" + +// Base64 encoding +// source: https://nachtimwald.com/2017/11/18/base64-encode-and-decode-in-c/ +size_t b64_encoded_size(size_t inlen) +{ + size_t ret; + + ret = inlen; + if (inlen % 3 != 0) + ret += 3 - (inlen % 3); + ret /= 3; + ret *= 4; + + return ret; +} + +const char b64chars[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + +char *b64_encode(const unsigned char *in, size_t len) +{ + char *out; + size_t elen; + size_t i; + size_t j; + size_t v; + + if (in == NULL || len == 0) + return NULL; + + elen = b64_encoded_size(len); + out = malloc(elen+1); + out[elen] = '\0'; + + for (i=0, j=0; i> 18) & 0x3F]; + out[j+1] = b64chars[(v >> 12) & 0x3F]; + if (i+1 < len) { + out[j+2] = b64chars[(v >> 6) & 0x3F]; + } else { + out[j+2] = '='; + } + if (i+2 < len) { + out[j+3] = b64chars[v & 0x3F]; + } else { + out[j+3] = '='; + } + } + + return out; +} + +// End of Base64 encoding diff --git a/examples/cbindings/base64.h b/examples/cbindings/base64.h new file mode 100644 index 0000000000..37ca50f914 --- /dev/null +++ b/examples/cbindings/base64.h @@ -0,0 +1,11 @@ + +#ifndef _BASE64_H_ +#define _BASE64_H_ + +#include + +size_t b64_encoded_size(size_t inlen); + +char *b64_encode(const unsigned char *in, size_t len); + +#endif diff --git a/examples/cbindings/waku_example.c b/examples/cbindings/waku_example.c index e2dd834bda..4040dac5de 100644 --- a/examples/cbindings/waku_example.c +++ b/examples/cbindings/waku_example.c @@ -79,7 +79,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { static struct argp argp = { options, parse_opt, args_doc, doc, 0, 0, 0 }; char* contentTopic = NULL; -void handle_content_topic(char* msg, size_t len) { +void handle_content_topic(const char* msg, size_t len) { if (contentTopic != NULL) { free(contentTopic); } @@ -89,7 +89,7 @@ void handle_content_topic(char* msg, size_t len) { } char* publishResponse = NULL; -void handle_publish_ok(char* msg, size_t len) { +void handle_publish_ok(const char* msg, size_t len) { printf("Publish Ok: %s %lu\n", msg, len); if (publishResponse != NULL) { @@ -100,14 +100,14 @@ void handle_publish_ok(char* msg, size_t len) { strcpy(publishResponse, msg); } -void handle_error(char* msg, size_t len) { +void handle_error(const char* msg, size_t len) { printf("Error: %s\n", msg); exit(1); } #define MAX_MSG_SIZE 65535 -void publish_message(char* pubsubTopic, char* msg) { +void publish_message(char* pubsubTopic, const char* msg) { char jsonWakuMsg[MAX_MSG_SIZE]; char *msgPayload = b64_encode(msg, strlen(msg)); @@ -138,15 +138,15 @@ void show_help_and_exit() { exit(1); } -void event_handler(char* msg, size_t len) { +void event_handler(const char* msg, size_t len) { printf("Receiving message %s\n", msg); } -void print_default_pubsub_topic(char* msg, size_t len) { +void print_default_pubsub_topic(const char* msg, size_t len) { printf("Default pubsub topic: %s\n", msg); } -void print_waku_version(char* msg, size_t len) { +void print_waku_version(const char* msg, size_t len) { printf("Git Version: %s\n", msg); } @@ -243,8 +243,6 @@ void handle_user_input() { int main(int argc, char** argv) { - waku_init_lib(); - struct ConfigNode cfgNode; // default values snprintf(cfgNode.host, 128, "0.0.0.0"); @@ -272,15 +270,17 @@ int main(int argc, char** argv) { WAKU_CALL( waku_default_pubsub_topic(print_default_pubsub_topic) ); WAKU_CALL( waku_version(print_waku_version) ); + printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port); printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO"); WAKU_CALL( waku_new(jsonConfig, handle_error) ); - waku_set_relay_callback(event_handler); + + waku_set_event_callback(event_handler); waku_start(); - printf("Establishing connection with: %s\n", cfgNode.peers); + // printf("Establishing connection with: %s\n", cfgNode.peers); WAKU_CALL( waku_connect(cfgNode.peers, 10000 /* timeoutMs */, @@ -291,6 +291,6 @@ int main(int argc, char** argv) { show_main_menu(); while(1) { handle_user_input(); - waku_poll(); + // waku_poll(); } } diff --git a/library/libwaku.h b/library/libwaku.h index 982d0b4f88..ed760b91dd 100644 --- a/library/libwaku.h +++ b/library/libwaku.h @@ -11,11 +11,7 @@ #define RET_ERR 1 #define RET_MISSING_CALLBACK 2 -typedef void (*WakuCallBack) (char* msg, size_t len_0); - -// This should only be called once. -// It initializes the nim runtime and GC. -void waku_init_lib(void); +typedef void (*WakuCallBack) (const char* msg, size_t len_0); // Creates a new instance of the waku node. // Sets up the waku node from the given configuration. @@ -27,7 +23,7 @@ void waku_stop(void); int waku_version(WakuCallBack onOkCb); -void waku_set_relay_callback(WakuCallBack callback); +void waku_set_event_callback(WakuCallBack callback); int waku_content_topic(const char* appName, unsigned int appVersion, @@ -56,6 +52,4 @@ int waku_connect(const char* peerMultiAddr, unsigned int timeoutMs, WakuCallBack onErrCb); -void waku_poll(void); - #endif /* __libwaku__ */ diff --git a/library/libwaku.nim b/library/libwaku.nim index cc5d80691e..ed44e85b5b 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -1,4 +1,8 @@ +{.pragma: exported, exportc, cdecl, raises: [].} +{.pragma: callback, cdecl, raises: [], gcsafe.} +{.passc: "-fPIC".} + import std/[json,sequtils,times,strformat,options,atomics,strutils], strutils, @@ -18,9 +22,10 @@ import ../../waku/v2/node/waku_node, ../../waku/v2/node/builder, ../../waku/v2/node/config, - ../../waku/v2/waku_relay/protocol, + # ../../waku/v2/waku_relay/protocol, ./events/[json_error_event,json_message_event,json_base_event], - ./config + ./working_thread/waku_node_thread, + ./memory ################################################################################ ### Wrapper around the waku node @@ -39,39 +44,31 @@ const RET_MISSING_CALLBACK: cint = 2 ################################################################################ ### Not-exported components -proc alloc(str: cstring): cstring = - # Byte allocation from the given address. - # There should be the corresponding manual deallocation with deallocShared ! - let ret = cast[cstring](allocShared(len(str) + 1)) - copyMem(ret, str, len(str) + 1) - return ret - -type - WakuCallBack = proc(msg: ptr cchar, len: csize_t) {.cdecl, gcsafe.} - -# May keep a reference to a callback defined externally -var extRelayEventCallback: WakuCallBack = nil - -proc relayEventCallback(pubsubTopic: string, - msg: WakuMessage): - Future[void] {.gcsafe, raises: [Defect].} = - # Callback that hadles the Waku Relay events. i.e. messages or errors. - if not isNil(extRelayEventCallback): - try: - let event = $JsonMessageEvent.new(pubsubTopic, msg) - extRelayEventCallback(unsafeAddr event[0], cast[csize_t](len(event))) - except Exception,CatchableError: - error "Exception when calling 'eventCallBack': " & - getCurrentExceptionMsg() - else: - error "extRelayEventCallback is nil" - - var retFut = newFuture[void]() - retFut.complete() - return retFut - -# WakuNode instance -var node {.threadvar.}: WakuNode +# proc relayEventCallback(pubsubTopic: string, +# msg: WakuMessage): +# Future[void] {.gcsafe, raises: [Defect].} + +# proc startNodeAndWaitForever(ctx: ptr Context) {.thread.} = + +# waitFor ctx.node.mountRelay() +# ctx.node.peerManager.start() + +# waitFor ctx.node.start() + +# ctx.node.wakuRelay.subscribe(PubsubTopic("/waku/2/default-waku/proto"), +# WakuRelayHandler(relayEventCallback)) + +# let address = "/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN" +# let peers = ($address).split(",").mapIt(strip(it)) + +# # TODO: the timeoutMs is not being used at all! +# let connectFut = ctx.node.connectToNodes(peers, source="static") +# while not connectFut.finished(): +# poll() + +# while true: +# # echo "JJJJ ", $(ctx[].node) +# poll() ### End of not-exported components ################################################################################ @@ -79,21 +76,6 @@ var node {.threadvar.}: WakuNode ################################################################################ ### Exported procs -# Every Nim library must have this function called - the name is derived from -# the `--nimMainPrefix` command line option -proc NimMain() {.importc.} - -var initialized: Atomic[bool] - -proc waku_init_lib() {.dynlib, exportc, cdecl.} = - if not initialized.exchange(true): - NimMain() # Every Nim library needs to call `NimMain` once exactly - when declared(setupForeignThreadGc): setupForeignThreadGc() - when declared(nimGC_setStackBottom): - var locals {.volatile, noinit.}: pointer - locals = addr(locals) - nimGC_setStackBottom(locals) - proc waku_new(configJson: cstring, onErrCb: WakuCallback): cint {.dynlib, exportc, cdecl.} = @@ -103,78 +85,18 @@ proc waku_new(configJson: cstring, if isNil(onErrCb): return RET_MISSING_CALLBACK - var privateKey: PrivateKey - var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"), - Port(60000'u16)).value - var relay: bool - var topics = @[""] - var jsonResp: JsonEvent - - let cj = configJson.alloc() - - if not parseConfig($cj, - privateKey, - netConfig, - relay, - topics, - jsonResp): - deallocShared(cj) - let resp = $jsonResp - onErrCb(unsafeAddr resp[0], cast[csize_t](len(resp))) - return RET_ERR - - deallocShared(cj) - - var enrBuilder = EnrBuilder.init(privateKey) - - enrBuilder.withIpAddressAndPorts( - netConfig.enrIp, - netConfig.enrPort, - netConfig.discv5UdpPort - ) - - if netConfig.wakuFlags.isSome(): - enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get()) - - enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs) - - let addShardedTopics = enrBuilder.withShardedTopics(topics) - if addShardedTopics.isErr(): - let resp = $addShardedTopics.error - onErrCb(unsafeAddr resp[0], cast[csize_t](len(resp))) + let startThRes = waku_node_thread.startThread(configJson) + if startThRes.isErr(): + let msg = "Error in startThread: " & $startThRes.error + onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR - let recordRes = enrBuilder.build() - let record = - if recordRes.isErr(): - let resp = $recordRes.error - onErrCb(unsafeAddr resp[0], cast[csize_t](len(resp))) - return RET_ERR - else: recordRes.get() - - var builder = WakuNodeBuilder.init() - builder.withRng(crypto.newRng()) - builder.withNodeKey(privateKey) - builder.withRecord(record) - builder.withNetworkConfiguration(netConfig) - builder.withSwitchConfiguration( - maxConnections = some(50.int) - ) - - let wakuNodeRes = builder.build() - if wakuNodeRes.isErr(): - let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error - let jsonErrEvent = $JsonErrorEvent.new(errorMsg) - - onErrCb(unsafeAddr jsonErrEvent[0], cast[csize_t](len(jsonErrEvent))) + let sendReqRes = sendRequestToWakuThread("waku_new") + if sendReqRes.isErr(): + let msg = "Failed sending waku_new req: " & $sendReqRes.error + onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR - node = wakuNodeRes.get() - - if relay: - waitFor node.mountRelay() - node.peerManager.start() - return RET_OK proc waku_version(onOkCb: WakuCallBack): cint {.dynlib, exportc.} = @@ -186,8 +108,8 @@ proc waku_version(onOkCb: WakuCallBack): cint {.dynlib, exportc.} = return RET_OK -proc waku_set_relay_callback(callback: WakuCallBack) {.dynlib, exportc.} = - extRelayEventCallback = callback +proc waku_set_event_callback(callback: WakuCallBack) {.dynlib, exportc.} = + waku_node_thread.extEventCallback = callback proc waku_content_topic(appName: cstring, appVersion: cuint, @@ -286,67 +208,81 @@ proc waku_relay_publish(pubSubTopic: cstring, DefaultPubsubTopic else: $pst + return RET_OK - if node.wakuRelay.isNil(): - let msg = "Can't publish. WakuRelay is not enabled." - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_ERR - - let pubMsgFut = node.wakuRelay.publish(targetPubSubTopic, wakuMessage) - - # With the next loop we convert an asynchronous call into a synchronous one - for i in 0 .. timeoutMs: - if pubMsgFut.finished(): - break - sleep(1) - - if pubMsgFut.finished(): - let numPeers = pubMsgFut.read() - if numPeers == 0: - let msg = "Message not sent because no peers found." - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_ERR - elif numPeers > 0: - # TODO: pending to return a valid message Id (response when all is correct) - let msg = "hard-coded-message-id" - onOkCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_OK - - else: - let msg = "Timeout expired" - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_ERR + # if ctx.node.wakuRelay.isNil(): + # let msg = "Can't publish. WakuRelay is not enabled." + # onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + # return RET_ERR + + # let pubMsgFut = ctx.node.wakuRelay.publish(targetPubSubTopic, wakuMessage) + + # # With the next loop we convert an asynchronous call into a synchronous one + # for i in 0 .. timeoutMs: + # if pubMsgFut.finished(): + # break + # sleep(1) + + # if pubMsgFut.finished(): + # let numPeers = pubMsgFut.read() + # if numPeers == 0: + # let msg = "Message not sent because no peers found." + # onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + # return RET_ERR + # elif numPeers > 0: + # # TODO: pending to return a valid message Id (response when all is correct) + # let msg = "hard-coded-message-id" + # onOkCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + # return RET_OK + + # else: + # let msg = "Timeout expired" + # onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + # return RET_ERR proc waku_start() {.dynlib, exportc.} = - waitFor node.start() + # createThread(ctx.thread, startNodeAndWaitForever, ctx) + discard sendRequestToWakuThread("waku_start") + # if sendReqRes.isErr(): + # let msg = "Failed sending waku_start req: " & $sendReqRes.error + # onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + + echo "start" proc waku_stop() {.dynlib, exportc.} = - waitFor node.stop() + # waitFor ctx.node.stop() + echo "stop" proc waku_relay_subscribe( pubSubTopic: cstring, onErrCb: WakuCallBack): cint {.dynlib, exportc.} = + + let sendReqRes = sendRequestToWakuThread("waku_subscribe") + if sendReqRes.isErr(): + let msg = "Failed sending waku_subscribe req: " & $sendReqRes.error + onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + return RET_ERR # @params # topic: Pubsub topic to subscribe to. If empty, it subscribes to the default pubsub topic. - if isNil(onErrCb): - return RET_MISSING_CALLBACK +# if isNil(onErrCb): +# return RET_MISSING_CALLBACK - if isNil(extRelayEventCallback): - let msg = $"""Cannot subscribe without a callback. -# Kindly set it with the 'waku_set_relay_callback' function""" - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_MISSING_CALLBACK +# if isNil(extEventCallback): +# let msg = $"""Cannot subscribe without a callback. +# # Kindly set it with the 'waku_set_relay_callback' function""" +# onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) +# return RET_MISSING_CALLBACK - if node.wakuRelay.isNil(): - let msg = $"Cannot subscribe without Waku Relay enabled." - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_ERR +# if ctx.node.wakuRelay.isNil(): +# let msg = $"Cannot subscribe without Waku Relay enabled." +# onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) +# return RET_ERR - let pst = pubSubTopic.alloc() - node.wakuRelay.subscribe(PubsubTopic($pst), - WakuRelayHandler(relayEventCallback)) - deallocShared(pst) +# let pst = pubSubTopic.alloc() +# ctx.node.wakuRelay.subscribe(PubsubTopic($pst), +# WakuRelayHandler(relayEventCallback)) +# deallocShared(pst) return RET_OK @@ -356,23 +292,23 @@ proc waku_relay_unsubscribe( {.dynlib, exportc.} = # @params # topic: Pubsub topic to subscribe to. If empty, it unsubscribes to the default pubsub topic. - if isNil(onErrCb): - return RET_MISSING_CALLBACK +# if isNil(onErrCb): +# return RET_MISSING_CALLBACK - if isNil(extRelayEventCallback): - let msg = """Cannot unsubscribe without a callback. +# if isNil(extEventCallback): +# let msg = """Cannot unsubscribe without a callback. # Kindly set it with the 'waku_set_relay_callback' function""" - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_MISSING_CALLBACK +# onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) +# return RET_MISSING_CALLBACK - if node.wakuRelay.isNil(): - let msg = "Cannot unsubscribe without Waku Relay enabled." - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_ERR +# if ctx.node.wakuRelay.isNil(): +# let msg = "Cannot unsubscribe without Waku Relay enabled." +# onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) +# return RET_ERR - let pst = pubSubTopic.alloc() - node.wakuRelay.unsubscribe(PubsubTopic($pst)) - deallocShared(pst) +# let pst = pubSubTopic.alloc() +# ctx.node.wakuRelay.unsubscribe(PubsubTopic($pst)) +# deallocShared(pst) return RET_OK @@ -380,30 +316,33 @@ proc waku_connect(peerMultiAddr: cstring, timeoutMs: cuint, onErrCb: WakuCallBack): cint {.dynlib, exportc.} = + + let sendReqRes = sendRequestToWakuThread("waku_connect") + if sendReqRes.isErr(): + let msg = "Failed sending waku_connect req: " & $sendReqRes.error + onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + return RET_ERR # peerMultiAddr: comma-separated list of fully-qualified multiaddresses. # var ret = newString(len + 1) # if len > 0: # copyMem(addr ret[0], str, len + 1) - let address = peerMultiAddr.alloc() - let peers = ($address).split(",").mapIt(strip(it)) + # let address = peerMultiAddr.alloc() + # let peers = ($address).split(",").mapIt(strip(it)) - # TODO: the timeoutMs is not being used at all! - let connectFut = node.connectToNodes(peers, source="static") - while not connectFut.finished(): - poll() + # # TODO: the timeoutMs is not being used at all! + # let connectFut = ctx.node.connectToNodes(peers, source="static") + # while not connectFut.finished(): + # poll() - deallocShared(address) + # deallocShared(address) - if not connectFut.completed(): - let msg = "Timeout expired." - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_ERR + # if not connectFut.completed(): + # let msg = "Timeout expired." + # onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + # return RET_ERR return RET_OK -proc waku_poll() {.dynlib, exportc, gcsafe.} = - poll() - ### End of exported procs ################################################################################ diff --git a/library/node/protocols/relay.nim b/library/node/protocols/relay.nim new file mode 100644 index 0000000000..e69de29bb2 diff --git a/library/working_thread/inter_thread_msg_types.nim b/library/working_thread/inter_thread_msg_types.nim new file mode 100644 index 0000000000..dbf19149f7 --- /dev/null +++ b/library/working_thread/inter_thread_msg_types.nim @@ -0,0 +1,15 @@ + + +type + MsgType = enum + NODE_MANAGEMENT, + RELAY, + STORE, + FILTER, + ADMIN, + LIGHT_PUSH + +type + RequestMsg = ref object + msgType: MsgType + # content: MsgReqContent diff --git a/library/working_thread/request_manager.nim b/library/working_thread/request_manager.nim new file mode 100644 index 0000000000..35211f3323 --- /dev/null +++ b/library/working_thread/request_manager.nim @@ -0,0 +1,12 @@ + +import + std/json + +proc handleRequest*(ctx: ptr Context, request: cstring) = + + + { + "type": "relay", + "content": { + } + } \ No newline at end of file diff --git a/library/working_thread/waku_node_thread.nim b/library/working_thread/waku_node_thread.nim new file mode 100644 index 0000000000..8343c3737f --- /dev/null +++ b/library/working_thread/waku_node_thread.nim @@ -0,0 +1,233 @@ + +{.pragma: exported, exportc, cdecl, raises: [].} +{.pragma: callback, cdecl, raises: [], gcsafe.} +{.passc: "-fPIC".} + +import + std/[json,sequtils,times,strformat,options,atomics,strutils,os] +import + chronicles, + chronos, + stew/results, + stew/shims/net +import + ../../../waku/common/enr/builder, + ../../../waku/v2/waku_enr/capabilities, + ../../../waku/v2/waku_enr/multiaddr, + ../../../waku/v2/waku_enr/sharding, + ../../../waku/v2/waku_core/message/message, + ../../../waku/v2/waku_core/topics/pubsub_topic, + ../../../waku/v2/node/peer_manager/peer_manager, + ../../../waku/v2/waku_core, + ../../../waku/v2/node/waku_node, + ../../../waku/v2/node/builder, + ../../../waku/v2/node/config, + ../../../waku/v2/waku_relay/protocol, + ../events/[json_error_event,json_message_event,json_base_event], + ../memory, + ../config + +type + Context* = object + thread: Thread[(ptr Context, cstring)] + node: WakuNode + reqChannel: Channel[cstring] + respChannel: Channel[cstring] + +var ctx {.threadvar.}: ptr Context + +# To control when the thread is running +var running: Atomic[bool] + +type + WakuCallBack* = proc(msg: ptr cchar, len: csize_t) {.cdecl, gcsafe.} + +# May keep a reference to a callback defined externally +var extEventCallback*: WakuCallBack = nil + +proc relayEventCallback(pubsubTopic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = +# proc relayEventCallback(pubsubTopic: string, +# msg: WakuMessage): +# Future[void] {.gcsafe, raises: [Defect].} = + # Callback that hadles the Waku Relay events. i.e. messages or errors. + if not isNil(extEventCallback): + try: + let event = $JsonMessageEvent.new(pubsubTopic, msg) + extEventCallback(unsafeAddr event[0], cast[csize_t](len(event))) + except Exception,CatchableError: + error "Exception when calling 'eventCallBack': " & + getCurrentExceptionMsg() + else: + error "extEventCallback is nil" + +proc createNode*(ctx: ptr Context, + configJson: cstring): Result[void, string] = + + var privateKey: PrivateKey + var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"), + Port(60000'u16)).value + var relay: bool + var topics = @[""] + var jsonResp: JsonEvent + + let cj = configJson.alloc() + + if not parseConfig($cj, + privateKey, + netConfig, + relay, + topics, + jsonResp): + deallocShared(cj) + return err($jsonResp) + + deallocShared(cj) + + var enrBuilder = EnrBuilder.init(privateKey) + + enrBuilder.withIpAddressAndPorts( + netConfig.enrIp, + netConfig.enrPort, + netConfig.discv5UdpPort + ) + + if netConfig.wakuFlags.isSome(): + enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get()) + + enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs) + + let addShardedTopics = enrBuilder.withShardedTopics(topics) + if addShardedTopics.isErr(): + let msg = "Error setting shared topics: " & $addShardedTopics.error + return err($JsonErrorEvent.new(msg)) + + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + let msg = "Error building enr record: " & $recordRes.error + return err($JsonErrorEvent.new(msg)) + + else: recordRes.get() + + var builder = WakuNodeBuilder.init() + builder.withRng(crypto.newRng()) + builder.withNodeKey(privateKey) + builder.withRecord(record) + builder.withNetworkConfiguration(netConfig) + builder.withSwitchConfiguration( + maxConnections = some(50.int) + ) + + let wakuNodeRes = builder.build() + if wakuNodeRes.isErr(): + let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error + return err($JsonErrorEvent.new(errorMsg)) + + ctx.node = wakuNodeRes.get() + + if relay: + waitFor ctx.node.mountRelay() + ctx.node.peerManager.start() + + return ok() + +# Every Nim library must have this function called - the name is derived from +# the `--nimMainPrefix` command line option +proc NimMain() {.importc.} +var initialized: Atomic[bool] + +proc waku_init() = + if not initialized.exchange(true): + NimMain() # Every Nim library needs to call `NimMain` once exactly + when declared(setupForeignThreadGc): setupForeignThreadGc() + when declared(nimGC_setStackBottom): + var locals {.volatile, noinit.}: pointer + locals = addr(locals) + nimGC_setStackBottom(locals) + +proc run(args: tuple [ctx: ptr Context, + configJson: cstring]) {.thread.} = + # This is the worker thread body. This thread runs the Waku node + # and attend possible library user requests (stop, connect_to, etc.) + waku_init() + + while running.load == true: + # Trying to get a request from the libwaku main thread + let req = args.ctx.reqChannel.tryRecv() + if req[0] == true: + + if req[1] == "waku_new": + let ret = createNode(args.ctx, args.configJson) + if ret.isErr(): + let msg = "ERROR: " & ret.error + args.ctx.respChannel.send(cast[cstring](msg)) + + if req[1] == "waku_start": + # TODO: wait the future properly + discard args.ctx.node.start() + + if req[1] == "waku_connect": + discard args.ctx.node.connectToNodes(@["/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN"], source="static") + + if req[1] == "waku_subscribe": + let a = PubsubTopic("/waku/2/default-waku/proto") + let b = WakuRelayHandler(relayEventCallback) + args.ctx.node.wakuRelay.subscribe(a, b) + + args.ctx.respChannel.send("OK") + + poll() + +proc startThread*(configJson: cstring): Result[void, string] = + # This proc is called from the main thread and it creates + # the Waku working thread. + waku_init() + ctx = createShared(Context, 1) + + ctx.reqChannel.open() + ctx.respChannel.open() + + running.store(true) + + let cfgJson = configJson.alloc() + + try: + createThread(ctx.thread, run, (ctx, cfgJson)) + except ResourceExhaustedError: + # deallocShared for byte allocations + deallocShared(cfgJson) + # and freeShared for typed allocations! + freeShared(ctx) + + return err("failed to create a thread: " & getCurrentExceptionMsg()) + + return ok() + +proc stopWakuNodeThread*() = + running.store(false) + ctx.reqChannel.send("Close") + joinThread(ctx.thread) + + ctx.reqChannel.close() + ctx.respChannel.close() + +proc sendRequestToWakuThread*(req: cstring, + timeoutMs: int = 300_000): + Result[string, string] = + + ctx.reqChannel.send(req) + + var count = 0 + var resp = ctx.respChannel.tryRecv() + while resp[0] == false: + resp = ctx.respChannel.tryRecv() + os.sleep(1) + count.inc() + + if count > timeoutMs: + let msg = "Timeout expired. request: " & $req & + ". timeout in ms: " & $timeoutMs + return err(msg) + + return ok($resp[1]) + From 5dfac1f17ed024ea387f759bdd7cf97422125959 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Fri, 28 Jul 2023 08:04:52 +0200 Subject: [PATCH 2/4] Refactoring to have a working waku_thread --- examples/cbindings/waku_example.c | 3 +- library/alloc.nim | 7 + library/libwaku.nim | 223 ++++++------------ library/node/protocols/relay.nim | 0 library/{ => waku_thread}/config.nim | 2 +- .../node_lifecycle_request.nim | 36 +++ .../peer_manager_request.nim | 59 +++++ .../protocols/relay_request.nim | 50 ++++ .../inter_thread_communication/request.nim | 21 ++ .../inter_thread_communication/response.nim | 16 ++ .../waku_thread.nim} | 137 ++++------- .../working_thread/inter_thread_msg_types.nim | 15 -- library/working_thread/request_manager.nim | 12 - 13 files changed, 310 insertions(+), 271 deletions(-) create mode 100644 library/alloc.nim delete mode 100644 library/node/protocols/relay.nim rename library/{ => waku_thread}/config.nim (99%) create mode 100644 library/waku_thread/inter_thread_communication/node_lifecycle_request.nim create mode 100644 library/waku_thread/inter_thread_communication/peer_manager_request.nim create mode 100644 library/waku_thread/inter_thread_communication/protocols/relay_request.nim create mode 100644 library/waku_thread/inter_thread_communication/request.nim create mode 100644 library/waku_thread/inter_thread_communication/response.nim rename library/{working_thread/waku_node_thread.nim => waku_thread/waku_thread.nim} (59%) delete mode 100644 library/working_thread/inter_thread_msg_types.nim delete mode 100644 library/working_thread/request_manager.nim diff --git a/examples/cbindings/waku_example.c b/examples/cbindings/waku_example.c index 4040dac5de..3ffcd42535 100644 --- a/examples/cbindings/waku_example.c +++ b/examples/cbindings/waku_example.c @@ -276,11 +276,10 @@ int main(int argc, char** argv) { WAKU_CALL( waku_new(jsonConfig, handle_error) ); - waku_set_event_callback(event_handler); waku_start(); - // printf("Establishing connection with: %s\n", cfgNode.peers); + printf("Establishing connection with: %s\n", cfgNode.peers); WAKU_CALL( waku_connect(cfgNode.peers, 10000 /* timeoutMs */, diff --git a/library/alloc.nim b/library/alloc.nim new file mode 100644 index 0000000000..44d5e67eae --- /dev/null +++ b/library/alloc.nim @@ -0,0 +1,7 @@ + +proc alloc*(str: cstring): cstring = + # Byte allocation from the given address. + # There should be the corresponding manual deallocation with deallocShared ! + let ret = cast[cstring](allocShared(len(str) + 1)) + copyMem(ret, str, len(str) + 1) + return ret diff --git a/library/libwaku.nim b/library/libwaku.nim index ed44e85b5b..69c062397a 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -5,27 +5,21 @@ import std/[json,sequtils,times,strformat,options,atomics,strutils], - strutils, - os + strutils import chronicles, - chronos, - stew/shims/net + chronos import - ../../waku/common/enr/builder, - ../../waku/v2/waku_enr/capabilities, - ../../waku/v2/waku_enr/multiaddr, - ../../waku/v2/waku_enr/sharding, ../../waku/v2/waku_core/message/message, - ../../waku/v2/waku_core/topics/pubsub_topic, - ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/waku_node, - ../../waku/v2/node/builder, - ../../waku/v2/node/config, - # ../../waku/v2/waku_relay/protocol, - ./events/[json_error_event,json_message_event,json_base_event], - ./working_thread/waku_node_thread, - ./memory + ../../waku/v2/waku_core/topics/pubsub_topic, + ../../../waku/v2/waku_relay/protocol, + ./events/json_message_event, + ./waku_thread/waku_thread, + ./waku_thread/inter_thread_communication/node_lifecycle_request, + ./waku_thread/inter_thread_communication/peer_manager_request, + ./waku_thread/inter_thread_communication/protocols/relay_request, + ./alloc ################################################################################ ### Wrapper around the waku node @@ -38,37 +32,30 @@ const RET_OK: cint = 0 const RET_ERR: cint = 1 const RET_MISSING_CALLBACK: cint = 2 +type + WakuCallBack* = proc(msg: ptr cchar, len: csize_t) {.cdecl, gcsafe.} + ### End of exported types ################################################################################ ################################################################################ ### Not-exported components -# proc relayEventCallback(pubsubTopic: string, -# msg: WakuMessage): -# Future[void] {.gcsafe, raises: [Defect].} - -# proc startNodeAndWaitForever(ctx: ptr Context) {.thread.} = - -# waitFor ctx.node.mountRelay() -# ctx.node.peerManager.start() - -# waitFor ctx.node.start() - -# ctx.node.wakuRelay.subscribe(PubsubTopic("/waku/2/default-waku/proto"), -# WakuRelayHandler(relayEventCallback)) - -# let address = "/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN" -# let peers = ($address).split(",").mapIt(strip(it)) - -# # TODO: the timeoutMs is not being used at all! -# let connectFut = ctx.node.connectToNodes(peers, source="static") -# while not connectFut.finished(): -# poll() - -# while true: -# # echo "JJJJ ", $(ctx[].node) -# poll() +# May keep a reference to a callback defined externally +var extEventCallback*: WakuCallBack = nil + +proc relayEventCallback(pubsubTopic: PubsubTopic, + msg: WakuMessage): Future[void] {.async, gcsafe.} = + # Callback that hadles the Waku Relay events. i.e. messages or errors. + if not isNil(extEventCallback): + try: + let event = $JsonMessageEvent.new(pubsubTopic, msg) + extEventCallback(unsafeAddr event[0], cast[csize_t](len(event))) + except Exception,CatchableError: + error "Exception when calling 'eventCallBack': " & + getCurrentExceptionMsg() + else: + error "extEventCallback is nil" ### End of not-exported components ################################################################################ @@ -85,15 +72,9 @@ proc waku_new(configJson: cstring, if isNil(onErrCb): return RET_MISSING_CALLBACK - let startThRes = waku_node_thread.startThread(configJson) - if startThRes.isErr(): - let msg = "Error in startThread: " & $startThRes.error - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_ERR - - let sendReqRes = sendRequestToWakuThread("waku_new") - if sendReqRes.isErr(): - let msg = "Failed sending waku_new req: " & $sendReqRes.error + let createThRes = waku_thread.createWakuThread(configJson) + if createThRes.isErr(): + let msg = "Error in createWakuThread: " & $createThRes.error onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR @@ -109,7 +90,7 @@ proc waku_version(onOkCb: WakuCallBack): cint {.dynlib, exportc.} = return RET_OK proc waku_set_event_callback(callback: WakuCallBack) {.dynlib, exportc.} = - waku_node_thread.extEventCallback = callback + extEventCallback = callback proc waku_content_topic(appName: cstring, appVersion: cuint, @@ -210,79 +191,35 @@ proc waku_relay_publish(pubSubTopic: cstring, $pst return RET_OK - # if ctx.node.wakuRelay.isNil(): - # let msg = "Can't publish. WakuRelay is not enabled." - # onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - # return RET_ERR - - # let pubMsgFut = ctx.node.wakuRelay.publish(targetPubSubTopic, wakuMessage) - - # # With the next loop we convert an asynchronous call into a synchronous one - # for i in 0 .. timeoutMs: - # if pubMsgFut.finished(): - # break - # sleep(1) - - # if pubMsgFut.finished(): - # let numPeers = pubMsgFut.read() - # if numPeers == 0: - # let msg = "Message not sent because no peers found." - # onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - # return RET_ERR - # elif numPeers > 0: - # # TODO: pending to return a valid message Id (response when all is correct) - # let msg = "hard-coded-message-id" - # onOkCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - # return RET_OK - - # else: - # let msg = "Timeout expired" - # onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - # return RET_ERR - proc waku_start() {.dynlib, exportc.} = - # createThread(ctx.thread, startNodeAndWaitForever, ctx) - discard sendRequestToWakuThread("waku_start") - # if sendReqRes.isErr(): - # let msg = "Failed sending waku_start req: " & $sendReqRes.error - # onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - - echo "start" + discard waku_thread.sendRequestToWakuThread( + NodeLifecycleRequest.new( + NodeLifecycleMsgType.START_NODE, + )) proc waku_stop() {.dynlib, exportc.} = - # waitFor ctx.node.stop() - echo "stop" + discard waku_thread.sendRequestToWakuThread( + # let createThRes = waku_thread.sendRequestToWakuThread( + NodeLifecycleRequest.new( + NodeLifecycleMsgType.STOP_NODE, + )) proc waku_relay_subscribe( pubSubTopic: cstring, onErrCb: WakuCallBack): cint {.dynlib, exportc.} = + + let pst = pubSubTopic.alloc() + let sendReqRes = waku_thread.sendRequestToWakuThread( + RelayRequest.new(RelayMsgType.SUBSCRIBE, + PubsubTopic($pst), + WakuRelayHandler(relayEventCallback))) + deallocShared(pst) - let sendReqRes = sendRequestToWakuThread("waku_subscribe") if sendReqRes.isErr(): - let msg = "Failed sending waku_subscribe req: " & $sendReqRes.error + let msg = $sendReqRes.error onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR - # @params - # topic: Pubsub topic to subscribe to. If empty, it subscribes to the default pubsub topic. -# if isNil(onErrCb): -# return RET_MISSING_CALLBACK - -# if isNil(extEventCallback): -# let msg = $"""Cannot subscribe without a callback. -# # Kindly set it with the 'waku_set_relay_callback' function""" -# onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) -# return RET_MISSING_CALLBACK - -# if ctx.node.wakuRelay.isNil(): -# let msg = $"Cannot subscribe without Waku Relay enabled." -# onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) -# return RET_ERR - -# let pst = pubSubTopic.alloc() -# ctx.node.wakuRelay.subscribe(PubsubTopic($pst), -# WakuRelayHandler(relayEventCallback)) -# deallocShared(pst) return RET_OK @@ -290,58 +227,32 @@ proc waku_relay_unsubscribe( pubSubTopic: cstring, onErrCb: WakuCallBack): cint {.dynlib, exportc.} = - # @params - # topic: Pubsub topic to subscribe to. If empty, it unsubscribes to the default pubsub topic. -# if isNil(onErrCb): -# return RET_MISSING_CALLBACK - -# if isNil(extEventCallback): -# let msg = """Cannot unsubscribe without a callback. -# Kindly set it with the 'waku_set_relay_callback' function""" -# onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) -# return RET_MISSING_CALLBACK - -# if ctx.node.wakuRelay.isNil(): -# let msg = "Cannot unsubscribe without Waku Relay enabled." -# onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) -# return RET_ERR - -# let pst = pubSubTopic.alloc() -# ctx.node.wakuRelay.unsubscribe(PubsubTopic($pst)) -# deallocShared(pst) - - return RET_OK -proc waku_connect(peerMultiAddr: cstring, - timeoutMs: cuint, - onErrCb: WakuCallBack): cint - {.dynlib, exportc.} = + let pst = pubSubTopic.alloc() + let sendReqRes = waku_thread.sendRequestToWakuThread( + RelayRequest.new(RelayMsgType.UNSUBSCRIBE, + PubsubTopic($pst), + WakuRelayHandler(relayEventCallback))) + deallocShared(pst) - let sendReqRes = sendRequestToWakuThread("waku_connect") if sendReqRes.isErr(): - let msg = "Failed sending waku_connect req: " & $sendReqRes.error + let msg = $sendReqRes.error onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR - # peerMultiAddr: comma-separated list of fully-qualified multiaddresses. - # var ret = newString(len + 1) - # if len > 0: - # copyMem(addr ret[0], str, len + 1) - # let address = peerMultiAddr.alloc() - # let peers = ($address).split(",").mapIt(strip(it)) - - # # TODO: the timeoutMs is not being used at all! - # let connectFut = ctx.node.connectToNodes(peers, source="static") - # while not connectFut.finished(): - # poll() - - # deallocShared(address) + return RET_OK - # if not connectFut.completed(): - # let msg = "Timeout expired." - # onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - # return RET_ERR +proc waku_connect(peerMultiAddr: cstring, + timeoutMs: cuint, + onErrCb: WakuCallBack): cint + {.dynlib, exportc.} = + discard waku_thread.sendRequestToWakuThread( + PeerManagementRequest.new( + PeerManagementMsgType.CONNECT_TO, + $peerMultiAddr, + timeoutMs, + )) return RET_OK ### End of exported procs diff --git a/library/node/protocols/relay.nim b/library/node/protocols/relay.nim deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/library/config.nim b/library/waku_thread/config.nim similarity index 99% rename from library/config.nim rename to library/waku_thread/config.nim index 13e18212de..fd2186282e 100644 --- a/library/config.nim +++ b/library/waku_thread/config.nim @@ -10,7 +10,7 @@ import ../../waku/common/utils/nat, ../../waku/v2/node/waku_node, ../../waku/v2/node/config, - ./events/[json_error_event,json_base_event] + ../events/[json_error_event,json_base_event] proc parsePrivateKey(jsonNode: JsonNode, privateKey: var PrivateKey, diff --git a/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim new file mode 100644 index 0000000000..6af9ca1df3 --- /dev/null +++ b/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim @@ -0,0 +1,36 @@ + +import + std/options +import + chronos, + stew/results, + stew/shims/net +import + ../../../waku/v2/node/waku_node, + ./request, + ./response + +type + NodeLifecycleMsgType* = enum + START_NODE + STOP_NODE + +type + NodeLifecycleRequest* = ref object of InterThreadRequest + operation: NodeLifecycleMsgType + +proc new*(T: type NodeLifecycleRequest, + op: NodeLifecycleMsgType): T = + + return NodeLifecycleRequest(operation: op) + +method process*(self: NodeLifecycleRequest, + node: WakuNode): Future[InterThreadResponse] {.async.} = + case self.operation: + of START_NODE: + waitFor node.start() + + of STOP_NODE: + waitFor node.stop() + + return InterThreadResponse(result: ResultType.OK) diff --git a/library/waku_thread/inter_thread_communication/peer_manager_request.nim b/library/waku_thread/inter_thread_communication/peer_manager_request.nim new file mode 100644 index 0000000000..38517a3ed4 --- /dev/null +++ b/library/waku_thread/inter_thread_communication/peer_manager_request.nim @@ -0,0 +1,59 @@ + +import + std/[options,sequtils,strutils] +import + chronicles, + chronos, + stew/results, + stew/shims/net +import + ../../../waku/v2/node/waku_node, + ./request, + ./response + +type + PeerManagementMsgType* = enum + CONNECT_TO + +type + PeerManagementRequest* = ref object of InterThreadRequest + operation: PeerManagementMsgType + peerMultiAddr: string + timeoutMs: cuint + +proc new*(T: type PeerManagementRequest, + op: PeerManagementMsgType, + peerMultiAddr: string, + timeoutMs: cuint): T = + + return PeerManagementRequest(operation: op, + peerMultiAddr: peerMultiAddr, + timeoutMs: timeoutMs) + +proc connectTo(node: WakuNode, + peerMultiAddr: string): Result[void, string] = + # let sendReqRes = sendRequestToWakuThread("waku_connect") + + let peers = (peerMultiAddr).split(",").mapIt(strip(it)) + + # TODO: the timeoutMs is not being used at all! + let connectFut = node.connectToNodes(peers, source="static") + while not connectFut.finished(): + poll() + + if not connectFut.completed(): + let msg = "Timeout expired." + return err(msg) + + return ok() + +method process*(self: PeerManagementRequest, + node: WakuNode): Future[InterThreadResponse] {.async.} = + case self.operation: + of CONNECT_TO: + let ret = node.connectTo(self.peerMultiAddr) + if ret.isErr(): + return InterThreadResponse(result: ResultType.OK, + message: ret.error) + + return InterThreadResponse(result: ResultType.OK) diff --git a/library/waku_thread/inter_thread_communication/protocols/relay_request.nim b/library/waku_thread/inter_thread_communication/protocols/relay_request.nim new file mode 100644 index 0000000000..3bd703c325 --- /dev/null +++ b/library/waku_thread/inter_thread_communication/protocols/relay_request.nim @@ -0,0 +1,50 @@ + +import + std/[options,sequtils,strutils] +import + chronicles, + chronos, + stew/results, + stew/shims/net +import + ../../../../waku/v2/node/waku_node, + ../../../../waku/v2/waku_core/topics/pubsub_topic, + ../../../../waku/v2/waku_relay/protocol, + ../request, + ../response + +type + RelayMsgType* = enum + SUBSCRIBE + UNSUBSCRIBE + +type + RelayRequest* = ref object of InterThreadRequest + operation: RelayMsgType + pubsubTopic: PubsubTopic + relayEventCallback: WakuRelayHandler + +proc new*(T: type RelayRequest, + op: RelayMsgType, + pubsubTopic: PubsubTopic, + relayEventCallback: WakuRelayHandler): T = + + return RelayRequest(operation: op, + pubsubTopic: pubsubTopic, + relayEventCallback: relayEventCallback) + +method process*(self: RelayRequest, + node: WakuNode): Future[InterThreadResponse] {.async.} = + + if node.wakuRelay.isNil(): + let msg = "Cannot subscribe or unsubscribe without Waku Relay enabled." + return InterThreadResponse(result: ResultType.ERROR, + message: msg) + case self.operation: + of SUBSCRIBE: + node.wakuRelay.subscribe(self.pubsubTopic, + self.relayEventCallback) + of UNSUBSCRIBE: + node.wakuRelay.unsubscribe(self.pubsubTopic) + + return InterThreadResponse(result: ResultType.OK) diff --git a/library/waku_thread/inter_thread_communication/request.nim b/library/waku_thread/inter_thread_communication/request.nim new file mode 100644 index 0000000000..617555ad7c --- /dev/null +++ b/library/waku_thread/inter_thread_communication/request.nim @@ -0,0 +1,21 @@ + +# This file contains the base message request type that will be handled +# by the Waku Node thread. + +import + std/json +import + chronos +import + ./response, + ../../../waku/v2/node/waku_node, + ../waku_thread + +type + InterThreadRequest* = ref object of RootObj + +method process*(self: InterThreadRequest, node: WakuNode): + Future[InterThreadResponse] {.base.} = discard + +proc `$`*(self: InterThreadRequest): string = + return $( %* self ) diff --git a/library/waku_thread/inter_thread_communication/response.nim b/library/waku_thread/inter_thread_communication/response.nim new file mode 100644 index 0000000000..7a7a284134 --- /dev/null +++ b/library/waku_thread/inter_thread_communication/response.nim @@ -0,0 +1,16 @@ + +import + std/json + +type + ResultType* = enum + OK + ERROR + +type + InterThreadResponse* = object + result*: ResultType + message*: string # only used to give feedback when an error occurs + +proc `$`*(self: InterThreadResponse): string = + return $( %* self ) diff --git a/library/working_thread/waku_node_thread.nim b/library/waku_thread/waku_thread.nim similarity index 59% rename from library/working_thread/waku_node_thread.nim rename to library/waku_thread/waku_thread.nim index 8343c3737f..edf25c73b2 100644 --- a/library/working_thread/waku_node_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -24,44 +24,39 @@ import ../../../waku/v2/node/config, ../../../waku/v2/waku_relay/protocol, ../events/[json_error_event,json_message_event,json_base_event], - ../memory, - ../config + ../alloc, + ./config, + ./inter_thread_communication/request, + ./inter_thread_communication/response type Context* = object - thread: Thread[(ptr Context, cstring)] + thread: Thread[(ptr Context)] + reqChannel: Channel[InterThreadRequest] + respChannel: Channel[InterThreadResponse] node: WakuNode - reqChannel: Channel[cstring] - respChannel: Channel[cstring] var ctx {.threadvar.}: ptr Context # To control when the thread is running var running: Atomic[bool] -type - WakuCallBack* = proc(msg: ptr cchar, len: csize_t) {.cdecl, gcsafe.} - -# May keep a reference to a callback defined externally -var extEventCallback*: WakuCallBack = nil - -proc relayEventCallback(pubsubTopic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = -# proc relayEventCallback(pubsubTopic: string, -# msg: WakuMessage): -# Future[void] {.gcsafe, raises: [Defect].} = - # Callback that hadles the Waku Relay events. i.e. messages or errors. - if not isNil(extEventCallback): - try: - let event = $JsonMessageEvent.new(pubsubTopic, msg) - extEventCallback(unsafeAddr event[0], cast[csize_t](len(event))) - except Exception,CatchableError: - error "Exception when calling 'eventCallBack': " & - getCurrentExceptionMsg() - else: - error "extEventCallback is nil" - -proc createNode*(ctx: ptr Context, - configJson: cstring): Result[void, string] = +# Every Nim library must have this function called - the name is derived from +# the `--nimMainPrefix` command line option +proc NimMain() {.importc.} +var initialized: Atomic[bool] + +proc waku_init() = + if not initialized.exchange(true): + NimMain() # Every Nim library needs to call `NimMain` once exactly + when declared(setupForeignThreadGc): setupForeignThreadGc() + when declared(nimGC_setStackBottom): + var locals {.volatile, noinit.}: pointer + locals = addr(locals) + nimGC_setStackBottom(locals) + +proc createNode(configJson: cstring): Result[WakuNode, string] = + ## Creates a new WakuNode and assigns it to the node parameter var privateKey: PrivateKey var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"), @@ -123,79 +118,50 @@ proc createNode*(ctx: ptr Context, let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error return err($JsonErrorEvent.new(errorMsg)) - ctx.node = wakuNodeRes.get() + var newNode = wakuNodeRes.get() if relay: - waitFor ctx.node.mountRelay() - ctx.node.peerManager.start() - - return ok() - -# Every Nim library must have this function called - the name is derived from -# the `--nimMainPrefix` command line option -proc NimMain() {.importc.} -var initialized: Atomic[bool] + waitFor newNode.mountRelay() + newNode.peerManager.start() -proc waku_init() = - if not initialized.exchange(true): - NimMain() # Every Nim library needs to call `NimMain` once exactly - when declared(setupForeignThreadGc): setupForeignThreadGc() - when declared(nimGC_setStackBottom): - var locals {.volatile, noinit.}: pointer - locals = addr(locals) - nimGC_setStackBottom(locals) + return ok(newNode) -proc run(args: tuple [ctx: ptr Context, - configJson: cstring]) {.thread.} = - # This is the worker thread body. This thread runs the Waku node - # and attend possible library user requests (stop, connect_to, etc.) - waku_init() +proc run(ctx: ptr Context) {.thread.} = + ## This is the worker thread body. This thread runs the Waku node + ## and attends library user requests (stop, connect_to, etc.) while running.load == true: - # Trying to get a request from the libwaku main thread - let req = args.ctx.reqChannel.tryRecv() + ## Trying to get a request from the libwaku main thread + let req = ctx.reqChannel.tryRecv() if req[0] == true: + let response = waitFor req[1].process(ctx.node) + ctx.respChannel.send( response ) - if req[1] == "waku_new": - let ret = createNode(args.ctx, args.configJson) - if ret.isErr(): - let msg = "ERROR: " & ret.error - args.ctx.respChannel.send(cast[cstring](msg)) - - if req[1] == "waku_start": - # TODO: wait the future properly - discard args.ctx.node.start() - - if req[1] == "waku_connect": - discard args.ctx.node.connectToNodes(@["/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN"], source="static") + poll() - if req[1] == "waku_subscribe": - let a = PubsubTopic("/waku/2/default-waku/proto") - let b = WakuRelayHandler(relayEventCallback) - args.ctx.node.wakuRelay.subscribe(a, b) + tearDownForeignThreadGc() - args.ctx.respChannel.send("OK") +proc createWakuThread*(configJson: cstring): Result[void, string] = + ## This proc is called from the main thread and it creates + ## the Waku working thread. - poll() - -proc startThread*(configJson: cstring): Result[void, string] = - # This proc is called from the main thread and it creates - # the Waku working thread. waku_init() - ctx = createShared(Context, 1) + ctx = createShared(Context, 1) ctx.reqChannel.open() ctx.respChannel.open() - running.store(true) + let newNodeRes = createNode(configJson) + if newNodeRes.isErr(): + return err(newNodeRes.error) - let cfgJson = configJson.alloc() + ctx.node = newNodeRes.get() + + running.store(true) try: - createThread(ctx.thread, run, (ctx, cfgJson)) + createThread(ctx.thread, run, ctx) except ResourceExhaustedError: - # deallocShared for byte allocations - deallocShared(cfgJson) # and freeShared for typed allocations! freeShared(ctx) @@ -205,15 +171,16 @@ proc startThread*(configJson: cstring): Result[void, string] = proc stopWakuNodeThread*() = running.store(false) - ctx.reqChannel.send("Close") joinThread(ctx.thread) ctx.reqChannel.close() ctx.respChannel.close() -proc sendRequestToWakuThread*(req: cstring, + freeShared(ctx) + +proc sendRequestToWakuThread*(req: InterThreadRequest, timeoutMs: int = 300_000): - Result[string, string] = + Result[InterThreadResponse, string] = ctx.reqChannel.send(req) @@ -229,5 +196,5 @@ proc sendRequestToWakuThread*(req: cstring, ". timeout in ms: " & $timeoutMs return err(msg) - return ok($resp[1]) + return ok(resp[1]) diff --git a/library/working_thread/inter_thread_msg_types.nim b/library/working_thread/inter_thread_msg_types.nim deleted file mode 100644 index dbf19149f7..0000000000 --- a/library/working_thread/inter_thread_msg_types.nim +++ /dev/null @@ -1,15 +0,0 @@ - - -type - MsgType = enum - NODE_MANAGEMENT, - RELAY, - STORE, - FILTER, - ADMIN, - LIGHT_PUSH - -type - RequestMsg = ref object - msgType: MsgType - # content: MsgReqContent diff --git a/library/working_thread/request_manager.nim b/library/working_thread/request_manager.nim deleted file mode 100644 index 35211f3323..0000000000 --- a/library/working_thread/request_manager.nim +++ /dev/null @@ -1,12 +0,0 @@ - -import - std/json - -proc handleRequest*(ctx: ptr Context, request: cstring) = - - - { - "type": "relay", - "content": { - } - } \ No newline at end of file From 23e33bfb9ced1bf7c064c7407bd31e50b022e48d Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Fri, 28 Jul 2023 14:30:42 +0200 Subject: [PATCH 3/4] Code simplification --- library/libwaku.nim | 54 ++++++++++++------- .../node_lifecycle_request.nim | 9 ++-- .../peer_manager_request.nim | 26 ++++----- .../protocols/relay_request.nim | 38 ++++++++----- .../inter_thread_communication/request.nim | 6 +-- .../inter_thread_communication/response.nim | 16 ------ library/waku_thread/waku_thread.nim | 18 ++----- 7 files changed, 85 insertions(+), 82 deletions(-) delete mode 100644 library/waku_thread/inter_thread_communication/response.nim diff --git a/library/libwaku.nim b/library/libwaku.nim index 69c062397a..e8ca65a9f1 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -15,7 +15,7 @@ import ../../waku/v2/waku_core/topics/pubsub_topic, ../../../waku/v2/waku_relay/protocol, ./events/json_message_event, - ./waku_thread/waku_thread, + ./waku_thread/waku_thread as waku_thread_module, ./waku_thread/inter_thread_communication/node_lifecycle_request, ./waku_thread/inter_thread_communication/peer_manager_request, ./waku_thread/inter_thread_communication/protocols/relay_request, @@ -72,7 +72,7 @@ proc waku_new(configJson: cstring, if isNil(onErrCb): return RET_MISSING_CALLBACK - let createThRes = waku_thread.createWakuThread(configJson) + let createThRes = waku_thread_module.createWakuThread(configJson) if createThRes.isErr(): let msg = "Error in createWakuThread: " & $createThRes.error onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) @@ -189,28 +189,38 @@ proc waku_relay_publish(pubSubTopic: cstring, DefaultPubsubTopic else: $pst + + let sendReqRes = waku_thread_module.sendRequestToWakuThread( + RelayRequest.new(RelayMsgType.PUBLISH, + PubsubTopic($pst), + WakuRelayHandler(relayEventCallback), + wakuMessage)) + deallocShared(pst) + + if sendReqRes.isErr(): + let msg = $sendReqRes.error + onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + return RET_ERR + return RET_OK proc waku_start() {.dynlib, exportc.} = - discard waku_thread.sendRequestToWakuThread( - NodeLifecycleRequest.new( - NodeLifecycleMsgType.START_NODE, - )) + discard waku_thread_module.sendRequestToWakuThread( + NodeLifecycleRequest.new( + NodeLifecycleMsgType.START_NODE)) proc waku_stop() {.dynlib, exportc.} = - discard waku_thread.sendRequestToWakuThread( - # let createThRes = waku_thread.sendRequestToWakuThread( - NodeLifecycleRequest.new( - NodeLifecycleMsgType.STOP_NODE, - )) + discard waku_thread_module.sendRequestToWakuThread( + NodeLifecycleRequest.new( + NodeLifecycleMsgType.STOP_NODE)) proc waku_relay_subscribe( pubSubTopic: cstring, onErrCb: WakuCallBack): cint {.dynlib, exportc.} = - + let pst = pubSubTopic.alloc() - let sendReqRes = waku_thread.sendRequestToWakuThread( + let sendReqRes = waku_thread_module.sendRequestToWakuThread( RelayRequest.new(RelayMsgType.SUBSCRIBE, PubsubTopic($pst), WakuRelayHandler(relayEventCallback))) @@ -229,7 +239,7 @@ proc waku_relay_unsubscribe( {.dynlib, exportc.} = let pst = pubSubTopic.alloc() - let sendReqRes = waku_thread.sendRequestToWakuThread( + let sendReqRes = waku_thread_module.sendRequestToWakuThread( RelayRequest.new(RelayMsgType.UNSUBSCRIBE, PubsubTopic($pst), WakuRelayHandler(relayEventCallback))) @@ -247,12 +257,16 @@ proc waku_connect(peerMultiAddr: cstring, onErrCb: WakuCallBack): cint {.dynlib, exportc.} = - discard waku_thread.sendRequestToWakuThread( - PeerManagementRequest.new( - PeerManagementMsgType.CONNECT_TO, - $peerMultiAddr, - timeoutMs, - )) + let connRes = waku_thread_module.sendRequestToWakuThread( + PeerManagementRequest.new( + PeerManagementMsgType.CONNECT_TO, + $peerMultiAddr, + chronos.milliseconds(timeoutMs))) + if connRes.isErr(): + let msg = $connRes.error + onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + return RET_ERR + return RET_OK ### End of exported procs diff --git a/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim index 6af9ca1df3..ffad851726 100644 --- a/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim @@ -7,8 +7,7 @@ import stew/shims/net import ../../../waku/v2/node/waku_node, - ./request, - ./response + ./request type NodeLifecycleMsgType* = enum @@ -25,12 +24,14 @@ proc new*(T: type NodeLifecycleRequest, return NodeLifecycleRequest(operation: op) method process*(self: NodeLifecycleRequest, - node: WakuNode): Future[InterThreadResponse] {.async.} = + node: WakuNode): Future[Result[string, string]] {.async.} = + case self.operation: + of START_NODE: waitFor node.start() of STOP_NODE: waitFor node.stop() - return InterThreadResponse(result: ResultType.OK) + return ok("") diff --git a/library/waku_thread/inter_thread_communication/peer_manager_request.nim b/library/waku_thread/inter_thread_communication/peer_manager_request.nim index 38517a3ed4..e3be6b65e7 100644 --- a/library/waku_thread/inter_thread_communication/peer_manager_request.nim +++ b/library/waku_thread/inter_thread_communication/peer_manager_request.nim @@ -8,8 +8,7 @@ import stew/shims/net import ../../../waku/v2/node/waku_node, - ./request, - ./response + ./request type PeerManagementMsgType* = enum @@ -19,24 +18,24 @@ type PeerManagementRequest* = ref object of InterThreadRequest operation: PeerManagementMsgType peerMultiAddr: string - timeoutMs: cuint + dialTimeout: Duration proc new*(T: type PeerManagementRequest, op: PeerManagementMsgType, peerMultiAddr: string, - timeoutMs: cuint): T = + dialTimeout: Duration): T = return PeerManagementRequest(operation: op, peerMultiAddr: peerMultiAddr, - timeoutMs: timeoutMs) + dialTimeout: dialTimeout) proc connectTo(node: WakuNode, - peerMultiAddr: string): Result[void, string] = - # let sendReqRes = sendRequestToWakuThread("waku_connect") + peerMultiAddr: string, + dialTimeout: Duration): Result[void, string] = let peers = (peerMultiAddr).split(",").mapIt(strip(it)) - # TODO: the timeoutMs is not being used at all! + # TODO: the dialTimeout is not being used at all! let connectFut = node.connectToNodes(peers, source="static") while not connectFut.finished(): poll() @@ -48,12 +47,13 @@ proc connectTo(node: WakuNode, return ok() method process*(self: PeerManagementRequest, - node: WakuNode): Future[InterThreadResponse] {.async.} = + node: WakuNode): Future[Result[string, string]] {.async.} = + case self.operation: + of CONNECT_TO: - let ret = node.connectTo(self.peerMultiAddr) + let ret = node.connectTo(self.peerMultiAddr, self.dialTimeout) if ret.isErr(): - return InterThreadResponse(result: ResultType.OK, - message: ret.error) + return err(ret.error) - return InterThreadResponse(result: ResultType.OK) + return ok("") diff --git a/library/waku_thread/inter_thread_communication/protocols/relay_request.nim b/library/waku_thread/inter_thread_communication/protocols/relay_request.nim index 3bd703c325..c0fd1340d2 100644 --- a/library/waku_thread/inter_thread_communication/protocols/relay_request.nim +++ b/library/waku_thread/inter_thread_communication/protocols/relay_request.nim @@ -7,44 +7,58 @@ import stew/results, stew/shims/net import + ../../../../waku/v2/waku_core/message/message, ../../../../waku/v2/node/waku_node, ../../../../waku/v2/waku_core/topics/pubsub_topic, ../../../../waku/v2/waku_relay/protocol, - ../request, - ../response + ../request type RelayMsgType* = enum SUBSCRIBE UNSUBSCRIBE + PUBLISH type RelayRequest* = ref object of InterThreadRequest operation: RelayMsgType pubsubTopic: PubsubTopic - relayEventCallback: WakuRelayHandler + relayEventCallback: WakuRelayHandler # not used in 'PUBLISH' requests + message: WakuMessage # this field is only used in 'PUBLISH' requests proc new*(T: type RelayRequest, op: RelayMsgType, pubsubTopic: PubsubTopic, - relayEventCallback: WakuRelayHandler): T = + relayEventCallback: WakuRelayHandler = nil, + message = WakuMessage()): T = return RelayRequest(operation: op, pubsubTopic: pubsubTopic, - relayEventCallback: relayEventCallback) + relayEventCallback: relayEventCallback, + message: message) method process*(self: RelayRequest, - node: WakuNode): Future[InterThreadResponse] {.async.} = + node: WakuNode): Future[Result[string, string]] {.async.} = if node.wakuRelay.isNil(): - let msg = "Cannot subscribe or unsubscribe without Waku Relay enabled." - return InterThreadResponse(result: ResultType.ERROR, - message: msg) + return err("Operation not supported without Waku Relay enabled.") + case self.operation: + of SUBSCRIBE: - node.wakuRelay.subscribe(self.pubsubTopic, - self.relayEventCallback) + node.wakuRelay.subscribe(self.pubsubTopic, self.relayEventCallback) + of UNSUBSCRIBE: node.wakuRelay.unsubscribe(self.pubsubTopic) - return InterThreadResponse(result: ResultType.OK) + of PUBLISH: + let numPeers = await node.wakuRelay.publish(self.pubsubTopic, + self.message) + if numPeers == 0: + return err("Message not sent because no peers found.") + + elif numPeers > 0: + # TODO: pending to return a valid message Id + return ok("hard-coded-message-id") + + return ok("") diff --git a/library/waku_thread/inter_thread_communication/request.nim b/library/waku_thread/inter_thread_communication/request.nim index 617555ad7c..4f26e8006c 100644 --- a/library/waku_thread/inter_thread_communication/request.nim +++ b/library/waku_thread/inter_thread_communication/request.nim @@ -3,11 +3,11 @@ # by the Waku Node thread. import - std/json + std/json, + stew/results import chronos import - ./response, ../../../waku/v2/node/waku_node, ../waku_thread @@ -15,7 +15,7 @@ type InterThreadRequest* = ref object of RootObj method process*(self: InterThreadRequest, node: WakuNode): - Future[InterThreadResponse] {.base.} = discard + Future[Result[string, string]] {.base.} = discard proc `$`*(self: InterThreadRequest): string = return $( %* self ) diff --git a/library/waku_thread/inter_thread_communication/response.nim b/library/waku_thread/inter_thread_communication/response.nim deleted file mode 100644 index 7a7a284134..0000000000 --- a/library/waku_thread/inter_thread_communication/response.nim +++ /dev/null @@ -1,16 +0,0 @@ - -import - std/json - -type - ResultType* = enum - OK - ERROR - -type - InterThreadResponse* = object - result*: ResultType - message*: string # only used to give feedback when an error occurs - -proc `$`*(self: InterThreadResponse): string = - return $( %* self ) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index edf25c73b2..f23e9e5b2c 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -26,14 +26,13 @@ import ../events/[json_error_event,json_message_event,json_base_event], ../alloc, ./config, - ./inter_thread_communication/request, - ./inter_thread_communication/response + ./inter_thread_communication/request type Context* = object thread: Thread[(ptr Context)] reqChannel: Channel[InterThreadRequest] - respChannel: Channel[InterThreadResponse] + respChannel: Channel[Result[string, string]] node: WakuNode var ctx {.threadvar.}: ptr Context @@ -178,23 +177,14 @@ proc stopWakuNodeThread*() = freeShared(ctx) -proc sendRequestToWakuThread*(req: InterThreadRequest, - timeoutMs: int = 300_000): - Result[InterThreadResponse, string] = +proc sendRequestToWakuThread*(req: InterThreadRequest): Result[string, string] = ctx.reqChannel.send(req) - var count = 0 var resp = ctx.respChannel.tryRecv() while resp[0] == false: resp = ctx.respChannel.tryRecv() os.sleep(1) - count.inc() - if count > timeoutMs: - let msg = "Timeout expired. request: " & $req & - ". timeout in ms: " & $timeoutMs - return err(msg) - - return ok(resp[1]) + return resp[1] From f9e67e64b9d0f0dcc3996d67a1ed2a60c1a9ea05 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Fri, 28 Jul 2023 15:52:43 +0200 Subject: [PATCH 4/4] waku_thread.nim: removing useless comment --- library/waku_thread/waku_thread.nim | 2 -- 1 file changed, 2 deletions(-) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index f23e9e5b2c..8574dbd99b 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -55,8 +55,6 @@ proc waku_init() = nimGC_setStackBottom(locals) proc createNode(configJson: cstring): Result[WakuNode, string] = - ## Creates a new WakuNode and assigns it to the node parameter - var privateKey: PrivateKey var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"), Port(60000'u16)).value