From f9cb72664d94594bf614f90c8a0edf075ba883b0 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 12 Sep 2024 17:35:43 -0700 Subject: [PATCH 01/13] Add a minheap to speed up gc when nothing to do --- p2p/host/peerstore/pstoremem/addr_book.go | 135 +++++++++++++++++----- 1 file changed, 107 insertions(+), 28 deletions(-) diff --git a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go index 209937ca83..b38f13d745 100644 --- a/p2p/host/peerstore/pstoremem/addr_book.go +++ b/p2p/host/peerstore/pstoremem/addr_book.go @@ -1,6 +1,7 @@ package pstoremem import ( + "container/heap" "context" "fmt" "sort" @@ -21,6 +22,9 @@ type expiringAddr struct { Addr ma.Multiaddr TTL time.Duration Expires time.Time + + // to sort by expiry time + heapIndex int } func (e *expiringAddr) ExpiredBy(t time.Time) bool { @@ -34,13 +38,63 @@ type peerRecordState struct { type addrSegments [256]*addrSegment +type sortedAddrsByExpiry struct { + m map[string]*expiringAddr + order []*expiringAddr +} + +// Essentially Go stdlib's Priority Queue example +var _ heap.Interface = &sortedAddrsByExpiry{} + +func (s *sortedAddrsByExpiry) Len() int { return len(s.order) } +func (s *sortedAddrsByExpiry) Less(i, j int) bool { + return s.order[i].Expires.Before(s.order[j].Expires) +} +func (s *sortedAddrsByExpiry) Swap(i, j int) { + s.order[i], s.order[j] = s.order[j], s.order[i] + s.order[i].heapIndex = i + s.order[j].heapIndex = j +} +func (s *sortedAddrsByExpiry) Push(x any) { + a := x.(*expiringAddr) + s.m[string(a.Addr.Bytes())] = a + a.heapIndex = len(s.order) + s.order = append(s.order, a) +} +func (s *sortedAddrsByExpiry) Pop() any { + old := s.order + n := len(old) + x := old[n-1] + x.heapIndex = -1 + s.order = old[0 : n-1] + delete(s.m, string(x.Addr.Bytes())) + return x +} + +func (s *sortedAddrsByExpiry) Fix(a *expiringAddr) { + heap.Fix(s, a.heapIndex) +} + +func (s *sortedAddrsByExpiry) Delete(a *expiringAddr) { + heap.Remove(s, a.heapIndex) + a.heapIndex = 
-1 + delete(s.m, string(a.Addr.Bytes())) +} + +func (s *sortedAddrsByExpiry) gc(now time.Time) { + for len(s.order) > 0 && s.order[len(s.order)-1].ExpiredBy(now) { + v := s.Pop().(*expiringAddr) + delete(s.m, string(v.Addr.Bytes())) + } +} + type addrSegment struct { sync.RWMutex // Use pointers to save memory. Maps always leave some fraction of their // space unused. storing the *values* directly in the map will // drastically increase the space waste. In our case, by 6x. - addrs map[peer.ID]map[string]*expiringAddr + addrs map[peer.ID]*sortedAddrsByExpiry signedPeerRecords map[peer.ID]*peerRecordState } @@ -83,7 +137,7 @@ func NewAddrBook() *memoryAddrBook { segments: func() (ret addrSegments) { for i := range ret { ret[i] = &addrSegment{ - addrs: make(map[peer.ID]map[string]*expiringAddr), + addrs: make(map[peer.ID]*sortedAddrsByExpiry), signedPeerRecords: make(map[peer.ID]*peerRecordState)} } return ret @@ -134,12 +188,8 @@ func (mab *memoryAddrBook) gc() { for _, s := range mab.segments { s.Lock() for p, amap := range s.addrs { - for k, addr := range amap { - if addr.ExpiredBy(now) { - delete(amap, k) - } - } - if len(amap) == 0 { + amap.gc(now) + if amap.Len() == 0 { delete(s.addrs, p) delete(s.signedPeerRecords, p) } @@ -154,7 +204,7 @@ func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice { for _, s := range mab.segments { s.RLock() for pid, amap := range s.addrs { - if len(amap) > 0 { + if amap.Len() > 0 { set[pid] = struct{}{} } } @@ -232,7 +282,10 @@ func (mab *memoryAddrBook) addAddrsUnlocked(s *addrSegment, p peer.ID, addrs []m amap, ok := s.addrs[p] if !ok { - amap = make(map[string]*expiringAddr) + amap = &sortedAddrsByExpiry{ + m: make(map[string]*expiringAddr), + order: nil, + } s.addrs[p] = amap } @@ -250,20 +303,26 @@ func (mab *memoryAddrBook) addAddrsUnlocked(s *addrSegment, p peer.ID, addrs []m } // find the highest TTL and Expiry time between // existing records and function args - a, found := amap[string(addr.Bytes())] // won't allocate. 
+ a, found := amap.m[string(addr.Bytes())] // won't allocate. if !found { // not found, announce it. entry := &expiringAddr{Addr: addr, Expires: exp, TTL: ttl} - amap[string(addr.Bytes())] = entry + amap.Push(entry) mab.subManager.BroadcastAddr(p, addr) } else { // update ttl & exp to whichever is greater between new and existing entry + var changed bool if ttl > a.TTL { + changed = true a.TTL = ttl } if exp.After(a.Expires) { + changed = true a.Expires = exp } + if changed { + amap.Fix(a) + } } } } @@ -282,7 +341,9 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du amap, ok := s.addrs[p] if !ok { - amap = make(map[string]*expiringAddr) + amap = &sortedAddrsByExpiry{ + m: make(map[string]*expiringAddr), + } s.addrs[p] = amap } @@ -298,14 +359,23 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du continue } aBytes := addr.Bytes() - key := string(aBytes) - // re-set all of them for new ttl. - if ttl > 0 { - amap[key] = &expiringAddr{Addr: addr, Expires: exp, TTL: ttl} - mab.subManager.BroadcastAddr(p, addr) + if a, found := amap.m[string(aBytes)]; found { + // re-set all of them for new ttl. 
+ if ttl > 0 { + a.Addr = addr + a.Expires = exp + a.TTL = ttl + amap.Fix(a) + mab.subManager.BroadcastAddr(p, addr) + } else { + amap.Delete(a) + } } else { - delete(amap, key) + if ttl > 0 { + amap.Push(&expiringAddr{Addr: addr, Expires: exp, TTL: ttl}) + mab.subManager.BroadcastAddr(p, addr) + } } } } @@ -322,14 +392,14 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t return } - for k, a := range amap { + for _, a := range amap.m { if oldTTL == a.TTL { if newTTL == 0 { - delete(amap, k) + amap.Delete(a) } else { a.TTL = newTTL a.Expires = exp - amap[k] = a + amap.Fix(a) } } } @@ -341,7 +411,10 @@ func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr { s.RLock() defer s.RUnlock() - return validAddrs(mab.clock.Now(), s.addrs[p]) + if _, ok := s.addrs[p]; !ok { + return nil + } + return validAddrs(mab.clock.Now(), s.addrs[p].m) } func validAddrs(now time.Time, amap map[string]*expiringAddr) []ma.Multiaddr { @@ -366,10 +439,13 @@ func (mab *memoryAddrBook) GetPeerRecord(p peer.ID) *record.Envelope { s.RLock() defer s.RUnlock() + if _, ok := s.addrs[p]; !ok { + return nil + } // although the signed record gets garbage collected when all addrs inside it are expired, // we may be in between the expiration time and the GC interval // so, we check to see if we have any valid signed addrs before returning the record - if len(validAddrs(mab.clock.Now(), s.addrs[p])) == 0 { + if len(validAddrs(mab.clock.Now(), s.addrs[p].m)) == 0 { return nil } @@ -397,10 +473,13 @@ func (mab *memoryAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma. 
s.RLock() defer s.RUnlock() - baseaddrslice := s.addrs[p] - initial := make([]ma.Multiaddr, 0, len(baseaddrslice)) - for _, a := range baseaddrslice { - initial = append(initial, a.Addr) + var initial []ma.Multiaddr + + if baseaddrslice, ok := s.addrs[p]; ok { + initial = make([]ma.Multiaddr, 0, len(baseaddrslice.m)) + for _, a := range baseaddrslice.m { + initial = append(initial, a.Addr) + } } return mab.subManager.AddrStream(ctx, p, initial) From 61b4865f7820a77533f6aa22b1664643a6166023 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 12 Sep 2024 17:35:55 -0700 Subject: [PATCH 02/13] Bump gc interval to 1 minute --- p2p/host/peerstore/pstoremem/addr_book.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go index b38f13d745..ac38fdf8b5 100644 --- a/p2p/host/peerstore/pstoremem/addr_book.go +++ b/p2p/host/peerstore/pstoremem/addr_book.go @@ -163,7 +163,7 @@ func WithClock(clock clock) AddrBookOption { // background periodically schedules a gc func (mab *memoryAddrBook) background(ctx context.Context) { defer mab.refCount.Done() - ticker := time.NewTicker(1 * time.Hour) + ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() for { From 23b1dbbc9251a8f6c4cd8bf88e8627438303a38a Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 12 Sep 2024 17:37:11 -0700 Subject: [PATCH 03/13] Add BenchmarkGC --- p2p/host/peerstore/pstoremem/inmem_test.go | 28 ++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/p2p/host/peerstore/pstoremem/inmem_test.go b/p2p/host/peerstore/pstoremem/inmem_test.go index 097a1fd25e..1ba3df4f71 100644 --- a/p2p/host/peerstore/pstoremem/inmem_test.go +++ b/p2p/host/peerstore/pstoremem/inmem_test.go @@ -1,10 +1,14 @@ package pstoremem import ( + "strconv" "testing" + "time" + "github.com/libp2p/go-libp2p/core/peer" pstore "github.com/libp2p/go-libp2p/core/peerstore" pt 
"github.com/libp2p/go-libp2p/p2p/host/peerstore/test" + "github.com/multiformats/go-multiaddr" mockClock "github.com/benbjohnson/clock" "github.com/stretchr/testify/require" @@ -82,3 +86,27 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), ) } + +func BenchmarkGC(b *testing.B) { + clock := mockClock.NewMock() + ps, err := NewPeerstore(WithClock(clock)) + require.NoError(b, err) + defer ps.Close() + + peerCount := 10_000 + addrsPerPeer := 32 + + for i := 0; i < peerCount; i++ { + id := peer.ID(strconv.Itoa(i)) + addrs := make([]multiaddr.Multiaddr, addrsPerPeer) + for j := 0; j < addrsPerPeer; j++ { + addrs[j] = multiaddr.StringCast("/ip4/1.2.3.4/tcp/" + strconv.Itoa(j)) + } + ps.AddAddrs(id, addrs, 24*time.Hour) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ps.gc() + } +} From d5a88cd3e72598874b4d02f7be92a904cb8cfebf Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 13 Sep 2024 20:56:00 -0700 Subject: [PATCH 04/13] Refactor: remove segments. 
Heap across peers --- p2p/host/peerstore/pstoremem/addr_book.go | 276 ++++++++++------------ 1 file changed, 120 insertions(+), 156 deletions(-) diff --git a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go index ac38fdf8b5..31aeab21dc 100644 --- a/p2p/host/peerstore/pstoremem/addr_book.go +++ b/p2p/host/peerstore/pstoremem/addr_book.go @@ -22,7 +22,7 @@ type expiringAddr struct { Addr ma.Multiaddr TTL time.Duration Expires time.Time - + Peer peer.ID // to sort by expiry time heapIndex int } @@ -36,74 +36,86 @@ type peerRecordState struct { Seq uint64 } -type addrSegments [256]*addrSegment +// Essentially Go stdlib's Priority Queue example +var _ heap.Interface = &peerAddrs{} -type sortedAddrsByExpiry struct { - m map[string]*expiringAddr - order []*expiringAddr +type peerAddrs struct { + addrs map[peer.ID]map[string]*expiringAddr // peer.ID -> addr.Bytes() -> *expiringAddr + expiringHeap []*expiringAddr } -// Essentially Go stdlib's Priority Queue example -var _ heap.Interface = &sortedAddrsByExpiry{} +func newPeerAddrs() *peerAddrs { + return &peerAddrs{ + addrs: make(map[peer.ID]map[string]*expiringAddr), + } +} -func (s *sortedAddrsByExpiry) Len() int { return len(s.order) } -func (s *sortedAddrsByExpiry) Less(i, j int) bool { - return s.order[i].Expires.Before(s.order[j].Expires) +func (pa *peerAddrs) Len() int { return len(pa.expiringHeap) } +func (pa *peerAddrs) Less(i, j int) bool { + return pa.expiringHeap[i].Expires.Before(pa.expiringHeap[j].Expires) } -func (s *sortedAddrsByExpiry) Swap(i, j int) { - s.order[i], s.order[j] = s.order[j], s.order[i] - s.order[i].heapIndex = i - s.order[j].heapIndex = j +func (pa *peerAddrs) Swap(i, j int) { + pa.expiringHeap[i], pa.expiringHeap[j] = pa.expiringHeap[j], pa.expiringHeap[i] + pa.expiringHeap[i].heapIndex = i + pa.expiringHeap[j].heapIndex = j } -func (s *sortedAddrsByExpiry) Push(x any) { +func (pa *peerAddrs) Push(x any) { a := x.(*expiringAddr) - 
s.m[string(a.Addr.Bytes())] = a - a.heapIndex = len(s.order) - s.order = append(s.order, a) + if _, ok := pa.addrs[a.Peer]; !ok { + pa.addrs[a.Peer] = make(map[string]*expiringAddr) + } + pa.addrs[a.Peer][string(a.Addr.Bytes())] = a + a.heapIndex = len(pa.expiringHeap) + pa.expiringHeap = append(pa.expiringHeap, a) } -func (s *sortedAddrsByExpiry) Pop() any { - old := s.order +func (pa *peerAddrs) Pop() any { + old := pa.expiringHeap n := len(old) - x := old[n-1] - x.heapIndex = -1 - s.order = old[0 : n-1] - delete(s.m, string(x.Addr.Bytes())) - return x + a := old[n-1] + a.heapIndex = -1 + pa.expiringHeap = old[0 : n-1] + + if m, ok := pa.addrs[a.Peer]; ok { + delete(m, string(a.Addr.Bytes())) + if len(m) == 0 { + delete(pa.addrs, a.Peer) + } + } + + return a } -func (s *sortedAddrsByExpiry) Fix(a *expiringAddr) { - heap.Fix(s, a.heapIndex) +func (pa *peerAddrs) Fix(a *expiringAddr) { + heap.Fix(pa, a.heapIndex) } -func (s *sortedAddrsByExpiry) Delete(a *expiringAddr) { - heap.Remove(s, a.heapIndex) +func (pa *peerAddrs) Delete(a *expiringAddr) { + heap.Remove(pa, a.heapIndex) a.heapIndex = -1 - delete(s.m, string(a.Addr.Bytes())) + if m, ok := pa.addrs[a.Peer]; ok { + delete(m, string(a.Addr.Bytes())) + if len(m) == 0 { + delete(pa.addrs, a.Peer) + } + } } -func (s *sortedAddrsByExpiry) gc(now time.Time) { - for len(s.order) > 0 && s.order[len(s.order)-1].ExpiredBy(now) { - v := s.Pop().(*expiringAddr) - delete(s.m, string(v.Addr.Bytes())) +func (pa *peerAddrs) FindAddr(p peer.ID, addrBytes ma.Multiaddr) (*expiringAddr, bool) { + if m, ok := pa.addrs[p]; ok { + v, ok := m[string(addrBytes.Bytes())] + return v, ok } + return nil, false } -type addrSegment struct { - sync.RWMutex - - // Use pointers to save memory. Maps always leave some fraction of their - // space unused. storing the *values* directly in the map will - // drastically increase the space waste. In our case, by 6x. 
- addrs map[peer.ID]*sortedAddrsByExpiry - - signedPeerRecords map[peer.ID]*peerRecordState +func (pa *peerAddrs) Peek() *expiringAddr { + return pa.expiringHeap[len(pa.expiringHeap)-1] } -func (segments *addrSegments) get(p peer.ID) *addrSegment { - if len(p) == 0 { // it's not terribly useful to use an empty peer ID, but at least we should not panic - return segments[0] +func (pa *peerAddrs) gc(now time.Time) { + for len(pa.expiringHeap) > 0 && pa.Peek().ExpiredBy(now) { + heap.Pop(pa) } - return segments[p[len(p)-1]] } type clock interface { @@ -118,7 +130,11 @@ func (rc realclock) Now() time.Time { // memoryAddrBook manages addresses. type memoryAddrBook struct { - segments addrSegments + mu sync.RWMutex + // TODO bound this + addrs *peerAddrs + // TODO bound this + signedPeerRecords map[peer.ID]*peerRecordState refCount sync.WaitGroup cancel func() @@ -134,17 +150,11 @@ func NewAddrBook() *memoryAddrBook { ctx, cancel := context.WithCancel(context.Background()) ab := &memoryAddrBook{ - segments: func() (ret addrSegments) { - for i := range ret { - ret[i] = &addrSegment{ - addrs: make(map[peer.ID]*sortedAddrsByExpiry), - signedPeerRecords: make(map[peer.ID]*peerRecordState)} - } - return ret - }(), - subManager: NewAddrSubManager(), - cancel: cancel, - clock: realclock{}, + addrs: newPeerAddrs(), + signedPeerRecords: make(map[peer.ID]*peerRecordState), + subManager: NewAddrSubManager(), + cancel: cancel, + clock: realclock{}, } ab.refCount.Add(1) go ab.background(ctx) @@ -185,33 +195,16 @@ func (mab *memoryAddrBook) Close() error { // gc garbage collects the in-memory address book. 
func (mab *memoryAddrBook) gc() { now := mab.clock.Now() - for _, s := range mab.segments { - s.Lock() - for p, amap := range s.addrs { - amap.gc(now) - if amap.Len() == 0 { - delete(s.addrs, p) - delete(s.signedPeerRecords, p) - } - } - s.Unlock() - } + mab.mu.Lock() + defer mab.mu.Unlock() + mab.addrs.gc(now) } func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice { - // deduplicate, since the same peer could have both signed & unsigned addrs - set := make(map[peer.ID]struct{}) - for _, s := range mab.segments { - s.RLock() - for pid, amap := range s.addrs { - if amap.Len() > 0 { - set[pid] = struct{}{} - } - } - s.RUnlock() - } - peers := make(peer.IDSlice, 0, len(set)) - for pid := range set { + mab.mu.RLock() + defer mab.mu.RUnlock() + peers := make(peer.IDSlice, 0, len(mab.addrs.addrs)) + for pid := range mab.addrs.addrs { peers = append(peers, pid) } return peers @@ -251,44 +244,33 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt } // ensure seq is greater than, or equal to, the last received - s := mab.segments.get(rec.PeerID) - s.Lock() - defer s.Unlock() - lastState, found := s.signedPeerRecords[rec.PeerID] + mab.mu.Lock() + defer mab.mu.Unlock() + lastState, found := mab.signedPeerRecords[rec.PeerID] if found && lastState.Seq > rec.Seq { return false, nil } - s.signedPeerRecords[rec.PeerID] = &peerRecordState{ + mab.signedPeerRecords[rec.PeerID] = &peerRecordState{ Envelope: recordEnvelope, Seq: rec.Seq, } - mab.addAddrsUnlocked(s, rec.PeerID, rec.Addrs, ttl, true) + mab.addAddrsUnlocked(rec.PeerID, rec.Addrs, ttl) return true, nil } func (mab *memoryAddrBook) addAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { - s := mab.segments.get(p) - s.Lock() - defer s.Unlock() + mab.mu.Lock() + defer mab.mu.Unlock() - mab.addAddrsUnlocked(s, p, addrs, ttl, false) + mab.addAddrsUnlocked(p, addrs, ttl) } -func (mab *memoryAddrBook) addAddrsUnlocked(s *addrSegment, p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, 
signed bool) { +func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { // if ttl is zero, exit. nothing to do. if ttl <= 0 { return } - amap, ok := s.addrs[p] - if !ok { - amap = &sortedAddrsByExpiry{ - m: make(map[string]*expiringAddr), - order: nil, - } - s.addrs[p] = amap - } - exp := mab.clock.Now().Add(ttl) for _, addr := range addrs { // Remove suffix of /p2p/peer-id from address @@ -303,11 +285,11 @@ func (mab *memoryAddrBook) addAddrsUnlocked(s *addrSegment, p peer.ID, addrs []m } // find the highest TTL and Expiry time between // existing records and function args - a, found := amap.m[string(addr.Bytes())] // won't allocate. + a, found := mab.addrs.FindAddr(p, addr) if !found { // not found, announce it. - entry := &expiringAddr{Addr: addr, Expires: exp, TTL: ttl} - amap.Push(entry) + entry := &expiringAddr{Addr: addr, Expires: exp, TTL: ttl, Peer: p} + heap.Push(mab.addrs, entry) mab.subManager.BroadcastAddr(p, addr) } else { // update ttl & exp to whichever is greater between new and existing entry @@ -321,7 +303,7 @@ func (mab *memoryAddrBook) addAddrsUnlocked(s *addrSegment, p peer.ID, addrs []m a.Expires = exp } if changed { - amap.Fix(a) + mab.addrs.Fix(a) } } } @@ -335,17 +317,8 @@ func (mab *memoryAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati // SetAddrs sets the ttl on addresses. This clears any TTL there previously. // This is used when we receive the best estimate of the validity of an address. 
func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { - s := mab.segments.get(p) - s.Lock() - defer s.Unlock() - - amap, ok := s.addrs[p] - if !ok { - amap = &sortedAddrsByExpiry{ - m: make(map[string]*expiringAddr), - } - s.addrs[p] = amap - } + mab.mu.Lock() + defer mab.mu.Unlock() exp := mab.clock.Now().Add(ttl) for _, addr := range addrs { @@ -358,22 +331,21 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du log.Warnf("was passed p2p address with a different peerId, found: %s wanted: %s", addrPid, p) continue } - aBytes := addr.Bytes() - if a, found := amap.m[string(aBytes)]; found { + if a, found := mab.addrs.FindAddr(p, addr); found { // re-set all of them for new ttl. if ttl > 0 { a.Addr = addr a.Expires = exp a.TTL = ttl - amap.Fix(a) + mab.addrs.Fix(a) mab.subManager.BroadcastAddr(p, addr) } else { - amap.Delete(a) + mab.addrs.Delete(a) } } else { if ttl > 0 { - amap.Push(&expiringAddr{Addr: addr, Expires: exp, TTL: ttl}) + heap.Push(mab.addrs, &expiringAddr{Addr: addr, Expires: exp, TTL: ttl, Peer: p}) mab.subManager.BroadcastAddr(p, addr) } } @@ -383,23 +355,18 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du // UpdateAddrs updates the addresses associated with the given peer that have // the given oldTTL to have the given newTTL. 
func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { - s := mab.segments.get(p) - s.Lock() - defer s.Unlock() + mab.mu.Lock() + defer mab.mu.Unlock() exp := mab.clock.Now().Add(newTTL) - amap, found := s.addrs[p] - if !found { - return - } - for _, a := range amap.m { + for _, a := range mab.addrs.addrs[p] { if oldTTL == a.TTL { if newTTL == 0 { - amap.Delete(a) + mab.addrs.Delete(a) } else { a.TTL = newTTL a.Expires = exp - amap.Fix(a) + mab.addrs.Fix(a) } } } @@ -407,14 +374,13 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t // Addrs returns all known (and valid) addresses for a given peer func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr { - s := mab.segments.get(p) - s.RLock() - defer s.RUnlock() + mab.mu.RLock() + defer mab.mu.RUnlock() - if _, ok := s.addrs[p]; !ok { + if _, ok := mab.addrs.addrs[p]; !ok { return nil } - return validAddrs(mab.clock.Now(), s.addrs[p].m) + return validAddrs(mab.clock.Now(), mab.addrs.addrs[p]) } func validAddrs(now time.Time, amap map[string]*expiringAddr) []ma.Multiaddr { @@ -435,21 +401,20 @@ func validAddrs(now time.Time, amap map[string]*expiringAddr) []ma.Multiaddr { // given peer id, if one exists. // Returns nil if no signed PeerRecord exists for the peer. 
func (mab *memoryAddrBook) GetPeerRecord(p peer.ID) *record.Envelope { - s := mab.segments.get(p) - s.RLock() - defer s.RUnlock() + mab.mu.RLock() + defer mab.mu.RUnlock() - if _, ok := s.addrs[p]; !ok { + if _, ok := mab.addrs.addrs[p]; !ok { return nil } // although the signed record gets garbage collected when all addrs inside it are expired, // we may be in between the expiration time and the GC interval // so, we check to see if we have any valid signed addrs before returning the record - if len(validAddrs(mab.clock.Now(), s.addrs[p].m)) == 0 { + if len(validAddrs(mab.clock.Now(), mab.addrs.addrs[p])) == 0 { return nil } - state := s.signedPeerRecords[p] + state := mab.signedPeerRecords[p] if state == nil { return nil } @@ -458,29 +423,28 @@ func (mab *memoryAddrBook) GetPeerRecord(p peer.ID) *record.Envelope { // ClearAddrs removes all previously stored addresses func (mab *memoryAddrBook) ClearAddrs(p peer.ID) { - s := mab.segments.get(p) - s.Lock() - defer s.Unlock() + mab.mu.Lock() + defer mab.mu.Unlock() - delete(s.addrs, p) - delete(s.signedPeerRecords, p) + delete(mab.signedPeerRecords, p) + for _, a := range mab.addrs.addrs[p] { + mab.addrs.Delete(a) + } } // AddrStream returns a channel on which all new addresses discovered for a // given peer ID will be published. 
func (mab *memoryAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr { - s := mab.segments.get(p) - s.RLock() - defer s.RUnlock() - var initial []ma.Multiaddr - if baseaddrslice, ok := s.addrs[p]; ok { - initial = make([]ma.Multiaddr, 0, len(baseaddrslice.m)) - for _, a := range baseaddrslice.m { + mab.mu.RLock() + if m, ok := mab.addrs.addrs[p]; ok { + initial = make([]ma.Multiaddr, 0, len(m)) + for _, a := range m { initial = append(initial, a.Addr) } } + mab.mu.RUnlock() return mab.subManager.AddrStream(ctx, p, initial) } From f7be8eea1244b35212cedcb15d633a5ff70d3dcb Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 13 Sep 2024 20:59:01 -0700 Subject: [PATCH 05/13] Bound signed peer records --- p2p/host/peerstore/pstoremem/addr_book.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go index 31aeab21dc..893eb58b7a 100644 --- a/p2p/host/peerstore/pstoremem/addr_book.go +++ b/p2p/host/peerstore/pstoremem/addr_book.go @@ -16,6 +16,8 @@ import ( ma "github.com/multiformats/go-multiaddr" ) +var SignedPeerRecordBound = 1_000 + var log = logging.Logger("peerstore") type expiringAddr struct { @@ -131,9 +133,8 @@ func (rc realclock) Now() time.Time { // memoryAddrBook manages addresses. type memoryAddrBook struct { mu sync.RWMutex - // TODO bound this - addrs *peerAddrs - // TODO bound this + // TODO bound the number of not connected addresses we store. + addrs *peerAddrs signedPeerRecords map[peer.ID]*peerRecordState refCount sync.WaitGroup @@ -227,6 +228,8 @@ func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du mab.addAddrs(p, addrs, ttl) } +var ErrTooManyRecords = fmt.Errorf("too many signed peer records. Dropping this one") + // ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in // a record.Envelope), which will expire after the given TTL. 
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details. @@ -246,6 +249,10 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt // ensure seq is greater than, or equal to, the last received mab.mu.Lock() defer mab.mu.Unlock() + if (len(mab.signedPeerRecords)) >= SignedPeerRecordBound { + return false, ErrTooManyRecords + } + lastState, found := mab.signedPeerRecords[rec.PeerID] if found && lastState.Seq > rec.Seq { return false, nil From d517ab9d8eff44c1c1b7447ac4bf5e87ceac7335 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 13 Sep 2024 22:11:33 -0700 Subject: [PATCH 06/13] Use NextExpiry instead of Peek --- p2p/host/peerstore/pstoremem/addr_book.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go index 893eb58b7a..263b48bc57 100644 --- a/p2p/host/peerstore/pstoremem/addr_book.go +++ b/p2p/host/peerstore/pstoremem/addr_book.go @@ -110,12 +110,15 @@ func (pa *peerAddrs) FindAddr(p peer.ID, addrBytes ma.Multiaddr) (*expiringAddr, return nil, false } -func (pa *peerAddrs) Peek() *expiringAddr { - return pa.expiringHeap[len(pa.expiringHeap)-1] +func (pa *peerAddrs) NextExpiry() time.Time { + if len(pa.expiringHeap) == 0 { + return time.Time{} + } + return pa.expiringHeap[len(pa.expiringHeap)-1].Expires } func (pa *peerAddrs) gc(now time.Time) { - for len(pa.expiringHeap) > 0 && pa.Peek().ExpiredBy(now) { + for len(pa.expiringHeap) > 0 && now.After(pa.NextExpiry()) { heap.Pop(pa) } } From c7626c055d8e1bcf908820ed820a24bc46efa966 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Sat, 14 Sep 2024 08:36:19 -0700 Subject: [PATCH 07/13] Avoid extra pointer --- p2p/host/peerstore/pstoremem/addr_book.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go index 
263b48bc57..d7f5d8d6f7 100644 --- a/p2p/host/peerstore/pstoremem/addr_book.go +++ b/p2p/host/peerstore/pstoremem/addr_book.go @@ -46,8 +46,8 @@ type peerAddrs struct { expiringHeap []*expiringAddr } -func newPeerAddrs() *peerAddrs { - return &peerAddrs{ +func newPeerAddrs() peerAddrs { + return peerAddrs{ addrs: make(map[peer.ID]map[string]*expiringAddr), } } @@ -137,7 +137,7 @@ func (rc realclock) Now() time.Time { type memoryAddrBook struct { mu sync.RWMutex // TODO bound the number of not connected addresses we store. - addrs *peerAddrs + addrs peerAddrs signedPeerRecords map[peer.ID]*peerRecordState refCount sync.WaitGroup @@ -299,7 +299,7 @@ func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl if !found { // not found, announce it. entry := &expiringAddr{Addr: addr, Expires: exp, TTL: ttl, Peer: p} - heap.Push(mab.addrs, entry) + heap.Push(&mab.addrs, entry) mab.subManager.BroadcastAddr(p, addr) } else { // update ttl & exp to whichever is greater between new and existing entry @@ -355,7 +355,7 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du } } else { if ttl > 0 { - heap.Push(mab.addrs, &expiringAddr{Addr: addr, Expires: exp, TTL: ttl, Peer: p}) + heap.Push(&mab.addrs, &expiringAddr{Addr: addr, Expires: exp, TTL: ttl, Peer: p}) mab.subManager.BroadcastAddr(p, addr) } } From 0a16cde9fd85f3cfeb91c42e5e59c8141fdecca3 Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 16 Sep 2024 01:30:59 +0530 Subject: [PATCH 08/13] gc signed peer records too --- core/peerstore/peerstore.go | 55 +++++--------- p2p/host/peerstore/pstoremem/addr_book.go | 91 ++++++++++++----------- 2 files changed, 66 insertions(+), 80 deletions(-) diff --git a/core/peerstore/peerstore.go b/core/peerstore/peerstore.go index 0ef09df9fe..7f844f1335 100644 --- a/core/peerstore/peerstore.go +++ b/core/peerstore/peerstore.go @@ -122,17 +122,9 @@ type AddrBook interface { } // CertifiedAddrBook manages "self-certified" addresses for remote 
peers. -// Self-certified addresses are contained in peer.PeerRecords -// which are wrapped in a record.Envelope and signed by the peer -// to whom they belong. -// -// Certified addresses (CA) are generally more secure than uncertified -// addresses (UA). Consequently, CAs beat and displace UAs. When the -// peerstore learns CAs for a peer, it will reject UAs for the same peer -// (as long as the former haven't expired). -// Furthermore, peer records act like sequenced snapshots of CAs. Therefore, -// processing a peer record that's newer than the last one seen overwrites -// all addresses with the incoming ones. +// Self-certified addresses are contained in signed peer.PeerRecords. +// Certified addresses are generally more secure than uncertified +// addresses. // // This interface is most useful when combined with AddrBook. // To test whether a given AddrBook / Peerstore implementation supports @@ -143,36 +135,23 @@ type AddrBook interface { // cab.ConsumePeerRecord(signedPeerRecord, aTTL) // } type CertifiedAddrBook interface { - // ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in - // a record.Envelope), which will expire after the given TTL. - // - // The 'accepted' return value indicates that the record was successfully processed - // and integrated into the CertifiedAddrBook state. If 'accepted' is false but no - // error is returned, it means that the record was ignored, most likely because - // a newer record exists for the same peer. - // - // Signed records added via this method will be stored without - // alteration as long as the address TTLs remain valid. The Envelopes - // containing the PeerRecords can be retrieved by calling GetPeerRecord(peerID). + // ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will expire when + // all addresses associated with the peer have expired. The addresses in provided signed + // peer.PeerRecord are expired after `ttl` duration. 
// - // If the signed PeerRecord belongs to a peer that already has certified - // addresses in the CertifiedAddrBook, the new addresses will replace the - // older ones, if the new record has a higher sequence number than the - // existing record. Attempting to add a peer record with a - // sequence number that's <= an existing record for the same peer will not - // result in an error, but the record will be ignored, and the 'accepted' - // bool return value will be false. + // The `accepted` return value indicates that the record was successfully processed. If + // `accepted` is false but no error is returned, it means that the record was ignored, most + // likely because a newer record exists for the same peer. // - // If the CertifiedAddrBook is also an AddrBook (which is most likely the case), - // adding certified addresses for a peer will *replace* any - // existing non-certified addresses for that peer, and only the certified - // addresses will be returned from AddrBook.Addrs thereafter. + // If the signed peer.PeerRecord belongs to a peer that already has certified addresses in + // the CertifiedAddrBook, and if the new record has a higher sequence number than the + // existing record, the new addresses will be added and the older ones will be kept + // unchanged. Attempting to add a peer record with a sequence number that's lower than an + // existing record will not result in an error, but the record will be ignored, and the + // `accepted` return value will be false. // - // Likewise, once certified addresses have been added for a given peer, - // any non-certified addresses added via AddrBook.AddAddrs or - // AddrBook.SetAddrs will be ignored. AddrBook.SetAddrs may still be used - // to update the TTL of certified addresses that have previously been - // added via ConsumePeerRecord. + // The Envelopes containing the PeerRecords can be retrieved by calling + // GetPeerRecord(peerID). 
ConsumePeerRecord(s *record.Envelope, ttl time.Duration) (accepted bool, err error) // GetPeerRecord returns an Envelope containing a PeerRecord for the diff --git a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go index d7f5d8d6f7..8019828f3e 100644 --- a/p2p/host/peerstore/pstoremem/addr_book.go +++ b/p2p/host/peerstore/pstoremem/addr_book.go @@ -16,7 +16,7 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -var SignedPeerRecordBound = 1_000 +var SignedPeerRecordBound = 100_000 var log = logging.Logger("peerstore") @@ -42,13 +42,13 @@ type peerRecordState struct { var _ heap.Interface = &peerAddrs{} type peerAddrs struct { - addrs map[peer.ID]map[string]*expiringAddr // peer.ID -> addr.Bytes() -> *expiringAddr + Addrs map[peer.ID]map[string]*expiringAddr // peer.ID -> addr.Bytes() -> *expiringAddr expiringHeap []*expiringAddr } func newPeerAddrs() peerAddrs { return peerAddrs{ - addrs: make(map[peer.ID]map[string]*expiringAddr), + Addrs: make(map[peer.ID]map[string]*expiringAddr), } } @@ -63,10 +63,10 @@ func (pa *peerAddrs) Swap(i, j int) { } func (pa *peerAddrs) Push(x any) { a := x.(*expiringAddr) - if _, ok := pa.addrs[a.Peer]; !ok { - pa.addrs[a.Peer] = make(map[string]*expiringAddr) + if _, ok := pa.Addrs[a.Peer]; !ok { + pa.Addrs[a.Peer] = make(map[string]*expiringAddr) } - pa.addrs[a.Peer][string(a.Addr.Bytes())] = a + pa.Addrs[a.Peer][string(a.Addr.Bytes())] = a a.heapIndex = len(pa.expiringHeap) pa.expiringHeap = append(pa.expiringHeap, a) } @@ -77,10 +77,10 @@ func (pa *peerAddrs) Pop() any { a.heapIndex = -1 pa.expiringHeap = old[0 : n-1] - if m, ok := pa.addrs[a.Peer]; ok { + if m, ok := pa.Addrs[a.Peer]; ok { delete(m, string(a.Addr.Bytes())) if len(m) == 0 { - delete(pa.addrs, a.Peer) + delete(pa.Addrs, a.Peer) } } @@ -94,16 +94,16 @@ func (pa *peerAddrs) Fix(a *expiringAddr) { func (pa *peerAddrs) Delete(a *expiringAddr) { heap.Remove(pa, a.heapIndex) a.heapIndex = -1 - if m, ok := pa.addrs[a.Peer]; ok { + 
if m, ok := pa.Addrs[a.Peer]; ok { delete(m, string(a.Addr.Bytes())) if len(m) == 0 { - delete(pa.addrs, a.Peer) + delete(pa.Addrs, a.Peer) } } } func (pa *peerAddrs) FindAddr(p peer.ID, addrBytes ma.Multiaddr) (*expiringAddr, bool) { - if m, ok := pa.addrs[p]; ok { + if m, ok := pa.Addrs[p]; ok { v, ok := m[string(addrBytes.Bytes())] return v, ok } @@ -117,10 +117,12 @@ func (pa *peerAddrs) NextExpiry() time.Time { return pa.expiringHeap[len(pa.expiringHeap)-1].Expires } -func (pa *peerAddrs) gc(now time.Time) { - for len(pa.expiringHeap) > 0 && now.After(pa.NextExpiry()) { - heap.Pop(pa) +func (pa *peerAddrs) PopIfExpired(now time.Time) (*expiringAddr, bool) { + if len(pa.expiringHeap) > 0 && now.After(pa.NextExpiry()) { + a := heap.Pop(pa) + return a.(*expiringAddr), true } + return nil, false } type clock interface { @@ -201,14 +203,20 @@ func (mab *memoryAddrBook) gc() { now := mab.clock.Now() mab.mu.Lock() defer mab.mu.Unlock() - mab.addrs.gc(now) + for { + ea, ok := mab.addrs.PopIfExpired(now) + if !ok { + return + } + mab.maybeDeleteSignedPeerRecordUnlocked(ea.Peer) + } } func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice { mab.mu.RLock() defer mab.mu.RUnlock() - peers := make(peer.IDSlice, 0, len(mab.addrs.addrs)) - for pid := range mab.addrs.addrs { + peers := make(peer.IDSlice, 0, len(mab.addrs.Addrs)) + for pid := range mab.addrs.Addrs { peers = append(peers, pid) } return peers @@ -219,22 +227,15 @@ func (mab *memoryAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati mab.AddAddrs(p, []ma.Multiaddr{addr}, ttl) } -// AddAddrs gives memoryAddrBook addresses to use, with a given ttl -// (time-to-live), after which the address is no longer valid. +// AddAddrs adds `addrs` for peer `p`, which will expire after the given `ttl`. // This function never reduces the TTL or expiration of an address. 
func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { - // if we have a valid peer record, ignore unsigned addrs - // peerRec := mab.GetPeerRecord(p) - // if peerRec != nil { - // return - // } mab.addAddrs(p, addrs, ttl) } var ErrTooManyRecords = fmt.Errorf("too many signed peer records. Dropping this one") -// ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in -// a record.Envelope), which will expire after the given TTL. +// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will expire after the given TTL. // See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details. func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) { r, err := recordEnvelope.Record() @@ -249,13 +250,13 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt return false, fmt.Errorf("signing key does not match PeerID in PeerRecord") } - // ensure seq is greater than, or equal to, the last received mab.mu.Lock() defer mab.mu.Unlock() if (len(mab.signedPeerRecords)) >= SignedPeerRecordBound { return false, ErrTooManyRecords } + // ensure seq is greater than or equal to the last received lastState, found := mab.signedPeerRecords[rec.PeerID] if found && lastState.Seq > rec.Seq { return false, nil @@ -268,6 +269,12 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt return true, nil } +func (mab *memoryAddrBook) maybeDeleteSignedPeerRecordUnlocked(p peer.ID) { + if len(mab.addrs.Addrs[p]) == 0 { + delete(mab.signedPeerRecords, p) + } +} + func (mab *memoryAddrBook) addAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { mab.mu.Lock() defer mab.mu.Unlock() @@ -276,6 +283,8 @@ func (mab *memoryAddrBook) addAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du } func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { 
+ defer mab.maybeDeleteSignedPeerRecordUnlocked(p) + // if ttl is zero, exit. nothing to do. if ttl <= 0 { return @@ -293,8 +302,6 @@ func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl log.Warnf("Was passed p2p address with a different peerId. found: %s, expected: %s", addrPid, p) continue } - // find the highest TTL and Expiry time between - // existing records and function args a, found := mab.addrs.FindAddr(p, addr) if !found { // not found, announce it. @@ -330,6 +337,8 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du mab.mu.Lock() defer mab.mu.Unlock() + defer mab.maybeDeleteSignedPeerRecordUnlocked(p) + exp := mab.clock.Now().Add(ttl) for _, addr := range addrs { addr, addrPid := peer.SplitAddr(addr) @@ -343,7 +352,6 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du } if a, found := mab.addrs.FindAddr(p, addr); found { - // re-set all of them for new ttl. if ttl > 0 { a.Addr = addr a.Expires = exp @@ -367,9 +375,11 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { mab.mu.Lock() defer mab.mu.Unlock() - exp := mab.clock.Now().Add(newTTL) - for _, a := range mab.addrs.addrs[p] { + defer mab.maybeDeleteSignedPeerRecordUnlocked(p) + + exp := mab.clock.Now().Add(newTTL) + for _, a := range mab.addrs.Addrs[p] { if oldTTL == a.TTL { if newTTL == 0 { mab.addrs.Delete(a) @@ -386,11 +396,10 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr { mab.mu.RLock() defer mab.mu.RUnlock() - - if _, ok := mab.addrs.addrs[p]; !ok { + if _, ok := mab.addrs.Addrs[p]; !ok { return nil } - return validAddrs(mab.clock.Now(), mab.addrs.addrs[p]) + return validAddrs(mab.clock.Now(), mab.addrs.Addrs[p]) } func validAddrs(now time.Time, amap map[string]*expiringAddr) 
[]ma.Multiaddr { @@ -414,13 +423,11 @@ func (mab *memoryAddrBook) GetPeerRecord(p peer.ID) *record.Envelope { mab.mu.RLock() defer mab.mu.RUnlock() - if _, ok := mab.addrs.addrs[p]; !ok { + if _, ok := mab.addrs.Addrs[p]; !ok { return nil } - // although the signed record gets garbage collected when all addrs inside it are expired, - // we may be in between the expiration time and the GC interval - // so, we check to see if we have any valid signed addrs before returning the record - if len(validAddrs(mab.clock.Now(), mab.addrs.addrs[p])) == 0 { + // The record may have expired, but not garbage collected. + if len(validAddrs(mab.clock.Now(), mab.addrs.Addrs[p])) == 0 { return nil } @@ -437,7 +444,7 @@ func (mab *memoryAddrBook) ClearAddrs(p peer.ID) { defer mab.mu.Unlock() delete(mab.signedPeerRecords, p) - for _, a := range mab.addrs.addrs[p] { + for _, a := range mab.addrs.Addrs[p] { mab.addrs.Delete(a) } } @@ -448,7 +455,7 @@ func (mab *memoryAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma. var initial []ma.Multiaddr mab.mu.RLock() - if m, ok := mab.addrs.addrs[p]; ok { + if m, ok := mab.addrs.Addrs[p]; ok { initial = make([]ma.Multiaddr, 0, len(m)) for _, a := range m { initial = append(initial, a.Addr) From f4954828a998df55db516e168b328b914907f64b Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 16 Sep 2024 02:31:04 +0530 Subject: [PATCH 09/13] fix heap property, fix after check --- core/peerstore/peerstore.go | 2 +- p2p/host/peerstore/pstoremem/addr_book.go | 11 +- .../peerstore/pstoremem/addr_book_test.go | 133 ++++++++++++++++++ 3 files changed, 139 insertions(+), 7 deletions(-) create mode 100644 p2p/host/peerstore/pstoremem/addr_book_test.go diff --git a/core/peerstore/peerstore.go b/core/peerstore/peerstore.go index 0ef09df9fe..7bc28b9d74 100644 --- a/core/peerstore/peerstore.go +++ b/core/peerstore/peerstore.go @@ -175,7 +175,7 @@ func GetCertifiedAddrBook(ab AddrBook) (cab CertifiedAddrBook, ok bool) { // KeyBook tracks the keys of Peers.
type KeyBook interface { - // PubKey stores the public key of a peer. + // PubKey returns the public key of a peer. PubKey(peer.ID) ic.PubKey // AddPubKey stores the public key of a peer. diff --git a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go index 8019828f3e..34d91a2043 100644 --- a/p2p/host/peerstore/pstoremem/addr_book.go +++ b/p2p/host/peerstore/pstoremem/addr_book.go @@ -71,11 +71,9 @@ func (pa *peerAddrs) Push(x any) { pa.expiringHeap = append(pa.expiringHeap, a) } func (pa *peerAddrs) Pop() any { - old := pa.expiringHeap - n := len(old) - a := old[n-1] + a := pa.expiringHeap[len(pa.expiringHeap)-1] a.heapIndex = -1 - pa.expiringHeap = old[0 : n-1] + pa.expiringHeap = pa.expiringHeap[0 : len(pa.expiringHeap)-1] if m, ok := pa.Addrs[a.Peer]; ok { delete(m, string(a.Addr.Bytes())) @@ -114,11 +112,12 @@ func (pa *peerAddrs) NextExpiry() time.Time { if len(pa.expiringHeap) == 0 { return time.Time{} } - return pa.expiringHeap[len(pa.expiringHeap)-1].Expires + return pa.expiringHeap[0].Expires } func (pa *peerAddrs) PopIfExpired(now time.Time) (*expiringAddr, bool) { - if len(pa.expiringHeap) > 0 && now.After(pa.NextExpiry()) { + // Use !Before and not After to ensure that we expire *at* now and not just after now. 
+ if len(pa.expiringHeap) > 0 && !now.Before(pa.NextExpiry()) { a := heap.Pop(pa) return a.(*expiringAddr), true } diff --git a/p2p/host/peerstore/pstoremem/addr_book_test.go b/p2p/host/peerstore/pstoremem/addr_book_test.go new file mode 100644 index 0000000000..46a0faa5d1 --- /dev/null +++ b/p2p/host/peerstore/pstoremem/addr_book_test.go @@ -0,0 +1,133 @@ +package pstoremem + +import ( + "container/heap" + "fmt" + "slices" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" +) + +func TestPeerAddrsNextExpiry(t *testing.T) { + paa := newPeerAddrs() + pa := &paa + a1 := ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1") + a2 := ma.StringCast("/ip4/1.2.3.4/udp/2/quic-v1") + + // t1 is before t2 + t1 := time.Time{}.Add(1 * time.Second) + t2 := time.Time{}.Add(2 * time.Second) + heap.Push(pa, &expiringAddr{Addr: a1, Expires: t1, TTL: 10 * time.Second, Peer: "p1"}) + heap.Push(pa, &expiringAddr{Addr: a2, Expires: t2, TTL: 10 * time.Second, Peer: "p2"}) + + if pa.NextExpiry() != t1 { + t.Fatal("expiry should be set to t1, got", pa.NextExpiry()) + } +} + +func TestPeerAddrsHeapProperty(t *testing.T) { + paa := newPeerAddrs() + pa := &paa + addrs := []ma.Multiaddr{} + expiries := []time.Time{} + + const N = 10000 + for i := 0; i < N; i++ { + addrs = append(addrs, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.4/udp/%d/quic-v1", i))) + expiries = append(expiries, time.Time{}.Add(time.Duration(10000-i)*time.Second)) // expiries are in reverse order + pid := peer.ID(fmt.Sprintf("p%d", i)) + heap.Push(pa, &expiringAddr{Addr: addrs[i], Expires: expiries[i], TTL: 10 * time.Second, Peer: pid}) + } + + for i := N - 1; i >= 0; i-- { + ea, ok := pa.PopIfExpired(expiries[i]) + require.True(t, ok) + require.Equal(t, ea.Addr, addrs[i]) + + ea, ok = pa.PopIfExpired(expiries[i]) + require.False(t, ok) + require.Nil(t, ea) + } +} + +func TestPeerAddrsHeapPropertyDeletions(t *testing.T) { + paa := 
newPeerAddrs() + pa := &paa + expiringAddrs := []*expiringAddr{} + + const N = 10000 + for i := 0; i < N; i++ { + a := ma.StringCast(fmt.Sprintf("/ip4/1.2.3.4/udp/%d/quic-v1", i)) + e := time.Time{}.Add(time.Duration(10000-i) * time.Second) // expiries are in reverse order + p := peer.ID(fmt.Sprintf("p%d", i)) + expiringAddrs = append(expiringAddrs, &expiringAddr{Addr: a, Expires: e, TTL: 10 * time.Second, Peer: p}) + heap.Push(pa, expiringAddrs[i]) + } + + // delete every 3rd element + for i := 0; i < N; i += 3 { + paa.Delete(expiringAddrs[i]) + } + + for i := N - 1; i >= 0; i-- { + ea, ok := pa.PopIfExpired(expiringAddrs[i].Expires) + if i%3 == 0 { + require.False(t, ok) + require.Nil(t, ea) + } else { + require.True(t, ok) + require.Equal(t, ea.Addr, expiringAddrs[i].Addr) + } + + ea, ok = pa.PopIfExpired(expiringAddrs[i].Expires) + require.False(t, ok) + require.Nil(t, ea) + } +} + +func TestPeerAddrsHeapPropertyUpdates(t *testing.T) { + paa := newPeerAddrs() + pa := &paa + expiringAddrs := []*expiringAddr{} + + const N = 10000 + for i := 0; i < N; i++ { + a := ma.StringCast(fmt.Sprintf("/ip4/1.2.3.4/udp/%d/quic-v1", i)) + e := time.Time{}.Add(time.Duration(N-i) * time.Second) // expiries are in reverse order + p := peer.ID(fmt.Sprintf("p%d", i)) + expiringAddrs = append(expiringAddrs, &expiringAddr{Addr: a, Expires: e, TTL: 10 * time.Second, Peer: p}) + heap.Push(pa, expiringAddrs[i]) + } + + // update every 3rd element to expire at the end + var endElements []ma.Multiaddr + for i := 0; i < N; i += 3 { + expiringAddrs[i].Expires = time.Time{}.Add(1000_000 * time.Second) + pa.Fix(expiringAddrs[i]) + endElements = append(endElements, expiringAddrs[i].Addr) + } + + for i := N - 1; i >= 0; i-- { + if i%3 == 0 { + continue // skip the elements at the end + } + ea, ok := pa.PopIfExpired(expiringAddrs[i].Expires) + require.True(t, ok, "pos: %d", i) + require.Equal(t, ea.Addr, expiringAddrs[i].Addr) + + ea, ok = pa.PopIfExpired(expiringAddrs[i].Expires) + 
require.False(t, ok) + require.Nil(t, ea) + } + + for len(endElements) > 0 { + ea, ok := pa.PopIfExpired(time.Time{}.Add(1000_000 * time.Second)) + require.True(t, ok) + require.Contains(t, endElements, ea.Addr) + endElements = slices.DeleteFunc(endElements, func(a ma.Multiaddr) bool { return ea.Addr.Equal(a) }) + } +} From 253dfeda962ffdbd1dba2e4f7dccce2aaf37d9d7 Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 16 Sep 2024 12:48:15 +0530 Subject: [PATCH 10/13] add an exhaustive test for expiry --- .../peerstore/pstoremem/addr_book_test.go | 87 ++++++++++++++----- 1 file changed, 64 insertions(+), 23 deletions(-) diff --git a/p2p/host/peerstore/pstoremem/addr_book_test.go b/p2p/host/peerstore/pstoremem/addr_book_test.go index 46a0faa5d1..caa1bb293b 100644 --- a/p2p/host/peerstore/pstoremem/addr_book_test.go +++ b/p2p/host/peerstore/pstoremem/addr_book_test.go @@ -3,6 +3,7 @@ package pstoremem import ( "container/heap" "fmt" + "math/rand" "slices" "testing" "time" @@ -29,26 +30,33 @@ func TestPeerAddrsNextExpiry(t *testing.T) { } } +func peerAddrsInput(n int) []*expiringAddr { + expiringAddrs := make([]*expiringAddr, n) + for i := 0; i < n; i++ { + a := ma.StringCast(fmt.Sprintf("/ip4/1.2.3.4/udp/%d/quic-v1", i)) + e := time.Time{}.Add(time.Duration(i) * time.Second) // expiries are in reverse order + p := peer.ID(fmt.Sprintf("p%d", i)) + expiringAddrs[i] = &expiringAddr{Addr: a, Expires: e, TTL: 10 * time.Second, Peer: p} + } + return expiringAddrs +} + func TestPeerAddrsHeapProperty(t *testing.T) { paa := newPeerAddrs() pa := &paa - addrs := []ma.Multiaddr{} - expiries := []time.Time{} const N = 10000 + expiringAddrs := peerAddrsInput(N) for i := 0; i < N; i++ { - addrs = append(addrs, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.4/udp/%d/quic-v1", i))) - expiries = append(expiries, time.Time{}.Add(time.Duration(10000-i)*time.Second)) // expiries are in reverse order - pid := peer.ID(fmt.Sprintf("p%d", i)) - heap.Push(pa, &expiringAddr{Addr: addrs[i], Expires: 
expiries[i], TTL: 10 * time.Second, Peer: pid}) + heap.Push(pa, expiringAddrs[i]) } - for i := N - 1; i >= 0; i-- { - ea, ok := pa.PopIfExpired(expiries[i]) - require.True(t, ok) - require.Equal(t, ea.Addr, addrs[i]) + for i := 0; i < N; i++ { + ea, ok := pa.PopIfExpired(expiringAddrs[i].Expires) + require.True(t, ok, "pos: %d", i) + require.Equal(t, ea.Addr, expiringAddrs[i].Addr) - ea, ok = pa.PopIfExpired(expiries[i]) + ea, ok = pa.PopIfExpired(expiringAddrs[i].Expires) require.False(t, ok) require.Nil(t, ea) } @@ -57,14 +65,10 @@ func TestPeerAddrsHeapProperty(t *testing.T) { func TestPeerAddrsHeapPropertyDeletions(t *testing.T) { paa := newPeerAddrs() pa := &paa - expiringAddrs := []*expiringAddr{} const N = 10000 + expiringAddrs := peerAddrsInput(N) for i := 0; i < N; i++ { - a := ma.StringCast(fmt.Sprintf("/ip4/1.2.3.4/udp/%d/quic-v1", i)) - e := time.Time{}.Add(time.Duration(10000-i) * time.Second) // expiries are in reverse order - p := peer.ID(fmt.Sprintf("p%d", i)) - expiringAddrs = append(expiringAddrs, &expiringAddr{Addr: a, Expires: e, TTL: 10 * time.Second, Peer: p}) heap.Push(pa, expiringAddrs[i]) } @@ -73,7 +77,7 @@ func TestPeerAddrsHeapPropertyDeletions(t *testing.T) { paa.Delete(expiringAddrs[i]) } - for i := N - 1; i >= 0; i-- { + for i := 0; i < N; i++ { ea, ok := pa.PopIfExpired(expiringAddrs[i].Expires) if i%3 == 0 { require.False(t, ok) @@ -92,14 +96,10 @@ func TestPeerAddrsHeapPropertyDeletions(t *testing.T) { func TestPeerAddrsHeapPropertyUpdates(t *testing.T) { paa := newPeerAddrs() pa := &paa - expiringAddrs := []*expiringAddr{} const N = 10000 + expiringAddrs := peerAddrsInput(N) for i := 0; i < N; i++ { - a := ma.StringCast(fmt.Sprintf("/ip4/1.2.3.4/udp/%d/quic-v1", i)) - e := time.Time{}.Add(time.Duration(N-i) * time.Second) // expiries are in reverse order - p := peer.ID(fmt.Sprintf("p%d", i)) - expiringAddrs = append(expiringAddrs, &expiringAddr{Addr: a, Expires: e, TTL: 10 * time.Second, Peer: p}) heap.Push(pa, expiringAddrs[i]) } 
@@ -111,7 +111,7 @@ func TestPeerAddrsHeapPropertyUpdates(t *testing.T) { endElements = append(endElements, expiringAddrs[i].Addr) } - for i := N - 1; i >= 0; i-- { + for i := 0; i < N; i++ { if i%3 == 0 { continue // skip the elements at the end } @@ -131,3 +131,44 @@ func TestPeerAddrsHeapPropertyUpdates(t *testing.T) { endElements = slices.DeleteFunc(endElements, func(a ma.Multiaddr) bool { return ea.Addr.Equal(a) }) } } + +// TestPeerAddrsExpiry tests for multiple element expiry with PopIfExpired. +func TestPeerAddrsExpiry(t *testing.T) { + const T = 100_000 + for x := 0; x < T; x++ { + paa := newPeerAddrs() + pa := &paa + // Try a lot of random inputs. + // T > 5*((5^5)*5) (=15k) + // So this should test for all possible 5 element inputs. + const N = 5 + expiringAddrs := peerAddrsInput(N) + for i := 0; i < N; i++ { + expiringAddrs[i].Expires = time.Time{}.Add(time.Duration(1+rand.Intn(N)) * time.Second) + } + for i := 0; i < N; i++ { + heap.Push(pa, expiringAddrs[i]) + } + + expiry := time.Time{}.Add(time.Duration(1+rand.Intn(N)) * time.Second) + expected := []ma.Multiaddr{} + for i := 0; i < N; i++ { + if !expiry.Before(expiringAddrs[i].Expires) { + expected = append(expected, expiringAddrs[i].Addr) + } + } + got := []ma.Multiaddr{} + for { + ea, ok := pa.PopIfExpired(expiry) + if !ok { + break + } + got = append(got, ea.Addr) + } + expiries := []int{} + for i := 0; i < N; i++ { + expiries = append(expiries, expiringAddrs[i].Expires.Second()) + } + require.ElementsMatch(t, expected, got, "failed for input: element expiries: %v, expiry: %v", expiries, expiry.Second()) + } +} From 0a6a57997b5d0c39f6ccfdbe6d6f6e2fb3158173 Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 16 Sep 2024 12:51:12 +0530 Subject: [PATCH 11/13] rename Expires -> Expiry --- p2p/host/peerstore/pstoremem/addr_book.go | 30 +++++++++---------- .../peerstore/pstoremem/addr_book_test.go | 26 ++++++++-------- 2 files changed, 27 insertions(+), 29 deletions(-) diff --git 
a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go index 34d91a2043..b37aee83c5 100644 --- a/p2p/host/peerstore/pstoremem/addr_book.go +++ b/p2p/host/peerstore/pstoremem/addr_book.go @@ -21,16 +21,16 @@ var SignedPeerRecordBound = 100_000 var log = logging.Logger("peerstore") type expiringAddr struct { - Addr ma.Multiaddr - TTL time.Duration - Expires time.Time - Peer peer.ID + Addr ma.Multiaddr + TTL time.Duration + Expiry time.Time + Peer peer.ID // to sort by expiry time heapIndex int } func (e *expiringAddr) ExpiredBy(t time.Time) bool { - return !t.Before(e.Expires) + return !t.Before(e.Expiry) } type peerRecordState struct { @@ -54,7 +54,7 @@ func newPeerAddrs() peerAddrs { func (pa *peerAddrs) Len() int { return len(pa.expiringHeap) } func (pa *peerAddrs) Less(i, j int) bool { - return pa.expiringHeap[i].Expires.Before(pa.expiringHeap[j].Expires) + return pa.expiringHeap[i].Expiry.Before(pa.expiringHeap[j].Expiry) } func (pa *peerAddrs) Swap(i, j int) { pa.expiringHeap[i], pa.expiringHeap[j] = pa.expiringHeap[j], pa.expiringHeap[i] @@ -81,7 +81,6 @@ func (pa *peerAddrs) Pop() any { delete(pa.Addrs, a.Peer) } } - return a } @@ -112,11 +111,11 @@ func (pa *peerAddrs) NextExpiry() time.Time { if len(pa.expiringHeap) == 0 { return time.Time{} } - return pa.expiringHeap[0].Expires + return pa.expiringHeap[0].Expiry } func (pa *peerAddrs) PopIfExpired(now time.Time) (*expiringAddr, bool) { - // Use !Before and not After to ensure that we expire *at* now and not just after now. + // Use `!Before` instead of `After` to ensure that we expire *at* now, and not *just after now*. if len(pa.expiringHeap) > 0 && !now.Before(pa.NextExpiry()) { a := heap.Pop(pa) return a.(*expiringAddr), true @@ -304,7 +303,7 @@ func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl a, found := mab.addrs.FindAddr(p, addr) if !found { // not found, announce it. 
- entry := &expiringAddr{Addr: addr, Expires: exp, TTL: ttl, Peer: p} + entry := &expiringAddr{Addr: addr, Expiry: exp, TTL: ttl, Peer: p} heap.Push(&mab.addrs, entry) mab.subManager.BroadcastAddr(p, addr) } else { @@ -314,9 +313,9 @@ func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl changed = true a.TTL = ttl } - if exp.After(a.Expires) { + if exp.After(a.Expiry) { changed = true - a.Expires = exp + a.Expiry = exp } if changed { mab.addrs.Fix(a) @@ -353,7 +352,7 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du if a, found := mab.addrs.FindAddr(p, addr); found { if ttl > 0 { a.Addr = addr - a.Expires = exp + a.Expiry = exp a.TTL = ttl mab.addrs.Fix(a) mab.subManager.BroadcastAddr(p, addr) @@ -362,7 +361,7 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du } } else { if ttl > 0 { - heap.Push(&mab.addrs, &expiringAddr{Addr: addr, Expires: exp, TTL: ttl, Peer: p}) + heap.Push(&mab.addrs, &expiringAddr{Addr: addr, Expiry: exp, TTL: ttl, Peer: p}) mab.subManager.BroadcastAddr(p, addr) } } @@ -384,7 +383,7 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t mab.addrs.Delete(a) } else { a.TTL = newTTL - a.Expires = exp + a.Expiry = exp mab.addrs.Fix(a) } } @@ -411,7 +410,6 @@ func validAddrs(now time.Time, amap map[string]*expiringAddr) []ma.Multiaddr { good = append(good, m.Addr) } } - return good } diff --git a/p2p/host/peerstore/pstoremem/addr_book_test.go b/p2p/host/peerstore/pstoremem/addr_book_test.go index caa1bb293b..a6da9d7bdd 100644 --- a/p2p/host/peerstore/pstoremem/addr_book_test.go +++ b/p2p/host/peerstore/pstoremem/addr_book_test.go @@ -22,8 +22,8 @@ func TestPeerAddrsNextExpiry(t *testing.T) { // t1 is before t2 t1 := time.Time{}.Add(1 * time.Second) t2 := time.Time{}.Add(2 * time.Second) - heap.Push(pa, &expiringAddr{Addr: a1, Expires: t1, TTL: 10 * time.Second, Peer: "p1"}) - heap.Push(pa, &expiringAddr{Addr: a2, Expires: t2, 
TTL: 10 * time.Second, Peer: "p2"}) + heap.Push(pa, &expiringAddr{Addr: a1, Expiry: t1, TTL: 10 * time.Second, Peer: "p1"}) + heap.Push(pa, &expiringAddr{Addr: a2, Expiry: t2, TTL: 10 * time.Second, Peer: "p2"}) if pa.NextExpiry() != t1 { t.Fatal("expiry should be set to t1, got", pa.NextExpiry()) @@ -36,7 +36,7 @@ func peerAddrsInput(n int) []*expiringAddr { a := ma.StringCast(fmt.Sprintf("/ip4/1.2.3.4/udp/%d/quic-v1", i)) e := time.Time{}.Add(time.Duration(i) * time.Second) // expiries are in reverse order p := peer.ID(fmt.Sprintf("p%d", i)) - expiringAddrs[i] = &expiringAddr{Addr: a, Expires: e, TTL: 10 * time.Second, Peer: p} + expiringAddrs[i] = &expiringAddr{Addr: a, Expiry: e, TTL: 10 * time.Second, Peer: p} } return expiringAddrs } @@ -52,11 +52,11 @@ func TestPeerAddrsHeapProperty(t *testing.T) { } for i := 0; i < N; i++ { - ea, ok := pa.PopIfExpired(expiringAddrs[i].Expires) + ea, ok := pa.PopIfExpired(expiringAddrs[i].Expiry) require.True(t, ok, "pos: %d", i) require.Equal(t, ea.Addr, expiringAddrs[i].Addr) - ea, ok = pa.PopIfExpired(expiringAddrs[i].Expires) + ea, ok = pa.PopIfExpired(expiringAddrs[i].Expiry) require.False(t, ok) require.Nil(t, ea) } @@ -78,7 +78,7 @@ func TestPeerAddrsHeapPropertyDeletions(t *testing.T) { } for i := 0; i < N; i++ { - ea, ok := pa.PopIfExpired(expiringAddrs[i].Expires) + ea, ok := pa.PopIfExpired(expiringAddrs[i].Expiry) if i%3 == 0 { require.False(t, ok) require.Nil(t, ea) @@ -87,7 +87,7 @@ func TestPeerAddrsHeapPropertyDeletions(t *testing.T) { require.Equal(t, ea.Addr, expiringAddrs[i].Addr) } - ea, ok = pa.PopIfExpired(expiringAddrs[i].Expires) + ea, ok = pa.PopIfExpired(expiringAddrs[i].Expiry) require.False(t, ok) require.Nil(t, ea) } @@ -106,7 +106,7 @@ func TestPeerAddrsHeapPropertyUpdates(t *testing.T) { // update every 3rd element to expire at the end var endElements []ma.Multiaddr for i := 0; i < N; i += 3 { - expiringAddrs[i].Expires = time.Time{}.Add(1000_000 * time.Second) + expiringAddrs[i].Expiry = 
time.Time{}.Add(1000_000 * time.Second) pa.Fix(expiringAddrs[i]) endElements = append(endElements, expiringAddrs[i].Addr) } @@ -115,11 +115,11 @@ func TestPeerAddrsHeapPropertyUpdates(t *testing.T) { if i%3 == 0 { continue // skip the elements at the end } - ea, ok := pa.PopIfExpired(expiringAddrs[i].Expires) + ea, ok := pa.PopIfExpired(expiringAddrs[i].Expiry) require.True(t, ok, "pos: %d", i) require.Equal(t, ea.Addr, expiringAddrs[i].Addr) - ea, ok = pa.PopIfExpired(expiringAddrs[i].Expires) + ea, ok = pa.PopIfExpired(expiringAddrs[i].Expiry) require.False(t, ok) require.Nil(t, ea) } @@ -144,7 +144,7 @@ func TestPeerAddrsExpiry(t *testing.T) { const N = 5 expiringAddrs := peerAddrsInput(N) for i := 0; i < N; i++ { - expiringAddrs[i].Expires = time.Time{}.Add(time.Duration(1+rand.Intn(N)) * time.Second) + expiringAddrs[i].Expiry = time.Time{}.Add(time.Duration(1+rand.Intn(N)) * time.Second) } for i := 0; i < N; i++ { heap.Push(pa, expiringAddrs[i]) @@ -153,7 +153,7 @@ func TestPeerAddrsExpiry(t *testing.T) { expiry := time.Time{}.Add(time.Duration(1+rand.Intn(N)) * time.Second) expected := []ma.Multiaddr{} for i := 0; i < N; i++ { - if !expiry.Before(expiringAddrs[i].Expires) { + if !expiry.Before(expiringAddrs[i].Expiry) { expected = append(expected, expiringAddrs[i].Addr) } } @@ -167,7 +167,7 @@ func TestPeerAddrsExpiry(t *testing.T) { } expiries := []int{} for i := 0; i < N; i++ { - expiries = append(expiries, expiringAddrs[i].Expires.Second()) + expiries = append(expiries, expiringAddrs[i].Expiry.Second()) } require.ElementsMatch(t, expected, got, "failed for input: element expiries: %v, expiry: %v", expiries, expiry.Second()) } From ca8f4d6b80fc405acc34424e1ae22139e1a03965 Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 16 Sep 2024 15:46:14 +0530 Subject: [PATCH 12/13] remove bound on number of SignedPeerRecords --- core/peerstore/peerstore.go | 38 ++++++++-------- p2p/host/basic/basic_host.go | 55 +++++++++++++---------- 
p2p/host/peerstore/pstoremem/addr_book.go | 7 --- 3 files changed, 49 insertions(+), 51 deletions(-) diff --git a/core/peerstore/peerstore.go b/core/peerstore/peerstore.go index 7bc28b9d74..10469e72cb 100644 --- a/core/peerstore/peerstore.go +++ b/core/peerstore/peerstore.go @@ -121,12 +121,10 @@ type AddrBook interface { PeersWithAddrs() peer.IDSlice } -// CertifiedAddrBook manages "self-certified" addresses for remote peers. -// Self-certified addresses are contained in signed peer.PeerRecords. -// Certified addresses are generally more secure than uncertified -// addresses. +// CertifiedAddrBook manages signed peer records and "self-certified" addresses +// contained within them. +// Use this interface with an `AddrBook`. // -// This interface is most useful when combined with AddrBook. // To test whether a given AddrBook / Peerstore implementation supports // certified addresses, callers should use the GetCertifiedAddrBook helper or // type-assert on the CertifiedAddrBook interface: @@ -135,28 +133,28 @@ type AddrBook interface { // cab.ConsumePeerRecord(signedPeerRecord, aTTL) // } type CertifiedAddrBook interface { - // ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will expire when - // all addresses associated with the peer have expired. The addresses in provided signed - // peer.PeerRecord are expired after `ttl` duration. + // ConsumePeerRecord stores a signed peer record and the contained addresses for + // for ttl duration. + // The addresses contained in the signed peer record will expire after ttl. If any + // address is already present in the peer store, it'll expire at max of existing ttl and + // provided ttl. + // The signed peer record itself will be expired when all the addresses associated with the peer, + // self-certified or not, are removed from the AddrBook. + // To delete the signed peer record, use `AddrBook.UpdateAddrs`,`AddrBook.SetAddrs`, or + // `AddrBook.ClearAddrs` with ttl 0. 
+ // Note: Future calls to ConsumePeerRecord will not expire self-certified addresses from the + // previous calls. // // The `accepted` return value indicates that the record was successfully processed. If // `accepted` is false but no error is returned, it means that the record was ignored, most - // likely because a newer record exists for the same peer. + // likely because a newer record exists for the same peer with a greater seq value. // - // If the signed peer.PeerRecord belongs to a peer that already has certified addresses in - // the CertifiedAddrBook, and if the new record has a higher sequence number than the - // existing record, the new addresses will be added and the older ones will be kept - // unchanged. Attempting to add a peer record with a sequence number that's lower than an - // existing record will not result in an error, but the record will be ignored, and the - // `accepted` return value will be false. - // - // The Envelopes containing the PeerRecords can be retrieved by calling + // The Envelopes containing the signed peer records can be retrieved by calling // GetPeerRecord(peerID). ConsumePeerRecord(s *record.Envelope, ttl time.Duration) (accepted bool, err error) - // GetPeerRecord returns an Envelope containing a PeerRecord for the - // given peer id, if one exists. - // Returns nil if no signed PeerRecord exists for the peer. + // GetPeerRecord returns an Envelope containing a peer record for the + // peer, or nil if no record exists. 
GetPeerRecord(p peer.ID) *record.Envelope } diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 7b7f8855fb..b5d252e9d2 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -495,9 +495,12 @@ func (h *BasicHost) SignalAddressChange() { } } -func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated { +func (h *BasicHost) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated { + if prev == nil && current == nil { + return nil + } prevmap := make(map[string]ma.Multiaddr, len(prev)) - evt := event.EvtLocalAddressesUpdated{Diffs: true} + evt := &event.EvtLocalAddressesUpdated{Diffs: true} addrsAdded := false for _, addr := range prev { @@ -524,7 +527,19 @@ func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddresses return nil } - return &evt + // Our addresses have changed. Make a new signed peer record. + if !h.disableSignedPeerRecord { + // add signed peer record to the event + sr, err := h.makeSignedPeerRecord(current) + if err != nil { + log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err) + // drop this change + return nil + } + evt.SignedPeerRecord = sr + } + + return evt } func (h *BasicHost) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) { @@ -548,34 +563,27 @@ func (h *BasicHost) background() { var lastAddrs []ma.Multiaddr emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) { - // nothing to do if both are nil..defensive check - if currentAddrs == nil && lastAddrs == nil { - return - } - - changeEvt := makeUpdatedAddrEvent(lastAddrs, currentAddrs) - + changeEvt := h.makeUpdatedAddrEvent(lastAddrs, currentAddrs) if changeEvt == nil { return } - + // Our addresses have changed. + // store the signed peer record in the peer store. 
if !h.disableSignedPeerRecord { - // add signed peer record to the event - sr, err := h.makeSignedPeerRecord(currentAddrs) - if err != nil { - log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err) - return - } - changeEvt.SignedPeerRecord = sr - - // persist the signed record to the peerstore - if _, err := h.caBook.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil { + if _, err := h.caBook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil { log.Errorf("failed to persist signed peer record in peer store, err=%s", err) return } } + // update host addresses in the peer store + removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed)) + for _, ua := range changeEvt.Removed { + removedAddrs = append(removedAddrs, ua.Address) + } + h.Peerstore().SetAddrs(h.ID(), currentAddrs, peerstore.PermanentAddrTTL) + h.Peerstore().SetAddrs(h.ID(), removedAddrs, 0) - // emit addr change event on the bus + // emit addr change event if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil { log.Warnf("error emitting event for updated addrs: %s", err) } @@ -587,11 +595,10 @@ func (h *BasicHost) background() { defer ticker.Stop() for { + // Update our local IP addresses before checking our current addresses. if len(h.network.ListenAddresses()) > 0 { h.updateLocalIpAddr() } - // Request addresses anyways because, technically, address filters still apply. - // The underlying AllAddrs call is effectively a no-op. 
curr := h.Addrs() emitAddrChange(curr, lastAddrs) lastAddrs = curr diff --git a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go index b37aee83c5..89b87bdb47 100644 --- a/p2p/host/peerstore/pstoremem/addr_book.go +++ b/p2p/host/peerstore/pstoremem/addr_book.go @@ -16,8 +16,6 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -var SignedPeerRecordBound = 100_000 - var log = logging.Logger("peerstore") type expiringAddr struct { @@ -231,8 +229,6 @@ func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du mab.addAddrs(p, addrs, ttl) } -var ErrTooManyRecords = fmt.Errorf("too many signed peer records. Dropping this one") - // ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will expire after the given TTL. // See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details. func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) { @@ -250,9 +246,6 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt mab.mu.Lock() defer mab.mu.Unlock() - if (len(mab.signedPeerRecords)) >= SignedPeerRecordBound { - return false, ErrTooManyRecords - } // ensure seq is greater than or equal to the last received lastState, found := mab.signedPeerRecords[rec.PeerID] From 6c832d9ce74cc76b95140dce202497207a75a7f7 Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 17 Sep 2024 19:36:49 +0530 Subject: [PATCH 13/13] add heap benchmark --- .../peerstore/pstoremem/addr_book_test.go | 30 +++++++++++++++++-- p2p/host/peerstore/pstoremem/inmem_test.go | 22 +++++++------- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/p2p/host/peerstore/pstoremem/addr_book_test.go b/p2p/host/peerstore/pstoremem/addr_book_test.go index a6da9d7bdd..963c4552cf 100644 --- a/p2p/host/peerstore/pstoremem/addr_book_test.go +++ b/p2p/host/peerstore/pstoremem/addr_book_test.go @@ -33,8 +33,9 @@ func 
TestPeerAddrsNextExpiry(t *testing.T) { func peerAddrsInput(n int) []*expiringAddr { expiringAddrs := make([]*expiringAddr, n) for i := 0; i < n; i++ { - a := ma.StringCast(fmt.Sprintf("/ip4/1.2.3.4/udp/%d/quic-v1", i)) - e := time.Time{}.Add(time.Duration(i) * time.Second) // expiries are in reverse order + port := i % 65535 + a := ma.StringCast(fmt.Sprintf("/ip4/1.2.3.4/udp/%d/quic-v1", port)) + e := time.Time{}.Add(time.Duration(i) * time.Second) p := peer.ID(fmt.Sprintf("p%d", i)) expiringAddrs[i] = &expiringAddr{Addr: a, Expiry: e, TTL: 10 * time.Second, Peer: p} } @@ -172,3 +173,28 @@ func TestPeerAddrsExpiry(t *testing.T) { require.ElementsMatch(t, expected, got, "failed for input: element expiries: %v, expiry: %v", expiries, expiry.Second()) } } + +func BenchmarkPeerAddrs(b *testing.B) { + sizes := [...]int{1, 10, 100, 1000, 10_000, 100_000, 1000_000} + for _, sz := range sizes { + b.Run(fmt.Sprintf("%d", sz), func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + paa := newPeerAddrs() + pa := &paa + expiringAddrs := peerAddrsInput(sz) + for i := 0; i < sz; i++ { + heap.Push(pa, expiringAddrs[i]) + } + b.StartTimer() + for { + _, ok := pa.PopIfExpired(expiringAddrs[len(expiringAddrs)-1].Expiry) + if !ok { + break + } + } + } + }) + } + +} diff --git a/p2p/host/peerstore/pstoremem/inmem_test.go b/p2p/host/peerstore/pstoremem/inmem_test.go index 1ba3df4f71..f87d6e4f45 100644 --- a/p2p/host/peerstore/pstoremem/inmem_test.go +++ b/p2p/host/peerstore/pstoremem/inmem_test.go @@ -93,20 +93,22 @@ func BenchmarkGC(b *testing.B) { require.NoError(b, err) defer ps.Close() - peerCount := 10_000 + peerCount := 100_000 addrsPerPeer := 32 - for i := 0; i < peerCount; i++ { - id := peer.ID(strconv.Itoa(i)) - addrs := make([]multiaddr.Multiaddr, addrsPerPeer) - for j := 0; j < addrsPerPeer; j++ { - addrs[j] = multiaddr.StringCast("/ip4/1.2.3.4/tcp/" + strconv.Itoa(j)) - } - ps.AddAddrs(id, addrs, 24*time.Hour) - } - b.ResetTimer() for i := 0; i < b.N; i++ { + 
b.StopTimer() + for i := 0; i < peerCount; i++ { + id := peer.ID(strconv.Itoa(i)) + addrs := make([]multiaddr.Multiaddr, addrsPerPeer) + for j := 0; j < addrsPerPeer; j++ { + addrs[j] = multiaddr.StringCast("/ip4/1.2.3.4/tcp/" + strconv.Itoa(j)) + } + ps.AddAddrs(id, addrs, 24*time.Hour) + } + clock.Add(25 * time.Hour) + b.StartTimer() ps.gc() } }