diff --git a/peer.go b/peer.go
index f8cde6402d..88c5e2bb86 100644
--- a/peer.go
+++ b/peer.go
@@ -46,6 +46,7 @@ const (
 // buffered channel acts as a semaphore to be used for synchronization purposes.
 type outgoinMsg struct {
 	msg      lnwire.Message
+	persist  bool
 	sentChan chan struct{} // MUST be buffered.
 }
 
@@ -161,6 +162,14 @@ type peer struct {
 	// on both sides.
 	globalSharedFeatures *lnwire.SharedFeatures
 
+	// retransmitter is a retransmission subsystem, aka a message store,
+	// which persists outgoing messages that have not yet been acked. The
+	// messages are queued on disk, so if the server is unable to send a
+	// message to a peer because it is offline, this subsystem will take
+	// care of retransmitting the unacked messages to the remote peer
+	// upon reconnection.
+	retransmitter *retransmitter
+
 	queueQuit chan struct{}
 	quit      chan struct{}
 	wg        sync.WaitGroup
 }
@@ -169,7 +178,7 @@
 // newPeer creates a new peer from an establish connection object, and a
 // pointer to the main server.
 func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
-	addr *lnwire.NetAddress, inbound bool) (*peer, error) {
+	addr *lnwire.NetAddress, inbound bool, db *channeldb.DB) (*peer, error) {
 
 	nodePub := addr.IdentityKey
 
@@ -205,6 +214,15 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
 		quit: make(chan struct{}),
 	}
 
+	storeID := p.addr.IdentityKey.SerializeCompressed()
+	rt, err := newRetransmitter(channeldb.NewMessageStore(storeID, db))
+	if err != nil {
+		peerLog.Errorf("unable to initialize retransmitter "+
+			"for peerID(%v): %v", p.id, err)
+		return nil, err
+	}
+	p.retransmitter = rt
+
 	// Initiate the pending channel identifier properly depending on if this
 	// node is inbound or outbound. This value will be used in an increasing
 	// manner to track pending channels.
@@ -285,7 +303,7 @@ func (p *peer) Start() error {
 		return nil
 	}
 
-	peerLog.Tracef("peer %v starting", p)
+	peerLog.Tracef("peer(%v) starting", p)
 
 	p.wg.Add(2)
 	go p.queueHandler()
@@ -313,6 +331,21 @@ func (p *peer) Start() error {
 			"must be init message")
 	}
 
+	// If we have interacted with this peer before, then retrieve the
+	// messages that were not acked in the previous session and send them
+	// again to ensure the remote peer has handled them.
+	messages := p.retransmitter.MessagesToRetransmit()
+	if len(messages) != 0 {
+		peerLog.Infof("retransmission subsystem resending %v messages "+
+			"to peer(%v)", len(messages), p)
+
+		for _, message := range messages {
+			// Sending via sendToPeer would block because of the
+			// peer mutex, so queue the message directly.
+			p.queueMsg(message, false, nil)
+		}
+	}
+
 	p.wg.Add(3)
 	go p.readHandler()
 	go p.channelManager()
@@ -350,7 +383,7 @@
 		return
 	}
 
-	peerLog.Tracef("Disconnecting %s", p)
+	peerLog.Tracef("Disconnecting peer(%v)", p)
 
 	// Ensure that the TCP connection is properly closed before continuing.
 	p.conn.Close()
@@ -376,7 +409,9 @@
 
 // String returns the string representation of this peer.
 func (p *peer) String() string {
-	return p.conn.RemoteAddr().String()
+	return fmt.Sprintf("%x@%v",
+		p.addr.IdentityKey.SerializeCompressed(),
+		p.addr.Address)
 }
 
 // readNextMessage reads, and returns the next message on the wire along with
@@ -405,8 +441,8 @@ out:
 	for atomic.LoadInt32(&p.disconnect) == 0 {
 		nextMsg, _, err := p.readNextMessage()
 		if err != nil {
-			peerLog.Infof("unable to read message from %v: %v",
-				p, err)
+			peerLog.Errorf("unable to read message from "+
+				"peer(%v): %v", p, err)
 
 			switch err.(type) {
 			// If this is just a message we don't yet recognize,
@@ -424,6 +460,12 @@ out:
 			}
 		}
 
+		if err := p.retransmitter.Ack(nextMsg); err != nil {
+			peerLog.Errorf("unable to ack messages for "+
+				"peer(%v): %v", p, err)
+			break out
+		}
+
 		var (
 			isChanUpdate bool
 			targetChan   wire.OutPoint
@@ -440,7 +482,7 @@ out:
 			atomic.StoreInt64(&p.pingTime, delay)
 
 		case *lnwire.Ping:
-			p.queueMsg(lnwire.NewPong(msg.Nonce), nil)
+			p.queueMsg(lnwire.NewPong(msg.Nonce), true, nil)
 
 		case *lnwire.SingleFundingRequest:
 			p.server.fundingMgr.processFundingRequest(msg, p.addr)
@@ -492,7 +534,7 @@ out:
 		p.htlcManMtx.Unlock()
 		if !ok {
 			peerLog.Errorf("recv'd update for unknown "+
-				"channel %v from %v", targetChan, p)
+				"channel %v from peer(%v)", targetChan, p)
 			continue
 		}
 		channel <- nextMsg
@@ -502,7 +544,7 @@ out:
 	p.Disconnect()
 
 	p.wg.Done()
-	peerLog.Tracef("readHandler for peer %v done", p)
+	peerLog.Tracef("readHandler for peer(%v) done", p)
 }
 
 // logWireMessage logs the receipt or sending of particular wire message. This
@@ -586,6 +628,17 @@ func (p *peer) writeHandler() {
 				atomic.StoreInt64(&p.pingLastSend, now)
 			}
 
+			if outMsg.persist {
+				err := p.retransmitter.Register(outMsg.msg)
+				if err != nil {
+					peerLog.Errorf("unable to register "+
+						"message in retransmitter for "+
+						"peer(%v): %v", p, err)
+					p.Disconnect()
+					return
+				}
+			}
+
 			// Write out the message to the socket, closing the
 			// 'sentChan' if it's non-nil, The 'sentChan' allows
 			// callers to optionally synchronize sends with the
@@ -679,7 +731,7 @@ out:
 			// Convert the bytes read into a uint64, and queue the
 			// message for sending.
 			nonce := binary.BigEndian.Uint64(pingBuf[:])
-			p.queueMsg(lnwire.NewPing(nonce), nil)
+			p.queueMsg(lnwire.NewPing(nonce), true, nil)
 		case <-p.quit:
 			break out
 		}
 	}
@@ -695,9 +747,14 @@ func (p *peer) PingTime() int64 {
 
 // queueMsg queues a new lnwire.Message to be eventually sent out on the
 // wire.
-func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) {
+func (p *peer) queueMsg(msg lnwire.Message, persist bool,
+	doneChan chan struct{}) {
 	select {
-	case p.outgoingQueue <- outgoinMsg{msg, doneChan}:
+	case p.outgoingQueue <- outgoinMsg{
+		msg:      msg,
+		sentChan: doneChan,
+		persist:  persist,
+	}:
 	case <-p.quit:
 		return
 	}
@@ -802,7 +859,7 @@ func (p *peer) executeCooperativeClose(channel *lnwallet.LightningChannel) (*cha
 		return nil, err
 	}
 	closeReq := lnwire.NewCloseRequest(*chanPoint, closeSig)
-	p.queueMsg(closeReq, nil)
+	p.queueMsg(closeReq, true, nil)
 
 	return txid, nil
 }
@@ -1108,7 +1165,7 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
 			peerLog.Errorf("unable to expand revocation window: %v", err)
 			continue
 		}
-		p.queueMsg(rev, nil)
+		p.queueMsg(rev, true, nil)
 	}
 
 	state := &commitmentState{
@@ -1244,7 +1301,7 @@
 		p.server.localFeatures,
 	)
 
-	p.queueMsg(msg, nil)
+	p.queueMsg(msg, true, nil)
 
 	return nil
 }
@@ -1278,7 +1335,7 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
 			return
 		}
 
-		p.queueMsg(htlc, nil)
+		p.queueMsg(htlc, true, nil)
 
 		state.pendingBatch = append(state.pendingBatch, &pendingPayment{
 			htlc: htlc,
@@ -1308,7 +1365,7 @@
 
 		// Then we send the HTLC settle message to the connected peer
 		// so we can continue the propagation of the settle message.
-		p.queueMsg(htlc, nil)
+		p.queueMsg(htlc, true, nil)
 		isSettle = true
 
 	case *lnwire.UpdateFailHTLC:
@@ -1329,7 +1386,7 @@
 
 		// Finally, we send the HTLC message to the peer which
 		// initially created the HTLC.
-		p.queueMsg(htlc, nil)
+		p.queueMsg(htlc, true, nil)
 		isSettle = true
 	}
 
@@ -1478,7 +1535,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
 			peerLog.Errorf("unable to revoke commitment: %v", err)
 			return
 		}
-		p.queueMsg(nextRevocation, nil)
+		p.queueMsg(nextRevocation, true, nil)
 
 		// If we just initiated a state transition, and we were waiting
 		// for a reply from the remote peer, then we don't need to
@@ -1574,7 +1631,7 @@
 				ID:              logIndex,
 				PaymentPreimage: preimage,
 			}
-			p.queueMsg(settleMsg, nil)
+			p.queueMsg(settleMsg, true, nil)
 			delete(state.htlcsToSettle, htlc.Index)
 
 			settledPayments[htlc.RHash] = struct{}{}
@@ -1604,7 +1661,7 @@
 				ID:     logIndex,
 				Reason: []byte{byte(reason)},
 			}
-			p.queueMsg(cancelMsg, nil)
+			p.queueMsg(cancelMsg, true, nil)
 			delete(state.htlcsToCancel, htlc.Index)
 
 			cancelledHtlcs[htlc.Index] = struct{}{}
@@ -1698,7 +1755,7 @@ func (p *peer) updateCommitTx(state *commitmentState, reply bool) error {
 		ChannelPoint: *state.chanPoint,
 		CommitSig:    parsedSig,
 	}
-	p.queueMsg(commitSig, nil)
+	p.queueMsg(commitSig, true, nil)
 
 	// As we've just cleared out a batch, move all pending updates to the
 	// map of cleared HTLCs, clearing out the set of pending updates.
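Note for reviewers: the retransmitter type used throughout peer.go above is introduced elsewhere in this change set, so it does not appear in this diff. The sketch below shows one plausible shape for it, inferred purely from its four call sites (newRetransmitter, Register, Ack, MessagesToRetransmit); the msgStore interface, the mutex, and the elided ack-matching rule are illustrative assumptions, not the actual implementation.

// Illustrative sketch only: a minimal retransmitter consistent with the
// call sites in peer.go above. msgStore is an assumed stand-in for the
// store that channeldb.NewMessageStore returns; the real API may differ.
package sketch

import (
	"sync"

	"github.com/lightningnetwork/lnd/lnwire"
)

// msgStore abstracts the on-disk, per-peer message store (assumption).
type msgStore interface {
	Add(msg lnwire.Message) error        // persist an unacked message
	Remove(msg lnwire.Message) error     // delete a message once acked
	Messages() ([]lnwire.Message, error) // load the unacked backlog
}

// retransmitter keeps outgoing messages until the remote peer acks them,
// mirroring how peer.go uses it above.
type retransmitter struct {
	mtx     sync.Mutex
	store   msgStore
	pending []lnwire.Message
}

// newRetransmitter loads any backlog left over from a prior session.
func newRetransmitter(store msgStore) (*retransmitter, error) {
	pending, err := store.Messages()
	if err != nil {
		return nil, err
	}
	return &retransmitter{store: store, pending: pending}, nil
}

// Register persists msg before it hits the wire; writeHandler calls this
// for every outgoinMsg queued with persist set.
func (rt *retransmitter) Register(msg lnwire.Message) error {
	rt.mtx.Lock()
	defer rt.mtx.Unlock()

	if err := rt.store.Add(msg); err != nil {
		return err
	}
	rt.pending = append(rt.pending, msg)
	return nil
}

// Ack prunes stored messages that the incoming message acknowledges;
// readHandler calls this for every received message. Which pending
// messages a given incoming message acks is protocol-specific and
// deliberately elided here.
func (rt *retransmitter) Ack(received lnwire.Message) error {
	rt.mtx.Lock()
	defer rt.mtx.Unlock()

	// ... match `received` against rt.pending, call rt.store.Remove for
	// each acked message, and drop it from the in-memory slice ...
	return nil
}

// MessagesToRetransmit returns the unacked backlog that Start resends
// after a reconnection.
func (rt *retransmitter) MessagesToRetransmit() []lnwire.Message {
	rt.mtx.Lock()
	defer rt.mtx.Unlock()

	return append([]lnwire.Message(nil), rt.pending...)
}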
diff --git a/server.go b/server.go
index 8990d04637..7e459006ff 100644
--- a/server.go
+++ b/server.go
@@ -264,6 +264,9 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
 		go s.connMgr.Connect(connReq)
 	}
 
+	srvrLog.Infof("Identity key: %x",
+		s.identityPriv.PubKey().SerializeCompressed())
+
 	return s, nil
 }
 
@@ -445,7 +448,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound
 
 	// Now that we've established a connection, create a peer, and
 	// it to the set of currently active peers.
-	p, err := newPeer(conn, connReq, s, peerAddr, inbound)
+	p, err := newPeer(conn, connReq, s, peerAddr, inbound, s.chanDB)
 	if err != nil {
 		srvrLog.Errorf("unable to create peer %v", err)
-		if p.connReq != nil {
+		if connReq != nil {
@@ -510,17 +513,17 @@
 	s.peersMtx.Lock()
 	defer s.peersMtx.Unlock()
 
-	srvrLog.Tracef("Established connection to: %v", conn.RemoteAddr())
-
 	nodePub := conn.(*brontide.Conn).RemotePub()
+	srvrLog.Tracef("Established connection to: %x@%v",
+		nodePub.SerializeCompressed(), conn.RemoteAddr())
 
 	// If we already have an inbound connection from this peer, simply drop
 	// the connection.
 	pubStr := string(nodePub.SerializeCompressed())
 	if _, ok := s.peersByPub[pubStr]; ok {
-		srvrLog.Errorf("Established outbound connection to peer %x, but "+
-			"already connected, dropping conn",
-			nodePub.SerializeCompressed())
+		srvrLog.Errorf("Established outbound connection to peer"+
+			"(%x@%v), but already connected, dropping conn",
+			nodePub.SerializeCompressed(), conn.RemoteAddr())
 		s.connMgr.Remove(connReq.ID())
 		conn.Close()
 		return
@@ -664,7 +667,7 @@ out:
 
 			go func(p *peer) {
 				for _, msg := range bMsg.msgs {
-					p.queueMsg(msg, nil)
+					p.queueMsg(msg, true, nil)
 				}
 			}(sPeer)
 		}
@@ -699,7 +702,7 @@ out:
 				sMsg.errChan <- nil
 
 				for _, msg := range sMsg.msgs {
-					targetPeer.queueMsg(msg, nil)
+					targetPeer.queueMsg(msg, true, nil)
 				}
 			}()
 		case query := <-s.queries:
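Taken together, the two files thread the new persist flag through every send path: writeHandler registers a message with the retransmitter before writing it to the socket, readHandler gives every inbound message a chance to ack stored ones, and Start drains the backlog after a reconnection. The condensed walkthrough below is hypothetical glue for illustration only; it uses nothing beyond the calls visible in the diffs above.

// retransmitLifecycle is a hypothetical condensation of the flow wired
// into peer.go by this change; it exists only to illustrate ordering.
func retransmitLifecycle(p *peer, outgoing, incoming lnwire.Message) error {
	// 1. Normal sends queue with persist=true; writeHandler then calls
	//    Register before the socket write, so a message that was sent
	//    but never acked survives a disconnect.
	p.queueMsg(outgoing, true, nil)

	// 2. On reconnect, Start resends the unacked backlog, queued with
	//    persist=false so nothing gets registered a second time.
	for _, msg := range p.retransmitter.MessagesToRetransmit() {
		p.queueMsg(msg, false, nil)
	}

	// 3. readHandler hands every inbound message to Ack, which prunes
	//    whatever that message acknowledges from the on-disk store.
	return p.retransmitter.Ack(incoming)
}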