Skip to content

Commit

Permalink
Merge pull request #1875 from joostjager/resolverstate
Browse files Browse the repository at this point in the history
rpc+contractcourt: merge the contractResolver state into the pendingchannels RPC response
  • Loading branch information
Roasbeef authored Feb 2, 2019
2 parents bceb048 + ca619bf commit 6032e47
Show file tree
Hide file tree
Showing 10 changed files with 346 additions and 136 deletions.
15 changes: 15 additions & 0 deletions contractcourt/chain_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,21 @@ func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
return nil
}

// GetChannelArbitrator safely returns the channel arbitrator for a given
// channel outpoint.
func (c *ChainArbitrator) GetChannelArbitrator(chanPoint wire.OutPoint) (
*ChannelArbitrator, error) {

c.Lock()
arbitrator, ok := c.activeChannels[chanPoint]
c.Unlock()
if !ok {
return nil, fmt.Errorf("unable to find arbitrator")
}

return arbitrator, nil
}

// forceCloseReq is a request sent from an outside sub-system to the arbitrator
// that watches a particular channel to broadcast the commitment transaction,
// and enter the resolution phase of the channel.
Expand Down
142 changes: 123 additions & 19 deletions contractcourt/channel_arbitrator.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package contractcourt

import (
"bytes"
"errors"
"sync"
"sync/atomic"

"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
Expand Down Expand Up @@ -132,6 +134,36 @@ type ChannelArbitratorConfig struct {
ChainArbitratorConfig
}

// ContractReport provides a summary of a commitment tx output.
type ContractReport struct {
// Outpoint is the final output that will be swept back to the wallet.
Outpoint wire.OutPoint

// Incoming indicates whether the htlc was incoming to this channel.
Incoming bool

// Amount is the final value that will be swept in back to the wallet.
Amount btcutil.Amount

// MaturityHeight is the absolute block height that this output will
// mature at.
MaturityHeight uint32

// Stage indicates whether the htlc is in the CLTV-timeout stage (1) or
// the CSV-delay stage (2). A stage 1 htlc's maturity height will be set
// to its expiry height, while a stage 2 htlc's maturity height will be
// set to its confirmation height plus the maturity requirement.
Stage uint32

// LimboBalance is the total number of frozen coins within this
// contract.
LimboBalance btcutil.Amount

// RecoveredBalance is the total value that has been successfully swept
// back to the user's wallet.
RecoveredBalance btcutil.Amount
}

// htlcSet represents the set of active HTLCs on a given commitment
// transaction.
type htlcSet struct {
Expand Down Expand Up @@ -202,6 +234,10 @@ type ChannelArbitrator struct {
// be able to signal them for shutdown in the case that we shutdown.
activeResolvers []ContractResolver

// activeResolversLock prevents simultaneous read and write to the
// resolvers slice.
activeResolversLock sync.RWMutex

// resolutionSignal is a channel that will be sent upon by contract
// resolvers once their contract has been fully resolved. With each
// send, we'll check to see if the contract is fully resolved.
Expand Down Expand Up @@ -461,6 +497,33 @@ func supplementTimeoutResolver(r *htlcTimeoutResolver,
return nil
}

// Report returns htlc reports for the active resolvers.
func (c *ChannelArbitrator) Report() []*ContractReport {
c.activeResolversLock.RLock()
defer c.activeResolversLock.RUnlock()

var reports []*ContractReport
for _, resolver := range c.activeResolvers {
r, ok := resolver.(reportingContractResolver)
if !ok {
continue
}

if r.IsResolved() {
continue
}

report := r.report()
if report == nil {
continue
}

reports = append(reports, report)
}

return reports
}

// Stop signals the ChannelArbitrator for a graceful shutdown.
func (c *ChannelArbitrator) Stop() error {
if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
Expand All @@ -473,9 +536,11 @@ func (c *ChannelArbitrator) Stop() error {
go c.cfg.ChainEvents.Cancel()
}

c.activeResolversLock.RLock()
for _, activeResolver := range c.activeResolvers {
activeResolver.Stop()
}
c.activeResolversLock.RUnlock()

close(c.quit)
c.wg.Wait()
Expand Down Expand Up @@ -816,7 +881,19 @@ func (c *ChannelArbitrator) stateStep(triggerHeight uint32,
log.Infof("ChannelArbitrator(%v): still awaiting contract "+
"resolution", c.cfg.ChanPoint)

nextState = StateWaitingFullResolution
numUnresolved, err := c.log.FetchUnresolvedContracts()
if err != nil {
return StateError, closeTx, err
}

// If we still have unresolved contracts, then we'll stay alive
// to oversee their resolution.
if len(numUnresolved) != 0 {
nextState = StateWaitingFullResolution
break
}

nextState = StateFullyResolved

// If we start as fully resolved, then we'll end as fully resolved.
case StateFullyResolved:
Expand All @@ -842,6 +919,9 @@ func (c *ChannelArbitrator) stateStep(triggerHeight uint32,

// launchResolvers updates the activeResolvers list and starts the resolvers.
func (c *ChannelArbitrator) launchResolvers(resolvers []ContractResolver) {
c.activeResolversLock.Lock()
defer c.activeResolversLock.Unlock()

c.activeResolvers = resolvers
for _, contract := range resolvers {
c.wg.Add(1)
Expand Down Expand Up @@ -1361,6 +1441,25 @@ func (c *ChannelArbitrator) prepContractResolutions(htlcActions ChainActionMap,
return htlcResolvers, msgsToSend, nil
}

// replaceResolver replaces a in the list of active resolvers. If the resolver
// to be replaced is not found, it returns an error.
func (c *ChannelArbitrator) replaceResolver(oldResolver,
newResolver ContractResolver) error {

c.activeResolversLock.Lock()
defer c.activeResolversLock.Unlock()

oldKey := oldResolver.ResolverKey()
for i, r := range c.activeResolvers {
if bytes.Equal(r.ResolverKey(), oldKey) {
c.activeResolvers[i] = newResolver
return nil
}
}

return errors.New("resolver to be replaced not found")
}

// resolveContract is a goroutine tasked with fully resolving an unresolved
// contract. Either the initial contract will be resolved after a single step,
// or the contract will itself create another contract to be resolved. In
Expand Down Expand Up @@ -1410,6 +1509,7 @@ func (c *ChannelArbitrator) resolveContract(currentContract ContractResolver) {
c.cfg.ChanPoint, currentContract,
nextContract)

// Swap contract in log.
err := c.log.SwapContract(
currentContract, nextContract,
)
Expand All @@ -1418,6 +1518,17 @@ func (c *ChannelArbitrator) resolveContract(currentContract ContractResolver) {
"contract: %v", err)
}

// Swap contract in resolvers list. This is to
// make sure that reports are queried from the
// new resolver.
err = c.replaceResolver(
currentContract, nextContract,
)
if err != nil {
log.Errorf("unable to replace "+
"contract: %v", err)
}

// As this contract produced another, we'll
// re-assign, so we can continue our resolution
// loop.
Expand Down Expand Up @@ -1722,29 +1833,22 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
log.Infof("ChannelArbitrator(%v): a contract has been "+
"fully resolved!", c.cfg.ChanPoint)

numUnresolved, err := c.log.FetchUnresolvedContracts()
nextState, _, err := c.advanceState(
uint32(bestHeight), chainTrigger,
)
if err != nil {
log.Errorf("unable to query resolved "+
"contracts: %v", err)
}

// If we still have unresolved contracts, then we'll
// stay alive to oversee their resolution.
if len(numUnresolved) != 0 {
continue
log.Errorf("unable to advance state: %v", err)
}

log.Infof("ChannelArbitrator(%v): all contracts fully "+
"resolved, exiting", c.cfg.ChanPoint)
// If we don't have anything further to do after
// advancing our state, then we'll exit.
if nextState == StateFullyResolved {
log.Infof("ChannelArbitrator(%v): all "+
"contracts fully resolved, exiting",
c.cfg.ChanPoint)

// Otherwise, our job is finished here, the contract is
// now fully resolved! We'll mark it as such, then exit
// ourselves.
if err := c.cfg.MarkChannelResolved(); err != nil {
log.Errorf("unable to mark contract "+
"resolved: %v", err)
return
}
return

// We've just received a request to forcibly close out the
// channel. We'll
Expand Down
5 changes: 1 addition & 4 deletions contractcourt/channel_arbitrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,10 +611,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {
notifier.spendChan <- &chainntnfs.SpendDetail{}

// At this point channel should be marked as resolved.

// It should transition StateWaitingFullResolution ->
// StateFullyResolved, but this isn't happening.
// assertStateTransitions(t, arbLog.newStates, StateFullyResolved)
assertStateTransitions(t, arbLog.newStates, StateFullyResolved)

select {
case <-resolved:
Expand Down
8 changes: 8 additions & 0 deletions contractcourt/contract_resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ type ContractResolver interface {
Stop()
}

// reportingContractResolver is a ContractResolver that also exposes a report on
// the resolution state of the contract.
type reportingContractResolver interface {
ContractResolver

report() *ContractReport
}

// ResolverKit is meant to be used as a mix-in struct to be embedded within a
// given ContractResolver implementation. It contains all the items that a
// resolver requires to carry out its duties.
Expand Down
23 changes: 23 additions & 0 deletions contractcourt/htlc_incoming_contest_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"encoding/binary"
"fmt"
"io"

"github.com/btcsuite/btcutil"
)

// htlcIncomingContestResolver is a ContractResolver that's able to resolve an
Expand Down Expand Up @@ -166,6 +168,27 @@ func (h *htlcIncomingContestResolver) Resolve() (ContractResolver, error) {
}
}

// report returns a report on the resolution state of the contract.
func (h *htlcIncomingContestResolver) report() *ContractReport {
// No locking needed as these values are read-only.

finalAmt := h.htlcAmt.ToSatoshis()
if h.htlcResolution.SignedSuccessTx != nil {
finalAmt = btcutil.Amount(
h.htlcResolution.SignedSuccessTx.TxOut[0].Value,
)
}

return &ContractReport{
Outpoint: h.htlcResolution.ClaimOutpoint,
Incoming: true,
Amount: finalAmt,
MaturityHeight: h.htlcExpiry,
LimboBalance: finalAmt,
Stage: 1,
}
}

// Stop signals the resolver to cancel any current resolution processes, and
// suspend.
//
Expand Down
25 changes: 25 additions & 0 deletions contractcourt/htlc_outgoing_contest_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"

"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
)
Expand Down Expand Up @@ -108,6 +109,9 @@ func (h *htlcOutgoingContestResolver) Resolve() (ContractResolver, error) {
scriptToWatch []byte
err error
)

// TODO(joostjager): output already set properly in
// lnwallet.newOutgoingHtlcResolution? And script too?
if h.htlcResolution.SignedTimeoutTx == nil {
outPointToWatch = h.htlcResolution.ClaimOutpoint
scriptToWatch = h.htlcResolution.SweepSignDesc.Output.PkScript
Expand Down Expand Up @@ -235,6 +239,27 @@ func (h *htlcOutgoingContestResolver) Resolve() (ContractResolver, error) {
}
}

// report returns a report on the resolution state of the contract.
func (h *htlcOutgoingContestResolver) report() *ContractReport {
// No locking needed as these values are read-only.

finalAmt := h.htlcAmt.ToSatoshis()
if h.htlcResolution.SignedTimeoutTx != nil {
finalAmt = btcutil.Amount(
h.htlcResolution.SignedTimeoutTx.TxOut[0].Value,
)
}

return &ContractReport{
Outpoint: h.htlcResolution.ClaimOutpoint,
Incoming: false,
Amount: finalAmt,
MaturityHeight: h.htlcResolution.Expiry,
LimboBalance: finalAmt,
Stage: 1,
}
}

// Stop signals the resolver to cancel any current resolution processes, and
// suspend.
//
Expand Down
Loading

0 comments on commit 6032e47

Please sign in to comment.