diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go index c85a6a7d67b..6c36431d7bf 100644 --- a/cmd/bee/cmd/cmd.go +++ b/cmd/bee/cmd/cmd.go @@ -49,6 +49,8 @@ const ( optionNameSwapFactoryAddress = "swap-factory-address" optionNameSwapInitialDeposit = "swap-initial-deposit" optionNameSwapEnable = "swap-enable" + optionNamePostageStampAddress = "postage-stamp-address" + optionNamePriceOracleAddress = "price-oracle-address" ) func init() { @@ -204,4 +206,6 @@ func (c *command) setAllFlags(cmd *cobra.Command) { cmd.Flags().String(optionNameSwapFactoryAddress, "", "swap factory address") cmd.Flags().Uint64(optionNameSwapInitialDeposit, 100000000, "initial deposit if deploying a new chequebook") cmd.Flags().Bool(optionNameSwapEnable, true, "enable swap") + cmd.Flags().String(optionNamePostageStampAddress, "", "postage stamp address") + cmd.Flags().String(optionNamePriceOracleAddress, "", "price oracle address") } diff --git a/cmd/bee/cmd/start.go b/cmd/bee/cmd/start.go index 201a7246ed7..40b58ff1716 100644 --- a/cmd/bee/cmd/start.go +++ b/cmd/bee/cmd/start.go @@ -146,6 +146,8 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz SwapFactoryAddress: c.config.GetString(optionNameSwapFactoryAddress), SwapInitialDeposit: c.config.GetUint64(optionNameSwapInitialDeposit), SwapEnable: c.config.GetBool(optionNameSwapEnable), + PostageStampAddress: c.config.GetString(optionNamePostageStampAddress), + PriceOracleAddress: c.config.GetString(optionNamePriceOracleAddress), }) if err != nil { return err diff --git a/pkg/node/node.go b/pkg/node/node.go index 53871c56e0e..de130e9c366 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -32,6 +32,9 @@ import ( "github.com/ethersphere/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/pingpong" "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/postage/batchservice" + "github.com/ethersphere/bee/pkg/postage/batchstore" + "github.com/ethersphere/bee/pkg/postage/listener" "github.com/ethersphere/bee/pkg/pricing" "github.com/ethersphere/bee/pkg/pss" "github.com/ethersphere/bee/pkg/puller" @@ -77,6 +80,7 @@ type Bee struct { pullerCloser io.Closer pullSyncCloser io.Closer pssCloser io.Closer + listenerCloser io.Closer recoveryHandleCleanup func() } @@ -107,6 +111,8 @@ type Options struct { SwapFactoryAddress string SwapInitialDeposit uint64 SwapEnable bool + PostageStampAddress string + PriceOracleAddress string } func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, signer crypto.Signer, networkID uint64, logger logging.Logger, libp2pPrivateKey, pssPrivateKey *ecdsa.PrivateKey, o Options) (*Bee, error) { @@ -149,6 +155,13 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, if err != nil { return nil, err } + + chainID, err := swapBackend.ChainID(p2pCtx) + if err != nil { + logger.Infof("could not connect to backend at %v. A working blockchain node (for goerli network in production) is required. Check your node or specify another node using --swap-endpoint.", o.SwapEndpoint) + return nil, fmt.Errorf("could not get chain id from ethereum backend: %w", err) + } + transactionService, err := transaction.NewService(logger, swapBackend, signer) if err != nil { return nil, err @@ -158,12 +171,6 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, return nil, err } - chainID, err := swapBackend.ChainID(p2pCtx) - if err != nil { - logger.Infof("could not connect to backend at %v. In a swap-enabled network a working blockchain node (for goerli network in production) is required. Check your node or specify another node using --swap-endpoint.", o.SwapEndpoint) - return nil, fmt.Errorf("could not get chain id from ethereum backend: %w", err) - } - var factoryAddress common.Address if o.SwapFactoryAddress == "" { var found bool @@ -211,6 +218,50 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, } } + batchStore := batchstore.New(stateStore) + + if !o.Standalone { + swapBackend, err := ethclient.Dial(o.SwapEndpoint) + if err != nil { + return nil, err + } + + chainID, err := swapBackend.ChainID(p2pCtx) + if err != nil { + logger.Infof("could not connect to backend at %v. A working blockchain node (for goerli network in production) is required. Check your node or specify another node using --swap-endpoint.", o.SwapEndpoint) + return nil, fmt.Errorf("could not get chain id from ethereum backend: %w", err) + } + + postageStampAddress, priceOracleAddress, found := listener.DiscoverAddresses(chainID.Int64()) + if o.PostageStampAddress != "" { + if !common.IsHexAddress(o.PostageStampAddress) { + return nil, errors.New("malformed postage stamp address") + } + postageStampAddress = common.HexToAddress(o.PostageStampAddress) + } + if o.PriceOracleAddress != "" { + if !common.IsHexAddress(o.PriceOracleAddress) { + return nil, errors.New("malformed price oracle address") + } + priceOracleAddress = common.HexToAddress(o.PriceOracleAddress) + } + if (o.PostageStampAddress == "" || o.PriceOracleAddress == "") && !found { + return nil, errors.New("no known postage stamp addresses for this network") + } + + eventListener := listener.New(logger, swapBackend, postageStampAddress, priceOracleAddress) + b.listenerCloser = eventListener + + batchService, err := batchservice.New(batchStore, logger, eventListener) + if err != nil { + return nil, err + } + err = batchService.Start() + if err != nil { + return nil, err + } + } + p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, logger, tracer, libp2p.Options{ PrivateKey: libp2pPrivateKey, NATAddr: o.NATAddr, @@ -546,6 +597,12 @@ func (b *Bee) Shutdown(ctx context.Context) error { errs.add(fmt.Errorf("tag persistence: %w", err)) } + if b.listenerCloser != nil { + if err := b.listenerCloser.Close(); err != nil { + errs.add(fmt.Errorf("error listener: %w", err)) + } + } + if err := b.stateStoreCloser.Close(); err != nil { errs.add(fmt.Errorf("statestore: %w", err)) } diff --git a/pkg/postage/batchservice/batchservice.go b/pkg/postage/batchservice/batchservice.go index 3705f562d4b..8031615907b 100644 --- a/pkg/postage/batchservice/batchservice.go +++ b/pkg/postage/batchservice/batchservice.go @@ -14,16 +14,23 @@ import ( ) type batchService struct { - cs *postage.ChainState - storer postage.Storer - logger logging.Logger + cs *postage.ChainState + storer postage.Storer + logger logging.Logger + listener postage.Listener +} + +type Interface interface { + postage.EventUpdater + Start() error } // New will create a new BatchService. -func New(storer postage.Storer, logger logging.Logger) (postage.EventUpdater, error) { +func New(storer postage.Storer, logger logging.Logger, listener postage.Listener) (Interface, error) { b := &batchService{ - storer: storer, - logger: logger, + storer: storer, + logger: logger, + listener: listener, } cs, err := storer.GetChainState() @@ -31,10 +38,18 @@ func New(storer postage.Storer, logger logging.Logger) (postage.EventUpdater, er return nil, fmt.Errorf("get chain state: %w", err) } b.cs = cs - return b, nil } +func (svc *batchService) Start() error { + cs, err := svc.storer.GetChainState() + if err != nil { + return fmt.Errorf("get chain state: %w", err) + } + svc.listener.Listen(cs.Block+1, svc) + return nil +} + // Create will create a new batch with the given ID, owner value and depth and // stores it in the BatchStore. func (svc *batchService) Create(id, owner []byte, normalisedBalance *big.Int, depth uint8) error { @@ -106,3 +121,14 @@ func (svc *batchService) UpdatePrice(price *big.Int) error { svc.logger.Debugf("updated chain price to %s", svc.cs.Price) return nil } + +func (svc *batchService) UpdateBlockNumber(blockNumber uint64) error { + svc.cs.Block = blockNumber + + if err := svc.storer.PutChainState(svc.cs); err != nil { + return fmt.Errorf("put chain state: %w", err) + } + + svc.logger.Debugf("updated block height to %d", svc.cs.Block) + return nil +} diff --git a/pkg/postage/batchservice/batchservice_test.go b/pkg/postage/batchservice/batchservice_test.go index f2c7934f440..07a12207dd3 100644 --- a/pkg/postage/batchservice/batchservice_test.go +++ b/pkg/postage/batchservice/batchservice_test.go @@ -23,12 +23,22 @@ var ( testErr = errors.New("fails") ) +type mockListener struct { +} + +func (*mockListener) Listen(from uint64, updater postage.EventUpdater) {} +func (*mockListener) Close() error { return nil } + +func newMockListener() *mockListener { + return &mockListener{} +} + func TestNewBatchService(t *testing.T) { t.Run("expect get error", func(t *testing.T) { store := mock.New( mock.WithGetErr(testErr, 0), ) - _, err := batchservice.New(store, testLog) + _, err := batchservice.New(store, testLog, newMockListener()) if err == nil { t.Fatal("expected get error") } @@ -39,7 +49,7 @@ func TestNewBatchService(t *testing.T) { store := mock.New( mock.WithChainState(testChainState), ) - _, err := batchservice.New(store, testLog) + _, err := batchservice.New(store, testLog, newMockListener()) if err != nil { t.Fatalf("new batch service: %v", err) } @@ -245,7 +255,7 @@ func newTestStoreAndService(t *testing.T, opts ...mock.Option) (postage.EventUpd t.Helper() store := mock.New(opts...) - svc, err := batchservice.New(store, testLog) + svc, err := batchservice.New(store, testLog, newMockListener()) if err != nil { t.Fatalf("new batch service: %v", err) } diff --git a/pkg/postage/batchstore/store.go b/pkg/postage/batchstore/store.go index ebca43802e2..9ffa882471c 100644 --- a/pkg/postage/batchstore/store.go +++ b/pkg/postage/batchstore/store.go @@ -5,6 +5,8 @@ package batchstore import ( + "math/big" + "github.com/ethersphere/bee/pkg/postage" "github.com/ethersphere/bee/pkg/storage" ) @@ -47,6 +49,13 @@ func (s *store) PutChainState(state *postage.ChainState) error { func (s *store) GetChainState() (*postage.ChainState, error) { cs := &postage.ChainState{} err := s.store.Get(stateKey, cs) + if err == storage.ErrNotFound { + return &postage.ChainState{ + Block: 0, + Total: big.NewInt(0), + Price: big.NewInt(0), + }, nil + } return cs, err } diff --git a/pkg/postage/interface.go b/pkg/postage/interface.go index 1b449bb4773..c18f9a21add 100644 --- a/pkg/postage/interface.go +++ b/pkg/postage/interface.go @@ -5,6 +5,7 @@ package postage import ( + "io" "math/big" ) @@ -15,6 +16,7 @@ type EventUpdater interface { TopUp(id []byte, normalisedBalance *big.Int) error UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int) error UpdatePrice(price *big.Int) error + UpdateBlockNumber(blockNumber uint64) error } // Storer represents the persistence layer for batches on the current (highest @@ -28,5 +30,6 @@ type Storer interface { // Listener provides a blockchain event iterator. type Listener interface { + io.Closer Listen(from uint64, updater EventUpdater) } diff --git a/pkg/postage/listener/export_test.go b/pkg/postage/listener/export_test.go index 26305c47509..5c77e1f82b8 100644 --- a/pkg/postage/listener/export_test.go +++ b/pkg/postage/listener/export_test.go @@ -8,4 +8,6 @@ var ( BatchTopupTopic = batchTopupTopic BatchDepthIncreaseTopic = batchDepthIncreaseTopic PriceUpdateTopic = priceUpdateTopic + + TailSize = tailSize ) diff --git a/pkg/postage/listener/listener.go b/pkg/postage/listener/listener.go index cd7d2d36628..3d05a2fc3a9 100644 --- a/pkg/postage/listener/listener.go +++ b/pkg/postage/listener/listener.go @@ -20,7 +20,7 @@ import ( const ( blockPage = 500 // how many blocks to sync every time - tailSize = 20 // how many blocks to tail from the tip of the chain + tailSize = 4 // how many blocks to tail from the tip of the chain ) var ( @@ -178,6 +178,16 @@ func (l *listener) sync(from uint64, updater postage.EventUpdater) error { return err } + if to < from { + // if the blockNumber is actually less than what we already, it might mean the backend is not synced or some reorg scenario + continue + } + + if to < tailSize { + // in a test blockchain there might be not be enough blocks yet + continue + } + // consider to-tailSize as the "latest" block we need to sync to to = to - tailSize @@ -198,6 +208,11 @@ func (l *listener) sync(from uint64, updater postage.EventUpdater) error { } } + err = updater.UpdateBlockNumber(to) + if err != nil { + return err + } + from = to + 1 } } @@ -250,3 +265,12 @@ type batchDepthIncreaseEvent struct { type priceUpdateEvent struct { Price *big.Int } + +// DiscoverAddresses returns the canonical contracts for this chainID +func DiscoverAddresses(chainID int64) (postageStamp common.Address, priceOracle common.Address, found bool) { + if chainID == 5 { + // goerli + return common.HexToAddress("0xf0870E3abb457026BE46d9b2CDf35e8FFcB27955"), common.HexToAddress("0xc1B598609A38D0A0F85f68eD0fFEFdeC9cD061C9"), true + } + return common.Address{}, common.Address{}, false +} diff --git a/pkg/postage/listener/listener_test.go b/pkg/postage/listener/listener_test.go index 184a9191fff..f91b2f69797 100644 --- a/pkg/postage/listener/listener_test.go +++ b/pkg/postage/listener/listener_test.go @@ -147,6 +147,8 @@ func TestListener(t *testing.T) { price: big.NewInt(500), } + blockNumber := uint64(500) + ev, evC := newEventUpdaterMock() mf := newMockFilterer( WithFilterLogEvents( @@ -155,9 +157,10 @@ func TestListener(t *testing.T) { depthIncrease.toLog(), priceUpdate.toLog(), ), + WithBlockNumber(blockNumber), ) - listener := listener.New(logging.New(ioutil.Discard, 0), mf, postageStampAddress, priceOracleAddress) - listener.Listen(0, ev) + l := listener.New(logger, mf, postageStampAddress, priceOracleAddress) + l.Listen(0, ev) select { case e := <-evC: @@ -186,6 +189,13 @@ func TestListener(t *testing.T) { case <-time.After(timeout): t.Fatal("timed out waiting for event") } + + select { + case e := <-evC: + e.(blockNumberCall).compare(t, blockNumber-uint64(listener.TailSize)) + case <-time.After(timeout): + t.Fatal("timed out waiting for block number update") + } }) } @@ -232,14 +242,22 @@ func (u *updater) UpdatePrice(price *big.Int) error { return nil } +func (u *updater) UpdateBlockNumber(blockNumber uint64) error { + u.eventC <- blockNumberCall{blockNumber: blockNumber} + return nil +} + type mockFilterer struct { filterLogEvents []types.Log subscriptionEvents []types.Log sub *sub + blockNumber uint64 } func newMockFilterer(opts ...Option) *mockFilterer { - mock := new(mockFilterer) + mock := &mockFilterer{ + blockNumber: uint64(listener.TailSize), // use the tailSize as blockNumber by default to ensure at least block 0 is ready + } for _, o := range opts { o.apply(mock) } @@ -252,6 +270,12 @@ func WithFilterLogEvents(events ...types.Log) Option { }) } +func WithBlockNumber(blockNumber uint64) Option { + return optionFunc(func(s *mockFilterer) { + s.blockNumber = blockNumber + }) +} + func (m *mockFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { return m.filterLogEvents, nil } @@ -271,7 +295,7 @@ func (m *mockFilterer) Close() { } func (m *mockFilterer) BlockNumber(context.Context) (uint64, error) { - return 0, nil + return m.blockNumber, nil } type sub struct { @@ -396,6 +420,16 @@ func (p priceArgs) toLog() types.Log { } } +type blockNumberCall struct { + blockNumber uint64 +} + +func (b blockNumberCall) compare(t *testing.T, want uint64) { + if b.blockNumber != want { + t.Fatalf("blockNumber mismatch. got %d want %d", b.blockNumber, want) + } +} + type Option interface { apply(*mockFilterer) }