Skip to content

Commit

Permalink
node: wait till synced with postage contract before starting node (#1603
Browse files Browse the repository at this point in the history
)
  • Loading branch information
acud committed Apr 27, 2021
1 parent 0e029e1 commit e482476
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 70 deletions.
11 changes: 10 additions & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,16 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
hive.SetAddPeersHandler(kad.AddPeers)
p2ps.SetPickyNotifier(kad)
batchStore.SetRadiusSetter(kad)
batchSvc.Start()
syncedChan := batchSvc.Start()

// wait for the postage contract listener to sync
logger.Info("waiting to sync postage contract data, this may take a while... more info available in Debug loglevel")

// arguably this is not a very nice solution since we dont support
// interrupts at this stage of the application lifecycle. some changes
// would be needed on the cmd level to support context cancellation at
// this stage
<-syncedChan

paymentThreshold, ok := new(big.Int).SetString(o.PaymentThreshold, 10)
if !ok {
Expand Down
4 changes: 2 additions & 2 deletions pkg/postage/batchservice/batchservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (svc *batchService) UpdateBlockNumber(blockNumber uint64) error {
return nil
}

func (svc *batchService) Start() {
func (svc *batchService) Start() <-chan struct{} {
cs := svc.storer.GetChainState()
svc.listener.Listen(cs.Block+1, svc)
return svc.listener.Listen(cs.Block+1, svc)
}
4 changes: 2 additions & 2 deletions pkg/postage/batchservice/batchservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ var (
type mockListener struct {
}

func (*mockListener) Listen(from uint64, updater postage.EventUpdater) {}
func (*mockListener) Close() error { return nil }
func (*mockListener) Listen(from uint64, updater postage.EventUpdater) <-chan struct{} { return nil }
func (*mockListener) Close() error { return nil }

func newMockListener() *mockListener {
return &mockListener{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/postage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type EventUpdater interface {
UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int) error
UpdatePrice(price *big.Int) error
UpdateBlockNumber(blockNumber uint64) error
Start()
Start() <-chan struct{}
}

// Storer represents the persistence layer for batches on the current (highest
Expand All @@ -38,5 +38,5 @@ type RadiusSetter interface {
// Listener provides a blockchain event iterator.
type Listener interface {
io.Closer
Listen(from uint64, updater EventUpdater)
Listen(from uint64, updater EventUpdater) <-chan struct{}
}
133 changes: 71 additions & 62 deletions pkg/postage/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ type listener struct {

postageStampAddress common.Address
priceOracleAddress common.Address

quit chan struct{}
wg sync.WaitGroup
quit chan struct{}
wg sync.WaitGroup
}

func New(
Expand All @@ -71,23 +70,10 @@ func New(

postageStampAddress: postageStampAddress,
priceOracleAddress: priceOracleAddress,

quit: make(chan struct{}),
quit: make(chan struct{}),
}
}

func (l *listener) Listen(from uint64, updater postage.EventUpdater) {
l.wg.Add(1)

go func() {
defer l.wg.Done()
err := l.sync(from, updater)
if err != nil {
l.logger.Errorf("event listener sync: %v", err)
}
}()
}

func (l *listener) filterQuery(from, to *big.Int) ethereum.FilterQuery {
return ethereum.FilterQuery{
FromBlock: from,
Expand Down Expand Up @@ -156,64 +142,87 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error
}
}

func (l *listener) sync(from uint64, updater postage.EventUpdater) error {
ctx := context.Background()
func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan struct{} {
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-l.quit
cancel()
}()

synced := make(chan struct{})
closeOnce := new(sync.Once)
paged := make(chan struct{}, 1)
paged <- struct{}{}
for {
select {
case <-paged:
// if we paged then it means there's more things to sync on
case <-time.After(chainUpdateInterval):
case <-l.quit:
return nil
}
to, err := l.ev.BlockNumber(ctx)
if err != nil {
return err
}

if to < from {
// if the blockNumber is actually less than what we already, it might mean the backend is not synced or some reorg scenario
continue
}
l.wg.Add(1)
listenf := func() error {
defer l.wg.Done()
for {
select {
case <-paged:
// if we paged then it means there's more things to sync on
case <-time.After(chainUpdateInterval):
case <-l.quit:
return nil
}
to, err := l.ev.BlockNumber(ctx)
if err != nil {
return err
}

if to < tailSize {
// in a test blockchain there might be not be enough blocks yet
continue
}
if to < tailSize {
// in a test blockchain there might be not be enough blocks yet
continue
}

// consider to-tailSize as the "latest" block we need to sync to
to = to - tailSize
// consider to-tailSize as the "latest" block we need to sync to
to = to - tailSize

// do some paging (sub-optimal)
if to-from > blockPage {
paged <- struct{}{}
to = from + blockPage
}
if to < from {
// if the blockNumber is actually less than what we already, it might mean the backend is not synced or some reorg scenario
continue
}

events, err := l.ev.FilterLogs(ctx, l.filterQuery(big.NewInt(int64(from)), big.NewInt(int64(to))))
if err != nil {
return err
}
// do some paging (sub-optimal)
if to-from > blockPage {
paged <- struct{}{}
to = from + blockPage
} else {
closeOnce.Do(func() { close(synced) })
}

// this is called before processing the events
// so that the eviction in batchstore gets the correct
// block height context for the gc round. otherwise
// expired batches might be "revived".
err = updater.UpdateBlockNumber(to)
if err != nil {
return err
}
events, err := l.ev.FilterLogs(ctx, l.filterQuery(big.NewInt(int64(from)), big.NewInt(int64(to))))
if err != nil {
return err
}

for _, e := range events {
if err = l.processEvent(e, updater); err != nil {
// this is called before processing the events
// so that the eviction in batchstore gets the correct
// block height context for the gc round. otherwise
// expired batches might be "revived".
err = updater.UpdateBlockNumber(to)
if err != nil {
return err
}
}

from = to + 1
for _, e := range events {
if err = l.processEvent(e, updater); err != nil {
return err
}
}

from = to + 1
}
}

go func() {
err := listenf()
if err != nil {
l.logger.Errorf("event listener sync: %v", err)
}
}()

return synced
}

func (l *listener) Close() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/postage/listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (u *updater) UpdateBlockNumber(blockNumber uint64) error {
return nil
}

func (u *updater) Start() {}
func (u *updater) Start() <-chan struct{} { return nil }

type mockFilterer struct {
filterLogEvents []types.Log
Expand Down

0 comments on commit e482476

Please sign in to comment.