Skip to content

Commit

Permalink
da: try using blobdata for eth fallback
Browse files Browse the repository at this point in the history
  • Loading branch information
tuxcanfly committed Jul 25, 2024
1 parent afa450b commit c1a23cd
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 38 deletions.
56 changes: 39 additions & 17 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,20 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh
l.queueTx(txData{}, true, candidate, queue, receiptsCh)
}

// fallbackTxCandidate creates a fallback tx candidate for the given txdata.
func (l *BatchSubmitter) fallbackTxCandidate(_ context.Context, txdata txData) (*txmgr.TxCandidate, error) {
switch l.DAClient.FallbackMode {
case celestia.FallbackModeBlobData:
return l.blobTxCandidate(txdata)
case celestia.FallbackModeCallData:
return l.calldataTxCandidate(txdata.CallData()), nil
case celestia.FallbackModeDisabled:
return nil, fmt.Errorf("celestia: fallback disabled")
default:
return nil, fmt.Errorf("celestia: unknown fallback mode: %s", l.DAClient.FallbackMode)
}
}

// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
Expand Down Expand Up @@ -580,10 +594,15 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que
// signal plasma commitment tx with TxDataVersion1
data = comm.TxData()
}
candidate, err = l.calldataTxCandidate(data)
candidate, err = l.celestiaTxCandidate(data)
if err != nil {
l.recordFailedTx(txdata.ID(), err)
return nil
l.Log.Error("celestia: blob submission failed", "err", err)
candidate, err = l.fallbackTxCandidate(ctx, txdata)
if err != nil {
l.Log.Error("celestia: fallback failed", "err", err)
l.recordFailedTx(txdata.ID(), err)
return nil
}
}
}

Expand Down Expand Up @@ -619,25 +638,28 @@ func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error
}, nil
}

func (l *BatchSubmitter) calldataTxCandidate(data []byte) (*txmgr.TxCandidate, error) {
func (l *BatchSubmitter) calldataTxCandidate(data []byte) *txmgr.TxCandidate {
l.Log.Info("Building Calldata transaction candidate", "size", len(data))
return &txmgr.TxCandidate{
To: &l.RollupConfig.BatchInboxAddress,
TxData: data,
}
}

func (l *BatchSubmitter) celestiaTxCandidate(data []byte) (*txmgr.TxCandidate, error) {
l.Log.Info("Building Celestia transaction candidate", "size", len(data))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Duration(l.RollupConfig.BlockTime)*time.Second)
ids, err := l.DAClient.Client.Submit(ctx, [][]byte{data}, -1, l.DAClient.Namespace)
cancel()
if err == nil && len(ids) == 1 {
l.Log.Info("celestia: blob successfully submitted", "id", hex.EncodeToString(ids[0]))
data = append([]byte{celestia.DerivationVersionCelestia}, ids[0]...)
} else {
if l.DAClient.EthFallbackDisabled {
return nil, fmt.Errorf("celestia: blob submission failed; eth fallback disabled: %w", err)
}

l.Log.Info("celestia: blob submission failed; falling back to eth", "err", err)
if err != nil {
return nil, err
}
return &txmgr.TxCandidate{
To: &l.RollupConfig.BatchInboxAddress,
TxData: data,
}, nil
if len(ids) != 1 {
return nil, fmt.Errorf("celestia: expected 1 id, got %d", len(ids))
}
l.Log.Info("celestia: blob successfully submitted", "id", hex.EncodeToString(ids[0]))
data = append([]byte{celestia.DerivationVersionCelestia}, ids[0]...)
return l.calldataTxCandidate(data), nil
}

func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txRef]) {
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (bs *BatcherService) initPlasmaDA(cfg *CLIConfig) error {
}

func (bs *BatcherService) initDA(cfg *CLIConfig) error {
client, err := celestia.NewDAClient(cfg.DaConfig.Rpc, cfg.DaConfig.AuthToken, cfg.DaConfig.Namespace, cfg.DaConfig.EthFallbackDisabled)
client, err := celestia.NewDAClient(cfg.DaConfig.Rpc, cfg.DaConfig.AuthToken, cfg.DaConfig.Namespace, cfg.DaConfig.FallbackMode)
if err != nil {
return err
}
Expand Down
49 changes: 40 additions & 9 deletions op-celestia/cli.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
package celestia

import (
"fmt"

"github.com/urfave/cli/v2"

opservice "github.com/ethereum-optimism/optimism/op-service"
)

const (
// FallbackModeDisabled is the fallback mode disabled
FallbackModeDisabled = "disabled"
// FallbackModeBlobData is the fallback mode blob data
FallbackModeBlobData = "blobdata"
// FallbackModeCallData is the fallback mode call data
FallbackModeCallData = "calldata"
)

const (
// RPCFlagName defines the flag for the rpc url
RPCFlagName = "da.rpc"
Expand All @@ -15,6 +26,8 @@ const (
NamespaceFlagName = "da.namespace"
// EthFallbackDisabledFlagName defines the flag for disabling eth fallback
EthFallbackDisabledFlagName = "da.eth_fallback_disabled"
// FallbackModeFlagName defines the flag for fallback mode
FallbackModeFlagName = "da.fallback_mode"

// NamespaceSize is the size of the hex encoded namespace string
NamespaceSize = 58
Expand Down Expand Up @@ -43,17 +56,35 @@ func CLIFlags(envPrefix string) []cli.Flag {
},
&cli.BoolFlag{
Name: EthFallbackDisabledFlagName,
Usage: "disable eth fallback",
Usage: "disable eth fallback (deprecated, use FallbackModeFlag instead)",
EnvVars: opservice.PrefixEnvVar(envPrefix, "DA_ETH_FALLBACK_DISABLED"),
Action: func(c *cli.Context, e bool) error {
if e {
return c.Set(FallbackModeFlagName, FallbackModeDisabled)
}
return nil
},
},
&cli.StringFlag{
Name: FallbackModeFlagName,
Usage: fmt.Sprintf("fallback mode; must be one of: %s, %s or %s", FallbackModeDisabled, FallbackModeBlobData, FallbackModeCallData),
EnvVars: opservice.PrefixEnvVar(envPrefix, "DA_FALLBACK_MODE"),
Value: FallbackModeCallData,
Action: func(c *cli.Context, s string) error {
if s != FallbackModeDisabled && s != FallbackModeBlobData && s != FallbackModeCallData {
return fmt.Errorf("invalid fallback mode: %s; must be one of: %s, %s or %s", s, FallbackModeDisabled, FallbackModeBlobData, FallbackModeCallData)
}
return nil
},
},
}
}

type CLIConfig struct {
Rpc string
AuthToken string
Namespace string
EthFallbackDisabled bool
Rpc string
AuthToken string
Namespace string
FallbackMode string
}

func (c CLIConfig) Check() error {
Expand All @@ -68,9 +99,9 @@ func NewCLIConfig() CLIConfig {

func ReadCLIConfig(ctx *cli.Context) CLIConfig {
return CLIConfig{
Rpc: ctx.String(RPCFlagName),
AuthToken: ctx.String(AuthTokenFlagName),
Namespace: ctx.String(NamespaceFlagName),
EthFallbackDisabled: ctx.Bool(EthFallbackDisabledFlagName),
Rpc: ctx.String(RPCFlagName),
AuthToken: ctx.String(AuthTokenFlagName),
Namespace: ctx.String(NamespaceFlagName),
FallbackMode: ctx.String(FallbackModeFlagName),
}
}
22 changes: 13 additions & 9 deletions op-celestia/da_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@ package celestia

import (
"encoding/hex"
"fmt"
"time"

"github.com/rollkit/go-da"
"github.com/rollkit/go-da/proxy"
)

type DAClient struct {
Client da.DA
GetTimeout time.Duration
Namespace da.Namespace
EthFallbackDisabled bool
Client da.DA
GetTimeout time.Duration
Namespace da.Namespace
FallbackMode string
}

func NewDAClient(rpc, token, namespace string, ethFallbackDisabled bool) (*DAClient, error) {
func NewDAClient(rpc, token, namespace, fallbackMode string) (*DAClient, error) {
client, err := proxy.NewClient(rpc, token)
if err != nil {
return nil, err
Expand All @@ -24,10 +25,13 @@ func NewDAClient(rpc, token, namespace string, ethFallbackDisabled bool) (*DACli
if err != nil {
return nil, err
}
if fallbackMode != "disabled" && fallbackMode != "blobdata" && fallbackMode != "calldata" {
return nil, fmt.Errorf("celestia: unknown fallback mode: %s", fallbackMode)
}
return &DAClient{
Client: client,
GetTimeout: time.Minute,
Namespace: ns,
EthFallbackDisabled: ethFallbackDisabled,
Client: client,
GetTimeout: time.Minute,
Namespace: ns,
FallbackMode: fallbackMode,
}, nil
}
2 changes: 1 addition & 1 deletion op-e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
BatchType: batchType,
DataAvailabilityType: sys.Cfg.DataAvailabilityType,
CompressionAlgo: compressionAlgo,
DaConfig: celestia.CLIConfig{DaRpc: "localhost:26650"},
DaConfig: celestia.CLIConfig{Rpc: "localhost:26650"},
}
// Batch Submitter
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
Expand Down
13 changes: 12 additions & 1 deletion op-node/rollup/driver/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,18 @@ import (
)

func SetDAClient(cfg celestia.CLIConfig) error {
client, err := celestia.NewDAClient(cfg.Rpc, cfg.AuthToken, cfg.Namespace, false)
// NOTE: we always read using blob_data_source.go
// If the transaction has calldata, based on the prefix byte.
// - If the prefix byte is 0xce
// - We interpret the calldata as a celestia reference and fetch
// the data from celestia.
// - Otherwise, we use the calldata fallback mode.
// If the transaction has blobs, we use blobdata fallback mode.
// See dataAndHashesFromTxs and DataFromEVMTransactions
// The read path always operates in the most permissive mode and is
// independent of the fallback mode.
// Therefore the configuration value for FallbackMode passed here does not matter.
client, err := celestia.NewDAClient(cfg.Rpc, cfg.AuthToken, cfg.Namespace, cfg.FallbackMode)
if err != nil {
return err
}
Expand Down

0 comments on commit c1a23cd

Please sign in to comment.