Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BOLT#2: Add message retransmission sub-system #156

Closed
4 changes: 4 additions & 0 deletions channeldb/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,8 @@ var (

// ErrNodeAliasNotFound is returned when alias for node can't be found.
ErrNodeAliasNotFound = fmt.Errorf("alias for node not found")

// ErrPeerMessagesNotFound is returned when no message have been
// found in the peer bucket or if bucket haven't been created yet.
ErrPeerMessagesNotFound = fmt.Errorf("peer messages not found")
)
174 changes: 174 additions & 0 deletions channeldb/messagestore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package channeldb

import (
"bytes"
"encoding/binary"

"github.com/boltdb/bolt"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/wire"
)

var (
// basePeerBucketKey is the base key for generating peer bucket keys.
// Peer bucket stores top-level bucket that maps: index -> code || msg
// concatenating the message code to the stored data allows the db logic
// to properly parse the wire message without trial and error or needing
// an additional index.
basePeerBucketKey = []byte("peermessagesstore")
)

// MessageStore represents the storage for lnwire messages, it might be boltd storage,
// local storage, test storage or hybrid one.
//
// NOTE: The original purpose of creating this interface was in separation
// between the retransmission logic and specifics of storage itself. In case of
// such interface we may create the test storage and register the add,
// remove, get actions without the need of population of lnwire messages with
// data.
type MessageStore interface {
// Get returns the sorted set of messages in the order they have been
// added originally and also the array of associated index to this
// messages within the message store.
Get() ([]uint64, []lnwire.Message, error)

// Add adds new message in the storage with preserving the order and
// returns the index of message within the message store.
Add(msg lnwire.Message) (uint64, error)

// Remove deletes message with this indexes.
Remove(indexes []uint64) error
}

// messagesStore represents the boltdb storage for messages inside
// retransmission sub-system.
type messagesStore struct {
// id is a unique slice of bytes identifying a peer. This value is
// typically a peer's identity public key serialized in compressed
// format.
id []byte
db *DB
}

// NewMessageStore creates new instance of message storage.
func NewMessageStore(id []byte, db *DB) MessageStore {
return &messagesStore{
id: id,
db: db,
}
}

// Add adds message to the storage and returns the index which
// corresponds the the message by which it might be removed later.
func (s *messagesStore) Add(msg lnwire.Message) (uint64, error) {
var index uint64

err := s.db.Batch(func(tx *bolt.Tx) error {
var err error

// Get or create the top peer bucket.
peerBucketKey := s.getPeerBucketKey()
peerBucket, err := tx.CreateBucketIfNotExists(peerBucketKey)
if err != nil {
return err
}

// Generate next index number to add it to the message code
// bucket.
index, err = peerBucket.NextSequence()
if err != nil {
return err
}
indexBytes := make([]byte, 8)
binary.BigEndian.PutUint64(indexBytes, index)

// Encode the message and place it in the top bucket.
var b bytes.Buffer
_, err = lnwire.WriteMessage(&b, msg, 0, wire.MainNet)
if err != nil {
return err
}

return peerBucket.Put(indexBytes, b.Bytes())
})

return index, err
}

// Remove removes the message from storage by index that were assigned to
// message during its addition to the storage.
func (s *messagesStore) Remove(indexes []uint64) error {
return s.db.Batch(func(tx *bolt.Tx) error {
// Get or create the top peer bucket.
peerBucketKey := s.getPeerBucketKey()
peerBucket := tx.Bucket(peerBucketKey)
if peerBucket == nil {
return ErrPeerMessagesNotFound
}

// Retrieve the messages indexes with this type/code and
// remove them from top peer bucket.
for _, index := range indexes {
var key [8]byte
binary.BigEndian.PutUint64(key[:], index)

if err := peerBucket.Delete(key[:]); err != nil {
return err
}
}

return nil
})
}

