Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

htlcswitch: sync local payment hand-off to link #4183

Merged
merged 10 commits into from
May 20, 2020
4 changes: 4 additions & 0 deletions htlcswitch/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ type ChannelLink interface {
// possible).
HandleSwitchPacket(*htlcPacket) error

// HandleLocalAddPacket handles a locally-initiated UpdateAddHTLC
// packet. It will be processed synchronously.
HandleLocalAddPacket(*htlcPacket) error

// HandleChannelUpdate handles the htlc requests as settle/add/fail
// which sent to us from remote peer we have a channel with.
//
Expand Down
251 changes: 161 additions & 90 deletions htlcswitch/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,14 @@ type ChannelLinkConfig struct {
HtlcNotifier htlcNotifier
}

// localUpdateAddMsg contains a locally initiated htlc and a channel that will
// receive the outcome of the link processing. This channel must be buffered to
// prevent the link from blocking.
type localUpdateAddMsg struct {
pkt *htlcPacket
err chan error
}

// channelLink is the service which drives a channel's commitment update
// state-machine. In the event that an HTLC needs to be propagated to another
// link, the forward handler from config is used which sends HTLC to the
Expand Down Expand Up @@ -346,6 +354,10 @@ type channelLink struct {
// by the HTLC switch.
downstream chan *htlcPacket

// localUpdateAdd is a channel to which locally initiated HTLCs are
// sent across.
localUpdateAdd chan *localUpdateAddMsg

// htlcUpdates is a channel that we'll use to update outside
// sub-systems with the latest set of active HTLC's on our channel.
htlcUpdates chan *contractcourt.ContractUpdate
Expand Down Expand Up @@ -395,11 +407,12 @@ func NewChannelLink(cfg ChannelLinkConfig,
channel: channel,
shortChanID: channel.ShortChanID(),
// TODO(roasbeef): just do reserve here?
htlcUpdates: make(chan *contractcourt.ContractUpdate),
hodlMap: make(map[channeldb.CircuitKey]hodlHtlc),
hodlQueue: queue.NewConcurrentQueue(10),
log: build.NewPrefixLog(logPrefix, log),
quit: make(chan struct{}),
htlcUpdates: make(chan *contractcourt.ContractUpdate),
hodlMap: make(map[channeldb.CircuitKey]hodlHtlc),
hodlQueue: queue.NewConcurrentQueue(10),
log: build.NewPrefixLog(logPrefix, log),
quit: make(chan struct{}),
localUpdateAdd: make(chan *localUpdateAddMsg),
}
}

Expand Down Expand Up @@ -1097,9 +1110,7 @@ func (l *channelLink) htlcManager() {
// including all the currently pending entries. If the
// send was unsuccessful, then abandon the update,
// waiting for the revocation window to open up.
if err := l.updateCommitTx(); err != nil {
l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
if !l.updateCommitTxOrFail() {
return
}

Expand All @@ -1114,6 +1125,10 @@ func (l *channelLink) htlcManager() {
case pkt := <-l.downstream:
l.handleDownstreamPkt(pkt)

// A message containing a locally initiated add was received.
case msg := <-l.localUpdateAdd:
msg.err <- l.handleDownstreamUpdateAdd(msg.pkt)

// A message from the connected peer was just received. This
// indicates that we have a new incoming HTLC, either directly
// for us, or part of a multi-hop HTLC circuit.
Expand Down Expand Up @@ -1256,79 +1271,98 @@ func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
return time.Duration(prand.Int63n(upper-lower) + lower)
}

// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
// cleared HTLCs with the upstream peer.
//
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
var isSettle bool
switch htlc := pkt.htlc.(type) {
case *lnwire.UpdateAddHTLC:
// If hodl.AddOutgoing mode is active, we exit early to simulate
// arbitrary delays between the switch adding an ADD to the
// mailbox, and the HTLC being added to the commitment state.
if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
l.log.Warnf(hodl.AddOutgoing.Warning())
l.mailBox.AckPacket(pkt.inKey())
return
}
// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
// downstream HTLC Switch.
func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error {
htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
if !ok {
return errors.New("not an UpdateAddHTLC packet")
}

// A new payment has been initiated via the downstream channel,
// so we add the new HTLC to our local log, then update the
// commitment chains.
htlc.ChanID = l.ChanID()
openCircuitRef := pkt.inKey()
index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
if err != nil {
// The HTLC was unable to be added to the state machine,
// as a result, we'll signal the switch to cancel the
// pending payment.
l.log.Warnf("Unable to handle downstream add HTLC: %v",
err)
// If hodl.AddOutgoing mode is active, we exit early to simulate
// arbitrary delays between the switch adding an ADD to the
// mailbox, and the HTLC being added to the commitment state.
if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
l.log.Warnf(hodl.AddOutgoing.Warning())
l.mailBox.AckPacket(pkt.inKey())
return nil
}

// Remove this packet from the link's mailbox, this
// prevents it from being reprocessed if the link
// restarts and resets it mailbox. If this response
// doesn't make it back to the originating link, it will
// be rejected upon attempting to reforward the Add to
// the switch, since the circuit was never fully opened,
// and the forwarding package shows it as
// unacknowledged.
l.mailBox.FailAdd(pkt)
// A new payment has been initiated via the downstream channel,
// so we add the new HTLC to our local log, then update the
// commitment chains.
htlc.ChanID = l.ChanID()
openCircuitRef := pkt.inKey()
index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
if err != nil {
// The HTLC was unable to be added to the state machine,
// as a result, we'll signal the switch to cancel the
// pending payment.
l.log.Warnf("Unable to handle downstream add HTLC: %v",
err)

// Remove this packet from the link's mailbox, this
// prevents it from being reprocessed if the link
// restarts and resets it mailbox. If this response
// doesn't make it back to the originating link, it will
// be rejected upon attempting to reforward the Add to
// the switch, since the circuit was never fully opened,
// and the forwarding package shows it as
// unacknowledged.
l.mailBox.FailAdd(pkt)

return
}
return NewDetailedLinkError(
lnwire.NewTemporaryChannelFailure(nil),
OutgoingFailureDownstreamHtlcAdd,
)
}

l.log.Tracef("received downstream htlc: payment_hash=%x, "+
"local_log_index=%v, pend_updates=%v",
htlc.PaymentHash[:], index,
l.channel.PendingLocalUpdateCount())
l.log.Tracef("received downstream htlc: payment_hash=%x, "+
"local_log_index=%v, pend_updates=%v",
htlc.PaymentHash[:], index,
l.channel.PendingLocalUpdateCount())

pkt.outgoingChanID = l.ShortChanID()
pkt.outgoingHTLCID = index
htlc.ID = index
pkt.outgoingChanID = l.ShortChanID()
pkt.outgoingHTLCID = index
htlc.ID = index

l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
pkt.inKey(), pkt.outKey())
l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
pkt.inKey(), pkt.outKey())

l.openedCircuits = append(l.openedCircuits, pkt.inKey())
l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())
l.openedCircuits = append(l.openedCircuits, pkt.inKey())
l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())

