Skip to content

Commit

Permalink
batchstore: document tests and cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
acud committed Mar 26, 2021
1 parent f652df3 commit a572579
Show file tree
Hide file tree
Showing 7 changed files with 773 additions and 79 deletions.
3 changes: 3 additions & 0 deletions pkg/postage/batchservice/batchservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ func (svc *batchService) UpdatePrice(price *big.Int) error {

func (svc *batchService) UpdateBlockNumber(blockNumber uint64) error {
cs := svc.storer.GetChainState()
diff := big.NewInt(0).SetUint64(blockNumber - cs.Block)

cs.Total.Add(cs.Total, diff.Mul(diff, cs.Price))
cs.Block = blockNumber
if err := svc.storer.PutChainState(cs); err != nil {
return fmt.Errorf("put chain state: %w", err)
Expand Down
22 changes: 22 additions & 0 deletions pkg/postage/batchservice/batchservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,28 @@ func TestBatchServiceUpdatePrice(t *testing.T) {
}
})
}
func TestBatchServiceUpdateBlockNumber(t *testing.T) {
testChainState := &postage.ChainState{
Block: 1,
Price: big.NewInt(100),
Total: big.NewInt(100),
}
svc, batchStore := newTestStoreAndService(
mock.WithChainState(testChainState),
)

// advance the block number and expect total cumulative payout to update
nextBlock := uint64(4)

if err := svc.UpdateBlockNumber(nextBlock); err != nil {
t.Fatalf("update price: %v", err)
}
nn := big.NewInt(400)
cs := batchStore.GetChainState()
if cs.Total.Cmp(nn) != 0 {
t.Fatalf("bad price: want %v, got %v", nn, cs.Total)
}
}

