From 279752344fb86aec420b3510ffbf29b98fae179b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Mon, 30 Oct 2023 09:22:50 -0400 Subject: [PATCH 1/7] chore: print a message periodically indicating that VACUUM is still being executed (#838) --- waku/persistence/postgres/postgres.go | 33 ++++++++++++++++++++++++--- waku/persistence/sqlite/sqlite.go | 33 ++++++++++++++++++++++++--- 2 files changed, 60 insertions(+), 6 deletions(-) diff --git a/waku/persistence/postgres/postgres.go b/waku/persistence/postgres/postgres.go index cfc372fe3..565ee8f4c 100644 --- a/waku/persistence/postgres/postgres.go +++ b/waku/persistence/postgres/postgres.go @@ -1,8 +1,10 @@ package postgres import ( + "context" "database/sql" "fmt" + "time" "github.com/golang-migrate/migrate/v4/database" "github.com/golang-migrate/migrate/v4/database/pgx" @@ -15,10 +17,35 @@ import ( func executeVacuum(db *sql.DB, logger *zap.Logger) error { logger.Info("starting PostgreSQL database vacuuming") - _, err := db.Exec("VACUUM FULL") - if err != nil { - return err + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errCh := make(chan error) + + go func() { + defer cancel() + _, err := db.Exec("VACUUM FULL") + if err != nil { + errCh <- err + } + }() + + t := time.NewTicker(2 * time.Minute) + defer t.Stop() + +loop: + for { + select { + case <-ctx.Done(): + break loop + case err := <-errCh: + return err + case <-t.C: + logger.Info("still vacuuming...") + } } + logger.Info("finished PostgreSQL database vacuuming") return nil } diff --git a/waku/persistence/sqlite/sqlite.go b/waku/persistence/sqlite/sqlite.go index 75181608c..9a7181996 100644 --- a/waku/persistence/sqlite/sqlite.go +++ b/waku/persistence/sqlite/sqlite.go @@ -1,9 +1,11 @@ package sqlite import ( + "context" "database/sql" "fmt" "strings" + "time" "github.com/golang-migrate/migrate/v4/database" "github.com/golang-migrate/migrate/v4/database/sqlite3" @@ -32,10 +34,35 @@ func addSqliteURLDefaults(dburl string) string { func executeVacuum(db *sql.DB, logger *zap.Logger) error { logger.Info("starting sqlite database vacuuming") - _, err := db.Exec("VACUUM") - if err != nil { - return err + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errCh := make(chan error) + + go func() { + defer cancel() + _, err := db.Exec("VACUUM") + if err != nil { + errCh <- err + } + }() + + t := time.NewTicker(2 * time.Minute) + defer t.Stop() + +loop: + for { + select { + case <-ctx.Done(): + break loop + case err := <-errCh: + return err + case <-t.C: + logger.Info("still vacuuming...") + } } + logger.Info("finished sqlite database vacuuming") return nil } From ddf188bbf872b66124e8cba9701ffd5a128391d4 Mon Sep 17 00:00:00 2001 From: harsh jain Date: Mon, 30 Oct 2023 21:56:26 +0700 Subject: [PATCH 2/7] feat: remove named topic (#844) * feat: remove named topic * fix: update examples * Update library/mobile/api.go --- .../src/main/java/com/example/waku/Utils.kt | 9 -- examples/filter2/main.go | 2 +- examples/rln/README.md | 2 +- examples/rln/main.go | 2 +- .../waku-csharp/waku-csharp/Waku.Utils.cs | 14 -- library/c/api.go | 7 - library/mobile/api.go | 7 +- library/node.go | 7 +- library/relay.go | 2 +- waku/v2/protocol/content_topic.go | 3 - waku/v2/protocol/enr/shards.go | 14 +- waku/v2/protocol/pubsub_topic.go | 133 +++++------------- waku/v2/protocol/relay/waku_relay.go | 2 +- waku/v2/protocol/shard.go | 18 ++- .../store/waku_store_pagination_test.go | 5 +- waku/v2/protocol/topic_test.go | 78 +++++----- waku/v2/rendezvous/rendezvous.go | 4 +- 17 files changed, 105 insertions(+), 204 deletions(-) diff --git a/examples/android-kotlin/app/src/main/java/com/example/waku/Utils.kt b/examples/android-kotlin/app/src/main/java/com/example/waku/Utils.kt index 574225968..acb5596ce 100644 --- a/examples/android-kotlin/app/src/main/java/com/example/waku/Utils.kt +++ b/examples/android-kotlin/app/src/main/java/com/example/waku/Utils.kt @@ -22,12 +22,3 @@ fun ContentTopic(applicationName: String, applicationVersion: Long, contentTopic return Gowaku.contentTopic(applicationName, applicationVersion, contentTopicName, encoding) } -/** - * Create a pubsub topic string - * @param name - * @param encoding - * @return Pubsub topic string according to RFC 23 - */ -fun PubsubTopic(name: String, encoding: String): String { - return Gowaku.pubsubTopic(name, encoding) -} diff --git a/examples/filter2/main.go b/examples/filter2/main.go index b2b985e90..a4f24c1a5 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -24,7 +24,7 @@ import ( var log = logging.Logger("filter2") -var pubSubTopic = protocol.DefaultPubsubTopic() +var pubSubTopic = protocol.DefaultPubsubTopic{} const contentTopic = "/filter2test/1/testTopic/proto" diff --git a/examples/rln/README.md b/examples/rln/README.md index da81f7c12..f70d3c201 100644 --- a/examples/rln/README.md +++ b/examples/rln/README.md @@ -15,7 +15,7 @@ const credentialsPath = "" const credentialsPassword = "" var contentTopic = protocol.NewContentTopic("rln", 1, "test", "proto").String() -var pubsubTopic = protocol.DefaultPubsubTopic() +var pubsubTopic = protocol.DefaultPubsubTopic{} ``` The private key used here should contain enough Sepolia ETH to register on the contract (0.001 ETH). An ethereum client address is required as well. After updating these values, execute `make` diff --git a/examples/rln/main.go b/examples/rln/main.go index 6b0c9907f..6a2aec2a7 100644 --- a/examples/rln/main.go +++ b/examples/rln/main.go @@ -31,7 +31,7 @@ var keystorePath = "./rlnKeystore.json" var keystorePassword = "password" var membershipIndex = uint(0) var contentTopic, _ = protocol.NewContentTopic("rln", 1, "test", "proto") -var pubsubTopic = protocol.DefaultPubsubTopic() +var pubsubTopic = protocol.DefaultPubsubTopic{} // ============================================================================ diff --git a/examples/waku-csharp/waku-csharp/Waku.Utils.cs b/examples/waku-csharp/waku-csharp/Waku.Utils.cs index 80424e1fa..ea2d9e4ee 100644 --- a/examples/waku-csharp/waku-csharp/Waku.Utils.cs +++ b/examples/waku-csharp/waku-csharp/Waku.Utils.cs @@ -34,19 +34,5 @@ public static string ContentTopic(string applicationName, uint applicationVersio return Response.PtrToStringUtf8(ptr); } - [DllImport(Constants.dllName)] - internal static extern IntPtr waku_pubsub_topic(string name, string encoding); - - /// - /// Create a pubsub topic string - /// - /// - /// - /// Pubsub topic string according to RFC 23 - public static string PubsubTopic(string name, string encoding) - { - IntPtr ptr = waku_pubsub_topic(name, encoding); - return Response.PtrToStringUtf8(ptr); - } } } diff --git a/library/c/api.go b/library/c/api.go index e932dc043..0ae2c0941 100644 --- a/library/c/api.go +++ b/library/c/api.go @@ -199,13 +199,6 @@ func waku_content_topic(applicationName *C.char, applicationVersion C.uint, cont return onSuccesfulResponse(contentTopic.String(), cb, userData) } -// Create a pubsub topic string according to RFC 23 -// -//export waku_pubsub_topic -func waku_pubsub_topic(name *C.char, encoding *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - topic := library.PubsubTopic(C.GoString(name), C.GoString(encoding)) - return onSuccesfulResponse(topic, cb, userData) -} // Get the default pubsub topic used in waku2: /waku/2/default-waku/proto // diff --git a/library/mobile/api.go b/library/mobile/api.go index 5a43b4366..d7489e7ff 100644 --- a/library/mobile/api.go +++ b/library/mobile/api.go @@ -78,14 +78,9 @@ func ContentTopic(applicationName string, applicationVersion int, contentTopicNa return contentTopic.String() } -// PubsubTopic creates a pubsub topic string according to RFC 23 -func PubsubTopic(name string, encoding string) string { - return protocol.NewNamedShardingPubsubTopic(name + "/" + encoding).String() -} - // DefaultPubsubTopic returns the default pubsub topic used in waku2: /waku/2/default-waku/proto func DefaultPubsubTopic() string { - return protocol.DefaultPubsubTopic().String() + return library.DefaultPubsubTopic() } // Peers retrieves the list of peers known by the waku node diff --git a/library/node.go b/library/node.go index 2b3b93935..cebc37975 100644 --- a/library/node.go +++ b/library/node.go @@ -340,14 +340,9 @@ func ContentTopic(applicationName string, applicationVersion int, contentTopicNa return contentTopic.String() } -// PubsubTopic creates a pubsub topic string according to RFC 23 -func PubsubTopic(name string, encoding string) string { - return protocol.NewNamedShardingPubsubTopic(name + "/" + encoding).String() -} - // DefaultPubsubTopic returns the default pubsub topic used in waku2: /waku/2/default-waku/proto func DefaultPubsubTopic() string { - return protocol.DefaultPubsubTopic().String() + return protocol.DefaultPubsubTopic{}.String() } type subscriptionMsg struct { diff --git a/library/relay.go b/library/relay.go index fa2d4c1e3..63a5d2327 100644 --- a/library/relay.go +++ b/library/relay.go @@ -16,7 +16,7 @@ func RelayEnoughPeers(topic string) (bool, error) { return false, errWakuNodeNotReady } - topicToCheck := protocol.DefaultPubsubTopic().String() + topicToCheck := protocol.DefaultPubsubTopic{}.String() if topic != "" { topicToCheck = topic } diff --git a/waku/v2/protocol/content_topic.go b/waku/v2/protocol/content_topic.go index 9c737070a..1f931347e 100644 --- a/waku/v2/protocol/content_topic.go +++ b/waku/v2/protocol/content_topic.go @@ -7,9 +7,6 @@ import ( "strings" ) -// DefaultContentTopic is the default content topic used in Waku network if no content topic is specified. -const DefaultContentTopic = "/waku/2/default-content/proto" - var ErrInvalidFormat = errors.New("invalid content topic format") var ErrMissingGeneration = errors.New("missing part: generation") var ErrInvalidGeneration = errors.New("generation should be a number") diff --git a/waku/v2/protocol/enr/shards.go b/waku/v2/protocol/enr/shards.go index 3883ac90e..0e57eda25 100644 --- a/waku/v2/protocol/enr/shards.go +++ b/waku/v2/protocol/enr/shards.go @@ -122,22 +122,22 @@ func ContainsShard(record *enr.Record, cluster uint16, index uint16) bool { return rs.Contains(cluster, index) } -func ContainsShardWithNsTopic(record *enr.Record, topic protocol.NamespacedPubsubTopic) bool { - if topic.Kind() != protocol.StaticSharding { +func ContainsShardWithWakuTopic(record *enr.Record, topic protocol.WakuPubSubTopic) bool { + if shardTopic, err := protocol.ToShardPubsubTopic(topic); err != nil { return false + } else { + return ContainsShard(record, shardTopic.Cluster(), shardTopic.Shard()) } - shardTopic := topic.(protocol.StaticShardingPubsubTopic) - return ContainsShard(record, shardTopic.Cluster(), shardTopic.Shard()) } func ContainsRelayShard(record *enr.Record, topic protocol.StaticShardingPubsubTopic) bool { - return ContainsShardWithNsTopic(record, topic) + return ContainsShardWithWakuTopic(record, topic) } func ContainsShardTopic(record *enr.Record, topic string) bool { - shardTopic, err := protocol.ToShardedPubsubTopic(topic) + shardTopic, err := protocol.ToWakuPubsubTopic(topic) if err != nil { return false } - return ContainsShardWithNsTopic(record, shardTopic) + return ContainsShardWithWakuTopic(record, shardTopic) } diff --git a/waku/v2/protocol/pubsub_topic.go b/waku/v2/protocol/pubsub_topic.go index d38a713d0..b3fcd395f 100644 --- a/waku/v2/protocol/pubsub_topic.go +++ b/waku/v2/protocol/pubsub_topic.go @@ -7,96 +7,36 @@ import ( "strings" ) -// Waku2PubsubTopicPrefix is the expected prefix to be used for pubsub topics -const Waku2PubsubTopicPrefix = "/waku/2" - -// StaticShardingPubsubTopicPrefix is the expected prefix to be used for static sharding pubsub topics -const StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix + "/rs" - -// ErrInvalidStructure indicates that the pubsub topic is malformed -var ErrInvalidStructure = errors.New("invalid topic structure") - -// ErrInvalidTopicPrefix indicates that the pubsub topic is missing the prefix /waku/2 -var ErrInvalidTopicPrefix = errors.New("must start with " + Waku2PubsubTopicPrefix) -var ErrMissingTopicName = errors.New("missing topic-name") -var ErrInvalidShardedTopicPrefix = errors.New("must start with " + StaticShardingPubsubTopicPrefix) -var ErrMissingClusterIndex = errors.New("missing shard_cluster_index") -var ErrMissingShardNumber = errors.New("missing shard_number") - -// ErrInvalidNumberFormat indicates that a number exceeds the allowed range -var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed") - -// NamespacedPubsubTopicKind used to represent kind of NamespacedPubsubTopicKind -type NamespacedPubsubTopicKind int - -const ( - StaticSharding NamespacedPubsubTopicKind = iota - NamedSharding -) - -// NamespacedPubsubTopic is an interface for namespace based pubSub topic -type NamespacedPubsubTopic interface { +type WakuPubSubTopic interface { String() string - Kind() NamespacedPubsubTopicKind - Equal(NamespacedPubsubTopic) bool } -// NamedShardingPubsubTopic is object for a NamedSharding type pubSub topic -type NamedShardingPubsubTopic struct { - NamespacedPubsubTopic - kind NamespacedPubsubTopicKind - name string -} +const defaultPubsubTopic = "/waku/2/default-waku/proto" -// NewNamedShardingPubsubTopic creates a new NamedShardingPubSubTopic -func NewNamedShardingPubsubTopic(name string) NamespacedPubsubTopic { - return NamedShardingPubsubTopic{ - kind: NamedSharding, - name: name, - } -} +type DefaultPubsubTopic struct{} -// Kind returns the type of PubsubTopic whether it is StaticShared or NamedSharded -func (n NamedShardingPubsubTopic) Kind() NamespacedPubsubTopicKind { - return n.kind +func (DefaultPubsubTopic) String() string { + return defaultPubsubTopic } -// Name is the name of the NamedSharded pubsub topic. -func (n NamedShardingPubsubTopic) Name() string { - return n.name -} - -// Equal compares NamedShardingPubsubTopic -func (n NamedShardingPubsubTopic) Equal(t2 NamespacedPubsubTopic) bool { - return n.String() == t2.String() -} - -// String formats NamedShardingPubsubTopic to RFC 23 specific string format for pubsub topic. -func (n NamedShardingPubsubTopic) String() string { - return fmt.Sprintf("%s/%s", Waku2PubsubTopicPrefix, n.name) -} - -// Parse parses a topic string into a NamedShardingPubsubTopic -func (n *NamedShardingPubsubTopic) Parse(topic string) error { - if !strings.HasPrefix(topic, Waku2PubsubTopicPrefix) { - return ErrInvalidTopicPrefix - } +// StaticShardingPubsubTopicPrefix is the expected prefix to be used for static sharding pubsub topics +const StaticShardingPubsubTopicPrefix = "/waku/2/rs" - topicName := topic[8:] - if len(topicName) == 0 { - return ErrMissingTopicName - } +// waku pubsub topic errors +var ErrNotWakuPubsubTopic = errors.New("not a waku pubsub topic") - n.kind = NamedSharding - n.name = topicName +// shard pubsub topic errors +var ErrNotShardPubsubTopic = errors.New("not a shard pubsub topic") +var ErrInvalidStructure = errors.New("invalid topic structure") +var ErrInvalidShardedTopicPrefix = errors.New("must start with " + StaticShardingPubsubTopicPrefix) +var ErrMissingClusterIndex = errors.New("missing shard_cluster_index") +var ErrMissingShardNumber = errors.New("missing shard_number") - return nil -} +// ErrInvalidNumberFormat indicates that a number exceeds the allowed range +var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed") // StaticShardingPubsubTopic describes a pubSub topic as per StaticSharding type StaticShardingPubsubTopic struct { - NamespacedPubsubTopic - kind NamespacedPubsubTopicKind cluster uint16 shard uint16 } @@ -104,7 +44,6 @@ type StaticShardingPubsubTopic struct { // NewStaticShardingPubsubTopic creates a new pubSub topic func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) StaticShardingPubsubTopic { return StaticShardingPubsubTopic{ - kind: StaticSharding, cluster: cluster, shard: shard, } @@ -120,13 +59,8 @@ func (s StaticShardingPubsubTopic) Shard() uint16 { return s.shard } -// Kind returns the type of PubsubTopic whether it is StaticShared or NamedSharded -func (s StaticShardingPubsubTopic) Kind() NamespacedPubsubTopicKind { - return s.kind -} - // Equal compares StaticShardingPubsubTopic -func (s StaticShardingPubsubTopic) Equal(t2 NamespacedPubsubTopic) bool { +func (s StaticShardingPubsubTopic) Equal(t2 StaticShardingPubsubTopic) bool { return s.String() == t2.String() } @@ -168,31 +102,30 @@ func (s *StaticShardingPubsubTopic) Parse(topic string) error { s.shard = uint16(shardInt) s.cluster = uint16(clusterInt) - s.kind = StaticSharding return nil } -// ToShardedPubsubTopic takes a pubSub topic string and creates a NamespacedPubsubTopic object. -func ToShardedPubsubTopic(topic string) (NamespacedPubsubTopic, error) { +func ToShardPubsubTopic(topic WakuPubSubTopic) (StaticShardingPubsubTopic, error) { + result, ok := topic.(StaticShardingPubsubTopic) + if !ok { + return StaticShardingPubsubTopic{}, ErrNotShardPubsubTopic + } + return result, nil +} + +// ToWakuPubsubTopic takes a pubSub topic string and creates a WakuPubsubTopic object. +func ToWakuPubsubTopic(topic string) (WakuPubSubTopic, error) { + if topic == defaultPubsubTopic { + return DefaultPubsubTopic{}, nil + } if strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) { s := StaticShardingPubsubTopic{} err := s.Parse(topic) if err != nil { - return nil, err + return s, err } return s, nil } - - s := NamedShardingPubsubTopic{} - err := s.Parse(topic) - if err != nil { - return nil, err - } - return s, nil -} - -// DefaultPubsubTopic is the default pubSub topic used in waku -func DefaultPubsubTopic() NamespacedPubsubTopic { - return NewNamedShardingPubsubTopic("default-waku/proto") + return nil, ErrNotWakuPubsubTopic } diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 6f6ea12ef..d051e3087 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -25,7 +25,7 @@ import ( const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") // DefaultWakuTopic is the default pubsub topic used across all Waku protocols -var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String() +var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic{}.String() // WakuRelay is the implementation of the Waku Relay protocol type WakuRelay struct { diff --git a/waku/v2/protocol/shard.go b/waku/v2/protocol/shard.go index aeeb32cb2..a49c6e302 100644 --- a/waku/v2/protocol/shard.go +++ b/waku/v2/protocol/shard.go @@ -49,8 +49,8 @@ func NewRelayShards(cluster uint16, indices ...uint16) (RelayShards, error) { return RelayShards{Cluster: cluster, Indices: indices}, nil } -func (rs RelayShards) Topics() []NamespacedPubsubTopic { - var result []NamespacedPubsubTopic +func (rs RelayShards) Topics() []WakuPubSubTopic { + var result []WakuPubSubTopic for _, i := range rs.Indices { result = append(result, NewStaticShardingPubsubTopic(rs.Cluster, i)) } @@ -72,14 +72,12 @@ func (rs RelayShards) Contains(cluster uint16, index uint16) bool { return found } -func (rs RelayShards) ContainsNamespacedTopic(topic NamespacedPubsubTopic) bool { - if topic.Kind() != StaticSharding { +func (rs RelayShards) ContainsShardPubsubTopic(topic WakuPubSubTopic) bool { + if shardedTopic, err := ToShardPubsubTopic(topic); err != nil { return false + } else { + return rs.Contains(shardedTopic.Cluster(), shardedTopic.Shard()) } - - shardedTopic := topic.(StaticShardingPubsubTopic) - - return rs.Contains(shardedTopic.Cluster(), shardedTopic.Shard()) } func TopicsToRelayShards(topic ...string) ([]RelayShards, error) { @@ -123,11 +121,11 @@ func TopicsToRelayShards(topic ...string) ([]RelayShards, error) { } func (rs RelayShards) ContainsTopic(topic string) bool { - nsTopic, err := ToShardedPubsubTopic(topic) + wTopic, err := ToWakuPubsubTopic(topic) if err != nil { return false } - return rs.ContainsNamespacedTopic(nsTopic) + return rs.ContainsShardPubsubTopic(wTopic) } func (rs RelayShards) IndicesList() ([]byte, error) { diff --git a/waku/v2/protocol/store/waku_store_pagination_test.go b/waku/v2/protocol/store/waku_store_pagination_test.go index 673b75d3e..c76f3d0bc 100644 --- a/waku/v2/protocol/store/waku_store_pagination_test.go +++ b/waku/v2/protocol/store/waku_store_pagination_test.go @@ -13,6 +13,7 @@ import ( ) func TestIndexComputation(t *testing.T) { + testContentTopic := "/waku/2/default-content/proto" msg := &wpb.WakuMessage{ Payload: []byte{1, 2, 3}, Timestamp: utils.GetUnixEpoch(), @@ -27,14 +28,14 @@ func TestIndexComputation(t *testing.T) { msg1 := &wpb.WakuMessage{ Payload: []byte{1, 2, 3}, Timestamp: 123, - ContentTopic: protocol.DefaultContentTopic, + ContentTopic: testContentTopic, } idx1 := protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), "test").Index() msg2 := &wpb.WakuMessage{ Payload: []byte{1, 2, 3}, Timestamp: 123, - ContentTopic: protocol.DefaultContentTopic, + ContentTopic: testContentTopic, } idx2 := protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), "test").Index() diff --git a/waku/v2/protocol/topic_test.go b/waku/v2/protocol/topic_test.go index e96c646b0..e2b02db68 100644 --- a/waku/v2/protocol/topic_test.go +++ b/waku/v2/protocol/topic_test.go @@ -114,41 +114,53 @@ func TestShardChoiceSimulation(t *testing.T) { } } -func TestNsPubsubTopic(t *testing.T) { - ns1 := NewNamedShardingPubsubTopic("waku-dev") - require.Equal(t, "/waku/2/waku-dev", ns1.String()) +func TestShardPubsubTopic(t *testing.T) { + { // not waku topci + topic := "/waku/1/2/3" + _, err := ToWakuPubsubTopic(topic) + require.Error(t, ErrNotWakuPubsubTopic, err) + } - ns2 := NewStaticShardingPubsubTopic(0, 2) - require.Equal(t, "/waku/2/rs/0/2", ns2.String()) + { // check default pubsub topic + topic := defaultPubsubTopic + wakuTopic, err := ToWakuPubsubTopic(topic) + require.NoError(t, err) + require.Equal(t, defaultPubsubTopic, wakuTopic.String()) + } - require.True(t, ns1.Equal(ns1)) - require.False(t, ns1.Equal(ns2)) + { // check behavior of waku topic + topic := "/waku/2/rs/16/42" + wakuTopic, err := ToWakuPubsubTopic(topic) + require.NoError(t, err) + require.Equal(t, topic, wakuTopic.String()) + require.Equal(t, uint16(16), wakuTopic.(StaticShardingPubsubTopic).Cluster()) + require.Equal(t, uint16(42), wakuTopic.(StaticShardingPubsubTopic).Shard()) + } - topic := "/waku/2/waku-dev" - ns, err := ToShardedPubsubTopic(topic) - require.NoError(t, err) - require.Equal(t, NamedSharding, ns.Kind()) - require.Equal(t, "waku-dev", ns.(NamedShardingPubsubTopic).Name()) + { // check if shard pubtopic checks for prefix + topic := "/waku/1/rs/16/42" + err := (&StaticShardingPubsubTopic{}).Parse(topic) + require.Error(t, err) + require.ErrorIs(t, err, ErrInvalidShardedTopicPrefix) + } + + { // check if cluster/index is missing + topic := "/waku/2/rs//02" + _, err := ToWakuPubsubTopic(topic) + require.Error(t, err) + require.ErrorIs(t, err, ErrMissingClusterIndex) + + topic = "/waku/2/rs/1/" + _, err = ToWakuPubsubTopic(topic) + require.Error(t, err) + require.ErrorIs(t, err, ErrMissingShardNumber) + } + + { // check if the cluster/index are number + topic := "/waku/2/rs/xx/77" + _, err := ToWakuPubsubTopic(topic) + require.Error(t, err) + require.ErrorIs(t, err, ErrInvalidNumberFormat) + } - topic = "/waku/2/rs/16/42" - ns, err = ToShardedPubsubTopic(topic) - require.NoError(t, err) - require.Equal(t, StaticSharding, ns.Kind()) - require.Equal(t, uint16(16), ns.(StaticShardingPubsubTopic).Cluster()) - require.Equal(t, uint16(42), ns.(StaticShardingPubsubTopic).Shard()) - - topic = "/waku/1/rs/16/42" - _, err = ToShardedPubsubTopic(topic) - require.Error(t, err) - require.ErrorIs(t, err, ErrInvalidTopicPrefix) - - topic = "/waku/2/rs//02" - _, err = ToShardedPubsubTopic(topic) - require.Error(t, err) - require.ErrorIs(t, err, ErrMissingClusterIndex) - - topic = "/waku/2/rs/xx/77" - _, err = ToShardedPubsubTopic(topic) - require.Error(t, err) - require.ErrorIs(t, err, ErrInvalidNumberFormat) } diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index a5ea444ca..28efa8994 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -80,7 +80,7 @@ const registerMaxRetries = 7 // Discover is used to find a number of peers that use the default pubsub topic func (r *Rendezvous) Discover(ctx context.Context, rp *RendezvousPoint, numPeers int) { - r.DiscoverWithNamespace(ctx, protocol.DefaultPubsubTopic().String(), rp, numPeers) + r.DiscoverWithNamespace(ctx, protocol.DefaultPubsubTopic{}.String(), rp, numPeers) } // DiscoverShard is used to find a number of peers that support an specific cluster and shard index @@ -137,7 +137,7 @@ func (r *Rendezvous) callRegister(ctx context.Context, namespace string, rendezv // Register registers the node in the rendezvous points using the default pubsub topic as namespace func (r *Rendezvous) Register(ctx context.Context, rendezvousPoints []*RendezvousPoint) { - r.RegisterWithNamespace(ctx, protocol.DefaultPubsubTopic().String(), rendezvousPoints) + r.RegisterWithNamespace(ctx, protocol.DefaultPubsubTopic{}.String(), rendezvousPoints) } // RegisterShard registers the node in the rendezvous points using a shard as namespace From 38202e7a2e8db6982f53aaad3805f92d4498e634 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Mon, 30 Oct 2023 12:30:25 -0400 Subject: [PATCH 3/7] refactor: publish API for relay and lightpush (#845) --- Makefile | 5 ++- cmd/waku/server/rest/relay.go | 2 +- cmd/waku/server/rpc/filter_test.go | 4 +- cmd/waku/server/rpc/relay.go | 2 +- docs/api/lightpush.md | 17 +++----- docs/api/relay.md | 33 +++++----------- examples/basic2/main.go | 2 +- examples/chat2/chat.go | 10 ++--- examples/filter2/main.go | 4 +- examples/noise/main.go | 8 ++-- examples/rln/main.go | 3 +- library/lightpush.go | 2 +- library/relay.go | 2 +- tests/connection_test.go | 2 +- waku/v2/node/wakunode2_test.go | 4 +- .../filter/filter_proto_ident_test.go | 22 ++++++----- waku/v2/protocol/filter/filter_test.go | 8 ++-- .../legacy_filter/waku_filter_test.go | 12 +++--- waku/v2/protocol/lightpush/waku_lightpush.go | 15 +++---- .../lightpush/waku_lightpush_option.go | 21 +++++++--- .../protocol/lightpush/waku_lightpush_test.go | 4 +- .../protocol/noise/pairing_relay_messenger.go | 2 +- waku/v2/protocol/relay/options.go | 22 +++++++++++ waku/v2/protocol/relay/waku_relay.go | 39 +++++++++++-------- waku/v2/protocol/relay/waku_relay_test.go | 6 +-- 25 files changed, 137 insertions(+), 114 deletions(-) create mode 100644 waku/v2/protocol/relay/options.go diff --git a/Makefile b/Makefile index 211b4c726..e68348b17 100644 --- a/Makefile +++ b/Makefile @@ -124,10 +124,13 @@ build-example-filter2: build-example-c-bindings: cd examples/c-bindings && $(MAKE) +build-example-noise: + cd examples/noise && $(MAKE) + build-example-rln: cd examples/rln && $(MAKE) -build-example: build-example-basic2 build-example-chat-2 build-example-filter2 build-example-c-bindings build-example-rln +build-example: build-example-basic2 build-example-chat-2 build-example-filter2 build-example-c-bindings build-example-noise build-example-rln static-library: @echo "Building static library..." diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index b788ed99f..1261f6ed1 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -220,7 +220,7 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) { return } - _, err = r.node.Relay().PublishToTopic(req.Context(), message, strings.Replace(topic, "\n", "", -1)) + _, err = r.node.Relay().Publish(req.Context(), message, relay.WithPubSubTopic(strings.Replace(topic, "\n", "", -1))) if err != nil { r.log.Error("publishing message", zap.Error(err)) } diff --git a/cmd/waku/server/rpc/filter_test.go b/cmd/waku/server/rpc/filter_test.go index 8986de538..e6c7d698c 100644 --- a/cmd/waku/server/rpc/filter_test.go +++ b/cmd/waku/server/rpc/filter_test.go @@ -143,10 +143,10 @@ func TestFilterGetV1Messages(t *testing.T) { // Wait for the subscription to be started time.Sleep(1 * time.Second) - _, err = serviceA.node.Relay().PublishToTopic( + _, err = serviceA.node.Relay().Publish( context.Background(), &wpb.WakuMessage{ContentTopic: "ct"}, - testTopic, + relay.WithPubSubTopic(testTopic), ) require.NoError(t, err) require.True(t, reply) diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go index 8c7d10a4f..5f4310468 100644 --- a/cmd/waku/server/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -115,7 +115,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, return err } - _, err = r.node.Relay().PublishToTopic(req.Context(), msg, topic) + _, err = r.node.Relay().Publish(req.Context(), msg, relay.WithPubSubTopic(topic)) if err != nil { r.log.Error("publishing message", zap.Error(err)) return err diff --git a/docs/api/lightpush.md b/docs/api/lightpush.md index 354546380..b22acabd3 100644 --- a/docs/api/lightpush.md +++ b/docs/api/lightpush.md @@ -65,22 +65,17 @@ if err != nil { ``` -To send a message, it needs to be wrapped into a [`WakuMessage`](https://rfc.vac.dev/spec/14/) protobuffer. The payload of the message is not limited to strings. Any kind of data that can be serialized +To send a message, it needs to be wrapped into a [`WakuMessage`](https://rfc.vac.dev/spec/14/) protobuffer. +The payload of the message is not limited to strings. Any kind of data that can be serialized into a `[]byte` can be sent as long as it does not exceed the maximum length a message can have (~1MB) -The following functions can be used to publish a message: -- `wakuNode.Lightpush().Publish(ctx, msg, opts...)` - to send a message to the default waku pubsub topic -- `wakuNode.Lightpush().PublishToTopic(ctx, msg, topic, opts...)` - to send a message to a custom pubsub topic +`wakuNode.Lightpush().Publish(ctx, msg, opts...)` is used to publish a message. This function will return a message id on success, or an error if the message could not be published. -Both of these functions will return a message id on success, or an error if the message could not be published. - -If no options are specified, go-waku will automatically choose the peer used to broadcast the message via Lightpush. This behaviour can be controlled via options: +If no options are specified, go-waku will automatically choose the peer used to broadcast the message via Lightpush and publish the message to a pubsub topic derived from the content topic of the message. This behaviour can be controlled via options: ### Options - +- `lightpush.WithPubSubTopic(topic)` - broadcast the message using a custom pubsub topic +- `lightpush.WithDefaultPubsubTopic()` - broadcast the message to the default pubsub topic - `lightpush.WithPeer(peerID)` - use an specific peer ID (which should be part of the node peerstore) to broadcast the message with - `lightpush.WithAutomaticPeerSelection(host)` - automatically select a peer that supports lightpush protocol from the peerstore to broadcast the message with - `lightpush.WithFastestPeerSelection(ctx)` - automatically select a peer based on its ping reply time - - - diff --git a/docs/api/relay.md b/docs/api/relay.md index 3525ab2c3..84f3530c2 100644 --- a/docs/api/relay.md +++ b/docs/api/relay.md @@ -41,34 +41,19 @@ One of these options must be specified when instantiating a node supporting the ## Receiving messages ```go ... -sub, err := wakuNode.Relay().Subscribe(context.Background()) +contentFilter := protocol.NewContentFilter(relay.DefaultWakuTopic) +sub, err := wakuNode.Relay().Subscribe(context.Background, contentFilter) ([]*Subscription, error) if err != nil { fmt.Println(err) return } -for value := range sub.C { +for value := range sub[0].C { fmt.Println("Received msg:", string(value.Message().Payload)) } ... ``` -To receive messages sent via the relay protocol, you need to subscribe to a pubsub topic. This can be done via any of these functions: -- `wakuNode.Relay().Subscribe(ctx)` - subscribes to the default waku pubsub topic `/waku/2/default-waku/proto` -- `wakuNode.Relay().SubscribeToTopic(ctx, topic)` - subscribes to a custom pubsub topic - -These functions return a `Subscription` struct containing a channel on which messages will be received. To stop receiving messages in this channel `sub.Unsubscribe()` can be executed which will close the channel (without unsubscribing from the pubsub topic) - -> Pubsub topics should follow the [recommended usage](https://rfc.vac.dev/spec/23/) structure. For this purpose, the `NewPubsubTopic` helper function was created: -```go -import "github.com/waku-org/go-waku/waku/v2/protocol" - -topic := protocol.NewPubsubTopic("the_topic_name", "the_encoding") -/* -fmt.Println(topic.String()) // => `/waku/2/the_topic_name/the_encoding` -*/ -``` - - +To receive messages sent via the relay protocol, you need to subscribe specifying a content filter with the function `Subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error)`. This functions return a list of `Subscription` struct containing a channel on which messages will be received. To stop receiving messages `WakuRelay`'s `Unsubscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) error` can be executed which will close the channel (without unsubscribing from the pubsub topic) which will make sure the subscription is stopped, and if no other subscriptions exist for underlying pubsub topic, the pubsub is also unsubscribed. ## Sending messages @@ -95,11 +80,13 @@ if err != nil { To send a message, it needs to be wrapped into a [`WakuMessage`](https://rfc.vac.dev/spec/14/) protobuffer. The payload of the message is not limited to strings. Any kind of data that can be serialized into a `[]byte` can be sent as long as it does not exceed the maximum length a message can have (~1MB) -The following functions can be used to publish a message: -- `wakuNode.Relay().Publish(ctx, msg)` - to send a message to the default waku pubsub topic -- `wakuNode.Relay().PublishToTopic(ctx, msg, topic)` - to send a message to a custom pubsub topic +`wakuNode.Relay().Publish(ctx, msg, opts...)` is used to publish a message. This function will return a message id on success, or an error if the message could not be published. -Both of these functions will return a message id on success, or an error if the message could not be published. +If no options are specified, go-waku will automatically choose the peer used to broadcast the message via Relay and publish the message to a pubsub topic derived from the content topic of the message. This behaviour can be controlled via options: + +### Options +- `relay.WithPubSubTopic(topic)` - broadcast the message using a custom pubsub topic +- `relay.WithDefaultPubsubTopic()` - broadcast the message to the default pubsub topic > If `WithWakuRelayAndMinPeers` was used during the instantiation of the wakuNode, it should be possible to verify if there's enough peers for publishing to a topic with `wakuNode.Relay().EnoughPeersToPublish()` and `wakuNode.Relay().EnoughPeersToPublishToTopic(topic)` diff --git a/examples/basic2/main.go b/examples/basic2/main.go index 1cad0b8f2..ce93be2ce 100644 --- a/examples/basic2/main.go +++ b/examples/basic2/main.go @@ -103,7 +103,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, contentTopic string, ms Timestamp: timestamp, } - _, err = wakuNode.Relay().Publish(ctx, msg) + _, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()) if err != nil { log.Error("Error sending a message", zap.Error(err)) } diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index fcf7e2218..847a30329 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -319,18 +319,18 @@ func (c *Chat) publish(ctx context.Context, message string) error { } if c.options.LightPush.Enable { - var lightOpt lightpush.Option + lightOpt := []lightpush.Option{lightpush.WithDefaultPubsubTopic()} var peerID peer.ID peerID, err = options.LightPush.NodePeerID() if err != nil { - lightOpt = lightpush.WithAutomaticPeerSelection() + lightOpt = append(lightOpt, lightpush.WithAutomaticPeerSelection()) } else { - lightOpt = lightpush.WithPeer(peerID) + lightOpt = append(lightOpt, lightpush.WithPeer(peerID)) } - _, err = c.node.Lightpush().Publish(c.ctx, wakuMsg, lightOpt) + _, err = c.node.Lightpush().Publish(c.ctx, wakuMsg, lightOpt...) } else { - _, err = c.node.Relay().Publish(ctx, wakuMsg) + _, err = c.node.Relay().Publish(ctx, wakuMsg, relay.WithDefaultPubsubTopic()) } return err diff --git a/examples/filter2/main.go b/examples/filter2/main.go index a4f24c1a5..70dd1dc7d 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -19,6 +19,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -98,6 +99,7 @@ func main() { // Send FilterRequest from light node to full node cf := protocol.ContentFilter{ + PubsubTopic: relay.DefaultWakuTopic, ContentTopics: protocol.NewContentTopicSet(contentTopic), } @@ -157,7 +159,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { Timestamp: timestamp, } - _, err := wakuNode.Relay().Publish(ctx, msg) + _, err := wakuNode.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubSubTopic.String())) if err != nil { log.Error("Error sending a message: ", err) } diff --git a/examples/noise/main.go b/examples/noise/main.go index 80f27d4a8..f5d0998cd 100644 --- a/examples/noise/main.go +++ b/examples/noise/main.go @@ -18,7 +18,9 @@ import ( n "github.com/waku-org/go-noise" "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/noise" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -186,7 +188,7 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode, pairingObj *noise.P msg.Timestamp = wakuNode.Timesource().Now().UnixNano() - _, err = wakuNode.Relay().Publish(ctx, msg) + _, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()) if err != nil { log.Error("Error sending a message", zap.Error(err)) } @@ -194,13 +196,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode, pairingObj *noise.P } func readLoop(ctx context.Context, wakuNode *node.WakuNode, pairingObj *noise.Pairing) { - sub, err := wakuNode.Relay().Subscribe(ctx) + sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic)) if err != nil { log.Error("Could not subscribe", zap.Error(err)) return } - for value := range sub.Ch { + for value := range sub[0].Ch { if value.Message().ContentTopic != pairingObj.ContentTopic { continue } diff --git a/examples/rln/main.go b/examples/rln/main.go index 6a2aec2a7..ee8d70993 100644 --- a/examples/rln/main.go +++ b/examples/rln/main.go @@ -17,6 +17,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/payload" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -128,7 +129,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { log.Error("Error appending proof", zap.Error(err)) } - _, err = wakuNode.Relay().PublishToTopic(ctx, msg, pubsubTopic.String()) + _, err = wakuNode.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubsubTopic.String())) if err != nil { log.Error("Error sending a message", zap.Error(err)) } diff --git a/library/lightpush.go b/library/lightpush.go index 16389a1fb..62bc2bee8 100644 --- a/library/lightpush.go +++ b/library/lightpush.go @@ -41,7 +41,7 @@ func lightpushPublish(msg *pb.WakuMessage, pubsubTopic string, peerID string, ms lpOptions = append(lpOptions, lightpush.WithPubSubTopic(pubsubTopic)) } - hash, err := wakuState.node.Lightpush().PublishToTopic(ctx, msg, lpOptions...) + hash, err := wakuState.node.Lightpush().Publish(ctx, msg, lpOptions...) return hexutil.Encode(hash), err } diff --git a/library/relay.go b/library/relay.go index 63a5d2327..0cd2093a0 100644 --- a/library/relay.go +++ b/library/relay.go @@ -39,7 +39,7 @@ func relayPublish(msg *pb.WakuMessage, pubsubTopic string, ms int) (string, erro ctx = context.Background() } - hash, err := wakuState.node.Relay().PublishToTopic(ctx, msg, pubsubTopic) + hash, err := wakuState.node.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubsubTopic)) return hexutil.Encode(hash), err } diff --git a/tests/connection_test.go b/tests/connection_test.go index 1bc984500..910679da0 100644 --- a/tests/connection_test.go +++ b/tests/connection_test.go @@ -72,6 +72,6 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) erro Timestamp: timestamp, } - _, err = wakuNode.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic) + _, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()) return err } diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index 9d4d75e85..8becee599 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -208,7 +208,7 @@ func Test500(t *testing.T) { msg := createTestMsg(0) msg.Payload = int2Bytes(i) msg.Timestamp = int64(i) - if _, err := wakuNode2.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic); err != nil { + if _, err := wakuNode2.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()); err != nil { require.Fail(t, "Could not publish all messages") } time.Sleep(5 * time.Millisecond) @@ -292,7 +292,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) { time.Sleep(500 * time.Millisecond) - if _, err := wakuNode1.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic); err != nil { + if _, err := wakuNode1.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()); err != nil { require.Fail(t, "Could not publish all messages") } diff --git a/waku/v2/protocol/filter/filter_proto_ident_test.go b/waku/v2/protocol/filter/filter_proto_ident_test.go index d41c70f8e..03919d8c6 100644 --- a/waku/v2/protocol/filter/filter_proto_ident_test.go +++ b/waku/v2/protocol/filter/filter_proto_ident_test.go @@ -5,16 +5,18 @@ import ( "encoding/hex" "errors" "fmt" - "github.com/libp2p/go-msgio/pbio" - "github.com/waku-org/go-waku/waku/v2/peermanager" - "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" - "golang.org/x/exp/slices" "math" "net/http" "strings" "sync" "time" + "github.com/libp2p/go-msgio/pbio" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "golang.org/x/exp/slices" + libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/peer" @@ -30,7 +32,7 @@ func (s *FilterTestSuite) TestCreateSubscription() { // Initial subscribe s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) }, s.subDetails[0].C) @@ -42,7 +44,7 @@ func (s *FilterTestSuite) TestModifySubscription() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) }, s.subDetails[0].C) @@ -52,7 +54,7 @@ func (s *FilterTestSuite) TestModifySubscription() { s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID()) s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) }, s.subDetails[0].C) @@ -64,13 +66,13 @@ func (s *FilterTestSuite) TestMultipleMessages() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), s.testTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) }, s.subDetails[0].C) s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) }, s.subDetails[0].C) @@ -286,7 +288,7 @@ func (s *FilterTestSuite) TestIncorrectPushIdentifier() { time.Sleep(1 * time.Second) // Send message - _, err = s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic) + _, err = s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) // Message should never arrive -> exit after timeout diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index 0bdada3ec..b1cbfcb33 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -275,13 +275,13 @@ func (s *FilterTestSuite) publishMsg(topic, contentTopic string, optionalPayload payload = "123" } - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch(), payload), topic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch(), payload), relay.WithPubSubTopic(topic)) s.Require().NoError(err) } func (s *FilterTestSuite) publishMessages(msgs []WakuMsg) { for _, m := range msgs { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(m.contentTopic, utils.GetUnixEpoch(), m.payload), m.pubSubTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(m.contentTopic, utils.GetUnixEpoch(), m.payload), relay.WithPubSubTopic(m.pubSubTopic)) s.Require().NoError(err) } } @@ -495,7 +495,7 @@ func (s *FilterTestSuite) TestAutoShard() { s.log.Info("Testing Autoshard:CreateSubscription") s.subDetails = s.subscribe("", s.testContentTopic, s.fullNodeHost.ID()) s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) }, s.subDetails[0].C) @@ -528,7 +528,7 @@ func (s *FilterTestSuite) TestAutoShard() { s.subDetails = s.subscribe("", newContentTopic, s.fullNodeHost.ID()) s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) }, s.subDetails[0].C) diff --git a/waku/v2/protocol/legacy_filter/waku_filter_test.go b/waku/v2/protocol/legacy_filter/waku_filter_test.go index a4aa32bb4..4f35ace2e 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter_test.go +++ b/waku/v2/protocol/legacy_filter/waku_filter_test.go @@ -110,7 +110,7 @@ func TestWakuFilter(t *testing.T) { require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) }() - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) + _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) require.NoError(t, err) wg.Wait() @@ -127,7 +127,7 @@ func TestWakuFilter(t *testing.T) { } }() - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), testTopic) + _, err = node2.Publish(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) require.NoError(t, err) wg.Wait() @@ -149,7 +149,7 @@ func TestWakuFilter(t *testing.T) { time.Sleep(1 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) + _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) require.NoError(t, err) wg.Wait() } @@ -207,7 +207,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { }() - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) + _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) require.NoError(t, err) wg.Wait() @@ -217,7 +217,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { time.Sleep(1 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) + _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) require.NoError(t, err) // TODO: find out how to eliminate this sleep @@ -226,7 +226,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { time.Sleep(3 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) + _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) require.NoError(t, err) time.Sleep(1 * time.Second) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index a54dee249..3439c17b4 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -118,7 +118,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream) // TODO: Assumes success, should probably be extended to check for network, peers, etc // It might make sense to use WithReadiness option here? - _, err = wakuLP.relay.PublishToTopic(ctx, message, pubSubTopic) + _, err = wakuLP.relay.Publish(ctx, message, relay.WithPubSubTopic(pubSubTopic)) if err != nil { logger.Error("publishing message", zap.Error(err)) wakuLP.metrics.RecordError(messagePushFailure) @@ -261,9 +261,10 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe return params, nil } -// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol -// If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding. -func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { +// Publish is used to broadcast a WakuMessage to the pubSubTopic (which is derived from the +// contentTopic) via lightpush protocol. If auto-sharding is not to be used, then the +// `WithPubSubTopic` option should be provided to publish the message to an specific pubSubTopic +func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { if message == nil { return nil, errors.New("message can't be null") } @@ -289,9 +290,3 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.Wa return nil, errors.New(response.Info) } - -// Publish is used to broadcast a WakuMessage to the pubSubTopic (which is derived from the contentTopic) via lightpush protocol -// If auto-sharding is not to be used, then PublishToTopic API should be used -func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { - return wakuLP.PublishToTopic(ctx, message, opts...) -} diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index 55dba60af..d1627c168 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -5,6 +5,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" ) @@ -40,12 +41,6 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { } } -func WithPubSubTopic(pubsubTopic string) Option { - return func(params *lightPushParameters) { - params.pubsubTopic = pubsubTopic - } -} - // WithFastestPeerSelection is an option used to select a peer from the peer store // with the lowest ping. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer @@ -56,6 +51,20 @@ func WithFastestPeerSelection(fromThesePeers ...peer.ID) Option { } } +// WithPubSubTopic is used to specify the pubsub topic on which a WakuMessage will be broadcasted +func WithPubSubTopic(pubsubTopic string) Option { + return func(params *lightPushParameters) { + params.pubsubTopic = pubsubTopic + } +} + +// WithDefaultPubsubTopic is used to indicate that the message should be broadcasted in the default pubsub topic +func WithDefaultPubsubTopic() Option { + return func(params *lightPushParameters) { + params.pubsubTopic = relay.DefaultWakuTopic + } +} + // WithRequestID is an option to set a specific request ID to be used when // publishing a message func WithRequestID(requestID []byte) Option { diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index ab4455a8e..41beb261d 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -109,7 +109,7 @@ func TestWakuLightPush(t *testing.T) { lpOptions = append(lpOptions, WithPeer(host2.ID())) // Checking that msg hash is correct - hash, err := client.PublishToTopic(ctx, msg2, lpOptions...) + hash, err := client.Publish(ctx, msg2, lpOptions...) require.NoError(t, err) require.Equal(t, protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), string(testTopic)).Hash(), hash) wg.Wait() @@ -141,7 +141,7 @@ func TestWakuLightPushNoPeers(t *testing.T) { var lpOptions []Option lpOptions = append(lpOptions, WithPubSubTopic(testTopic)) - _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), lpOptions...) + _, err = client.Publish(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), lpOptions...) require.Errorf(t, err, "no suitable remote peers") } diff --git a/waku/v2/protocol/noise/pairing_relay_messenger.go b/waku/v2/protocol/noise/pairing_relay_messenger.go index 489fc446f..05a9bdd53 100644 --- a/waku/v2/protocol/noise/pairing_relay_messenger.go +++ b/waku/v2/protocol/noise/pairing_relay_messenger.go @@ -129,7 +129,7 @@ func (r *NoiseWakuRelay) Publish(ctx context.Context, contentTopic string, paylo message.ContentTopic = contentTopic message.Timestamp = r.timesource.Now().UnixNano() - _, err = r.relay.PublishToTopic(ctx, message, r.pubsubTopic) + _, err = r.relay.Publish(ctx, message, relay.WithPubSubTopic(r.pubsubTopic)) return err } diff --git a/waku/v2/protocol/relay/options.go b/waku/v2/protocol/relay/options.go new file mode 100644 index 000000000..9d7979fe9 --- /dev/null +++ b/waku/v2/protocol/relay/options.go @@ -0,0 +1,22 @@ +package relay + +type publishParameters struct { + pubsubTopic string +} + +// PublishOption is the type of options accepted when publishing WakuMessages +type PublishOption func(*publishParameters) + +// WithPubSubTopic is used to specify the pubsub topic on which a WakuMessage will be broadcasted +func WithPubSubTopic(pubsubTopic string) PublishOption { + return func(params *publishParameters) { + params.pubsubTopic = pubsubTopic + } +} + +// WithPubSubTopic is used to indicate that the message should be broadcasted in the default pubsub topic +func WithDefaultPubsubTopic() PublishOption { + return func(params *publishParameters) { + params.pubsubTopic = DefaultWakuTopic + } +} diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index d051e3087..f55eba1e3 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -223,8 +223,10 @@ func (w *WakuRelay) subscribeToPubsubTopic(topic string) (subs *pubsub.Subscript return sub, nil } -// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic -func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, topic string) ([]byte, error) { +// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic. The pubsubTopic is derived from contentTopic +// specified in the message via autosharding. To publish to a specific pubsubTopic, the `WithPubSubTopic` option should +// be provided +func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts ...PublishOption) ([]byte, error) { // Publish a `WakuMessage` to a PubSub topic. if w.pubsub == nil { return nil, errors.New("PubSub hasn't been set") @@ -234,15 +236,28 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, return nil, errors.New("message can't be null") } - if err := message.Validate(); err != nil { + err := message.Validate() + if err != nil { return nil, err } - if !w.EnoughPeersToPublishToTopic(topic) { + params := new(publishParameters) + for _, opt := range opts { + opt(params) + } + + if params.pubsubTopic == "" { + params.pubsubTopic, err = waku_proto.GetPubSubTopicFromContentTopic(message.ContentTopic) + if err != nil { + return nil, err + } + } + + if !w.EnoughPeersToPublishToTopic(params.pubsubTopic) { return nil, errors.New("not enough peers to publish") } - pubSubTopic, err := w.upsertTopic(topic) + pubSubTopic, err := w.upsertTopic(params.pubsubTopic) if err != nil { return nil, err } @@ -257,23 +272,13 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, return nil, err } - hash := message.Hash(topic) + hash := message.Hash(params.pubsubTopic) - w.log.Debug("waku.relay published", zap.String("pubsubTopic", topic), logging.HexString("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload))) + w.log.Debug("waku.relay published", zap.String("pubsubTopic", params.pubsubTopic), logging.HexString("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload))) return hash, nil } -// Publish is used to broadcast a WakuMessage, the pubsubTopic is derived from contentTopic specified in the message via autosharding. -// To publish to a specific pubsubTopic, please use PublishToTopic -func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage) ([]byte, error) { - pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(message.ContentTopic) - if err != nil { - return nil, err - } - return w.PublishToTopic(ctx, message, pubSubTopic) -} - func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) { pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) if err != nil { diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index e49db217f..8130dc4f1 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -63,7 +63,7 @@ func TestWakuRelay(t *testing.T) { ContentTopic: "test", Timestamp: 0, } - _, err = relay.PublishToTopic(context.Background(), msg, testTopic) + _, err = relay.Publish(context.Background(), msg, WithPubSubTopic(testTopic)) require.NoError(t, err) time.Sleep(2 * time.Second) @@ -273,7 +273,7 @@ func TestWakuRelayAutoShard(t *testing.T) { Timestamp: 0, } - _, err = relay.PublishToTopic(context.Background(), msg1, subs[0].contentFilter.PubsubTopic) + _, err = relay.Publish(context.Background(), msg1, WithPubSubTopic(subs[0].contentFilter.PubsubTopic)) require.NoError(t, err) wg = waitForMsg(t, subs1[0].Ch, testcTopic1) @@ -300,7 +300,7 @@ func TestWakuRelayAutoShard(t *testing.T) { Timestamp: 1, } - _, err = relay.PublishToTopic(context.Background(), msg2, subs[0].contentFilter.PubsubTopic) + _, err = relay.Publish(context.Background(), msg2, WithPubSubTopic(subs[0].contentFilter.PubsubTopic)) require.NoError(t, err) wg2.Wait() From 4584bb432487e6448ec7c7f886733ce1eba60f92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Mon, 30 Oct 2023 12:55:36 -0400 Subject: [PATCH 4/7] refactor: validate protobuffer for store (#841) --- waku/v2/protocol/lightpush/pb/validation.go | 2 +- waku/v2/protocol/store/pb/validation.go | 68 ++++++++++++ waku/v2/protocol/store/pb/validation_test.go | 42 +++++++ waku/v2/protocol/store/waku_store_client.go | 103 ++++++++++-------- waku/v2/protocol/store/waku_store_common.go | 12 -- waku/v2/protocol/store/waku_store_protocol.go | 91 +++++++++++----- 6 files changed, 230 insertions(+), 88 deletions(-) create mode 100644 waku/v2/protocol/store/pb/validation.go create mode 100644 waku/v2/protocol/store/pb/validation_test.go diff --git a/waku/v2/protocol/lightpush/pb/validation.go b/waku/v2/protocol/lightpush/pb/validation.go index c2f0218b0..e45d5b313 100644 --- a/waku/v2/protocol/lightpush/pb/validation.go +++ b/waku/v2/protocol/lightpush/pb/validation.go @@ -7,7 +7,7 @@ var ( errMissingQuery = errors.New("missing Query field") errMissingMessage = errors.New("missing Message field") errMissingPubsubTopic = errors.New("missing PubsubTopic field") - errRequestIDMismatch = errors.New("RequestID in response does not match request") + errRequestIDMismatch = errors.New("requestID in response does not match request") errMissingResponse = errors.New("missing Response field") ) diff --git a/waku/v2/protocol/store/pb/validation.go b/waku/v2/protocol/store/pb/validation.go new file mode 100644 index 000000000..740b58086 --- /dev/null +++ b/waku/v2/protocol/store/pb/validation.go @@ -0,0 +1,68 @@ +package pb + +import ( + "errors" +) + +// MaxContentFilters is the maximum number of allowed content filters in a query +const MaxContentFilters = 10 + +var ( + errMissingRequestID = errors.New("missing RequestId field") + errMissingQuery = errors.New("missing Query field") + errRequestIDMismatch = errors.New("requestID in response does not match request") + errMaxContentFilters = errors.New("exceeds the maximum number of content filters allowed") + errEmptyContentTopics = errors.New("one or more content topics specified is empty") +) + +func (x *HistoryQuery) Validate() error { + if len(x.ContentFilters) > MaxContentFilters { + return errMaxContentFilters + } + + for _, m := range x.ContentFilters { + if m.ContentTopic == "" { + return errEmptyContentTopics + } + } + + return nil +} + +func (x *HistoryRPC) ValidateQuery() error { + if x.RequestId == "" { + return errMissingRequestID + } + + if x.Query == nil { + return errMissingQuery + } + + return x.Query.Validate() +} + +func (x *HistoryResponse) Validate() error { + for _, m := range x.Messages { + if err := m.Validate(); err != nil { + return err + } + } + + return nil +} + +func (x *HistoryRPC) ValidateResponse(requestID string) error { + if x.RequestId == "" { + return errMissingRequestID + } + + if x.RequestId != requestID { + return errRequestIDMismatch + } + + if x.Response != nil { + return x.Response.Validate() + } + + return nil +} diff --git a/waku/v2/protocol/store/pb/validation_test.go b/waku/v2/protocol/store/pb/validation_test.go new file mode 100644 index 000000000..09018455b --- /dev/null +++ b/waku/v2/protocol/store/pb/validation_test.go @@ -0,0 +1,42 @@ +package pb + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func cf(val string) *ContentFilter { + return &ContentFilter{ + ContentTopic: val, + } +} + +func TestValidateRequest(t *testing.T) { + request := HistoryRPC{} + require.ErrorIs(t, request.ValidateQuery(), errMissingRequestID) + request.RequestId = "test" + require.ErrorIs(t, request.ValidateQuery(), errMissingQuery) + request.Query = &HistoryQuery{ + ContentFilters: []*ContentFilter{ + cf("1"), cf("2"), cf("3"), cf("4"), cf("5"), + cf("6"), cf("7"), cf("8"), cf("9"), cf("10"), + cf("11"), + }, + } + require.ErrorIs(t, request.ValidateQuery(), errMaxContentFilters) + request.Query.ContentFilters = []*ContentFilter{cf("a"), cf("")} + require.ErrorIs(t, request.ValidateQuery(), errEmptyContentTopics) + request.Query.ContentFilters = []*ContentFilter{cf("a")} + require.NoError(t, request.ValidateQuery()) +} + +func TestValidateResponse(t *testing.T) { + response := HistoryRPC{} + require.ErrorIs(t, response.ValidateResponse("test"), errMissingRequestID) + response.RequestId = "test1" + require.ErrorIs(t, response.ValidateResponse("test"), errRequestIDMismatch) + response.RequestId = "test" + response.Response = &HistoryResponse{} + require.NoError(t, response.ValidateResponse("test")) +} diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 3d90d4e39..d1f25e6a6 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -170,7 +170,7 @@ func DefaultOptions() []HistoryRequestOption { } } -func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestID []byte) (*pb.HistoryResponse, error) { +func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.HistoryRPC, selectedPeer peer.ID) (*pb.HistoryResponse, error) { logger := store.log.With(logging.HostID("peer", selectedPeer)) logger.Info("querying message history") @@ -181,8 +181,6 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec return nil, err } - historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(requestID)} - writer := pbio.NewDelimitedWriter(stream) reader := pbio.NewDelimitedReader(stream, math.MaxInt32) @@ -209,6 +207,8 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec stream.Close() + // nwaku does not return a response if there are no results due to the way their + // protobuffer library works. this condition once they have proper proto3 support if historyResponseRPC.Response == nil { // Empty response return &pb.HistoryResponse{ @@ -216,10 +216,14 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec }, nil } + if err := historyResponseRPC.ValidateResponse(historyRequest.RequestId); err != nil { + return nil, err + } + return historyResponseRPC.Response, nil } -func (store *WakuStore) localQuery(query *pb.HistoryQuery, requestID []byte) (*pb.HistoryResponse, error) { +func (store *WakuStore) localQuery(historyQuery *pb.HistoryRPC) (*pb.HistoryResponse, error) { logger := store.log logger.Info("querying local message history") @@ -228,8 +232,8 @@ func (store *WakuStore) localQuery(query *pb.HistoryQuery, requestID []byte) (*p } historyResponseRPC := &pb.HistoryRPC{ - RequestId: hex.EncodeToString(requestID), - Response: store.FindMessages(query), + RequestId: historyQuery.RequestId, + Response: store.FindMessages(historyQuery.Query), } if historyResponseRPC.Response == nil { @@ -243,21 +247,6 @@ func (store *WakuStore) localQuery(query *pb.HistoryQuery, requestID []byte) (*p } func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) { - q := &pb.HistoryQuery{ - PubsubTopic: query.Topic, - ContentFilters: []*pb.ContentFilter{}, - StartTime: query.StartTime, - EndTime: query.EndTime, - PagingInfo: &pb.PagingInfo{}, - } - - for _, cf := range query.ContentTopics { - q.ContentFilters = append(q.ContentFilters, &pb.ContentFilter{ContentTopic: cf}) - } - - if len(q.ContentFilters) > MaxContentFilters { - return nil, ErrMaxContentFilters - } params := new(HistoryRequestParameters) params.s = store @@ -283,38 +272,53 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR } } + historyRequest := &pb.HistoryRPC{ + RequestId: hex.EncodeToString(params.requestID), + Query: &pb.HistoryQuery{ + PubsubTopic: query.Topic, + ContentFilters: []*pb.ContentFilter{}, + StartTime: query.StartTime, + EndTime: query.EndTime, + PagingInfo: &pb.PagingInfo{}, + }, + } + + for _, cf := range query.ContentTopics { + historyRequest.Query.ContentFilters = append(historyRequest.Query.ContentFilters, &pb.ContentFilter{ContentTopic: cf}) + } + if !params.localQuery && params.selectedPeer == "" { store.metrics.RecordError(peerNotFoundFailure) return nil, ErrNoPeersAvailable } - if len(params.requestID) == 0 { - return nil, ErrInvalidID - } - if params.cursor != nil { - q.PagingInfo.Cursor = params.cursor + historyRequest.Query.PagingInfo.Cursor = params.cursor } if params.asc { - q.PagingInfo.Direction = pb.PagingInfo_FORWARD + historyRequest.Query.PagingInfo.Direction = pb.PagingInfo_FORWARD } else { - q.PagingInfo.Direction = pb.PagingInfo_BACKWARD + historyRequest.Query.PagingInfo.Direction = pb.PagingInfo_BACKWARD } pageSize := params.pageSize if pageSize == 0 || pageSize > uint64(MaxPageSize) { pageSize = MaxPageSize } - q.PagingInfo.PageSize = pageSize + historyRequest.Query.PagingInfo.PageSize = pageSize + + err := historyRequest.ValidateQuery() + if err != nil { + return nil, err + } var response *pb.HistoryResponse - var err error if params.localQuery { - response, err = store.localQuery(q, params.requestID) + response, err = store.localQuery(historyRequest) } else { - response, err = store.queryFrom(ctx, q, params.selectedPeer, params.requestID) + response, err = store.queryFrom(ctx, historyRequest, params.selectedPeer) } if err != nil { return nil, err @@ -327,7 +331,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR result := &Result{ store: store, Messages: response.Messages, - query: q, + query: historyRequest.Query, peerID: params.selectedPeer, } @@ -390,24 +394,27 @@ func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) { }, nil } - q := &pb.HistoryQuery{ - PubsubTopic: r.Query().PubsubTopic, - ContentFilters: r.Query().ContentFilters, - StartTime: r.Query().StartTime, - EndTime: r.Query().EndTime, - PagingInfo: &pb.PagingInfo{ - PageSize: r.Query().PagingInfo.PageSize, - Direction: r.Query().PagingInfo.Direction, - Cursor: &pb.Index{ - Digest: r.Cursor().Digest, - ReceiverTime: r.Cursor().ReceiverTime, - SenderTime: r.Cursor().SenderTime, - PubsubTopic: r.Cursor().PubsubTopic, + historyRequest := &pb.HistoryRPC{ + RequestId: hex.EncodeToString(protocol.GenerateRequestID()), + Query: &pb.HistoryQuery{ + PubsubTopic: r.Query().PubsubTopic, + ContentFilters: r.Query().ContentFilters, + StartTime: r.Query().StartTime, + EndTime: r.Query().EndTime, + PagingInfo: &pb.PagingInfo{ + PageSize: r.Query().PagingInfo.PageSize, + Direction: r.Query().PagingInfo.Direction, + Cursor: &pb.Index{ + Digest: r.Cursor().Digest, + ReceiverTime: r.Cursor().ReceiverTime, + SenderTime: r.Cursor().SenderTime, + PubsubTopic: r.Cursor().PubsubTopic, + }, }, }, } - response, err := store.queryFrom(ctx, q, r.PeerID(), protocol.GenerateRequestID()) + response, err := store.queryFrom(ctx, historyRequest, r.PeerID()) if err != nil { return nil, err } @@ -420,7 +427,7 @@ func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) { started: true, store: store, Messages: response.Messages, - query: q, + query: historyRequest.Query, peerID: r.PeerID(), } diff --git a/waku/v2/protocol/store/waku_store_common.go b/waku/v2/protocol/store/waku_store_common.go index a19b8a423..33d7a6a46 100644 --- a/waku/v2/protocol/store/waku_store_common.go +++ b/waku/v2/protocol/store/waku_store_common.go @@ -20,27 +20,15 @@ const StoreID_v20beta4 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta4") // MaxPageSize is the maximum number of waku messages to return per page const MaxPageSize = 20 -// MaxContentFilters is the maximum number of allowed content filters in a query -const MaxContentFilters = 10 - var ( - // ErrMaxContentFilters is returned when the number of content topics in the query - // exceeds the limit - ErrMaxContentFilters = errors.New("exceeds the maximum number of content filters allowed") // ErrNoPeersAvailable is returned when there are no store peers in the peer store // that could be used to retrieve message history ErrNoPeersAvailable = errors.New("no suitable remote peers") - // ErrInvalidID is returned when no RequestID is given - ErrInvalidID = errors.New("invalid request id") - // ErrFailedToResumeHistory is returned when the node attempted to retrieve historic // messages to fill its own message history but for some reason it failed ErrFailedToResumeHistory = errors.New("failed to resume the history") - - // ErrFailedQuery is emitted when the query fails to return results - ErrFailedQuery = errors.New("failed to resolve the query") ) type WakuSwap interface { diff --git a/waku/v2/protocol/store/waku_store_protocol.go b/waku/v2/protocol/store/waku_store_protocol.go index 6b5d0b8cc..52084632f 100644 --- a/waku/v2/protocol/store/waku_store_protocol.go +++ b/waku/v2/protocol/store/waku_store_protocol.go @@ -2,6 +2,7 @@ package store import ( "context" + "encoding/hex" "errors" "math" "sync" @@ -33,10 +34,6 @@ func findMessages(query *pb.HistoryQuery, msgProvider MessageProvider) ([]*wpb.W query.PagingInfo.PageSize = MaxPageSize } - if len(query.ContentFilters) > MaxContentFilters { - return nil, nil, ErrMaxContentFilters - } - cursor, queryResult, err := msgProvider.Query(query) if err != nil { return nil, nil, err @@ -181,6 +178,18 @@ func (store *WakuStore) onRequest(stream network.Stream) { return } + if err := historyRPCRequest.ValidateQuery(); err != nil { + logger.Error("invalid request received", zap.Error(err)) + store.metrics.RecordError(decodeRPCFailure) + if err := stream.Reset(); err != nil { + store.log.Error("resetting connection", zap.Error(err)) + } + + // TODO: If store protocol is updated to include error messages + // `err.Error()` can be returned as a response + return + } + logger = logger.With(zap.String("id", historyRPCRequest.RequestId)) if query := historyRPCRequest.Query; query != nil { logger = logger.With(logging.Filters(query.GetContentFilters())) @@ -238,42 +247,59 @@ func (store *WakuStore) Stop() { store.wg.Wait() } -func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, candidateList []peer.ID) ([]*wpb.WakuMessage, error) { - // loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully - // returns the number of retrieved messages, or error if all the requests fail +type queryLoopCandidateResponse struct { + peerID peer.ID + response *pb.HistoryResponse + err error +} + +func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, candidateList []peer.ID) ([]*queryLoopCandidateResponse, error) { + err := query.Validate() + if err != nil { + return nil, err + } queryWg := sync.WaitGroup{} queryWg.Add(len(candidateList)) - resultChan := make(chan *pb.HistoryResponse, len(candidateList)) + resultChan := make(chan *queryLoopCandidateResponse, len(candidateList)) + // loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully + // returns the number of retrieved messages, or error if all the requests fail for _, peer := range candidateList { func() { defer queryWg.Done() - result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestID()) - if err == nil { - resultChan <- result - return + + historyRequest := &pb.HistoryRPC{ + RequestId: hex.EncodeToString(protocol.GenerateRequestID()), + Query: query, + } + + result := &queryLoopCandidateResponse{ + peerID: peer, + } + + response, err := store.queryFrom(ctx, historyRequest, peer) + if err != nil { + store.log.Error("resuming history", logging.HostID("peer", peer), zap.Error(err)) + result.err = err + } else { + result.response = response } - store.log.Error("resuming history", logging.HostID("peer", peer), zap.Error(err)) + + resultChan <- result }() } queryWg.Wait() close(resultChan) - var messages []*wpb.WakuMessage - hasResults := false + var queryLoopResults []*queryLoopCandidateResponse for result := range resultChan { - hasResults = true - messages = append(messages, result.Messages...) + queryLoopResults = append(queryLoopResults, result) } - if hasResults { - return messages, nil - } - - return nil, ErrFailedQuery + return queryLoopResults, nil } func (store *WakuStore) findLastSeen() (int64, error) { @@ -323,20 +349,31 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList return -1, ErrNoPeersAvailable } - messages, err := store.queryLoop(ctx, rpc, peerList) + queryLoopResults, err := store.queryLoop(ctx, rpc, peerList) if err != nil { store.log.Error("resuming history", zap.Error(err)) return -1, ErrFailedToResumeHistory } msgCount := 0 - for _, msg := range messages { - if err = store.storeMessage(protocol.NewEnvelope(msg, store.timesource.Now().UnixNano(), pubsubTopic)); err == nil { - msgCount++ + for _, r := range queryLoopResults { + if err == nil && r.response.GetError() != pb.HistoryResponse_NONE { + r.err = errors.New("invalid cursor") + } + + if r.err != nil { + store.log.Warn("could not resume message history", zap.Error(r.err), logging.HostID("peer", r.peerID)) + continue + } + + for _, msg := range r.response.Messages { + if err = store.storeMessage(protocol.NewEnvelope(msg, store.timesource.Now().UnixNano(), pubsubTopic)); err == nil { + msgCount++ + } } } - store.log.Info("retrieved messages since the last online time", zap.Int("messages", len(messages))) + store.log.Info("retrieved messages since the last online time", zap.Int("messages", msgCount)) return msgCount, nil } From cf82f66d12ed84550722f8da156a9fb39a85114b Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 30 Oct 2023 16:25:21 -0400 Subject: [PATCH 5/7] revert(#820): msg digest matches msg hash --- waku/v2/protocol/envelope.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/waku/v2/protocol/envelope.go b/waku/v2/protocol/envelope.go index f72b1997d..6b0a0f7b5 100644 --- a/waku/v2/protocol/envelope.go +++ b/waku/v2/protocol/envelope.go @@ -1,6 +1,7 @@ package protocol import ( + "github.com/waku-org/go-waku/waku/v2/hash" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" ) @@ -19,11 +20,12 @@ type Envelope struct { // as well as generating a hash based on the bytes that compose the message func NewEnvelope(msg *wpb.WakuMessage, receiverTime int64, pubSubTopic string) *Envelope { messageHash := msg.Hash(pubSubTopic) + digest := hash.SHA256([]byte(msg.ContentTopic), msg.Payload) return &Envelope{ msg: msg, hash: messageHash, index: &pb.Index{ - Digest: messageHash, + Digest: digest[:], ReceiverTime: receiverTime, SenderTime: msg.Timestamp, PubsubTopic: pubSubTopic, From 48acff4a5cff96c27a4d2e1c1b6a3768078e2c87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Mon, 30 Oct 2023 19:20:13 -0400 Subject: [PATCH 6/7] feat: add warning about bootnodes not supporting shards (#848) --- waku/v2/discv5/discover.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index e723d20e6..95bb8c781 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -379,9 +379,8 @@ func delayedHasNext(ctx context.Context, iterator enode.Iterator) bool { return true } -// Iterates over the nodes found via discv5 belonging to the node's current shard, and sends them to peerConnector -func (d *DiscoveryV5) peerLoop(ctx context.Context) error { - iterator, err := d.PeerIterator(FilterPredicate(func(n *enode.Node) bool { +func (d *DiscoveryV5) defaultPredicate() Predicate { + return FilterPredicate(func(n *enode.Node) bool { localRS, err := wenr.RelaySharding(d.localnode.Node().Record()) if err != nil { return false @@ -413,7 +412,12 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error { } return false - })) + }) +} + +// Iterates over the nodes found via discv5 belonging to the node's current shard, and sends them to peerConnector +func (d *DiscoveryV5) peerLoop(ctx context.Context) error { + iterator, err := d.PeerIterator(d.defaultPredicate()) if err != nil { d.metrics.RecordError(iteratorFailure) return fmt.Errorf("obtaining iterator: %w", err) @@ -441,6 +445,20 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error { } func (d *DiscoveryV5) runDiscoveryV5Loop(ctx context.Context) { + if len(d.config.Bootnodes) > 0 { + localRS, err := wenr.RelaySharding(d.localnode.Node().Record()) + if err == nil && localRS != nil { + iterator := d.defaultPredicate()(enode.IterNodes(d.config.Bootnodes)) + validBootCount := 0 + for iterator.Next() { + validBootCount++ + } + + if validBootCount == 0 { + d.log.Warn("no discv5 bootstrap nodes share this node configured shards") + } + } + } restartLoop: for { From 36beb9de757a29c20308b5e6e11ee571e93fe666 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Tue, 31 Oct 2023 06:50:13 -0400 Subject: [PATCH 7/7] refactor: fix nomenclature for shards (#849) --- waku/v2/discv5/discover.go | 6 +- waku/v2/node/localnode.go | 2 +- waku/v2/protocol/enr/shards.go | 14 +-- waku/v2/protocol/metadata/waku_metadata.go | 10 +- .../protocol/metadata/waku_metadata_test.go | 20 ++-- waku/v2/protocol/pubsub_topic.go | 28 +++--- waku/v2/protocol/shard.go | 97 ++++++++++--------- waku/v2/rendezvous/rendezvous.go | 4 +- 8 files changed, 94 insertions(+), 87 deletions(-) diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 95bb8c781..1f2e96779 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -400,13 +400,13 @@ func (d *DiscoveryV5) defaultPredicate() Predicate { return false } - if nodeRS.Cluster != localRS.Cluster { + if nodeRS.ClusterID != localRS.ClusterID { return false } // Contains any - for _, idx := range localRS.Indices { - if nodeRS.Contains(localRS.Cluster, idx) { + for _, idx := range localRS.ShardIDs { + if nodeRS.Contains(localRS.ClusterID, idx) { return true } } diff --git a/waku/v2/node/localnode.go b/waku/v2/node/localnode.go index fe64e5588..1d12d535b 100644 --- a/waku/v2/node/localnode.go +++ b/waku/v2/node/localnode.go @@ -319,7 +319,7 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error { if len(rs) == 1 { w.log.Info("updating advertised relay shards in ENR") - if len(rs[0].Indices) != len(topics) { + if len(rs[0].ShardIDs) != len(topics) { w.log.Warn("A mix of named and static shards found. ENR shard will contain only the following shards", zap.Any("shards", rs[0])) } diff --git a/waku/v2/protocol/enr/shards.go b/waku/v2/protocol/enr/shards.go index 0e57eda25..29adc2d57 100644 --- a/waku/v2/protocol/enr/shards.go +++ b/waku/v2/protocol/enr/shards.go @@ -13,9 +13,9 @@ func deleteShardingENREntries(localnode *enode.LocalNode) { localnode.Delete(enr.WithEntry(ShardingIndicesListEnrField, struct{}{})) } -func WithWakuRelayShardingIndicesList(rs protocol.RelayShards) ENROption { +func WithWakuRelayShardList(rs protocol.RelayShards) ENROption { return func(localnode *enode.LocalNode) error { - value, err := rs.IndicesList() + value, err := rs.ShardList() if err != nil { return err } @@ -35,11 +35,11 @@ func WithWakuRelayShardingBitVector(rs protocol.RelayShards) ENROption { func WithWakuRelaySharding(rs protocol.RelayShards) ENROption { return func(localnode *enode.LocalNode) error { - if len(rs.Indices) >= 64 { + if len(rs.ShardIDs) >= 64 { return WithWakuRelayShardingBitVector(rs)(localnode) } - return WithWakuRelayShardingIndicesList(rs)(localnode) + return WithWakuRelayShardList(rs)(localnode) } } @@ -60,7 +60,7 @@ func WithWakuRelayShardingTopics(topics ...string) ENROption { // ENR record accessors -func RelayShardingIndicesList(record *enr.Record) (*protocol.RelayShards, error) { +func RelayShardList(record *enr.Record) (*protocol.RelayShards, error) { var field []byte if err := record.Load(enr.WithEntry(ShardingIndicesListEnrField, &field)); err != nil { if enr.IsNotFound(err) { @@ -69,7 +69,7 @@ func RelayShardingIndicesList(record *enr.Record) (*protocol.RelayShards, error) return nil, err } - res, err := protocol.FromIndicesList(field) + res, err := protocol.FromShardList(field) if err != nil { return nil, err } @@ -95,7 +95,7 @@ func RelayShardingBitVector(record *enr.Record) (*protocol.RelayShards, error) { } func RelaySharding(record *enr.Record) (*protocol.RelayShards, error) { - res, err := RelayShardingIndicesList(record) + res, err := RelayShardList(record) if err != nil { return nil, err } diff --git a/waku/v2/protocol/metadata/waku_metadata.go b/waku/v2/protocol/metadata/waku_metadata.go index 3f0295ea4..aae9db62e 100644 --- a/waku/v2/protocol/metadata/waku_metadata.go +++ b/waku/v2/protocol/metadata/waku_metadata.go @@ -78,8 +78,8 @@ func (wakuM *WakuMetadata) getClusterAndShards() (*uint32, []uint32, error) { } var shards []uint32 - if shard != nil && shard.Cluster == uint16(wakuM.clusterID) { - for _, idx := range shard.Indices { + if shard != nil && shard.ClusterID == uint16(wakuM.clusterID) { + for _, idx := range shard.ShardIDs { shards = append(shards, uint32(idx)) } } @@ -139,9 +139,9 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc } result := &protocol.RelayShards{} - result.Cluster = uint16(*response.ClusterId) + result.ClusterID = uint16(*response.ClusterId) for _, i := range response.Shards { - result.Indices = append(result.Indices, uint16(i)) + result.ShardIDs = append(result.ShardIDs, uint16(i)) } return result, nil @@ -226,7 +226,7 @@ func (wakuM *WakuMetadata) Connected(n network.Network, cc network.Conn) { if err == nil { if shard == nil { err = errors.New("no shard reported") - } else if shard.Cluster != wakuM.clusterID { + } else if shard.ClusterID != wakuM.clusterID { err = errors.New("different clusterID reported") } } else { diff --git a/waku/v2/protocol/metadata/waku_metadata_test.go b/waku/v2/protocol/metadata/waku_metadata_test.go index 18eaf9901..58b462b43 100644 --- a/waku/v2/protocol/metadata/waku_metadata_test.go +++ b/waku/v2/protocol/metadata/waku_metadata_test.go @@ -27,14 +27,14 @@ func createWakuMetadata(t *testing.T, rs *protocol.RelayShards) *WakuMetadata { localNode, err := enr.NewLocalnode(key) require.NoError(t, err) - cluster := uint16(0) + clusterID := uint16(0) if rs != nil { err = enr.WithWakuRelaySharding(*rs)(localNode) require.NoError(t, err) - cluster = rs.Cluster + clusterID = rs.ClusterID } - m1 := NewWakuMetadata(cluster, localNode, utils.Logger()) + m1 := NewWakuMetadata(clusterID, localNode, utils.Logger()) m1.SetHost(host) err = m1.Start(context.TODO()) require.NoError(t, err) @@ -65,25 +65,25 @@ func TestWakuMetadataRequest(t *testing.T) { // Query a peer that is subscribed to a shard result, err := m16_1.Request(context.Background(), m16_2.h.ID()) require.NoError(t, err) - require.Equal(t, testShard16, result.Cluster) - require.Equal(t, rs16_2.Indices, result.Indices) + require.Equal(t, testShard16, result.ClusterID) + require.Equal(t, rs16_2.ShardIDs, result.ShardIDs) // Updating the peer shards - rs16_2.Indices = append(rs16_2.Indices, 3, 4) + rs16_2.ShardIDs = append(rs16_2.ShardIDs, 3, 4) err = enr.WithWakuRelaySharding(rs16_2)(m16_2.localnode) require.NoError(t, err) // Query same peer, after that peer subscribes to more shards result, err = m16_1.Request(context.Background(), m16_2.h.ID()) require.NoError(t, err) - require.Equal(t, testShard16, result.Cluster) - require.ElementsMatch(t, rs16_2.Indices, result.Indices) + require.Equal(t, testShard16, result.ClusterID) + require.ElementsMatch(t, rs16_2.ShardIDs, result.ShardIDs) // Query a peer not subscribed to a shard result, err = m16_1.Request(context.Background(), m_noRS.h.ID()) require.NoError(t, err) - require.Equal(t, uint16(0), result.Cluster) - require.Len(t, result.Indices, 0) + require.Equal(t, uint16(0), result.ClusterID) + require.Len(t, result.ShardIDs, 0) } func TestNoNetwork(t *testing.T) { diff --git a/waku/v2/protocol/pubsub_topic.go b/waku/v2/protocol/pubsub_topic.go index b3fcd395f..6b2fd94e7 100644 --- a/waku/v2/protocol/pubsub_topic.go +++ b/waku/v2/protocol/pubsub_topic.go @@ -37,26 +37,26 @@ var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed") // StaticShardingPubsubTopic describes a pubSub topic as per StaticSharding type StaticShardingPubsubTopic struct { - cluster uint16 - shard uint16 + clusterID uint16 + shardID uint16 } // NewStaticShardingPubsubTopic creates a new pubSub topic func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) StaticShardingPubsubTopic { return StaticShardingPubsubTopic{ - cluster: cluster, - shard: shard, + clusterID: cluster, + shardID: shard, } } // Cluster returns the sharded cluster index func (s StaticShardingPubsubTopic) Cluster() uint16 { - return s.cluster + return s.clusterID } // Shard returns the shard number func (s StaticShardingPubsubTopic) Shard() uint16 { - return s.shard + return s.shardID } // Equal compares StaticShardingPubsubTopic @@ -66,7 +66,7 @@ func (s StaticShardingPubsubTopic) Equal(t2 StaticShardingPubsubTopic) bool { // String formats StaticShardingPubsubTopic to RFC 23 specific string format for pubsub topic. func (s StaticShardingPubsubTopic) String() string { - return fmt.Sprintf("%s/%d/%d", StaticShardingPubsubTopicPrefix, s.cluster, s.shard) + return fmt.Sprintf("%s/%d/%d", StaticShardingPubsubTopicPrefix, s.clusterID, s.shardID) } // Parse parses a topic string into a StaticShardingPubsubTopic @@ -100,18 +100,18 @@ func (s *StaticShardingPubsubTopic) Parse(topic string) error { return ErrInvalidNumberFormat } - s.shard = uint16(shardInt) - s.cluster = uint16(clusterInt) + s.shardID = uint16(shardInt) + s.clusterID = uint16(clusterInt) return nil } func ToShardPubsubTopic(topic WakuPubSubTopic) (StaticShardingPubsubTopic, error) { - result, ok := topic.(StaticShardingPubsubTopic) - if !ok { - return StaticShardingPubsubTopic{}, ErrNotShardPubsubTopic - } - return result, nil + result, ok := topic.(StaticShardingPubsubTopic) + if !ok { + return StaticShardingPubsubTopic{}, ErrNotShardPubsubTopic + } + return result, nil } // ToWakuPubsubTopic takes a pubSub topic string and creates a WakuPubsubTopic object. diff --git a/waku/v2/protocol/shard.go b/waku/v2/protocol/shard.go index a49c6e302..5a426182e 100644 --- a/waku/v2/protocol/shard.go +++ b/waku/v2/protocol/shard.go @@ -13,57 +13,64 @@ import ( const MaxShardIndex = uint16(1023) // ClusterIndex is the clusterID used in sharding space. -// For indices allocation and other magic numbers refer to RFC 51 +// For shardIDs allocation and other magic numbers refer to RFC 51 const ClusterIndex = 1 // GenerationZeroShardsCount is number of shards supported in generation-0 const GenerationZeroShardsCount = 8 +var ( + ErrTooManyShards = errors.New("too many shards") + ErrInvalidShard = errors.New("invalid shard") + ErrInvalidShardCount = errors.New("invalid shard count") + ErrExpected130Bytes = errors.New("invalid data: expected 130 bytes") +) + type RelayShards struct { - Cluster uint16 `json:"cluster"` - Indices []uint16 `json:"indices"` + ClusterID uint16 `json:"clusterID"` + ShardIDs []uint16 `json:"shardIDs"` } -func NewRelayShards(cluster uint16, indices ...uint16) (RelayShards, error) { - if len(indices) > math.MaxUint8 { - return RelayShards{}, errors.New("too many indices") +func NewRelayShards(clusterID uint16, shardIDs ...uint16) (RelayShards, error) { + if len(shardIDs) > math.MaxUint8 { + return RelayShards{}, ErrTooManyShards } - indiceSet := make(map[uint16]struct{}) - for _, index := range indices { + shardIDSet := make(map[uint16]struct{}) + for _, index := range shardIDs { if index > MaxShardIndex { - return RelayShards{}, errors.New("invalid index") + return RelayShards{}, ErrInvalidShard } - indiceSet[index] = struct{}{} // dedup + shardIDSet[index] = struct{}{} // dedup } - if len(indiceSet) == 0 { - return RelayShards{}, errors.New("invalid index count") + if len(shardIDSet) == 0 { + return RelayShards{}, ErrInvalidShardCount } - indices = []uint16{} - for index := range indiceSet { - indices = append(indices, index) + shardIDs = []uint16{} + for index := range shardIDSet { + shardIDs = append(shardIDs, index) } - return RelayShards{Cluster: cluster, Indices: indices}, nil + return RelayShards{ClusterID: clusterID, ShardIDs: shardIDs}, nil } func (rs RelayShards) Topics() []WakuPubSubTopic { var result []WakuPubSubTopic - for _, i := range rs.Indices { - result = append(result, NewStaticShardingPubsubTopic(rs.Cluster, i)) + for _, i := range rs.ShardIDs { + result = append(result, NewStaticShardingPubsubTopic(rs.ClusterID, i)) } return result } func (rs RelayShards) Contains(cluster uint16, index uint16) bool { - if rs.Cluster != cluster { + if rs.ClusterID != cluster { return false } found := false - for _, idx := range rs.Indices { + for _, idx := range rs.ShardIDs { if idx == index { found = true } @@ -94,22 +101,22 @@ func TopicsToRelayShards(topic ...string) ([]RelayShards, error) { return nil, err } - indices, ok := dict[ps.cluster] + shardIDs, ok := dict[ps.clusterID] if !ok { - indices = make(map[uint16]struct{}) + shardIDs = make(map[uint16]struct{}) } - indices[ps.shard] = struct{}{} - dict[ps.cluster] = indices + shardIDs[ps.shardID] = struct{}{} + dict[ps.clusterID] = shardIDs } - for cluster, indices := range dict { - idx := make([]uint16, 0, len(indices)) - for index := range indices { - idx = append(idx, index) + for clusterID, shardIDs := range dict { + idx := make([]uint16, 0, len(shardIDs)) + for shardID := range shardIDs { + idx = append(idx, shardID) } - rs, err := NewRelayShards(cluster, idx...) + rs, err := NewRelayShards(clusterID, idx...) if err != nil { return nil, err } @@ -128,23 +135,23 @@ func (rs RelayShards) ContainsTopic(topic string) bool { return rs.ContainsShardPubsubTopic(wTopic) } -func (rs RelayShards) IndicesList() ([]byte, error) { - if len(rs.Indices) > math.MaxUint8 { - return nil, errors.New("indices list too long") +func (rs RelayShards) ShardList() ([]byte, error) { + if len(rs.ShardIDs) > math.MaxUint8 { + return nil, ErrTooManyShards } var result []byte - result = binary.BigEndian.AppendUint16(result, rs.Cluster) - result = append(result, uint8(len(rs.Indices))) - for _, index := range rs.Indices { + result = binary.BigEndian.AppendUint16(result, rs.ClusterID) + result = append(result, uint8(len(rs.ShardIDs))) + for _, index := range rs.ShardIDs { result = binary.BigEndian.AppendUint16(result, index) } return result, nil } -func FromIndicesList(buf []byte) (RelayShards, error) { +func FromShardList(buf []byte) (RelayShards, error) { if len(buf) < 3 { return RelayShards{}, fmt.Errorf("insufficient data: expected at least 3 bytes, got %d bytes", len(buf)) } @@ -156,12 +163,12 @@ func FromIndicesList(buf []byte) (RelayShards, error) { return RelayShards{}, fmt.Errorf("invalid data: `length` field is %d but %d bytes were provided", length, len(buf)) } - var indices []uint16 + shardIDs := make([]uint16, length) for i := 0; i < length; i++ { - indices = append(indices, binary.BigEndian.Uint16(buf[3+2*i:5+2*i])) + shardIDs[i] = binary.BigEndian.Uint16(buf[3+2*i : 5+2*i]) } - return NewRelayShards(cluster, indices...) + return NewRelayShards(cluster, shardIDs...) } func setBit(n byte, pos uint) byte { @@ -181,10 +188,10 @@ func (rs RelayShards) BitVector() []byte { // of. The right-most bit in the bit vector represents shard 0, the left-most // bit represents shard 1023. var result []byte - result = binary.BigEndian.AppendUint16(result, rs.Cluster) + result = binary.BigEndian.AppendUint16(result, rs.ClusterID) vec := make([]byte, 128) - for _, index := range rs.Indices { + for _, index := range rs.ShardIDs { n := vec[index/8] vec[index/8] = byte(setBit(n, uint(index%8))) } @@ -195,11 +202,11 @@ func (rs RelayShards) BitVector() []byte { // Generate a RelayShards from a byte slice func FromBitVector(buf []byte) (RelayShards, error) { if len(buf) != 130 { - return RelayShards{}, errors.New("invalid data: expected 130 bytes") + return RelayShards{}, ErrExpected130Bytes } cluster := binary.BigEndian.Uint16(buf[0:2]) - var indices []uint16 + var shardIDs []uint16 for i := uint16(0); i < 128; i++ { for j := uint(0); j < 8; j++ { @@ -207,11 +214,11 @@ func FromBitVector(buf []byte) (RelayShards, error) { continue } - indices = append(indices, uint16(j)+8*i) + shardIDs = append(shardIDs, uint16(j)+8*i) } } - return RelayShards{Cluster: cluster, Indices: indices}, nil + return RelayShards{ClusterID: cluster, ShardIDs: shardIDs}, nil } // GetShardFromContentTopic runs Autosharding logic and returns a pubSubTopic diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index 28efa8994..db48faec6 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -148,8 +148,8 @@ func (r *Rendezvous) RegisterShard(ctx context.Context, cluster uint16, shard ui // RegisterRelayShards registers the node in the rendezvous point by specifying a RelayShards struct (more than one shard index can be registered) func (r *Rendezvous) RegisterRelayShards(ctx context.Context, rs protocol.RelayShards, rendezvousPoints []*RendezvousPoint) { - for _, idx := range rs.Indices { - go r.RegisterShard(ctx, rs.Cluster, idx, rendezvousPoints) + for _, idx := range rs.ShardIDs { + go r.RegisterShard(ctx, rs.ClusterID, idx, rendezvousPoints) } }