l.cfg.Peer.SendMessage(false, htlc)
_ = l.cfg.Peer.SendMessage(false, htlc)

// Send a forward event notification to htlcNotifier.
l.cfg.HtlcNotifier.NotifyForwardingEvent(
newHtlcKey(pkt),
HtlcInfo{
IncomingTimeLock: pkt.incomingTimeout,
IncomingAmt: pkt.incomingAmount,
OutgoingTimeLock: htlc.Expiry,
OutgoingAmt: htlc.Amount,
},
getEventType(pkt),
)
// Send a forward event notification to htlcNotifier.
l.cfg.HtlcNotifier.NotifyForwardingEvent(
newHtlcKey(pkt),
HtlcInfo{
IncomingTimeLock: pkt.incomingTimeout,
IncomingAmt: pkt.incomingAmount,
OutgoingTimeLock: htlc.Expiry,
OutgoingAmt: htlc.Amount,
},
getEventType(pkt),
)

l.tryBatchUpdateCommitTx()

return nil
}

// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
// cleared HTLCs with the upstream peer.
//
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
switch htlc := pkt.htlc.(type) {
case *lnwire.UpdateAddHTLC:
// Handle add message. The returned error can be ignored,
// because it is also sent through the mailbox.
_ = l.handleDownstreamUpdateAdd(pkt)

case *lnwire.UpdateFulfillHTLC:
// If hodl.SettleOutgoing mode is active, we exit early to
Expand Down Expand Up @@ -1386,14 +1420,16 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
// Then we send the HTLC settle message to the connected peer
// so we can continue the propagation of the settle message.
l.cfg.Peer.SendMessage(false, htlc)
isSettle = true

// Send a settle event notification to htlcNotifier.
l.cfg.HtlcNotifier.NotifySettleEvent(
newHtlcKey(pkt),
getEventType(pkt),
)

// Immediately update the commitment tx to minimize latency.
l.updateCommitTxOrFail()

case *lnwire.UpdateFailHTLC:
// If hodl.FailOutgoing mode is active, we exit early to
// simulate arbitrary delays between the switch adding a FAIL to
Expand Down Expand Up @@ -1450,7 +1486,6 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
// We send the HTLC message to the peer which initially created
// the HTLC.
l.cfg.Peer.SendMessage(false, htlc)
isSettle = true

// If the packet does not have a link failure set, it failed
// further down the route so we notify a forwarding failure.
Expand All @@ -1469,19 +1504,20 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
newHtlcKey(pkt), getEventType(pkt),
)
}
}

