Skip to content

Commit

Permalink
chore: delivery monitor for store v3 reliability protocol (#2977)
Browse files Browse the repository at this point in the history
- Use of observer observable pattern to inform delivery_monitor about subscription state
- send_monitor becomes a publish observer of lightpush and relay
- deliver monitor add more protection against possible crash and better logs
- creating a separate proc in store client for delivery monitor
  • Loading branch information
Ivansete-status authored Aug 27, 2024
1 parent c3cb06a commit 0f68274
Show file tree
Hide file tree
Showing 17 changed files with 679 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS NotDeliveredMessages(
messageHash BLOB PRIMARY KEY,
timestamp INTEGER NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
meta BLOB,
version INTEGER NOT NULL
);
9 changes: 9 additions & 0 deletions waku/factory/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,15 @@ type WakuNodeConf* = object
name: "lightpushnode"
.}: string

## Reliability config
reliabilityEnabled* {.
desc:
"""Adds an extra effort in the delivery/reception of messages by leveraging store-v3 requests.
with the drawback of consuming some more bandwitdh.""",
defaultValue: false,
name: "reliability"
.}: bool

## REST HTTP config
rest* {.
desc: "Enable Waku REST HTTP server: true|false", defaultValue: true, name: "rest"
Expand Down
25 changes: 24 additions & 1 deletion waku/factory/waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import
../waku_node,
../node/peer_manager,
../node/health_monitor,
../node/delivery_monitor/delivery_monitor,
../waku_api/message_cache,
../waku_api/rest/server,
../waku_archive,
Expand Down Expand Up @@ -51,6 +52,8 @@ type Waku* = object

node*: WakuNode

deliveryMonitor: DeliveryMonitor

restServer*: WakuRestServerRef
metricsServer*: MetricsHttpServerRef

Expand Down Expand Up @@ -147,13 +150,29 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] =
error "Failed setting up node", error = nodeRes.error
return err("Failed setting up node: " & nodeRes.error)

let node = nodeRes.get()

var deliveryMonitor: DeliveryMonitor
if conf.reliabilityEnabled:
if conf.storenode == "":
return err("A storenode should be set when reliability mode is on")

let deliveryMonitorRes = DeliveryMonitor.new(
node.wakuStoreClient, node.wakuRelay, node.wakuLightpushClient,
node.wakuFilterClient,
)
if deliveryMonitorRes.isErr():
return err("could not create delivery monitor: " & $deliveryMonitorRes.error)
deliveryMonitor = deliveryMonitorRes.get()

var waku = Waku(
version: git_version,
conf: confCopy,
rng: rng,
key: confCopy.nodekey.get(),
node: nodeRes.get(),
node: node,
dynamicBootstrapNodes: dynamicBootstrapNodesRes.get(),
deliveryMonitor: deliveryMonitor,
)

ok(waku)
Expand Down Expand Up @@ -237,6 +256,10 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
(await waku.wakuDiscV5.start()).isOkOr:
return err("failed to start waku discovery v5: " & $error)

## Reliability
if not waku[].deliveryMonitor.isNil():
waku[].deliveryMonitor.startDeliveryMonitor()

return ok()

# Waku shutdown
Expand Down
17 changes: 17 additions & 0 deletions waku/node/delivery_monitor/delivery_callback.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import ../../waku_core

type DeliveryDirection* {.pure.} = enum
PUBLISHING
RECEIVING

type DeliverySuccess* {.pure.} = enum
SUCCESSFUL
UNSUCCESSFUL

type DeliveryFeedbackCallback* = proc(
success: DeliverySuccess,
dir: DeliveryDirection,
comment: string,
msgHash: WakuMessageHash,
msg: WakuMessage,
) {.gcsafe, raises: [].}
43 changes: 43 additions & 0 deletions waku/node/delivery_monitor/delivery_monitor.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
## This module helps to ensure the correct transmission and reception of messages

import results
import chronos
import
./recv_monitor,
./send_monitor,
./delivery_callback,
../../waku_core,
../../waku_store/client,
../../waku_relay/protocol,
../../waku_lightpush/client,
../../waku_filter_v2/client

