From cb6f21b598e90c8b1ea55e569b838797e9b37efe Mon Sep 17 00:00:00 2001 From: eugene Date: Thu, 3 Feb 2022 14:04:43 -0500 Subject: [PATCH] peer+wire: add addrv2 message, protocol negotiation --- peer/peer.go | 231 ++++++++++++++++++++++++++------ peer/peer_test.go | 290 ++++++++++++++++++++++++++++++++++++----- server.go | 68 ++++++++++ wire/message.go | 18 ++- wire/message_test.go | 4 +- wire/msgaddrv2.go | 102 +++++++++++++++ wire/msgaddrv2_test.go | 73 +++++++++++ wire/protocol.go | 9 +- 8 files changed, 714 insertions(+), 81 deletions(-) create mode 100644 wire/msgaddrv2.go create mode 100644 wire/msgaddrv2_test.go diff --git a/peer/peer.go b/peer/peer.go index a7b925802f..6d34c5f822 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -29,7 +29,7 @@ import ( const ( // MaxProtocolVersion is the max protocol version the peer supports. - MaxProtocolVersion = wire.FeeFilterVersion + MaxProtocolVersion = wire.AddrV2Version // DefaultTrickleInterval is the min time between attempts to send an // inv message to a peer. @@ -102,6 +102,9 @@ type MessageListeners struct { // OnAddr is invoked when a peer receives an addr bitcoin message. OnAddr func(p *Peer, msg *wire.MsgAddr) + // OnAddrV2 is invoked when a peer receives an addrv2 bitcoin message. + OnAddrV2 func(p *Peer, msg *wire.MsgAddrV2) + // OnPing is invoked when a peer receives a ping bitcoin message. OnPing func(p *Peer, msg *wire.MsgPing) @@ -197,6 +200,9 @@ type MessageListeners struct { // message. OnSendHeaders func(p *Peer, msg *wire.MsgSendHeaders) + // OnSendAddrV2 is invoked when a peer receives a sendaddrv2 message. + OnSendAddrV2 func(p *Peer, msg *wire.MsgSendAddrV2) + // OnRead is invoked when a peer receives a bitcoin message. It // consists of the number of bytes read, the message, and whether or not // an error in the read occurred. Typically, callers will opt to use @@ -399,7 +405,7 @@ type AddrFunc func(remoteAddr *wire.NetAddress) *wire.NetAddress // HostToNetAddrFunc is a func which takes a host, port, services and returns // the netaddress. type HostToNetAddrFunc func(host string, port uint16, - services wire.ServiceFlag) (*wire.NetAddress, error) + services wire.ServiceFlag) (*wire.NetAddressV2, error) // NOTE: The overall data flow of a peer is split into 3 goroutines. Inbound // messages are read via the inHandler goroutine and generally dispatched to @@ -445,7 +451,7 @@ type Peer struct { inbound bool flagsMtx sync.Mutex // protects the peer flags below - na *wire.NetAddress + na *wire.NetAddressV2 id int32 userAgent string services wire.ServiceFlag @@ -455,6 +461,7 @@ type Peer struct { sendHeadersPreferred bool // peer sent a sendheaders message verAckReceived bool witnessEnabled bool + sendAddrV2 bool wireEncoding wire.MessageEncoding @@ -585,7 +592,7 @@ func (p *Peer) ID() int32 { // NA returns the peer network address. // // This function is safe for concurrent access. -func (p *Peer) NA() *wire.NetAddress { +func (p *Peer) NA() *wire.NetAddressV2 { p.flagsMtx.Lock() na := p.na p.flagsMtx.Unlock() @@ -820,6 +827,16 @@ func (p *Peer) IsWitnessEnabled() bool { return witnessEnabled } +// WantsAddrV2 returns if the peer supports addrv2 messages instead of the +// legacy addr messages. +func (p *Peer) WantsAddrV2() bool { + p.flagsMtx.Lock() + wantsAddrV2 := p.sendAddrV2 + p.flagsMtx.Unlock() + + return wantsAddrV2 +} + // PushAddrMsg sends an addr message to the connected peer using the provided // addresses. This function is useful over manually sending the message via // QueueMessage since it automatically limits the addresses to the maximum @@ -856,6 +873,38 @@ func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress) ([]*wire.NetAddress, er return msg.AddrList, nil } +// PushAddrV2Msg is used to push an addrv2 message to the remote peer. +// +// This function is safe for concurrent access. +func (p *Peer) PushAddrV2Msg(addrs []*wire.NetAddressV2) ( + []*wire.NetAddressV2, error) { + + count := len(addrs) + + // Nothing to send. + if count == 0 { + return nil, nil + } + + m := wire.NewMsgAddrV2() + m.AddrList = make([]*wire.NetAddressV2, count) + copy(m.AddrList, addrs) + + // Randomize the addresses sent if there are more than the maximum. + if count > wire.MaxV2AddrPerMsg { + rand.Shuffle(count, func(i, j int) { + m.AddrList[i] = m.AddrList[j] + m.AddrList[j] = m.AddrList[i] + }) + + // Truncate it to the maximum size. + m.AddrList = m.AddrList[:wire.MaxV2AddrPerMsg] + } + + p.QueueMessage(m, nil) + return m.AddrList, nil +} + // PushGetBlocksMsg sends a getblocks message for the provided block locator // and stop hash. It will ignore back-to-back duplicate requests. // @@ -1363,6 +1412,19 @@ out: continue } + // Since the protocol version is 70016 but we don't + // implement compact blocks, we have to ignore unknown + // messages after the version-verack handshake. This + // matches bitcoind's behavior and is necessary since + // compact blocks negotiation occurs after the + // handshake. + if err == wire.ErrUnknownMessage { + log.Debugf("Received unknown message from %s:"+ + " %v", p, err) + idleTimer.Reset(idleTimeout) + continue + } + // Only log the error and send reject message if the // local peer is not forcibly disconnecting and the // remote peer has not disconnected. @@ -1404,6 +1466,11 @@ out: ) break out + case *wire.MsgSendAddrV2: + // Disconnect if peer sends this after the handshake is + // completed. + break out + case *wire.MsgGetAddr: if p.cfg.Listeners.OnGetAddr != nil { p.cfg.Listeners.OnGetAddr(p, msg) @@ -1414,6 +1481,11 @@ out: p.cfg.Listeners.OnAddr(p, msg) } + case *wire.MsgAddrV2: + if p.cfg.Listeners.OnAddrV2 != nil { + p.cfg.Listeners.OnAddrV2(p, msg) + } + case *wire.MsgPing: p.handlePingMsg(msg) if p.cfg.Listeners.OnPing != nil { @@ -1986,29 +2058,8 @@ func (p *Peer) readRemoteVersionMsg() error { return nil } -// readRemoteVerAckMsg waits for the next message to arrive from the remote -// peer. If this message is not a verack message, then an error is returned. -// This method is to be used as part of the version negotiation upon a new -// connection. -func (p *Peer) readRemoteVerAckMsg() error { - // Read the next message from the wire. - remoteMsg, _, err := p.readMessage(wire.LatestEncoding) - if err != nil { - return err - } - - // It should be a verack message, otherwise send a reject message to the - // peer explaining why. - msg, ok := remoteMsg.(*wire.MsgVerAck) - if !ok { - reason := "a verack message must follow version" - rejectMsg := wire.NewMsgReject( - msg.Command(), wire.RejectMalformed, reason, - ) - _ = p.writeMessage(rejectMsg, wire.LatestEncoding) - return errors.New(reason) - } - +// processRemoteVerAckMsg takes the verack from the remote peer and handles it. +func (p *Peer) processRemoteVerAckMsg(msg *wire.MsgVerAck) { p.flagsMtx.Lock() p.verAckReceived = true p.flagsMtx.Unlock() @@ -2016,8 +2067,6 @@ func (p *Peer) readRemoteVerAckMsg() error { if p.cfg.Listeners.OnVerAck != nil { p.cfg.Listeners.OnVerAck(p, msg) } - - return nil } // localVersionMsg creates a version message that can be used to send to the @@ -2032,7 +2081,15 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) { } } - theirNA := p.na + theirNA := p.na.ToLegacy() + + // If p.na is a torv3 hidden service address, we'll need to send over + // an empty NetAddress for their address. + if p.na.IsTorV3() { + theirNA = wire.NewNetAddressIPPort( + net.IP([]byte{0, 0, 0, 0}), p.na.Port, p.na.Services, + ) + } // If we are behind a proxy and the connection comes from the proxy then // we return an unroutable address as their address. This is to prevent @@ -2040,7 +2097,7 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) { if p.cfg.Proxy != "" { proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy) // invalid proxy means poorly configured, be on the safe side. - if err != nil || p.na.IP.String() == proxyaddress { + if err != nil || p.na.Addr.String() == proxyaddress { theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, theirNA.Services) } @@ -2092,14 +2149,71 @@ func (p *Peer) writeLocalVersionMsg() error { return p.writeMessage(localVerMsg, wire.LatestEncoding) } +// writeSendAddrV2Msg writes our sendaddrv2 message to the remote peer if the +// peer supports protocol version 70016 and above. +func (p *Peer) writeSendAddrV2Msg(pver uint32) error { + if pver < wire.AddrV2Version { + return nil + } + + sendAddrMsg := wire.NewMsgSendAddrV2() + return p.writeMessage(sendAddrMsg, wire.LatestEncoding) +} + +// waitToFinishNegotiation waits until desired negotiation messages are +// received, recording the remote peer's preference for sendaddrv2 as an +// example. The list of negotiated features can be expanded in the future. If a +// verack is received, negotiation stops and the connection is live. +func (p *Peer) waitToFinishNegotiation(pver uint32) error { + // There are several possible messages that can be received here. We + // could immediately receive verack and be done with the handshake. We + // could receive sendaddrv2 and still have to wait for verack. Or we + // can receive unknown messages before and after sendaddrv2 and still + // have to wait for verack. + for { + remoteMsg, _, err := p.readMessage(wire.LatestEncoding) + if err == wire.ErrUnknownMessage { + continue + } else if err != nil { + return err + } + + switch m := remoteMsg.(type) { + case *wire.MsgSendAddrV2: + if pver >= wire.AddrV2Version { + p.flagsMtx.Lock() + p.sendAddrV2 = true + p.flagsMtx.Unlock() + + if p.cfg.Listeners.OnSendAddrV2 != nil { + p.cfg.Listeners.OnSendAddrV2(p, m) + } + } + case *wire.MsgVerAck: + // Receiving a verack means we are done with the + // handshake. + p.processRemoteVerAckMsg(m) + return nil + default: + // This is triggered if the peer sends, for example, a + // GETDATA message during this negotiation. + return wire.ErrInvalidHandshake + } + } +} + // negotiateInboundProtocol performs the negotiation protocol for an inbound // peer. The events should occur in the following order, otherwise an error is // returned: // // 1. Remote peer sends their version. // 2. We send our version. -// 3. We send our verack. -// 4. Remote peer sends their verack. +// 3. We send sendaddrv2 if their version is >= 70016. +// 4. We send our verack. +// 5. Wait until sendaddrv2 or verack is received. Unknown messages are +// skipped as it could be wtxidrelay or a different message in the future +// that btcd does not implement but bitcoind does. +// 6. If remote peer sent sendaddrv2 above, wait until receipt of verack. func (p *Peer) negotiateInboundProtocol() error { if err := p.readRemoteVersionMsg(); err != nil { return err @@ -2109,12 +2223,22 @@ func (p *Peer) negotiateInboundProtocol() error { return err } + var protoVersion uint32 + p.flagsMtx.Lock() + protoVersion = p.protocolVersion + p.flagsMtx.Unlock() + + if err := p.writeSendAddrV2Msg(protoVersion); err != nil { + return err + } + err := p.writeMessage(wire.NewMsgVerAck(), wire.LatestEncoding) if err != nil { return err } - return p.readRemoteVerAckMsg() + // Finish the negotiation by waiting for negotiable messages or verack. + return p.waitToFinishNegotiation(protoVersion) } // negotiateOutboundProtocol performs the negotiation protocol for an outbound @@ -2123,8 +2247,11 @@ func (p *Peer) negotiateInboundProtocol() error { // // 1. We send our version. // 2. Remote peer sends their version. -// 3. Remote peer sends their verack. +// 3. We send sendaddrv2 if their version is >= 70016. // 4. We send our verack. +// 5. We wait to receive sendaddrv2 or verack, skipping unknown messages as +// in the inbound case. +// 6. If sendaddrv2 was received, wait for receipt of verack. func (p *Peer) negotiateOutboundProtocol() error { if err := p.writeLocalVersionMsg(); err != nil { return err @@ -2134,11 +2261,22 @@ func (p *Peer) negotiateOutboundProtocol() error { return err } - if err := p.readRemoteVerAckMsg(); err != nil { + var protoVersion uint32 + p.flagsMtx.Lock() + protoVersion = p.protocolVersion + p.flagsMtx.Unlock() + + if err := p.writeSendAddrV2Msg(protoVersion); err != nil { + return err + } + + err := p.writeMessage(wire.NewMsgVerAck(), wire.LatestEncoding) + if err != nil { return err } - return p.writeMessage(wire.NewMsgVerAck(), wire.LatestEncoding) + // Finish the negotiation by waiting for negotiable messages or verack. + return p.waitToFinishNegotiation(protoVersion) } // start begins processing input and output messages. @@ -2201,7 +2339,12 @@ func (p *Peer) AssociateConnection(conn net.Conn) { p.Disconnect() return } - p.na = na + + // Convert the NetAddress created above into NetAddressV2. + currentNa := wire.NetAddressV2FromBytes( + na.Timestamp, na.Services, na.IP, na.Port, + ) + p.na = currentNa } go func() { @@ -2267,7 +2410,10 @@ func NewInboundPeer(cfg *Config) *Peer { return newPeerBase(cfg, true) } -// NewOutboundPeer returns a new outbound bitcoin peer. +// NewOutboundPeer returns a new outbound bitcoin peer. If the Config argument +// does not set HostToNetAddress, connecting to anything other than an ipv4 or +// ipv6 address will fail and may cause a nil-pointer-dereference. This +// includes hostnames and onion services. func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) { p := newPeerBase(cfg, false) p.addr = addr @@ -2289,7 +2435,12 @@ func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) { } p.na = na } else { - p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0) + // If host is an onion hidden service or a hostname, it is + // likely that a nil-pointer-dereference will occur. The caller + // should set HostToNetAddress if connecting to these. + p.na = wire.NetAddressV2FromBytes( + time.Now(), 0, net.ParseIP(host), uint16(port), + ) } return p, nil diff --git a/peer/peer_test.go b/peer/peer_test.go index dd7f36aa3a..9df90c233d 100644 --- a/peer/peer_test.go +++ b/peer/peer_test.go @@ -289,18 +289,16 @@ func TestPeerConnection(t *testing.T) { { "basic handshake", func() (*peer.Peer, *peer.Peer, error) { - inConn, outConn := pipe( - &conn{raddr: "10.0.0.1:8333"}, - &conn{raddr: "10.0.0.2:8333"}, - ) inPeer := peer.NewInboundPeer(peer1Cfg) - inPeer.AssociateConnection(inConn) - outPeer, err := peer.NewOutboundPeer(peer2Cfg, "10.0.0.2:8333") if err != nil { return nil, nil, err } - outPeer.AssociateConnection(outConn) + + err = setupPeerConnection(inPeer, outPeer) + if err != nil { + return nil, nil, err + } for i := 0; i < 4; i++ { select { @@ -315,18 +313,16 @@ func TestPeerConnection(t *testing.T) { { "socks proxy", func() (*peer.Peer, *peer.Peer, error) { - inConn, outConn := pipe( - &conn{raddr: "10.0.0.1:8333", proxy: true}, - &conn{raddr: "10.0.0.2:8333"}, - ) inPeer := peer.NewInboundPeer(peer1Cfg) - inPeer.AssociateConnection(inConn) - outPeer, err := peer.NewOutboundPeer(peer2Cfg, "10.0.0.2:8333") if err != nil { return nil, nil, err } - outPeer.AssociateConnection(outConn) + + err = setupPeerConnection(inPeer, outPeer) + if err != nil { + return nil, nil, err + } for i := 0; i < 4; i++ { select { @@ -359,7 +355,7 @@ func TestPeerConnection(t *testing.T) { // TestPeerListeners tests that the peer listeners are called as expected. func TestPeerListeners(t *testing.T) { verack := make(chan struct{}, 1) - ok := make(chan wire.Message, 20) + ok := make(chan wire.Message, 22) peerCfg := &peer.Config{ Listeners: peer.MessageListeners{ OnGetAddr: func(p *peer.Peer, msg *wire.MsgGetAddr) { @@ -447,6 +443,12 @@ func TestPeerListeners(t *testing.T) { OnSendHeaders: func(p *peer.Peer, msg *wire.MsgSendHeaders) { ok <- msg }, + OnSendAddrV2: func(p *peer.Peer, msg *wire.MsgSendAddrV2) { + ok <- msg + }, + OnAddrV2: func(p *peer.Peer, msg *wire.MsgAddrV2) { + ok <- msg + }, }, UserAgentName: "peer", UserAgentVersion: "1.0", @@ -456,12 +458,7 @@ func TestPeerListeners(t *testing.T) { TrickleInterval: time.Second * 10, AllowSelfConns: true, } - inConn, outConn := pipe( - &conn{raddr: "10.0.0.1:8333"}, - &conn{raddr: "10.0.0.2:8333"}, - ) inPeer := peer.NewInboundPeer(peerCfg) - inPeer.AssociateConnection(inConn) peerCfg.Listeners = peer.MessageListeners{ OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) { @@ -473,7 +470,12 @@ func TestPeerListeners(t *testing.T) { t.Errorf("NewOutboundPeer: unexpected err %v\n", err) return } - outPeer.AssociateConnection(outConn) + + err = setupPeerConnection(inPeer, outPeer) + if err != nil { + t.Errorf("setupPeerConnection: failed: %v\n", err) + return + } for i := 0; i < 2; i++ { select { @@ -597,6 +599,14 @@ func TestPeerListeners(t *testing.T) { "OnSendHeaders", wire.NewMsgSendHeaders(), }, + { + "OnSendAddrV2", + wire.NewMsgSendAddrV2(), + }, + { + "OnAddrV2", + wire.NewMsgAddrV2(), + }, } t.Logf("Running %d tests", len(tests)) for _, test := range tests { @@ -881,17 +891,17 @@ func TestDuplicateVersionMsg(t *testing.T) { Services: 0, AllowSelfConns: true, } - inConn, outConn := pipe( - &conn{laddr: "10.0.0.1:9108", raddr: "10.0.0.2:9108"}, - &conn{laddr: "10.0.0.2:9108", raddr: "10.0.0.1:9108"}, - ) - outPeer, err := peer.NewOutboundPeer(peerCfg, inConn.laddr) + outPeer, err := peer.NewOutboundPeer(peerCfg, "10.0.0.2:8333") if err != nil { t.Fatalf("NewOutboundPeer: unexpected err: %v\n", err) } - outPeer.AssociateConnection(outConn) inPeer := peer.NewInboundPeer(peerCfg) - inPeer.AssociateConnection(inConn) + + err = setupPeerConnection(inPeer, outPeer) + if err != nil { + t.Fatalf("setupPeerConnection failed to connect: %v\n", err) + } + // Wait for the veracks from the initial protocol version negotiation. for i := 0; i < 2; i++ { select { @@ -947,17 +957,16 @@ func TestUpdateLastBlockHeight(t *testing.T) { remotePeerCfg.NewestBlock = func() (*chainhash.Hash, int32, error) { return &chainhash.Hash{}, remotePeerHeight, nil } - inConn, outConn := pipe( - &conn{laddr: "10.0.0.1:9108", raddr: "10.0.0.2:9108"}, - &conn{laddr: "10.0.0.2:9108", raddr: "10.0.0.1:9108"}, - ) - localPeer, err := peer.NewOutboundPeer(&peerCfg, inConn.laddr) + localPeer, err := peer.NewOutboundPeer(&peerCfg, "10.0.0.2:8333") if err != nil { t.Fatalf("NewOutboundPeer: unexpected err: %v\n", err) } - localPeer.AssociateConnection(outConn) inPeer := peer.NewInboundPeer(&remotePeerCfg) - inPeer.AssociateConnection(inConn) + + err = setupPeerConnection(inPeer, localPeer) + if err != nil { + t.Fatalf("setupPeerConnection failed to connect: %v\n", err) + } // Wait for the veracks from the initial protocol version negotiation. for i := 0; i < 2; i++ { @@ -989,3 +998,214 @@ func TestUpdateLastBlockHeight(t *testing.T) { remotePeerHeight+1) } } + +// setupPeerConnection initiates a tcp connection between two peers. +func setupPeerConnection(in, out *peer.Peer) error { + // listenFunc is a function closure that listens for a tcp connection. + // The tcp connection will be the one the inbound peer uses. This will + // be run as a goroutine. + listenFunc := func(l *net.TCPListener, errChan chan error, + listenChan chan struct{}) { + + listenChan <- struct{}{} + + conn, err := l.Accept() + if err != nil { + errChan <- err + return + } + + in.AssociateConnection(conn) + errChan <- nil + } + + // dialFunc is a function closure that initiates the tcp connection. + // The tcp connection will be the one the outbound peer uses. + dialFunc := func(addr *net.TCPAddr) error { + conn, err := net.Dial("tcp", addr.String()) + if err != nil { + return err + } + + out.AssociateConnection(conn) + return nil + } + + listenAddr := "localhost:0" + + addr, err := net.ResolveTCPAddr("tcp", listenAddr) + if err != nil { + return err + } + + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return err + } + + errChan := make(chan error, 1) + listenChan := make(chan struct{}, 1) + + go listenFunc(l, errChan, listenChan) + <-listenChan + + if err := dialFunc(l.Addr().(*net.TCPAddr)); err != nil { + return err + } + + select { + case err = <-errChan: + return err + case <-time.After(time.Second * 2): + return errors.New("failed to create connection") + } +} + +// TestSendAddrV2Handshake tests that the version-verack handshake with the +// addition of the sendaddrv2 message works as expected. +func TestSendAddrV2Handshake(t *testing.T) { + verack := make(chan struct{}, 2) + sendaddr := make(chan struct{}, 2) + peer1Cfg := &peer.Config{ + Listeners: peer.MessageListeners{ + OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) { + verack <- struct{}{} + }, + OnSendAddrV2: func(p *peer.Peer, + msg *wire.MsgSendAddrV2) { + + sendaddr <- struct{}{} + }, + }, + AllowSelfConns: true, + ChainParams: &chaincfg.MainNetParams, + } + + peer2Cfg := &peer.Config{ + Listeners: peer1Cfg.Listeners, + AllowSelfConns: true, + ChainParams: &chaincfg.MainNetParams, + } + + verackErr := errors.New("verack timeout") + + tests := []struct { + name string + expectsV2 bool + setup func() (*peer.Peer, *peer.Peer, error) + }{ + { + "successful sendaddrv2 handshake", + true, + func() (*peer.Peer, *peer.Peer, error) { + inPeer := peer.NewInboundPeer(peer1Cfg) + outPeer, err := peer.NewOutboundPeer( + peer2Cfg, "10.0.0.2:8333", + ) + if err != nil { + return nil, nil, err + } + + err = setupPeerConnection(inPeer, outPeer) + if err != nil { + return nil, nil, err + } + + for i := 0; i < 4; i++ { + select { + case <-sendaddr: + case <-verack: + case <-time.After(time.Second * 2): + return nil, nil, verackErr + } + } + + return inPeer, outPeer, nil + }, + }, + { + "handshake with legacy inbound peer", + false, + func() (*peer.Peer, *peer.Peer, error) { + legacyVersion := wire.AddrV2Version - 1 + peer1Cfg.ProtocolVersion = legacyVersion + inPeer := peer.NewInboundPeer(peer1Cfg) + outPeer, err := peer.NewOutboundPeer( + peer2Cfg, "10.0.0.2:8333", + ) + if err != nil { + return nil, nil, err + } + + err = setupPeerConnection(inPeer, outPeer) + if err != nil { + return nil, nil, err + } + + for i := 0; i < 2; i++ { + select { + case <-verack: + case <-time.After(time.Second * 2): + return nil, nil, verackErr + } + } + + return inPeer, outPeer, nil + }, + }, + { + "handshake with legacy outbound peer", + false, + func() (*peer.Peer, *peer.Peer, error) { + inPeer := peer.NewInboundPeer(peer1Cfg) + legacyVersion := wire.AddrV2Version - 1 + peer2Cfg.ProtocolVersion = legacyVersion + outPeer, err := peer.NewOutboundPeer( + peer2Cfg, "10.0.0.2:8333", + ) + if err != nil { + return nil, nil, err + } + + err = setupPeerConnection(inPeer, outPeer) + if err != nil { + return nil, nil, err + } + + for i := 0; i < 2; i++ { + select { + case <-verack: + case <-time.After(time.Second * 2): + return nil, nil, verackErr + } + } + + return inPeer, outPeer, nil + }, + }, + } + + t.Logf("Running %d tests", len(tests)) + for i, test := range tests { + inPeer, outPeer, err := test.setup() + if err != nil { + t.Fatalf("TestSendAddrV2Handshake setup #%d: "+ + "unexpected err: %v", i, err) + } + + if inPeer.WantsAddrV2() != test.expectsV2 { + t.Fatalf("TestSendAddrV2Handshake #%d expected "+ + "wantsAddrV2 to be %v instead was %v", i, + test.expectsV2, inPeer.WantsAddrV2()) + } else if outPeer.WantsAddrV2() != test.expectsV2 { + t.Fatalf("TestSendAddrV2Handshake #%d expected "+ + "wantsAddrV2 to be %v instead was %v", i, + test.expectsV2, outPeer.WantsAddrV2()) + } + + inPeer.Disconnect() + outPeer.Disconnect() + inPeer.WaitForDisconnect() + outPeer.WaitForDisconnect() + } +} diff --git a/server.go b/server.go index 2b5cc0ff42..5cef434292 100644 --- a/server.go +++ b/server.go @@ -344,6 +344,34 @@ func (sp *serverPeer) relayTxDisabled() bool { // pushAddrMsg sends a legacy addr message to the connected peer using the // provided addresses. func (sp *serverPeer) pushAddrMsg(addresses []*wire.NetAddressV2) { + if sp.WantsAddrV2() { + // If the peer supports addrv2, we'll be pushing an addrv2 + // message instead. The logic is otherwise identical to the + // addr case below. + addrs := make([]*wire.NetAddressV2, 0, len(addresses)) + for _, addr := range addresses { + // Filter addresses already known to the peer. + if sp.addressKnown(addr) { + continue + } + + addrs = append(addrs, addr) + } + + known, err := sp.PushAddrV2Msg(addrs) + if err != nil { + peerLog.Errorf("Can't push addrv2 message to %s: %v", + sp.Peer, err) + sp.Disconnect() + return + } + + // Add the final set of addresses sent to the set the peer + // knows of. + sp.addKnownAddresses(known) + return + } + addrs := make([]*wire.NetAddress, 0, len(addresses)) for _, addr := range addresses { // Filter addresses already known to the peer. @@ -1328,6 +1356,45 @@ func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) { sp.server.addrManager.AddAddresses(addrs, sp.NA()) } +// OnAddrV2 is invoked when a peer receives an addrv2 bitcoin message and is +// used to notify the server about advertised addresses. +func (sp *serverPeer) OnAddrV2(_ *peer.Peer, msg *wire.MsgAddrV2) { + // Ignore if simnet for the same reasons as the regular addr message. + if cfg.SimNet { + return + } + + // An empty AddrV2 message is invalid. + if len(msg.AddrList) == 0 { + peerLog.Errorf("Command [%s] from %s does not contain any "+ + "addresses", msg.Command(), sp.Peer) + sp.Disconnect() + return + } + + for _, na := range msg.AddrList { + // Don't add more to the set of known addresses if we're + // disconnecting. + if !sp.Connected() { + return + } + + // Set the timestamp to 5 days ago if the timestamp received is + // more than 10 minutes in the future so this address is one of + // the first to be removed. + now := time.Now() + if na.Timestamp.After(now.Add(time.Minute * 10)) { + na.Timestamp = now.Add(-1 * time.Hour * 24 * 5) + } + + // Add to the set of known addresses. + sp.addKnownAddresses([]*wire.NetAddressV2{na}) + } + + // Add the addresses to the addrmanager. + sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA()) +} + // OnRead is invoked when a peer receives a message and it is used to update // the bytes received by the server. func (sp *serverPeer) OnRead(_ *peer.Peer, bytesRead int, msg wire.Message, err error) { @@ -2074,6 +2141,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config { OnFilterLoad: sp.OnFilterLoad, OnGetAddr: sp.OnGetAddr, OnAddr: sp.OnAddr, + OnAddrV2: sp.OnAddrV2, OnRead: sp.OnRead, OnWrite: sp.OnWrite, OnNotFound: sp.OnNotFound, diff --git a/wire/message.go b/wire/message.go index 6d3147a81d..1f412fa6fa 100644 --- a/wire/message.go +++ b/wire/message.go @@ -32,6 +32,7 @@ const ( CmdVerAck = "verack" CmdGetAddr = "getaddr" CmdAddr = "addr" + CmdAddrV2 = "addrv2" CmdGetBlocks = "getblocks" CmdInv = "inv" CmdGetData = "getdata" @@ -78,6 +79,13 @@ const ( // protocol. var LatestEncoding = WitnessEncoding +// ErrUnknownMessage is the error returned when decoding an unknown message. +var ErrUnknownMessage = fmt.Errorf("received unknown message") + +// ErrInvalidHandshake is the error returned when a peer sends us a known +// message that does not belong in the version-verack handshake. +var ErrInvalidHandshake = fmt.Errorf("invalid message during handshake") + // Message is an interface that describes a bitcoin message. A type that // implements Message has complete control over the representation of its data // and may therefore contain additional or fewer fields than those which @@ -109,6 +117,9 @@ func makeEmptyMessage(command string) (Message, error) { case CmdAddr: msg = &MsgAddr{} + case CmdAddrV2: + msg = &MsgAddrV2{} + case CmdGetBlocks: msg = &MsgGetBlocks{} @@ -185,7 +196,7 @@ func makeEmptyMessage(command string) (Message, error) { msg = &MsgCFCheckpt{} default: - return nil, fmt.Errorf("unhandled command [%s]", command) + return nil, ErrUnknownMessage } return msg, nil } @@ -378,9 +389,10 @@ func ReadMessageWithEncodingN(r io.Reader, pver uint32, btcnet BitcoinNet, // Create struct of appropriate message type based on the command. msg, err := makeEmptyMessage(command) if err != nil { + // makeEmptyMessage can only return ErrUnknownMessage and it is + // important that we bubble it up to the caller. discardInput(r, hdr.length) - return totalBytes, nil, nil, messageError("ReadMessage", - err.Error()) + return totalBytes, nil, nil, err } // Check for maximum length based on the message type as a malicious client diff --git a/wire/message_test.go b/wire/message_test.go index 3a422e66ba..7ba2e0639f 100644 --- a/wire/message_test.go +++ b/wire/message_test.go @@ -295,7 +295,7 @@ func TestReadMessageWireErrors(t *testing.T) { pver, btcnet, len(unsupportedCommandBytes), - &MessageError{}, + ErrUnknownMessage, 24, }, @@ -345,7 +345,7 @@ func TestReadMessageWireErrors(t *testing.T) { pver, btcnet, len(discardBytes), - &MessageError{}, + ErrUnknownMessage, 24, }, } diff --git a/wire/msgaddrv2.go b/wire/msgaddrv2.go new file mode 100644 index 0000000000..4db4a1334a --- /dev/null +++ b/wire/msgaddrv2.go @@ -0,0 +1,102 @@ +package wire + +import ( + "fmt" + "io" +) + +// MaxV2AddrPerMsg is the maximum number of version 2 addresses that will exist +// in a single addrv2 message (MsgAddrV2). +const MaxV2AddrPerMsg = 1000 + +// MsgAddrV2 implements the Message interface and represents a bitcoin addrv2 +// message that can support longer-length addresses like torv3, cjdns, and i2p. +// It is used to gossip addresses on the network. Each message is limited to +// MaxV2AddrPerMsg addresses. This is the same limit as MsgAddr. +type MsgAddrV2 struct { + AddrList []*NetAddressV2 +} + +// BtcDecode decodes r using the bitcoin protocol into a MsgAddrV2. +func (m *MsgAddrV2) BtcDecode(r io.Reader, pver uint32, + enc MessageEncoding) error { + + count, err := ReadVarInt(r, pver) + if err != nil { + return err + } + + // Limit to max addresses per message. + if count > MaxV2AddrPerMsg { + str := fmt.Sprintf("too many addresses for message [count %v,"+ + " max %v]", count, MaxV2AddrPerMsg) + return messageError("MsgAddrV2.BtcDecode", str) + } + + addrList := make([]NetAddressV2, count) + m.AddrList = make([]*NetAddressV2, 0, count) + for i := uint64(0); i < count; i++ { + na := &addrList[i] + err := readNetAddressV2(r, pver, na) + switch err { + case ErrSkippedNetworkID: + // This may be a network ID we don't know of, but is + // still valid. We can safely skip those. + continue + case ErrInvalidAddressSize: + // The encoding used by the peer does not follow + // BIP-155 and we should stop processing this message. + return err + } + + m.AddrList = append(m.AddrList, na) + } + + return nil +} + +// BtcEncode encodes the MsgAddrV2 into a writer w. +func (m *MsgAddrV2) BtcEncode(w io.Writer, pver uint32, + enc MessageEncoding) error { + + count := len(m.AddrList) + if count > MaxV2AddrPerMsg { + str := fmt.Sprintf("too many addresses for message [count %v,"+ + " max %v]", count, MaxV2AddrPerMsg) + return messageError("MsgAddrV2.BtcEncode", str) + } + + err := WriteVarInt(w, pver, uint64(count)) + if err != nil { + return err + } + + for _, na := range m.AddrList { + err = writeNetAddressV2(w, pver, na) + if err != nil { + return err + } + } + + return nil +} + +// Command returns the protocol command string for MsgAddrV2. +func (m *MsgAddrV2) Command() string { + return CmdAddrV2 +} + +// MaxPayloadLength returns the maximum length payload possible for MsgAddrV2. +func (m *MsgAddrV2) MaxPayloadLength(pver uint32) uint32 { + // The varint that can store the maximum number of addresses is 3 bytes + // long. The maximum payload is then 3 + 1000 * maxNetAddressV2Payload. + return 3 + (MaxV2AddrPerMsg * maxNetAddressV2Payload()) +} + +// NewMsgAddrV2 returns a new bitcoin addrv2 message that conforms to the +// Message interface. +func NewMsgAddrV2() *MsgAddrV2 { + return &MsgAddrV2{ + AddrList: make([]*NetAddressV2, 0, MaxV2AddrPerMsg), + } +} diff --git a/wire/msgaddrv2_test.go b/wire/msgaddrv2_test.go new file mode 100644 index 0000000000..213d699c96 --- /dev/null +++ b/wire/msgaddrv2_test.go @@ -0,0 +1,73 @@ +package wire + +import ( + "bytes" + "io" + "testing" +) + +// TestAddrV2Decode checks that decoding an addrv2 message off the wire behaves +// as expected. This means ignoring certain addresses, and failing in certain +// failure scenarios. +func TestAddrV2Decode(t *testing.T) { + tests := []struct { + buf []byte + expectedError bool + expectedAddrs int + }{ + // Exceeding max addresses. + { + []byte{0xfd, 0xff, 0xff}, + true, + 0, + }, + + // Invalid address size. + { + []byte{0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x05}, + true, + 0, + }, + + // One valid address and one skipped address + { + []byte{ + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x04, + 0x7f, 0x00, 0x00, 0x01, 0x22, 0x22, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x02, 0x10, 0xfd, 0x87, 0xd8, + 0x7e, 0xeb, 0x43, 0xff, 0xfe, 0xcc, 0x39, 0xa8, + 0x73, 0x69, 0x15, 0xff, 0xff, 0x22, 0x22, + }, + false, + 1, + }, + } + + t.Logf("Running %d tests", len(tests)) + for i, test := range tests { + r := bytes.NewReader(test.buf) + m := &MsgAddrV2{} + + err := m.BtcDecode(r, 0, LatestEncoding) + if test.expectedError { + if err == nil { + t.Errorf("Test #%d expected error", i) + } + + continue + } else if err != nil { + t.Errorf("Test #%d unexpected error %v", i, err) + } + + // Trying to read more should give EOF. + var b [1]byte + if _, err := r.Read(b[:]); err != io.EOF { + t.Errorf("Test #%d did not cleanly finish reading", i) + } + + if len(m.AddrList) != test.expectedAddrs { + t.Errorf("Test #%d expected %d addrs, instead of %d", + i, test.expectedAddrs, len(m.AddrList)) + } + } +} diff --git a/wire/protocol.go b/wire/protocol.go index 8cc9838a55..3b414ec3f1 100644 --- a/wire/protocol.go +++ b/wire/protocol.go @@ -13,7 +13,7 @@ import ( // XXX pedro: we will probably need to bump this. const ( // ProtocolVersion is the latest protocol version this package supports. - ProtocolVersion uint32 = 70013 + ProtocolVersion uint32 = 70016 // MultipleAddressVersion is the protocol version which added multiple // addresses per message (pver >= MultipleAddressVersion). @@ -51,6 +51,13 @@ const ( // FeeFilterVersion is the protocol version which added a new // feefilter message. FeeFilterVersion uint32 = 70013 + + // AddrV2Version is the protocol version which added two new messages. + // sendaddrv2 is sent during the version-verack handshake and signals + // support for sending and receiving the addrv2 message. In the future, + // new messages that occur during the version-verack handshake will not + // come with a protocol version bump. + AddrV2Version uint32 = 70016 ) // ServiceFlag identifies services supported by a bitcoin peer.