diff --git a/pkg/kademlia/kademlia.go b/pkg/kademlia/kademlia.go index 183e4bb2347..3c264c62082 100644 --- a/pkg/kademlia/kademlia.go +++ b/pkg/kademlia/kademlia.go @@ -67,6 +67,7 @@ type Kad struct { knownPeers *pslice.PSlice // both are po aware slice of addresses bootnodes []ma.Multiaddr depth uint8 // current neighborhood depth + radius uint8 // storage area of responsibility depthMu sync.RWMutex // protect depth changes manageC chan struct{} // trigger the manage forever loop to connect to new peers waitNext map[string]retryInfo // sanction connections to a peer, key is overlay string and value is a retry information @@ -87,7 +88,12 @@ type retryInfo struct { } // New returns a new Kademlia. -func New(base swarm.Address, addressbook addressbook.Interface, discovery discovery.Driver, p2p p2p.Service, logger logging.Logger, o Options) *Kad { +func New(base swarm.Address, + addressbook addressbook.Interface, + discovery discovery.Driver, + p2p p2p.Service, + logger logging.Logger, + o Options) *Kad { if o.SaturationFunc == nil { o.SaturationFunc = binSaturated } @@ -340,7 +346,7 @@ func (k *Kad) manage() { k.connectedPeers.Add(peer, po) k.depthMu.Lock() - k.depth = recalcDepth(k.connectedPeers) + k.depth = recalcDepth(k.connectedPeers, k.radius) k.depthMu.Unlock() k.logger.Debugf("connected to peer: %s for bin: %d", peer, i) @@ -421,7 +427,7 @@ func (k *Kad) manage() { k.connectedPeers.Add(peer, po) k.depthMu.Lock() - k.depth = recalcDepth(k.connectedPeers) + k.depth = recalcDepth(k.connectedPeers, k.radius) k.depthMu.Unlock() k.logger.Debugf("connected to peer: %s old depth: %d new depth: %d", peer, currentDepth, k.NeighborhoodDepth()) @@ -518,7 +524,7 @@ func (k *Kad) connectBootnodes(ctx context.Context) { // when a bin is not saturated it means we would like to proactively // initiate connections to other peers in the bin. func binSaturated(bin uint8, peers, connected *pslice.PSlice) (bool, bool) { - potentialDepth := recalcDepth(peers) + potentialDepth := recalcDepth(peers, swarm.MaxPO) // short circuit for bins which are >= depth if bin >= potentialDepth { @@ -544,7 +550,7 @@ func binSaturated(bin uint8, peers, connected *pslice.PSlice) (bool, bool) { } // recalcDepth calculates and returns the kademlia depth. -func recalcDepth(peers *pslice.PSlice) uint8 { +func recalcDepth(peers *pslice.PSlice, radius uint8) uint8 { // handle edge case separately if peers.Length() <= nnLowWatermark { return 0 @@ -590,9 +596,15 @@ func recalcDepth(peers *pslice.PSlice) uint8 { return false, false, nil }) if shallowestUnsaturated > candidate { + if radius < candidate { + return radius + } return candidate } + if radius < shallowestUnsaturated { + return radius + } return shallowestUnsaturated } @@ -761,7 +773,7 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error { k.waitNextMu.Unlock() k.depthMu.Lock() - k.depth = recalcDepth(k.connectedPeers) + k.depth = recalcDepth(k.connectedPeers, k.radius) k.depthMu.Unlock() k.notifyPeerSig() @@ -779,7 +791,7 @@ func (k *Kad) Disconnected(peer p2p.Peer) { k.waitNextMu.Unlock() k.depthMu.Lock() - k.depth = recalcDepth(k.connectedPeers) + k.depth = recalcDepth(k.connectedPeers, k.radius) k.depthMu.Unlock() select { @@ -1018,6 +1030,23 @@ func (k *Kad) IsBalanced(bin uint8) bool { return true } +func (k *Kad) SetRadius(r uint8) { + k.depthMu.Lock() + defer k.depthMu.Unlock() + if k.radius == r { + return + } + k.radius = r + oldD := k.depth + k.depth = recalcDepth(k.connectedPeers, k.radius) + if k.depth != oldD { + select { + case k.manageC <- struct{}{}: + default: + } + } +} + func (k *Kad) Snapshot() *topology.KadParams { var infos []topology.BinInfo for i := int(swarm.MaxPO); i >= 0; i-- { diff --git a/pkg/kademlia/kademlia_test.go b/pkg/kademlia/kademlia_test.go index 021b525afad..d111d1359c8 100644 --- a/pkg/kademlia/kademlia_test.go +++ b/pkg/kademlia/kademlia_test.go @@ -51,6 +51,8 @@ func TestNeighborhoodDepth(t *testing.T) { base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{}) ) + kad.SetRadius(swarm.MaxPO) // initial tests do not check for radius + if err := kad.Start(context.Background()); err != nil { t.Fatal(err) } @@ -112,8 +114,14 @@ func TestNeighborhoodDepth(t *testing.T) { // depth is 7 because bin 7 is unsaturated (1 peer) kDepth(t, kad, 7) - // expect shallow peers not in depth + // set the radius to be lower than unsaturated, expect radius as depth + kad.SetRadius(6) + kDepth(t, kad, 6) + // set the radius to MaxPO again so that intermediate checks can run + kad.SetRadius(swarm.MaxPO) + + // expect shallow peers not in depth for _, a := range shallowPeers { if kad.IsWithinDepth(a) { t.Fatal("expected address to outside of depth") @@ -142,6 +150,13 @@ func TestNeighborhoodDepth(t *testing.T) { waitConn(t, &conns) kDepth(t, kad, 8) + // again set radius to lower value, expect that as depth + kad.SetRadius(5) + kDepth(t, kad, 5) + + // reset radius to MaxPO for the rest of the checks + kad.SetRadius(swarm.MaxPO) + var addrs []swarm.Address // fill the rest up to the bin before last and check that everything works at the edges for i := 9; i < int(swarm.MaxBins); i++ { @@ -304,6 +319,8 @@ func TestManageWithBalancing(t *testing.T) { base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{SaturationFunc: saturationFunc, BitSuffixLength: 2}) ) + kad.SetRadius(swarm.MaxPO) // don't use radius for checks + // implement satiration function (while having access to Kademlia instance) sfImpl := func(bin uint8, peers, connected *pslice.PSlice) (bool, bool) { return kad.IsBalanced(bin), false @@ -419,6 +436,7 @@ func TestOversaturation(t *testing.T) { conns int32 // how many connect calls were made to the p2p mock base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{}) ) + kad.SetRadius(swarm.MaxPO) // don't use radius for checks if err := kad.Start(context.Background()); err != nil { t.Fatal(err) @@ -474,6 +492,7 @@ func TestOversaturationBootnode(t *testing.T) { conns int32 // how many connect calls were made to the p2p mock base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{BootnodeMode: true}) ) + kad.SetRadius(swarm.MaxPO) // don't use radius for checks if err := kad.Start(context.Background()); err != nil { t.Fatal(err) diff --git a/pkg/node/node.go b/pkg/node/node.go index 0eb74535308..fed40e2384b 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -392,6 +392,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, b.topologyCloser = kad hive.SetAddPeersHandler(kad.AddPeers) p2ps.SetPickyNotifier(kad) + batchStore.SetRadiusSetter(kad) paymentThreshold, ok := new(big.Int).SetString(o.PaymentThreshold, 10) if !ok { diff --git a/pkg/postage/batchstore/mock/store.go b/pkg/postage/batchstore/mock/store.go index b894139f8bc..e6965de7a7c 100644 --- a/pkg/postage/batchstore/mock/store.go +++ b/pkg/postage/batchstore/mock/store.go @@ -123,3 +123,7 @@ func (bs *BatchStore) PutChainState(cs *postage.ChainState) error { func (bs *BatchStore) GetReserveState() *postage.Reservestate { return bs.rs } + +func (bs *BatchStore) SetRadiusSetter(r postage.RadiusSetter) { + panic("not implemented") +} diff --git a/pkg/postage/batchstore/reserve_test.go b/pkg/postage/batchstore/reserve_test.go index e7c5e994579..318459de0c1 100644 --- a/pkg/postage/batchstore/reserve_test.go +++ b/pkg/postage/batchstore/reserve_test.go @@ -57,6 +57,7 @@ func TestBatchStoreUnreserveEvents(t *testing.T) { batchstore.Capacity = batchstore.Exp2(16) bStore, unreserved := setupBatchStore(t) + bStore.SetRadiusSetter(noopRadiusSetter{}) batches := make(map[string]*postage.Batch) t.Run("new batches only", func(t *testing.T) { @@ -139,6 +140,7 @@ func TestBatchStoreUnreserveAll(t *testing.T) { batchstore.Capacity = batchstore.Exp2(16) bStore, unreserved := setupBatchStore(t) + bStore.SetRadiusSetter(noopRadiusSetter{}) var batches [][]byte // iterate starting from batchstore.DefaultDepth to maxPO _, depth := batchstore.GetReserve(bStore) @@ -211,6 +213,7 @@ func setupBatchStore(t *testing.T) (postage.Storer, map[string]uint8) { return nil } bStore, _ := batchstore.New(stateStore, unreserveFunc) + bStore.SetRadiusSetter(noopRadiusSetter{}) // initialise chainstate err = bStore.PutChainState(&postage.ChainState{ @@ -535,6 +538,7 @@ func TestBatchStore_Unreserve(t *testing.T) { } { t.Run(tc.desc, func(t *testing.T) { store, unreserved := setupBatchStore(t) + store.SetRadiusSetter(noopRadiusSetter{}) batches := addBatch(t, store, depthValue(initBatchDepth, 3), depthValue(initBatchDepth, 4), @@ -651,6 +655,7 @@ func TestBatchStore_Topup(t *testing.T) { } { t.Run(tc.desc, func(t *testing.T) { store, unreserved := setupBatchStore(t) + store.SetRadiusSetter(noopRadiusSetter{}) batches := addBatch(t, store, depthValue(initBatchDepth, 2), depthValue(initBatchDepth, 3), @@ -771,6 +776,7 @@ func TestBatchStore_Dilution(t *testing.T) { } { t.Run(tc.desc, func(t *testing.T) { store, unreserved := setupBatchStore(t) + store.SetRadiusSetter(noopRadiusSetter{}) batches := addBatch(t, store, depthValue(initBatchDepth, 2), depthValue(initBatchDepth, 3), @@ -799,6 +805,7 @@ func TestBatchStore_EvictExpired(t *testing.T) { initBatchDepth := uint8(8) store, unreserved := setupBatchStore(t) + store.SetRadiusSetter(noopRadiusSetter{}) batches := addBatch(t, store, depthValue(initBatchDepth, 2), depthValue(initBatchDepth, 3), diff --git a/pkg/postage/batchstore/store.go b/pkg/postage/batchstore/store.go index 4a0aacb9340..5d51144c884 100644 --- a/pkg/postage/batchstore/store.go +++ b/pkg/postage/batchstore/store.go @@ -28,6 +28,9 @@ type store struct { rs *reserveState // the reserve state unreserveFunc unreserveFn // unreserve function metrics metrics // metrics + + radiusSetter postage.RadiusSetter // setter for radius notifications + //radiusSetterMu sync.Mutex } // New constructs a new postage batch store. @@ -58,7 +61,15 @@ func New(st storage.StateStorer, unreserveFunc unreserveFn) (postage.Storer, err Available: Capacity, } } - return &store{st, cs, rs, unreserveFunc, newMetrics()}, nil + s := &store{ + store: st, + cs: cs, + rs: rs, + unreserveFunc: unreserveFunc, + metrics: newMetrics(), + } + + return s, nil } func (s *store) GetReserveState() *postage.Reservestate { @@ -99,6 +110,8 @@ func (s *store) Put(b *postage.Batch, value *big.Int, depth uint8) error { if err != nil { return err } + s.radiusSetter.SetRadius(s.rs.Radius) + return s.store.Put(batchKey(b.ID), b) } @@ -130,6 +143,8 @@ func (s *store) PutChainState(cs *postage.ChainState) error { if err != nil { return err } + s.radiusSetter.SetRadius(s.rs.Radius) + return s.store.Put(chainStateKey, cs) } @@ -139,6 +154,12 @@ func (s *store) GetChainState() *postage.ChainState { return s.cs } +func (s *store) SetRadiusSetter(r postage.RadiusSetter) { + //s.radiusSetterMu.Lock() + s.radiusSetter = r + //s.radiusSetterMu.Unlock() +} + // batchKey returns the index key for the batch ID used in the by-ID batch index. func batchKey(id []byte) string { return batchKeyPrefix + string(id) diff --git a/pkg/postage/batchstore/store_test.go b/pkg/postage/batchstore/store_test.go index 3eb99d2e925..475176b0219 100644 --- a/pkg/postage/batchstore/store_test.go +++ b/pkg/postage/batchstore/store_test.go @@ -33,7 +33,7 @@ func TestBatchStorePut(t *testing.T) { stateStore := mock.NewStateStore() batchStore, _ := batchstore.New(stateStore, unreserve) - + batchStore.SetRadiusSetter(noopRadiusSetter{}) batchStorePutBatch(t, batchStore, testBatch) var got postage.Batch @@ -46,6 +46,7 @@ func TestBatchStoreGetChainState(t *testing.T) { stateStore := mock.NewStateStore() batchStore, _ := batchstore.New(stateStore, nil) + batchStore.SetRadiusSetter(noopRadiusSetter{}) err := batchStore.PutChainState(testChainState) if err != nil { @@ -60,6 +61,7 @@ func TestBatchStorePutChainState(t *testing.T) { stateStore := mock.NewStateStore() batchStore, _ := batchstore.New(stateStore, nil) + batchStore.SetRadiusSetter(noopRadiusSetter{}) batchStorePutChainState(t, batchStore, testChainState) var got postage.ChainState @@ -101,3 +103,7 @@ func batchStorePutChainState(t *testing.T, st postage.Storer, cs *postage.ChainS t.Fatalf("postage storer put chain state: %v", err) } } + +type noopRadiusSetter struct{} + +func (_ noopRadiusSetter) SetRadius(_ uint8) {} diff --git a/pkg/postage/interface.go b/pkg/postage/interface.go index 7367514df9b..cd57a1fd904 100644 --- a/pkg/postage/interface.go +++ b/pkg/postage/interface.go @@ -27,6 +27,11 @@ type Storer interface { PutChainState(*ChainState) error GetChainState() *ChainState GetReserveState() *Reservestate + SetRadiusSetter(RadiusSetter) +} + +type RadiusSetter interface { + SetRadius(uint8) } // Listener provides a blockchain event iterator.