Skip to content

Commit

Permalink
fix(relayer): ensure submission order (#2198)
Browse files Browse the repository at this point in the history
Fixes a race condition in the relayer that sometimes resulted in
unordered submissions.

This refactors the `Sender` to reserve nonces synchronously, but do the
actual sending asynchronously. The active buffer is refactored to reserve
nonces synchronously, ensuring strictly ordered submissions.

issue: omni-network/ops#533

---------

Co-authored-by: Khalil Claybon <[email protected]>
  • Loading branch information
corverroos and kc1116 authored Oct 18, 2024
1 parent c49155b commit 27b79e9
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 88 deletions.
6 changes: 3 additions & 3 deletions relayer/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func Run(ctx context.Context, cfg Config) error {
pnl := newPnlLogger(network.ID, pricer)

for _, destChain := range network.EVMChains() {
// Setup sender provider
sendProvider := func() (SendFunc, error) {
// Setup send provider
sendProvider := func() (SendAsync, error) {
sender, err := NewSender(
network.ID,
destChain,
Expand All @@ -75,7 +75,7 @@ func Run(ctx context.Context, cfg Config) error {
return nil, err
}

return sender.SendTransaction, nil
return sender.SendAsync, nil
}

// Setup validator set awaiter
Expand Down
32 changes: 20 additions & 12 deletions relayer/app/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,34 @@ import (
"golang.org/x/sync/semaphore"
)

// activeBuffer links the output of cprovider/creator to the opsender.
// It has an large activeBuffer allowing many submissions to be queued up.
// It however limits the number of concurrent transactions it forwards to opsender
// to limiting our mempool size.
// activeBuffer links the output of each worker's cprovider/creators (one per chain version)
// to the destination chain async sender. Fan-in buffer.
//
// It limits the number of concurrent transactions it forwards to the async sender
// to limit the mempool size.
//
// While the mempool limit is reached, calls to AddInput block.
//
// It stops processing on any error.
type activeBuffer struct {
chainName string
buffer chan xchain.Submission
mempoolLimit int64
errChan chan error
sender SendFunc
sendAsync SendAsync
}

func newActiveBuffer(chainName string, mempoolLimit int64, sender SendFunc) *activeBuffer {
func newActiveBuffer(chainName string, mempoolLimit int64, sendAsync SendAsync) *activeBuffer {
return &activeBuffer{
chainName: chainName,
buffer: make(chan xchain.Submission),
mempoolLimit: mempoolLimit,
errChan: make(chan error, 1),
sender: sender,
sendAsync: sendAsync,
}
}

// AddInput adds a new submission to the buffer.
// AddInput adds a new submission to the buffer. It blocks while mempoolLimit is reached.
func (b *activeBuffer) AddInput(ctx context.Context, submission xchain.Submission) error {
select {
case <-ctx.Done():
Expand All @@ -45,7 +49,7 @@ func (b *activeBuffer) AddInput(ctx context.Context, submission xchain.Submissio
return nil
}

// Run processes the buffer, sending submissions to the opsender.
// Run processes the buffer, sending submissions to the async sender.
func (b *activeBuffer) Run(ctx context.Context) error {
sema := semaphore.NewWeighted(b.mempoolLimit)
for {
Expand All @@ -60,12 +64,16 @@ func (b *activeBuffer) Run(ctx context.Context) error {
}
mempoolLen.WithLabelValues(b.chainName).Inc()

// Trigger async send synchronously (for ordered nonces), but wait for response async.
response := b.sendAsync(ctx, submission)
go func() {
if err := b.sender(ctx, submission); err != nil {
b.submitErr(err)
err := <-response
if err != nil {
b.submitErr(errors.Wrap(err, "send submission"))
}
sema.Release(1)

mempoolLen.WithLabelValues(b.chainName).Dec()
sema.Release(1)
}()
}
}
Expand Down
32 changes: 22 additions & 10 deletions relayer/app/buffer_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,16 @@ func newMockSender() *mockBufSender {
}
}

func (m *mockBufSender) Send(_ context.Context, sub xchain.Submission) error {
m.sendChan <- sub
return nil
func (m *mockBufSender) Send(_ context.Context, sub xchain.Submission) <-chan error {
// Simulate an async send that returns success when Next is called below.
resp := make(chan error, 1)

go func() {
m.sendChan <- sub
resp <- nil
}()

return resp
}

func (m *mockBufSender) Next() xchain.Submission {
Expand Down Expand Up @@ -88,10 +95,12 @@ func Test_activeBuffer_Run(t *testing.T) {
}
}()

require.Eventuallyf(t, func() bool {
return counter.Load() == memLimit+1
},
time.Second, time.Millisecond, "expected %d", memLimit+1)
require.Eventuallyf(t,
func() bool {
return counter.Load() == memLimit+1
},
time.Second, time.Millisecond, "expected %d", memLimit+1,
)

// assert again that buf is blocking
require.Equal(t, memLimit+1, counter.Load())
Expand All @@ -102,9 +111,12 @@ func Test_activeBuffer_Run(t *testing.T) {
output = append(output, sender.Next())
}

require.Eventuallyf(t, func() bool {
return counter.Load() == int64(size)
}, time.Second, time.Millisecond, "expected %d", size)
require.Eventuallyf(t,
func() bool {
return counter.Load() == int64(size)
},
time.Second, time.Millisecond, "expected %d", size,
)

// Assert equality of input and output submissions
require.Len(t, input, len(output))
Expand Down
4 changes: 2 additions & 2 deletions relayer/app/cursors_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ func (m *mockXChainClient) GetEmittedCursor(ctx context.Context, ref xchain.Emit
}

type mockSender struct {
SendTransactionFn func(ctx context.Context, submission xchain.Submission) error
SendTransactionFn func(ctx context.Context, submission xchain.Submission) <-chan error
}

func (m *mockSender) SendTransaction(ctx context.Context, submission xchain.Submission) error {
func (m *mockSender) SendTransaction(ctx context.Context, submission xchain.Submission) <-chan error {
return m.SendTransactionFn(ctx, submission)
}

Expand Down
125 changes: 72 additions & 53 deletions relayer/app/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,23 @@ import (
"github.com/ethereum/go-ethereum/params"
)

// Sender uses txmgr to send transactions a specific destination chain.
type onSubmitFunc func(context.Context, *ethtypes.Transaction, *ethtypes.Receipt, xchain.Submission)

// Sender uses txmgr to send transactions to the destination chain.
// Sender uses txmgr to send transactions to a specific destination chain.
type Sender struct {
network netconf.ID
txMgr txmgr.TxManager
gasEstimator gasEstimator
portal common.Address
abi *abi.ABI
chain netconf.Chain
gasToken tokens.Token
chainNames map[xchain.ChainVersion]string
rpcClient ethclient.Client
ethCl ethclient.Client
onSubmit onSubmitFunc
}

// NewSender creates a new sender that uses txmgr to send transactions to the destination chain.
// NewSender returns a new sender.
func NewSender(
network netconf.ID,
chain netconf.Chain,
Expand All @@ -51,12 +51,13 @@ func NewSender(
chainNames map[xchain.ChainVersion]string,
onSubmit onSubmitFunc,
) (Sender, error) {
// we want to query receipts every 1/3 of the block time
cfg, err := txmgr.NewConfig(txmgr.NewCLIConfig(
chain.ID,
chain.BlockPeriod/3,
txmgr.DefaultSenderFlagValues,
),
const receiptPollFreq = 3 // Query receipts every 1/3 of the block time
cfg, err := txmgr.NewConfig(
txmgr.NewCLIConfig(
chain.ID,
chain.BlockPeriod/receiptPollFreq,
txmgr.DefaultSenderFlagValues,
),
&privateKey,
rpcClient,
)
Expand Down Expand Up @@ -84,23 +85,33 @@ func NewSender(
network: network,
txMgr: txMgr,
gasEstimator: newGasEstimator(network),
portal: chain.PortalAddress,
abi: &parsedAbi,
chain: chain,
gasToken: meta.NativeToken,
chainNames: chainNames,
rpcClient: rpcClient,
ethCl: rpcClient,
onSubmit: onSubmit,
}, nil
}

// SendTransaction sends the submission to the destination chain.
func (s Sender) SendTransaction(ctx context.Context, sub xchain.Submission) error {
// SendAsync sends the submission to the destination chain asynchronously.
// It returns a channel that will receive an error if the submission fails or nil when it succeeds.
// Nonces are however reserved synchronously, so ordering of submissions
// is preserved.
func (s Sender) SendAsync(ctx context.Context, sub xchain.Submission) <-chan error {
// Helper function to return error "synchronously".
returnErr := func(err error) chan error {
resp := make(chan error, 1)
resp <- err

return resp
}

if s.txMgr == nil {
return errors.New("tx mgr not found", "dest_chain_id", sub.DestChainID)
return returnErr(errors.New("tx mgr not found [BUG]", "dest_chain_id", sub.DestChainID))
} else if sub.DestChainID != s.chain.ID {
return errors.New("unexpected destination chain [BUG]",
"got", sub.DestChainID, "expect", s.chain.ID)
return returnErr(errors.New("unexpected destination chain [BUG]",
"got", sub.DestChainID, "expect", s.chain.ID))
}

// Get some info for logging
Expand All @@ -127,67 +138,75 @@ func (s Sender) SendTransaction(ctx context.Context, sub xchain.Submission) erro

txData, err := xchain.EncodeXSubmit(xchain.SubmissionToBinding(sub))
if err != nil {
return err
return returnErr(err)
}

// Reserve a nonce here to ensure correctly ordered submissions.
nonce, err := s.txMgr.ReserveNextNonce(ctx)
if err != nil {
return err
return returnErr(err)
}

estimatedGas := s.gasEstimator(s.chain.ID, sub.Msgs)

candidate := txmgr.TxCandidate{
TxData: txData,
To: &s.portal,
To: &s.chain.PortalAddress,
GasLimit: estimatedGas,
Value: big.NewInt(0),
Nonce: &nonce,
}

tx, rec, err := s.txMgr.Send(ctx, candidate)
if err != nil {
return errors.Wrap(err, "failed to send tx", reqAttrs...)
}
asyncResp := make(chan error, 1) // Actual async response populated by goroutine below.
go func() {
tx, rec, err := s.txMgr.Send(ctx, candidate)
if err != nil {
asyncResp <- errors.Wrap(err, "failed to send tx", reqAttrs...)
return
}

submissionTotal.WithLabelValues(srcChain, dstChain).Inc()
msgTotal.WithLabelValues(srcChain, dstChain).Add(float64(len(sub.Msgs)))
gasEstimated.WithLabelValues(dstChain).Observe(float64(estimatedGas))

receiptAttrs := []any{
"valset_id", sub.ValidatorSetID,
"status", rec.Status,
"nonce", tx.Nonce(),
"height", rec.BlockNumber.Uint64(),
"gas_used", rec.GasUsed,
"tx_hash", rec.TxHash,
}
submissionTotal.WithLabelValues(srcChain, dstChain).Inc()
msgTotal.WithLabelValues(srcChain, dstChain).Add(float64(len(sub.Msgs)))
gasEstimated.WithLabelValues(dstChain).Observe(float64(estimatedGas))

spendTotal.WithLabelValues(dstChain, string(s.gasToken)).Add(totalSpendGwei(tx, rec))
receiptAttrs := []any{
"valset_id", sub.ValidatorSetID,
"status", rec.Status,
"nonce", tx.Nonce(),
"height", rec.BlockNumber.Uint64(),
"gas_used", rec.GasUsed,
"tx_hash", rec.TxHash,
}

if s.onSubmit != nil {
go s.onSubmit(ctx, tx, rec, sub)
}
spendTotal.WithLabelValues(dstChain, string(s.gasToken)).Add(totalSpendGwei(tx, rec))

if rec.Status == 0 {
// Try and get debug information of the reverted transaction
resp, err := s.rpcClient.CallContract(ctx, callFromTx(s.txMgr.From(), tx), rec.BlockNumber)
if s.onSubmit != nil {
go s.onSubmit(ctx, tx, rec, sub)
}

errAttrs := slices.Concat(receiptAttrs, reqAttrs, []any{
"call_resp", hexutil.Encode(resp),
"call_err", err,
"gas_limit", tx.Gas(),
})
const statusReverted = 0
if rec.Status == statusReverted {
// Try and get debug information of the reverted transaction
resp, err := s.ethCl.CallContract(ctx, callFromTx(s.txMgr.From(), tx), rec.BlockNumber)

revertedSubmissionTotal.WithLabelValues(srcChain, dstChain).Inc()
errAttrs := slices.Concat(receiptAttrs, reqAttrs, []any{
"call_resp", hexutil.Encode(resp),
"call_err", err,
"gas_limit", tx.Gas(),
})

return errors.New("submission reverted", errAttrs...)
}
revertedSubmissionTotal.WithLabelValues(srcChain, dstChain).Inc()

asyncResp <- errors.New("submission reverted", errAttrs...)

return
}

log.Info(ctx, "Sent submission", receiptAttrs...)
log.Info(ctx, "Sent submission", receiptAttrs...)
asyncResp <- nil
}()

return nil
return asyncResp
}

func callFromTx(from common.Address, tx *ethtypes.Transaction) ethereum.CallMsg {
Expand Down
Loading

0 comments on commit 27b79e9

Please sign in to comment.