// If this newly added update exceeds the min batch size for adds, or
// this is a settle request, then initiate an update.
if l.channel.PendingLocalUpdateCount() >= uint64(l.cfg.BatchSize) ||
isSettle {
// Immediately update the commitment tx to minimize latency.
l.updateCommitTxOrFail()
}
}

if err := l.updateCommitTx(); err != nil {
l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
return
}
// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
// full.
func (l *channelLink) tryBatchUpdateCommitTx() {
if l.channel.PendingLocalUpdateCount() < uint64(l.cfg.BatchSize) {
return
}

l.updateCommitTxOrFail()
}

// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
Expand Down Expand Up @@ -1753,9 +1789,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// Otherwise, the remote party initiated the state transition,
// so we'll reply with a signature to provide them with their
// version of the latest commitment.
if err := l.updateCommitTx(); err != nil {
l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
if !l.updateCommitTxOrFail() {
return
}

Expand Down Expand Up @@ -1832,9 +1866,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// but there are still remote updates that are not in the remote
// commit tx yet, send out an update.
if l.channel.OweCommitment(true) {
if err := l.updateCommitTx(); err != nil {
l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
if !l.updateCommitTxOrFail() {
return
}
}
Expand Down Expand Up @@ -1918,6 +1950,18 @@ func (l *channelLink) ackDownStreamPackets() error {
return nil
}

// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
// the link.
func (l *channelLink) updateCommitTxOrFail() bool {
if err := l.updateCommitTx(); err != nil {
l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
return false
}

return true
}

// updateCommitTx signs, then sends an update to the remote peer adding a new
// commitment to their commitment chain which includes all the latest updates
// we've received+processed up to this point.
Expand Down Expand Up @@ -2312,6 +2356,33 @@ func (l *channelLink) HandleSwitchPacket(pkt *htlcPacket) error {
return l.mailBox.AddPacket(pkt)
}

// HandleLocalAddPacket handles a locally-initiated UpdateAddHTLC packet. It
// will be processed synchronously.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleLocalAddPacket(pkt *htlcPacket) error {
l.log.Tracef("received switch packet outkey=%v", pkt.outKey())

// Create a buffered result channel to prevent the link from blocking.
errChan := make(chan error, 1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we buffering this to account for the case where the link gets a shutdown instruction and stops listening, or to account for the time between this function sending and starting to listen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the former. If this function HandleLocalAddPacket quits, the link event loop can't deliver the message anymore.


select {
case l.localUpdateAdd <- &localUpdateAddMsg{
pkt: pkt,
err: errChan,
}:
case <-l.quit:
return ErrLinkShuttingDown
}

select {
case err := <-errChan:
return err
case <-l.quit:
return ErrLinkShuttingDown
}
}

// HandleChannelUpdate handles the htlc requests as settle/add/fail which sent
// to us from remote peer we have a channel with.
//
Expand Down
5 changes: 5 additions & 0 deletions htlcswitch/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,11 @@ func (f *mockChannelLink) HandleSwitchPacket(pkt *htlcPacket) error {
return nil
}

func (f *mockChannelLink) HandleLocalAddPacket(pkt *htlcPacket) error {
_ = f.mailBox.AddPacket(pkt)
return nil
}

func (f *mockChannelLink) HandleChannelUpdate(lnwire.Message) {
}

Expand Down
Loading