From e1e783d4e88995c7caacfd3b2c47ea48c196c960 Mon Sep 17 00:00:00 2001 From: amirvalhalla Date: Wed, 10 Jan 2024 21:32:22 +0330 Subject: [PATCH 1/5] feat: implement auto relay and relay service --- config/config_test.go | 2 - config/example_config.toml | 9 ++-- network/config.go | 15 +++---- network/config_test.go | 22 ++++----- network/network.go | 65 ++++++++++++++++++++++----- network/network_test.go | 2 - network/notifee.go | 12 +++-- network/relay_service.go | 92 -------------------------------------- 8 files changed, 81 insertions(+), 138 deletions(-) delete mode 100644 network/relay_service.go diff --git a/config/config_test.go b/config/config_test.go index d91b73deb..8cbd38e7e 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -48,7 +48,6 @@ func TestLocalnetConfig(t *testing.T) { assert.NoError(t, conf.BasicCheck()) assert.Empty(t, conf.Network.ListenAddrStrings) - assert.Empty(t, conf.Network.RelayAddrStrings) assert.Equal(t, conf.Network.NetworkName, "pactus-localnet") assert.Equal(t, conf.Network.DefaultPort, 21666) assert.Equal(t, conf.Store.TxCacheSize, param.DefaultParams().TransactionToLiveInterval) @@ -63,7 +62,6 @@ func TestTestnetConfig(t *testing.T) { assert.NoError(t, conf.BasicCheck()) assert.NotEmpty(t, conf.Network.DefaultRelayAddrStrings) assert.Empty(t, conf.Network.ListenAddrStrings) - assert.Empty(t, conf.Network.RelayAddrStrings) assert.Equal(t, conf.Network.NetworkName, "pactus-testnet-v2") assert.Equal(t, conf.Network.DefaultPort, 21777) assert.Equal(t, conf.Store.TxCacheSize, param.DefaultParams().TransactionToLiveInterval) diff --git a/config/example_config.toml b/config/example_config.toml index abcb9f82d..53b87a287 100644 --- a/config/example_config.toml +++ b/config/example_config.toml @@ -28,10 +28,6 @@ # `listen_addrs` specifies the addresses and ports where the node will listen for incoming connections from other nodes. ## listen_addrs = [] - # `relay_addrs` provides the necessary relay addresses. These should be specified if 'enable_relay' is 'true'. - # Note: this parameter will be ignored if 'enable_relay' is 'false'. - ## relay_addrs = [] - # `bootstrap_addrs` is a list of peer addresses needed for peer discovery. # These addresses are used by the Pactus node to discover and connect to other peers on the network. ## bootstrap_addrs = [] @@ -58,6 +54,11 @@ # Default is `false`. ## enable_relay = false + # `enable_relay_service` indicates whether relay service should be enabled or not. + # Relay service is a transport protocol that enables peers to discover each other on the peer-to-peer network. + # Default is `false`. + ## enable_relay_service = false + # `enable_mdns` indicates whether MDNS should be enabled or not. # MDNS is a protocol to discover local peers quickly and efficiently. # Default is `false`. diff --git a/network/config.go b/network/config.go index 4414c9a9f..f53a46b07 100644 --- a/network/config.go +++ b/network/config.go @@ -1,6 +1,7 @@ package network import ( + "errors" "fmt" lp2pcore "github.com/libp2p/go-libp2p/core" @@ -13,13 +14,13 @@ type Config struct { NetworkKey string `toml:"network_key"` PublicAddrString string `toml:"public_addr"` ListenAddrStrings []string `toml:"listen_addrs"` - RelayAddrStrings []string `toml:"relay_addrs"` BootstrapAddrStrings []string `toml:"bootstrap_addrs"` MaxConns int `toml:"max_connections"` EnableUDP bool `toml:"enable_udp"` EnableNATService bool `toml:"enable_nat_service"` EnableUPnP bool `toml:"enable_upnp"` EnableRelay bool `toml:"enable_relay"` + EnableRelayService bool `toml:"enable_relay_service"` EnableMdns bool `toml:"enable_mdns"` EnableMetrics bool `toml:"enable_metrics"` ForcePrivateNetwork bool `toml:"force_private_network"` @@ -37,13 +38,13 @@ func DefaultConfig() *Config { NetworkKey: "network_key", PublicAddrString: "", ListenAddrStrings: []string{}, - RelayAddrStrings: []string{}, BootstrapAddrStrings: []string{}, MaxConns: 64, EnableUDP: false, EnableNATService: false, EnableUPnP: false, EnableRelay: false, + EnableRelayService: false, EnableMdns: false, EnableMetrics: false, ForcePrivateNetwork: false, @@ -78,8 +79,8 @@ func (conf *Config) BasicCheck() error { if err := validateAddrInfo(conf.DefaultRelayAddrStrings...); err != nil { return err } - if err := validateAddrInfo(conf.RelayAddrStrings...); err != nil { - return err + if conf.EnableRelay && conf.EnableRelayService { + return errors.New("not possible for both the relay and relay service be active") } return validateAddrInfo(conf.BootstrapAddrStrings...) } @@ -106,12 +107,6 @@ func (conf *Config) ListenAddrs() []multiaddr.Multiaddr { return addrs } -func (conf *Config) RelayAddrInfos() []lp2ppeer.AddrInfo { - addrs := util.Merge(conf.DefaultRelayAddrStrings, conf.RelayAddrStrings) - addrInfos, _ := MakeAddrInfos(addrs) - return addrInfos -} - func (conf *Config) BootstrapAddrInfos() []lp2ppeer.AddrInfo { addrs := util.Merge(conf.DefaultBootstrapAddrStrings, conf.BootstrapAddrStrings) addrInfos, _ := MakeAddrInfos(addrs) diff --git a/network/config_test.go b/network/config_test.go index 9777a2339..77f00162c 100644 --- a/network/config_test.go +++ b/network/config_test.go @@ -20,6 +20,14 @@ func TestConfigBasicCheck(t *testing.T) { c.ListenAddrStrings = []string{""} }, }, + { + name: "Both Relay and Relay Service be true - Expect Error", + expectError: true, + updateFn: func(c *Config) { + c.EnableRelay = true + c.EnableRelayService = true + }, + }, { name: "Invalid ListenAddrStrings - Expect Error", expectError: true, @@ -48,13 +56,6 @@ func TestConfigBasicCheck(t *testing.T) { c.DefaultRelayAddrStrings = []string{"/ip4/127.0.0.1/"} }, }, - { - name: "Invalid RelayAddrStrings - Expect Error", - expectError: true, - updateFn: func(c *Config) { - c.RelayAddrStrings = []string{"/ip4/127.0.0.1/"} - }, - }, { name: "Invalid DefaultBootstrapAddrStrings - Expect Error", expectError: true, @@ -76,13 +77,6 @@ func TestConfigBasicCheck(t *testing.T) { c.PublicAddrString = "/ip4/127.0.0.1/" }, }, - { - name: "Valid RelayAddrStrings - No Error", - expectError: false, - updateFn: func(c *Config) { - c.RelayAddrStrings = []string{"/ip4/127.0.0.1/p2p/12D3KooWQBpPV6NtZy1dvN2oF7dJdLoooRZfEmwtHiDUf42ArDjT"} - }, - }, { name: "Valid ListenAddrStrings - No Error", expectError: false, diff --git a/network/network.go b/network/network.go index b2bb80f9f..d51b00b07 100644 --- a/network/network.go +++ b/network/network.go @@ -16,6 +16,7 @@ import ( lp2pautorelay "github.com/libp2p/go-libp2p/p2p/host/autorelay" lp2prcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" lp2pconnmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr" + lp2pproto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" lp2pnoise "github.com/libp2p/go-libp2p/p2p/security/noise" lp2quic "github.com/libp2p/go-libp2p/p2p/transport/quic" lp2ptcp "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -24,6 +25,7 @@ import ( "github.com/pactus-project/pactus/util/logger" "github.com/pactus-project/pactus/version" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/exp/slices" ) var _ Network = &network{} @@ -44,7 +46,6 @@ type network struct { stream *streamService gossip *gossipService notifee *NotifeeService - relay *relayService generalTopic *lp2pps.Topic consensusTopic *lp2pps.Topic eventChannel chan Event @@ -169,16 +170,20 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo ) } + var host lp2phost.Host if conf.EnableRelay { - log.Info("relay enabled", "addrInfos", conf.RelayAddrInfos()) + log.Info("relay enabled") + autoRelayOpt := []lp2pautorelay.Option{ - lp2pautorelay.WithMinCandidates(2), + lp2pautorelay.WithMaxCandidates(4), lp2pautorelay.WithMinInterval(1 * time.Minute), } opts = append(opts, lp2p.EnableRelay(), - lp2p.EnableAutoRelayWithStaticRelays(conf.RelayAddrInfos(), autoRelayOpt...), + lp2p.EnableAutoRelayWithPeerSource(findRelayPeers(func() lp2phost.Host { + return host + }, log), autoRelayOpt...), lp2p.EnableHolePunching(), ) } else { @@ -188,6 +193,11 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo ) } + if conf.EnableRelayService { + log.Info("relay service enabled") + opts = append(opts, lp2p.EnableRelayService()) + } + privateSubnets := PrivateSubnets() privateFilters := SubnetsToFilters(privateSubnets, multiaddr.ActionDeny) publicAddrs := conf.PublicAddr() @@ -213,7 +223,7 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo } opts = append(opts, lp2p.ConnectionGater(connGater)) - host, err := lp2p.New(opts...) + host, err = lp2p.New(opts...) if err != nil { return nil, LibP2PError{Err: err} } @@ -245,8 +255,7 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo n.peerMgr = newPeerMgr(ctx, host, conf, n.logger) n.stream = newStreamService(ctx, n.host, streamProtocolID, n.eventChannel, n.logger) n.gossip = newGossipService(ctx, n.host, n.eventChannel, conf, n.logger) - n.relay = newRelayService(ctx, n.host, conf, log) - n.notifee = newNotifeeService(ctx, n.host, n.eventChannel, n.peerMgr, n.relay, streamProtocolID, n.logger) + n.notifee = newNotifeeService(ctx, n.host, n.eventChannel, n.peerMgr, streamProtocolID, n.logger) n.host.Network().Notify(n.notifee) n.connGater.SetPeerManager(n.peerMgr) @@ -259,6 +268,44 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo return n, nil } +func findRelayPeers(h func() lp2phost.Host, log *logger.SubLogger) func(ctx context.Context, + num int) <-chan lp2ppeer.AddrInfo { + return func(ctx context.Context, num int) <-chan lp2ppeer.AddrInfo { + log.Debug("try to find relay peers", "num", num) + + // make a channel to return, and put items from numPeers on + // that channel up to numPeers. Then close it. + r := make(chan lp2ppeer.AddrInfo, num) + defer close(r) + + for _, id := range h().Peerstore().PeersWithAddrs() { + protos, err := h().Peerstore().GetProtocols(id) + if err != nil { + continue + } + + if !slices.Contains(protos, lp2pproto.ProtoIDv2Hop) { + continue + } + + addr := h().Peerstore().Addrs(id) + log.Debug("found relay peer", "addr", addr) + dhtPeer := lp2ppeer.AddrInfo{ID: id, Addrs: addr} + // Attempt to put peers on r if we have space, + // otherwise return (we reached numPeers) + select { + case r <- dhtPeer: + case <-ctx.Done(): + return r + default: + return r + } + } + + return r + } +} + func (n *network) EventChannel() <-chan Event { return n.eventChannel } @@ -276,7 +323,6 @@ func (n *network) Start() error { n.stream.Start() n.peerMgr.Start() n.notifee.Start() - n.relay.Start() n.logger.Info("network started", "addr", n.host.Addrs(), "id", n.host.ID()) return nil @@ -293,7 +339,6 @@ func (n *network) Stop() { n.stream.Stop() n.peerMgr.Stop() n.notifee.Stop() - n.relay.Stop() n.dht.Stop() if err := n.host.Close(); err != nil { @@ -391,7 +436,7 @@ func (n *network) NumConnectedPeers() int { } func (n *network) ReachabilityStatus() string { - return n.relay.Reachability().String() + return n.notifee.Reachability().String() } func (n *network) HostAddrs() []string { diff --git a/network/network_test.go b/network/network_test.go index ff5792217..814b8ee22 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -195,7 +195,6 @@ func TestNetwork(t *testing.T) { // Private node M confM := testConfig() confM.EnableRelay = true - confM.RelayAddrStrings = relayAddrs confM.BootstrapAddrStrings = bootstrapAddresses confM.ListenAddrStrings = []string{ "/ip4/127.0.0.1/tcp/9987", @@ -208,7 +207,6 @@ func TestNetwork(t *testing.T) { // Private node N confN := testConfig() confN.EnableRelay = true - confN.RelayAddrStrings = relayAddrs confN.BootstrapAddrStrings = bootstrapAddresses confN.ListenAddrStrings = []string{ "/ip4/127.0.0.1/tcp/5678", diff --git a/network/notifee.go b/network/notifee.go index 4e85d35e4..6f95af1ca 100644 --- a/network/notifee.go +++ b/network/notifee.go @@ -20,11 +20,11 @@ type NotifeeService struct { logger *logger.SubLogger streamProtocolID lp2pcore.ProtocolID peerMgr *peerMgr - relay *relayService + reachability lp2pnetwork.Reachability } func newNotifeeService(ctx context.Context, host lp2phost.Host, eventChannel chan<- Event, - peerMgr *peerMgr, relay *relayService, + peerMgr *peerMgr, protocolID lp2pcore.ProtocolID, log *logger.SubLogger, ) *NotifeeService { events := []interface{}{ @@ -43,8 +43,8 @@ func newNotifeeService(ctx context.Context, host lp2phost.Host, eventChannel cha eventChannel: eventChannel, streamProtocolID: protocolID, peerMgr: peerMgr, - relay: relay, logger: log, + reachability: lp2pnetwork.ReachabilityUnknown, } host.Network().Notify(notifee) @@ -59,7 +59,7 @@ func (s *NotifeeService) Start() { switch e := evt.(type) { case lp2pevent.EvtLocalReachabilityChanged: s.logger.Info("reachability changed", "reachability", e.Reachability) - s.relay.SetReachability(e.Reachability) + s.reachability = e.Reachability case lp2pevent.EvtPeerIdentificationCompleted: s.logger.Debug("identification completed", "pid", e.Peer) @@ -84,6 +84,10 @@ func (s *NotifeeService) Stop() { s.lp2pEventSub.Close() } +func (s *NotifeeService) Reachability() lp2pnetwork.Reachability { + return s.reachability +} + func (s *NotifeeService) Connected(_ lp2pnetwork.Network, conn lp2pnetwork.Conn) { pid := conn.RemotePeer() s.logger.Info("connected to peer", "pid", pid, "direction", conn.Stat().Direction) diff --git a/network/relay_service.go b/network/relay_service.go deleted file mode 100644 index 85c89a8da..000000000 --- a/network/relay_service.go +++ /dev/null @@ -1,92 +0,0 @@ -package network - -import ( - "context" - "sync" - "time" - - lp2phost "github.com/libp2p/go-libp2p/core/host" - lp2pnetwork "github.com/libp2p/go-libp2p/core/network" - "github.com/pactus-project/pactus/util/logger" -) - -type relayService struct { - lk sync.RWMutex - - ctx context.Context - host lp2phost.Host - reachability lp2pnetwork.Reachability - conf *Config - logger *logger.SubLogger -} - -func newRelayService(ctx context.Context, host lp2phost.Host, conf *Config, log *logger.SubLogger) *relayService { - return &relayService{ - ctx: ctx, - host: host, - conf: conf, - logger: log, - reachability: lp2pnetwork.ReachabilityUnknown, - } -} - -func (rs *relayService) Start() { - go func() { - ticker := time.NewTicker(60 * time.Second) - defer ticker.Stop() - - for { - select { - case <-rs.ctx.Done(): - return - case <-ticker.C: - rs.checkConnectivity() - } - } - }() -} - -func (rs *relayService) Stop() { -} - -func (rs *relayService) SetReachability(reachability lp2pnetwork.Reachability) { - rs.lk.Lock() - rs.reachability = reachability - rs.lk.Unlock() - - rs.checkConnectivity() -} - -func (rs *relayService) Reachability() lp2pnetwork.Reachability { - rs.lk.RLock() - defer rs.lk.RUnlock() - - return rs.reachability -} - -func (rs *relayService) checkConnectivity() { - rs.lk.Lock() - defer rs.lk.Unlock() - - net := rs.host.Network() - if rs.conf.EnableRelay && - rs.reachability == lp2pnetwork.ReachabilityPrivate { - for _, ai := range rs.conf.RelayAddrInfos() { - if net.Connectedness(ai.ID) != lp2pnetwork.Connected { - rs.logger.Info("try connecting relay node", "addr", ai.Addrs) - err := ConnectSync(rs.ctx, rs.host, ai) - if err != nil { - // TODO: Make me Warn? - rs.logger.Debug("unable to connect to relay node", "error", err, "addr", ai.Addrs) - } else { - rs.logger.Info("connect to relay node", "addr", ai.Addrs) - } - } - } - } else { - // It is public node or relay is disabled. - for _, ai := range rs.conf.RelayAddrInfos() { - _ = net.ClosePeer(ai.ID) - } - } -} From 67224d0bde640c38f28404f862cad244f01921d5 Mon Sep 17 00:00:00 2001 From: b00f Date: Thu, 11 Jan 2024 16:08:46 +0800 Subject: [PATCH 2/5] test: fixing dead-lock issue on TestNetwork (#7) --- network/config.go | 17 ++++-- network/config_test.go | 71 +++++++++++++++++-------- network/errors.go | 15 ++++-- network/network.go | 79 ++++++++++++++++------------ network/network_test.go | 113 +++++++++++++++++----------------------- 5 files changed, 170 insertions(+), 125 deletions(-) diff --git a/network/config.go b/network/config.go index f53a46b07..2d43ce121 100644 --- a/network/config.go +++ b/network/config.go @@ -1,7 +1,6 @@ package network import ( - "errors" "fmt" lp2pcore "github.com/libp2p/go-libp2p/core" @@ -55,12 +54,22 @@ func DefaultConfig() *Config { func validateMultiAddr(addrs ...string) error { _, err := MakeMultiAddrs(addrs) + if err != nil { + return ConfigError{ + Reason: fmt.Sprintf("address is not valid: %s", err.Error()), + } + } return err } func validateAddrInfo(addrs ...string) error { _, err := MakeAddrInfos(addrs) - return err + if err != nil { + return ConfigError{ + Reason: fmt.Sprintf("address is not valid: %s", err.Error()), + } + } + return nil } // BasicCheck performs basic checks on the configuration. @@ -80,7 +89,9 @@ func (conf *Config) BasicCheck() error { return err } if conf.EnableRelay && conf.EnableRelayService { - return errors.New("not possible for both the relay and relay service be active") + return ConfigError{ + Reason: "both the relay and relay service cannot be active at the same time", + } } return validateAddrInfo(conf.BootstrapAddrStrings...) } diff --git a/network/config_test.go b/network/config_test.go index 77f00162c..c0961a843 100644 --- a/network/config_test.go +++ b/network/config_test.go @@ -10,83 +10,99 @@ import ( func TestConfigBasicCheck(t *testing.T) { testCases := []struct { name string - expectError bool + expectError error updateFn func(c *Config) }{ { - name: "Empty ListenAddrStrings - Expect Error", - expectError: true, + name: "Empty ListenAddrStrings - Expect Error", + expectError: ConfigError{ + Reason: "address is not valid: failed to parse multiaddr \"\": empty multiaddr", + }, updateFn: func(c *Config) { c.ListenAddrStrings = []string{""} }, }, { - name: "Both Relay and Relay Service be true - Expect Error", - expectError: true, + name: "Both Relay and Relay Service be true - Expect Error", + expectError: ConfigError{ + Reason: "both the relay and relay service cannot be active at the same time", + }, updateFn: func(c *Config) { c.EnableRelay = true c.EnableRelayService = true }, }, { - name: "Invalid ListenAddrStrings - Expect Error", - expectError: true, + name: "Invalid ListenAddrStrings - Expect Error", + expectError: ConfigError{ + Reason: "address is not valid: failed to parse multiaddr \"127.0.0.1\": must begin with /", + }, updateFn: func(c *Config) { c.ListenAddrStrings = []string{"127.0.0.1"} }, }, { - name: "Invalid ListenAddrStrings (No port) - Expect Error", - expectError: true, + name: "Invalid ListenAddrStrings (No port) - Expect Error", + expectError: ConfigError{ + Reason: "address is not valid: failed to parse multiaddr \"/ip4\": unexpected end of multiaddr", + }, updateFn: func(c *Config) { c.ListenAddrStrings = []string{"/ip4"} }, }, { - name: "Invalid Public Address - Expect Error", - expectError: true, + name: "Invalid Public Address - Expect Error", + expectError: ConfigError{ + Reason: "address is not valid: failed to parse multiaddr \"/ip4\": unexpected end of multiaddr", + }, updateFn: func(c *Config) { c.PublicAddrString = "/ip4" }, }, { - name: "Invalid DefaultRelayAddrStrings - Expect Error", - expectError: true, + name: "Invalid DefaultRelayAddrStrings - Expect Error", + expectError: ConfigError{ + Reason: "address is not valid: invalid p2p multiaddr", + }, updateFn: func(c *Config) { c.DefaultRelayAddrStrings = []string{"/ip4/127.0.0.1/"} }, }, { - name: "Invalid DefaultBootstrapAddrStrings - Expect Error", - expectError: true, + name: "Invalid DefaultBootstrapAddrStrings - Expect Error", + expectError: ConfigError{ + Reason: "address is not valid: invalid p2p multiaddr", + }, updateFn: func(c *Config) { c.DefaultBootstrapAddrStrings = []string{"/ip4/127.0.0.1/"} }, }, { - name: "Invalid BootstrapAddrStrings - Expect Error", - expectError: true, + name: "Invalid BootstrapAddrStrings - Expect Error", + expectError: ConfigError{ + Reason: "address is not valid: invalid p2p multiaddr", + }, updateFn: func(c *Config) { c.BootstrapAddrStrings = []string{"/ip4/127.0.0.1/"} }, }, { name: "Valid Public Address - No Error", - expectError: false, + expectError: nil, updateFn: func(c *Config) { c.PublicAddrString = "/ip4/127.0.0.1/" }, }, { name: "Valid ListenAddrStrings - No Error", - expectError: false, + expectError: nil, updateFn: func(c *Config) { c.ListenAddrStrings = []string{"/ip4/127.0.0.1"} }, }, { name: "Valid BootstrapAddrStrings - No Error", - expectError: false, + expectError: nil, updateFn: func(c *Config) { c.BootstrapAddrStrings = []string{"/ip4/127.0.0.1/p2p/12D3KooWQBpPV6NtZy1dvN2oF7dJdLoooRZfEmwtHiDUf42ArDjT"} }, @@ -97,8 +113,10 @@ func TestConfigBasicCheck(t *testing.T) { t.Run(tc.name, func(t *testing.T) { conf := DefaultConfig() tc.updateFn(conf) - if tc.expectError { - assert.Error(t, conf.BasicCheck(), "Expected error for Test %d: %s", i, tc.name) + if tc.expectError != nil { + err := conf.BasicCheck() + assert.ErrorIs(t, tc.expectError, err, + "Expected error not matched for test %d-%s: expected %s, got: %s", i, tc.name, tc.expectError, err) } else { assert.NoError(t, conf.BasicCheck(), "Expected no error for Test %d: %s", i, tc.name) } @@ -156,3 +174,12 @@ func TestScaledConns(t *testing.T) { } } } + +func TestPublicAddr(t *testing.T) { + conf1 := DefaultConfig() + assert.Nil(t, conf1.PublicAddr()) + + conf2 := DefaultConfig() + conf2.PublicAddrString = "/ip4/127.0.0.1/p2p/12D3KooWQBpPV6NtZy1dvN2oF7dJdLoooRZfEmwtHiDUf42ArDjT" + assert.NotNil(t, conf2.PublicAddr()) +} diff --git a/network/errors.go b/network/errors.go index e88f62971..39ca74d9e 100644 --- a/network/errors.go +++ b/network/errors.go @@ -2,6 +2,15 @@ package network import "fmt" +// ConfigError is returned when the config is not valid with a descriptive Reason message. +type ConfigError struct { + Reason string +} + +func (e ConfigError) Error() string { + return e.Reason +} + // NotSubscribedError is returned when the peer is not subscribed to a // specific topic. type NotSubscribedError struct { @@ -18,8 +27,7 @@ type InvalidTopicError struct { } func (e InvalidTopicError) Error() string { - return fmt.Sprintf("invalid topic: '%s'", - e.TopicID.String()) + return fmt.Sprintf("invalid topic: '%s'", e.TopicID.String()) } // LibP2PError is returned when an underlying libp2p operation encounters an error. @@ -28,6 +36,5 @@ type LibP2PError struct { } func (e LibP2PError) Error() string { - return fmt.Sprintf("libp2p error: %s", - e.Err.Error()) + return fmt.Sprintf("libp2p error: %s", e.Err.Error()) } diff --git a/network/network.go b/network/network.go index d51b00b07..a0376c6d7 100644 --- a/network/network.go +++ b/network/network.go @@ -90,6 +90,8 @@ func NewNetwork(conf *Config) (Network, error) { } func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*network, error) { + self := new(network) + networkKey, err := loadOrCreateKey(conf.NetworkKey) if err != nil { return nil, LibP2PError{Err: err} @@ -169,8 +171,17 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo lp2p.NATPortMap(), ) } + // networkReady is a channel used to wait until the network is ready. + // This is primarily to avoid reading while writing. + networkReady := make(chan struct{}) + defer close(networkReady) + + networkGetter := func() *network { + <-networkReady // Closed when newNetwork is finished + time.Sleep(1 * time.Second) // Adding a safety delay + return self + } - var host lp2phost.Host if conf.EnableRelay { log.Info("relay enabled") @@ -181,9 +192,7 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo opts = append(opts, lp2p.EnableRelay(), - lp2p.EnableAutoRelayWithPeerSource(findRelayPeers(func() lp2phost.Host { - return host - }, log), autoRelayOpt...), + lp2p.EnableAutoRelayWithPeerSource(findRelayPeers(networkGetter), autoRelayOpt...), lp2p.EnableHolePunching(), ) } else { @@ -223,24 +232,22 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo } opts = append(opts, lp2p.ConnectionGater(connGater)) - host, err = lp2p.New(opts...) + host, err := lp2p.New(opts...) if err != nil { return nil, LibP2PError{Err: err} } ctx, cancel := context.WithCancel(context.Background()) - n := &network{ - ctx: ctx, - cancel: cancel, - config: conf, - logger: log, - host: host, - connGater: connGater, - eventChannel: make(chan Event, 100), - } + self.ctx = ctx + self.cancel = cancel + self.config = conf + self.logger = log + self.host = host + self.connGater = connGater + self.eventChannel = make(chan Event, 100) - log.SetObj(n) + log.SetObj(self) conf.CheckIsBootstrapper(host.ID()) @@ -248,38 +255,46 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo streamProtocolID := lp2pcore.ProtocolID(fmt.Sprintf("/%s/stream/v1", conf.NetworkName)) if conf.EnableMdns { - n.mdns = newMdnsService(ctx, n.host, n.logger) + self.mdns = newMdnsService(ctx, self.host, self.logger) } - n.dht = newDHTService(n.ctx, n.host, kadProtocolID, conf, n.logger) - n.peerMgr = newPeerMgr(ctx, host, conf, n.logger) - n.stream = newStreamService(ctx, n.host, streamProtocolID, n.eventChannel, n.logger) - n.gossip = newGossipService(ctx, n.host, n.eventChannel, conf, n.logger) - n.notifee = newNotifeeService(ctx, n.host, n.eventChannel, n.peerMgr, streamProtocolID, n.logger) + self.dht = newDHTService(self.ctx, self.host, kadProtocolID, conf, self.logger) + self.peerMgr = newPeerMgr(ctx, host, conf, self.logger) + self.stream = newStreamService(ctx, self.host, streamProtocolID, self.eventChannel, self.logger) + self.gossip = newGossipService(ctx, self.host, self.eventChannel, conf, self.logger) + self.notifee = newNotifeeService(ctx, self.host, self.eventChannel, self.peerMgr, streamProtocolID, self.logger) - n.host.Network().Notify(n.notifee) - n.connGater.SetPeerManager(n.peerMgr) + self.host.Network().Notify(self.notifee) + self.connGater.SetPeerManager(self.peerMgr) - n.logger.Info("network setup", "id", n.host.ID(), + self.logger.Info("network setup", "id", self.host.ID(), "name", conf.NetworkName, "address", conf.ListenAddrs(), "bootstrapper", conf.IsBootstrapper) - return n, nil + return self, nil } -func findRelayPeers(h func() lp2phost.Host, log *logger.SubLogger) func(ctx context.Context, +func findRelayPeers(networkGetter func() *network) func(ctx context.Context, num int) <-chan lp2ppeer.AddrInfo { return func(ctx context.Context, num int) <-chan lp2ppeer.AddrInfo { - log.Debug("try to find relay peers", "num", num) - // make a channel to return, and put items from numPeers on // that channel up to numPeers. Then close it. r := make(chan lp2ppeer.AddrInfo, num) defer close(r) - for _, id := range h().Peerstore().PeersWithAddrs() { - protos, err := h().Peerstore().GetProtocols(id) + // Because the network is initialized after relay, we need to + // obtain them indirectly this way. + n := networkGetter() + if n == nil { // context canceled etc. + return r + } + + n.logger.Debug("try to find relay peers", "num", num) + + peerStore := n.host.Peerstore() + for _, id := range peerStore.Peers() { + protos, err := peerStore.GetProtocols(id) if err != nil { continue } @@ -288,8 +303,8 @@ func findRelayPeers(h func() lp2phost.Host, log *logger.SubLogger) func(ctx cont continue } - addr := h().Peerstore().Addrs(id) - log.Debug("found relay peer", "addr", addr) + addr := peerStore.Addrs(id) + n.logger.Debug("found relay peer", "addr", addr) dhtPeer := lp2ppeer.AddrInfo{ID: id, Addrs: addr} // Attempt to put peers on r if we have space, // otherwise return (we reached numPeers) diff --git a/network/network_test.go b/network/network_test.go index 814b8ee22..699b3ddbc 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -8,9 +8,8 @@ import ( "time" lp2p "github.com/libp2p/go-libp2p" - lp2phost "github.com/libp2p/go-libp2p/core/host" lp2ppeer "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" + lp2pproto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" "github.com/pactus-project/pactus/util" "github.com/pactus-project/pactus/util/logger" "github.com/pactus-project/pactus/util/testsuite" @@ -22,32 +21,6 @@ func alwaysPropagate(_ *GossipMessage) bool { return true } -// Original code from: -// https://github.com/libp2p/go-libp2p/blob/master/p2p/host/autorelay/autorelay_test.go -func makeTestRelay(t *testing.T) lp2phost.Host { - t.Helper() - - h, err := lp2p.New( - lp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), - lp2p.DisableRelay(), - lp2p.EnableRelayService(), - lp2p.ForceReachabilityPublic(), - lp2p.AddrsFactory(func(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr { - return addrs - }), - ) - require.NoError(t, err) - require.Eventually(t, func() bool { - for _, p := range h.Mux().Protocols() { - if p == "/libp2p/circuit/relay/0.2.0/hop" { - return true - } - } - return false - }, time.Second, 10*time.Millisecond) - return h -} - func makeTestNetwork(t *testing.T, conf *Config, opts []lp2p.Option) *network { t.Helper() @@ -153,16 +126,6 @@ func TestStoppingNetwork(t *testing.T) { func TestNetwork(t *testing.T) { ts := testsuite.NewTestSuite(t) - // Relay - nodeR := makeTestRelay(t) - - relayAddrs := []string{} - for _, addr := range nodeR.Addrs() { - addr := fmt.Sprintf("%s/p2p/%s", addr, nodeR.ID().String()) - fmt.Printf("relay address: %s\n", addr) - relayAddrs = append(relayAddrs, addr) - } - bootstrapPort := ts.RandInt32(9999) + 10000 publicPort := ts.RandInt32(9999) + 10000 @@ -179,9 +142,11 @@ func TestNetwork(t *testing.T) { fmt.Sprintf("/ip4/127.0.0.1/tcp/%v/p2p/%v", bootstrapPort, networkB.SelfID().String()), } - // Public node + // Public and relay node confP := testConfig() confP.BootstrapAddrStrings = bootstrapAddresses + confP.EnableRelay = false + confP.EnableRelayService = true confP.ListenAddrStrings = []string{ fmt.Sprintf("/ip4/127.0.0.1/tcp/%v", publicPort), } @@ -242,27 +207,46 @@ func TestNetwork(t *testing.T) { time.Sleep(2 * time.Second) - t.Run("Reachability Status", func(t *testing.T) { - assert.Equal(t, networkP.ReachabilityStatus(), "Public") - assert.Equal(t, networkB.ReachabilityStatus(), "Public") - assert.Equal(t, networkM.ReachabilityStatus(), "Private") - assert.Equal(t, networkN.ReachabilityStatus(), "Private") - assert.Equal(t, networkX.ReachabilityStatus(), "Private") - }) - t.Run("Supported Protocols", func(t *testing.T) { - protosNetB := networkB.Protocols() - for i, p := range networkB.host.Mux().Protocols() { - assert.Equal(t, protosNetB[i], string(p)) - } + require.EventuallyWithT(t, func(c *assert.CollectT) { + protos := networkM.Protocols() + assert.Contains(t, protos, lp2pproto.ProtoIDv2Stop) + assert.NotContains(t, protos, lp2pproto.ProtoIDv2Hop) + }, time.Second, 100*time.Millisecond) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + protos := networkN.Protocols() + assert.Contains(t, protos, lp2pproto.ProtoIDv2Stop) + assert.NotContains(t, protos, lp2pproto.ProtoIDv2Hop) + }, time.Second, 100*time.Millisecond) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + protos := networkP.Protocols() + assert.NotContains(t, protos, lp2pproto.ProtoIDv2Stop) + assert.Contains(t, protos, lp2pproto.ProtoIDv2Hop) + }, time.Second, 100*time.Millisecond) }) t.Run("all nodes have at least one connection to the bootstrap node B", func(t *testing.T) { - assert.GreaterOrEqual(t, networkP.NumConnectedPeers(), 1) - assert.GreaterOrEqual(t, networkB.NumConnectedPeers(), 1) - assert.GreaterOrEqual(t, networkM.NumConnectedPeers(), 1) - assert.GreaterOrEqual(t, networkN.NumConnectedPeers(), 1) - assert.GreaterOrEqual(t, networkX.NumConnectedPeers(), 1) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.GreaterOrEqual(c, networkP.NumConnectedPeers(), 4) // Connected to B, M, N, X + }, 2*time.Second, 100*time.Millisecond) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.GreaterOrEqual(c, networkB.NumConnectedPeers(), 4) // Connected to P, M, N, X + }, 2*time.Second, 100*time.Millisecond) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.GreaterOrEqual(c, networkM.NumConnectedPeers(), 2) // Connected to B, P, N? + }, 2*time.Second, 100*time.Millisecond) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.GreaterOrEqual(c, networkN.NumConnectedPeers(), 2) // Connected to B, P, M? + }, 2*time.Second, 100*time.Millisecond) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.GreaterOrEqual(c, networkX.NumConnectedPeers(), 2) // Connected to B, P + }, 2*time.Second, 100*time.Millisecond) }) t.Run("Gossip: all nodes receive general gossip messages", func(t *testing.T) { @@ -340,21 +324,14 @@ func TestNetwork(t *testing.T) { shouldNotReceiveEvent(t, networkX) }) - circuitAddrInfoN, _ := lp2ppeer.AddrInfoFromString( - fmt.Sprintf("%s/p2p-circuit/p2p/%s", relayAddrs[0], networkN.SelfID())) - t.Run("node X (private, not connected via relay) is not accessible by node M", func(t *testing.T) { - require.Error(t, networkX.host.Connect(networkX.ctx, *circuitAddrInfoN)) - msgM := ts.RandBytes(64) require.Error(t, networkM.SendTo(msgM, networkX.SelfID())) }) // TODO: How to test this? // t.Run("nodes M and N (private, connected via relay) can communicate using the relay node R", func(t *testing.T) { - // require.NoError(t, networkM.host.Connect(networkM.ctx, *circuitAddrInfoN)) - - // msgM := ts.RandBytes(64) + // msgM := ts.RandBytes(64) // require.NoError(t, networkM.SendTo(msgM, networkN.SelfID())) // eM := shouldReceiveEvent(t, networkN, EventTypeStream).(*StreamMessage) // assert.Equal(t, readData(t, eM.Reader, len(msgM)), msgM) @@ -369,6 +346,14 @@ func TestNetwork(t *testing.T) { assert.Equal(t, e.PeerID, networkP.SelfID()) require.Error(t, networkB.SendTo(msgB, networkP.SelfID())) }) + + t.Run("Reachability Status", func(t *testing.T) { + assert.Equal(t, networkP.ReachabilityStatus(), "Public") + assert.Equal(t, networkB.ReachabilityStatus(), "Public") + assert.Equal(t, networkM.ReachabilityStatus(), "Private") + assert.Equal(t, networkN.ReachabilityStatus(), "Private") + assert.Equal(t, networkX.ReachabilityStatus(), "Private") + }) } func TestConnections(t *testing.T) { From 29e7622275738cf1d6c4ddfd3f283ad8fc00fffe Mon Sep 17 00:00:00 2001 From: amirvalhalla Date: Thu, 11 Jan 2024 12:07:49 +0330 Subject: [PATCH 3/5] fix: relay candidates --- network/network.go | 1 + network/network_test.go | 8 +++----- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/network/network.go b/network/network.go index a0376c6d7..564faff11 100644 --- a/network/network.go +++ b/network/network.go @@ -186,6 +186,7 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo log.Info("relay enabled") autoRelayOpt := []lp2pautorelay.Option{ + lp2pautorelay.WithMinCandidates(1), lp2pautorelay.WithMaxCandidates(4), lp2pautorelay.WithMinInterval(1 * time.Minute), } diff --git a/network/network_test.go b/network/network_test.go index 699b3ddbc..f0bd8650e 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -111,13 +111,11 @@ func TestStoppingNetwork(t *testing.T) { net.Stop() } -// In this test, we are setting up a simulated network environment that consists of six nodes: -// - R is a Relay node +// In this test, we are setting up a simulated network environment that consists of these nodes: // - B is a Bootstrap node -// - P is a Public node +// - P is a Public and relay node // - M, N, and X are Private Nodes behind a Network Address Translation (NAT) -// - Both M and N are connected to the relay node R -// - X is not connected to the relay node and does not join the consensus topic +// - M and N have relay enabled, while X does not. // // The test will evaluate the following scenarios: // - Connection establishment to the bootstrap node From 1fe421a2fc3d40ac8d3e4ea6d4c0a5f3e2faf8c4 Mon Sep 17 00:00:00 2001 From: amirvalhalla Date: Thu, 11 Jan 2024 12:22:12 +0330 Subject: [PATCH 4/5] fix: deadlock issue --- network/network_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/network/network_test.go b/network/network_test.go index f0bd8650e..97d476e85 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -228,23 +228,23 @@ func TestNetwork(t *testing.T) { t.Run("all nodes have at least one connection to the bootstrap node B", func(t *testing.T) { assert.EventuallyWithT(t, func(c *assert.CollectT) { assert.GreaterOrEqual(c, networkP.NumConnectedPeers(), 4) // Connected to B, M, N, X - }, 2*time.Second, 100*time.Millisecond) + }, 5*time.Second, 100*time.Millisecond) assert.EventuallyWithT(t, func(c *assert.CollectT) { assert.GreaterOrEqual(c, networkB.NumConnectedPeers(), 4) // Connected to P, M, N, X - }, 2*time.Second, 100*time.Millisecond) + }, 5*time.Second, 100*time.Millisecond) assert.EventuallyWithT(t, func(c *assert.CollectT) { assert.GreaterOrEqual(c, networkM.NumConnectedPeers(), 2) // Connected to B, P, N? - }, 2*time.Second, 100*time.Millisecond) + }, 5*time.Second, 100*time.Millisecond) assert.EventuallyWithT(t, func(c *assert.CollectT) { assert.GreaterOrEqual(c, networkN.NumConnectedPeers(), 2) // Connected to B, P, M? - }, 2*time.Second, 100*time.Millisecond) + }, 5*time.Second, 100*time.Millisecond) assert.EventuallyWithT(t, func(c *assert.CollectT) { assert.GreaterOrEqual(c, networkX.NumConnectedPeers(), 2) // Connected to B, P - }, 2*time.Second, 100*time.Millisecond) + }, 5*time.Second, 100*time.Millisecond) }) t.Run("Gossip: all nodes receive general gossip messages", func(t *testing.T) { From c8d74d05b4864e893a3cd3af29be0f8964c40dcf Mon Sep 17 00:00:00 2001 From: amirvalhalla Date: Thu, 11 Jan 2024 12:48:29 +0330 Subject: [PATCH 5/5] feat: connection count --- network/network_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/network/network_test.go b/network/network_test.go index 97d476e85..03a97698e 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -227,23 +227,23 @@ func TestNetwork(t *testing.T) { t.Run("all nodes have at least one connection to the bootstrap node B", func(t *testing.T) { assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.GreaterOrEqual(c, networkP.NumConnectedPeers(), 4) // Connected to B, M, N, X + assert.GreaterOrEqual(c, networkP.NumConnectedPeers(), 1) // Connected to B, M, N, X }, 5*time.Second, 100*time.Millisecond) assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.GreaterOrEqual(c, networkB.NumConnectedPeers(), 4) // Connected to P, M, N, X + assert.GreaterOrEqual(c, networkB.NumConnectedPeers(), 1) // Connected to P, M, N, X }, 5*time.Second, 100*time.Millisecond) assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.GreaterOrEqual(c, networkM.NumConnectedPeers(), 2) // Connected to B, P, N? + assert.GreaterOrEqual(c, networkM.NumConnectedPeers(), 1) // Connected to B, P, N? }, 5*time.Second, 100*time.Millisecond) assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.GreaterOrEqual(c, networkN.NumConnectedPeers(), 2) // Connected to B, P, M? + assert.GreaterOrEqual(c, networkN.NumConnectedPeers(), 1) // Connected to B, P, M? }, 5*time.Second, 100*time.Millisecond) assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.GreaterOrEqual(c, networkX.NumConnectedPeers(), 2) // Connected to B, P + assert.GreaterOrEqual(c, networkX.NumConnectedPeers(), 1) // Connected to B, P }, 5*time.Second, 100*time.Millisecond) })