From b9054b5df6051fa0c2f7557cd285d4a1a1647986 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Fri, 15 Jan 2021 14:54:47 +0100 Subject: [PATCH 1/5] wire up postage stamp syncing --- cmd/bee/cmd/cmd.go | 4 ++ cmd/bee/cmd/start.go | 2 + pkg/node/node.go | 32 ++++++++++++-- pkg/postage/batchservice/batchservice.go | 40 ++++++++++++++---- pkg/postage/batchservice/batchservice_test.go | 16 +++++-- pkg/postage/batchstore/store.go | 9 ++++ pkg/postage/interface.go | 3 ++ pkg/postage/listener/export_test.go | 2 + pkg/postage/listener/listener.go | 15 +++++++ pkg/postage/listener/listener_test.go | 42 +++++++++++++++++-- 10 files changed, 147 insertions(+), 18 deletions(-) diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go index c85a6a7d67b..6c36431d7bf 100644 --- a/cmd/bee/cmd/cmd.go +++ b/cmd/bee/cmd/cmd.go @@ -49,6 +49,8 @@ const ( optionNameSwapFactoryAddress = "swap-factory-address" optionNameSwapInitialDeposit = "swap-initial-deposit" optionNameSwapEnable = "swap-enable" + optionNamePostageStampAddress = "postage-stamp-address" + optionNamePriceOracleAddress = "price-oracle-address" ) func init() { @@ -204,4 +206,6 @@ func (c *command) setAllFlags(cmd *cobra.Command) { cmd.Flags().String(optionNameSwapFactoryAddress, "", "swap factory address") cmd.Flags().Uint64(optionNameSwapInitialDeposit, 100000000, "initial deposit if deploying a new chequebook") cmd.Flags().Bool(optionNameSwapEnable, true, "enable swap") + cmd.Flags().String(optionNamePostageStampAddress, "", "postage stamp address") + cmd.Flags().String(optionNamePriceOracleAddress, "", "price oracle address") } diff --git a/cmd/bee/cmd/start.go b/cmd/bee/cmd/start.go index 201a7246ed7..40b58ff1716 100644 --- a/cmd/bee/cmd/start.go +++ b/cmd/bee/cmd/start.go @@ -146,6 +146,8 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz SwapFactoryAddress: c.config.GetString(optionNameSwapFactoryAddress), SwapInitialDeposit: c.config.GetUint64(optionNameSwapInitialDeposit), SwapEnable: c.config.GetBool(optionNameSwapEnable), + PostageStampAddress: c.config.GetString(optionNamePostageStampAddress), + PriceOracleAddress: c.config.GetString(optionNamePriceOracleAddress), }) if err != nil { return err diff --git a/pkg/node/node.go b/pkg/node/node.go index 53871c56e0e..8a74060f076 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -32,6 +32,9 @@ import ( "github.com/ethersphere/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/pingpong" "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/postage/batchservice" + "github.com/ethersphere/bee/pkg/postage/batchstore" + "github.com/ethersphere/bee/pkg/postage/listener" "github.com/ethersphere/bee/pkg/pricing" "github.com/ethersphere/bee/pkg/pss" "github.com/ethersphere/bee/pkg/puller" @@ -77,6 +80,7 @@ type Bee struct { pullerCloser io.Closer pullSyncCloser io.Closer pssCloser io.Closer + listenerCloser io.Closer recoveryHandleCleanup func() } @@ -107,6 +111,8 @@ type Options struct { SwapFactoryAddress string SwapInitialDeposit uint64 SwapEnable bool + PostageStampAddress string + PriceOracleAddress string } func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, signer crypto.Signer, networkID uint64, logger logging.Logger, libp2pPrivateKey, pssPrivateKey *ecdsa.PrivateKey, o Options) (*Bee, error) { @@ -140,15 +146,16 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, b.stateStoreCloser = stateStore addressbook := addressbook.New(stateStore) + swapBackend, err := ethclient.Dial(o.SwapEndpoint) + if err != nil { + return nil, err + } + var chequebookService chequebook.Service var chequeStore chequebook.ChequeStore var cashoutService chequebook.CashoutService var overlayEthAddress common.Address if o.SwapEnable { - swapBackend, err := ethclient.Dial(o.SwapEndpoint) - if err != nil { - return nil, err - } transactionService, err := transaction.NewService(logger, swapBackend, signer) if err != nil { return nil, err @@ -211,6 +218,19 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, } } + batchStore := batchstore.New(stateStore) + eventListener := listener.New(logger, swapBackend, common.HexToAddress(o.PostageStampAddress), common.HexToAddress(o.PriceOracleAddress)) + b.listenerCloser = eventListener + + batchService, err := batchservice.New(batchStore, logger, eventListener) + if err != nil { + return nil, err + } + err = batchService.Start() + if err != nil { + return nil, err + } + p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, logger, tracer, libp2p.Options{ PrivateKey: libp2pPrivateKey, NATAddr: o.NATAddr, @@ -569,6 +589,10 @@ func (b *Bee) Shutdown(ctx context.Context) error { } } + if err := b.listenerCloser.Close(); err != nil { + errs.add(fmt.Errorf("error listener: %w", err)) + } + if errs.hasErrors() { return errs } diff --git a/pkg/postage/batchservice/batchservice.go b/pkg/postage/batchservice/batchservice.go index 3705f562d4b..8031615907b 100644 --- a/pkg/postage/batchservice/batchservice.go +++ b/pkg/postage/batchservice/batchservice.go @@ -14,16 +14,23 @@ import ( ) type batchService struct { - cs *postage.ChainState - storer postage.Storer - logger logging.Logger + cs *postage.ChainState + storer postage.Storer + logger logging.Logger + listener postage.Listener +} + +type Interface interface { + postage.EventUpdater + Start() error } // New will create a new BatchService. -func New(storer postage.Storer, logger logging.Logger) (postage.EventUpdater, error) { +func New(storer postage.Storer, logger logging.Logger, listener postage.Listener) (Interface, error) { b := &batchService{ - storer: storer, - logger: logger, + storer: storer, + logger: logger, + listener: listener, } cs, err := storer.GetChainState() @@ -31,10 +38,18 @@ func New(storer postage.Storer, logger logging.Logger) (postage.EventUpdater, er return nil, fmt.Errorf("get chain state: %w", err) } b.cs = cs - return b, nil } +func (svc *batchService) Start() error { + cs, err := svc.storer.GetChainState() + if err != nil { + return fmt.Errorf("get chain state: %w", err) + } + svc.listener.Listen(cs.Block+1, svc) + return nil +} + // Create will create a new batch with the given ID, owner value and depth and // stores it in the BatchStore. func (svc *batchService) Create(id, owner []byte, normalisedBalance *big.Int, depth uint8) error { @@ -106,3 +121,14 @@ func (svc *batchService) UpdatePrice(price *big.Int) error { svc.logger.Debugf("updated chain price to %s", svc.cs.Price) return nil } + +func (svc *batchService) UpdateBlockNumber(blockNumber uint64) error { + svc.cs.Block = blockNumber + + if err := svc.storer.PutChainState(svc.cs); err != nil { + return fmt.Errorf("put chain state: %w", err) + } + + svc.logger.Debugf("updated block height to %d", svc.cs.Block) + return nil +} diff --git a/pkg/postage/batchservice/batchservice_test.go b/pkg/postage/batchservice/batchservice_test.go index f2c7934f440..07a12207dd3 100644 --- a/pkg/postage/batchservice/batchservice_test.go +++ b/pkg/postage/batchservice/batchservice_test.go @@ -23,12 +23,22 @@ var ( testErr = errors.New("fails") ) +type mockListener struct { +} + +func (*mockListener) Listen(from uint64, updater postage.EventUpdater) {} +func (*mockListener) Close() error { return nil } + +func newMockListener() *mockListener { + return &mockListener{} +} + func TestNewBatchService(t *testing.T) { t.Run("expect get error", func(t *testing.T) { store := mock.New( mock.WithGetErr(testErr, 0), ) - _, err := batchservice.New(store, testLog) + _, err := batchservice.New(store, testLog, newMockListener()) if err == nil { t.Fatal("expected get error") } @@ -39,7 +49,7 @@ func TestNewBatchService(t *testing.T) { store := mock.New( mock.WithChainState(testChainState), ) - _, err := batchservice.New(store, testLog) + _, err := batchservice.New(store, testLog, newMockListener()) if err != nil { t.Fatalf("new batch service: %v", err) } @@ -245,7 +255,7 @@ func newTestStoreAndService(t *testing.T, opts ...mock.Option) (postage.EventUpd t.Helper() store := mock.New(opts...) - svc, err := batchservice.New(store, testLog) + svc, err := batchservice.New(store, testLog, newMockListener()) if err != nil { t.Fatalf("new batch service: %v", err) } diff --git a/pkg/postage/batchstore/store.go b/pkg/postage/batchstore/store.go index ebca43802e2..9ffa882471c 100644 --- a/pkg/postage/batchstore/store.go +++ b/pkg/postage/batchstore/store.go @@ -5,6 +5,8 @@ package batchstore import ( + "math/big" + "github.com/ethersphere/bee/pkg/postage" "github.com/ethersphere/bee/pkg/storage" ) @@ -47,6 +49,13 @@ func (s *store) PutChainState(state *postage.ChainState) error { func (s *store) GetChainState() (*postage.ChainState, error) { cs := &postage.ChainState{} err := s.store.Get(stateKey, cs) + if err == storage.ErrNotFound { + return &postage.ChainState{ + Block: 0, + Total: big.NewInt(0), + Price: big.NewInt(0), + }, nil + } return cs, err } diff --git a/pkg/postage/interface.go b/pkg/postage/interface.go index 1b449bb4773..c18f9a21add 100644 --- a/pkg/postage/interface.go +++ b/pkg/postage/interface.go @@ -5,6 +5,7 @@ package postage import ( + "io" "math/big" ) @@ -15,6 +16,7 @@ type EventUpdater interface { TopUp(id []byte, normalisedBalance *big.Int) error UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int) error UpdatePrice(price *big.Int) error + UpdateBlockNumber(blockNumber uint64) error } // Storer represents the persistence layer for batches on the current (highest @@ -28,5 +30,6 @@ type Storer interface { // Listener provides a blockchain event iterator. type Listener interface { + io.Closer Listen(from uint64, updater EventUpdater) } diff --git a/pkg/postage/listener/export_test.go b/pkg/postage/listener/export_test.go index 26305c47509..5c77e1f82b8 100644 --- a/pkg/postage/listener/export_test.go +++ b/pkg/postage/listener/export_test.go @@ -8,4 +8,6 @@ var ( BatchTopupTopic = batchTopupTopic BatchDepthIncreaseTopic = batchDepthIncreaseTopic PriceUpdateTopic = priceUpdateTopic + + TailSize = tailSize ) diff --git a/pkg/postage/listener/listener.go b/pkg/postage/listener/listener.go index cd7d2d36628..9e5619526f8 100644 --- a/pkg/postage/listener/listener.go +++ b/pkg/postage/listener/listener.go @@ -178,6 +178,16 @@ func (l *listener) sync(from uint64, updater postage.EventUpdater) error { return err } + if to < from { + // if the blockNumber is actually less than what we already, it might mean the backend is not synced or some reorg scenario + continue + } + + if to < tailSize { + // in a test blockchain there might be not be enough blocks yet + continue + } + // consider to-tailSize as the "latest" block we need to sync to to = to - tailSize @@ -198,6 +208,11 @@ func (l *listener) sync(from uint64, updater postage.EventUpdater) error { } } + err = updater.UpdateBlockNumber(to) + if err != nil { + return err + } + from = to + 1 } } diff --git a/pkg/postage/listener/listener_test.go b/pkg/postage/listener/listener_test.go index 184a9191fff..1b8dc9d2039 100644 --- a/pkg/postage/listener/listener_test.go +++ b/pkg/postage/listener/listener_test.go @@ -147,6 +147,8 @@ func TestListener(t *testing.T) { price: big.NewInt(500), } + blockNumber := uint64(500) + ev, evC := newEventUpdaterMock() mf := newMockFilterer( WithFilterLogEvents( @@ -155,9 +157,10 @@ func TestListener(t *testing.T) { depthIncrease.toLog(), priceUpdate.toLog(), ), + WithBlockNumber(blockNumber), ) - listener := listener.New(logging.New(ioutil.Discard, 0), mf, postageStampAddress, priceOracleAddress) - listener.Listen(0, ev) + l := listener.New(logging.New(ioutil.Discard, 0), mf, postageStampAddress, priceOracleAddress) + l.Listen(0, ev) select { case e := <-evC: @@ -186,6 +189,13 @@ func TestListener(t *testing.T) { case <-time.After(timeout): t.Fatal("timed out waiting for event") } + + select { + case e := <-evC: + e.(blockNumberCall).compare(t, blockNumber-uint64(listener.TailSize)) + case <-time.After(timeout): + t.Fatal("timed out waiting for block number update") + } }) } @@ -232,14 +242,22 @@ func (u *updater) UpdatePrice(price *big.Int) error { return nil } +func (u *updater) UpdateBlockNumber(blockNumber uint64) error { + u.eventC <- blockNumberCall{blockNumber: blockNumber} + return nil +} + type mockFilterer struct { filterLogEvents []types.Log subscriptionEvents []types.Log sub *sub + blockNumber uint64 } func newMockFilterer(opts ...Option) *mockFilterer { - mock := new(mockFilterer) + mock := &mockFilterer{ + blockNumber: uint64(listener.TailSize), // use the tailSize as blockNumber by default to ensure at least block 0 is ready + } for _, o := range opts { o.apply(mock) } @@ -252,6 +270,12 @@ func WithFilterLogEvents(events ...types.Log) Option { }) } +func WithBlockNumber(blockNumber uint64) Option { + return optionFunc(func(s *mockFilterer) { + s.blockNumber = blockNumber + }) +} + func (m *mockFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { return m.filterLogEvents, nil } @@ -271,7 +295,7 @@ func (m *mockFilterer) Close() { } func (m *mockFilterer) BlockNumber(context.Context) (uint64, error) { - return 0, nil + return m.blockNumber, nil } type sub struct { @@ -396,6 +420,16 @@ func (p priceArgs) toLog() types.Log { } } +type blockNumberCall struct { + blockNumber uint64 +} + +func (b blockNumberCall) compare(t *testing.T, want uint64) { + if b.blockNumber != want { + t.Fatalf("blockNumber mismatch. got %d want %d", b.blockNumber, want) + } +} + type Option interface { apply(*mockFilterer) } From 74a3dfe88f7c3937c2fb564af82e765f53192909 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Sat, 16 Jan 2021 14:25:41 +0100 Subject: [PATCH 2/5] add postage stamp contract address discovery --- pkg/node/node.go | 42 +++++++++++++++++++++++--------- pkg/postage/listener/listener.go | 14 +++++++++-- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index 8a74060f076..4ed5cf508d1 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -151,6 +151,12 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, return nil, err } + chainID, err := swapBackend.ChainID(p2pCtx) + if err != nil { + logger.Infof("could not connect to backend at %v. A working blockchain node (for goerli network in production) is required. Check your node or specify another node using --swap-endpoint.", o.SwapEndpoint) + return nil, fmt.Errorf("could not get chain id from ethereum backend: %w", err) + } + var chequebookService chequebook.Service var chequeStore chequebook.ChequeStore var cashoutService chequebook.CashoutService @@ -165,12 +171,6 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, return nil, err } - chainID, err := swapBackend.ChainID(p2pCtx) - if err != nil { - logger.Infof("could not connect to backend at %v. In a swap-enabled network a working blockchain node (for goerli network in production) is required. Check your node or specify another node using --swap-endpoint.", o.SwapEndpoint) - return nil, fmt.Errorf("could not get chain id from ethereum backend: %w", err) - } - var factoryAddress common.Address if o.SwapFactoryAddress == "" { var found bool @@ -219,7 +219,25 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, } batchStore := batchstore.New(stateStore) - eventListener := listener.New(logger, swapBackend, common.HexToAddress(o.PostageStampAddress), common.HexToAddress(o.PriceOracleAddress)) + + postageStampAddress, priceOracleAddress, found := listener.DiscoverAddresses(chainID.Int64()) + if o.PostageStampAddress != "" { + if !common.IsHexAddress(o.PostageStampAddress) { + return nil, errors.New("malformed postage stamp address") + } + postageStampAddress = common.HexToAddress(o.PostageStampAddress) + } + if o.PriceOracleAddress != "" { + if !common.IsHexAddress(o.PriceOracleAddress) { + return nil, errors.New("malformed price oracle address") + } + priceOracleAddress = common.HexToAddress(o.PriceOracleAddress) + } + if (o.PostageStampAddress == "" || o.PriceOracleAddress == "") && !found { + return nil, errors.New("no known postage stamp addresses for this network") + } + + eventListener := listener.New(logger, swapBackend, postageStampAddress, priceOracleAddress) b.listenerCloser = eventListener batchService, err := batchservice.New(batchStore, logger, eventListener) @@ -231,6 +249,8 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, return nil, err } + return b, nil + p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, logger, tracer, libp2p.Options{ PrivateKey: libp2pPrivateKey, NATAddr: o.NATAddr, @@ -566,6 +586,10 @@ func (b *Bee) Shutdown(ctx context.Context) error { errs.add(fmt.Errorf("tag persistence: %w", err)) } + if err := b.listenerCloser.Close(); err != nil { + errs.add(fmt.Errorf("error listener: %w", err)) + } + if err := b.stateStoreCloser.Close(); err != nil { errs.add(fmt.Errorf("statestore: %w", err)) } @@ -589,10 +613,6 @@ func (b *Bee) Shutdown(ctx context.Context) error { } } - if err := b.listenerCloser.Close(); err != nil { - errs.add(fmt.Errorf("error listener: %w", err)) - } - if errs.hasErrors() { return errs } diff --git a/pkg/postage/listener/listener.go b/pkg/postage/listener/listener.go index 9e5619526f8..040c43c6753 100644 --- a/pkg/postage/listener/listener.go +++ b/pkg/postage/listener/listener.go @@ -19,8 +19,8 @@ import ( ) const ( - blockPage = 500 // how many blocks to sync every time - tailSize = 20 // how many blocks to tail from the tip of the chain + blockPage = 50000 // how many blocks to sync every time + tailSize = 4 // how many blocks to tail from the tip of the chain ) var ( @@ -197,6 +197,7 @@ func (l *listener) sync(from uint64, updater postage.EventUpdater) error { to = from + blockPage } + //l.logger.Errorf("syncing from %d to %d", from, to) events, err := l.ev.FilterLogs(ctx, l.filterQuery(big.NewInt(int64(from)), big.NewInt(int64(to)))) if err != nil { return err @@ -265,3 +266,12 @@ type batchDepthIncreaseEvent struct { type priceUpdateEvent struct { Price *big.Int } + +// DiscoverAddresses returns the canonical contracts for this chainID +func DiscoverAddresses(chainID int64) (postageStamp common.Address, priceOracle common.Address, found bool) { + if chainID == 5 { + // goerli + return common.HexToAddress("0xf0870E3abb457026BE46d9b2CDf35e8FFcB27955"), common.HexToAddress("0xc1B598609A38D0A0F85f68eD0fFEFdeC9cD061C9"), true + } + return common.Address{}, common.Address{}, false +} From 3998f3dcf36e255c2274e2e6fc221f4122f2d264 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Sat, 16 Jan 2021 14:29:21 +0100 Subject: [PATCH 3/5] remove test code --- pkg/node/node.go | 2 -- pkg/postage/listener/listener.go | 5 ++--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index 4ed5cf508d1..c46330ed1b8 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -249,8 +249,6 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, return nil, err } - return b, nil - p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, logger, tracer, libp2p.Options{ PrivateKey: libp2pPrivateKey, NATAddr: o.NATAddr, diff --git a/pkg/postage/listener/listener.go b/pkg/postage/listener/listener.go index 040c43c6753..3d05a2fc3a9 100644 --- a/pkg/postage/listener/listener.go +++ b/pkg/postage/listener/listener.go @@ -19,8 +19,8 @@ import ( ) const ( - blockPage = 50000 // how many blocks to sync every time - tailSize = 4 // how many blocks to tail from the tip of the chain + blockPage = 500 // how many blocks to sync every time + tailSize = 4 // how many blocks to tail from the tip of the chain ) var ( @@ -197,7 +197,6 @@ func (l *listener) sync(from uint64, updater postage.EventUpdater) error { to = from + blockPage } - //l.logger.Errorf("syncing from %d to %d", from, to) events, err := l.ev.FilterLogs(ctx, l.filterQuery(big.NewInt(int64(from)), big.NewInt(int64(to)))) if err != nil { return err From c4343cb1777b857ca94718e16231c1a4703a075f Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Sat, 16 Jan 2021 14:37:39 +0100 Subject: [PATCH 4/5] fix standalone --- pkg/node/node.go | 87 ++++++++++++++++++++++++++++-------------------- 1 file changed, 51 insertions(+), 36 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index c46330ed1b8..de130e9c366 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -146,22 +146,22 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, b.stateStoreCloser = stateStore addressbook := addressbook.New(stateStore) - swapBackend, err := ethclient.Dial(o.SwapEndpoint) - if err != nil { - return nil, err - } - - chainID, err := swapBackend.ChainID(p2pCtx) - if err != nil { - logger.Infof("could not connect to backend at %v. A working blockchain node (for goerli network in production) is required. Check your node or specify another node using --swap-endpoint.", o.SwapEndpoint) - return nil, fmt.Errorf("could not get chain id from ethereum backend: %w", err) - } - var chequebookService chequebook.Service var chequeStore chequebook.ChequeStore var cashoutService chequebook.CashoutService var overlayEthAddress common.Address if o.SwapEnable { + swapBackend, err := ethclient.Dial(o.SwapEndpoint) + if err != nil { + return nil, err + } + + chainID, err := swapBackend.ChainID(p2pCtx) + if err != nil { + logger.Infof("could not connect to backend at %v. A working blockchain node (for goerli network in production) is required. Check your node or specify another node using --swap-endpoint.", o.SwapEndpoint) + return nil, fmt.Errorf("could not get chain id from ethereum backend: %w", err) + } + transactionService, err := transaction.NewService(logger, swapBackend, signer) if err != nil { return nil, err @@ -220,33 +220,46 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, batchStore := batchstore.New(stateStore) - postageStampAddress, priceOracleAddress, found := listener.DiscoverAddresses(chainID.Int64()) - if o.PostageStampAddress != "" { - if !common.IsHexAddress(o.PostageStampAddress) { - return nil, errors.New("malformed postage stamp address") + if !o.Standalone { + swapBackend, err := ethclient.Dial(o.SwapEndpoint) + if err != nil { + return nil, err } - postageStampAddress = common.HexToAddress(o.PostageStampAddress) - } - if o.PriceOracleAddress != "" { - if !common.IsHexAddress(o.PriceOracleAddress) { - return nil, errors.New("malformed price oracle address") + + chainID, err := swapBackend.ChainID(p2pCtx) + if err != nil { + logger.Infof("could not connect to backend at %v. A working blockchain node (for goerli network in production) is required. Check your node or specify another node using --swap-endpoint.", o.SwapEndpoint) + return nil, fmt.Errorf("could not get chain id from ethereum backend: %w", err) } - priceOracleAddress = common.HexToAddress(o.PriceOracleAddress) - } - if (o.PostageStampAddress == "" || o.PriceOracleAddress == "") && !found { - return nil, errors.New("no known postage stamp addresses for this network") - } - eventListener := listener.New(logger, swapBackend, postageStampAddress, priceOracleAddress) - b.listenerCloser = eventListener + postageStampAddress, priceOracleAddress, found := listener.DiscoverAddresses(chainID.Int64()) + if o.PostageStampAddress != "" { + if !common.IsHexAddress(o.PostageStampAddress) { + return nil, errors.New("malformed postage stamp address") + } + postageStampAddress = common.HexToAddress(o.PostageStampAddress) + } + if o.PriceOracleAddress != "" { + if !common.IsHexAddress(o.PriceOracleAddress) { + return nil, errors.New("malformed price oracle address") + } + priceOracleAddress = common.HexToAddress(o.PriceOracleAddress) + } + if (o.PostageStampAddress == "" || o.PriceOracleAddress == "") && !found { + return nil, errors.New("no known postage stamp addresses for this network") + } - batchService, err := batchservice.New(batchStore, logger, eventListener) - if err != nil { - return nil, err - } - err = batchService.Start() - if err != nil { - return nil, err + eventListener := listener.New(logger, swapBackend, postageStampAddress, priceOracleAddress) + b.listenerCloser = eventListener + + batchService, err := batchservice.New(batchStore, logger, eventListener) + if err != nil { + return nil, err + } + err = batchService.Start() + if err != nil { + return nil, err + } } p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, logger, tracer, libp2p.Options{ @@ -584,8 +597,10 @@ func (b *Bee) Shutdown(ctx context.Context) error { errs.add(fmt.Errorf("tag persistence: %w", err)) } - if err := b.listenerCloser.Close(); err != nil { - errs.add(fmt.Errorf("error listener: %w", err)) + if b.listenerCloser != nil { + if err := b.listenerCloser.Close(); err != nil { + errs.add(fmt.Errorf("error listener: %w", err)) + } } if err := b.stateStoreCloser.Close(); err != nil { From fb7d20abaa6652292a860b90a377eaa583d55b50 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Sat, 16 Jan 2021 14:41:56 +0100 Subject: [PATCH 5/5] reuse logger in tests --- pkg/postage/listener/listener_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/postage/listener/listener_test.go b/pkg/postage/listener/listener_test.go index 1b8dc9d2039..f91b2f69797 100644 --- a/pkg/postage/listener/listener_test.go +++ b/pkg/postage/listener/listener_test.go @@ -159,7 +159,7 @@ func TestListener(t *testing.T) { ), WithBlockNumber(blockNumber), ) - l := listener.New(logging.New(ioutil.Discard, 0), mf, postageStampAddress, priceOracleAddress) + l := listener.New(logger, mf, postageStampAddress, priceOracleAddress) l.Listen(0, ev) select {