From 7e9a8c82e833f284e94d7802be4acae7da0a7841 Mon Sep 17 00:00:00 2001 From: zelig Date: Sat, 7 Nov 2020 09:50:25 +0100 Subject: [PATCH] postage/batchstore: initial commit --- pkg/postage/batchstore/state.go | 55 ++++++++++ pkg/postage/batchstore/store.go | 180 ++++++++++++++++++++++++++++++++ pkg/postage/interface.go | 41 ++++++++ 3 files changed, 276 insertions(+) create mode 100644 pkg/postage/batchstore/state.go create mode 100644 pkg/postage/batchstore/store.go create mode 100644 pkg/postage/interface.go diff --git a/pkg/postage/batchstore/state.go b/pkg/postage/batchstore/state.go new file mode 100644 index 00000000000..9231df5375e --- /dev/null +++ b/pkg/postage/batchstore/state.go @@ -0,0 +1,55 @@ +package batchstore + +import ( + "encoding" + "encoding/binary" + "math/big" + + "github.com/ethersphere/bee/pkg/storage" +) + +var stateKey = "stateKey" + +// state implements BinaryMarshaler interface +var _ encoding.BinaryMarshaler = (*state)(nil) + +// state represents the current state of the reserve +type state struct { + block uint64 // the block number of the last postage event + total *big.Int // cumulative amount paid per stamp + price *big.Int // bzz/chunk/block normalised price +} + +// MarshalBinary serialises the state to be used by the state store +func (st *state) MarshalBinary() ([]byte, error) { + buf := make([]byte, 9) + binary.BigEndian.PutUint64(buf, st.block) + totalBytes := st.total.Bytes() + buf[8] = uint8(len(totalBytes)) + buf = append(buf, totalBytes...) + return append(buf, st.price.Bytes()...), nil +} + +// UnmarshalBinary deserialises the state to be used by the state store +func (st *state) UnmarshalBinary(buf []byte) error { + st.block = binary.BigEndian.Uint64(buf[:8]) + totalLen := int(buf[8]) + st.total = new(big.Int).SetBytes(buf[9 : 9+totalLen]) + st.price = new(big.Int).SetBytes(buf[9+totalLen:]) + return nil +} + +// loads the state from statestore, initialises if not found +func (st *state) load(store storage.StateStorer) error { + err := store.Get(stateKey, st) + if err == storage.ErrNotFound { + st.total = big.NewInt(0) + st.price = big.NewInt(0) + return nil + } + return err +} + +func (st *state) save(store storage.StateStorer) error { + return store.Put(stateKey, st) +} diff --git a/pkg/postage/batchstore/store.go b/pkg/postage/batchstore/store.go new file mode 100644 index 00000000000..56b6ca8103a --- /dev/null +++ b/pkg/postage/batchstore/store.go @@ -0,0 +1,180 @@ +package batchstore + +import ( + "math/big" + "sync" + + "github.com/ethersphere/bee/pkg/logging" + "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/storage" +) + +var ( + batchKeyPrefix = "batchKeyPrefix" + valueKeyPrefix = "valueKeyPrefix" +) + +var _ postage.EventUpdater = (*Store)(nil) + +// Store is a local store for postage batches +type Store struct { + store storage.StateStorer // state store backend to persist batches + mu sync.Mutex // mutex to lock statestore during atomic changes + cancel func() // cancel sync and wait till done + state *state // the current state + logger logging.Logger +} + +// New constructs a new postage batch store +func New(store storage.StateStorer, events postage.Events, logger logging.Logger) (*Store, error) { + // initialise state from statestore or start with 0-s + st := &state{} + if err := st.load(store); err != nil { + return nil, err + } + s := &Store{ + store: store, + logger: logger, + } + s.cancel = events.Each(st.block, s.update) + return s, nil +} + + +// Close quits the sync routine and closes the statestore +func (s *Store) Close() error { + s.cancel() + return s.store.Close() +} + + +// Sync starts the forever loop that keeps the batch Store in sync with the blockchain +// takes a postage.Listener interface as argument and uses it as an iterator +func (s *Store) sync(lis postage.Listener) { + stop, errs := events.Sync(lis, s.state.block, s.update) + quit := make(chan struct{}) + s.cancel = func() { + close(quit) + stop() + <-errs + } + go func() { + select { + case <-quit: + case err := <-errs: + s.logger.Error("error syncing: %v", err) + } + } +} + +// update wraps around the update call for the specific event and +// abstracts the process shared across events +// - lock +// - settle = update cumulative outpayment normalised +// - update specific to event +// - save state +// - unlock +// this is the function that is given to the listener iterator when synchronising +func (s *Store) update(block uint64, ev postage.Event) error { + s.mu.Lock() + defer s.mu.Unlock() + s.settle(block) + if err := ev.Update(s); err != nil { + return err + } + return s.state.save(s.store) +} + +// settle retrieves the current state +// - sets the cumulative outpayment normalised, cno+=price*period +// - sets the new block number +// caller holds the store mutex +func (s *Store) settle(block uint64) { + period := int64(block - s.state.block) + s.state.block = block + s.state.total.Add(s.state.total, new(big.Int).Mul(s.state.price, big.NewInt(period))) +} + +// +func (s *Store) balance(b *postage.Batch, add *big.Int) (*big.Int, error) { + return nil, nil +} + +// batchKey returns the index key for the batch ID used in the by-ID batch index +func batchKey(id []byte) string { + return batchKeyPrefix + string(id) +} + +// valueKey returns the index key for the batch value used in the by-value (priority) batch index +func valueKey(v *big.Int) string { + key := make([]byte, 32) + value := v.Bytes() + copy(key[32-len(value):], value) + return valueKeyPrefix + string(key) +} + +func (s *Store) get(id []byte) (*postage.Batch, error) { + b := &postage.Batch{} + err := s.store.Get(batchKey(id), b) + return b, err +} + +func (s *Store) put(b *postage.Batch) error { + return s.store.Put(batchKey(b.ID), b) +} + +func (s *Store) replace(id []byte, oldValue, newValue *big.Int) error { + err := s.store.Delete(valueKey(oldValue)) + if err != nil { + return err + } + return s.store.Put(valueKey(newValue), id) +} + +func (s *Store) Create(id []byte, owner []byte, amount *big.Int, depth uint8) error { + b := &postage.Batch{ + ID: id, + Start: s.state.block, + Owner: owner, + Depth: depth, + } + value, err := s.balance(b, amount) + if err != nil { + return err + } + err = s.replace(id, b.Value, value) + if err != nil { + return err + } + return s.put(b) +} + +func (s *Store) TopUp(id []byte, amount *big.Int) error { + b, err := s.get(id) + if err != nil { + return err + } + value, err := s.balance(b, amount) + if err != nil { + return err + } + err = s.replace(id, b.Value, value) + if err != nil { + return err + } + return s.put(b) +} + +func (s *Store) UpdateDepth(id []byte, depth uint8) error { + b, err := s.get(id) + if err != nil { + return err + } + b.Depth = depth + return s.put(b) +} + +func (s *Store) UpdatePrice(price *big.Int) error { + s.state.price = price + return nil +} \ No newline at end of file diff --git a/pkg/postage/interface.go b/pkg/postage/interface.go new file mode 100644 index 00000000000..3b12fb82fbc --- /dev/null +++ b/pkg/postage/interface.go @@ -0,0 +1,41 @@ +package postage + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/core/types" +) + +// EventUpdater interface definitions reflect the updates triggered by events emitted by +// the postage contract on the blockchain +type EventUpdater interface { + Create(id []byte, owner []byte, amount *big.Int, depth uint8) error + TopUp(id []byte, amount *big.Int) error + UpdateDepth(id []byte, depth uint8) error + UpdatePrice(price *big.Int) error +} + +// Event is the interface subsuming all postage contract blockchain events +// +// postage contract event | golang Event | Update call on EventUpdater +// ------------------------+---------------------------+--------------------------- +// BatchCreated | batchCreatedEvent | Create +// BatchTopUp | batchTopUpEvent | TopUp +// BatchDepthIncrease | batchDepthIncreaseEvent | UpdateDepth +// PriceUpdate | priceUpdateEvent | UpdatePrice +type Event interface { + Update(s EventUpdater) error +} + +// Events provides an iterator for postage events +type Events interface { + Each(from uint64, update func(block uint64, ev Event) error) func() +} + +// Listener provides a blockchain event iterator +type Listener interface { + // - it starts at block from + // - it terminates with no error when quit channel is closed + // - if the update function returns an error, the call returns with that error + Listen(from uint64, quit chan struct{}, update func(types.Log) error) error +}