Skip to content

Commit

Permalink
wire up postage stamp syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
ralph-pichler committed Jan 15, 2021
1 parent 61ddd05 commit 54c0a11
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 18 deletions.
4 changes: 4 additions & 0 deletions cmd/bee/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
optionNameSwapFactoryAddress = "swap-factory-address"
optionNameSwapInitialDeposit = "swap-initial-deposit"
optionNameSwapEnable = "swap-enable"
optionNamePostageStampAddress = "postage-stamp-address"
optionNamePriceOracleAddress = "price-oracle-address"
)

func init() {
Expand Down Expand Up @@ -204,4 +206,6 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionNameSwapFactoryAddress, "", "swap factory address")
cmd.Flags().Uint64(optionNameSwapInitialDeposit, 100000000, "initial deposit if deploying a new chequebook")
cmd.Flags().Bool(optionNameSwapEnable, true, "enable swap")
cmd.Flags().String(optionNamePostageStampAddress, "", "postage stamp address")
cmd.Flags().String(optionNamePriceOracleAddress, "", "price oracle address")
}
2 changes: 2 additions & 0 deletions cmd/bee/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz
SwapFactoryAddress: c.config.GetString(optionNameSwapFactoryAddress),
SwapInitialDeposit: c.config.GetUint64(optionNameSwapInitialDeposit),
SwapEnable: c.config.GetBool(optionNameSwapEnable),
PostageStampAddress: c.config.GetString(optionNamePostageStampAddress),
PriceOracleAddress: c.config.GetString(optionNamePriceOracleAddress),
})
if err != nil {
return err
Expand Down
32 changes: 28 additions & 4 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import (
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/postage/batchservice"
"github.com/ethersphere/bee/pkg/postage/batchstore"
"github.com/ethersphere/bee/pkg/postage/listener"
"github.com/ethersphere/bee/pkg/pricing"
"github.com/ethersphere/bee/pkg/pss"
"github.com/ethersphere/bee/pkg/puller"
Expand Down Expand Up @@ -79,6 +82,7 @@ type Bee struct {
pullerCloser io.Closer
pullSyncCloser io.Closer
pssCloser io.Closer
listenerCloser io.Closer
recoveryHandleCleanup func()
}

Expand Down Expand Up @@ -109,6 +113,8 @@ type Options struct {
SwapFactoryAddress string
SwapInitialDeposit uint64
SwapEnable bool
PostageStampAddress string
PriceOracleAddress string
}

func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, signer crypto.Signer, networkID uint64, logger logging.Logger, libp2pPrivateKey, pssPrivateKey *ecdsa.PrivateKey, o Options) (*Bee, error) {
Expand Down Expand Up @@ -142,15 +148,16 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
b.stateStoreCloser = stateStore
addressbook := addressbook.New(stateStore)

swapBackend, err := ethclient.Dial(o.SwapEndpoint)
if err != nil {
return nil, err
}

var chequebookService chequebook.Service
var chequeStore chequebook.ChequeStore
var cashoutService chequebook.CashoutService
var overlayEthAddress common.Address
if o.SwapEnable {
swapBackend, err := ethclient.Dial(o.SwapEndpoint)
if err != nil {
return nil, err
}
transactionService, err := transaction.NewService(logger, swapBackend, signer)
if err != nil {
return nil, err
Expand Down Expand Up @@ -213,6 +220,19 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
}
}

batchStore := batchstore.New(stateStore)
eventListener := listener.New(logger, swapBackend, common.HexToAddress(o.PostageStampAddress), common.HexToAddress(o.PriceOracleAddress))
b.listenerCloser = eventListener

batchService, err := batchservice.New(batchStore, logger, eventListener)
if err != nil {
return nil, err
}
err = batchService.Start()
if err != nil {
return nil, err
}

p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, logger, tracer, libp2p.Options{
PrivateKey: libp2pPrivateKey,
NATAddr: o.NATAddr,
Expand Down Expand Up @@ -575,6 +595,10 @@ func (b *Bee) Shutdown(ctx context.Context) error {
}
}

if err := b.listenerCloser.Close(); err != nil {
errs.add(fmt.Errorf("error listener: %w", err))
}

if errs.hasErrors() {
return errs
}
Expand Down
40 changes: 33 additions & 7 deletions pkg/postage/batchservice/batchservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,42 @@ import (
)

type batchService struct {
cs *postage.ChainState
storer postage.Storer
logger logging.Logger
cs *postage.ChainState
storer postage.Storer
logger logging.Logger
listener postage.Listener
}

type Interface interface {
postage.EventUpdater
Start() error
}

// New will create a new BatchService.
func New(storer postage.Storer, logger logging.Logger) (postage.EventUpdater, error) {
func New(storer postage.Storer, logger logging.Logger, listener postage.Listener) (Interface, error) {
b := &batchService{
storer: storer,
logger: logger,
storer: storer,
logger: logger,
listener: listener,
}

cs, err := storer.GetChainState()
if err != nil {
return nil, fmt.Errorf("get chain state: %w", err)
}
b.cs = cs

return b, nil
}

func (svc *batchService) Start() error {
cs, err := svc.storer.GetChainState()
if err != nil {
return fmt.Errorf("get chain state: %w", err)
}
svc.listener.Listen(cs.Block+1, svc)
return nil
}

// Create will create a new batch with the given ID, owner value and depth and
// stores it in the BatchStore.
func (svc *batchService) Create(id, owner []byte, normalisedBalance *big.Int, depth uint8) error {
Expand Down Expand Up @@ -106,3 +121,14 @@ func (svc *batchService) UpdatePrice(price *big.Int) error {
svc.logger.Debugf("updated chain price to %s", svc.cs.Price)
return nil
}

func (svc *batchService) UpdateBlockNumber(blockNumber uint64) error {
svc.cs.Block = blockNumber

if err := svc.storer.PutChainState(svc.cs); err != nil {
return fmt.Errorf("put chain state: %w", err)
}

svc.logger.Debugf("updated block height to %d", svc.cs.Block)
return nil
}
16 changes: 13 additions & 3 deletions pkg/postage/batchservice/batchservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,22 @@ var (
testErr = errors.New("fails")
)

type mockListener struct {
}

func (*mockListener) Listen(from uint64, updater postage.EventUpdater) {}
func (*mockListener) Close() error { return nil }

func newMockListener() *mockListener {
return &mockListener{}
}

func TestNewBatchService(t *testing.T) {
t.Run("expect get error", func(t *testing.T) {
store := mock.New(
mock.WithGetErr(testErr, 0),
)
_, err := batchservice.New(store, testLog)
_, err := batchservice.New(store, testLog, newMockListener())
if err == nil {
t.Fatal("expected get error")
}
Expand All @@ -39,7 +49,7 @@ func TestNewBatchService(t *testing.T) {
store := mock.New(
mock.WithChainState(testChainState),
)
_, err := batchservice.New(store, testLog)
_, err := batchservice.New(store, testLog, newMockListener())
if err != nil {
t.Fatalf("new batch service: %v", err)
}
Expand Down Expand Up @@ -245,7 +255,7 @@ func newTestStoreAndService(t *testing.T, opts ...mock.Option) (postage.EventUpd
t.Helper()

store := mock.New(opts...)
svc, err := batchservice.New(store, testLog)
svc, err := batchservice.New(store, testLog, newMockListener())
if err != nil {
t.Fatalf("new batch service: %v", err)
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/postage/batchstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package batchstore

import (
"math/big"

"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/storage"
)
Expand Down Expand Up @@ -47,6 +49,13 @@ func (s *store) PutChainState(state *postage.ChainState) error {
func (s *store) GetChainState() (*postage.ChainState, error) {
cs := &postage.ChainState{}
err := s.store.Get(stateKey, cs)
if err == storage.ErrNotFound {
return &postage.ChainState{
Block: 0,
Total: big.NewInt(0),
Price: big.NewInt(0),
}, nil
}

return cs, err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/postage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package postage

import (
"io"
"math/big"
)

Expand All @@ -15,6 +16,7 @@ type EventUpdater interface {
TopUp(id []byte, normalisedBalance *big.Int) error
UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int) error
UpdatePrice(price *big.Int) error
UpdateBlockNumber(blockNumber uint64) error
}

// Storer represents the persistence layer for batches on the current (highest
Expand All @@ -28,5 +30,6 @@ type Storer interface {

// Listener provides a blockchain event iterator.
type Listener interface {
io.Closer
Listen(from uint64, updater EventUpdater)
}
2 changes: 2 additions & 0 deletions pkg/postage/listener/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ var (
BatchTopupTopic = batchTopupTopic
BatchDepthIncreaseTopic = batchDepthIncreaseTopic
PriceUpdateTopic = priceUpdateTopic

TailSize = tailSize
)
15 changes: 15 additions & 0 deletions pkg/postage/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,16 @@ func (l *listener) sync(from uint64, updater postage.EventUpdater) error {
return err
}

if to < from {
// if the blockNumber is actually less than what we already, it might mean the backend is not synced or some reorg scenario
continue
}

if to < tailSize {
// in a test blockchain there might be not be enough blocks yet
continue
}

// consider to-tailSize as the "latest" block we need to sync to
to = to - tailSize

Expand All @@ -198,6 +208,11 @@ func (l *listener) sync(from uint64, updater postage.EventUpdater) error {
}
}

err = updater.UpdateBlockNumber(to)
if err != nil {
return err
}

from = to + 1
}
}
Expand Down
42 changes: 38 additions & 4 deletions pkg/postage/listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ func TestListener(t *testing.T) {
price: big.NewInt(500),
}

blockNumber := uint64(500)

ev, evC := newEventUpdaterMock()
mf := newMockFilterer(
WithFilterLogEvents(
Expand All @@ -155,9 +157,10 @@ func TestListener(t *testing.T) {
depthIncrease.toLog(),
priceUpdate.toLog(),
),
WithBlockNumber(blockNumber),
)
listener := listener.New(logging.New(ioutil.Discard, 0), mf, postageStampAddress, priceOracleAddress)
listener.Listen(0, ev)
l := listener.New(logging.New(ioutil.Discard, 0), mf, postageStampAddress, priceOracleAddress)
l.Listen(0, ev)

select {
case e := <-evC:
Expand Down Expand Up @@ -186,6 +189,13 @@ func TestListener(t *testing.T) {
case <-time.After(timeout):
t.Fatal("timed out waiting for event")
}

select {
case e := <-evC:
e.(blockNumberCall).compare(t, blockNumber-uint64(listener.TailSize))
case <-time.After(timeout):
t.Fatal("timed out waiting for block number update")
}
})
}

Expand Down Expand Up @@ -232,14 +242,22 @@ func (u *updater) UpdatePrice(price *big.Int) error {
return nil
}

func (u *updater) UpdateBlockNumber(blockNumber uint64) error {
u.eventC <- blockNumberCall{blockNumber: blockNumber}
return nil
}

type mockFilterer struct {
filterLogEvents []types.Log
subscriptionEvents []types.Log
sub *sub
blockNumber uint64
}

func newMockFilterer(opts ...Option) *mockFilterer {
mock := new(mockFilterer)
mock := &mockFilterer{
blockNumber: uint64(listener.TailSize), // use the tailSize as blockNumber by default to ensure at least block 0 is ready
}
for _, o := range opts {
o.apply(mock)
}
Expand All @@ -252,6 +270,12 @@ func WithFilterLogEvents(events ...types.Log) Option {
})
}

func WithBlockNumber(blockNumber uint64) Option {
return optionFunc(func(s *mockFilterer) {
s.blockNumber = blockNumber
})
}

func (m *mockFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) {
return m.filterLogEvents, nil
}
Expand All @@ -271,7 +295,7 @@ func (m *mockFilterer) Close() {
}

func (m *mockFilterer) BlockNumber(context.Context) (uint64, error) {
return 0, nil
return m.blockNumber, nil
}

type sub struct {
Expand Down Expand Up @@ -396,6 +420,16 @@ func (p priceArgs) toLog() types.Log {
}
}

type blockNumberCall struct {
blockNumber uint64
}

func (b blockNumberCall) compare(t *testing.T, want uint64) {
if b.blockNumber != want {
t.Fatalf("blockNumber mismatch. got %d want %d", b.blockNumber, want)
}
}

type Option interface {
apply(*mockFilterer)
}
Expand Down

0 comments on commit 54c0a11

Please sign in to comment.