diff --git a/config/config.go b/config/config.go index b1e7cad29..20a2a1548 100644 --- a/config/config.go +++ b/config/config.go @@ -107,8 +107,7 @@ func DefaultConfigTestnet() *Config { "/ip4/139.162.153.10/tcp/4002/p2p/12D3KooWNR79jqHVVNhNVrqnDbxbJJze4VjbEsBjZhz6mkvinHAN", "/ip4/188.121.102.178/tcp/4002/p2p/12D3KooWCRHn8vjrKNBEQcut8uVCYX5q77RKidPaE6iMK31qEVHb", } - conf.Network.MinConns = 16 - conf.Network.MaxConns = 32 + conf.Network.MaxConns = 64 conf.Network.EnableNATService = false conf.Network.EnableUPnP = false conf.Network.EnableRelay = true @@ -133,7 +132,6 @@ func DefaultConfigLocalnet() *Config { conf.Network.EnableNATService = false conf.Network.EnableUPnP = false conf.Network.BootstrapAddrStrings = []string{} - conf.Network.MinConns = 0 conf.Network.MaxConns = 0 conf.Network.NetworkName = "pactus-localnet" conf.Network.DefaultPort = 21666 diff --git a/config/example_config.toml b/config/example_config.toml index 5bf955b27..5d144df86 100644 --- a/config/example_config.toml +++ b/config/example_config.toml @@ -36,13 +36,9 @@ # These addresses are used by the Pactus node to discover and connect to other peers on the network. ## bootstrap_addrs = [] - # `min_connections` is the minimum number of connections that the Pactus node should maintain. - # Default is `16`. - ## min_connections = 16 - - # `max_connections` is the maximum number of connections that the Pactus node should maintain. - # Default is `32`. - ## max_connections = 32 + # `max_connections` is the maximum number of connections that the Pactus node maintains. + # Default is `64`. + ## max_connections = 64 # `enable_nat_service` provides a service to other peer for determining their reachability status. # Default is `false`. diff --git a/network/config.go b/network/config.go index 8ecea1c03..20b624906 100644 --- a/network/config.go +++ b/network/config.go @@ -13,7 +13,6 @@ type Config struct { ListenAddrStrings []string `toml:"listen_addrs"` RelayAddrStrings []string `toml:"relay_addrs"` BootstrapAddrStrings []string `toml:"bootstrap_addrs"` - MinConns int `toml:"min_connections"` MaxConns int `toml:"max_connections"` EnableNATService bool `toml:"enable_nat_service"` EnableUPnP bool `toml:"enable_upnp"` @@ -36,8 +35,7 @@ func DefaultConfig() *Config { ListenAddrStrings: []string{}, RelayAddrStrings: []string{}, BootstrapAddrStrings: []string{}, - MinConns: 16, - MaxConns: 32, + MaxConns: 64, EnableNATService: false, EnableUPnP: false, EnableRelay: false, @@ -115,3 +113,15 @@ func (conf *Config) IsBootstrapper(pid lp2pcore.PeerID) bool { return false } + +func (conf *Config) ScaledMaxConns() int { + return util.LogScale(conf.MaxConns) +} + +func (conf *Config) ScaledMinConns() int { + return conf.ScaledMaxConns() / 4 +} + +func (conf *Config) ConnsThreshold() int { + return conf.ScaledMaxConns() / 8 +} diff --git a/network/config_test.go b/network/config_test.go index f17be158b..b53ac50e7 100644 --- a/network/config_test.go +++ b/network/config_test.go @@ -125,3 +125,35 @@ func TestIsBootstrapper(t *testing.T) { assert.True(t, conf.IsBootstrapper(pid2)) assert.True(t, conf.IsBootstrapper(pid3)) } + +func TestScaledConns(t *testing.T) { + tests := []struct { + config Config + expectedMax int + expectedMin int + expectedThreshold int + }{ + {Config{MaxConns: 1}, 1, 0, 0}, + {Config{MaxConns: 8}, 8, 2, 1}, + {Config{MaxConns: 30}, 32, 8, 4}, + {Config{MaxConns: 1000}, 1024, 256, 128}, + } + + for _, test := range tests { + resultMax := test.config.ScaledMaxConns() + resultMin := test.config.ScaledMinConns() + resultThreshold := test.config.ConnsThreshold() + if resultMax != test.expectedMax || + resultMin != test.expectedMin || + resultThreshold != test.expectedThreshold { + t.Errorf("For MaxConns %d, "+ + "NormedMaxConns() returned %d (expected %d), "+ + "NormedMinConns() returned %d (expected %d), "+ + "ConnsThreshold() returned %d (expected %d)", + test.config.MaxConns, + resultMax, test.expectedMax, + resultMin, test.expectedMin, + resultThreshold, test.expectedThreshold) + } + } +} diff --git a/network/gater.go b/network/gater.go index e05648118..ae32b59dc 100644 --- a/network/gater.go +++ b/network/gater.go @@ -16,10 +16,10 @@ var _ lp2pconnmgr.ConnectionGater = &ConnectionGater{} type ConnectionGater struct { lk sync.RWMutex - filters *multiaddr.Filters - peerMgr *peerMgr - maxConn int - logger *logger.SubLogger + filters *multiaddr.Filters + peerMgr *peerMgr + connsLimit int + logger *logger.SubLogger } func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, error) { @@ -29,10 +29,12 @@ func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, filters = SubnetsToFilters(privateSubnets, multiaddr.ActionDeny) } + connsLimit := conf.ScaledMaxConns() + conf.ConnsThreshold() + log.Info("connection gater created", "connsLimit", connsLimit) return &ConnectionGater{ - filters: filters, - maxConn: conf.MaxConns, - logger: log, + filters: filters, + connsLimit: connsLimit, + logger: log, }, nil } @@ -43,19 +45,19 @@ func (g *ConnectionGater) SetPeerManager(peerMgr *peerMgr) { g.peerMgr = peerMgr } -func (g *ConnectionGater) hasMaxConnections() bool { +func (g *ConnectionGater) onConnectionLimit() bool { if g.peerMgr == nil { return false } - return g.peerMgr.NumOfConnected() > g.maxConn + return g.peerMgr.NumOfConnected() > g.connsLimit } func (g *ConnectionGater) InterceptPeerDial(pid lp2ppeer.ID) bool { g.lk.RLock() defer g.lk.RUnlock() - if g.hasMaxConnections() { + if g.onConnectionLimit() { g.logger.Debug("InterceptPeerDial rejected: many connections", "pid", pid) return false } @@ -67,7 +69,7 @@ func (g *ConnectionGater) InterceptAddrDial(pid lp2ppeer.ID, ma multiaddr.Multia g.lk.RLock() defer g.lk.RUnlock() - if g.hasMaxConnections() { + if g.onConnectionLimit() { g.logger.Debug("InterceptAddrDial rejected: many connections", "pid", pid, "ma", ma.String()) return false } @@ -85,7 +87,7 @@ func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool { g.lk.RLock() defer g.lk.RUnlock() - if g.hasMaxConnections() { + if g.onConnectionLimit() { g.logger.Debug("InterceptAccept rejected: many connections") return false } diff --git a/network/gater_test.go b/network/gater_test.go index 645af4b0f..4fb25523f 100644 --- a/network/gater_test.go +++ b/network/gater_test.go @@ -61,7 +61,10 @@ func TestDenyPrivate(t *testing.T) { func TestMaxConnection(t *testing.T) { ts := testsuite.NewTestSuite(t) conf := testConfig() - conf.MaxConns = 1 + conf.MaxConns = 8 + assert.Equal(t, conf.ScaledMinConns(), 2) + assert.Equal(t, conf.ScaledMaxConns(), 8) + assert.Equal(t, conf.ConnsThreshold(), 1) net := makeTestNetwork(t, conf, nil) maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234") @@ -70,10 +73,19 @@ func TestMaxConnection(t *testing.T) { cmaPublic := &mockConnMultiaddrs{remote: maPublic} pid := ts.RandPeerID() + for i := 0; i < 9; i++ { + net.peerMgr.AddPeer(ts.RandPeerID(), + multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234"), lp2pnetwork.DirInbound) + } + + assert.True(t, net.connGater.InterceptPeerDial(pid)) + assert.True(t, net.connGater.InterceptAddrDial(pid, maPrivate)) + assert.True(t, net.connGater.InterceptAddrDial(pid, maPublic)) + assert.True(t, net.connGater.InterceptAccept(cmaPrivate)) + assert.True(t, net.connGater.InterceptAccept(cmaPublic)) + net.peerMgr.AddPeer(ts.RandPeerID(), - multiaddr.StringCast("/ip4/2.2.2.2/tcp/1234"), lp2pnetwork.DirInbound) - net.peerMgr.AddPeer(ts.RandPeerID(), - multiaddr.StringCast("/ip4/3.3.3.3/tcp/1234"), lp2pnetwork.DirInbound) + multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234"), lp2pnetwork.DirInbound) assert.False(t, net.connGater.InterceptPeerDial(pid)) assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate)) diff --git a/network/network.go b/network/network.go index d15d1b910..834aff6d3 100644 --- a/network/network.go +++ b/network/network.go @@ -112,23 +112,29 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo opts = append(opts, lp2p.DisableMetrics()) } - limit := MakeScalingLimitConfig(conf.MinConns, conf.MaxConns) + limit := BuildConcreteLimitConfig(conf.MaxConns) resMgr, err := lp2prcmgr.NewResourceManager( - lp2prcmgr.NewFixedLimiter(limit.AutoScale()), + lp2prcmgr.NewFixedLimiter(limit), rcMgrOpt..., ) if err != nil { return nil, LibP2PError{Err: err} } + // https://github.com/libp2p/go-libp2p/issues/2616 + // The connection manager doesn't reject any connections. + // It just triggers a pruning run once the high watermark is reached (or surpassed). + + lowWM := conf.ScaledMinConns() // Low Watermark + highWM := conf.ScaledMaxConns() - conf.ConnsThreshold() // High Watermark connMgr, err := lp2pconnmgr.NewConnManager( - conf.MinConns, // Low Watermark - conf.MaxConns, // High Watermark + lowWM, highWM, lp2pconnmgr.WithGracePeriod(time.Minute), ) if err != nil { return nil, LibP2PError{Err: err} } + log.Info("connection manager created", "lowWM", lowWM, "highWM", highWM) opts = append(opts, lp2p.Identity(networkKey), @@ -154,7 +160,7 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo } if conf.EnableRelay { - log.Info("relay enabled", "relay addrs", conf.RelayAddrStrings) + log.Info("relay enabled", "addrInfos", conf.RelayAddrInfos()) opts = append(opts, lp2p.EnableRelay(), lp2p.EnableAutoRelayWithStaticRelays(conf.RelayAddrInfos()), diff --git a/network/network_test.go b/network/network_test.go index fd6e8a285..61b0c550a 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -68,7 +68,6 @@ func testConfig() *Config { ListenAddrStrings: []string{}, NetworkKey: util.TempFilePath(), BootstrapAddrStrings: []string{}, - MinConns: 4, MaxConns: 8, EnableNATService: false, EnableUPnP: false, diff --git a/network/peermgr.go b/network/peermgr.go index 7b290d0fd..68834678e 100644 --- a/network/peermgr.go +++ b/network/peermgr.go @@ -39,8 +39,8 @@ func newPeerMgr(ctx context.Context, h lp2phost.Host, b := &peerMgr{ ctx: ctx, bootstrapAddrs: conf.BootstrapAddrInfos(), - minConns: conf.MinConns, - maxConns: conf.MaxConns, + minConns: conf.ScaledMinConns(), + maxConns: conf.ScaledMaxConns(), peers: make(map[lp2ppeer.ID]*peerInfo), host: h, logger: log, diff --git a/network/utils.go b/network/utils.go index 184c200f0..d9fff03ec 100644 --- a/network/utils.go +++ b/network/utils.go @@ -13,7 +13,6 @@ import ( lp2prcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/multiformats/go-multiaddr" "github.com/pactus-project/pactus/crypto/hash" - "github.com/pactus-project/pactus/util" "github.com/pactus-project/pactus/util/logger" ) @@ -131,36 +130,30 @@ func SubnetsToFilters(subnets []*net.IPNet, action multiaddr.Action) *multiaddr. return filters } -func MakeScalingLimitConfig(minConns, maxConns int) lp2prcmgr.ScalingLimitConfig { - limit := lp2prcmgr.DefaultLimits - - limit.SystemBaseLimit.ConnsOutbound = util.LogScale(maxConns / 2) - limit.SystemBaseLimit.ConnsInbound = util.LogScale(maxConns / 2) - limit.SystemBaseLimit.Conns = util.LogScale(maxConns) - limit.SystemBaseLimit.StreamsOutbound = util.LogScale(maxConns / 2) - limit.SystemBaseLimit.StreamsInbound = util.LogScale(maxConns / 2) - limit.SystemBaseLimit.Streams = util.LogScale(maxConns) - - limit.ServiceLimitIncrease.ConnsOutbound = util.LogScale(minConns / 2) - limit.ServiceLimitIncrease.ConnsInbound = util.LogScale(minConns / 2) - limit.ServiceLimitIncrease.Conns = util.LogScale(minConns) - limit.ServiceLimitIncrease.StreamsOutbound = util.LogScale(minConns / 2) - limit.ServiceLimitIncrease.StreamsInbound = util.LogScale(minConns / 2) - limit.ServiceLimitIncrease.Streams = util.LogScale(minConns) - - limit.TransientBaseLimit.ConnsOutbound = util.LogScale(maxConns / 2) - limit.TransientBaseLimit.ConnsInbound = util.LogScale(maxConns / 2) - limit.TransientBaseLimit.Conns = util.LogScale(maxConns) - limit.TransientBaseLimit.StreamsOutbound = util.LogScale(maxConns / 2) - limit.TransientBaseLimit.StreamsInbound = util.LogScale(maxConns / 2) - limit.TransientBaseLimit.Streams = util.LogScale(maxConns) - - limit.TransientLimitIncrease.ConnsInbound = util.LogScale(minConns / 2) - limit.TransientLimitIncrease.Conns = util.LogScale(minConns) - limit.TransientLimitIncrease.StreamsInbound = util.LogScale(minConns / 2) - limit.TransientLimitIncrease.Streams = util.LogScale(minConns) - - return limit +func BuildConcreteLimitConfig(maxConns int) lp2prcmgr.ConcreteLimitConfig { + changes := lp2prcmgr.PartialLimitConfig{} + + updateResourceLimits := func(limit *lp2prcmgr.ResourceLimits, maxConns, coefficient int) { + maxConnVal := lp2prcmgr.LimitVal(maxConns * coefficient) + + limit.ConnsOutbound = maxConnVal / 2 + limit.ConnsInbound = maxConnVal / 2 + limit.Conns = maxConnVal + limit.StreamsOutbound = maxConnVal * 4 + limit.StreamsInbound = maxConnVal * 4 + limit.Streams = maxConnVal * 8 + } + + updateResourceLimits(&changes.System, maxConns, 1) + updateResourceLimits(&changes.ServiceDefault, maxConns, 1) + updateResourceLimits(&changes.ProtocolDefault, maxConns, 1) + updateResourceLimits(&changes.ProtocolPeerDefault, maxConns, 1) + updateResourceLimits(&changes.Transient, maxConns, 1) + + defaultLimitConfig := lp2prcmgr.DefaultLimits.AutoScale() + changedLimitConfig := changes.Build(defaultLimitConfig) + + return changedLimitConfig } func MessageIDFunc(m *lp2pspb.Message) string { diff --git a/tests/main_test.go b/tests/main_test.go index 6f48ba35d..f33ed244f 100644 --- a/tests/main_test.go +++ b/tests/main_test.go @@ -86,7 +86,6 @@ func TestMain(m *testing.M) { tConfigs[i].Network.NetworkName = "test" tConfigs[i].Network.ListenAddrStrings = []string{"/ip4/127.0.0.1/tcp/0", "/ip4/127.0.0.1/udp/0/quic-v1"} tConfigs[i].Network.BootstrapAddrStrings = []string{} - tConfigs[i].Network.MinConns = 4 tConfigs[i].Network.MaxConns = 8 tConfigs[i].HTTP.Enable = false tConfigs[i].GRPC.Enable = false