diff --git a/examples/cbindings/base64.c b/examples/cbindings/base64.c new file mode 100644 index 0000000000..0f9acdf942 --- /dev/null +++ b/examples/cbindings/base64.c @@ -0,0 +1,58 @@ + +#include "base64.h" + +// Base64 encoding +// source: https://nachtimwald.com/2017/11/18/base64-encode-and-decode-in-c/ +size_t b64_encoded_size(size_t inlen) +{ + size_t ret; + + ret = inlen; + if (inlen % 3 != 0) + ret += 3 - (inlen % 3); + ret /= 3; + ret *= 4; + + return ret; +} + +const char b64chars[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + +char *b64_encode(const unsigned char *in, size_t len) +{ + char *out; + size_t elen; + size_t i; + size_t j; + size_t v; + + if (in == NULL || len == 0) + return NULL; + + elen = b64_encoded_size(len); + out = malloc(elen+1); + out[elen] = '\0'; + + for (i=0, j=0; i> 18) & 0x3F]; + out[j+1] = b64chars[(v >> 12) & 0x3F]; + if (i+1 < len) { + out[j+2] = b64chars[(v >> 6) & 0x3F]; + } else { + out[j+2] = '='; + } + if (i+2 < len) { + out[j+3] = b64chars[v & 0x3F]; + } else { + out[j+3] = '='; + } + } + + return out; +} + +// End of Base64 encoding diff --git a/examples/cbindings/base64.h b/examples/cbindings/base64.h new file mode 100644 index 0000000000..37ca50f914 --- /dev/null +++ b/examples/cbindings/base64.h @@ -0,0 +1,11 @@ + +#ifndef _BASE64_H_ +#define _BASE64_H_ + +#include + +size_t b64_encoded_size(size_t inlen); + +char *b64_encode(const unsigned char *in, size_t len); + +#endif diff --git a/examples/cbindings/waku_example.c b/examples/cbindings/waku_example.c index e2dd834bda..3ffcd42535 100644 --- a/examples/cbindings/waku_example.c +++ b/examples/cbindings/waku_example.c @@ -79,7 +79,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { static struct argp argp = { options, parse_opt, args_doc, doc, 0, 0, 0 }; char* contentTopic = NULL; -void handle_content_topic(char* msg, size_t len) { +void handle_content_topic(const char* msg, size_t len) { if (contentTopic != NULL) { free(contentTopic); } @@ -89,7 +89,7 @@ void handle_content_topic(char* msg, size_t len) { } char* publishResponse = NULL; -void handle_publish_ok(char* msg, size_t len) { +void handle_publish_ok(const char* msg, size_t len) { printf("Publish Ok: %s %lu\n", msg, len); if (publishResponse != NULL) { @@ -100,14 +100,14 @@ void handle_publish_ok(char* msg, size_t len) { strcpy(publishResponse, msg); } -void handle_error(char* msg, size_t len) { +void handle_error(const char* msg, size_t len) { printf("Error: %s\n", msg); exit(1); } #define MAX_MSG_SIZE 65535 -void publish_message(char* pubsubTopic, char* msg) { +void publish_message(char* pubsubTopic, const char* msg) { char jsonWakuMsg[MAX_MSG_SIZE]; char *msgPayload = b64_encode(msg, strlen(msg)); @@ -138,15 +138,15 @@ void show_help_and_exit() { exit(1); } -void event_handler(char* msg, size_t len) { +void event_handler(const char* msg, size_t len) { printf("Receiving message %s\n", msg); } -void print_default_pubsub_topic(char* msg, size_t len) { +void print_default_pubsub_topic(const char* msg, size_t len) { printf("Default pubsub topic: %s\n", msg); } -void print_waku_version(char* msg, size_t len) { +void print_waku_version(const char* msg, size_t len) { printf("Git Version: %s\n", msg); } @@ -243,8 +243,6 @@ void handle_user_input() { int main(int argc, char** argv) { - waku_init_lib(); - struct ConfigNode cfgNode; // default values snprintf(cfgNode.host, 128, "0.0.0.0"); @@ -272,12 +270,13 @@ int main(int argc, char** argv) { WAKU_CALL( waku_default_pubsub_topic(print_default_pubsub_topic) ); WAKU_CALL( waku_version(print_waku_version) ); + printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port); printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO"); WAKU_CALL( waku_new(jsonConfig, handle_error) ); - waku_set_relay_callback(event_handler); + waku_set_event_callback(event_handler); waku_start(); printf("Establishing connection with: %s\n", cfgNode.peers); @@ -291,6 +290,6 @@ int main(int argc, char** argv) { show_main_menu(); while(1) { handle_user_input(); - waku_poll(); + // waku_poll(); } } diff --git a/library/alloc.nim b/library/alloc.nim new file mode 100644 index 0000000000..44d5e67eae --- /dev/null +++ b/library/alloc.nim @@ -0,0 +1,7 @@ + +proc alloc*(str: cstring): cstring = + # Byte allocation from the given address. + # There should be the corresponding manual deallocation with deallocShared ! + let ret = cast[cstring](allocShared(len(str) + 1)) + copyMem(ret, str, len(str) + 1) + return ret diff --git a/library/libwaku.h b/library/libwaku.h index 982d0b4f88..ed760b91dd 100644 --- a/library/libwaku.h +++ b/library/libwaku.h @@ -11,11 +11,7 @@ #define RET_ERR 1 #define RET_MISSING_CALLBACK 2 -typedef void (*WakuCallBack) (char* msg, size_t len_0); - -// This should only be called once. -// It initializes the nim runtime and GC. -void waku_init_lib(void); +typedef void (*WakuCallBack) (const char* msg, size_t len_0); // Creates a new instance of the waku node. // Sets up the waku node from the given configuration. @@ -27,7 +23,7 @@ void waku_stop(void); int waku_version(WakuCallBack onOkCb); -void waku_set_relay_callback(WakuCallBack callback); +void waku_set_event_callback(WakuCallBack callback); int waku_content_topic(const char* appName, unsigned int appVersion, @@ -56,6 +52,4 @@ int waku_connect(const char* peerMultiAddr, unsigned int timeoutMs, WakuCallBack onErrCb); -void waku_poll(void); - #endif /* __libwaku__ */ diff --git a/library/libwaku.nim b/library/libwaku.nim index cc5d80691e..e8ca65a9f1 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -1,26 +1,25 @@ +{.pragma: exported, exportc, cdecl, raises: [].} +{.pragma: callback, cdecl, raises: [], gcsafe.} +{.passc: "-fPIC".} + import std/[json,sequtils,times,strformat,options,atomics,strutils], - strutils, - os + strutils import chronicles, - chronos, - stew/shims/net + chronos import - ../../waku/common/enr/builder, - ../../waku/v2/waku_enr/capabilities, - ../../waku/v2/waku_enr/multiaddr, - ../../waku/v2/waku_enr/sharding, ../../waku/v2/waku_core/message/message, - ../../waku/v2/waku_core/topics/pubsub_topic, - ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/waku_node, - ../../waku/v2/node/builder, - ../../waku/v2/node/config, - ../../waku/v2/waku_relay/protocol, - ./events/[json_error_event,json_message_event,json_base_event], - ./config + ../../waku/v2/waku_core/topics/pubsub_topic, + ../../../waku/v2/waku_relay/protocol, + ./events/json_message_event, + ./waku_thread/waku_thread as waku_thread_module, + ./waku_thread/inter_thread_communication/node_lifecycle_request, + ./waku_thread/inter_thread_communication/peer_manager_request, + ./waku_thread/inter_thread_communication/protocols/relay_request, + ./alloc ################################################################################ ### Wrapper around the waku node @@ -33,45 +32,30 @@ const RET_OK: cint = 0 const RET_ERR: cint = 1 const RET_MISSING_CALLBACK: cint = 2 +type + WakuCallBack* = proc(msg: ptr cchar, len: csize_t) {.cdecl, gcsafe.} + ### End of exported types ################################################################################ ################################################################################ ### Not-exported components -proc alloc(str: cstring): cstring = - # Byte allocation from the given address. - # There should be the corresponding manual deallocation with deallocShared ! - let ret = cast[cstring](allocShared(len(str) + 1)) - copyMem(ret, str, len(str) + 1) - return ret - -type - WakuCallBack = proc(msg: ptr cchar, len: csize_t) {.cdecl, gcsafe.} - # May keep a reference to a callback defined externally -var extRelayEventCallback: WakuCallBack = nil +var extEventCallback*: WakuCallBack = nil -proc relayEventCallback(pubsubTopic: string, - msg: WakuMessage): - Future[void] {.gcsafe, raises: [Defect].} = +proc relayEventCallback(pubsubTopic: PubsubTopic, + msg: WakuMessage): Future[void] {.async, gcsafe.} = # Callback that hadles the Waku Relay events. i.e. messages or errors. - if not isNil(extRelayEventCallback): + if not isNil(extEventCallback): try: let event = $JsonMessageEvent.new(pubsubTopic, msg) - extRelayEventCallback(unsafeAddr event[0], cast[csize_t](len(event))) + extEventCallback(unsafeAddr event[0], cast[csize_t](len(event))) except Exception,CatchableError: error "Exception when calling 'eventCallBack': " & getCurrentExceptionMsg() else: - error "extRelayEventCallback is nil" - - var retFut = newFuture[void]() - retFut.complete() - return retFut - -# WakuNode instance -var node {.threadvar.}: WakuNode + error "extEventCallback is nil" ### End of not-exported components ################################################################################ @@ -79,21 +63,6 @@ var node {.threadvar.}: WakuNode ################################################################################ ### Exported procs -# Every Nim library must have this function called - the name is derived from -# the `--nimMainPrefix` command line option -proc NimMain() {.importc.} - -var initialized: Atomic[bool] - -proc waku_init_lib() {.dynlib, exportc, cdecl.} = - if not initialized.exchange(true): - NimMain() # Every Nim library needs to call `NimMain` once exactly - when declared(setupForeignThreadGc): setupForeignThreadGc() - when declared(nimGC_setStackBottom): - var locals {.volatile, noinit.}: pointer - locals = addr(locals) - nimGC_setStackBottom(locals) - proc waku_new(configJson: cstring, onErrCb: WakuCallback): cint {.dynlib, exportc, cdecl.} = @@ -103,78 +72,12 @@ proc waku_new(configJson: cstring, if isNil(onErrCb): return RET_MISSING_CALLBACK - var privateKey: PrivateKey - var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"), - Port(60000'u16)).value - var relay: bool - var topics = @[""] - var jsonResp: JsonEvent - - let cj = configJson.alloc() - - if not parseConfig($cj, - privateKey, - netConfig, - relay, - topics, - jsonResp): - deallocShared(cj) - let resp = $jsonResp - onErrCb(unsafeAddr resp[0], cast[csize_t](len(resp))) - return RET_ERR - - deallocShared(cj) - - var enrBuilder = EnrBuilder.init(privateKey) - - enrBuilder.withIpAddressAndPorts( - netConfig.enrIp, - netConfig.enrPort, - netConfig.discv5UdpPort - ) - - if netConfig.wakuFlags.isSome(): - enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get()) - - enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs) - - let addShardedTopics = enrBuilder.withShardedTopics(topics) - if addShardedTopics.isErr(): - let resp = $addShardedTopics.error - onErrCb(unsafeAddr resp[0], cast[csize_t](len(resp))) - return RET_ERR - - let recordRes = enrBuilder.build() - let record = - if recordRes.isErr(): - let resp = $recordRes.error - onErrCb(unsafeAddr resp[0], cast[csize_t](len(resp))) - return RET_ERR - else: recordRes.get() - - var builder = WakuNodeBuilder.init() - builder.withRng(crypto.newRng()) - builder.withNodeKey(privateKey) - builder.withRecord(record) - builder.withNetworkConfiguration(netConfig) - builder.withSwitchConfiguration( - maxConnections = some(50.int) - ) - - let wakuNodeRes = builder.build() - if wakuNodeRes.isErr(): - let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error - let jsonErrEvent = $JsonErrorEvent.new(errorMsg) - - onErrCb(unsafeAddr jsonErrEvent[0], cast[csize_t](len(jsonErrEvent))) + let createThRes = waku_thread_module.createWakuThread(configJson) + if createThRes.isErr(): + let msg = "Error in createWakuThread: " & $createThRes.error + onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR - node = wakuNodeRes.get() - - if relay: - waitFor node.mountRelay() - node.peerManager.start() - return RET_OK proc waku_version(onOkCb: WakuCallBack): cint {.dynlib, exportc.} = @@ -186,8 +89,8 @@ proc waku_version(onOkCb: WakuCallBack): cint {.dynlib, exportc.} = return RET_OK -proc waku_set_relay_callback(callback: WakuCallBack) {.dynlib, exportc.} = - extRelayEventCallback = callback +proc waku_set_event_callback(callback: WakuCallBack) {.dynlib, exportc.} = + extEventCallback = callback proc waku_content_topic(appName: cstring, appVersion: cuint, @@ -287,123 +190,84 @@ proc waku_relay_publish(pubSubTopic: cstring, else: $pst - if node.wakuRelay.isNil(): - let msg = "Can't publish. WakuRelay is not enabled." - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_ERR - - let pubMsgFut = node.wakuRelay.publish(targetPubSubTopic, wakuMessage) - - # With the next loop we convert an asynchronous call into a synchronous one - for i in 0 .. timeoutMs: - if pubMsgFut.finished(): - break - sleep(1) - - if pubMsgFut.finished(): - let numPeers = pubMsgFut.read() - if numPeers == 0: - let msg = "Message not sent because no peers found." - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_ERR - elif numPeers > 0: - # TODO: pending to return a valid message Id (response when all is correct) - let msg = "hard-coded-message-id" - onOkCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_OK + let sendReqRes = waku_thread_module.sendRequestToWakuThread( + RelayRequest.new(RelayMsgType.PUBLISH, + PubsubTopic($pst), + WakuRelayHandler(relayEventCallback), + wakuMessage)) + deallocShared(pst) - else: - let msg = "Timeout expired" + if sendReqRes.isErr(): + let msg = $sendReqRes.error onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR + return RET_OK + proc waku_start() {.dynlib, exportc.} = - waitFor node.start() + discard waku_thread_module.sendRequestToWakuThread( + NodeLifecycleRequest.new( + NodeLifecycleMsgType.START_NODE)) proc waku_stop() {.dynlib, exportc.} = - waitFor node.stop() + discard waku_thread_module.sendRequestToWakuThread( + NodeLifecycleRequest.new( + NodeLifecycleMsgType.STOP_NODE)) proc waku_relay_subscribe( pubSubTopic: cstring, onErrCb: WakuCallBack): cint {.dynlib, exportc.} = - # @params - # topic: Pubsub topic to subscribe to. If empty, it subscribes to the default pubsub topic. - if isNil(onErrCb): - return RET_MISSING_CALLBACK - if isNil(extRelayEventCallback): - let msg = $"""Cannot subscribe without a callback. -# Kindly set it with the 'waku_set_relay_callback' function""" - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_MISSING_CALLBACK + let pst = pubSubTopic.alloc() + let sendReqRes = waku_thread_module.sendRequestToWakuThread( + RelayRequest.new(RelayMsgType.SUBSCRIBE, + PubsubTopic($pst), + WakuRelayHandler(relayEventCallback))) + deallocShared(pst) - if node.wakuRelay.isNil(): - let msg = $"Cannot subscribe without Waku Relay enabled." + if sendReqRes.isErr(): + let msg = $sendReqRes.error onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR - let pst = pubSubTopic.alloc() - node.wakuRelay.subscribe(PubsubTopic($pst), - WakuRelayHandler(relayEventCallback)) - deallocShared(pst) - return RET_OK proc waku_relay_unsubscribe( pubSubTopic: cstring, onErrCb: WakuCallBack): cint {.dynlib, exportc.} = - # @params - # topic: Pubsub topic to subscribe to. If empty, it unsubscribes to the default pubsub topic. - if isNil(onErrCb): - return RET_MISSING_CALLBACK - if isNil(extRelayEventCallback): - let msg = """Cannot unsubscribe without a callback. -# Kindly set it with the 'waku_set_relay_callback' function""" - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_MISSING_CALLBACK + let pst = pubSubTopic.alloc() + let sendReqRes = waku_thread_module.sendRequestToWakuThread( + RelayRequest.new(RelayMsgType.UNSUBSCRIBE, + PubsubTopic($pst), + WakuRelayHandler(relayEventCallback))) + deallocShared(pst) - if node.wakuRelay.isNil(): - let msg = "Cannot unsubscribe without Waku Relay enabled." + if sendReqRes.isErr(): + let msg = $sendReqRes.error onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR - let pst = pubSubTopic.alloc() - node.wakuRelay.unsubscribe(PubsubTopic($pst)) - deallocShared(pst) - return RET_OK proc waku_connect(peerMultiAddr: cstring, timeoutMs: cuint, onErrCb: WakuCallBack): cint {.dynlib, exportc.} = - # peerMultiAddr: comma-separated list of fully-qualified multiaddresses. - # var ret = newString(len + 1) - # if len > 0: - # copyMem(addr ret[0], str, len + 1) - - let address = peerMultiAddr.alloc() - let peers = ($address).split(",").mapIt(strip(it)) - # TODO: the timeoutMs is not being used at all! - let connectFut = node.connectToNodes(peers, source="static") - while not connectFut.finished(): - poll() - - deallocShared(address) - - if not connectFut.completed(): - let msg = "Timeout expired." + let connRes = waku_thread_module.sendRequestToWakuThread( + PeerManagementRequest.new( + PeerManagementMsgType.CONNECT_TO, + $peerMultiAddr, + chronos.milliseconds(timeoutMs))) + if connRes.isErr(): + let msg = $connRes.error onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR return RET_OK -proc waku_poll() {.dynlib, exportc, gcsafe.} = - poll() - ### End of exported procs ################################################################################ diff --git a/library/config.nim b/library/waku_thread/config.nim similarity index 99% rename from library/config.nim rename to library/waku_thread/config.nim index 13e18212de..fd2186282e 100644 --- a/library/config.nim +++ b/library/waku_thread/config.nim @@ -10,7 +10,7 @@ import ../../waku/common/utils/nat, ../../waku/v2/node/waku_node, ../../waku/v2/node/config, - ./events/[json_error_event,json_base_event] + ../events/[json_error_event,json_base_event] proc parsePrivateKey(jsonNode: JsonNode, privateKey: var PrivateKey, diff --git a/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim new file mode 100644 index 0000000000..ffad851726 --- /dev/null +++ b/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim @@ -0,0 +1,37 @@ + +import + std/options +import + chronos, + stew/results, + stew/shims/net +import + ../../../waku/v2/node/waku_node, + ./request + +type + NodeLifecycleMsgType* = enum + START_NODE + STOP_NODE + +type + NodeLifecycleRequest* = ref object of InterThreadRequest + operation: NodeLifecycleMsgType + +proc new*(T: type NodeLifecycleRequest, + op: NodeLifecycleMsgType): T = + + return NodeLifecycleRequest(operation: op) + +method process*(self: NodeLifecycleRequest, + node: WakuNode): Future[Result[string, string]] {.async.} = + + case self.operation: + + of START_NODE: + waitFor node.start() + + of STOP_NODE: + waitFor node.stop() + + return ok("") diff --git a/library/waku_thread/inter_thread_communication/peer_manager_request.nim b/library/waku_thread/inter_thread_communication/peer_manager_request.nim new file mode 100644 index 0000000000..e3be6b65e7 --- /dev/null +++ b/library/waku_thread/inter_thread_communication/peer_manager_request.nim @@ -0,0 +1,59 @@ + +import + std/[options,sequtils,strutils] +import + chronicles, + chronos, + stew/results, + stew/shims/net +import + ../../../waku/v2/node/waku_node, + ./request + +type + PeerManagementMsgType* = enum + CONNECT_TO + +type + PeerManagementRequest* = ref object of InterThreadRequest + operation: PeerManagementMsgType + peerMultiAddr: string + dialTimeout: Duration + +proc new*(T: type PeerManagementRequest, + op: PeerManagementMsgType, + peerMultiAddr: string, + dialTimeout: Duration): T = + + return PeerManagementRequest(operation: op, + peerMultiAddr: peerMultiAddr, + dialTimeout: dialTimeout) + +proc connectTo(node: WakuNode, + peerMultiAddr: string, + dialTimeout: Duration): Result[void, string] = + + let peers = (peerMultiAddr).split(",").mapIt(strip(it)) + + # TODO: the dialTimeout is not being used at all! + let connectFut = node.connectToNodes(peers, source="static") + while not connectFut.finished(): + poll() + + if not connectFut.completed(): + let msg = "Timeout expired." + return err(msg) + + return ok() + +method process*(self: PeerManagementRequest, + node: WakuNode): Future[Result[string, string]] {.async.} = + + case self.operation: + + of CONNECT_TO: + let ret = node.connectTo(self.peerMultiAddr, self.dialTimeout) + if ret.isErr(): + return err(ret.error) + + return ok("") diff --git a/library/waku_thread/inter_thread_communication/protocols/relay_request.nim b/library/waku_thread/inter_thread_communication/protocols/relay_request.nim new file mode 100644 index 0000000000..c0fd1340d2 --- /dev/null +++ b/library/waku_thread/inter_thread_communication/protocols/relay_request.nim @@ -0,0 +1,64 @@ + +import + std/[options,sequtils,strutils] +import + chronicles, + chronos, + stew/results, + stew/shims/net +import + ../../../../waku/v2/waku_core/message/message, + ../../../../waku/v2/node/waku_node, + ../../../../waku/v2/waku_core/topics/pubsub_topic, + ../../../../waku/v2/waku_relay/protocol, + ../request + +type + RelayMsgType* = enum + SUBSCRIBE + UNSUBSCRIBE + PUBLISH + +type + RelayRequest* = ref object of InterThreadRequest + operation: RelayMsgType + pubsubTopic: PubsubTopic + relayEventCallback: WakuRelayHandler # not used in 'PUBLISH' requests + message: WakuMessage # this field is only used in 'PUBLISH' requests + +proc new*(T: type RelayRequest, + op: RelayMsgType, + pubsubTopic: PubsubTopic, + relayEventCallback: WakuRelayHandler = nil, + message = WakuMessage()): T = + + return RelayRequest(operation: op, + pubsubTopic: pubsubTopic, + relayEventCallback: relayEventCallback, + message: message) + +method process*(self: RelayRequest, + node: WakuNode): Future[Result[string, string]] {.async.} = + + if node.wakuRelay.isNil(): + return err("Operation not supported without Waku Relay enabled.") + + case self.operation: + + of SUBSCRIBE: + node.wakuRelay.subscribe(self.pubsubTopic, self.relayEventCallback) + + of UNSUBSCRIBE: + node.wakuRelay.unsubscribe(self.pubsubTopic) + + of PUBLISH: + let numPeers = await node.wakuRelay.publish(self.pubsubTopic, + self.message) + if numPeers == 0: + return err("Message not sent because no peers found.") + + elif numPeers > 0: + # TODO: pending to return a valid message Id + return ok("hard-coded-message-id") + + return ok("") diff --git a/library/waku_thread/inter_thread_communication/request.nim b/library/waku_thread/inter_thread_communication/request.nim new file mode 100644 index 0000000000..4f26e8006c --- /dev/null +++ b/library/waku_thread/inter_thread_communication/request.nim @@ -0,0 +1,21 @@ + +# This file contains the base message request type that will be handled +# by the Waku Node thread. + +import + std/json, + stew/results +import + chronos +import + ../../../waku/v2/node/waku_node, + ../waku_thread + +type + InterThreadRequest* = ref object of RootObj + +method process*(self: InterThreadRequest, node: WakuNode): + Future[Result[string, string]] {.base.} = discard + +proc `$`*(self: InterThreadRequest): string = + return $( %* self ) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim new file mode 100644 index 0000000000..8574dbd99b --- /dev/null +++ b/library/waku_thread/waku_thread.nim @@ -0,0 +1,188 @@ + +{.pragma: exported, exportc, cdecl, raises: [].} +{.pragma: callback, cdecl, raises: [], gcsafe.} +{.passc: "-fPIC".} + +import + std/[json,sequtils,times,strformat,options,atomics,strutils,os] +import + chronicles, + chronos, + stew/results, + stew/shims/net +import + ../../../waku/common/enr/builder, + ../../../waku/v2/waku_enr/capabilities, + ../../../waku/v2/waku_enr/multiaddr, + ../../../waku/v2/waku_enr/sharding, + ../../../waku/v2/waku_core/message/message, + ../../../waku/v2/waku_core/topics/pubsub_topic, + ../../../waku/v2/node/peer_manager/peer_manager, + ../../../waku/v2/waku_core, + ../../../waku/v2/node/waku_node, + ../../../waku/v2/node/builder, + ../../../waku/v2/node/config, + ../../../waku/v2/waku_relay/protocol, + ../events/[json_error_event,json_message_event,json_base_event], + ../alloc, + ./config, + ./inter_thread_communication/request + +type + Context* = object + thread: Thread[(ptr Context)] + reqChannel: Channel[InterThreadRequest] + respChannel: Channel[Result[string, string]] + node: WakuNode + +var ctx {.threadvar.}: ptr Context + +# To control when the thread is running +var running: Atomic[bool] + +# Every Nim library must have this function called - the name is derived from +# the `--nimMainPrefix` command line option +proc NimMain() {.importc.} +var initialized: Atomic[bool] + +proc waku_init() = + if not initialized.exchange(true): + NimMain() # Every Nim library needs to call `NimMain` once exactly + when declared(setupForeignThreadGc): setupForeignThreadGc() + when declared(nimGC_setStackBottom): + var locals {.volatile, noinit.}: pointer + locals = addr(locals) + nimGC_setStackBottom(locals) + +proc createNode(configJson: cstring): Result[WakuNode, string] = + var privateKey: PrivateKey + var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"), + Port(60000'u16)).value + var relay: bool + var topics = @[""] + var jsonResp: JsonEvent + + let cj = configJson.alloc() + + if not parseConfig($cj, + privateKey, + netConfig, + relay, + topics, + jsonResp): + deallocShared(cj) + return err($jsonResp) + + deallocShared(cj) + + var enrBuilder = EnrBuilder.init(privateKey) + + enrBuilder.withIpAddressAndPorts( + netConfig.enrIp, + netConfig.enrPort, + netConfig.discv5UdpPort + ) + + if netConfig.wakuFlags.isSome(): + enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get()) + + enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs) + + let addShardedTopics = enrBuilder.withShardedTopics(topics) + if addShardedTopics.isErr(): + let msg = "Error setting shared topics: " & $addShardedTopics.error + return err($JsonErrorEvent.new(msg)) + + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + let msg = "Error building enr record: " & $recordRes.error + return err($JsonErrorEvent.new(msg)) + + else: recordRes.get() + + var builder = WakuNodeBuilder.init() + builder.withRng(crypto.newRng()) + builder.withNodeKey(privateKey) + builder.withRecord(record) + builder.withNetworkConfiguration(netConfig) + builder.withSwitchConfiguration( + maxConnections = some(50.int) + ) + + let wakuNodeRes = builder.build() + if wakuNodeRes.isErr(): + let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error + return err($JsonErrorEvent.new(errorMsg)) + + var newNode = wakuNodeRes.get() + + if relay: + waitFor newNode.mountRelay() + newNode.peerManager.start() + + return ok(newNode) + +proc run(ctx: ptr Context) {.thread.} = + ## This is the worker thread body. This thread runs the Waku node + ## and attends library user requests (stop, connect_to, etc.) + + while running.load == true: + ## Trying to get a request from the libwaku main thread + let req = ctx.reqChannel.tryRecv() + if req[0] == true: + let response = waitFor req[1].process(ctx.node) + ctx.respChannel.send( response ) + + poll() + + tearDownForeignThreadGc() + +proc createWakuThread*(configJson: cstring): Result[void, string] = + ## This proc is called from the main thread and it creates + ## the Waku working thread. + + waku_init() + + ctx = createShared(Context, 1) + ctx.reqChannel.open() + ctx.respChannel.open() + + let newNodeRes = createNode(configJson) + if newNodeRes.isErr(): + return err(newNodeRes.error) + + ctx.node = newNodeRes.get() + + running.store(true) + + try: + createThread(ctx.thread, run, ctx) + except ResourceExhaustedError: + # and freeShared for typed allocations! + freeShared(ctx) + + return err("failed to create a thread: " & getCurrentExceptionMsg()) + + return ok() + +proc stopWakuNodeThread*() = + running.store(false) + joinThread(ctx.thread) + + ctx.reqChannel.close() + ctx.respChannel.close() + + freeShared(ctx) + +proc sendRequestToWakuThread*(req: InterThreadRequest): Result[string, string] = + + ctx.reqChannel.send(req) + + var resp = ctx.respChannel.tryRecv() + while resp[0] == false: + resp = ctx.respChannel.tryRecv() + os.sleep(1) + + return resp[1] +