Skip to content

Commit

Permalink
Merge pull request #634 from GeorgeTsagk/sweep-batcher
Browse files Browse the repository at this point in the history
Loop Out Sweep Batcher
  • Loading branch information
GeorgeTsagk authored Jan 23, 2024
2 parents e9d374a + 6f75a11 commit df2db80
Show file tree
Hide file tree
Showing 54 changed files with 6,013 additions and 1,812 deletions.
48 changes: 30 additions & 18 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/swap"
"github.com/lightninglabs/loop/sweep"
"github.com/lightninglabs/loop/sweepbatcher"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/routing/route"
"google.golang.org/grpc"
Expand Down Expand Up @@ -60,7 +62,7 @@ var (
// probeTimeout is the maximum time until a probe is allowed to take.
probeTimeout = 3 * time.Minute

republishDelay = 10 * time.Second
repushDelay = 1 * time.Second

// MinerFeeEstimationFailed is a magic number that is returned in a
// quote call as the miner fee if the fee estimation in lnd's wallet
Expand Down Expand Up @@ -133,7 +135,8 @@ type ClientConfig struct {

// NewClient returns a new instance to initiate swaps with.
func NewClient(dbDir string, loopDB loopdb.SwapStore,
cfg *ClientConfig) (*Client, func(), error) {
sweeperDb sweepbatcher.BatcherStore, cfg *ClientConfig) (
*Client, func(), error) {

lsatStore, err := lsat.NewFileStore(dbDir)
if err != nil {
Expand Down Expand Up @@ -161,27 +164,36 @@ func NewClient(dbDir string, loopDB loopdb.SwapStore,
Lnd: cfg.Lnd,
}

verifySchnorrSig := func(pubKey *btcec.PublicKey, hash, sig []byte) error {
schnorrSig, err := schnorr.ParseSignature(sig)
if err != nil {
return err
}

if !schnorrSig.Verify(hash, pubKey) {
return fmt.Errorf("invalid signature")
}

return nil
}

batcher := sweepbatcher.NewBatcher(
cfg.Lnd.WalletKit, cfg.Lnd.ChainNotifier, cfg.Lnd.Signer,
swapServerClient.MultiMuSig2SignSweep, verifySchnorrSig,
cfg.Lnd.ChainParams, sweeperDb, loopDB,
)

executor := newExecutor(&executorConfig{
lnd: cfg.Lnd,
store: loopDB,
sweeper: sweeper,
batcher: batcher,
createExpiryTimer: config.CreateExpiryTimer,
loopOutMaxParts: cfg.LoopOutMaxParts,
totalPaymentTimeout: cfg.TotalPaymentTimeout,
maxPaymentRetries: cfg.MaxPaymentRetries,
cancelSwap: swapServerClient.CancelLoopOutSwap,
verifySchnorrSig: func(pubKey *btcec.PublicKey, hash, sig []byte) error {
schnorrSig, err := schnorr.ParseSignature(sig)
if err != nil {
return err
}

if !schnorrSig.Verify(hash, pubKey) {
return fmt.Errorf("invalid signature")
}

return nil
},
verifySchnorrSig: verifySchnorrSig,
})

client := &Client{
Expand Down Expand Up @@ -232,7 +244,7 @@ func (s *Client) FetchSwaps(ctx context.Context) ([]*SwapInfo, error) {
LastUpdate: swp.LastUpdateTime(),
}

htlc, err := GetHtlc(
htlc, err := utils.GetHtlc(
swp.Hash, &swp.Contract.SwapContract,
s.lndServices.ChainParams,
)
Expand Down Expand Up @@ -265,7 +277,7 @@ func (s *Client) FetchSwaps(ctx context.Context) ([]*SwapInfo, error) {
LastUpdate: swp.LastUpdateTime(),
}

htlc, err := GetHtlc(
htlc, err := utils.GetHtlc(
swp.Hash, &swp.Contract.SwapContract,
s.lndServices.ChainParams,
)
Expand Down Expand Up @@ -540,7 +552,7 @@ func (s *Client) getLoopOutSweepFee(ctx context.Context, confTarget int32) (
return 0, err
}

scriptVersion := GetHtlcScriptVersion(
scriptVersion := utils.GetHtlcScriptVersion(
loopdb.CurrentProtocolVersion(),
)

Expand Down Expand Up @@ -731,7 +743,7 @@ func (s *Client) estimateFee(ctx context.Context, amt btcutil.Amount,
// Generate a dummy address for fee estimation.
witnessProg := [32]byte{}

scriptVersion := GetHtlcScriptVersion(
scriptVersion := utils.GetHtlcScriptVersion(
loopdb.CurrentProtocolVersion(),
)

Expand Down
35 changes: 21 additions & 14 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/swap"
"github.com/lightninglabs/loop/test"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -146,8 +147,6 @@ func TestLoopOutFailWrongAmount(t *testing.T) {
// TestLoopOutResume tests that swaps in various states are properly resumed
// after a restart.
func TestLoopOutResume(t *testing.T) {
defer test.Guard(t)()

defaultConfs := loopdb.DefaultLoopOutHtlcConfirmations

storedVersion := []loopdb.ProtocolVersion{
Expand Down Expand Up @@ -279,7 +278,7 @@ func testLoopOutResume(t *testing.T, confs uint32, expired, preimageRevealed,
preimageRevealed, int32(confs),
)

htlc, err := GetHtlc(
htlc, err := utils.GetHtlc(
hash, &pendingSwap.Contract.SwapContract,
&chaincfg.TestNet3Params,
)
Expand All @@ -304,7 +303,7 @@ func testLoopOutResume(t *testing.T, confs uint32, expired, preimageRevealed,
func(r error) {},
func(r error) {},
preimageRevealed,
confIntent, GetHtlcScriptVersion(protocolVersion),
confIntent, utils.GetHtlcScriptVersion(protocolVersion),
)
}

Expand All @@ -317,15 +316,28 @@ func testLoopOutSuccess(ctx *testContext, amt btcutil.Amount, hash lntypes.Hash,

signalPrepaymentResult(nil)

ctx.AssertRegisterSpendNtfn(confIntent.PkScript)

// Assert that a call to track payment was sent, and respond with status
// in flight so that our swap will push its preimage to the server.
ctx.trackPayment(lnrpc.Payment_IN_FLIGHT)

// We need to notify the height, as the loopout is going to attempt a
// sweep when a new block is received.
err := ctx.Lnd.NotifyHeight(ctx.Lnd.Height + 1)
require.NoError(ctx.Context.T, err)

// Publish tick.
ctx.expiryChan <- testTime

// One spend notifier is registered by batch to watch primary sweep.
ctx.AssertRegisterSpendNtfn(confIntent.PkScript)

ctx.AssertEpochListeners(2)

// Mock the blockheight again as that's when the batch will broadcast
// the tx.
err = ctx.Lnd.NotifyHeight(ctx.Lnd.Height + 1)
require.NoError(ctx.Context.T, err)

// Expect a signing request in the non taproot case.
if scriptVersion != swap.HtlcV3 {
<-ctx.Context.Lnd.SignOutputRawChannel
Expand All @@ -340,14 +352,7 @@ func testLoopOutSuccess(ctx *testContext, amt btcutil.Amount, hash lntypes.Hash,
// preimage before sweeping in order for the server to trust us with
// our MuSig2 signing attempts.
if scriptVersion == swap.HtlcV3 {
ctx.assertPreimagePush(ctx.store.loopOutSwaps[hash].Preimage)

// Try MuSig2 signing first and fail it so that we go for a
// normal sweep.
for i := 0; i < maxMusigSweepRetries; i++ {
ctx.expiryChan <- testTime
ctx.assertPreimagePush(ctx.store.loopOutSwaps[hash].Preimage)
}
ctx.assertPreimagePush(ctx.store.LoopOutSwaps[hash].Preimage)
<-ctx.Context.Lnd.SignOutputRawChannel
}

Expand Down Expand Up @@ -388,6 +393,8 @@ func testLoopOutSuccess(ctx *testContext, amt btcutil.Amount, hash lntypes.Hash,

ctx.NotifySpend(sweepTx, 0)

ctx.AssertRegisterConf(true, 3)

ctx.assertStatus(loopdb.StateSuccess)

ctx.assertStoreFinished(loopdb.StateSuccess)
Expand Down
1 change: 1 addition & 0 deletions cmd/loop/loopout.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func loopOut(ctx *cli.Context) error {
resp, err := client.LoopOut(context.Background(), &looprpc.LoopOutRequest{
Amt: int64(amt),
Dest: destAddr,
IsExternalAddr: destAddr != "",
Account: account,
AccountAddrType: accountAddrType,
MaxMinerFee: int64(limits.maxMinerFee),
Expand Down
23 changes: 23 additions & 0 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/sweep"
"github.com/lightninglabs/loop/sweepbatcher"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/queue"
)
Expand All @@ -23,6 +24,8 @@ type executorConfig struct {

sweeper *sweep.Sweeper

batcher *sweepbatcher.Batcher

store loopdb.SwapStore

createExpiryTimer func(expiry time.Duration) <-chan time.Time
Expand Down Expand Up @@ -71,6 +74,7 @@ func (s *executor) run(mainCtx context.Context,
err error
blockEpochChan <-chan int32
blockErrorChan <-chan error
batcherErrChan chan error
)

for {
Expand Down Expand Up @@ -121,6 +125,21 @@ func (s *executor) run(mainCtx context.Context,
return mainCtx.Err()
}

batcherErrChan = make(chan error, 1)

s.wg.Add(1)
go func() {
defer s.wg.Done()

err := s.batcher.Run(mainCtx)
if err != nil {
select {
case batcherErrChan <- err:
case <-mainCtx.Done():
}
}
}()

// Start main event loop.
log.Infof("Starting event loop at height %v", height)

Expand Down Expand Up @@ -156,6 +175,7 @@ func (s *executor) run(mainCtx context.Context,
err := newSwap.execute(mainCtx, &executeConfig{
statusChan: statusChan,
sweeper: s.sweeper,
batcher: s.batcher,
blockEpochChan: queue.ChanOut(),
timerFactory: s.executorConfig.createExpiryTimer,
loopOutMaxParts: s.executorConfig.loopOutMaxParts,
Expand Down Expand Up @@ -211,6 +231,9 @@ func (s *executor) run(mainCtx context.Context,
case err := <-blockErrorChan:
return fmt.Errorf("block error: %v", err)

case err := <-batcherErrChan:
return fmt.Errorf("batcher error: %v", err)

case <-mainCtx.Done():
return mainCtx.Err()
}
Expand Down
5 changes: 5 additions & 0 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ type OutRequest struct {
// Destination address for the swap.
DestAddr btcutil.Address

// IsExternalAddr indicates whether the provided destination address
// does not belong to the underlying wallet. This helps indicate
// whether the sweep of this swap can be batched or not.
IsExternalAddr bool

// MaxSwapRoutingFee is the maximum off-chain fee in msat that may be
// paid for payment to the server. This limit is applied during path
// finding. Typically this value is taken from the response of the
Expand Down
7 changes: 7 additions & 0 deletions labels/lnd_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const (
// loopInTimeout is the label used for loop in swaps to sweep an HTLC
// that has timed out.
loopInSweepTimeout = "InSweepTimeout"

loopOutBatchSweepSuccess = "BatchOutSweepSuccess -- %d"
)

// LoopOutSweepSuccess returns the label used for loop out swaps to sweep the
Expand All @@ -25,6 +27,11 @@ func LoopOutSweepSuccess(swapHash string) string {
return fmt.Sprintf(loopdLabelPattern, loopOutSweepSuccess, swapHash)
}

// LoopOutBatchSweepSuccess returns the label used for loop out sweep batcher.
func LoopOutBatchSweepSuccess(batchID int32) string {
return fmt.Sprintf(loopOutBatchSweepSuccess, batchID)
}

// LoopInHtlcLabel returns the label used for loop in swaps to publish an HTLC.
func LoopInHtlcLabel(swapHash string) string {
return fmt.Sprintf(loopdLabelPattern, loopInHtlc, swapHash)
Expand Down
2 changes: 2 additions & 0 deletions liquidity/autoloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ func TestAutoloopAddress(t *testing.T) {
Amount: amt,
// Define the expected destination address.
DestAddr: addr,
IsExternalAddr: true,
MaxSwapRoutingFee: maxRouteFee,
MaxPrepayRoutingFee: ppmToSat(
quote1.PrepayAmount, prepayFeePPM,
Expand All @@ -439,6 +440,7 @@ func TestAutoloopAddress(t *testing.T) {
Amount: amt,
// Define the expected destination address.
DestAddr: addr,
IsExternalAddr: true,
MaxSwapRoutingFee: maxRouteFee,
MaxPrepayRoutingFee: ppmToSat(
quote2.PrepayAmount, routeFeePPM,
Expand Down
7 changes: 7 additions & 0 deletions liquidity/liquidity.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,13 @@ func (m *Manager) autoloop(ctx context.Context) error {
// Create a copy of our range var so that we can reference it.
swap := swap

// Check if the parameter for custom address is defined for loop
// outs.
if m.params.DestAddr != nil {
swap.DestAddr = m.params.DestAddr
swap.IsExternalAddr = true
}

go m.dispatchStickyLoopOut(
ctx, swap, defaultAmountBackoffRetry,
defaultAmountBackoff,
Expand Down
3 changes: 3 additions & 0 deletions liquidity/loopout_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (b *loopOutBuilder) buildSwap(ctx context.Context, pubkey route.Vertex,
// already validated them.
request := loop.OutRequest{
Amount: amount,
IsExternalAddr: false,
OutgoingChanSet: chanSet,
MaxPrepayRoutingFee: prepayMaxFee,
MaxSwapRoutingFee: routeMaxFee,
Expand All @@ -160,9 +161,11 @@ func (b *loopOutBuilder) buildSwap(ctx context.Context, pubkey route.Vertex,
if len(params.Account) > 0 {
account = params.Account
addrType = params.AccountAddrType
request.IsExternalAddr = true
}
if params.DestAddr != nil {
request.DestAddr = params.DestAddr
request.IsExternalAddr = true
} else {
addr, err := b.cfg.Lnd.WalletKit.NextAddr(
ctx, account, addrType, false,
Expand Down
5 changes: 4 additions & 1 deletion loopd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/loopd/perms"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/sweepbatcher"

"github.com/lightninglabs/loop/instantout/reservation"
loop_looprpc "github.com/lightninglabs/loop/looprpc"
Expand Down Expand Up @@ -412,9 +413,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
return err
}

sweeperDb := sweepbatcher.NewSQLStore(baseDb, chainParams)

// Create an instance of the loop client library.
swapClient, clientCleanup, err := getClient(
d.cfg, swapDb, &d.lnd.LndServices,
d.cfg, swapDb, sweeperDb, &d.lnd.LndServices,
)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions loopd/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/lightninglabs/loop/instantout/reservation"
"github.com/lightninglabs/loop/liquidity"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/sweepbatcher"
"github.com/lightningnetwork/lnd"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/signal"
Expand All @@ -32,6 +33,7 @@ func SetupLoggers(root *build.RotatingLogWriter, intercept signal.Interceptor) {

lnd.SetSubLogger(root, Subsystem, log)
lnd.AddSubLogger(root, "LOOP", intercept, loop.UseLogger)
lnd.AddSubLogger(root, "SWEEP", intercept, sweepbatcher.UseLogger)
lnd.AddSubLogger(root, "LNDC", intercept, lndclient.UseLogger)
lnd.AddSubLogger(root, "STORE", intercept, loopdb.UseLogger)
lnd.AddSubLogger(root, lsat.Subsystem, intercept, lsat.UseLogger)
Expand Down
Loading

0 comments on commit df2db80

Please sign in to comment.