Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(relayer): ensure submission order #2198

Merged
merged 2 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions relayer/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func Run(ctx context.Context, cfg Config) error {
pnl := newPnlLogger(network.ID, pricer)

for _, destChain := range network.EVMChains() {
// Setup sender provider
sendProvider := func() (SendFunc, error) {
// Setup send provider
sendProvider := func() (SendAsync, error) {
sender, err := NewSender(
network.ID,
destChain,
Expand All @@ -75,7 +75,7 @@ func Run(ctx context.Context, cfg Config) error {
return nil, err
}

return sender.SendTransaction, nil
return sender.SendAsync, nil
}

// Setup validator set awaiter
Expand Down
32 changes: 20 additions & 12 deletions relayer/app/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,34 @@ import (
"golang.org/x/sync/semaphore"
)

// activeBuffer links the output of cprovider/creator to the opsender.
// It has an large activeBuffer allowing many submissions to be queued up.
// It however limits the number of concurrent transactions it forwards to opsender
// to limiting our mempool size.
// activeBuffer links the output of each worker's cprovider/creators (one per chain version)
// to the destination chain async sender. Fan-in buffer.
//
// It limits the number of concurrent transactions it forwards to the async sender
// to limit the mempool size.
//
// While the mempool limit is reached, calls to AddInput block.
//
// If stops processing on any error.
type activeBuffer struct {
chainName string
buffer chan xchain.Submission
mempoolLimit int64
errChan chan error
sender SendFunc
sendAsync SendAsync
}

func newActiveBuffer(chainName string, mempoolLimit int64, sender SendFunc) *activeBuffer {
func newActiveBuffer(chainName string, mempoolLimit int64, sendAsync SendAsync) *activeBuffer {
return &activeBuffer{
chainName: chainName,
buffer: make(chan xchain.Submission),
mempoolLimit: mempoolLimit,
errChan: make(chan error, 1),
sender: sender,
sendAsync: sendAsync,
}
}

// AddInput adds a new submission to the buffer.
// AddInput adds a new submission to the buffer. It blocks while mempoolLimit is reached.
func (b *activeBuffer) AddInput(ctx context.Context, submission xchain.Submission) error {
select {
case <-ctx.Done():
Expand All @@ -45,7 +49,7 @@ func (b *activeBuffer) AddInput(ctx context.Context, submission xchain.Submissio
return nil
}

// Run processes the buffer, sending submissions to the opsender.
// Run processes the buffer, sending submissions to the async sender.
func (b *activeBuffer) Run(ctx context.Context) error {
sema := semaphore.NewWeighted(b.mempoolLimit)
for {
Expand All @@ -60,12 +64,16 @@ func (b *activeBuffer) Run(ctx context.Context) error {
}
mempoolLen.WithLabelValues(b.chainName).Inc()

// Trigger async send synchronously (for ordered nonces), but wait for response async.
response := b.sendAsync(ctx, submission)
go func() {
if err := b.sender(ctx, submission); err != nil {
b.submitErr(err)
err := <-response
if err != nil {
b.submitErr(errors.Wrap(err, "send submission"))
}
sema.Release(1)

mempoolLen.WithLabelValues(b.chainName).Dec()
sema.Release(1)
}()
}
}
Expand Down
32 changes: 22 additions & 10 deletions relayer/app/buffer_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,16 @@ func newMockSender() *mockBufSender {
}
}

func (m *mockBufSender) Send(_ context.Context, sub xchain.Submission) error {
m.sendChan <- sub
return nil
func (m *mockBufSender) Send(_ context.Context, sub xchain.Submission) <-chan error {
// Simulate async send that returns success when MineNext is called below.
resp := make(chan error, 1)

go func() {
m.sendChan <- sub
resp <- nil
}()

return resp
}

func (m *mockBufSender) Next() xchain.Submission {
Expand Down Expand Up @@ -88,10 +95,12 @@ func Test_activeBuffer_Run(t *testing.T) {
}
}()

require.Eventuallyf(t, func() bool {
return counter.Load() == memLimit+1
},
time.Second, time.Millisecond, "expected %d", memLimit+1)
require.Eventuallyf(t,
func() bool {
return counter.Load() == memLimit+1
},
time.Second, time.Millisecond, "expected %d", memLimit+1,
)

// assert again that buf is blocking
require.Equal(t, memLimit+1, counter.Load())
Expand All @@ -102,9 +111,12 @@ func Test_activeBuffer_Run(t *testing.T) {
output = append(output, sender.Next())
}

require.Eventuallyf(t, func() bool {
return counter.Load() == int64(size)
}, time.Second, time.Millisecond, "expected %d", size)
require.Eventuallyf(t,
func() bool {
return counter.Load() == int64(size)
},
time.Second, time.Millisecond, "expected %d", size,
)

// Assert equality of input and output submissions
require.Len(t, input, len(output))
Expand Down
4 changes: 2 additions & 2 deletions relayer/app/cursors_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ func (m *mockXChainClient) GetEmittedCursor(ctx context.Context, ref xchain.Emit
}

type mockSender struct {
SendTransactionFn func(ctx context.Context, submission xchain.Submission) error
SendTransactionFn func(ctx context.Context, submission xchain.Submission) <-chan error
}

func (m *mockSender) SendTransaction(ctx context.Context, submission xchain.Submission) error {
func (m *mockSender) SendTransaction(ctx context.Context, submission xchain.Submission) <-chan error {
return m.SendTransactionFn(ctx, submission)
}

Expand Down
125 changes: 72 additions & 53 deletions relayer/app/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,23 @@ import (
"github.com/ethereum/go-ethereum/params"
)

// Sender uses txmgr to send transactions a specific destination chain.
type onSubmitFunc func(context.Context, *ethtypes.Transaction, *ethtypes.Receipt, xchain.Submission)

// Sender uses txmgr to send transactions to the destination chain.
// Sender uses txmgr to send transactions to a specific destination chain.
type Sender struct {
network netconf.ID
txMgr txmgr.TxManager
gasEstimator gasEstimator
portal common.Address
abi *abi.ABI
chain netconf.Chain
gasToken tokens.Token
chainNames map[xchain.ChainVersion]string
rpcClient ethclient.Client
ethCl ethclient.Client
onSubmit onSubmitFunc
}

// NewSender creates a new sender that uses txmgr to send transactions to the destination chain.
// NewSender returns a new sender.
func NewSender(
network netconf.ID,
chain netconf.Chain,
Expand All @@ -51,12 +51,13 @@ func NewSender(
chainNames map[xchain.ChainVersion]string,
onSubmit onSubmitFunc,
) (Sender, error) {
// we want to query receipts every 1/3 of the block time
cfg, err := txmgr.NewConfig(txmgr.NewCLIConfig(
chain.ID,
chain.BlockPeriod/3,
txmgr.DefaultSenderFlagValues,
),
const receiptPollFreq = 3 // Query receipts every 1/3 of the block time
cfg, err := txmgr.NewConfig(
txmgr.NewCLIConfig(
chain.ID,
chain.BlockPeriod/receiptPollFreq,
txmgr.DefaultSenderFlagValues,
),
&privateKey,
rpcClient,
)
Expand Down Expand Up @@ -84,23 +85,33 @@ func NewSender(
network: network,
txMgr: txMgr,
gasEstimator: newGasEstimator(network),
portal: chain.PortalAddress,
abi: &parsedAbi,
chain: chain,
gasToken: meta.NativeToken,
chainNames: chainNames,
rpcClient: rpcClient,
ethCl: rpcClient,
onSubmit: onSubmit,
}, nil
}

// SendTransaction sends the submission to the destination chain.
func (s Sender) SendTransaction(ctx context.Context, sub xchain.Submission) error {
// SendAsync sends the submission to the destination chain asynchronously.
// It returns a channel that will receive an error if the submission fails or nil when it succeeds.
// Nonces are however reserved synchronously, so ordering of submissions
// is preserved.
func (s Sender) SendAsync(ctx context.Context, sub xchain.Submission) <-chan error {
// Helper function to return error "synchronously".
returnErr := func(err error) chan error {
resp := make(chan error, 1)
resp <- err

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should close resp channel

Suggested change
close(resp)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is buffered, and we either add err or nil so I do not think we need to close it

return resp
}

if s.txMgr == nil {
return errors.New("tx mgr not found", "dest_chain_id", sub.DestChainID)
return returnErr(errors.New("tx mgr not found [BUG]", "dest_chain_id", sub.DestChainID))
} else if sub.DestChainID != s.chain.ID {
return errors.New("unexpected destination chain [BUG]",
"got", sub.DestChainID, "expect", s.chain.ID)
return returnErr(errors.New("unexpected destination chain [BUG]",
"got", sub.DestChainID, "expect", s.chain.ID))
}

// Get some info for logging
Expand All @@ -127,67 +138,75 @@ func (s Sender) SendTransaction(ctx context.Context, sub xchain.Submission) erro

txData, err := xchain.EncodeXSubmit(xchain.SubmissionToBinding(sub))
if err != nil {
return err
return returnErr(err)
}

// Reserve a nonce here to ensure correctly ordered submissions.
nonce, err := s.txMgr.ReserveNextNonce(ctx)
if err != nil {
return err
return returnErr(err)
}

estimatedGas := s.gasEstimator(s.chain.ID, sub.Msgs)

candidate := txmgr.TxCandidate{
TxData: txData,
To: &s.portal,
To: &s.chain.PortalAddress,
GasLimit: estimatedGas,
Value: big.NewInt(0),
Nonce: &nonce,
}

tx, rec, err := s.txMgr.Send(ctx, candidate)
if err != nil {
return errors.Wrap(err, "failed to send tx", reqAttrs...)
}
asyncResp := make(chan error, 1) // Actual async response populated by goroutine below.
go func() {
tx, rec, err := s.txMgr.Send(ctx, candidate)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should close asyncResp channel, to prevent routine leak

Suggested change
tx, rec, err := s.txMgr.Send(ctx, candidate)
defer close(asyncResp)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no goroutines to leak

if err != nil {
asyncResp <- errors.Wrap(err, "failed to send tx", reqAttrs...)
return
}

submissionTotal.WithLabelValues(srcChain, dstChain).Inc()
msgTotal.WithLabelValues(srcChain, dstChain).Add(float64(len(sub.Msgs)))
gasEstimated.WithLabelValues(dstChain).Observe(float64(estimatedGas))

receiptAttrs := []any{
"valset_id", sub.ValidatorSetID,
"status", rec.Status,
"nonce", tx.Nonce(),
"height", rec.BlockNumber.Uint64(),
"gas_used", rec.GasUsed,
"tx_hash", rec.TxHash,
}
submissionTotal.WithLabelValues(srcChain, dstChain).Inc()
msgTotal.WithLabelValues(srcChain, dstChain).Add(float64(len(sub.Msgs)))
gasEstimated.WithLabelValues(dstChain).Observe(float64(estimatedGas))

spendTotal.WithLabelValues(dstChain, string(s.gasToken)).Add(totalSpendGwei(tx, rec))
receiptAttrs := []any{
"valset_id", sub.ValidatorSetID,
"status", rec.Status,
"nonce", tx.Nonce(),
"height", rec.BlockNumber.Uint64(),
"gas_used", rec.GasUsed,
"tx_hash", rec.TxHash,
}

if s.onSubmit != nil {
go s.onSubmit(ctx, tx, rec, sub)
}
spendTotal.WithLabelValues(dstChain, string(s.gasToken)).Add(totalSpendGwei(tx, rec))

if rec.Status == 0 {
// Try and get debug information of the reverted transaction
resp, err := s.rpcClient.CallContract(ctx, callFromTx(s.txMgr.From(), tx), rec.BlockNumber)
if s.onSubmit != nil {
go s.onSubmit(ctx, tx, rec, sub)
}

errAttrs := slices.Concat(receiptAttrs, reqAttrs, []any{
"call_resp", hexutil.Encode(resp),
"call_err", err,
"gas_limit", tx.Gas(),
})
const statusReverted = 0
if rec.Status == statusReverted {
// Try and get debug information of the reverted transaction
resp, err := s.ethCl.CallContract(ctx, callFromTx(s.txMgr.From(), tx), rec.BlockNumber)

revertedSubmissionTotal.WithLabelValues(srcChain, dstChain).Inc()
errAttrs := slices.Concat(receiptAttrs, reqAttrs, []any{
"call_resp", hexutil.Encode(resp),
"call_err", err,
"gas_limit", tx.Gas(),
})

return errors.New("submission reverted", errAttrs...)
}
revertedSubmissionTotal.WithLabelValues(srcChain, dstChain).Inc()

asyncResp <- errors.New("submission reverted", errAttrs...)

return
}

log.Info(ctx, "Sent submission", receiptAttrs...)
log.Info(ctx, "Sent submission", receiptAttrs...)
asyncResp <- nil
}()

return nil
return asyncResp
}

func callFromTx(from common.Address, tx *ethtypes.Transaction) ethereum.CallMsg {
Expand Down
Loading
Loading