// Get retrieves messages from storage in the order in which they were
// originally added, proper order is needed for retransmission subsystem, as
// far this messages should be resent to remote peer in the same order as they
// were sent originally.
func (s *messagesStore) Get() ([]uint64, []lnwire.Message, error) {
var messages []lnwire.Message
var indexes []uint64

if err := s.db.View(func(tx *bolt.Tx) error {
peerBucketKey := s.getPeerBucketKey()
peerBucket := tx.Bucket(peerBucketKey)
if peerBucket == nil {
return nil
}

// Iterate over messages buckets.
return peerBucket.ForEach(func(k, v []byte) error {
// Skip buckets fields.
if v == nil {
return nil
}

// Decode the message from and add it to the array.
r := bytes.NewReader(v)
_, msg, _, err := lnwire.ReadMessage(r, 0, wire.MainNet)
if err != nil {
return err
}

messages = append(messages, msg)
indexes = append(indexes, binary.BigEndian.Uint64(k))
return nil
})
}); err != nil {
return nil, nil, err
}

// If bucket was haven't been created yet or just not contains any
// messages.
if len(messages) == 0 {
return nil, nil, ErrPeerMessagesNotFound
}

return indexes, messages, nil
}

// getPeerBucketKey generates the peer bucket boltd key by peer id and base
// peer bucket key.
func (s *messagesStore) getPeerBucketKey() []byte {
return append(basePeerBucketKey[:], s.id[:]...)
}
108 changes: 108 additions & 0 deletions channeldb/messagestore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package channeldb

import (
"bytes"
"crypto/sha256"
"testing"

"reflect"

"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/wire"
)

// TestRetransmitterMessage tests the ability of message storage to
// add/remove/get messages and also checks that the order in which the
// messages had been added corresponds to the order of messages from Get
// function.
func TestRetransmitterMessageOrder(t *testing.T) {
db, clean, err := makeTestDB()
if err != nil {
t.Fatal(err)
}
defer clean()
s := NewMessageStore([]byte("id"), db)

var (
hash1, _ = chainhash.NewHash(bytes.Repeat([]byte("a"), 32))
hash2, _ = chainhash.NewHash(bytes.Repeat([]byte("b"), 32))

chanPoint1 = wire.NewOutPoint(hash1, 0)
chanPoint2 = wire.NewOutPoint(hash2, 1)

preimage1 = [sha256.Size]byte{0}
preimage2 = [sha256.Size]byte{1}
)

// Check that we are receiving the error without messages inside
// the storage.
_, messages, err := s.Get()
if err != nil && err != ErrPeerMessagesNotFound {
t.Fatalf("can't get the message: %v", err)
} else if len(messages) != 0 {
t.Fatal("wrong length of messages")
}

msg1 := &lnwire.UpdateFufillHTLC{
ChannelPoint: *chanPoint1,
ID: 0,
PaymentPreimage: preimage1,
}

index1, err := s.Add(msg1)
if err != nil {
t.Fatalf("can't add the message to the message store: %v", err)
}

msg2 := &lnwire.UpdateFufillHTLC{
ChannelPoint: *chanPoint2,
ID: 1,
PaymentPreimage: preimage2,
}

index2, err := s.Add(msg2)
if err != nil {
t.Fatalf("can't add the message to the message store: %v", err)
}

_, messages, err = s.Get()
if err != nil {
t.Fatalf("can't get the message: %v", err)
} else if len(messages) != 2 {
t.Fatal("wrong length of messages")
}

m, ok := messages[0].(*lnwire.UpdateFufillHTLC)
if !ok {
t.Fatal("wrong type of message")
}

// Check the order
if !reflect.DeepEqual(m, msg1) {
t.Fatal("wrong order of message")
}

m, ok = messages[1].(*lnwire.UpdateFufillHTLC)
if !ok {
t.Fatal("wrong type of message")
}

// Check the order
if !reflect.DeepEqual(m, msg2) {
t.Fatal("wrong order of message")
}

// Remove the messages by index and check that get function return
// non of the messages.
if err := s.Remove([]uint64{index1, index2}); err != nil {
t.Fatalf("can't remove the message: %v", err)
}

_, messages, err = s.Get()
if err != nil && err != ErrPeerMessagesNotFound {
t.Fatalf("can't get the message: %v", err)
} else if len(messages) != 0 {
t.Fatal("wrong length of messages")
}
}
3 changes: 2 additions & 1 deletion fundingmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,8 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
delay := msg.CsvDelay

