Skip to content

Commit

Permalink
batchstore, kademlia: add reserve radius as depth upper bound
Browse files Browse the repository at this point in the history
  • Loading branch information
acud committed Apr 23, 2021
1 parent f6d94b2 commit d7b2cd2
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 10 deletions.
43 changes: 36 additions & 7 deletions pkg/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Kad struct {
knownPeers *pslice.PSlice // both are po aware slice of addresses
bootnodes []ma.Multiaddr
depth uint8 // current neighborhood depth
radius uint8 // storage area of responsibility
depthMu sync.RWMutex // protect depth changes
manageC chan struct{} // trigger the manage forever loop to connect to new peers
waitNext map[string]retryInfo // sanction connections to a peer, key is overlay string and value is a retry information
Expand All @@ -87,7 +88,12 @@ type retryInfo struct {
}

// New returns a new Kademlia.
func New(base swarm.Address, addressbook addressbook.Interface, discovery discovery.Driver, p2p p2p.Service, logger logging.Logger, o Options) *Kad {
func New(base swarm.Address,
addressbook addressbook.Interface,
discovery discovery.Driver,
p2p p2p.Service,
logger logging.Logger,
o Options) *Kad {
if o.SaturationFunc == nil {
o.SaturationFunc = binSaturated
}
Expand Down Expand Up @@ -340,7 +346,7 @@ func (k *Kad) manage() {
k.connectedPeers.Add(peer, po)

k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers)
k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock()

k.logger.Debugf("connected to peer: %s for bin: %d", peer, i)
Expand Down Expand Up @@ -421,7 +427,7 @@ func (k *Kad) manage() {
k.connectedPeers.Add(peer, po)

k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers)
k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock()

k.logger.Debugf("connected to peer: %s old depth: %d new depth: %d", peer, currentDepth, k.NeighborhoodDepth())
Expand Down Expand Up @@ -518,7 +524,7 @@ func (k *Kad) connectBootnodes(ctx context.Context) {
// when a bin is not saturated it means we would like to proactively
// initiate connections to other peers in the bin.
func binSaturated(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
potentialDepth := recalcDepth(peers)
potentialDepth := recalcDepth(peers, swarm.MaxPO)

// short circuit for bins which are >= depth
if bin >= potentialDepth {
Expand All @@ -544,7 +550,7 @@ func binSaturated(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
}

// recalcDepth calculates and returns the kademlia depth.
func recalcDepth(peers *pslice.PSlice) uint8 {
func recalcDepth(peers *pslice.PSlice, radius uint8) uint8 {
// handle edge case separately
if peers.Length() <= nnLowWatermark {
return 0
Expand Down Expand Up @@ -590,9 +596,15 @@ func recalcDepth(peers *pslice.PSlice) uint8 {
return false, false, nil
})
if shallowestUnsaturated > candidate {
if radius < candidate {
return radius
}
return candidate
}

if radius < shallowestUnsaturated {
return radius
}
return shallowestUnsaturated
}

Expand Down Expand Up @@ -761,7 +773,7 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
k.waitNextMu.Unlock()

k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers)
k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock()

k.notifyPeerSig()
Expand All @@ -779,7 +791,7 @@ func (k *Kad) Disconnected(peer p2p.Peer) {
k.waitNextMu.Unlock()

k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers)
k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock()

select {
Expand Down Expand Up @@ -1018,6 +1030,23 @@ func (k *Kad) IsBalanced(bin uint8) bool {
return true
}

func (k *Kad) SetRadius(r uint8) {
k.depthMu.Lock()
defer k.depthMu.Unlock()
if k.radius == r {
return
}
k.radius = r
oldD := k.depth
k.depth = recalcDepth(k.connectedPeers, k.radius)
if k.depth != oldD {
select {
case k.manageC <- struct{}{}:
default:
}
}
}

func (k *Kad) Snapshot() *topology.KadParams {
var infos []topology.BinInfo
for i := int(swarm.MaxPO); i >= 0; i-- {
Expand Down
21 changes: 20 additions & 1 deletion pkg/kademlia/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func TestNeighborhoodDepth(t *testing.T) {
base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{})
)

kad.SetRadius(swarm.MaxPO) // initial tests do not check for radius

if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -112,8 +114,14 @@ func TestNeighborhoodDepth(t *testing.T) {
// depth is 7 because bin 7 is unsaturated (1 peer)
kDepth(t, kad, 7)

// expect shallow peers not in depth
// set the radius to be lower than unsaturated, expect radius as depth
kad.SetRadius(6)
kDepth(t, kad, 6)

// set the radius to MaxPO again so that intermediate checks can run
kad.SetRadius(swarm.MaxPO)

// expect shallow peers not in depth
for _, a := range shallowPeers {
if kad.IsWithinDepth(a) {
t.Fatal("expected address to outside of depth")
Expand Down Expand Up @@ -142,6 +150,13 @@ func TestNeighborhoodDepth(t *testing.T) {
waitConn(t, &conns)
kDepth(t, kad, 8)

// again set radius to lower value, expect that as depth
kad.SetRadius(5)
kDepth(t, kad, 5)

// reset radius to MaxPO for the rest of the checks
kad.SetRadius(swarm.MaxPO)

var addrs []swarm.Address
// fill the rest up to the bin before last and check that everything works at the edges
for i := 9; i < int(swarm.MaxBins); i++ {
Expand Down Expand Up @@ -304,6 +319,8 @@ func TestManageWithBalancing(t *testing.T) {
base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{SaturationFunc: saturationFunc, BitSuffixLength: 2})
)

kad.SetRadius(swarm.MaxPO) // don't use radius for checks

// implement satiration function (while having access to Kademlia instance)
sfImpl := func(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
return kad.IsBalanced(bin), false
Expand Down Expand Up @@ -419,6 +436,7 @@ func TestOversaturation(t *testing.T) {
conns int32 // how many connect calls were made to the p2p mock
base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{})
)
kad.SetRadius(swarm.MaxPO) // don't use radius for checks

if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -474,6 +492,7 @@ func TestOversaturationBootnode(t *testing.T) {
conns int32 // how many connect calls were made to the p2p mock
base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{BootnodeMode: true})
)
kad.SetRadius(swarm.MaxPO) // don't use radius for checks

if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
Expand Down
1 change: 1 addition & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
b.topologyCloser = kad
hive.SetAddPeersHandler(kad.AddPeers)
p2ps.SetPickyNotifier(kad)
batchStore.SetRadiusSetter(kad)

paymentThreshold, ok := new(big.Int).SetString(o.PaymentThreshold, 10)
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions pkg/postage/batchstore/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,7 @@ func (bs *BatchStore) PutChainState(cs *postage.ChainState) error {
func (bs *BatchStore) GetReserveState() *postage.Reservestate {
return bs.rs
}

func (bs *BatchStore) SetRadiusSetter(r postage.RadiusSetter) {
panic("not implemented")
}
7 changes: 7 additions & 0 deletions pkg/postage/batchstore/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TestBatchStoreUnreserveEvents(t *testing.T) {
batchstore.Capacity = batchstore.Exp2(16)

bStore, unreserved := setupBatchStore(t)
bStore.SetRadiusSetter(noopRadiusSetter{})
batches := make(map[string]*postage.Batch)

t.Run("new batches only", func(t *testing.T) {
Expand Down Expand Up @@ -139,6 +140,7 @@ func TestBatchStoreUnreserveAll(t *testing.T) {
batchstore.Capacity = batchstore.Exp2(16)

bStore, unreserved := setupBatchStore(t)
bStore.SetRadiusSetter(noopRadiusSetter{})
var batches [][]byte
// iterate starting from batchstore.DefaultDepth to maxPO
_, depth := batchstore.GetReserve(bStore)
Expand Down Expand Up @@ -211,6 +213,7 @@ func setupBatchStore(t *testing.T) (postage.Storer, map[string]uint8) {
return nil
}
bStore, _ := batchstore.New(stateStore, unreserveFunc)
bStore.SetRadiusSetter(noopRadiusSetter{})

// initialise chainstate
err = bStore.PutChainState(&postage.ChainState{
Expand Down Expand Up @@ -535,6 +538,7 @@ func TestBatchStore_Unreserve(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
store, unreserved := setupBatchStore(t)
store.SetRadiusSetter(noopRadiusSetter{})
batches := addBatch(t, store,
depthValue(initBatchDepth, 3),
depthValue(initBatchDepth, 4),
Expand Down Expand Up @@ -651,6 +655,7 @@ func TestBatchStore_Topup(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
store, unreserved := setupBatchStore(t)
store.SetRadiusSetter(noopRadiusSetter{})
batches := addBatch(t, store,
depthValue(initBatchDepth, 2),
depthValue(initBatchDepth, 3),
Expand Down Expand Up @@ -771,6 +776,7 @@ func TestBatchStore_Dilution(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
store, unreserved := setupBatchStore(t)
store.SetRadiusSetter(noopRadiusSetter{})
batches := addBatch(t, store,
depthValue(initBatchDepth, 2),
depthValue(initBatchDepth, 3),
Expand Down Expand Up @@ -799,6 +805,7 @@ func TestBatchStore_EvictExpired(t *testing.T) {
initBatchDepth := uint8(8)

store, unreserved := setupBatchStore(t)
store.SetRadiusSetter(noopRadiusSetter{})
batches := addBatch(t, store,
depthValue(initBatchDepth, 2),
depthValue(initBatchDepth, 3),
Expand Down
23 changes: 22 additions & 1 deletion pkg/postage/batchstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type store struct {
rs *reserveState // the reserve state
unreserveFunc unreserveFn // unreserve function
metrics metrics // metrics

radiusSetter postage.RadiusSetter // setter for radius notifications
//radiusSetterMu sync.Mutex
}

// New constructs a new postage batch store.
Expand Down Expand Up @@ -58,7 +61,15 @@ func New(st storage.StateStorer, unreserveFunc unreserveFn) (postage.Storer, err
Available: Capacity,
}
}
return &store{st, cs, rs, unreserveFunc, newMetrics()}, nil
s := &store{
store: st,
cs: cs,
rs: rs,
unreserveFunc: unreserveFunc,
metrics: newMetrics(),
}

return s, nil
}

func (s *store) GetReserveState() *postage.Reservestate {
Expand Down Expand Up @@ -99,6 +110,8 @@ func (s *store) Put(b *postage.Batch, value *big.Int, depth uint8) error {
if err != nil {
return err
}
s.radiusSetter.SetRadius(s.rs.Radius)

return s.store.Put(batchKey(b.ID), b)
}

Expand Down Expand Up @@ -130,6 +143,8 @@ func (s *store) PutChainState(cs *postage.ChainState) error {
if err != nil {
return err
}
s.radiusSetter.SetRadius(s.rs.Radius)

return s.store.Put(chainStateKey, cs)
}

Expand All @@ -139,6 +154,12 @@ func (s *store) GetChainState() *postage.ChainState {
return s.cs
}

func (s *store) SetRadiusSetter(r postage.RadiusSetter) {
//s.radiusSetterMu.Lock()
s.radiusSetter = r
//s.radiusSetterMu.Unlock()
}

// batchKey returns the index key for the batch ID used in the by-ID batch index.
func batchKey(id []byte) string {
return batchKeyPrefix + string(id)
Expand Down
8 changes: 7 additions & 1 deletion pkg/postage/batchstore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestBatchStorePut(t *testing.T) {

stateStore := mock.NewStateStore()
batchStore, _ := batchstore.New(stateStore, unreserve)

batchStore.SetRadiusSetter(noopRadiusSetter{})
batchStorePutBatch(t, batchStore, testBatch)

var got postage.Batch
Expand All @@ -46,6 +46,7 @@ func TestBatchStoreGetChainState(t *testing.T) {

stateStore := mock.NewStateStore()
batchStore, _ := batchstore.New(stateStore, nil)
batchStore.SetRadiusSetter(noopRadiusSetter{})

err := batchStore.PutChainState(testChainState)
if err != nil {
Expand All @@ -60,6 +61,7 @@ func TestBatchStorePutChainState(t *testing.T) {

stateStore := mock.NewStateStore()
batchStore, _ := batchstore.New(stateStore, nil)
batchStore.SetRadiusSetter(noopRadiusSetter{})

batchStorePutChainState(t, batchStore, testChainState)
var got postage.ChainState
Expand Down Expand Up @@ -101,3 +103,7 @@ func batchStorePutChainState(t *testing.T, st postage.Storer, cs *postage.ChainS
t.Fatalf("postage storer put chain state: %v", err)
}
}

type noopRadiusSetter struct{}

func (_ noopRadiusSetter) SetRadius(_ uint8) {}
5 changes: 5 additions & 0 deletions pkg/postage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type Storer interface {
PutChainState(*ChainState) error
GetChainState() *ChainState
GetReserveState() *Reservestate
SetRadiusSetter(RadiusSetter)
}

type RadiusSetter interface {
SetRadius(uint8)
}

// Listener provides a blockchain event iterator.
Expand Down

0 comments on commit d7b2cd2

Please sign in to comment.