lnd: add retransmission subsystem
Issue: #137

This commit adds the retransmission subsystem to lnd. Upon peer
reconnection, we now fetch all messages from the message store that
were not acked and send them again to the remote side.
andrewshvv committed Mar 16, 2017
1 parent 9243a99 commit f8b2624
Showing 2 changed files with 90 additions and 30 deletions.
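The retransmitter type itself is introduced in a separate file; this commit only wires it into the peer and server. Inferred from the call sites in the diff below, its surface looks roughly like the following sketch. This is an editorial approximation of the contract, not the actual implementation (which is backed by channeldb.MessageStore); the package and interface names are assumptions, while the three method names appear verbatim in the diff:

// Package retransmit: a hypothetical, compilable sketch of the
// interface that peer.go relies on in this commit.
package retransmit

import "github.com/lightningnetwork/lnd/lnwire"

// Retransmitter captures the contract used by the peer code below.
type Retransmitter interface {
	// Register persists an outgoing message so that it can be resent
	// if the peer disconnects before acking it.
	Register(msg lnwire.Message) error

	// Ack marks previously registered messages as delivered, driven
	// by the message just received from the remote peer.
	Ack(msg lnwire.Message) error

	// MessagesToRetransmit returns all registered messages that have
	// not yet been acked, to be resent upon reconnection.
	MessagesToRetransmit() []lnwire.Message
}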
101 changes: 79 additions & 22 deletions peer.go
@@ -46,6 +46,7 @@ const (
// buffered channel acts as a semaphore to be used for synchronization purposes.
type outgoinMsg struct {
msg lnwire.Message
persist bool
sentChan chan struct{} // MUST be buffered.
}

@@ -161,6 +162,14 @@ type peer struct {
// on both sides.
globalSharedFeatures *lnwire.SharedFeatures

// retransmitter is the retransmission subsystem, backed by an on-disk
// message store. Outgoing messages that have not been acked are queued
// on disk, so that if the server is unable to deliver a message to the
// peer because it is offline, this service will take care of
// retransmitting the unacked messages to the remote upon reconnection.
retransmitter *retransmitter

queueQuit chan struct{}
quit chan struct{}
wg sync.WaitGroup
@@ -169,7 +178,7 @@ type peer struct {
// newPeer creates a new peer from an established connection object, and a
// pointer to the main server.
func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
addr *lnwire.NetAddress, inbound bool) (*peer, error) {
addr *lnwire.NetAddress, inbound bool, db *channeldb.DB) (*peer, error) {

nodePub := addr.IdentityKey

@@ -205,6 +214,15 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
quit: make(chan struct{}),
}

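// The on-disk message store is scoped per peer, keyed by the remote
// node's serialized identity public key.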
storeID := p.addr.IdentityKey.SerializeCompressed()
rt, err := newRetransmitter(channeldb.NewMessageStore(storeID, db))
if err != nil {
peerLog.Errorf("unable to initialise retransmitter "+
"for peerID(%v): %v", p.id, err)
return nil, err
}
p.retransmitter = rt

// Initialize the pending channel identifier properly depending on
// whether this node is inbound or outbound. This value will be used in
// an increasing manner to track pending channels.
@@ -285,7 +303,7 @@ func (p *peer) Start() error {
return nil
}

peerLog.Tracef("peer %v starting", p)
peerLog.Tracef("peer(%v) starting", p)

p.wg.Add(2)
go p.queueHandler()
@@ -313,6 +331,21 @@ func (p *peer) Start() error {
"must be init message")
}

// If we have interacted with this peer before, then we should
// retrieve the messages that were not acked in the previous session
// and send them again, in order to be sure that the remote peer has
// handled them.
messages := p.retransmitter.MessagesToRetransmit()
if len(messages) != 0 {
peerLog.Infof("retransmission subsystem resends %v messages "+
"to the peer(%v)", len(messages), p)

for _, message := range messages {
// Sending via sendToPeer would block because of the
// peer mutex usage, so queue the message directly.
p.queueMsg(message, false, nil)
}
}

p.wg.Add(3)
go p.readHandler()
go p.channelManager()
@@ -350,7 +383,7 @@ func (p *peer) Disconnect() {
return
}

peerLog.Tracef("Disconnecting %s", p)
peerLog.Tracef("Disconnecting peer(%v)", p)

// Ensure that the TCP connection is properly closed before continuing.
p.conn.Close()
@@ -376,7 +409,10 @@ func (p *peer) Disconnect() {

// String returns the string representation of this peer.
func (p *peer) String() string {
return p.conn.RemoteAddr().String()
return fmt.Sprintf("%x@%v",
p.addr.IdentityKey.SerializeCompressed(),
p.addr.Address)
}

// readNextMessage reads, and returns the next message on the wire along with
@@ -405,8 +441,8 @@ out:
for atomic.LoadInt32(&p.disconnect) == 0 {
nextMsg, _, err := p.readNextMessage()
if err != nil {
peerLog.Infof("unable to read message from %v: %v",
p, err)
peerLog.Errorf("unable to read message from "+
"peer(%v): %v", p, err)

switch err.(type) {
// If this is just a message we don't yet recognize,
@@ -424,6 +460,12 @@ out:
}
}

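// Let the retransmission subsystem know about the received
// message, so that any outgoing messages it acks can be removed
// from the message store.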
if err := p.retransmitter.Ack(nextMsg); err != nil {
peerLog.Errorf("unable to ack messages for peer(%v):"+
" %v", p, err)
break out
}

var (
isChanUpdate bool
targetChan wire.OutPoint
@@ -440,7 +482,7 @@ out:
atomic.StoreInt64(&p.pingTime, delay)

case *lnwire.Ping:
p.queueMsg(lnwire.NewPong(msg.Nonce), nil)
p.queueMsg(lnwire.NewPong(msg.Nonce), true, nil)

case *lnwire.SingleFundingRequest:
p.server.fundingMgr.processFundingRequest(msg, p.addr)
@@ -492,7 +534,7 @@ out:
p.htlcManMtx.Unlock()
if !ok {
peerLog.Errorf("recv'd update for unknown "+
"channel %v from %v", targetChan, p)
"channel %v from peer(%v)", targetChan, p)
continue
}
channel <- nextMsg
@@ -502,7 +544,7 @@ out:
p.Disconnect()

p.wg.Done()
peerLog.Tracef("readHandler for peer %v done", p)
peerLog.Tracef("readHandler for peer(%v) done", p)
}

// logWireMessage logs the receipt or sending of a particular wire message. This
@@ -586,6 +628,16 @@ func (p *peer) writeHandler() {
atomic.StoreInt64(&p.pingLastSend, now)
}

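// If requested, register the message with the
// retransmission subsystem before it hits the wire, so
// that it can be resent if the peer disconnects before
// acking it.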
if outMsg.persist {
err := p.retransmitter.Register(outMsg.msg)
if err != nil {
peerLog.Errorf("unable to register "+
"message in retransmitter for "+
"peer(%v): %v", p, err)
p.Disconnect()
return
}
}
// Write out the message to the socket, closing the
// 'sentChan' if it's non-nil. The 'sentChan' allows
// callers to optionally synchronize sends with the
@@ -679,7 +731,7 @@ out:
// Convert the bytes read into a uint64, and queue the
// message for sending.
nonce := binary.BigEndian.Uint64(pingBuf[:])
p.queueMsg(lnwire.NewPing(nonce), nil)
p.queueMsg(lnwire.NewPing(nonce), true, nil)
case <-p.quit:
break out
}
@@ -695,9 +747,14 @@ func (p *peer) PingTime() int64 {

// queueMsg queues a new lnwire.Message to be eventually sent out on the
// wire.
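// If persist is true, the message will also be registered with the
// retransmission subsystem by the writeHandler, so that it can be
// retransmitted upon reconnection if it has not been acked.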
func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) {
func (p *peer) queueMsg(msg lnwire.Message, persist bool,
doneChan chan struct{}) {
select {
case p.outgoingQueue <- outgoinMsg{msg, doneChan}:
case p.outgoingQueue <- outgoinMsg{
msg: msg,
sentChan: doneChan,
persist: persist,
}:
case <-p.quit:
return
}
@@ -802,7 +859,7 @@ func (p *peer) executeCooperativeClose(channel *lnwallet.LightningChannel) (*cha
return nil, err
}
closeReq := lnwire.NewCloseRequest(*chanPoint, closeSig)
p.queueMsg(closeReq, nil)
p.queueMsg(closeReq, true, nil)

return txid, nil
}
@@ -1108,7 +1165,7 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
peerLog.Errorf("unable to expand revocation window: %v", err)
continue
}
p.queueMsg(rev, nil)
p.queueMsg(rev, true, nil)
}

state := &commitmentState{
Expand Down Expand Up @@ -1244,7 +1301,7 @@ func (p *peer) sendInitMsg() error {
p.server.localFeatures,
)

p.queueMsg(msg, nil)
p.queueMsg(msg, true, nil)
return nil
}

@@ -1278,7 +1335,7 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
return
}

p.queueMsg(htlc, nil)
p.queueMsg(htlc, true, nil)

state.pendingBatch = append(state.pendingBatch, &pendingPayment{
htlc: htlc,
@@ -1308,7 +1365,7 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {

// Then we send the HTLC settle message to the connected peer
// so we can continue the propagation of the settle message.
p.queueMsg(htlc, nil)
p.queueMsg(htlc, true, nil)
isSettle = true

case *lnwire.UpdateFailHTLC:
@@ -1329,7 +1386,7 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {

// Finally, we send the HTLC message to the peer which
// initially created the HTLC.
p.queueMsg(htlc, nil)
p.queueMsg(htlc, true, nil)
isSettle = true
}

@@ -1478,7 +1535,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
peerLog.Errorf("unable to revoke commitment: %v", err)
return
}
p.queueMsg(nextRevocation, nil)
p.queueMsg(nextRevocation, true, nil)

// If we just initiated a state transition, and we were waiting
// for a reply from the remote peer, then we don't need to
@@ -1574,7 +1631,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
ID: logIndex,
PaymentPreimage: preimage,
}
p.queueMsg(settleMsg, nil)
p.queueMsg(settleMsg, true, nil)

delete(state.htlcsToSettle, htlc.Index)
settledPayments[htlc.RHash] = struct{}{}
@@ -1604,7 +1661,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
ID: logIndex,
Reason: []byte{byte(reason)},
}
p.queueMsg(cancelMsg, nil)
p.queueMsg(cancelMsg, true, nil)
delete(state.htlcsToCancel, htlc.Index)

cancelledHtlcs[htlc.Index] = struct{}{}
@@ -1698,7 +1755,7 @@ func (p *peer) updateCommitTx(state *commitmentState, reply bool) error {
ChannelPoint: *state.chanPoint,
CommitSig: parsedSig,
}
p.queueMsg(commitSig, nil)
p.queueMsg(commitSig, true, nil)

// As we've just cleared out a batch, move all pending updates to the
// map of cleared HTLCs, clearing out the set of pending updates.
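For reference, a minimal in-memory model of the register/ack/retransmit flow that the peer.go changes above depend on might look as follows. This is a hypothetical sketch: the real subsystem persists messages via channeldb.MessageStore so they survive restarts, and it has its own rules for deciding which received messages ack which stored ones. The type and constructor names are invented, and the ack-everything predicate is a placeholder purely to show the flow:

package retransmit

import (
	"sync"

	"github.com/lightningnetwork/lnd/lnwire"
)

// memRetransmitter is an illustrative, in-memory stand-in for the
// on-disk retransmitter wired into the peer above.
type memRetransmitter struct {
	mu      sync.Mutex
	nextID  uint64
	unacked map[uint64]lnwire.Message
}

func newMemRetransmitter() *memRetransmitter {
	return &memRetransmitter{unacked: make(map[uint64]lnwire.Message)}
}

// Register stores an outgoing message until it is acked. The real
// subsystem writes to disk here, so the message survives restarts.
func (rt *memRetransmitter) Register(msg lnwire.Message) error {
	rt.mu.Lock()
	defer rt.mu.Unlock()
	rt.unacked[rt.nextID] = msg
	rt.nextID++
	return nil
}

// Ack removes stored messages answered by the received message. The
// predicate used here (acking everything) is a placeholder; the actual
// mapping from received to stored messages is subsystem-specific.
func (rt *memRetransmitter) Ack(received lnwire.Message) error {
	rt.mu.Lock()
	defer rt.mu.Unlock()
	for id := range rt.unacked {
		delete(rt.unacked, id)
	}
	return nil
}

// MessagesToRetransmit returns every message still awaiting an ack;
// Start() resends these via queueMsg after a reconnection.
func (rt *memRetransmitter) MessagesToRetransmit() []lnwire.Message {
	rt.mu.Lock()
	defer rt.mu.Unlock()
	msgs := make([]lnwire.Message, 0, len(rt.unacked))
	for _, msg := range rt.unacked {
		msgs = append(msgs, msg)
	}
	return msgs
}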
19 changes: 11 additions & 8 deletions server.go
@@ -264,6 +264,9 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
go s.connMgr.Connect(connReq)
}

srvrLog.Infof("Identity key: %x",
s.identityPriv.PubKey().SerializeCompressed())

return s, nil
}

@@ -445,7 +448,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound

// Now that we've established a connection, create a peer, and add
// it to the set of currently active peers.
p, err := newPeer(conn, connReq, s, peerAddr, inbound)
p, err := newPeer(conn, connReq, s, peerAddr, inbound, s.chanDB)
if err != nil {
srvrLog.Errorf("unable to create peer %v", err)
if p.connReq != nil {
@@ -510,17 +513,17 @@ func (s *server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
s.peersMtx.Lock()
defer s.peersMtx.Unlock()

srvrLog.Tracef("Established connection to: %v", conn.RemoteAddr())

nodePub := conn.(*brontide.Conn).RemotePub()
srvrLog.Tracef("Established connection to: %x@%v",
nodePub.SerializeCompressed(), conn.RemoteAddr())

// If we already have an inbound connection from this peer, simply drop
// the connection.
pubStr := string(nodePub.SerializeCompressed())
if _, ok := s.peersByPub[pubStr]; ok {
srvrLog.Errorf("Established outbound connection to peer %x, but "+
"already connected, dropping conn",
nodePub.SerializeCompressed())
srvrLog.Errorf("Established outbound connection to peer"+
"(%x@%v), but already connected, dropping conn",
nodePub.SerializeCompressed(), conn.RemoteAddr())
s.connMgr.Remove(connReq.ID())
conn.Close()
return
@@ -664,7 +667,7 @@ out:

go func(p *peer) {
for _, msg := range bMsg.msgs {
p.queueMsg(msg, nil)
p.queueMsg(msg, true, nil)
}
}(sPeer)
}
@@ -699,7 +702,7 @@ out:
sMsg.errChan <- nil

for _, msg := range sMsg.msgs {
targetPeer.queueMsg(msg, nil)
targetPeer.queueMsg(msg, true, nil)
}
}()
case query := <-s.queries:
