From e4824768b7db2fa54aceaa0bfeb5e84c4ba27bb6 Mon Sep 17 00:00:00 2001 From: acud <12988138+acud@users.noreply.github.com> Date: Tue, 27 Apr 2021 06:58:21 +0200 Subject: [PATCH] node: wait till synced with postage contract before starting node (#1603) --- pkg/node/node.go | 11 +- pkg/postage/batchservice/batchservice.go | 4 +- pkg/postage/batchservice/batchservice_test.go | 4 +- pkg/postage/interface.go | 4 +- pkg/postage/listener/listener.go | 133 ++++++++++-------- pkg/postage/listener/listener_test.go | 2 +- 6 files changed, 88 insertions(+), 70 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index 18d8c8fa2e0..c9a61627720 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -392,7 +392,16 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, hive.SetAddPeersHandler(kad.AddPeers) p2ps.SetPickyNotifier(kad) batchStore.SetRadiusSetter(kad) - batchSvc.Start() + syncedChan := batchSvc.Start() + + // wait for the postage contract listener to sync + logger.Info("waiting to sync postage contract data, this may take a while... more info available in Debug loglevel") + + // arguably this is not a very nice solution since we dont support + // interrupts at this stage of the application lifecycle. some changes + // would be needed on the cmd level to support context cancellation at + // this stage + <-syncedChan paymentThreshold, ok := new(big.Int).SetString(o.PaymentThreshold, 10) if !ok { diff --git a/pkg/postage/batchservice/batchservice.go b/pkg/postage/batchservice/batchservice.go index 6f5575ac1b0..e13606f40c3 100644 --- a/pkg/postage/batchservice/batchservice.go +++ b/pkg/postage/batchservice/batchservice.go @@ -108,7 +108,7 @@ func (svc *batchService) UpdateBlockNumber(blockNumber uint64) error { return nil } -func (svc *batchService) Start() { +func (svc *batchService) Start() <-chan struct{} { cs := svc.storer.GetChainState() - svc.listener.Listen(cs.Block+1, svc) + return svc.listener.Listen(cs.Block+1, svc) } diff --git a/pkg/postage/batchservice/batchservice_test.go b/pkg/postage/batchservice/batchservice_test.go index a554f4e4cd8..9d23947da33 100644 --- a/pkg/postage/batchservice/batchservice_test.go +++ b/pkg/postage/batchservice/batchservice_test.go @@ -26,8 +26,8 @@ var ( type mockListener struct { } -func (*mockListener) Listen(from uint64, updater postage.EventUpdater) {} -func (*mockListener) Close() error { return nil } +func (*mockListener) Listen(from uint64, updater postage.EventUpdater) <-chan struct{} { return nil } +func (*mockListener) Close() error { return nil } func newMockListener() *mockListener { return &mockListener{} diff --git a/pkg/postage/interface.go b/pkg/postage/interface.go index f833227041d..0dc13a952c6 100644 --- a/pkg/postage/interface.go +++ b/pkg/postage/interface.go @@ -17,7 +17,7 @@ type EventUpdater interface { UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int) error UpdatePrice(price *big.Int) error UpdateBlockNumber(blockNumber uint64) error - Start() + Start() <-chan struct{} } // Storer represents the persistence layer for batches on the current (highest @@ -38,5 +38,5 @@ type RadiusSetter interface { // Listener provides a blockchain event iterator. type Listener interface { io.Closer - Listen(from uint64, updater EventUpdater) + Listen(from uint64, updater EventUpdater) <-chan struct{} } diff --git a/pkg/postage/listener/listener.go b/pkg/postage/listener/listener.go index be0cdca6db3..993c1f928c0 100644 --- a/pkg/postage/listener/listener.go +++ b/pkg/postage/listener/listener.go @@ -54,9 +54,8 @@ type listener struct { postageStampAddress common.Address priceOracleAddress common.Address - - quit chan struct{} - wg sync.WaitGroup + quit chan struct{} + wg sync.WaitGroup } func New( @@ -71,23 +70,10 @@ func New( postageStampAddress: postageStampAddress, priceOracleAddress: priceOracleAddress, - - quit: make(chan struct{}), + quit: make(chan struct{}), } } -func (l *listener) Listen(from uint64, updater postage.EventUpdater) { - l.wg.Add(1) - - go func() { - defer l.wg.Done() - err := l.sync(from, updater) - if err != nil { - l.logger.Errorf("event listener sync: %v", err) - } - }() -} - func (l *listener) filterQuery(from, to *big.Int) ethereum.FilterQuery { return ethereum.FilterQuery{ FromBlock: from, @@ -156,64 +142,87 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error } } -func (l *listener) sync(from uint64, updater postage.EventUpdater) error { - ctx := context.Background() +func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan struct{} { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-l.quit + cancel() + }() + + synced := make(chan struct{}) + closeOnce := new(sync.Once) paged := make(chan struct{}, 1) paged <- struct{}{} - for { - select { - case <-paged: - // if we paged then it means there's more things to sync on - case <-time.After(chainUpdateInterval): - case <-l.quit: - return nil - } - to, err := l.ev.BlockNumber(ctx) - if err != nil { - return err - } - if to < from { - // if the blockNumber is actually less than what we already, it might mean the backend is not synced or some reorg scenario - continue - } + l.wg.Add(1) + listenf := func() error { + defer l.wg.Done() + for { + select { + case <-paged: + // if we paged then it means there's more things to sync on + case <-time.After(chainUpdateInterval): + case <-l.quit: + return nil + } + to, err := l.ev.BlockNumber(ctx) + if err != nil { + return err + } - if to < tailSize { - // in a test blockchain there might be not be enough blocks yet - continue - } + if to < tailSize { + // in a test blockchain there might be not be enough blocks yet + continue + } - // consider to-tailSize as the "latest" block we need to sync to - to = to - tailSize + // consider to-tailSize as the "latest" block we need to sync to + to = to - tailSize - // do some paging (sub-optimal) - if to-from > blockPage { - paged <- struct{}{} - to = from + blockPage - } + if to < from { + // if the blockNumber is actually less than what we already, it might mean the backend is not synced or some reorg scenario + continue + } - events, err := l.ev.FilterLogs(ctx, l.filterQuery(big.NewInt(int64(from)), big.NewInt(int64(to)))) - if err != nil { - return err - } + // do some paging (sub-optimal) + if to-from > blockPage { + paged <- struct{}{} + to = from + blockPage + } else { + closeOnce.Do(func() { close(synced) }) + } - // this is called before processing the events - // so that the eviction in batchstore gets the correct - // block height context for the gc round. otherwise - // expired batches might be "revived". - err = updater.UpdateBlockNumber(to) - if err != nil { - return err - } + events, err := l.ev.FilterLogs(ctx, l.filterQuery(big.NewInt(int64(from)), big.NewInt(int64(to)))) + if err != nil { + return err + } - for _, e := range events { - if err = l.processEvent(e, updater); err != nil { + // this is called before processing the events + // so that the eviction in batchstore gets the correct + // block height context for the gc round. otherwise + // expired batches might be "revived". + err = updater.UpdateBlockNumber(to) + if err != nil { return err } - } - from = to + 1 + for _, e := range events { + if err = l.processEvent(e, updater); err != nil { + return err + } + } + + from = to + 1 + } } + + go func() { + err := listenf() + if err != nil { + l.logger.Errorf("event listener sync: %v", err) + } + }() + + return synced } func (l *listener) Close() error { diff --git a/pkg/postage/listener/listener_test.go b/pkg/postage/listener/listener_test.go index 9f811fbce6b..e0696f0b25e 100644 --- a/pkg/postage/listener/listener_test.go +++ b/pkg/postage/listener/listener_test.go @@ -279,7 +279,7 @@ func (u *updater) UpdateBlockNumber(blockNumber uint64) error { return nil } -func (u *updater) Start() {} +func (u *updater) Start() <-chan struct{} { return nil } type mockFilterer struct { filterLogEvents []types.Log