Skip to content

Commit

Permalink
channeldb+retranmission: add retranmission subsystem
Browse files Browse the repository at this point in the history
Issue: #137

In this commit retranmission subsystem and boltdb mesage storage were
added. Retransmission subsystem described in details in BOLT #2
(Message Retransmission) section. This subsystem keeps records
of all messages that were sent to other peer and waits the ACK
message to be received from other side and after that removes all
acked messaged from the storage.
  • Loading branch information
andrewshvv committed Mar 16, 2017
1 parent 69be2de commit 9243a99
Show file tree
Hide file tree
Showing 5 changed files with 686 additions and 0 deletions.
4 changes: 4 additions & 0 deletions channeldb/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,8 @@ var (

// ErrNodeAliasNotFound is returned when alias for node can't be found.
ErrNodeAliasNotFound = fmt.Errorf("alias for node not found")

// ErrPeerMessagesNotFound is returned when no message have been
// found in the peer bucket or if bucket haven't been created yet.
ErrPeerMessagesNotFound = fmt.Errorf("peer messages not found")
)
174 changes: 174 additions & 0 deletions channeldb/messagestore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package channeldb

import (
"bytes"
"encoding/binary"

"github.com/boltdb/bolt"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/wire"
)

var (
// basePeerBucketKey is the base key for generating peer bucket keys.
// Peer bucket stores top-level bucket that maps: index -> code || msg
// concatenating the message code to the stored data allows the db logic
// to properly parse the wire message without trial and error or needing
// an additional index.
basePeerBucketKey = []byte("peermessagesstore")
)

// MessageStore represents the storage for lnwire messages, it might be boltd storage,
// local storage, test storage or hybrid one.
//
// NOTE: The original purpose of creating this interface was in separation
// between the retransmission logic and specifics of storage itself. In case of
// such interface we may create the test storage and register the add,
// remove, get actions without the need of population of lnwire messages with
// data.
type MessageStore interface {
// Get returns the sorted set of messages in the order they have been
// added originally and also the array of associated index to this
// messages within the message store.
Get() ([]uint64, []lnwire.Message, error)

// Add adds new message in the storage with preserving the order and
// returns the index of message within the message store.
Add(msg lnwire.Message) (uint64, error)

// Remove deletes message with this indexes.
Remove(indexes []uint64) error
}

// messagesStore represents the boltdb storage for messages inside
// retransmission sub-system.
type messagesStore struct {
// id is a unique slice of bytes identifying a peer. This value is
// typically a peer's identity public key serialized in compressed
// format.
id []byte
db *DB
}

// NewMessageStore creates new instance of message storage.
func NewMessageStore(id []byte, db *DB) MessageStore {
return &messagesStore{
id: id,
db: db,
}
}

// Add adds message to the storage and returns the index which
// corresponds the the message by which it might be removed later.
func (s *messagesStore) Add(msg lnwire.Message) (uint64, error) {
var index uint64

err := s.db.Batch(func(tx *bolt.Tx) error {
var err error

// Get or create the top peer bucket.
peerBucketKey := s.getPeerBucketKey()
peerBucket, err := tx.CreateBucketIfNotExists(peerBucketKey)
if err != nil {
return err
}

// Generate next index number to add it to the message code
// bucket.
index, err = peerBucket.NextSequence()
if err != nil {
return err
}
indexBytes := make([]byte, 8)
binary.BigEndian.PutUint64(indexBytes, index)

// Encode the message and place it in the top bucket.
var b bytes.Buffer
_, err = lnwire.WriteMessage(&b, msg, 0, wire.MainNet)
if err != nil {
return err
}

return peerBucket.Put(indexBytes, b.Bytes())
})

return index, err
}

// Remove removes the message from storage by index that were assigned to
// message during its addition to the storage.
func (s *messagesStore) Remove(indexes []uint64) error {
return s.db.Batch(func(tx *bolt.Tx) error {
// Get or create the top peer bucket.
peerBucketKey := s.getPeerBucketKey()
peerBucket := tx.Bucket(peerBucketKey)
if peerBucket == nil {
return ErrPeerMessagesNotFound
}

// Retrieve the messages indexes with this type/code and
// remove them from top peer bucket.
for _, index := range indexes {
var key [8]byte
binary.BigEndian.PutUint64(key[:], index)

if err := peerBucket.Delete(key[:]); err != nil {
return err
}
}

return nil
})
}

