Skip to content

Commit

Permalink
peer+wire: add addrv2 message, protocol negotiation
Browse files Browse the repository at this point in the history
  • Loading branch information
Crypt-iQ committed Feb 24, 2022
1 parent 201c083 commit cb6f21b
Show file tree
Hide file tree
Showing 8 changed files with 714 additions and 81 deletions.
231 changes: 191 additions & 40 deletions peer/peer.go

Large diffs are not rendered by default.

290 changes: 255 additions & 35 deletions peer/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,18 +289,16 @@ func TestPeerConnection(t *testing.T) {
{
"basic handshake",
func() (*peer.Peer, *peer.Peer, error) {
inConn, outConn := pipe(
&conn{raddr: "10.0.0.1:8333"},
&conn{raddr: "10.0.0.2:8333"},
)
inPeer := peer.NewInboundPeer(peer1Cfg)
inPeer.AssociateConnection(inConn)

outPeer, err := peer.NewOutboundPeer(peer2Cfg, "10.0.0.2:8333")
if err != nil {
return nil, nil, err
}
outPeer.AssociateConnection(outConn)

err = setupPeerConnection(inPeer, outPeer)
if err != nil {
return nil, nil, err
}

for i := 0; i < 4; i++ {
select {
Expand All @@ -315,18 +313,16 @@ func TestPeerConnection(t *testing.T) {
{
"socks proxy",
func() (*peer.Peer, *peer.Peer, error) {
inConn, outConn := pipe(
&conn{raddr: "10.0.0.1:8333", proxy: true},
&conn{raddr: "10.0.0.2:8333"},
)
inPeer := peer.NewInboundPeer(peer1Cfg)
inPeer.AssociateConnection(inConn)

outPeer, err := peer.NewOutboundPeer(peer2Cfg, "10.0.0.2:8333")
if err != nil {
return nil, nil, err
}
outPeer.AssociateConnection(outConn)

err = setupPeerConnection(inPeer, outPeer)
if err != nil {
return nil, nil, err
}

for i := 0; i < 4; i++ {
select {
Expand Down Expand Up @@ -359,7 +355,7 @@ func TestPeerConnection(t *testing.T) {
// TestPeerListeners tests that the peer listeners are called as expected.
func TestPeerListeners(t *testing.T) {
verack := make(chan struct{}, 1)
ok := make(chan wire.Message, 20)
ok := make(chan wire.Message, 22)
peerCfg := &peer.Config{
Listeners: peer.MessageListeners{
OnGetAddr: func(p *peer.Peer, msg *wire.MsgGetAddr) {
Expand Down Expand Up @@ -447,6 +443,12 @@ func TestPeerListeners(t *testing.T) {
OnSendHeaders: func(p *peer.Peer, msg *wire.MsgSendHeaders) {
ok <- msg
},
OnSendAddrV2: func(p *peer.Peer, msg *wire.MsgSendAddrV2) {
ok <- msg
},
OnAddrV2: func(p *peer.Peer, msg *wire.MsgAddrV2) {
ok <- msg
},
},
UserAgentName: "peer",
UserAgentVersion: "1.0",
Expand All @@ -456,12 +458,7 @@ func TestPeerListeners(t *testing.T) {
TrickleInterval: time.Second * 10,
AllowSelfConns: true,
}
inConn, outConn := pipe(
&conn{raddr: "10.0.0.1:8333"},
&conn{raddr: "10.0.0.2:8333"},
)
inPeer := peer.NewInboundPeer(peerCfg)
inPeer.AssociateConnection(inConn)

peerCfg.Listeners = peer.MessageListeners{
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
Expand All @@ -473,7 +470,12 @@ func TestPeerListeners(t *testing.T) {
t.Errorf("NewOutboundPeer: unexpected err %v\n", err)
return
}
outPeer.AssociateConnection(outConn)

err = setupPeerConnection(inPeer, outPeer)
if err != nil {
t.Errorf("setupPeerConnection: failed: %v\n", err)
return
}

for i := 0; i < 2; i++ {
select {
Expand Down Expand Up @@ -597,6 +599,14 @@ func TestPeerListeners(t *testing.T) {
"OnSendHeaders",
wire.NewMsgSendHeaders(),
},
{
"OnSendAddrV2",
wire.NewMsgSendAddrV2(),
},
{
"OnAddrV2",
wire.NewMsgAddrV2(),
},
}
t.Logf("Running %d tests", len(tests))
for _, test := range tests {
Expand Down Expand Up @@ -881,17 +891,17 @@ func TestDuplicateVersionMsg(t *testing.T) {
Services: 0,
AllowSelfConns: true,
}
inConn, outConn := pipe(
&conn{laddr: "10.0.0.1:9108", raddr: "10.0.0.2:9108"},
&conn{laddr: "10.0.0.2:9108", raddr: "10.0.0.1:9108"},
)
outPeer, err := peer.NewOutboundPeer(peerCfg, inConn.laddr)
outPeer, err := peer.NewOutboundPeer(peerCfg, "10.0.0.2:8333")
if err != nil {
t.Fatalf("NewOutboundPeer: unexpected err: %v\n", err)
}
outPeer.AssociateConnection(outConn)
inPeer := peer.NewInboundPeer(peerCfg)
inPeer.AssociateConnection(inConn)

err = setupPeerConnection(inPeer, outPeer)
if err != nil {
t.Fatalf("setupPeerConnection failed to connect: %v\n", err)
}

// Wait for the veracks from the initial protocol version negotiation.
for i := 0; i < 2; i++ {
select {
Expand Down Expand Up @@ -947,17 +957,16 @@ func TestUpdateLastBlockHeight(t *testing.T) {
remotePeerCfg.NewestBlock = func() (*chainhash.Hash, int32, error) {
return &chainhash.Hash{}, remotePeerHeight, nil
}
inConn, outConn := pipe(
&conn{laddr: "10.0.0.1:9108", raddr: "10.0.0.2:9108"},
&conn{laddr: "10.0.0.2:9108", raddr: "10.0.0.1:9108"},
)
localPeer, err := peer.NewOutboundPeer(&peerCfg, inConn.laddr)
localPeer, err := peer.NewOutboundPeer(&peerCfg, "10.0.0.2:8333")
if err != nil {
t.Fatalf("NewOutboundPeer: unexpected err: %v\n", err)
}
localPeer.AssociateConnection(outConn)
inPeer := peer.NewInboundPeer(&remotePeerCfg)
inPeer.AssociateConnection(inConn)

err = setupPeerConnection(inPeer, localPeer)
if err != nil {
t.Fatalf("setupPeerConnection failed to connect: %v\n", err)
}

// Wait for the veracks from the initial protocol version negotiation.
for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -989,3 +998,214 @@ func TestUpdateLastBlockHeight(t *testing.T) {
remotePeerHeight+1)
}
}

// setupPeerConnection initiates a tcp connection between two peers.
func setupPeerConnection(in, out *peer.Peer) error {
// listenFunc is a function closure that listens for a tcp connection.
// The tcp connection will be the one the inbound peer uses. This will
// be run as a goroutine.
listenFunc := func(l *net.TCPListener, errChan chan error,
listenChan chan struct{}) {

listenChan <- struct{}{}

conn, err := l.Accept()
if err != nil {
errChan <- err
return
}

in.AssociateConnection(conn)
errChan <- nil
}

// dialFunc is a function closure that initiates the tcp connection.
// The tcp connection will be the one the outbound peer uses.
dialFunc := func(addr *net.TCPAddr) error {
conn, err := net.Dial("tcp", addr.String())
if err != nil {
return err
}

out.AssociateConnection(conn)
return nil
}

listenAddr := "localhost:0"

addr, err := net.ResolveTCPAddr("tcp", listenAddr)
if err != nil {
return err
}

l, err := net.ListenTCP("tcp", addr)
if err != nil {
return err
}

errChan := make(chan error, 1)
listenChan := make(chan struct{}, 1)

go listenFunc(l, errChan, listenChan)
<-listenChan

if err := dialFunc(l.Addr().(*net.TCPAddr)); err != nil {
return err
}

select {
case err = <-errChan:
return err
case <-time.After(time.Second * 2):
return errors.New("failed to create connection")
}
}

// TestSendAddrV2Handshake tests that the version-verack handshake with the
// addition of the sendaddrv2 message works as expected.
func TestSendAddrV2Handshake(t *testing.T) {
verack := make(chan struct{}, 2)
sendaddr := make(chan struct{}, 2)
peer1Cfg := &peer.Config{
Listeners: peer.MessageListeners{
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}
},
OnSendAddrV2: func(p *peer.Peer,
msg *wire.MsgSendAddrV2) {

sendaddr <- struct{}{}
},
},
AllowSelfConns: true,
ChainParams: &chaincfg.MainNetParams,
}

peer2Cfg := &peer.Config{
Listeners: peer1Cfg.Listeners,
AllowSelfConns: true,
ChainParams: &chaincfg.MainNetParams,
}

verackErr := errors.New("verack timeout")

tests := []struct {
name string
expectsV2 bool
setup func() (*peer.Peer, *peer.Peer, error)
}{
{
"successful sendaddrv2 handshake",
true,
func() (*peer.Peer, *peer.Peer, error) {
inPeer := peer.NewInboundPeer(peer1Cfg)
outPeer, err := peer.NewOutboundPeer(
peer2Cfg, "10.0.0.2:8333",
)
if err != nil {
return nil, nil, err
}

err = setupPeerConnection(inPeer, outPeer)
if err != nil {
return nil, nil, err
}

for i := 0; i < 4; i++ {
select {
case <-sendaddr:
case <-verack:
case <-time.After(time.Second * 2):
return nil, nil, verackErr
}
}

return inPeer, outPeer, nil
},
},
{
"handshake with legacy inbound peer",
false,
func() (*peer.Peer, *peer.Peer, error) {
legacyVersion := wire.AddrV2Version - 1
peer1Cfg.ProtocolVersion = legacyVersion
inPeer := peer.NewInboundPeer(peer1Cfg)
outPeer, err := peer.NewOutboundPeer(
peer2Cfg, "10.0.0.2:8333",
)
if err != nil {
return nil, nil, err
}

err = setupPeerConnection(inPeer, outPeer)
if err != nil {
return nil, nil, err
}

for i := 0; i < 2; i++ {
select {
case <-verack:
case <-time.After(time.Second * 2):
return nil, nil, verackErr
}
}

return inPeer, outPeer, nil
},
},
{
"handshake with legacy outbound peer",
false,
func() (*peer.Peer, *peer.Peer, error) {
inPeer := peer.NewInboundPeer(peer1Cfg)
legacyVersion := wire.AddrV2Version - 1
peer2Cfg.ProtocolVersion = legacyVersion
outPeer, err := peer.NewOutboundPeer(
peer2Cfg, "10.0.0.2:8333",
)
if err != nil {
return nil, nil, err
}

err = setupPeerConnection(inPeer, outPeer)
if err != nil {
return nil, nil, err
}

for i := 0; i < 2; i++ {
select {
case <-verack:
case <-time.After(time.Second * 2):
return nil, nil, verackErr
}
}

return inPeer, outPeer, nil
},
},
}

t.Logf("Running %d tests", len(tests))
for i, test := range tests {
inPeer, outPeer, err := test.setup()
if err != nil {
t.Fatalf("TestSendAddrV2Handshake setup #%d: "+
"unexpected err: %v", i, err)
}

if inPeer.WantsAddrV2() != test.expectsV2 {
t.Fatalf("TestSendAddrV2Handshake #%d expected "+
"wantsAddrV2 to be %v instead was %v", i,
test.expectsV2, inPeer.WantsAddrV2())
} else if outPeer.WantsAddrV2() != test.expectsV2 {
t.Fatalf("TestSendAddrV2Handshake #%d expected "+
"wantsAddrV2 to be %v instead was %v", i,
test.expectsV2, outPeer.WantsAddrV2())
}

inPeer.Disconnect()
outPeer.Disconnect()
inPeer.WaitForDisconnect()
outPeer.WaitForDisconnect()
}
}
Loading

0 comments on commit cb6f21b

Please sign in to comment.