Skip to content

Commit

Permalink
refactor tx sending
Browse files Browse the repository at this point in the history
Refactor sending evm txs so that we store the txs before sending
and are accepting of send errors. This avoids a scenario where
we send the tx, but still receive an error, perhaps because of
a bad connection or other problem with the rpc provider.

Implement a send queue so that the caller doesn't have to wait
for the send. This should speed up ticks in core if the rpc
provider is functional but slow to respond.
  • Loading branch information
buck54321 committed Sep 18, 2024
1 parent ae2fbd0 commit 1791b16
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 143 deletions.
6 changes: 5 additions & 1 deletion client/asset/eth/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,11 @@ func (contractDeployer) nodeAndRate(
return nil, nil, nil, fmt.Errorf("error creating wallet: %w", err)
}

cl, err := newMultiRPCClient(walletDir, providers, log, chainCfg, 3, net)
creds, err := walletCredentials(chainCfg.ChainID, walletDir, net)
if err != nil {
return nil, nil, nil, fmt.Errorf("error generating wallet credentials: %w", err)
}
cl, err := newMultiRPCClient(creds, providers, log, chainCfg, 3, net)
if err != nil {
return nil, nil, nil, fmt.Errorf("error creating rpc client: %w", err)
}
Expand Down
177 changes: 132 additions & 45 deletions client/asset/eth/eth.go

Large diffs are not rendered by default.

134 changes: 91 additions & 43 deletions client/asset/eth/eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"
"math/big"
"math/rand"
"os"
"sort"
"strings"
"sync"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
Expand Down Expand Up @@ -74,13 +76,72 @@ var (
SwapConf: 1,
}

tCreds *accountCredentials
tPriv *ecdsa.PrivateKey
signer = types.LatestSigner(params.AllEthashProtocolChanges)

// simBackend = backends.NewSimulatedBackend(core.GenesisAlloc{
// testAddressA: core.GenesisAccount{Balance: dexeth.GweiToWei(5e10)},
// }, 1e9)
)

func generateTestCredentials() (string, error) {
privB, _, err := privKeyFromSeed(encode.RandomBytes(32))
if err != nil {
return "", fmt.Errorf("error generating private key bytes: %w", err)
}

tPriv, err = crypto.ToECDSA(privB)
if err != nil {
return "", fmt.Errorf("error making private key: %w", err)
}

credsDir, err := os.MkdirTemp("", "")
if err != nil {
return "", fmt.Errorf("error making temp dir: %w", err)
}

pw := []byte("abc")

ks := keystore.NewKeyStore(credsDir, keystore.LightScryptN, keystore.LightScryptP)
if err := importKeyToKeyStore(ks, tPriv, pw); err != nil {
os.RemoveAll(credsDir)
return "", fmt.Errorf("error making temp dir: %w", err)
}

tCreds, err = credentialsFromKeyStore(ks, params.AllEthashProtocolChanges.ChainID)
if err != nil {
os.RemoveAll(credsDir)
return "", fmt.Errorf("error generating credentials from keystore: %w", err)
}

if err := importKeyToKeyStore(ks, tPriv, []byte("abc")); err != nil {
os.RemoveAll(credsDir)
return "", fmt.Errorf("error making temp dir: %w", err)
}

if err := ks.Unlock(*tCreds.acct, string(pw)); err != nil {
os.RemoveAll(credsDir)
return "", fmt.Errorf("error unlocking keystore: %w", err)
}

return credsDir, nil
}

func TestMain(m *testing.M) {
doIt := func() int {
credsDir, err := generateTestCredentials()
if err != nil {
tLogger.Critical("Error generating test credentials: %v", err)
return 1
}
defer os.RemoveAll(credsDir)
return m.Run()
}

os.Exit(doIt())
}

