diff --git a/channeldb/error.go b/channeldb/error.go index 5c237dbda4..4683774468 100644 --- a/channeldb/error.go +++ b/channeldb/error.go @@ -73,4 +73,8 @@ var ( // ErrNodeAliasNotFound is returned when alias for node can't be found. ErrNodeAliasNotFound = fmt.Errorf("alias for node not found") + + // ErrPeerMessagesNotFound is returned when no message have been + // found in the peer bucket or if bucket haven't been created yet. + ErrPeerMessagesNotFound = fmt.Errorf("peer messages not found") ) diff --git a/channeldb/messagestore.go b/channeldb/messagestore.go new file mode 100644 index 0000000000..6d4f1ee5ab --- /dev/null +++ b/channeldb/messagestore.go @@ -0,0 +1,174 @@ +package channeldb + +import ( + "bytes" + "encoding/binary" + + "github.com/boltdb/bolt" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/wire" +) + +var ( + // basePeerBucketKey is the base key for generating peer bucket keys. + // Peer bucket stores top-level bucket that maps: index -> code || msg + // concatenating the message code to the stored data allows the db logic + // to properly parse the wire message without trial and error or needing + // an additional index. + basePeerBucketKey = []byte("peermessagesstore") +) + +// MessageStore represents the storage for lnwire messages, it might be boltd storage, +// local storage, test storage or hybrid one. +// +// NOTE: The original purpose of creating this interface was in separation +// between the retransmission logic and specifics of storage itself. In case of +// such interface we may create the test storage and register the add, +// remove, get actions without the need of population of lnwire messages with +// data. +type MessageStore interface { + // Get returns the sorted set of messages in the order they have been + // added originally and also the array of associated index to this + // messages within the message store. + Get() ([]uint64, []lnwire.Message, error) + + // Add adds new message in the storage with preserving the order and + // returns the index of message within the message store. + Add(msg lnwire.Message) (uint64, error) + + // Remove deletes message with this indexes. + Remove(indexes []uint64) error +} + +// messagesStore represents the boltdb storage for messages inside +// retransmission sub-system. +type messagesStore struct { + // id is a unique slice of bytes identifying a peer. This value is + // typically a peer's identity public key serialized in compressed + // format. + id []byte + db *DB +} + +// NewMessageStore creates new instance of message storage. +func NewMessageStore(id []byte, db *DB) MessageStore { + return &messagesStore{ + id: id, + db: db, + } +} + +// Add adds message to the storage and returns the index which +// corresponds the the message by which it might be removed later. +func (s *messagesStore) Add(msg lnwire.Message) (uint64, error) { + var index uint64 + + err := s.db.Batch(func(tx *bolt.Tx) error { + var err error + + // Get or create the top peer bucket. + peerBucketKey := s.getPeerBucketKey() + peerBucket, err := tx.CreateBucketIfNotExists(peerBucketKey) + if err != nil { + return err + } + + // Generate next index number to add it to the message code + // bucket. + index, err = peerBucket.NextSequence() + if err != nil { + return err + } + indexBytes := make([]byte, 8) + binary.BigEndian.PutUint64(indexBytes, index) + + // Encode the message and place it in the top bucket. + var b bytes.Buffer + _, err = lnwire.WriteMessage(&b, msg, 0, wire.MainNet) + if err != nil { + return err + } + + return peerBucket.Put(indexBytes, b.Bytes()) + }) + + return index, err +} + +// Remove removes the message from storage by index that were assigned to +// message during its addition to the storage. +func (s *messagesStore) Remove(indexes []uint64) error { + return s.db.Batch(func(tx *bolt.Tx) error { + // Get or create the top peer bucket. + peerBucketKey := s.getPeerBucketKey() + peerBucket := tx.Bucket(peerBucketKey) + if peerBucket == nil { + return ErrPeerMessagesNotFound + } + + // Retrieve the messages indexes with this type/code and + // remove them from top peer bucket. + for _, index := range indexes { + var key [8]byte + binary.BigEndian.PutUint64(key[:], index) + + if err := peerBucket.Delete(key[:]); err != nil { + return err + } + } + + return nil + }) +} + +// Get retrieves messages from storage in the order in which they were +// originally added, proper order is needed for retransmission subsystem, as +// far this messages should be resent to remote peer in the same order as they +// were sent originally. +func (s *messagesStore) Get() ([]uint64, []lnwire.Message, error) { + var messages []lnwire.Message + var indexes []uint64 + + if err := s.db.View(func(tx *bolt.Tx) error { + peerBucketKey := s.getPeerBucketKey() + peerBucket := tx.Bucket(peerBucketKey) + if peerBucket == nil { + return nil + } + + // Iterate over messages buckets. + return peerBucket.ForEach(func(k, v []byte) error { + // Skip buckets fields. + if v == nil { + return nil + } + + // Decode the message from and add it to the array. + r := bytes.NewReader(v) + _, msg, _, err := lnwire.ReadMessage(r, 0, wire.MainNet) + if err != nil { + return err + } + + messages = append(messages, msg) + indexes = append(indexes, binary.BigEndian.Uint64(k)) + return nil + }) + }); err != nil { + return nil, nil, err + } + + // If bucket was haven't been created yet or just not contains any + // messages. + if len(messages) == 0 { + return nil, nil, ErrPeerMessagesNotFound + } + + return indexes, messages, nil +} + +// getPeerBucketKey generates the peer bucket boltd key by peer id and base +// peer bucket key. +func (s *messagesStore) getPeerBucketKey() []byte { + return append(basePeerBucketKey[:], s.id[:]...) +} diff --git a/channeldb/messagestore_test.go b/channeldb/messagestore_test.go new file mode 100644 index 0000000000..3dcd8da92d --- /dev/null +++ b/channeldb/messagestore_test.go @@ -0,0 +1,108 @@ +package channeldb + +import ( + "bytes" + "crypto/sha256" + "testing" + + "reflect" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/wire" +) + +// TestRetransmitterMessage tests the ability of message storage to +// add/remove/get messages and also checks that the order in which the +// messages had been added corresponds to the order of messages from Get +// function. +func TestRetransmitterMessageOrder(t *testing.T) { + db, clean, err := makeTestDB() + if err != nil { + t.Fatal(err) + } + defer clean() + s := NewMessageStore([]byte("id"), db) + + var ( + hash1, _ = chainhash.NewHash(bytes.Repeat([]byte("a"), 32)) + hash2, _ = chainhash.NewHash(bytes.Repeat([]byte("b"), 32)) + + chanPoint1 = wire.NewOutPoint(hash1, 0) + chanPoint2 = wire.NewOutPoint(hash2, 1) + + preimage1 = [sha256.Size]byte{0} + preimage2 = [sha256.Size]byte{1} + ) + + // Check that we are receiving the error without messages inside + // the storage. + _, messages, err := s.Get() + if err != nil && err != ErrPeerMessagesNotFound { + t.Fatalf("can't get the message: %v", err) + } else if len(messages) != 0 { + t.Fatal("wrong length of messages") + } + + msg1 := &lnwire.UpdateFufillHTLC{ + ChannelPoint: *chanPoint1, + ID: 0, + PaymentPreimage: preimage1, + } + + index1, err := s.Add(msg1) + if err != nil { + t.Fatalf("can't add the message to the message store: %v", err) + } + + msg2 := &lnwire.UpdateFufillHTLC{ + ChannelPoint: *chanPoint2, + ID: 1, + PaymentPreimage: preimage2, + } + + index2, err := s.Add(msg2) + if err != nil { + t.Fatalf("can't add the message to the message store: %v", err) + } + + _, messages, err = s.Get() + if err != nil { + t.Fatalf("can't get the message: %v", err) + } else if len(messages) != 2 { + t.Fatal("wrong length of messages") + } + + m, ok := messages[0].(*lnwire.UpdateFufillHTLC) + if !ok { + t.Fatal("wrong type of message") + } + + // Check the order + if !reflect.DeepEqual(m, msg1) { + t.Fatal("wrong order of message") + } + + m, ok = messages[1].(*lnwire.UpdateFufillHTLC) + if !ok { + t.Fatal("wrong type of message") + } + + // Check the order + if !reflect.DeepEqual(m, msg2) { + t.Fatal("wrong order of message") + } + + // Remove the messages by index and check that get function return + // non of the messages. + if err := s.Remove([]uint64{index1, index2}); err != nil { + t.Fatalf("can't remove the message: %v", err) + } + + _, messages, err = s.Get() + if err != nil && err != ErrPeerMessagesNotFound { + t.Fatalf("can't get the message: %v", err) + } else if len(messages) != 0 { + t.Fatal("wrong length of messages") + } +} diff --git a/fundingmanager.go b/fundingmanager.go index 8dc30e378c..1752252aeb 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -492,7 +492,8 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) { delay := msg.CsvDelay // TODO(roasbeef): error if funding flow already ongoing - fndgLog.Infof("Recv'd fundingRequest(amt=%v, delay=%v, pendingId=%v) "+ + fndgLog.Infof("Recv'd fundingRequest(amt=%v, "+ + "pushSatoshis=%v, delay=%v, pendingId=%v) "+ "from peer(%x)", amt, msg.PushSatoshis, delay, msg.ChannelID, fmsg.peerAddress.IdentityKey.SerializeCompressed()) diff --git a/gotest.sh b/gotest.sh index 38fd4772a4..c408fbbd4f 100755 --- a/gotest.sh +++ b/gotest.sh @@ -20,10 +20,10 @@ print () { # check_test_ports checks that test lnd ports are not used. check_test_ports() { # Make sure that test lnd ports are not used. - o=$(lsof -i :19555,19556 | sed '1d' | awk '{ printf "%s\n", $2 }') + o=$(lsof -i :19555,19556,19557 | sed '1d' | awk '{ printf "%s\n", $2 }') if [ "$o" != "" ]; then printf "Can't run the lnd tests:\n" - printf "some program is using the test lnd ports (19555 | 19556)\n" + printf "some program is using the test lnd ports (19555 | 19556 | 19557)\n" exit 1 fi } @@ -85,15 +85,18 @@ lint_check() { print "* Run static checks" # Make sure gometalinter is installed and $GOPATH/bin is in your path. - if [ ! -x "$(type -p gometalinter)" ]; then + if [ ! -x "$(type -p gometalinter.v1)" ]; then print "** Install gometalinter" - go get -v github.com/alecthomas/gometalinter - gometalinter --install + go get -u gopkg.in/alecthomas/gometalinter.v1 + gometalinter.v1 --install fi + # Update metalinter if needed. + gometalinter.v1 --install 1>/dev/null + # Automatic checks linter_targets=$(glide novendor | grep -v lnrpc) - test -z "$(gometalinter --disable-all \ + test -z "$(gometalinter.v1 --disable-all \ --enable=gofmt \ --enable=vet \ --enable=golint \ diff --git a/lnd_test.go b/lnd_test.go index 5dc017980f..86936318a8 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -1467,6 +1467,14 @@ func testRevokedCloseRetribution(net *networkHarness, t *harnessTest) { return bobChannelInfo.Channels[0], nil } + // Wait for Alice to receive the channel edge from the funding manager. + ctxt, _ = context.WithTimeout(ctxb, timeout) + err := net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint) + if err != nil { + t.Fatalf("alice didn't see the alice->bob channel before "+ + "timeout: %v", err) + } + // Open up a payment stream to Alice that we'll use to send payment to // Bob. We also create a small helper function to send payments to Bob, // consuming the payment hashes we generated above. diff --git a/lnwire/channel_announcement.go b/lnwire/channel_announcement.go index b7ab5e48cb..d744f0c246 100644 --- a/lnwire/channel_announcement.go +++ b/lnwire/channel_announcement.go @@ -91,8 +91,8 @@ func (a *ChannelAnnouncement) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (a *ChannelAnnouncement) Command() uint32 { - return CmdChannelAnnoucmentMessage +func (a *ChannelAnnouncement) Command() MessageCode { + return CmdChannelAnnouncement } // MaxPayloadLength returns the maximum allowed payload size for this message diff --git a/lnwire/channel_announcement_test.go b/lnwire/channel_announcement_test.go index d52ec51f6a..36825b0b6e 100644 --- a/lnwire/channel_announcement_test.go +++ b/lnwire/channel_announcement_test.go @@ -58,9 +58,7 @@ func TestChannelAnnoucementValidation(t *testing.T) { firstBitcoinPrivKey, firstBitcoinPubKey := getKeys("bitcoin-key-1") secondBitcoinPrivKey, secondBitcoinPubKey := getKeys("bitcoin-key-2") - var hash []byte - - hash = chainhash.DoubleHashB(firstNodePubKey.SerializeCompressed()) + hash := chainhash.DoubleHashB(firstNodePubKey.SerializeCompressed()) firstBitcoinSig, _ := firstBitcoinPrivKey.Sign(hash) hash = chainhash.DoubleHashB(secondNodePubKey.SerializeCompressed()) diff --git a/lnwire/channel_update_announcement.go b/lnwire/channel_update_announcement.go index fca97b36c5..6e3d7f828e 100644 --- a/lnwire/channel_update_announcement.go +++ b/lnwire/channel_update_announcement.go @@ -105,8 +105,8 @@ func (a *ChannelUpdateAnnouncement) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (a *ChannelUpdateAnnouncement) Command() uint32 { - return CmdChannelUpdateAnnoucmentMessage +func (a *ChannelUpdateAnnouncement) Command() MessageCode { + return CmdChannelUpdateAnnouncement } // MaxPayloadLength returns the maximum allowed payload size for this message diff --git a/lnwire/close_complete.go b/lnwire/close_complete.go index 94622be7ed..0e03026c58 100644 --- a/lnwire/close_complete.go +++ b/lnwire/close_complete.go @@ -64,7 +64,7 @@ func (c *CloseComplete) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (c *CloseComplete) Command() uint32 { +func (c *CloseComplete) Command() MessageCode { return CmdCloseComplete } diff --git a/lnwire/close_request.go b/lnwire/close_request.go index b5ce49dea6..7cf81676e9 100644 --- a/lnwire/close_request.go +++ b/lnwire/close_request.go @@ -81,7 +81,7 @@ func (c *CloseRequest) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (c *CloseRequest) Command() uint32 { +func (c *CloseRequest) Command() MessageCode { return CmdCloseRequest } diff --git a/lnwire/commit_sig.go b/lnwire/commit_sig.go index f51d35e3a8..7e00ab79c9 100644 --- a/lnwire/commit_sig.go +++ b/lnwire/commit_sig.go @@ -67,7 +67,7 @@ func (c *CommitSig) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (c *CommitSig) Command() uint32 { +func (c *CommitSig) Command() MessageCode { return CmdCommitSig } diff --git a/lnwire/error_generic.go b/lnwire/error_generic.go index 4476f9f458..fd4786d0d0 100644 --- a/lnwire/error_generic.go +++ b/lnwire/error_generic.go @@ -98,7 +98,7 @@ func (c *ErrorGeneric) Encode(w io.Writer, pver uint32) error { // the wire. // // This is part of the lnwire.Message interface. -func (c *ErrorGeneric) Command() uint32 { +func (c *ErrorGeneric) Command() MessageCode { return CmdErrorGeneric } diff --git a/lnwire/funding_locked.go b/lnwire/funding_locked.go index da2c5fd3e0..4c5badc07e 100644 --- a/lnwire/funding_locked.go +++ b/lnwire/funding_locked.go @@ -71,7 +71,7 @@ func (c *FundingLocked) Encode(w io.Writer, pver uint32) error { // FundingLocked message on the wire. // // This is part of the lnwire.Message interface. -func (c *FundingLocked) Command() uint32 { +func (c *FundingLocked) Command() MessageCode { return CmdFundingLocked } diff --git a/lnwire/init_message.go b/lnwire/init_message.go index b9deddd9ae..eb7f6746b2 100644 --- a/lnwire/init_message.go +++ b/lnwire/init_message.go @@ -59,7 +59,7 @@ func (msg *Init) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (msg *Init) Command() uint32 { +func (msg *Init) Command() MessageCode { return CmdInit } diff --git a/lnwire/lnwire_test.go b/lnwire/lnwire_test.go index c729cf4674..4a76a6b12f 100644 --- a/lnwire/lnwire_test.go +++ b/lnwire/lnwire_test.go @@ -4,10 +4,13 @@ import ( "encoding/hex" "net" + "bytes" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" + "reflect" + "testing" ) // Common variables and functions for the message tests @@ -74,3 +77,31 @@ var ( blue: 255, } ) + +// TestMessageHeaderEncodeDecode test that header is encoded and decoded +// properly. +func TestMessageHeaderEncodeDecode(t *testing.T) { + hdr := &messageHeader{ + magic: wire.BitcoinNet(0), + command: uint32(CmdInit), + length: 100, + } + + // Next encode the HDR message into an empty bytes buffer. + var b bytes.Buffer + if _, err := writeMessageHeader(&b, hdr); err != nil { + t.Fatalf("unable to encode ErrorGeneric: %v", err) + } + + // Deserialize the encoded HDR message into a new empty struct. + _, hdr2, err := readMessageHeader(&b) + if err != nil { + t.Fatalf("unable to decode ErrorGeneric: %v", err) + } + + // Assert equality of the two instances. + if !reflect.DeepEqual(hdr, hdr2) { + t.Fatalf("encode/decode error messages don't match %#v vs %#v", + hdr, hdr2) + } +} diff --git a/lnwire/message.go b/lnwire/message.go index 446dd72280..b3bf73e3b1 100644 --- a/lnwire/message.go +++ b/lnwire/message.go @@ -21,50 +21,99 @@ const MessageHeaderSize = 12 // individual limits imposed by messages themselves. const MaxMessagePayload = 1024 * 1024 * 32 // 32MB +// MessageCode represent the unique identifier of the lnwire command. +type MessageCode uint32 + +// String converts message code to the string representation. +func (c MessageCode) String() string { + switch c { + case CmdInit: + return "Init" + case CmdSingleFundingRequest: + return "SingleFundingRequest" + case CmdSingleFundingResponse: + return "SingleFundingResponse" + case CmdSingleFundingComplete: + return "SingleFundingComplete" + case CmdSingleFundingSignComplete: + return "SingleFundingSignComplete" + case CmdFundingLocked: + return "FundingLocked" + case CmdCloseRequest: + return "CloseRequest" + case CmdCloseComplete: + return "CloseComplete" + case CmdUpdateAddHTLC: + return "UpdateAddHTLC" + case CmdUpdateFailHTLC: + return "UpdateFailHTLC" + case CmdUpdateFufillHTLC: + return "UpdateFufillHTLC" + case CmdCommitSig: + return "CommitSig" + case CmdRevokeAndAck: + return "RevokeAndAck" + case CmdErrorGeneric: + return "ErrorGeneric" + case CmdChannelAnnouncement: + return "ChannelAnnouncement" + case CmdChannelUpdateAnnouncement: + return "ChannelUpdateAnnouncement" + case CmdNodeAnnouncement: + return "NodeAnnouncement" + case CmdPing: + return "Ping" + case CmdPong: + return "Pong" + default: + return "" + } +} + // Commands used in lightning message headers which detail the type of message. // TODO(roasbeef): update with latest type numbering from spec const ( - CmdInit = uint32(1) + CmdInit MessageCode = 1 // Commands for opening a channel funded by one party (single funder). - CmdSingleFundingRequest = uint32(100) - CmdSingleFundingResponse = uint32(110) - CmdSingleFundingComplete = uint32(120) - CmdSingleFundingSignComplete = uint32(130) + CmdSingleFundingRequest = 100 + CmdSingleFundingResponse = 110 + CmdSingleFundingComplete = 120 + CmdSingleFundingSignComplete = 130 // Command for locking a funded channel - CmdFundingLocked = uint32(200) + CmdFundingLocked = 200 // Commands for the workflow of cooperatively closing an active channel. - CmdCloseRequest = uint32(300) - CmdCloseComplete = uint32(310) + CmdCloseRequest = 300 + CmdCloseComplete = 310 // Commands for negotiating HTLCs. - CmdUpdateAddHTLC = uint32(1000) - CmdUpdateFufillHTLC = uint32(1010) - CmdUpdateFailHTLC = uint32(1020) + CmdUpdateAddHTLC = 1000 + CmdUpdateFufillHTLC = 1010 + CmdUpdateFailHTLC = 1020 // Commands for modifying commitment transactions. - CmdCommitSig = uint32(2000) - CmdRevokeAndAck = uint32(2010) + CmdCommitSig = 2000 + CmdRevokeAndAck = 2010 // Commands for reporting protocol errors. - CmdErrorGeneric = uint32(4000) + CmdErrorGeneric = 4000 // Commands for discovery service. - CmdChannelAnnoucmentMessage = uint32(5000) - CmdChannelUpdateAnnoucmentMessage = uint32(5010) - CmdNodeAnnoucmentMessage = uint32(5020) + CmdChannelAnnouncement = 5000 + CmdChannelUpdateAnnouncement = 5010 + CmdNodeAnnouncement = 5020 // Commands for connection keep-alive. - CmdPing = uint32(6000) - CmdPong = uint32(6010) + CmdPing = 6000 + CmdPong = 6010 ) // UnknownMessage is an implementation of the error interface that allows the // creation of an error in response to an unknown message. type UnknownMessage struct { - messageType uint32 + messageType MessageCode } // Error returns a human readable string describing the error. @@ -81,14 +130,14 @@ func (u *UnknownMessage) Error() string { type Message interface { Decode(io.Reader, uint32) error Encode(io.Writer, uint32) error - Command() uint32 + Command() MessageCode MaxPayloadLength(uint32) uint32 Validate() error } // makeEmptyMessage creates a new empty message of the proper concrete type // based on the command ID. -func makeEmptyMessage(command uint32) (Message, error) { +func makeEmptyMessage(command MessageCode) (Message, error) { var msg Message switch command { @@ -120,11 +169,11 @@ func makeEmptyMessage(command uint32) (Message, error) { msg = &RevokeAndAck{} case CmdErrorGeneric: msg = &ErrorGeneric{} - case CmdChannelAnnoucmentMessage: + case CmdChannelAnnouncement: msg = &ChannelAnnouncement{} - case CmdChannelUpdateAnnoucmentMessage: + case CmdChannelUpdateAnnouncement: msg = &ChannelUpdateAnnouncement{} - case CmdNodeAnnoucmentMessage: + case CmdNodeAnnouncement: msg = &NodeAnnouncement{} case CmdPing: msg = &Ping{} @@ -173,6 +222,20 @@ func readMessageHeader(r io.Reader) (int, *messageHeader, error) { return n, &hdr, nil } +// writeMessageHeader writes a lightning protocol message header to w. +func writeMessageHeader(w io.Writer, hdr *messageHeader) (int, error) { + // Encode the header for the message. This is done to a buffer + // rather than directly to the writer since writeElements doesn't + // return the number of bytes written. + hw := bytes.NewBuffer(make([]byte, 0, MessageHeaderSize)) + if err := writeElements(hw, hdr.magic, hdr.command, hdr.length); err != nil { + return 0, nil + } + + // Write the header first. + return w.Write(hw.Bytes()) +} + // discardInput reads n bytes from reader r in chunks and discards the read // bytes. This is used to skip payloads when various errors occur and helps // prevent rogue nodes from causing massive memory allocation through forging @@ -225,18 +288,13 @@ func WriteMessage(w io.Writer, msg Message, pver uint32, btcnet wire.BitcoinNet) } // Create header for the message. - hdr := messageHeader{magic: btcnet, command: cmd, length: uint32(lenp)} - - // Encode the header for the message. This is done to a buffer - // rather than directly to the writer since writeElements doesn't - // return the number of bytes written. - hw := bytes.NewBuffer(make([]byte, 0, MessageHeaderSize)) - if err := writeElements(hw, hdr.magic, hdr.command, hdr.length); err != nil { - return 0, nil + hdr := &messageHeader{ + magic: btcnet, + command: uint32(cmd), + length: uint32(lenp), } - // Write the header first. - n, err := w.Write(hw.Bytes()) + n, err := writeMessageHeader(w, hdr) totalBytes += n if err != nil { return totalBytes, err @@ -277,7 +335,7 @@ func ReadMessage(r io.Reader, pver uint32, btcnet wire.BitcoinNet) (int, Message } // Create struct of appropriate message type based on the command. - command := hdr.command + command := MessageCode(hdr.command) msg, err := makeEmptyMessage(command) if err != nil { discardInput(r, hdr.length) diff --git a/lnwire/node_announcement.go b/lnwire/node_announcement.go index 5f7e9f98da..54d83cd565 100644 --- a/lnwire/node_announcement.go +++ b/lnwire/node_announcement.go @@ -159,8 +159,8 @@ func (a *NodeAnnouncement) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (a *NodeAnnouncement) Command() uint32 { - return CmdNodeAnnoucmentMessage +func (a *NodeAnnouncement) Command() MessageCode { + return CmdNodeAnnouncement } // MaxPayloadLength returns the maximum allowed payload size for this message diff --git a/lnwire/ping.go b/lnwire/ping.go index fd75d32e15..01fa05bb02 100644 --- a/lnwire/ping.go +++ b/lnwire/ping.go @@ -46,7 +46,7 @@ func (p *Pong) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (p *Pong) Command() uint32 { +func (p *Pong) Command() MessageCode { return CmdPong } diff --git a/lnwire/pong.go b/lnwire/pong.go index 2cf2be9b5d..467af1e0b3 100644 --- a/lnwire/pong.go +++ b/lnwire/pong.go @@ -45,7 +45,7 @@ func (p *Ping) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (p *Ping) Command() uint32 { +func (p *Ping) Command() MessageCode { return CmdPing } diff --git a/lnwire/revoke_and_ack.go b/lnwire/revoke_and_ack.go index 205d4c4623..31e69a0748 100644 --- a/lnwire/revoke_and_ack.go +++ b/lnwire/revoke_and_ack.go @@ -87,7 +87,7 @@ func (c *RevokeAndAck) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (c *RevokeAndAck) Command() uint32 { +func (c *RevokeAndAck) Command() MessageCode { return CmdRevokeAndAck } diff --git a/lnwire/signature.go b/lnwire/signature.go index 936027a043..45f99631b3 100644 --- a/lnwire/signature.go +++ b/lnwire/signature.go @@ -65,7 +65,7 @@ func deserializeSigFromWire(e **btcec.Signature, b [64]byte) error { // Create a canonical serialized signature. DER format is: // 0x30 0x02 r 0x02 s - sigBytes := make([]byte, 6+rLen+sLen, 6+rLen+sLen) + sigBytes := make([]byte, 6+rLen+sLen) sigBytes[0] = 0x30 // DER signature magic value sigBytes[1] = 4 + rLen + sLen // Length of rest of signature sigBytes[2] = 0x02 // Big integer magic value diff --git a/lnwire/single_funding_complete.go b/lnwire/single_funding_complete.go index a86e8f26c9..de54cd318f 100644 --- a/lnwire/single_funding_complete.go +++ b/lnwire/single_funding_complete.go @@ -99,7 +99,7 @@ func (s *SingleFundingComplete) Encode(w io.Writer, pver uint32) error { // SingleFundingComplete on the wire. // // This is part of the lnwire.Message interface. -func (s *SingleFundingComplete) Command() uint32 { +func (s *SingleFundingComplete) Command() MessageCode { return CmdSingleFundingComplete } diff --git a/lnwire/single_funding_request.go b/lnwire/single_funding_request.go index fa0f8ec03d..840f90d347 100644 --- a/lnwire/single_funding_request.go +++ b/lnwire/single_funding_request.go @@ -152,7 +152,7 @@ func (c *SingleFundingRequest) Encode(w io.Writer, pver uint32) error { // SingleFundingRequest on the wire. // // This is part of the lnwire.Message interface. -func (c *SingleFundingRequest) Command() uint32 { +func (c *SingleFundingRequest) Command() MessageCode { return CmdSingleFundingRequest } diff --git a/lnwire/single_funding_response.go b/lnwire/single_funding_response.go index 6f56dc62be..8eb7bcf5c1 100644 --- a/lnwire/single_funding_response.go +++ b/lnwire/single_funding_response.go @@ -10,7 +10,7 @@ import ( // SingleFundingResponse is the message Bob sends to Alice after she initiates // the single funder channel workflow via a SingleFundingRequest message. Once -// Alice receives Bob's reponse, then she has all the items neccessary to +// Alice receives Bob's response, then she has all the items necessary to // construct the funding transaction, and both commitment transactions. type SingleFundingResponse struct { // ChannelID serves to uniquely identify the future channel created by @@ -26,7 +26,7 @@ type SingleFundingResponse struct { ChannelDerivationPoint *btcec.PublicKey // CommitmentKey is key the responder to the funding workflow wishes to - // use within their versino of the commitment transaction for any + // use within their version of the commitment transaction for any // delayed (CSV) or immediate outputs to them. CommitmentKey *btcec.PublicKey @@ -126,7 +126,7 @@ func (c *SingleFundingResponse) Encode(w io.Writer, pver uint32) error { // SingleFundingResponse on the wire. // // This is part of the lnwire.Message interface. -func (c *SingleFundingResponse) Command() uint32 { +func (c *SingleFundingResponse) Command() MessageCode { return CmdSingleFundingResponse } diff --git a/lnwire/single_funding_signcomplete.go b/lnwire/single_funding_signcomplete.go index 5ec96a0b4e..4dfdcd5ffb 100644 --- a/lnwire/single_funding_signcomplete.go +++ b/lnwire/single_funding_signcomplete.go @@ -60,7 +60,7 @@ func (c *SingleFundingSignComplete) Encode(w io.Writer, pver uint32) error { // SingleFundingSignComplete on the wire. // // This is part of the lnwire.Message interface. -func (c *SingleFundingSignComplete) Command() uint32 { +func (c *SingleFundingSignComplete) Command() MessageCode { return CmdSingleFundingSignComplete } diff --git a/lnwire/update_add_htlc.go b/lnwire/update_add_htlc.go index 8079a1dbb6..31ff15a9b4 100644 --- a/lnwire/update_add_htlc.go +++ b/lnwire/update_add_htlc.go @@ -105,7 +105,7 @@ func (c *UpdateAddHTLC) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (c *UpdateAddHTLC) Command() uint32 { +func (c *UpdateAddHTLC) Command() MessageCode { return CmdUpdateAddHTLC } diff --git a/lnwire/update_fail_htlc.go b/lnwire/update_fail_htlc.go index 82c76f1c93..08d13078b7 100644 --- a/lnwire/update_fail_htlc.go +++ b/lnwire/update_fail_htlc.go @@ -131,7 +131,7 @@ func (c *UpdateFailHTLC) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (c *UpdateFailHTLC) Command() uint32 { +func (c *UpdateFailHTLC) Command() MessageCode { return CmdUpdateFailHTLC } diff --git a/lnwire/update_fulfill_htlc.go b/lnwire/update_fulfill_htlc.go index 07699590c8..12df7b0aea 100644 --- a/lnwire/update_fulfill_htlc.go +++ b/lnwire/update_fulfill_htlc.go @@ -71,7 +71,7 @@ func (c *UpdateFufillHTLC) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (c *UpdateFufillHTLC) Command() uint32 { +func (c *UpdateFufillHTLC) Command() MessageCode { return CmdUpdateFufillHTLC } diff --git a/networktest.go b/networktest.go index 04e21b1bf1..d707c6f201 100644 --- a/networktest.go +++ b/networktest.go @@ -441,7 +441,7 @@ func (l *lightningNode) lightningNetworkWatcher() { // If this is a open request, then it can be // dispatched if the number of edges seen for // the channel is at least two. - if numEdges, _ := openChans[targetChan]; numEdges >= 2 { + if numEdges := openChans[targetChan]; numEdges >= 2 { close(watchRequest.eventChan) continue } diff --git a/peer.go b/peer.go index f8cde6402d..88c5e2bb86 100644 --- a/peer.go +++ b/peer.go @@ -46,6 +46,7 @@ const ( // buffered channel acts as a semaphore to be used for synchronization purposes. type outgoinMsg struct { msg lnwire.Message + persist bool sentChan chan struct{} // MUST be buffered. } @@ -161,6 +162,14 @@ type peer struct { // on both sides. globalSharedFeatures *lnwire.SharedFeatures + // retransmitter is an retransmission subsystem aka message store, which + // stores outgoing messages that were not acked. Messages queue'd + // on-disk and in the situation when the server is unable to send the + // message to the peer due to it being offline this service will take of + // retransmitting the messages that were not acked to the remote upon + // reconnection. + retransmitter *retransmitter + queueQuit chan struct{} quit chan struct{} wg sync.WaitGroup @@ -169,7 +178,7 @@ type peer struct { // newPeer creates a new peer from an establish connection object, and a // pointer to the main server. func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, - addr *lnwire.NetAddress, inbound bool) (*peer, error) { + addr *lnwire.NetAddress, inbound bool, db *channeldb.DB) (*peer, error) { nodePub := addr.IdentityKey @@ -205,6 +214,15 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, quit: make(chan struct{}), } + storeID := p.addr.IdentityKey.SerializeCompressed() + rt, err := newRetransmitter(channeldb.NewMessageStore(storeID, db)) + if err != nil { + peerLog.Errorf("unable to initialise retransmitter "+ + "for peerID(%v): %v", p.id, err) + return nil, err + } + p.retransmitter = rt + // Initiate the pending channel identifier properly depending on if this // node is inbound or outbound. This value will be used in an increasing // manner to track pending channels. @@ -285,7 +303,7 @@ func (p *peer) Start() error { return nil } - peerLog.Tracef("peer %v starting", p) + peerLog.Tracef("peer(%v) starting", p) p.wg.Add(2) go p.queueHandler() @@ -313,6 +331,21 @@ func (p *peer) Start() error { "must be init message") } + // If we had the interaction with this peer before than we should + // retrieve the messages that were not acked in previous session and + // sent them again in order to be sure that remote peer is handled them. + messages := p.retransmitter.MessagesToRetransmit() + if len(messages) != 0 { + peerLog.Infof("retransmission subsystem resends %v messages "+ + "to the peer(%v)", len(messages), p) + + for _, message := range messages { + // Sending over sendToPeer will cause block because of + // the usage of peer mutex. + p.queueMsg(message, false, nil) + } + } + p.wg.Add(3) go p.readHandler() go p.channelManager() @@ -350,7 +383,7 @@ func (p *peer) Disconnect() { return } - peerLog.Tracef("Disconnecting %s", p) + peerLog.Tracef("Disconnecting peer(%v)", p) // Ensure that the TCP connection is properly closed before continuing. p.conn.Close() @@ -376,7 +409,10 @@ func (p *peer) Disconnect() { // String returns the string representation of this peer. func (p *peer) String() string { - return p.conn.RemoteAddr().String() + return fmt.Sprintf("%x@%v", + p.addr.IdentityKey.SerializeCompressed(), + p.addr.Address) + } // readNextMessage reads, and returns the next message on the wire along with @@ -405,8 +441,8 @@ out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, _, err := p.readNextMessage() if err != nil { - peerLog.Infof("unable to read message from %v: %v", - p, err) + peerLog.Errorf("unable to read message from "+ + "peer(%v): %v", p, err) switch err.(type) { // If this is just a message we don't yet recognize, @@ -424,6 +460,12 @@ out: } } + if err := p.retransmitter.Ack(nextMsg); err != nil { + peerLog.Errorf("unable to ack messages for peer(%v):"+ + " %v", p, err) + break out + } + var ( isChanUpdate bool targetChan wire.OutPoint @@ -440,7 +482,7 @@ out: atomic.StoreInt64(&p.pingTime, delay) case *lnwire.Ping: - p.queueMsg(lnwire.NewPong(msg.Nonce), nil) + p.queueMsg(lnwire.NewPong(msg.Nonce), true, nil) case *lnwire.SingleFundingRequest: p.server.fundingMgr.processFundingRequest(msg, p.addr) @@ -492,7 +534,7 @@ out: p.htlcManMtx.Unlock() if !ok { peerLog.Errorf("recv'd update for unknown "+ - "channel %v from %v", targetChan, p) + "channel %v from peer(%v)", targetChan, p) continue } channel <- nextMsg @@ -502,7 +544,7 @@ out: p.Disconnect() p.wg.Done() - peerLog.Tracef("readHandler for peer %v done", p) + peerLog.Tracef("readHandler for peer(%v) done", p) } // logWireMessage logs the receipt or sending of particular wire message. This @@ -586,6 +628,16 @@ func (p *peer) writeHandler() { atomic.StoreInt64(&p.pingLastSend, now) } + if outMsg.persist { + err := p.retransmitter.Register(outMsg.msg) + if err != nil { + peerLog.Errorf("unable to register "+ + "message in retransmitter for "+ + "peer(%v): %v", p, err) + p.Disconnect() + return + } + } // Write out the message to the socket, closing the // 'sentChan' if it's non-nil, The 'sentChan' allows // callers to optionally synchronize sends with the @@ -679,7 +731,7 @@ out: // Convert the bytes read into a uint64, and queue the // message for sending. nonce := binary.BigEndian.Uint64(pingBuf[:]) - p.queueMsg(lnwire.NewPing(nonce), nil) + p.queueMsg(lnwire.NewPing(nonce), true, nil) case <-p.quit: break out } @@ -695,9 +747,14 @@ func (p *peer) PingTime() int64 { // queueMsg queues a new lnwire.Message to be eventually sent out on the // wire. -func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) { +func (p *peer) queueMsg(msg lnwire.Message, persist bool, + doneChan chan struct{}) { select { - case p.outgoingQueue <- outgoinMsg{msg, doneChan}: + case p.outgoingQueue <- outgoinMsg{ + msg: msg, + sentChan: doneChan, + persist: persist, + }: case <-p.quit: return } @@ -802,7 +859,7 @@ func (p *peer) executeCooperativeClose(channel *lnwallet.LightningChannel) (*cha return nil, err } closeReq := lnwire.NewCloseRequest(*chanPoint, closeSig) - p.queueMsg(closeReq, nil) + p.queueMsg(closeReq, true, nil) return txid, nil } @@ -1108,7 +1165,7 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel, peerLog.Errorf("unable to expand revocation window: %v", err) continue } - p.queueMsg(rev, nil) + p.queueMsg(rev, true, nil) } state := &commitmentState{ @@ -1244,7 +1301,7 @@ func (p *peer) sendInitMsg() error { p.server.localFeatures, ) - p.queueMsg(msg, nil) + p.queueMsg(msg, true, nil) return nil } @@ -1278,7 +1335,7 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { return } - p.queueMsg(htlc, nil) + p.queueMsg(htlc, true, nil) state.pendingBatch = append(state.pendingBatch, &pendingPayment{ htlc: htlc, @@ -1308,7 +1365,7 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { // Then we send the HTLC settle message to the connected peer // so we can continue the propagation of the settle message. - p.queueMsg(htlc, nil) + p.queueMsg(htlc, true, nil) isSettle = true case *lnwire.UpdateFailHTLC: @@ -1329,7 +1386,7 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { // Finally, we send the HTLC message to the peer which // initially created the HTLC. - p.queueMsg(htlc, nil) + p.queueMsg(htlc, true, nil) isSettle = true } @@ -1478,7 +1535,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { peerLog.Errorf("unable to revoke commitment: %v", err) return } - p.queueMsg(nextRevocation, nil) + p.queueMsg(nextRevocation, true, nil) // If we just initiated a state transition, and we were waiting // for a reply from the remote peer, then we don't need to @@ -1574,7 +1631,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { ID: logIndex, PaymentPreimage: preimage, } - p.queueMsg(settleMsg, nil) + p.queueMsg(settleMsg, true, nil) delete(state.htlcsToSettle, htlc.Index) settledPayments[htlc.RHash] = struct{}{} @@ -1604,7 +1661,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { ID: logIndex, Reason: []byte{byte(reason)}, } - p.queueMsg(cancelMsg, nil) + p.queueMsg(cancelMsg, true, nil) delete(state.htlcsToCancel, htlc.Index) cancelledHtlcs[htlc.Index] = struct{}{} @@ -1698,7 +1755,7 @@ func (p *peer) updateCommitTx(state *commitmentState, reply bool) error { ChannelPoint: *state.chanPoint, CommitSig: parsedSig, } - p.queueMsg(commitSig, nil) + p.queueMsg(commitSig, true, nil) // As we've just cleared out a batch, move all pending updates to the // map of cleared HTLCs, clearing out the set of pending updates. diff --git a/retranmission.go b/retranmission.go new file mode 100644 index 0000000000..cdd1767b83 --- /dev/null +++ b/retranmission.go @@ -0,0 +1,203 @@ +package main + +import ( + "sync" + + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" +) + +// retransmitter represents the retransmission subsystem which is described +// in details in BOLT #2 (Message Retransmission). This subsystem keeps +// records of all messages that were sent to other peer and waits the ACK +// message to be received from other side. The ACK message denotes that the +// previous messages was read. Because communication transports are unreliable +// and may need to be re-established from time to time and reconnection +// introduces doubt as to what has been received such logic is needed to be sure +// that peers are in consistent state in terms of message communication. +type retransmitter struct { + // storage is a message storage which is needed to store the messages + // which weren't acked and restore them after node restarts, in order to + // send them to other side again. + storage channeldb.MessageStore + + // codeToIndex map is used to locate which messages can be deleted from + // the message storage in response to a retrieved ACK message. The + // mapping for items is code -> {index #1 , ... , index #n}. So when + // receiving a new message, we check for the existence of the message + // code in this index bucket, then delete all the messages from the + // top-level bucket that are returned. + codeToIndex map[lnwire.MessageCode][]uint64 + + // messagesToRetransmit list of messages that should be retransmitted + // to other side. + messagesToRetransmit []lnwire.Message + + mutex sync.RWMutex +} + +// newRetransmitter creates new instance of retransmitter. +func newRetransmitter(storage channeldb.MessageStore) (*retransmitter, error) { + + // Retrieve the messages from the message storage with their + // associated indexes. + indexes, messages, err := storage.Get() + if err != channeldb.ErrPeerMessagesNotFound && err != nil { + return nil, err + } + + // Initialize map of code to index map, so than later we can retrieve + // indexes of messages that should be removed. + codeToIndex := make(map[lnwire.MessageCode][]uint64) + for i, message := range messages { + codeToIndex[message.Command()] = append( + codeToIndex[message.Command()], + indexes[i], + ) + } + + return &retransmitter{ + storage: storage, + codeToIndex: codeToIndex, + messagesToRetransmit: messages, + }, nil +} + +// Register adds message that should be acknowledged in the message storage. +func (rt *retransmitter) Register(msg lnwire.Message) error { + switch msg.Command() { + // messages without acknowledgment + case lnwire.CmdCloseComplete, + lnwire.CmdChannelUpdateAnnouncement, + lnwire.CmdChannelAnnouncement, + lnwire.CmdNodeAnnouncement, + lnwire.CmdPing, + lnwire.CmdPong, + lnwire.CmdErrorGeneric, + lnwire.CmdInit: + return nil + default: + // Adds message to storage and returns the message index which + // have been associated with this message within the storage. + index, err := rt.storage.Add(msg) + if err != nil { + return err + } + + // Associate the message index within the message storage + // with message code in order to remove messages by index later. + rt.mutex.Lock() + rt.codeToIndex[msg.Command()] = append( + rt.codeToIndex[msg.Command()], index, + ) + rt.mutex.Unlock() + + return nil + } +} + +// Ack encapsulates the specification logic about which messages should be +// acknowledged by receiving this one. +func (rt *retransmitter) Ack(msg lnwire.Message) error { + switch msg.Command() { + + case lnwire.CmdSingleFundingResponse: + return rt.remove( + lnwire.CmdSingleFundingRequest, + ) + case lnwire.CmdSingleFundingComplete: + return rt.remove( + lnwire.CmdSingleFundingResponse, + ) + case lnwire.CmdSingleFundingSignComplete: + return rt.remove( + lnwire.CmdSingleFundingComplete, + ) + case lnwire.CmdFundingLocked: + return rt.remove( + lnwire.CmdSingleFundingSignComplete, + ) + case lnwire.CmdUpdateAddHTLC, + lnwire.CmdUpdateFailHTLC, + lnwire.CmdUpdateFufillHTLC: + return rt.remove( + lnwire.CmdFundingLocked, + ) + case lnwire.CmdRevokeAndAck: + return rt.remove( + lnwire.CmdUpdateAddHTLC, + lnwire.CmdUpdateFailHTLC, + lnwire.CmdUpdateFufillHTLC, + lnwire.CmdCommitSig, + lnwire.CmdCloseRequest, + lnwire.CmdFundingLocked, + ) + + case lnwire.CmdCommitSig, + lnwire.CmdCloseRequest: + return rt.remove( + lnwire.CmdFundingLocked, + lnwire.CmdRevokeAndAck, + ) + case lnwire.CmdCloseComplete: + return rt.remove( + lnwire.CmdCloseRequest, + ) + case lnwire.CmdPing, + lnwire.CmdPong, + lnwire.CmdChannelUpdateAnnouncement, + lnwire.CmdChannelAnnouncement, + lnwire.CmdNodeAnnouncement, + lnwire.CmdErrorGeneric, + lnwire.CmdInit, + lnwire.CmdSingleFundingRequest: + return nil + + default: + return errors.Errorf("wrong message type: %v", msg.Command()) + } +} + +// remove retrieves the messages storage indexes of the messages that +// corresponds to given types/codes and remove them from the message storage +// thereby acknowledge them. +func (rt *retransmitter) remove(codes ...lnwire.MessageCode) error { + rt.mutex.RLock() + var messagesToRemove []uint64 + for _, code := range codes { + indexes, ok := rt.codeToIndex[code] + if !ok { + continue + } + messagesToRemove = append(messagesToRemove, indexes...) + } + rt.mutex.RUnlock() + + if err := rt.storage.Remove(messagesToRemove); err != nil { + return err + } + + // After successful deletion the messages by index, clean up the code + // to index map. + rt.mutex.Lock() + for _, code := range codes { + delete(rt.codeToIndex, code) + } + rt.mutex.Unlock() + + return nil +} + +// MessagesToRetransmit returns the array of messages, that were not +// acknowledged in previous session with this peer, in the order they have been +// originally added in storage. +func (rt *retransmitter) MessagesToRetransmit() []lnwire.Message { + return rt.messagesToRetransmit +} + +// Flush removes the initialized messages after the have been successfully +// retransmitted. +func (rt *retransmitter) Flush() { + rt.messagesToRetransmit = nil +} diff --git a/retransmission_test.go b/retransmission_test.go new file mode 100644 index 0000000000..6fec2b2a90 --- /dev/null +++ b/retransmission_test.go @@ -0,0 +1,197 @@ +package main + +import ( + "testing" + + "sort" + + "github.com/lightningnetwork/lnd/lnwire" +) + +var ackTestVector = []struct { + name string + send lnwire.Message + recv lnwire.Message +}{ + { + name: "open_channel -> accept_channel", + send: &lnwire.SingleFundingRequest{}, + recv: &lnwire.SingleFundingResponse{}, + }, + { + name: "accept_channel -> funding_created", + send: &lnwire.SingleFundingResponse{}, + recv: &lnwire.SingleFundingComplete{}, + }, + { + name: "funding_created -> funding_signed", + send: &lnwire.SingleFundingComplete{}, + recv: &lnwire.SingleFundingSignComplete{}, + }, + { + name: "funding_signed -> funding_locked", + send: &lnwire.SingleFundingSignComplete{}, + recv: &lnwire.FundingLocked{}, + }, + { + name: "funding_locked -> update_add_htlc", + send: &lnwire.FundingLocked{}, + recv: &lnwire.UpdateAddHTLC{}, + }, + { + name: "funding_locked -> update_fulfill_htlc", + send: &lnwire.FundingLocked{}, + recv: &lnwire.UpdateFufillHTLC{}, + }, + { + name: "funding_locked -> update_fail_htlc", + send: &lnwire.FundingLocked{}, + recv: &lnwire.UpdateFailHTLC{}, + }, + // TODO(andrew.shvv) uncomment after update_fail_malformed_htlc will + // be included + //{ + // name: "funding_locked -> update_fail_malformed_htlc", + // send: &lnwire.SingleFundingOpenProof{}, + // recv: , + //}, + { + name: "funding_locked -> commitment_signed", + send: &lnwire.FundingLocked{}, + recv: &lnwire.CommitSig{}, + }, + { + name: "funding_locked -> revoke_and_ack", + send: &lnwire.FundingLocked{}, + recv: &lnwire.RevokeAndAck{}, + }, + { + name: "funding_locked -> shutdown", + send: &lnwire.FundingLocked{}, + recv: &lnwire.CloseRequest{}, + }, + { + name: "update_add_htlc -> revoke_and_ack", + send: &lnwire.UpdateAddHTLC{}, + recv: &lnwire.RevokeAndAck{}, + }, + { + name: "update_fulfill_htlc -> revoke_and_ack", + send: &lnwire.UpdateFufillHTLC{}, + recv: &lnwire.RevokeAndAck{}, + }, + { + name: "update_fail_htlc -> revoke_and_ack", + send: &lnwire.UpdateFailHTLC{}, + recv: &lnwire.RevokeAndAck{}, + }, + // TODO(andrew.shvv) uncomment after update_fail_malformed_htlc will + // be included + //{ + // name: "update_fail_malformed_htlc -> revoke_and_ack", + // send: , + // recv: &lnwire.RevokeAndAck{}, + //}, + { + name: "commitment_signed -> revoke_and_ack", + send: &lnwire.CommitSig{}, + recv: &lnwire.RevokeAndAck{}, + }, + { + name: "revoke_and_ack -> commitment_signed", + send: &lnwire.RevokeAndAck{}, + recv: &lnwire.CommitSig{}, + }, + { + name: "revoke_and_ack -> shutdown", + send: &lnwire.RevokeAndAck{}, + recv: &lnwire.CloseRequest{}, + }, + + { + name: "shutdown -> revoke_and_ack", + send: &lnwire.CloseRequest{}, + recv: &lnwire.RevokeAndAck{}, + }, + { + name: "shutdown -> closing_signed", + send: &lnwire.CloseRequest{}, + recv: &lnwire.CloseComplete{}, + }, +} + +// MockStore map implementation of message storage which not requires the +// messages to be decoded/encoded which means that we shouldn't populate the +// lnwire message with data. +type MockStore struct { + sequence uint64 + messages map[uint64]lnwire.Message +} + +func (s *MockStore) Get() ([]uint64, []lnwire.Message, error) { + indexes := make([]int, len(s.messages)) + messages := make([]lnwire.Message, len(s.messages)) + + i := 0 + for index := range s.messages { + indexes[i] = int(index) + i++ + } + sort.Ints(indexes) + + uindexes := make([]uint64, len(s.messages)) + for i, index := range indexes { + messages[i] = s.messages[uint64(index)] + uindexes[i] = uint64(index) + } + + return uindexes, messages, nil +} +func (s *MockStore) Add(msg lnwire.Message) (uint64, error) { + index := s.sequence + s.messages[index] = msg + s.sequence++ + + return index, nil +} + +func (s *MockStore) Remove(indexes []uint64) error { + for _, index := range indexes { + delete(s.messages, index) + } + return nil +} + +// TestRetransmitterSpecVector tests the behaviour of retransmission +// subsystem which is described in specification. +func TestRetransmitterSpecVector(t *testing.T) { + + s := &MockStore{messages: make(map[uint64]lnwire.Message)} + + rt, err := newRetransmitter(s) + if err != nil { + t.Fatalf("can't init retransmitter: %v", err) + } + + for _, test := range ackTestVector { + if err := rt.Register(test.send); err != nil { + t.Fatalf("can't register message: %v", err) + } + + _, messages, _ := s.Get() + if len(messages) != 1 { + t.Fatalf("test(%v): message(%v) wasn't registered", + test.name, test.send.Command()) + } + + if err := rt.Ack(test.recv); err != nil { + t.Fatalf("can't ack message: %v", err) + } + + _, messages, _ = s.Get() + if len(messages) != 0 { + t.Fatalf("test(%v): message(%v) wasn't acked", + test.name, test.send.Command()) + } + } +} diff --git a/server.go b/server.go index 8990d04637..7e459006ff 100644 --- a/server.go +++ b/server.go @@ -264,6 +264,9 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, go s.connMgr.Connect(connReq) } + srvrLog.Infof("Identity key: %x", + s.identityPriv.PubKey().SerializeCompressed()) + return s, nil } @@ -445,7 +448,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound // Now that we've established a connection, create a peer, and // it to the set of currently active peers. - p, err := newPeer(conn, connReq, s, peerAddr, inbound) + p, err := newPeer(conn, connReq, s, peerAddr, inbound, s.chanDB) if err != nil { srvrLog.Errorf("unable to create peer %v", err) if p.connReq != nil { @@ -510,17 +513,17 @@ func (s *server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) s.peersMtx.Lock() defer s.peersMtx.Unlock() - srvrLog.Tracef("Established connection to: %v", conn.RemoteAddr()) - nodePub := conn.(*brontide.Conn).RemotePub() + srvrLog.Tracef("Established connection to: %x@%v", + nodePub.SerializeCompressed(), conn.RemoteAddr()) // If we already have an inbound connection from this peer, simply drop // the connection. pubStr := string(nodePub.SerializeCompressed()) if _, ok := s.peersByPub[pubStr]; ok { - srvrLog.Errorf("Established outbound connection to peer %x, but "+ - "already connected, dropping conn", - nodePub.SerializeCompressed()) + srvrLog.Errorf("Established outbound connection to peer"+ + "(%x@%v), but already connected, dropping conn", + nodePub.SerializeCompressed(), conn.RemoteAddr()) s.connMgr.Remove(connReq.ID()) conn.Close() return @@ -664,7 +667,7 @@ out: go func(p *peer) { for _, msg := range bMsg.msgs { - p.queueMsg(msg, nil) + p.queueMsg(msg, true, nil) } }(sPeer) } @@ -699,7 +702,7 @@ out: sMsg.errChan <- nil for _, msg := range sMsg.msgs { - targetPeer.queueMsg(msg, nil) + targetPeer.queueMsg(msg, true, nil) } }() case query := <-s.queries: diff --git a/utxonursery_test.go b/utxonursery_test.go index f21fcc7b03..df3c617208 100644 --- a/utxonursery_test.go +++ b/utxonursery_test.go @@ -2,7 +2,6 @@ package main import ( "bytes" - "fmt" "reflect" "testing" @@ -236,7 +235,7 @@ func TestSerializeKidOutput(t *testing.T) { deserializedKid, err := deserializeKidOutput(&b) if err != nil { - fmt.Printf(err.Error()) + t.Fatalf("can't deserialize kid output: %v", err) } if !reflect.DeepEqual(kid, deserializedKid) {