Skip to content

Commit

Permalink
wip: ordered submissions
Browse files Browse the repository at this point in the history
  • Loading branch information
corverroos committed Oct 17, 2024
1 parent 1e4f92a commit a87a10b
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 57 deletions.
4 changes: 2 additions & 2 deletions relayer/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func Run(ctx context.Context, cfg Config) error {

for _, destChain := range network.EVMChains() {
// Setup sender provider
sendProvider := func() (SendFunc, error) {
sendProvider := func() (SendAsync, error) {
sender, err := NewSender(
network.ID,
destChain,
Expand All @@ -71,7 +71,7 @@ func Run(ctx context.Context, cfg Config) error {
return nil, err
}

return sender.SendTransaction, nil
return sender.SendAsync, nil
}

// Setup validator set awaiter
Expand Down
10 changes: 6 additions & 4 deletions relayer/app/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ type activeBuffer struct {
buffer chan xchain.Submission
mempoolLimit int64
errChan chan error
sender SendFunc
sender SendAsync
}

func newActiveBuffer(chainName string, mempoolLimit int64, sender SendFunc) *activeBuffer {
func newActiveBuffer(chainName string, mempoolLimit int64, sender SendAsync) *activeBuffer {
return &activeBuffer{
chainName: chainName,
buffer: make(chan xchain.Submission),
Expand Down Expand Up @@ -60,9 +60,11 @@ func (b *activeBuffer) Run(ctx context.Context) error {
}
mempoolLen.WithLabelValues(b.chainName).Inc()

response := b.sender(ctx, submission)
go func() {
if err := b.sender(ctx, submission); err != nil {
b.submitErr(err)
err := <- response
if err != nil {
b.submitErr(errors.Wrap(err, "send submission"))
}
sema.Release(1)
mempoolLen.WithLabelValues(b.chainName).Dec()
Expand Down
105 changes: 60 additions & 45 deletions relayer/app/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,22 @@ func NewSender(
}, nil
}

// SendTransaction sends the submission to the destination chain.
func (s Sender) SendTransaction(ctx context.Context, sub xchain.Submission) error {
// SendAsync sends the submission to the destination chain asynchronously.
// It returns a channel that will receive an error if the submission fails or nil when it succeeds.
// Nonces are however reserved synchronously, so ordering of submissions
// is preserved.
func (s Sender) SendAsync(ctx context.Context, sub xchain.Submission) chan error {
returnErr := func(err error) chan error {
resp := make(chan error, 1)
resp <- err
return resp
}

if s.txMgr == nil {
return errors.New("tx mgr not found", "dest_chain_id", sub.DestChainID)
return returnErr(errors.New("tx mgr not found", "dest_chain_id", sub.DestChainID))
} else if sub.DestChainID != s.chain.ID {
return errors.New("unexpected destination chain [BUG]",
"got", sub.DestChainID, "expect", s.chain.ID)
return returnErr(errors.New("unexpected destination chain [BUG]",
"got", sub.DestChainID, "expect", s.chain.ID))
}

// Get some info for logging
Expand All @@ -122,13 +131,13 @@ func (s Sender) SendTransaction(ctx context.Context, sub xchain.Submission) erro

txData, err := xchain.EncodeXSubmit(xchain.SubmissionToBinding(sub))
if err != nil {
return err
return returnErr(err)
}

// Reserve a nonce here to ensure correctly ordered submissions.
nonce, err := s.txMgr.ReserveNextNonce(ctx)
if err != nil {
return err
return returnErr(err)
}

estimatedGas := s.gasEstimator(s.chain.ID, sub.Msgs)
Expand All @@ -141,44 +150,50 @@ func (s Sender) SendTransaction(ctx context.Context, sub xchain.Submission) erro
Nonce: &nonce,
}

tx, rec, err := s.txMgr.Send(ctx, candidate)
if err != nil {
return errors.Wrap(err, "failed to send tx", reqAttrs...)
}

submissionTotal.WithLabelValues(srcChain, dstChain).Inc()
msgTotal.WithLabelValues(srcChain, dstChain).Add(float64(len(sub.Msgs)))
gasEstimated.WithLabelValues(dstChain).Observe(float64(estimatedGas))

receiptAttrs := []any{
"valset_id", sub.ValidatorSetID,
"status", rec.Status,
"nonce", tx.Nonce(),
"height", rec.BlockNumber.Uint64(),
"gas_used", rec.GasUsed,
"tx_hash", rec.TxHash,
}

spendTotal.WithLabelValues(dstChain, string(s.gasToken)).Add(totalSpentGwei(tx, rec))

if rec.Status == 0 {
// Try and get debug information of the reverted transaction
resp, err := s.rpcClient.CallContract(ctx, callFromTx(s.txMgr.From(), tx), rec.BlockNumber)

errAttrs := slices.Concat(receiptAttrs, reqAttrs, []any{
"call_resp", hexutil.Encode(resp),
"call_err", err,
"gas_limit", tx.Gas(),
})

revertedSubmissionTotal.WithLabelValues(srcChain, dstChain).Inc()

return errors.New("submission reverted", errAttrs...)
}

log.Info(ctx, "Sent submission", receiptAttrs...)

return nil
asyncResp := make(chan error, 1)
go func() {
tx, rec, err := s.txMgr.Send(ctx, candidate)
if err != nil {
asyncResp <- errors.Wrap(err, "failed to send tx", reqAttrs...)
return
}

submissionTotal.WithLabelValues(srcChain, dstChain).Inc()
msgTotal.WithLabelValues(srcChain, dstChain).Add(float64(len(sub.Msgs)))
gasEstimated.WithLabelValues(dstChain).Observe(float64(estimatedGas))

receiptAttrs := []any{
"valset_id", sub.ValidatorSetID,
"status", rec.Status,
"nonce", tx.Nonce(),
"height", rec.BlockNumber.Uint64(),
"gas_used", rec.GasUsed,
"tx_hash", rec.TxHash,
}

spendTotal.WithLabelValues(dstChain, string(s.gasToken)).Add(totalSpentGwei(tx, rec))

if rec.Status == 0 {
// Try and get debug information of the reverted transaction
resp, err := s.rpcClient.CallContract(ctx, callFromTx(s.txMgr.From(), tx), rec.BlockNumber)

errAttrs := slices.Concat(receiptAttrs, reqAttrs, []any{
"call_resp", hexutil.Encode(resp),
"call_err", err,
"gas_limit", tx.Gas(),
})

revertedSubmissionTotal.WithLabelValues(srcChain, dstChain).Inc()

asyncResp <- errors.New("submission reverted", errAttrs...)
return
}

log.Info(ctx, "Sent submission", receiptAttrs...)
asyncResp <- nil
}()

return asyncResp
}

func callFromTx(from common.Address, tx *ethtypes.Transaction) ethereum.CallMsg {
Expand Down
8 changes: 6 additions & 2 deletions relayer/app/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ type StreamUpdate struct {
// CreateFunc is a function that creates one or more submissions from the given stream update.
type CreateFunc func(streamUpdate StreamUpdate) ([]xchain.Submission, error)

// SendFunc sends a submission to the destination chain by invoking "xsubmit" on portal contract.
type SendFunc func(ctx context.Context, submission xchain.Submission) error
// SendAsync sends a submission to the destination chain asynchronously
// by invoking "xsubmit" on portal contract. It returns a channel that
// will receive an error if the submission fails or nil when it succeeds.
// Nonces are however reserved synchronously, so ordering of submissions
// is preserved.
type SendAsync func(ctx context.Context, submission xchain.Submission) chan error

// randomHex7 returns a random 7-character hex string.
func randomHex7() string {
Expand Down
8 changes: 4 additions & 4 deletions relayer/app/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ type Worker struct {
cProvider cchain.Provider
xProvider xchain.Provider
creator CreateFunc
sendProvider func() (SendFunc, error)
sendProvider func() (SendAsync, error)
awaitValSet awaitValSet
}

// NewWorker creates a new worker for a single destination chain.
func NewWorker(destChain netconf.Chain, network netconf.Network, cProvider cchain.Provider,
xProvider xchain.Provider, creator CreateFunc, sendProvider func() (SendFunc, error),
xProvider xchain.Provider, creator CreateFunc, sendProvider func() (SendAsync, error),
awaitValSet awaitValSet,
) *Worker {
return &Worker{
Expand Down Expand Up @@ -187,7 +187,7 @@ func newMsgStreamMapper(network netconf.Network) msgStreamMapper {

func (w *Worker) newCallback(
msgFilter *msgCursorFilter,
sender SendFunc,
sendBuffer func(context.Context, xchain.Submission) error,
msgStreamMapper msgStreamMapper,
) cchain.ProviderCallback {
return func(ctx context.Context, att xchain.Attestation) error {
Expand Down Expand Up @@ -238,7 +238,7 @@ func (w *Worker) newCallback(
}

for _, subs := range submissions {
if err := sender(ctx, subs); err != nil {
if err := sendBuffer(ctx, subs); err != nil {
return err
}
}
Expand Down

0 comments on commit a87a10b

Please sign in to comment.