func newTestStoreAndService(opts ...mock.Option) (postage.EventUpdater, postage.Storer) {
store := mock.New(opts...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/postage/batchstore/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func IterateAll(bs postage.Storer, f func(b *postage.Batch) (bool, error)) error
// GetReserve extracts the inner limit and depth of reserve
func GetReserve(si postage.Storer) (*big.Int, uint8) {
s, _ := si.(*store)
return s.rs.Inner, s.rs.Depth
return s.rs.Inner, s.rs.Radius
}

func (s *store) String() string {
Expand Down
137 changes: 95 additions & 42 deletions pkg/postage/batchstore/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,34 +42,44 @@ var ErrBatchNotFound = errors.New("postage batch not found or expired")
// DefaultDepth is the initial depth for the reserve
const DefaultDepth = 5

// Capacity = number of chunks in reserve. `2^23` was chosen to remain
// relatively near the current 5M chunks ~25GB
// Capacity is the number of chunks in reserve. `2^23` (8388608) was chosen to remain
// relatively near the current 5M chunks ~25GB.
// Utilization is estimated at 50%-60%, which should result in about 4~5mil chunks in reserve.
var Capacity = exp2(23)

var big1 = big.NewInt(1)

// reserveState records the state and is persisted in the state store
type reserveState struct {
Depth uint8 `json:"depth"` // Radius of responsibility
Capacity int64 `json:"capacity"` // size of the reserve = number of chunks
Outer *big.Int `json:"outer"` // lower value limit for outer layer = the further half of chunks
Inner *big.Int `json:"inner"` // lower value limit for inner layer = the closer half of chunks
// Radius is the radius of responsibility,
// it defines the proximity order of chunks which we
// would like to guarantee that all chunks are stored
Radius uint8 `json:"radius"`
// Available capacity of the reserve which can still be used.
Available int64 `json:"available"`
Outer *big.Int `json:"outer"` // lower value limit for outer layer = the further half of chunks
Inner *big.Int `json:"inner"` // lower value limit for inner layer = the closer half of chunks
}

// unreserve is called when the batchstore decides not to reserve a batch on a PO
// i.e. chunk of the batch in bins [0 upto PO] (closed interval) are unreserved
// i.e. chunk of the batch in bins [0 upto PO] (closed interval) are unreserved
func (s *store) unreserve(b *postage.Batch, radius uint8) error {
return s.unreserveFunc(b.ID, radius)
}

// evictExpired is called when PutChainState is called (and there is 'settlement')
func (s *store) evictExpired() error {
var toDelete [][]byte

// set until to total or inner whichever is greater
until := new(big.Int)

// if inner > 0 && total >= inner
if s.rs.Inner.Cmp(big.NewInt(0)) > 0 && s.cs.Total.Cmp(s.rs.Inner) >= 0 {
// collect until total+1
until.Add(s.cs.Total, big1)
} else {
// collect until inner (collect all outer ones)
until.Set(s.rs.Inner)
}
var multiplier int64
Expand All @@ -78,24 +88,28 @@ func (s *store) evictExpired() error {
if err != nil {
return true, err
}

// if batch value >= until then continue to next.
// terminate iteration if until is passed
if b.Value.Cmp(until) >= 0 {
return true, nil
}
//
// if multiplier == 0 && batch value >= inner
if multiplier == 0 && b.Value.Cmp(s.rs.Inner) >= 0 {
multiplier = 1
}
//
// if multiplier == 1 && batch value >= outer
if multiplier == 1 && b.Value.Cmp(s.rs.Outer) >= 0 {
multiplier = 2
}

// unreserve batch fully
err = s.unreserve(b, swarm.MaxPO)
if err != nil {
return true, err
}
s.rs.Capacity += multiplier * exp2(b.Depth-s.rs.Depth-1)
s.rs.Available += multiplier * exp2(b.Radius-s.rs.Radius-1)

// if batch has no value then delete it
if b.Value.Cmp(s.cs.Total) <= 0 {
toDelete = append(toDelete, b.ID)
Expand All @@ -107,6 +121,8 @@ func (s *store) evictExpired() error {
}
// set inner/outer to total if total is greater
s.rs.Inner.Set(until)

// if outer < until
if s.rs.Outer.Cmp(until) < 0 {
s.rs.Outer.Set(until)
}
Expand All @@ -117,73 +133,111 @@ func (s *store) evictExpired() error {
}

// tier represents the sections of the reserve that can be described as value intervals
// 0 - out of reserve
// 0 - out of the reserve
// 1 - within reserve radius = depth (inner half)
// 2 - within reserve radius = depth-1 (both inner and outer halves)
type tier int

const (
none tier = iota // out of the reserve
half // the mid range where chunks are kept within depth
full // top range where chunks are kept within depth - 1
unreserved tier = iota // out of the reserve
inner // the mid range where chunks are kept within depth
outer // top range where chunks are kept within depth - 1
)

// change calculates info relevant to the value change from old to new value and old and new depth
// returns the change in capacity and the radius of reserve
func (rs *reserveState) change(oldv, newv *big.Int, oldDepth, newDepth uint8) (capacityChange int64, reserveRadius uint8) {
was := rs.tier(oldv)
is := rs.setLimits(newv, rs.tier(newv))
capacityChange = int64(was)*exp2(oldDepth-rs.Depth-1) - int64(is)*exp2(newDepth-rs.Depth-1)
reserveRadius = rs.radius(is)
return
func (rs *reserveState) change(oldv, newv *big.Int, oldDepth, newDepth uint8) (int64, uint8) {
oldTier := rs.tier(oldv)
newTier := rs.setLimits(newv, rs.tier(newv))

oldSize := rs.size(oldDepth, oldTier)
newSize := rs.size(newDepth, newTier)

availableCapacityChange := oldSize - newSize
reserveRadius := rs.radius(newTier)

return availableCapacityChange, reserveRadius
}

// size returns the number of chunks the local node is responsible
// to store in its reserve.
func (rs *reserveState) size(depth uint8, t tier) int64 {
size := exp2(depth - rs.Radius - 1)
switch t {
case inner:
return size
case outer:
return 2 * size
default:
// case is unreserved
return 0
}
}

// tier returns which tier a value falls into
func (rs *reserveState) tier(x *big.Int) tier {

// x < rs.Inner || x == 0
if x.Cmp(rs.Inner) < 0 || rs.Inner.Cmp(big.NewInt(0)) == 0 {
return none
return unreserved
}

// x < rs.Outer
if x.Cmp(rs.Outer) < 0 {
return half
return inner
}
return full

// x >= rs.Outer
return outer
}

// radius returns the reserve radius of a batch given the depth (radius of responsibility)
// based on the tier it falls in
func (rs *reserveState) radius(t tier) uint8 {
switch t {
case none:
case unreserved:
return swarm.MaxPO
case half:
return rs.Depth
default: // full
return rs.Depth - 1
case inner:
return rs.Radius
default:
// outer
return rs.Radius - 1
}
}

// setLimits sets the tier 1 value limit, if new item is the minimum so far (or the very first batch)
func (rs *reserveState) setLimits(val *big.Int, was tier) (is tier) {
if was != none {
return was
// returns the adjusted new tier
func (rs *reserveState) setLimits(val *big.Int, newTier tier) tier {
if newTier != unreserved {
return newTier
}

// if we're here it means that the new tier
// falls under the unreserved tier
var adjustedTier tier

// rs.Inner == 0 || rs.Inner > val
if rs.Inner.Cmp(big.NewInt(0)) == 0 || rs.Inner.Cmp(val) > 0 {
is = half
adjustedTier = inner
// if the outer is the same as the inner
if rs.Outer.Cmp(rs.Inner) == 0 {
// the value falls below inner and outer
rs.Outer.Set(val)
is = full
adjustedTier = outer
}
// inner is decreased to val, this is done when the
// batch is diluted, decreasing the value of it.
rs.Inner.Set(val)
}
return is
return adjustedTier
}

// update manages what chunks of which batch are allocated to the reserve
func (s *store) update(b *postage.Batch, oldDepth uint8, oldValue *big.Int) error {
newValue := b.Value
newDepth := b.Depth
capacityChange, reserveRadius := s.rs.change(oldValue, newValue, oldDepth, newDepth)
s.rs.Capacity += capacityChange
s.rs.Available += capacityChange
if err := s.unreserve(b, reserveRadius); err != nil {
return err
}
Expand All @@ -193,7 +247,7 @@ func (s *store) update(b *postage.Batch, oldDepth uint8, oldValue *big.Int) erro
// evictOuter is responsible for keeping capacity positive by unreserving lowest priority batches
func (s *store) evictOuter(last *postage.Batch) error {
// if capacity is positive nothing to evict
if s.rs.Capacity >= 0 {
if s.rs.Available >= 0 {
return nil
}
err := s.store.Iterate(valueKeyPrefix, func(key, _ []byte) (bool, error) {
Expand All @@ -212,14 +266,13 @@ func (s *store) evictOuter(last *postage.Batch) error {
return false, nil
}
// stop iteration only if we consumed all batches of the same value as the one that put capacity above zero
if s.rs.Capacity >= 0 && s.rs.Outer.Cmp(b.Value) != 0 {
if s.rs.Available >= 0 && s.rs.Outer.Cmp(b.Value) != 0 {
return true, nil
}
//
// unreserve outer PO of the lowest priority batch until capacity is back to positive
s.rs.Capacity += exp2(b.Depth - s.rs.Depth - 1)
s.rs.Available += exp2(b.Depth - s.rs.Radius - 1)
s.rs.Outer.Set(b.Value)
return false, s.unreserve(b, s.rs.Depth)
return false, s.unreserve(b, s.rs.Radius)
})
if err != nil {
return err
Expand All @@ -228,8 +281,8 @@ func (s *store) evictOuter(last *postage.Batch) error {
s.rs.Outer.Add(s.rs.Outer, big1)
// if we consumed all batches, ie. we unreserved all chunks on the outer = depth PO
// then its time to increase depth
if s.rs.Capacity < 0 {
s.rs.Depth++
if s.rs.Available < 0 {
s.rs.Radius++
s.rs.Outer.Set(s.rs.Inner) // reset outer limit to inner limit
return s.evictOuter(last)
}
Expand Down
Loading

0 comments on commit a572579

Please sign in to comment.