Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wire up postage stamp syncing #1114

Merged
merged 5 commits into from
Jan 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/bee/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
optionNameSwapFactoryAddress = "swap-factory-address"
optionNameSwapInitialDeposit = "swap-initial-deposit"
optionNameSwapEnable = "swap-enable"
optionNamePostageStampAddress = "postage-stamp-address"
optionNamePriceOracleAddress = "price-oracle-address"
)

func init() {
Expand Down Expand Up @@ -204,4 +206,6 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionNameSwapFactoryAddress, "", "swap factory address")
cmd.Flags().Uint64(optionNameSwapInitialDeposit, 100000000, "initial deposit if deploying a new chequebook")
cmd.Flags().Bool(optionNameSwapEnable, true, "enable swap")
cmd.Flags().String(optionNamePostageStampAddress, "", "postage stamp address")
cmd.Flags().String(optionNamePriceOracleAddress, "", "price oracle address")
}
2 changes: 2 additions & 0 deletions cmd/bee/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz
SwapFactoryAddress: c.config.GetString(optionNameSwapFactoryAddress),
SwapInitialDeposit: c.config.GetUint64(optionNameSwapInitialDeposit),
SwapEnable: c.config.GetBool(optionNameSwapEnable),
PostageStampAddress: c.config.GetString(optionNamePostageStampAddress),
PriceOracleAddress: c.config.GetString(optionNamePriceOracleAddress),
})
if err != nil {
return err
Expand Down
69 changes: 63 additions & 6 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/postage/batchservice"
"github.com/ethersphere/bee/pkg/postage/batchstore"
"github.com/ethersphere/bee/pkg/postage/listener"
"github.com/ethersphere/bee/pkg/pricing"
"github.com/ethersphere/bee/pkg/pss"
"github.com/ethersphere/bee/pkg/puller"
Expand Down Expand Up @@ -77,6 +80,7 @@ type Bee struct {
pullerCloser io.Closer
pullSyncCloser io.Closer
pssCloser io.Closer
listenerCloser io.Closer
recoveryHandleCleanup func()
}

Expand Down Expand Up @@ -107,6 +111,8 @@ type Options struct {
SwapFactoryAddress string
SwapInitialDeposit uint64
SwapEnable bool
PostageStampAddress string
PriceOracleAddress string
}

func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, signer crypto.Signer, networkID uint64, logger logging.Logger, libp2pPrivateKey, pssPrivateKey *ecdsa.PrivateKey, o Options) (*Bee, error) {
Expand Down Expand Up @@ -149,6 +155,13 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
if err != nil {
return nil, err
}

chainID, err := swapBackend.ChainID(p2pCtx)
if err != nil {
logger.Infof("could not connect to backend at %v. A working blockchain node (for goerli network in production) is required. Check your node or specify another node using --swap-endpoint.", o.SwapEndpoint)
return nil, fmt.Errorf("could not get chain id from ethereum backend: %w", err)
}

transactionService, err := transaction.NewService(logger, swapBackend, signer)
if err != nil {
return nil, err
Expand All @@ -158,12 +171,6 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
return nil, err
}

chainID, err := swapBackend.ChainID(p2pCtx)
if err != nil {
logger.Infof("could not connect to backend at %v. In a swap-enabled network a working blockchain node (for goerli network in production) is required. Check your node or specify another node using --swap-endpoint.", o.SwapEndpoint)
return nil, fmt.Errorf("could not get chain id from ethereum backend: %w", err)
}

var factoryAddress common.Address
if o.SwapFactoryAddress == "" {
var found bool
Expand Down Expand Up @@ -211,6 +218,50 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
}
}

batchStore := batchstore.New(stateStore)