// TODO(roasbeef): error if funding flow already ongoing
fndgLog.Infof("Recv'd fundingRequest(amt=%v, delay=%v, pendingId=%v) "+
fndgLog.Infof("Recv'd fundingRequest(amt=%v, "+
"pushSatoshis=%v, delay=%v, pendingId=%v) "+
"from peer(%x)", amt, msg.PushSatoshis, delay, msg.ChannelID,
fmsg.peerAddress.IdentityKey.SerializeCompressed())

Expand Down
15 changes: 9 additions & 6 deletions gotest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ print () {
# check_test_ports checks that test lnd ports are not used.
check_test_ports() {
# Make sure that test lnd ports are not used.
o=$(lsof -i :19555,19556 | sed '1d' | awk '{ printf "%s\n", $2 }')
o=$(lsof -i :19555,19556,19557 | sed '1d' | awk '{ printf "%s\n", $2 }')
if [ "$o" != "" ]; then
printf "Can't run the lnd tests:\n"
printf "some program is using the test lnd ports (19555 | 19556)\n"
printf "some program is using the test lnd ports (19555 | 19556 | 19557)\n"
exit 1
fi
}
Expand Down Expand Up @@ -85,15 +85,18 @@ lint_check() {
print "* Run static checks"

# Make sure gometalinter is installed and $GOPATH/bin is in your path.
if [ ! -x "$(type -p gometalinter)" ]; then
if [ ! -x "$(type -p gometalinter.v1)" ]; then
print "** Install gometalinter"
go get -v github.com/alecthomas/gometalinter
gometalinter --install
go get -u gopkg.in/alecthomas/gometalinter.v1
gometalinter.v1 --install
fi

# Update metalinter if needed.
gometalinter.v1 --install 1>/dev/null

# Automatic checks
linter_targets=$(glide novendor | grep -v lnrpc)
test -z "$(gometalinter --disable-all \
test -z "$(gometalinter.v1 --disable-all \
--enable=gofmt \
--enable=vet \
--enable=golint \
Expand Down
8 changes: 8 additions & 0 deletions lnd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1467,6 +1467,14 @@ func testRevokedCloseRetribution(net *networkHarness, t *harnessTest) {
return bobChannelInfo.Channels[0], nil
}

// Wait for Alice to receive the channel edge from the funding manager.
ctxt, _ = context.WithTimeout(ctxb, timeout)
err := net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint)
if err != nil {
t.Fatalf("alice didn't see the alice->bob channel before "+
"timeout: %v", err)
}

// Open up a payment stream to Alice that we'll use to send payment to
// Bob. We also create a small helper function to send payments to Bob,
// consuming the payment hashes we generated above.
Expand Down
4 changes: 2 additions & 2 deletions lnwire/channel_announcement.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func (a *ChannelAnnouncement) Encode(w io.Writer, pver uint32) error {
// wire.
//
// This is part of the lnwire.Message interface.
func (a *ChannelAnnouncement) Command() uint32 {
return CmdChannelAnnoucmentMessage
func (a *ChannelAnnouncement) Command() MessageCode {
return CmdChannelAnnouncement
}

// MaxPayloadLength returns the maximum allowed payload size for this message
Expand Down
4 changes: 1 addition & 3 deletions lnwire/channel_announcement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ func TestChannelAnnoucementValidation(t *testing.T) {
firstBitcoinPrivKey, firstBitcoinPubKey := getKeys("bitcoin-key-1")
secondBitcoinPrivKey, secondBitcoinPubKey := getKeys("bitcoin-key-2")

var hash []byte

hash = chainhash.DoubleHashB(firstNodePubKey.SerializeCompressed())
hash := chainhash.DoubleHashB(firstNodePubKey.SerializeCompressed())
firstBitcoinSig, _ := firstBitcoinPrivKey.Sign(hash)

hash = chainhash.DoubleHashB(secondNodePubKey.SerializeCompressed())
Expand Down
4 changes: 2 additions & 2 deletions lnwire/channel_update_announcement.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ func (a *ChannelUpdateAnnouncement) Encode(w io.Writer, pver uint32) error {
// wire.
//
// This is part of the lnwire.Message interface.
func (a *ChannelUpdateAnnouncement) Command() uint32 {
return CmdChannelUpdateAnnoucmentMessage
func (a *ChannelUpdateAnnouncement) Command() MessageCode {
return CmdChannelUpdateAnnouncement
}

// MaxPayloadLength returns the maximum allowed payload size for this message
Expand Down
2 changes: 1 addition & 1 deletion lnwire/close_complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *CloseComplete) Encode(w io.Writer, pver uint32) error {
// wire.
//
// This is part of the lnwire.Message interface.
func (c *CloseComplete) Command() uint32 {
func (c *CloseComplete) Command() MessageCode {
return CmdCloseComplete
}

Expand Down
Loading