Skip to content

Commit

Permalink
Merge branch 'master' into feat/peer-manager
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Jul 7, 2023
2 parents 7b75d04 + 97f0236 commit 8011ee4
Show file tree
Hide file tree
Showing 23 changed files with 407 additions and 403 deletions.
2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
];
doCheck = false;
# FIXME: This needs to be manually changed when updating modules.
vendorSha256 = "sha256-6VBZ0ilGcXhTXCoUmGdrRQqgnRwVJOtAe4JIMUCVw8Y=";
vendorSha256 = "sha256-TU/jog0MZNC4g13gaGm88gsKTRvmlcKkMeXZbaVf3fc=";
# Fix for 'nix run' trying to execute 'go-waku'.
meta = { mainProgram = "waku"; };
};
Expand Down
1 change: 0 additions & 1 deletion mobile/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func NewNode(configJSON string) string {
node.WithPrivateKey(prvKey),
node.WithHostAddress(hostAddr),
node.WithKeepAlive(time.Duration(*config.KeepAliveInterval) * time.Second),
node.NoDefaultWakuTopic(),
}

if *config.EnableRelay {
Expand Down
8 changes: 5 additions & 3 deletions mobile/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (

// SignalHandler defines a minimal interface
// a signal handler needs to implement.
//nolint
// nolint
type SignalHandler interface {
HandleSignal(string)
}

// SignalHandler is a simple callback function that gets called when any signal is received
// MobileSignalHandler is a simple callback function that gets called when any signal is received
type MobileSignalHandler func([]byte)

// storing the current mobile signal handler here
Expand Down Expand Up @@ -64,7 +64,7 @@ func send(signalType string, event interface{}) {

// SetMobileSignalHandler setup geth callback to notify about new signal
// used for gomobile builds
//nolint
// nolint
func SetMobileSignalHandler(handler SignalHandler) {
mobileSignalHandler = func(data []byte) {
if len(data) > 0 {
Expand All @@ -73,6 +73,8 @@ func SetMobileSignalHandler(handler SignalHandler) {
}
}

// SetEventCallback is to set a callback in order to receive application
// signals which are used to react to asynchronous events in waku.
func SetEventCallback(cb unsafe.Pointer) {
C.SetEventCallback(cb)
}
79 changes: 52 additions & 27 deletions waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/pbnjay/memory"

"github.com/waku-org/go-waku/waku/persistence/sqlite"
wmetrics "github.com/waku-org/go-waku/waku/v2/metrics"
peerstore1 "github.com/waku-org/go-waku/waku/v2/peerstore"
wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/rendezvous"

"github.com/ethereum/go-ethereum/accounts/keystore"
Expand All @@ -43,7 +44,6 @@ import (
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/metrics"
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/persistence/sqlite"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
Expand Down Expand Up @@ -227,7 +227,6 @@ func Execute(options Options) {
}

if options.Store.Enable {
nodeOpts = append(nodeOpts, node.WithWakuStore(options.Store.ResumeNodes...))
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))
}

Expand Down Expand Up @@ -314,27 +313,6 @@ func Execute(options Options) {
addStaticPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID)
addStaticPeers(wakuNode, options.Filter.Nodes, filter.FilterSubscribeID_v20beta1)

if options.DiscV5.Enable {
if err = wakuNode.DiscV5().Start(ctx); err != nil {
logger.Fatal("starting discovery v5", zap.Error(err))
}
}

// retrieve and connect to peer exchange peers
if options.PeerExchange.Enable && options.PeerExchange.Node != nil {
logger.Info("retrieving peer info via peer exchange protocol")

peerId, err := wakuNode.AddPeer(*options.PeerExchange.Node, peerstore1.Static, peer_exchange.PeerExchangeID_v20alpha1)
if err != nil {
logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err))
} else {
desiredOutDegree := wakuNode.Relay().Params().D
if err = wakuNode.PeerExchange().Request(ctx, desiredOutDegree, peer_exchange.WithPeer(peerId)); err != nil {
logger.Error("requesting peers via peer exchange", zap.Error(err))
}
}
}

if len(options.Relay.Topics.Value()) == 0 {
options.Relay.Topics = *cli.NewStringSlice(relay.DefaultWakuTopic)
}
Expand Down Expand Up @@ -398,6 +376,27 @@ func Execute(options Options) {
}(ctx, n)
}

if options.DiscV5.Enable {
if err = wakuNode.DiscV5().Start(ctx); err != nil {
logger.Fatal("starting discovery v5", zap.Error(err))
}
}

// retrieve and connect to peer exchange peers
if options.PeerExchange.Enable && options.PeerExchange.Node != nil {
logger.Info("retrieving peer info via peer exchange protocol")

peerId, err := wakuNode.AddPeer(*options.PeerExchange.Node, wakupeerstore.Static, peer_exchange.PeerExchangeID_v20alpha1)
if err != nil {
logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err))
} else {
desiredOutDegree := wakuNode.Relay().Params().D
if err = wakuNode.PeerExchange().Request(ctx, desiredOutDegree, peer_exchange.WithPeer(peerId)); err != nil {
logger.Error("requesting peers via peer exchange", zap.Error(err))
}
}
}