// Get retrieves messages from storage in the order in which they were
// originally added, proper order is needed for retransmission subsystem, as
// far this messages should be resent to remote peer in the same order as they
// were sent originally.
func (s *messagesStore) Get() ([]uint64, []lnwire.Message, error) {
var messages []lnwire.Message
var indexes []uint64

if err := s.db.View(func(tx *bolt.Tx) error {
peerBucketKey := s.getPeerBucketKey()
peerBucket := tx.Bucket(peerBucketKey)
if peerBucket == nil {
return nil
}

// Iterate over messages buckets.
return peerBucket.ForEach(func(k, v []byte) error {
// Skip buckets fields.
if v == nil {
return nil
}

// Decode the message from and add it to the array.
r := bytes.NewReader(v)
_, msg, _, err := lnwire.ReadMessage(r, 0, wire.MainNet)
if err != nil {
return err
}

messages = append(messages, msg)
indexes = append(indexes, binary.BigEndian.Uint64(k))
return nil
})
}); err != nil {
return nil, nil, err
}

// If bucket was haven't been created yet or just not contains any
// messages.
if len(messages) == 0 {
return nil, nil, ErrPeerMessagesNotFound
}

return indexes, messages, nil
}

// getPeerBucketKey generates the peer bucket boltd key by peer id and base
// peer bucket key.
func (s *messagesStore) getPeerBucketKey() []byte {
return append(basePeerBucketKey[:], s.id[:]...)
}
108 changes: 108 additions & 0 deletions channeldb/messagestore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package channeldb

import (
"bytes"
"crypto/sha256"
"testing"

"reflect"

"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/wire"
)

// TestRetransmitterMessage tests the ability of message storage to
// add/remove/get messages and also checks that the order in which the
// messages had been added corresponds to the order of messages from Get
// function.
func TestRetransmitterMessageOrder(t *testing.T) {
db, clean, err := makeTestDB()
if err != nil {
t.Fatal(err)
}
defer clean()
s := NewMessageStore([]byte("id"), db)

var (
hash1, _ = chainhash.NewHash(bytes.Repeat([]byte("a"), 32))
hash2, _ = chainhash.NewHash(bytes.Repeat([]byte("b"), 32))

chanPoint1 = wire.NewOutPoint(hash1, 0)
chanPoint2 = wire.NewOutPoint(hash2, 1)

preimage1 = [sha256.Size]byte{0}
preimage2 = [sha256.Size]byte{1}
)

// Check that we are receiving the error without messages inside
// the storage.
_, messages, err := s.Get()
if err != nil && err != ErrPeerMessagesNotFound {
t.Fatalf("can't get the message: %v", err)
} else if len(messages) != 0 {
t.Fatal("wrong length of messages")
}

msg1 := &lnwire.UpdateFufillHTLC{
ChannelPoint: *chanPoint1,
ID: 0,
PaymentPreimage: preimage1,
}

index1, err := s.Add(msg1)
if err != nil {
t.Fatalf("can't add the message to the message store: %v", err)
}

msg2 := &lnwire.UpdateFufillHTLC{
ChannelPoint: *chanPoint2,
ID: 1,
PaymentPreimage: preimage2,
}

index2, err := s.Add(msg2)
if err != nil {
t.Fatalf("can't add the message to the message store: %v", err)
}

_, messages, err = s.Get()
if err != nil {
t.Fatalf("can't get the message: %v", err)
} else if len(messages) != 2 {
t.Fatal("wrong length of messages")
}

m, ok := messages[0].(*lnwire.UpdateFufillHTLC)
if !ok {
t.Fatal("wrong type of message")
}

// Check the order
if !reflect.DeepEqual(m, msg1) {
t.Fatal("wrong order of message")
}

m, ok = messages[1].(*lnwire.UpdateFufillHTLC)
if !ok {
t.Fatal("wrong type of message")
}

// Check the order
if !reflect.DeepEqual(m, msg2) {
t.Fatal("wrong order of message")
}

// Remove the messages by index and check that get function return
// non of the messages.
if err := s.Remove([]uint64{index1, index2}); err != nil {
t.Fatalf("can't remove the message: %v", err)
}

_, messages, err = s.Get()
if err != nil && err != ErrPeerMessagesNotFound {
t.Fatalf("can't get the message: %v", err)
} else if len(messages) != 0 {
t.Fatal("wrong length of messages")
}
}
Loading

0 comments on commit 9243a99

Please sign in to comment.