Skip to content

Commit

Permalink
postage/batchstore: initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
zelig committed Nov 12, 2020
1 parent 46f3a5f commit 4ad4230
Show file tree
Hide file tree
Showing 3 changed files with 255 additions and 0 deletions.
55 changes: 55 additions & 0 deletions pkg/postage/batchstore/state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package batchstore

import (
"encoding"
"encoding/binary"
"math/big"

"github.com/ethersphere/bee/pkg/storage"
)

var stateKey = "stateKey"

// state implements BinaryMarshaler interface
var _ encoding.BinaryMarshaler = (*state)(nil)

// state represents the current state of the reserve
type state struct {
block uint64 // the block number of the last postage event
total *big.Int // cumulative amount paid per stamp
price *big.Int // bzz/chunk/block normalised price
}

// MarshalBinary serialises the state to be used by the state store
func (st *state) MarshalBinary() ([]byte, error) {
buf := make([]byte, 9)
binary.BigEndian.PutUint64(buf, st.block)
totalBytes := st.total.Bytes()
buf[8] = uint8(len(totalBytes))
buf = append(buf, totalBytes...)
return append(buf, st.price.Bytes()...), nil
}

// UnmarshalBinary deserialises the state to be used by the state store
func (st *state) UnmarshalBinary(buf []byte) error {
st.block = binary.BigEndian.Uint64(buf[:8])
totalLen := int(buf[8])
st.total = new(big.Int).SetBytes(buf[9 : 9+totalLen])
st.price = new(big.Int).SetBytes(buf[9+totalLen:])
return nil
}

// loads the state from statestore, initialises if not found
func (st *state) load(store storage.StateStorer) error {
err := store.Get(stateKey, st)
if err == storage.ErrNotFound {
st.total = big.NewInt(0)
st.price = big.NewInt(0)
return nil
}
return err
}

func (st *state) save(store storage.StateStorer) error {
return store.Put(stateKey, st)
}
159 changes: 159 additions & 0 deletions pkg/postage/batchstore/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package batchstore

import (
"math/big"
"sync"

"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/storage"
)

var (
batchKeyPrefix = "batchKeyPrefix"
valueKeyPrefix = "valueKeyPrefix"
)

var _ postage.EventUpdater = (*Store)(nil)

// Store is a local store for postage batches
type Store struct {
store storage.StateStorer // state store backend to persist batches
mu sync.Mutex // mutex to lock statestore during atomic changes
cancel func() // cancel sync and wait till done
state *state // the current state
logger logging.Logger
}

// New constructs a new postage batch store
func New(store storage.StateStorer, events postage.Events, logger logging.Logger) (*Store, error) {
// initialise state from statestore or start with 0-s
st := &state{}
if err := st.load(store); err != nil {
return nil, err
}
s := &Store{
store: store,
logger: logger,
}
s.cancel = events.Each(st.block, s.update)
return s, nil
}

// Close quits the sync routine and closes the statestore
func (s *Store) Close() error {
s.cancel()
return s.store.Close()
}

// update wraps around the update call for the specific event and
// abstracts the process shared across events
// - lock
// - settle = update cumulative outpayment normalised
// - update specific to event
// - save state
// - unlock
// this is the function that is given to the listener iterator when synchronising
func (s *Store) update(block uint64, ev postage.Event) error {
s.mu.Lock()
defer s.mu.Unlock()
s.settle(block)
if err := ev.Update(s); err != nil {
return err
}
return s.state.save(s.store)
}

// settle retrieves the current state
// - sets the cumulative outpayment normalised, cno+=price*period
// - sets the new block number
// caller holds the store mutex
func (s *Store) settle(block uint64) {
period := int64(block - s.state.block)
s.state.block = block
s.state.total.Add(s.state.total, new(big.Int).Mul(s.state.price, big.NewInt(period)))
}

//
func (s *Store) balance(b *postage.Batch, add *big.Int) (*big.Int, error) {
return nil, nil
}

// batchKey returns the index key for the batch ID used in the by-ID batch index
func batchKey(id []byte) string {
return batchKeyPrefix + string(id)
}

// valueKey returns the index key for the batch value used in the by-value (priority) batch index
func valueKey(v *big.Int) string {
key := make([]byte, 32)
value := v.Bytes()
copy(key[32-len(value):], value)
return valueKeyPrefix + string(key)
}

func (s *Store) get(id []byte) (*postage.Batch, error) {
b := &postage.Batch{}
err := s.store.Get(batchKey(id), b)
return b, err
}

func (s *Store) put(b *postage.Batch) error {
return s.store.Put(batchKey(b.ID), b)
}

func (s *Store) replace(id []byte, oldValue, newValue *big.Int) error {
err := s.store.Delete(valueKey(oldValue))
if err != nil {
return err
}
return s.store.Put(valueKey(newValue), id)
}

func (s *Store) Create(id []byte, owner []byte, amount *big.Int, depth uint8) error {
b := &postage.Batch{
ID: id,
Start: s.state.block,
Owner: owner,
Depth: depth,
}
value, err := s.balance(b, amount)
if err != nil {
return err
}
err = s.replace(id, b.Value, value)
if err != nil {
return err
}
return s.put(b)
}

func (s *Store) TopUp(id []byte, amount *big.Int) error {
b, err := s.get(id)
if err != nil {
return err
}
value, err := s.balance(b, amount)
if err != nil {
return err
}
err = s.replace(id, b.Value, value)
if err != nil {
return err
}
return s.put(b)
}

func (s *Store) UpdateDepth(id []byte, depth uint8) error {
b, err := s.get(id)
if err != nil {
return err
}
b.Depth = depth
return s.put(b)
}

func (s *Store) UpdatePrice(price *big.Int) error {
s.state.price = price
return nil
}
41 changes: 41 additions & 0 deletions pkg/postage/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package postage

import (
"math/big"

"github.com/ethereum/go-ethereum/core/types"
)

// EventUpdater interface definitions reflect the updates triggered by events emitted by
// the postage contract on the blockchain
type EventUpdater interface {
Create(id []byte, owner []byte, amount *big.Int, depth uint8) error
TopUp(id []byte, amount *big.Int) error
UpdateDepth(id []byte, depth uint8) error
UpdatePrice(price *big.Int) error
}

// Event is the interface subsuming all postage contract blockchain events
//
// postage contract event | golang Event | Update call on EventUpdater
// ------------------------+---------------------------+---------------------------
// BatchCreated | batchCreatedEvent | Create
// BatchTopUp | batchTopUpEvent | TopUp
// BatchDepthIncrease | batchDepthIncreaseEvent | UpdateDepth
// PriceUpdate | priceUpdateEvent | UpdatePrice
type Event interface {
Update(s EventUpdater) error
}

// Events provides an iterator for postage events
type Events interface {
Each(from uint64, update func(block uint64, ev Event) error) func()
}

// Listener provides a blockchain event iterator
type Listener interface {
// - it starts at block from
// - it terminates with no error when quit channel is closed
// - if the update function returns an error, the call returns with that error
Listen(from uint64, quit chan struct{}, update func(types.Log) error) error
}

0 comments on commit 4ad4230

Please sign in to comment.