Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batchstore, kademlia: add radius as depth upper bound #1594

Merged
merged 1 commit into from
Apr 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 36 additions & 7 deletions pkg/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Kad struct {
knownPeers *pslice.PSlice // both are po aware slice of addresses
bootnodes []ma.Multiaddr
depth uint8 // current neighborhood depth
radius uint8 // storage area of responsibility
depthMu sync.RWMutex // protect depth changes
manageC chan struct{} // trigger the manage forever loop to connect to new peers
waitNext map[string]retryInfo // sanction connections to a peer, key is overlay string and value is a retry information
Expand All @@ -87,7 +88,12 @@ type retryInfo struct {
}

// New returns a new Kademlia.
func New(base swarm.Address, addressbook addressbook.Interface, discovery discovery.Driver, p2p p2p.Service, logger logging.Logger, o Options) *Kad {
func New(base swarm.Address,
addressbook addressbook.Interface,
discovery discovery.Driver,
p2p p2p.Service,
logger logging.Logger,
o Options) *Kad {
if o.SaturationFunc == nil {
o.SaturationFunc = binSaturated
}
Expand Down Expand Up @@ -340,7 +346,7 @@ func (k *Kad) manage() {
k.connectedPeers.Add(peer, po)

k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers)
k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock()

k.logger.Debugf("connected to peer: %s for bin: %d", peer, i)
Expand Down Expand Up @@ -421,7 +427,7 @@ func (k *Kad) manage() {
k.connectedPeers.Add(peer, po)

k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers)
k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock()

k.logger.Debugf("connected to peer: %s old depth: %d new depth: %d", peer, currentDepth, k.NeighborhoodDepth())
Expand Down Expand Up @@ -518,7 +524,7 @@ func (k *Kad) connectBootnodes(ctx context.Context) {
// when a bin is not saturated it means we would like to proactively
// initiate connections to other peers in the bin.
func binSaturated(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
potentialDepth := recalcDepth(peers)
potentialDepth := recalcDepth(peers, swarm.MaxPO)

// short circuit for bins which are >= depth
if bin >= potentialDepth {
Expand All @@ -544,7 +550,7 @@ func binSaturated(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
}

// recalcDepth calculates and returns the kademlia depth.
func recalcDepth(peers *pslice.PSlice) uint8 {
func recalcDepth(peers *pslice.PSlice, radius uint8) uint8 {
// handle edge case separately
if peers.Length() <= nnLowWatermark {
return 0
Expand Down Expand Up @@ -590,9 +596,15 @@ func recalcDepth(peers *pslice.PSlice) uint8 {
return false, false, nil
})
if shallowestUnsaturated > candidate {
if radius < candidate {
return radius
}
return candidate
}

if radius < shallowestUnsaturated {
return radius
}
return shallowestUnsaturated
}

Expand Down Expand Up @@ -761,7 +773,7 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
k.waitNextMu.Unlock()

k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers)
k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock()

k.notifyPeerSig()
Expand All @@ -779,7 +791,7 @@ func (k *Kad) Disconnected(peer p2p.Peer) {
k.waitNextMu.Unlock()

k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers)
k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock()

select {
Expand Down Expand Up @@ -1018,6 +1030,23 @@ func (k *Kad) IsBalanced(bin uint8) bool {
return true
}

func (k *Kad) SetRadius(r uint8) {
k.depthMu.Lock()
defer k.depthMu.Unlock()
if k.radius == r {
return
}
k.radius = r
oldD := k.depth
k.depth = recalcDepth(k.connectedPeers, k.radius)
if k.depth != oldD {
select {
case k.manageC <- struct{}{}:
default:
}
}
}

func (k *Kad) Snapshot() *topology.KadParams {
var infos []topology.BinInfo
for i := int(swarm.MaxPO); i >= 0; i-- {
Expand Down
21 changes: 20 additions & 1 deletion pkg/kademlia/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func TestNeighborhoodDepth(t *testing.T) {
base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{})
)

kad.SetRadius(swarm.MaxPO) // initial tests do not check for radius

if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -112,8 +114,14 @@ func TestNeighborhoodDepth(t *testing.T) {
// depth is 7 because bin 7 is unsaturated (1 peer)
kDepth(t, kad, 7)

// expect shallow peers not in depth
// set the radius to be lower than unsaturated, expect radius as depth
kad.SetRadius(6)
kDepth(t, kad, 6)

// set the radius to MaxPO again so that intermediate checks can run
kad.SetRadius(swarm.MaxPO)

// expect shallow peers not in depth
for _, a := range shallowPeers {
if kad.IsWithinDepth(a) {
t.Fatal("expected address to outside of depth")
Expand Down Expand Up @@ -142,6 +150,13 @@ func TestNeighborhoodDepth(t *testing.T) {
waitConn(t, &conns)
kDepth(t, kad, 8)

// again set radius to lower value, expect that as depth
kad.SetRadius(5)
kDepth(t, kad, 5)

// reset radius to MaxPO for the rest of the checks
kad.SetRadius(swarm.MaxPO)

var addrs []swarm.Address
// fill the rest up to the bin before last and check that everything works at the edges
for i := 9; i < int(swarm.MaxBins); i++ {
Expand Down Expand Up @@ -304,6 +319,8 @@ func TestManageWithBalancing(t *testing.T) {
base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{SaturationFunc: saturationFunc, BitSuffixLength: 2})
)

kad.SetRadius(swarm.MaxPO) // don't use radius for checks

// implement satiration function (while having access to Kademlia instance)
sfImpl := func(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
return kad.IsBalanced(bin), false
Expand Down Expand Up @@ -419,6 +436,7 @@ func TestOversaturation(t *testing.T) {
conns int32 // how many connect calls were made to the p2p mock
base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{})
)
kad.SetRadius(swarm.MaxPO) // don't use radius for checks

if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -474,6 +492,7 @@ func TestOversaturationBootnode(t *testing.T) {
conns int32 // how many connect calls were made to the p2p mock
base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{BootnodeMode: true})
)
kad.SetRadius(swarm.MaxPO) // don't use radius for checks

if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
Expand Down
1 change: 1 addition & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
b.topologyCloser = kad
hive.SetAddPeersHandler(kad.AddPeers)
p2ps.SetPickyNotifier(kad)
batchStore.SetRadiusSetter(kad)

paymentThreshold, ok := new(big.Int).SetString(o.PaymentThreshold, 10)
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions pkg/postage/batchstore/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,7 @@ func (bs *BatchStore) PutChainState(cs *postage.ChainState) error {
func (bs *BatchStore) GetReserveState() *postage.Reservestate {
return bs.rs
}

func (bs *BatchStore) SetRadiusSetter(r postage.RadiusSetter) {
panic("not implemented")
}
7 changes: 7 additions & 0 deletions pkg/postage/batchstore/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TestBatchStoreUnreserveEvents(t *testing.T) {
batchstore.Capacity = batchstore.Exp2(16)

bStore, unreserved := setupBatchStore(t)
bStore.SetRadiusSetter(noopRadiusSetter{})
batches := make(map[string]*postage.Batch)

t.Run("new batches only", func(t *testing.T) {
Expand Down Expand Up @@ -139,6 +140,7 @@ func TestBatchStoreUnreserveAll(t *testing.T) {
batchstore.Capacity = batchstore.Exp2(16)

bStore, unreserved := setupBatchStore(t)
bStore.SetRadiusSetter(noopRadiusSetter{})
var batches [][]byte
// iterate starting from batchstore.DefaultDepth to maxPO
_, depth := batchstore.GetReserve(bStore)
Expand Down Expand Up @@ -211,6 +213,7 @@ func setupBatchStore(t *testing.T) (postage.Storer, map[string]uint8) {
return nil
}
bStore, _ := batchstore.New(stateStore, unreserveFunc)
bStore.SetRadiusSetter(noopRadiusSetter{})

// initialise chainstate
err = bStore.PutChainState(&postage.ChainState{
Expand Down Expand Up @@ -535,6 +538,7 @@ func TestBatchStore_Unreserve(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
store, unreserved := setupBatchStore(t)
store.SetRadiusSetter(noopRadiusSetter{})
batches := addBatch(t, store,
depthValue(initBatchDepth, 3),
depthValue(initBatchDepth, 4),
Expand Down Expand Up @@ -651,6 +655,7 @@ func TestBatchStore_Topup(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
store, unreserved := setupBatchStore(t)
store.SetRadiusSetter(noopRadiusSetter{})
batches := addBatch(t, store,
depthValue(initBatchDepth, 2),
depthValue(initBatchDepth, 3),
Expand Down Expand Up @@ -771,6 +776,7 @@ func TestBatchStore_Dilution(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
store, unreserved := setupBatchStore(t)
store.SetRadiusSetter(noopRadiusSetter{})
batches := addBatch(t, store,
depthValue(initBatchDepth, 2),
depthValue(initBatchDepth, 3),
Expand Down Expand Up @@ -799,6 +805,7 @@ func TestBatchStore_EvictExpired(t *testing.T) {
initBatchDepth := uint8(8)

store, unreserved := setupBatchStore(t)
store.SetRadiusSetter(noopRadiusSetter{})
batches := addBatch(t, store,
depthValue(initBatchDepth, 2),
depthValue(initBatchDepth, 3),
Expand Down
23 changes: 22 additions & 1 deletion pkg/postage/batchstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type store struct {
rs *reserveState // the reserve state
unreserveFunc unreserveFn // unreserve function
metrics metrics // metrics

radiusSetter postage.RadiusSetter // setter for radius notifications
//radiusSetterMu sync.Mutex
}

// New constructs a new postage batch store.
Expand Down Expand Up @@ -58,7 +61,15 @@ func New(st storage.StateStorer, unreserveFunc unreserveFn) (postage.Storer, err
Available: Capacity,
}
}
return &store{st, cs, rs, unreserveFunc, newMetrics()}, nil
s := &store{
store: st,
cs: cs,
rs: rs,
unreserveFunc: unreserveFunc,
metrics: newMetrics(),
}

return s, nil
}

func (s *store) GetReserveState() *postage.Reservestate {
Expand Down Expand Up @@ -99,6 +110,8 @@ func (s *store) Put(b *postage.Batch, value *big.Int, depth uint8) error {
if err != nil {
return err
}
s.radiusSetter.SetRadius(s.rs.Radius)

return s.store.Put(batchKey(b.ID), b)
}

Expand Down Expand Up @@ -130,6 +143,8 @@ func (s *store) PutChainState(cs *postage.ChainState) error {
if err != nil {
return err
}
s.radiusSetter.SetRadius(s.rs.Radius)

return s.store.Put(chainStateKey, cs)
}

Expand All @@ -139,6 +154,12 @@ func (s *store) GetChainState() *postage.ChainState {
return s.cs
}

func (s *store) SetRadiusSetter(r postage.RadiusSetter) {
//s.radiusSetterMu.Lock()
s.radiusSetter = r
//s.radiusSetterMu.Unlock()
}

// batchKey returns the index key for the batch ID used in the by-ID batch index.
func batchKey(id []byte) string {
return batchKeyPrefix + string(id)
Expand Down
8 changes: 7 additions & 1 deletion pkg/postage/batchstore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestBatchStorePut(t *testing.T) {

stateStore := mock.NewStateStore()
batchStore, _ := batchstore.New(stateStore, unreserve)

batchStore.SetRadiusSetter(noopRadiusSetter{})
batchStorePutBatch(t, batchStore, testBatch)

var got postage.Batch
Expand All @@ -46,6 +46,7 @@ func TestBatchStoreGetChainState(t *testing.T) {

stateStore := mock.NewStateStore()
batchStore, _ := batchstore.New(stateStore, nil)
batchStore.SetRadiusSetter(noopRadiusSetter{})

err := batchStore.PutChainState(testChainState)
if err != nil {
Expand All @@ -60,6 +61,7 @@ func TestBatchStorePutChainState(t *testing.T) {

stateStore := mock.NewStateStore()
batchStore, _ := batchstore.New(stateStore, nil)
batchStore.SetRadiusSetter(noopRadiusSetter{})

batchStorePutChainState(t, batchStore, testChainState)
var got postage.ChainState
Expand Down Expand Up @@ -101,3 +103,7 @@ func batchStorePutChainState(t *testing.T, st postage.Storer, cs *postage.ChainS
t.Fatalf("postage storer put chain state: %v", err)
}
}

type noopRadiusSetter struct{}

func (_ noopRadiusSetter) SetRadius(_ uint8) {}
5 changes: 5 additions & 0 deletions pkg/postage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type Storer interface {
PutChainState(*ChainState) error
GetChainState() *ChainState
GetReserveState() *Reservestate
SetRadiusSetter(RadiusSetter)
}

type RadiusSetter interface {
SetRadius(uint8)
}

// Listener provides a blockchain event iterator.
Expand Down