if len(discoveredNodes) != 0 {
for _, n := range discoveredNodes {
go func(ctx context.Context, info peer.AddrInfo) {
Expand All @@ -412,14 +411,40 @@ func Execute(options Options) {
}
}

var wg sync.WaitGroup

if options.Store.Enable && len(options.Store.ResumeNodes) != 0 {
// TODO: extract this to a function and run it when you go offline
// TODO: determine if a store is listening to a topic

var peerIDs []peer.ID
for _, n := range options.Store.ResumeNodes {
pID, err := wakuNode.AddPeer(n, wakupeerstore.Static, store.StoreID_v20beta4)
if err != nil {
logger.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err))
}
peerIDs = append(peerIDs, pID)
}

for _, t := range options.Relay.Topics.Value() {
wg.Add(1)
go func(topic string) {
defer wg.Done()
ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second)
defer ctxCancel()
if _, err := wakuNode.Store().Resume(ctxWithTimeout, topic, peerIDs); err != nil {
logger.Error("Could not resume history", zap.Error(err))
}
}(t)
}
}

var rpcServer *rpc.WakuRpc
if options.RPCServer.Enable {
rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, options.PProf, options.RPCServer.RelayCacheCapacity, logger)
rpcServer.Start()
}

var wg sync.WaitGroup

var restServer *rest.WakuRest
if options.RESTServer.Enable {
wg.Add(1)
Expand Down Expand Up @@ -462,7 +487,7 @@ func Execute(options Options) {

func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, protocols ...protocol.ID) {
for _, addr := range addresses {
_, err := wakuNode.AddPeer(addr, peerstore1.Static, protocols...)
_, err := wakuNode.AddPeer(addr, wakupeerstore.Static, protocols...)
failOnErr(err, "error adding peer")
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package migrations
package migrate

import (
"database/sql"
Expand All @@ -10,10 +10,10 @@ import (
)

// Migrate applies migrations.
func Migrate(db *sql.DB, driver database.Driver) error {
func Migrate(db *sql.DB, driver database.Driver, assetNames []string, assetFunc bindata.AssetFunc) error {
return migrateDB(db, bindata.Resource(
AssetNames(),
Asset,
assetNames,
assetFunc,
), driver)
}

Expand Down
98 changes: 16 additions & 82 deletions waku/persistence/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,88 +6,12 @@ import (

"github.com/golang-migrate/migrate/v4/database"
"github.com/golang-migrate/migrate/v4/database/pgx"
_ "github.com/jackc/pgx/v5/stdlib"
_ "github.com/jackc/pgx/v5/stdlib" // Blank import to register the postgres driver
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/persistence/migrate"
"github.com/waku-org/go-waku/waku/persistence/postgres/migrations"
)

// Queries are the postgresql queries for a given table.
type Queries struct {
deleteQuery string
existsQuery string
getQuery string
putQuery string
queryQuery string
prefixQuery string
limitQuery string
offsetQuery string
getSizeQuery string
}

// NewQueries creates a new Postgresql set of queries for the passed table
func NewQueries(tbl string, db *sql.DB) (*Queries, error) {
err := CreateTable(db, tbl)
if err != nil {
return nil, err
}
return &Queries{
deleteQuery: fmt.Sprintf("DELETE FROM %s WHERE key = $1", tbl),
existsQuery: fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key=$1)", tbl),
getQuery: fmt.Sprintf("SELECT data FROM %s WHERE key = $1", tbl),
putQuery: fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2", tbl),
queryQuery: fmt.Sprintf("SELECT key, data FROM %s", tbl),
prefixQuery: ` WHERE key LIKE '%s%%' ORDER BY key`,
limitQuery: ` LIMIT %d`,
offsetQuery: ` OFFSET %d`,
getSizeQuery: fmt.Sprintf("SELECT length(data) FROM %s WHERE key = $1", tbl),
}, nil
}

// Delete returns the query for deleting a row.
func (q Queries) Delete() string {
return q.deleteQuery
}

// Exists returns the query for determining if a row exists.
func (q Queries) Exists() string {
return q.existsQuery
}

// Get returns the query for getting a row.
func (q Queries) Get() string {
return q.getQuery
}

// Put returns the query for putting a row.
func (q Queries) Put() string {
return q.putQuery
}

// Query returns the query for getting multiple rows.
func (q Queries) Query() string {
return q.queryQuery
}

// Prefix returns the query fragment for getting a rows with a key prefix.
func (q Queries) Prefix() string {
return q.prefixQuery
}

// Limit returns the query fragment for limiting results.
func (q Queries) Limit() string {
return q.limitQuery
}

// Offset returns the query fragment for returning rows from a given offset.
func (q Queries) Offset() string {
return q.offsetQuery
}

// GetSize returns the query for determining the size of a value.
func (q Queries) GetSize() string {
return q.getSizeQuery
}

// WithDB is a DBOption that lets you use a postgresql DBStore and run migrations
func WithDB(dburl string, migrate bool) persistence.DBOption {
return func(d *persistence.DBStore) error {
Expand Down Expand Up @@ -127,6 +51,15 @@ func migrationDriver(db *sql.DB) (database.Driver, error) {
})
}

// Migrate is the function used for DB migration with postgres driver
func Migrate(db *sql.DB) error {
migrationDriver, err := migrationDriver(db)
if err != nil {
return err
}
return migrate.Migrate(db, migrationDriver, migrations.AssetNames(), migrations.Asset)
}

// CreateTable creates the table that will persist the peers
func CreateTable(db *sql.DB, tableName string) error {
sqlStmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BYTEA);", tableName)
Expand All @@ -137,10 +70,11 @@ func CreateTable(db *sql.DB, tableName string) error {
return nil
}

func Migrate(db *sql.DB) error {
migrationDriver, err := migrationDriver(db)
// NewQueries creates a new SQL set of queries for the passed table
func NewQueries(tbl string, db *sql.DB) (*persistence.Queries, error) {
err := CreateTable(db, tbl)
if err != nil {
return err
return nil, err
}
return migrations.Migrate(db, migrationDriver)
return persistence.CreateQueries(tbl, db), nil
}
Loading

0 comments on commit 8011ee4

Please sign in to comment.