Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Better GC in membacked peerstore #2960

Merged
merged 13 commits into from
Sep 17, 2024
67 changes: 22 additions & 45 deletions core/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,10 @@ type AddrBook interface {
PeersWithAddrs() peer.IDSlice
}

// CertifiedAddrBook manages "self-certified" addresses for remote peers.
// Self-certified addresses are contained in peer.PeerRecords
// which are wrapped in a record.Envelope and signed by the peer
// to whom they belong.
// CertifiedAddrBook manages signed peer records and "self-certified" addresses
// contained within them.
// Use this interface with an `AddrBook`.
//
// Certified addresses (CA) are generally more secure than uncertified
// addresses (UA). Consequently, CAs beat and displace UAs. When the
// peerstore learns CAs for a peer, it will reject UAs for the same peer
// (as long as the former haven't expired).
// Furthermore, peer records act like sequenced snapshots of CAs. Therefore,
// processing a peer record that's newer than the last one seen overwrites
// all addresses with the incoming ones.
//
// This interface is most useful when combined with AddrBook.
// To test whether a given AddrBook / Peerstore implementation supports
// certified addresses, callers should use the GetCertifiedAddrBook helper or
// type-assert on the CertifiedAddrBook interface:
Expand All @@ -143,41 +133,28 @@ type AddrBook interface {
// cab.ConsumePeerRecord(signedPeerRecord, aTTL)
// }
type CertifiedAddrBook interface {
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in
// a record.Envelope), which will expire after the given TTL.
//
// The 'accepted' return value indicates that the record was successfully processed
// and integrated into the CertifiedAddrBook state. If 'accepted' is false but no
// error is returned, it means that the record was ignored, most likely because
// a newer record exists for the same peer.
//
// Signed records added via this method will be stored without
// alteration as long as the address TTLs remain valid. The Envelopes
// containing the PeerRecords can be retrieved by calling GetPeerRecord(peerID).
//
// If the signed PeerRecord belongs to a peer that already has certified
// addresses in the CertifiedAddrBook, the new addresses will replace the
// older ones, if the new record has a higher sequence number than the
// existing record. Attempting to add a peer record with a
// sequence number that's <= an existing record for the same peer will not
// result in an error, but the record will be ignored, and the 'accepted'
// bool return value will be false.
// ConsumePeerRecord stores a signed peer record and the contained addresses for
// for ttl duration.
// The addresses contained in the signed peer record will expire after ttl. If any
// address is already present in the peer store, it'll expire at max of existing ttl and
// provided ttl.
// The signed peer record itself will be expired when all the addresses associated with the peer,
// self-certified or not, are removed from the AddrBook.
// To delete the signed peer record, use `AddrBook.UpdateAddrs`,`AddrBook.SetAddrs`, or
// `AddrBook.ClearAddrs` with ttl 0.
// Note: Future calls to ConsumePeerRecord will not expire self-certified addresses from the
// previous calls.
//
// If the CertifiedAddrBook is also an AddrBook (which is most likely the case),
// adding certified addresses for a peer will *replace* any
// existing non-certified addresses for that peer, and only the certified
// addresses will be returned from AddrBook.Addrs thereafter.
// The `accepted` return value indicates that the record was successfully processed. If
// `accepted` is false but no error is returned, it means that the record was ignored, most
// likely because a newer record exists for the same peer with a greater seq value.
//
// Likewise, once certified addresses have been added for a given peer,
// any non-certified addresses added via AddrBook.AddAddrs or
// AddrBook.SetAddrs will be ignored. AddrBook.SetAddrs may still be used
// to update the TTL of certified addresses that have previously been
// added via ConsumePeerRecord.
// The Envelopes containing the signed peer records can be retrieved by calling
// GetPeerRecord(peerID).
ConsumePeerRecord(s *record.Envelope, ttl time.Duration) (accepted bool, err error)

// GetPeerRecord returns an Envelope containing a PeerRecord for the
// given peer id, if one exists.
// Returns nil if no signed PeerRecord exists for the peer.
// GetPeerRecord returns an Envelope containing a peer record for the
// peer, or nil if no record exists.
GetPeerRecord(p peer.ID) *record.Envelope
}

Expand All @@ -196,7 +173,7 @@ func GetCertifiedAddrBook(ab AddrBook) (cab CertifiedAddrBook, ok bool) {

// KeyBook tracks the keys of Peers.
type KeyBook interface {
// PubKey stores the public key of a peer.
// PubKey returns the public key of a peer.
PubKey(peer.ID) ic.PubKey

// AddPubKey stores the public key of a peer.
Expand Down
55 changes: 31 additions & 24 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,9 +495,12 @@ func (h *BasicHost) SignalAddressChange() {
}
}

func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated {
func (h *BasicHost) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated {
if prev == nil && current == nil {
return nil
}
prevmap := make(map[string]ma.Multiaddr, len(prev))
evt := event.EvtLocalAddressesUpdated{Diffs: true}
evt := &event.EvtLocalAddressesUpdated{Diffs: true}
addrsAdded := false

for _, addr := range prev {
Expand All @@ -524,7 +527,19 @@ func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddresses
return nil
}

return &evt
// Our addresses have changed. Make a new signed peer record.
if !h.disableSignedPeerRecord {
// add signed peer record to the event
sr, err := h.makeSignedPeerRecord(current)
if err != nil {
log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err)
// drop this change
return nil
}
evt.SignedPeerRecord = sr
}

return evt
}

func (h *BasicHost) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) {
Expand All @@ -548,34 +563,27 @@ func (h *BasicHost) background() {
var lastAddrs []ma.Multiaddr

emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) {
// nothing to do if both are nil..defensive check
if currentAddrs == nil && lastAddrs == nil {
return
}

changeEvt := makeUpdatedAddrEvent(lastAddrs, currentAddrs)

changeEvt := h.makeUpdatedAddrEvent(lastAddrs, currentAddrs)
if changeEvt == nil {
return
}

// Our addresses have changed.
// store the signed peer record in the peer store.
if !h.disableSignedPeerRecord {
// add signed peer record to the event
sr, err := h.makeSignedPeerRecord(currentAddrs)
if err != nil {
log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err)
return
}
changeEvt.SignedPeerRecord = sr

// persist the signed record to the peerstore
if _, err := h.caBook.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil {
if _, err := h.caBook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil {
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
return
}
}
// update host addresses in the peer store
removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed))
for _, ua := range changeEvt.Removed {
removedAddrs = append(removedAddrs, ua.Address)
}
h.Peerstore().SetAddrs(h.ID(), currentAddrs, peerstore.PermanentAddrTTL)
h.Peerstore().SetAddrs(h.ID(), removedAddrs, 0)

// emit addr change event on the bus
// emit addr change event
if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
Expand All @@ -587,11 +595,10 @@ func (h *BasicHost) background() {
defer ticker.Stop()

for {
// Update our local IP addresses before checking our current addresses.
if len(h.network.ListenAddresses()) > 0 {
h.updateLocalIpAddr()
}
// Request addresses anyways because, technically, address filters still apply.
// The underlying AllAddrs call is effectively a no-op.
curr := h.Addrs()
emitAddrChange(curr, lastAddrs)
lastAddrs = curr
Expand Down
Loading
Loading