Skip to content

Commit

Permalink
Merge pull request #110 from irisnet/vincent-hot-fix
Browse files Browse the repository at this point in the history
R4R: Reserve IDs in InitPeer instead of AddPeer
  • Loading branch information
zhangyelong authored Apr 9, 2020
2 parents 0cb87cf + 946b2fd commit c7a7340
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 9 deletions.
2 changes: 1 addition & 1 deletion consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool, options
metrics: NopMetrics(),
}
conR.updateFastSyncingMetric()
conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR)
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)

for _, option := range options {
option(conR)
Expand Down
2 changes: 1 addition & 1 deletion evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor {
evR := &EvidenceReactor{
evpool: evpool,
}
evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR)
return evR
}

Expand Down
11 changes: 8 additions & 3 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type mempoolIDs struct {
activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter
}

// Reserve searches for the next unused ID and assignes it to the
// Reserve searches for the next unused ID and assigns it to the
// peer.
func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
ids.mtx.Lock()
Expand Down Expand Up @@ -111,10 +111,16 @@ func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReac
Mempool: mempool,
ids: newMempoolIDs(),
}
memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR)
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
return memR
}

// InitPeer implements Reactor by creating a state for the peer.
func (memR *MempoolReactor) InitPeer(peer p2p.Peer) p2p.Peer {
memR.ids.ReserveForPeer(peer)
return peer
}

// SetLogger sets the Logger on the reactor and the underlying Mempool.
func (memR *MempoolReactor) SetLogger(l log.Logger) {
memR.Logger = l
Expand Down Expand Up @@ -143,7 +149,6 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor.
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (memR *MempoolReactor) AddPeer(peer p2p.Peer) {
memR.ids.ReserveForPeer(peer)
go memR.broadcastTxRoutine(peer)
}

Expand Down
18 changes: 18 additions & 0 deletions mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,21 @@ func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) {
ids.ReserveForPeer(peer)
})
}

func TestDontExhaustMaxActiveIDs(t *testing.T) {
config := cfg.TestConfig()
const N = 1
reactors := makeAndConnectMempoolReactors(config, N)
defer func() {
for _, r := range reactors {
r.Stop()
}
}()
reactor := reactors[0]

for i := 0; i < maxActiveIDs+1; i++ {
peer := mock.NewPeer(nil)
reactor.Receive(MempoolChannel, peer, []byte{0x1, 0x2, 0x3})
reactor.AddPeer(peer)
}
}
4 changes: 4 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ func NewNode(config *cfg.Config,

p2p.MultiplexTransportConnFilters(connFilters...)(transport)

// Limit the number of incoming connections.
max := config.P2P.MaxNumInboundPeers
p2p.MultiplexTransportMaxIncomingConnections(max)(transport)

// Setup Switch.
sw := p2p.NewSwitch(
config.P2P,
Expand Down
2 changes: 1 addition & 1 deletion p2p/pex/pex_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
lastReceivedRequests: cmn.NewCMap(),
crawlPeerInfos: make(map[p2p.ID]crawlPeerInfo),
}
r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
r.BaseReactor = *p2p.NewBaseReactor("PEX", r)
return r
}

Expand Down
16 changes: 14 additions & 2 deletions p2p/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/pkg/errors"
"golang.org/x/net/netutil"
"net"
"time"

Expand Down Expand Up @@ -121,11 +122,18 @@ func MultiplexTransportResolver(resolver IPResolver) MultiplexTransportOption {
return func(mt *MultiplexTransport) { mt.resolver = resolver }
}

// MultiplexTransportMaxIncomingConnections sets the maximum number of
// simultaneous connections (incoming). Default: 0 (unlimited)
func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption {
return func(mt *MultiplexTransport) { mt.maxIncomingConnections = n }
}

// MultiplexTransport accepts and dials tcp connections and upgrades them to
// multiplexed peers.
type MultiplexTransport struct {
netAddr NetAddress
listener net.Listener
netAddr NetAddress
listener net.Listener
maxIncomingConnections int // see MaxIncomingConnections

acceptc chan accept
closec chan struct{}
Expand Down Expand Up @@ -239,6 +247,10 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error {
return err
}

if mt.maxIncomingConnections > 0 {
ln = netutil.LimitListener(ln, mt.maxIncomingConnections)
}

mt.netAddr = addr
mt.listener = ln

Expand Down
45 changes: 45 additions & 0 deletions p2p/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"net"
"reflect"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -134,6 +135,50 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
}
}

func TestTransportMultiplexMaxIncomingConnections(t *testing.T) {
mt := newMultiplexTransport(
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
)
id := mt.nodeKey.ID()

MultiplexTransportMaxIncomingConnections(0)(mt)

addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}

if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}

errc := make(chan error)

go func() {
addr := NewNetAddress(id, mt.listener.Addr())

_, err := addr.Dial()
if err != nil {
errc <- err
return
}

close(errc)
}()

if err := <-errc; err != nil {
t.Errorf("connection failed: %v", err)
}

_, err = mt.Accept(peerConfig{})
if err == nil || !strings.Contains(err.Error(), "connection reset by peer") {
t.Errorf("expected connection reset by peer error, got %v", err)
}
}

func TestTransportMultiplexAcceptMultiple(t *testing.T) {
mt := testSetupMultiplexTransport(t)
laddr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
Expand Down
2 changes: 1 addition & 1 deletion version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
// TMCoreSemVer is the current version of Tendermint Core.
// It's the Semantic Version of the software.
// Must be a string because scripts like dist.sh read this file.
TMCoreSemVer = "0.32.1"
TMCoreSemVer = "0.32.2"

// ABCISemVer is the semantic version of the ABCI library
ABCISemVer = "0.15.0"
Expand Down

0 comments on commit c7a7340

Please sign in to comment.