type DeliveryMonitor* = ref object
sendMonitor: SendMonitor
recvMonitor: RecvMonitor

proc new*(
T: type DeliveryMonitor,
storeClient: WakuStoreClient,
wakuRelay: protocol.WakuRelay,
wakuLightpushClient: WakuLightPushClient,
wakuFilterClient: WakuFilterClient,
): Result[T, string] =
## storeClient is needed to give store visitility to DeliveryMonitor
## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendMonitor to re-publish
let sendMonitor = ?SendMonitor.new(storeClient, wakuRelay, wakuLightpushClient)
let recvMonitor = RecvMonitor.new(storeClient, wakuFilterClient)
return ok(DeliveryMonitor(sendMonitor: sendMonitor, recvMonitor: recvMonitor))

proc startDeliveryMonitor*(self: DeliveryMonitor) =
self.sendMonitor.startSendMonitor()
self.recvMonitor.startRecvMonitor()

proc stopDeliveryMonitor*(self: DeliveryMonitor) {.async.} =
self.sendMonitor.stopSendMonitor()
await self.recvMonitor.stopRecvMonitor()

proc setDeliveryCallback*(self: DeliveryMonitor, deliveryCb: DeliveryFeedbackCallback) =
## The deliveryCb is a proc defined by the api client so that it can get delivery feedback
self.sendMonitor.setDeliveryCallback(deliveryCb)
self.recvMonitor.setDeliveryCallback(deliveryCb)
26 changes: 26 additions & 0 deletions waku/node/delivery_monitor/not_delivered_storage/migrations.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{.push raises: [].}

import std/[tables, strutils, os], results, chronicles
import ../../../common/databases/db_sqlite, ../../../common/databases/common

logScope:
topics = "waku node delivery_monitor"

const TargetSchemaVersion* = 1
# increase this when there is an update in the database schema

template projectRoot(): string =
currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".."

const PeerStoreMigrationPath: string = projectRoot / "migrations" / "sent_msgs"

proc migrate*(db: SqliteDatabase): DatabaseResult[void] =
debug "starting peer store's sqlite database migration for sent messages"

let migrationRes =
migrate(db, TargetSchemaVersion, migrationsScriptsDir = PeerStoreMigrationPath)
if migrationRes.isErr():
return err("failed to execute migration scripts: " & migrationRes.error)

debug "finished peer store's sqlite database migration for sent messages"
ok()
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
## This module is aimed to keep track of the sent/published messages that are considered
## not being properly delivered.
##
## The archiving of such messages will happen in a local sqlite database.
##
## In the very first approach, we consider that a message is sent properly is it has been
## received by any store node.
##

import results
import
../../../common/databases/db_sqlite,
../../../waku_core/message/message,
../../../node/delivery_monitor/not_delivered_storage/migrations

const NotDeliveredMessagesDbUrl = "not-delivered-messages.db"

type NotDeliveredStorage* = ref object
database: SqliteDatabase

type TrackedWakuMessage = object
msg: WakuMessage
numTrials: uint
## for statistics purposes. Counts the number of times the node has tried to publish it

proc new*(T: type NotDeliveredStorage): Result[T, string] =
let db = ?SqliteDatabase.new(NotDeliveredMessagesDbUrl)

?migrate(db)

return ok(NotDeliveredStorage(database: db))

proc archiveMessage*(
self: NotDeliveredStorage, msg: WakuMessage
): Result[void, string] =
## Archives a waku message so that we can keep track of it
## even when the app restarts
return ok()
9 changes: 9 additions & 0 deletions waku/node/delivery_monitor/publish_observer.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import chronicles
import ../../waku_core/message/message

type PublishObserver* = ref object of RootObj

method onMessagePublished*(
self: PublishObserver, pubsubTopic: string, message: WakuMessage
) {.base, gcsafe, raises: [].} =
error "onMessagePublished not implemented"
Loading

0 comments on commit 0f68274

Please sign in to comment.