if !o.Standalone {
swapBackend, err := ethclient.Dial(o.SwapEndpoint)
if err != nil {
return nil, err
}

chainID, err := swapBackend.ChainID(p2pCtx)
if err != nil {
logger.Infof("could not connect to backend at %v. A working blockchain node (for goerli network in production) is required. Check your node or specify another node using --swap-endpoint.", o.SwapEndpoint)
return nil, fmt.Errorf("could not get chain id from ethereum backend: %w", err)
}

postageStampAddress, priceOracleAddress, found := listener.DiscoverAddresses(chainID.Int64())
if o.PostageStampAddress != "" {
if !common.IsHexAddress(o.PostageStampAddress) {
return nil, errors.New("malformed postage stamp address")
}
postageStampAddress = common.HexToAddress(o.PostageStampAddress)
}
if o.PriceOracleAddress != "" {
if !common.IsHexAddress(o.PriceOracleAddress) {
return nil, errors.New("malformed price oracle address")
}
priceOracleAddress = common.HexToAddress(o.PriceOracleAddress)
}
if (o.PostageStampAddress == "" || o.PriceOracleAddress == "") && !found {
return nil, errors.New("no known postage stamp addresses for this network")
}

eventListener := listener.New(logger, swapBackend, postageStampAddress, priceOracleAddress)
b.listenerCloser = eventListener

batchService, err := batchservice.New(batchStore, logger, eventListener)
if err != nil {
return nil, err
}
err = batchService.Start()
if err != nil {
return nil, err
}
}