type tGetTxRes struct {
tx *types.Transaction
height int64
Expand Down Expand Up @@ -120,7 +181,6 @@ type testNode struct {
hdrByHash *types.Header
lastSignedTx *types.Transaction
sentTxs int
sendTxTx *types.Transaction
sendTxErr error
simBackend bind.ContractBackend
maxFeeRate *big.Int
Expand Down Expand Up @@ -244,13 +304,11 @@ func (n *testNode) signData(data []byte) (sig, pubKey []byte, err error) {

return sig, crypto.FromECDSAPub(&n.privKey.PublicKey), nil
}
func (n *testNode) sendTransaction(ctx context.Context, txOpts *bind.TransactOpts, to common.Address, data []byte, filts ...acceptabilityFilter) (*types.Transaction, error) {
n.sentTxs++
return n.sendTxTx, n.sendTxErr
}

func (n *testNode) sendSignedTransaction(ctx context.Context, tx *types.Transaction, filts ...acceptabilityFilter) error {
n.lastSignedTx = tx
return nil
n.sentTxs++
return n.sendTxErr
}

func tTx(gasFeeCap, gasTipCap, value uint64, to *common.Address, data []byte, gasLimit uint64) *types.Transaction {
Expand Down Expand Up @@ -694,6 +752,7 @@ func TestCheckPendingTxs(t *testing.T) {
pendingTx.SubmissionTime = submissionStamp
pendingTx.lastBroadcast = time.Unix(int64(submissionStamp), 0)
pendingTx.lastFeeCheck = time.Unix(int64(submissionStamp), 0)
pendingTx.initialSendComplete.Store(true)
return pendingTx
}

Expand Down Expand Up @@ -845,17 +904,16 @@ func TestTakeAction(t *testing.T) {

pendingTx := eth.extendedTx(node.newTransaction(0, aGwei), asset.Send, 1, nil)
eth.pendingTxs = []*extendedWalletTx{pendingTx}

feeCap := new(big.Int).Mul(aGwei, big.NewInt(5))
tipCap := new(big.Int).Mul(aGwei, big.NewInt(2))
replacementTx, _ := types.SignTx(types.NewTx(&types.DynamicFeeTx{
Nonce: 1,
GasTipCap: tipCap,
GasFeeCap: feeCap,
Gas: 50_000,
ChainID: node.chainConfig().ChainID,
}), signer, node.privKey)
node.sendTxTx = replacementTx
const tipHeight = 100
eth.currentTip = &types.Header{Number: big.NewInt(tipHeight)}
tipRate := new(big.Int).Mul(aGwei, big.NewInt(2))
c := &eth.currentFees
c.Lock()
c.baseRate = new(big.Int).Mul(aGwei, big.NewInt(5))
c.tipRate = tipRate
c.blockNum = tipHeight
maxFeeRate := new(big.Int).Add(c.tipRate, new(big.Int).Mul(c.baseRate, big.NewInt(2)))
c.Unlock()

tooCheapAction := []byte(fmt.Sprintf(`{"txID":"%s","bump":true}`, pendingTx.ID))
if err := eth.TakeAction(actionTypeTooCheap, tooCheapAction); err != nil {
Expand All @@ -867,11 +925,11 @@ func TestTakeAction(t *testing.T) {
t.Fatal("tx wasn't replaced")
}
tx, _ := newPendingTx.tx()
if tx.GasFeeCap().Cmp(feeCap) != 0 {
t.Fatalf("wrong fee cap. wanted %s, got %s", feeCap, tx.GasFeeCap())
if tx.GasFeeCap().Cmp(maxFeeRate) != 0 {
t.Fatalf("wrong fee cap. wanted %s, got %s", maxFeeRate, tx.GasFeeCap())
}
if tx.GasTipCap().Cmp(tipCap) != 0 {
t.Fatalf("wrong tip cap. wanted %s, got %s", tipCap, tx.GasTipCap())
if tx.GasTipCap().Cmp(tipRate) != 0 {
t.Fatalf("wrong tip cap. wanted %s, got %s", tipRate, tx.GasTipCap())
}
if !newPendingTx.savedToDB {
t.Fatal("didn't save to DB")
Expand Down Expand Up @@ -906,6 +964,9 @@ func TestTakeAction(t *testing.T) {
t.Fatalf("Tx wasn't abandoned")
}
eth.pendingTxs = []*extendedWalletTx{pendingTx}
txOpts := newTxOpts(eth.ctx, eth.addr, 1, 1, big.NewInt(1), big.NewInt(1))
txOpts.Nonce = pendingTx.Nonce
replacementTx, _ := eth.creds.signedTx(txOpts, eth.addr, nil)
node.getTxRes = replacementTx
lostNonceAction = []byte(fmt.Sprintf(`{"txID":"%s","abandon":false,"replacementID":"%s"}`, pendingTx.ID, replacementTx.Hash()))
if err := eth.TakeAction(actionTypeLostNonce, lostNonceAction); err != nil {
Expand Down Expand Up @@ -1083,12 +1144,6 @@ func TestSyncStatus(t *testing.T) {
}

func newTestNode(assetID uint32) *tMempoolNode {
privKey, _ := crypto.HexToECDSA("9447129055a25c8496fca9e5ee1b9463e47e6043ff0c288d07169e8284860e34")
addr := common.HexToAddress("2b84C791b79Ee37De042AD2ffF1A253c3ce9bc27")
acct := &accounts.Account{
Address: addr,
}

tc := &tContractor{
gasEstimates: ethGases,
swapMap: make(map[[32]byte]*dexeth.SwapState),
Expand All @@ -1109,12 +1164,12 @@ func newTestNode(assetID uint32) *tMempoolNode {

return &tMempoolNode{
testNode: &testNode{
acct: acct,
addr: acct.Address,
acct: tCreds.acct,
addr: tCreds.acct.Address,
maxFeeRate: dexeth.GweiToWei(100),
baseFee: dexeth.GweiToWei(100),
tip: dexeth.GweiToWei(2),
privKey: privKey,
privKey: tPriv,
contractor: c,
tContractor: tc,
tokenContractor: ttc,
Expand Down Expand Up @@ -1163,7 +1218,8 @@ func tassetWallet(assetID uint32) (asset.Wallet, *assetWallet, *tMempoolNode, co
baseChainID: BipID,
chainID: dexeth.ChainIDs[dex.Simnet],
tokens: dexeth.Tokens,
addr: node.addr,
creds: tCreds,
addr: tCreds.addr,
net: dex.Simnet,
node: node,
ctx: ctx,
Expand All @@ -1175,6 +1231,7 @@ func tassetWallet(assetID uint32) (asset.Wallet, *assetWallet, *tMempoolNode, co
txDB: &tTxDB{},
currentTip: &types.Header{Number: new(big.Int)},
finalizeConfs: txConfsNeededToConfirm,
dontQSends: true,
},
versionedGases: versionedGases,
maxSwapGas: versionedGases[0].Swap,
Expand Down Expand Up @@ -4492,11 +4549,7 @@ func testSend(t *testing.T, assetID uint32) {
w, eth, node, shutdown := tassetWallet(assetID)
defer shutdown()

tx := tTx(0, 0, 0, &testAddressA, nil, 21000)
txHash := tx.Hash()

node.sendTxTx = tx
node.tokenContractor.transferTx = tx
node.tokenContractor.transferTx = tTx(0, 0, 0, &testAddressA, nil, 21000)

maxFeeRate, _, _ := eth.recommendedMaxFeeRate(eth.ctx)
ethFees := dexeth.WeiToGwei(maxFeeRate) * defaultSendGasLimit
Expand Down Expand Up @@ -4550,7 +4603,7 @@ func testSend(t *testing.T, assetID uint32) {
node.tokenContractor.bal = dexeth.GweiToWei(val - test.sendAdj)
node.bal = dexeth.GweiToWei(tokenFees - test.feeAdj)
}
coin, err := w.Send(test.addr, val, 0)
_, err := w.Send(test.addr, val, 0)
if test.wantErr {
if err == nil {
t.Fatalf("expected error for test %v", test.name)
Expand All @@ -4560,9 +4613,6 @@ func testSend(t *testing.T, assetID uint32) {
if err != nil {
t.Fatalf("unexpected error for test %v: %v", test.name, err)
}
if !bytes.Equal(txHash[:], coin.ID()) {
t.Fatal("coin is not the tx hash")
}
}
}

Expand Down Expand Up @@ -4749,9 +4799,7 @@ func testEstimateVsActualSendFees(t *testing.T, assetID uint32) {
w, _, node, shutdown := tassetWallet(assetID)
defer shutdown()

tx := tTx(0, 0, 0, &testAddressA, nil, 21000)
node.sendTxTx = tx
node.tokenContractor.transferTx = tx
node.tokenContractor.transferTx = tTx(0, 0, 0, &testAddressA, nil, 21000)

const testAddr = "dd93b447f7eBCA361805eBe056259853F3912E04"

Expand Down
22 changes: 3 additions & 19 deletions client/asset/eth/multirpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,19 +408,13 @@ type multiRPCClient struct {
var _ ethFetcher = (*multiRPCClient)(nil)

func newMultiRPCClient(
dir string,
creds *accountCredentials,
endpoints []string,
log dex.Logger,
cfg *params.ChainConfig,
finalizeConfs uint64,
net dex.Network,
) (*multiRPCClient, error) {
walletDir := getWalletDir(dir, net)
creds, err := pathCredentials(filepath.Join(walletDir, "keystore"))
if err != nil {
return nil, fmt.Errorf("error parsing credentials from %q: %w", dir, err)
}

m := &multiRPCClient{
net: net,
cfg: cfg,
Expand Down Expand Up @@ -1317,18 +1311,8 @@ func (m *multiRPCClient) sendSignedTransaction(ctx context.Context, tx *types.Tr
return nil
}

func (m *multiRPCClient) sendTransaction(ctx context.Context, txOpts *bind.TransactOpts, to common.Address, data []byte, filts ...acceptabilityFilter) (*types.Transaction, error) {
tx, err := m.creds.ks.SignTx(*m.creds.acct, types.NewTx(&types.DynamicFeeTx{
To: &to,
ChainID: m.chainID,
Nonce: txOpts.Nonce.Uint64(),
Gas: txOpts.GasLimit,
GasFeeCap: txOpts.GasFeeCap,
GasTipCap: txOpts.GasTipCap,
Value: txOpts.Value,
Data: data,
}), m.chainID)

func (m *multiRPCClient) genSignAndSendTransaction(ctx context.Context, txOpts *bind.TransactOpts, to common.Address, data []byte, filts ...acceptabilityFilter) (*types.Transaction, error) {
tx, err := m.creds.signedTx(txOpts, to, data)
if err != nil {
return nil, fmt.Errorf("signing error: %v", err)
}
Expand Down
8 changes: 6 additions & 2 deletions client/asset/eth/multirpc_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ func (m *MRPCTest) rpcClient(dir string, seed []byte, endpoints []string, net de
return nil, fmt.Errorf("error creating wallet: %v", err)
}

return newMultiRPCClient(dir, endpoints, log, cfg, 3, net)
creds, err := walletCredentials(cfg.ChainID, dir, net)
if err != nil {
return nil, fmt.Errorf("error generating wallet credentials: %w", err)
}
return newMultiRPCClient(creds, endpoints, log, cfg, 3, net)
}

func (m *MRPCTest) TestHTTP(t *testing.T, port string) {
Expand Down Expand Up @@ -157,7 +161,7 @@ func (m *MRPCTest) TestSimnetMultiRPCClient(t *testing.T, wsPort, httpPort strin
if err != nil {
t.Fatal(err)
}
if _, err := cl.sendTransaction(ctx, txOpts, alphaAddr, nil); err != nil {
if _, err := cl.genSignAndSendTransaction(ctx, txOpts, alphaAddr, nil); err != nil {
t.Fatalf("error sending tx %d-%d: %v", i, j, err)
}
}
Expand Down
Loading

0 comments on commit 1791b16

Please sign in to comment.