p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, logger, tracer, libp2p.Options{
PrivateKey: libp2pPrivateKey,
NATAddr: o.NATAddr,
Expand Down Expand Up @@ -546,6 +597,12 @@ func (b *Bee) Shutdown(ctx context.Context) error {
errs.add(fmt.Errorf("tag persistence: %w", err))
}

if b.listenerCloser != nil {
if err := b.listenerCloser.Close(); err != nil {
errs.add(fmt.Errorf("error listener: %w", err))
}
}

if err := b.stateStoreCloser.Close(); err != nil {
errs.add(fmt.Errorf("statestore: %w", err))
}
Expand Down
40 changes: 33 additions & 7 deletions pkg/postage/batchservice/batchservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,42 @@ import (
)

type batchService struct {
cs *postage.ChainState
storer postage.Storer
logger logging.Logger
cs *postage.ChainState
storer postage.Storer
logger logging.Logger
listener postage.Listener
}

type Interface interface {
postage.EventUpdater
Start() error
}

// New will create a new BatchService.
func New(storer postage.Storer, logger logging.Logger) (postage.EventUpdater, error) {
func New(storer postage.Storer, logger logging.Logger, listener postage.Listener) (Interface, error) {
b := &batchService{
storer: storer,
logger: logger,
storer: storer,
logger: logger,
listener: listener,
}

cs, err := storer.GetChainState()
if err != nil {
return nil, fmt.Errorf("get chain state: %w", err)
}
b.cs = cs

return b, nil
}

func (svc *batchService) Start() error {
cs, err := svc.storer.GetChainState()
if err != nil {
return fmt.Errorf("get chain state: %w", err)
}
svc.listener.Listen(cs.Block+1, svc)
return nil
}

// Create will create a new batch with the given ID, owner value and depth and
// stores it in the BatchStore.
func (svc *batchService) Create(id, owner []byte, normalisedBalance *big.Int, depth uint8) error {
Expand Down Expand Up @@ -106,3 +121,14 @@ func (svc *batchService) UpdatePrice(price *big.Int) error {
svc.logger.Debugf("updated chain price to %s", svc.cs.Price)
return nil
}

func (svc *batchService) UpdateBlockNumber(blockNumber uint64) error {
svc.cs.Block = blockNumber

if err := svc.storer.PutChainState(svc.cs); err != nil {
return fmt.Errorf("put chain state: %w", err)
}

svc.logger.Debugf("updated block height to %d", svc.cs.Block)
return nil
}
16 changes: 13 additions & 3 deletions pkg/postage/batchservice/batchservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,22 @@ var (
testErr = errors.New("fails")
)

type mockListener struct {
}

func (*mockListener) Listen(from uint64, updater postage.EventUpdater) {}
func (*mockListener) Close() error { return nil }

func newMockListener() *mockListener {
return &mockListener{}
}

func TestNewBatchService(t *testing.T) {
t.Run("expect get error", func(t *testing.T) {
store := mock.New(
mock.WithGetErr(testErr, 0),
)
_, err := batchservice.New(store, testLog)
_, err := batchservice.New(store, testLog, newMockListener())
if err == nil {
t.Fatal("expected get error")
}
Expand All @@ -39,7 +49,7 @@ func TestNewBatchService(t *testing.T) {
store := mock.New(
mock.WithChainState(testChainState),
)
_, err := batchservice.New(store, testLog)
_, err := batchservice.New(store, testLog, newMockListener())
if err != nil {
t.Fatalf("new batch service: %v", err)
}
Expand Down Expand Up @@ -245,7 +255,7 @@ func newTestStoreAndService(t *testing.T, opts ...mock.Option) (postage.EventUpd
t.Helper()

store := mock.New(opts...)
svc, err := batchservice.New(store, testLog)
svc, err := batchservice.New(store, testLog, newMockListener())
if err != nil {
t.Fatalf("new batch service: %v", err)
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/postage/batchstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package batchstore

import (
"math/big"

"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/storage"
)
Expand Down Expand Up @@ -47,6 +49,13 @@ func (s *store) PutChainState(state *postage.ChainState) error {
func (s *store) GetChainState() (*postage.ChainState, error) {
cs := &postage.ChainState{}
err := s.store.Get(stateKey, cs)
if err == storage.ErrNotFound {
return &postage.ChainState{
Block: 0,
Total: big.NewInt(0),
Price: big.NewInt(0),
}, nil
}

return cs, err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/postage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package postage

import (
"io"
"math/big"
)

Expand All @@ -15,6 +16,7 @@ type EventUpdater interface {
TopUp(id []byte, normalisedBalance *big.Int) error
UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int) error
UpdatePrice(price *big.Int) error
UpdateBlockNumber(blockNumber uint64) error
}

// Storer represents the persistence layer for batches on the current (highest
Expand All @@ -28,5 +30,6 @@ type Storer interface {

// Listener provides a blockchain event iterator.
type Listener interface {
io.Closer
Listen(from uint64, updater EventUpdater)
}
2 changes: 2 additions & 0 deletions pkg/postage/listener/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ var (
BatchTopupTopic = batchTopupTopic
BatchDepthIncreaseTopic = batchDepthIncreaseTopic
PriceUpdateTopic = priceUpdateTopic

TailSize = tailSize
)
26 changes: 25 additions & 1 deletion pkg/postage/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

const (
blockPage = 500 // how many blocks to sync every time
tailSize = 20 // how many blocks to tail from the tip of the chain
tailSize = 4 // how many blocks to tail from the tip of the chain
)

var (
Expand Down Expand Up @@ -178,6 +178,16 @@ func (l *listener) sync(from uint64, updater postage.EventUpdater) error {
return err
}

if to < from {
// if the blockNumber is actually less than what we already, it might mean the backend is not synced or some reorg scenario
continue
}

if to < tailSize {
// in a test blockchain there might be not be enough blocks yet
continue
}

// consider to-tailSize as the "latest" block we need to sync to
to = to - tailSize

Expand All @@ -198,6 +208,11 @@ func (l *listener) sync(from uint64, updater postage.EventUpdater) error {
}
}

err = updater.UpdateBlockNumber(to)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that moving the block height to the actual individual event signatures on EventUpdater and using the Log.BlockNumber (and thus updating the chain state on every write in batch service) would be more consistent. Then we can avoid this method altogether. WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the advantage of this approach is that the block number is only updated after the entire block has been processed. whereas if its on individual events it's already updated after the first event. if the bee node stops/crashes mid-processing a block it will reprocess some events but that should be fine as all of them should be idempotent (at least better than skipping anything).

if err != nil {
return err
}

from = to + 1
}
}
Expand Down Expand Up @@ -250,3 +265,12 @@ type batchDepthIncreaseEvent struct {
type priceUpdateEvent struct {
Price *big.Int
}

// DiscoverAddresses returns the canonical contracts for this chainID
func DiscoverAddresses(chainID int64) (postageStamp common.Address, priceOracle common.Address, found bool) {
if chainID == 5 {
// goerli
return common.HexToAddress("0xf0870E3abb457026BE46d9b2CDf35e8FFcB27955"), common.HexToAddress("0xc1B598609A38D0A0F85f68eD0fFEFdeC9cD061C9"), true
}
return common.Address{}, common.Address{}, false
}
Loading