Skip to content

Commit

Permalink
review comments 2
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Jun 10, 2024
1 parent 0aa1911 commit 1219f9e
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 112 deletions.
40 changes: 20 additions & 20 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ type Config struct {

DisableAutoNATv2 bool

UDPBlackHoleFilter *swarm.BlackHoleFilter
CustomUDPBlackHoleFilter bool
IPv6BlackHoleFilter *swarm.BlackHoleFilter
CustomIPv6BlackHoleFilter bool
UDPBlackHoleSuccessCounter *swarm.BlackHoleSuccessCounter
CustomUDPBlackHoleSuccessCounter bool
IPv6BlackHoleSuccessCounter *swarm.BlackHoleSuccessCounter
CustomIPv6BlackHoleSuccessCounter bool
}

func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
Expand Down Expand Up @@ -174,8 +174,8 @@ func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swa
}

opts := append(cfg.SwarmOpts,
swarm.WithUDPBlackHoleFilter(cfg.UDPBlackHoleFilter),
swarm.WithIPv6BlackHoleFilter(cfg.IPv6BlackHoleFilter),
swarm.WithUDPBlackHoleSuccessCounter(cfg.UDPBlackHoleSuccessCounter),
swarm.WithIPv6BlackHoleSuccessCounter(cfg.IPv6BlackHoleSuccessCounter),
)
if cfg.Reporter != nil {
opts = append(opts, swarm.WithMetrics(cfg.Reporter))
Expand Down Expand Up @@ -215,18 +215,18 @@ func (cfg *Config) makeAutoNATV2Host() (host.Host, error) {
}

autoNatCfg := Config{
Transports: cfg.Transports,
Muxers: cfg.Muxers,
SecurityTransports: cfg.SecurityTransports,
Insecure: cfg.Insecure,
PSK: cfg.PSK,
ConnectionGater: cfg.ConnectionGater,
Reporter: cfg.Reporter,
PeerKey: autonatPrivKey,
Peerstore: ps,
DialRanker: swarm.NoDelayDialRanker,
UDPBlackHoleFilter: cfg.UDPBlackHoleFilter,
IPv6BlackHoleFilter: cfg.IPv6BlackHoleFilter,
Transports: cfg.Transports,
Muxers: cfg.Muxers,
SecurityTransports: cfg.SecurityTransports,
Insecure: cfg.Insecure,
PSK: cfg.PSK,
ConnectionGater: cfg.ConnectionGater,
Reporter: cfg.Reporter,
PeerKey: autonatPrivKey,
Peerstore: ps,
DialRanker: swarm.NoDelayDialRanker,
UDPBlackHoleSuccessCounter: cfg.UDPBlackHoleSuccessCounter,
IPv6BlackHoleSuccessCounter: cfg.IPv6BlackHoleSuccessCounter,
SwarmOpts: []swarm.Option{
// Don't update black hole state for failed autonat dials
swarm.WithReadOnlyBlackHoleDetector(),
Expand Down Expand Up @@ -572,8 +572,8 @@ func (cfg *Config) addAutoNAT(h *bhost.BasicHost) error {
Peerstore: ps,
DialRanker: swarm.NoDelayDialRanker,
SwarmOpts: []swarm.Option{
swarm.WithUDPBlackHoleFilter(nil),
swarm.WithIPv6BlackHoleFilter(nil),
swarm.WithUDPBlackHoleSuccessCounter(nil),
swarm.WithIPv6BlackHoleSuccessCounter(nil),
},
}

Expand Down
8 changes: 4 additions & 4 deletions defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ var DefaultPrometheusRegisterer = func(cfg *Config) error {
var defaultUDPBlackHoleDetector = func(cfg *Config) error {
// A black hole is a binary property. On a network if UDP dials are blocked, all dials will
// fail. So a low success rate of 5 out 100 dials is good enough.
return cfg.Apply(UDPBlackHoleFilter(&swarm.BlackHoleFilter{N: 100, MinSuccesses: 5, Name: "UDP"}))
return cfg.Apply(UDPBlackHoleSuccessCounter(&swarm.BlackHoleSuccessCounter{N: 100, MinSuccesses: 5, Name: "UDP"}))
}

var defaultIPv6BlackHoleDetector = func(cfg *Config) error {
// A black hole is a binary property. On a network if there is no IPv6 connectivity, all
// dials will fail. So a low success rate of 5 out 100 dials is good enough.
return cfg.Apply(IPv6BlackHoleFilter(&swarm.BlackHoleFilter{N: 100, MinSuccesses: 5, Name: "IPv6"}))
return cfg.Apply(IPv6BlackHoleSuccessCounter(&swarm.BlackHoleSuccessCounter{N: 100, MinSuccesses: 5, Name: "IPv6"}))
}

// Complete list of default options and when to fallback on them.
Expand Down Expand Up @@ -204,13 +204,13 @@ var defaults = []struct {
},
{
fallback: func(cfg *Config) bool {
return !cfg.CustomUDPBlackHoleFilter && cfg.UDPBlackHoleFilter == nil
return !cfg.CustomUDPBlackHoleSuccessCounter && cfg.UDPBlackHoleSuccessCounter == nil
},
opt: defaultUDPBlackHoleDetector,
},
{
fallback: func(cfg *Config) bool {
return !cfg.CustomIPv6BlackHoleFilter && cfg.IPv6BlackHoleFilter == nil
return !cfg.CustomIPv6BlackHoleSuccessCounter && cfg.IPv6BlackHoleSuccessCounter == nil
},
opt: defaultIPv6BlackHoleDetector,
},
Expand Down
16 changes: 8 additions & 8 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,20 +618,20 @@ func DisableAutoNATv2() Option {
}
}

// UDPBlackHoleFilter configures libp2p to use f as the black hole filter for UDP addrs
func UDPBlackHoleFilter(f *swarm.BlackHoleFilter) Option {
// UDPBlackHoleSuccessCounter configures libp2p to use f as the black hole filter for UDP addrs
func UDPBlackHoleSuccessCounter(f *swarm.BlackHoleSuccessCounter) Option {
return func(cfg *Config) error {
cfg.UDPBlackHoleFilter = f
cfg.CustomUDPBlackHoleFilter = true
cfg.UDPBlackHoleSuccessCounter = f
cfg.CustomUDPBlackHoleSuccessCounter = true
return nil
}
}

// IPv6BlackHoleFilter configures libp2p to use f as the black hole filter for IPv6 addrs
func IPv6BlackHoleFilter(f *swarm.BlackHoleFilter) Option {
// IPv6BlackHoleSuccessCounter configures libp2p to use f as the black hole filter for IPv6 addrs
func IPv6BlackHoleSuccessCounter(f *swarm.BlackHoleSuccessCounter) Option {
return func(cfg *Config) error {
cfg.IPv6BlackHoleFilter = f
cfg.CustomIPv6BlackHoleFilter = true
cfg.IPv6BlackHoleSuccessCounter = f
cfg.CustomIPv6BlackHoleSuccessCounter = true
return nil
}
}
30 changes: 15 additions & 15 deletions p2p/net/swarm/black_hole_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ func (st blackHoleState) String() string {
}
}

// BlackHoleFilter provides black hole filtering for dials. This filter should be used in concert
// BlackHoleSuccessCounter provides black hole filtering for dials. This filter should be used in concert
// with a UDP or IPv6 address filter to detect UDP or IPv6 black hole. In a black holed environment,
// dial requests are refused Requests are blocked if the number of successes in the last N dials is
// less than MinSuccesses.
// If a request succeeds in Blocked state, the filter state is reset and N subsequent requests are
// allowed before reevaluating black hole state. Dials cancelled when some other concurrent dial
// succeeded are counted as failures. A sufficiently large N prevents false negatives in such cases.
type BlackHoleFilter struct {
type BlackHoleSuccessCounter struct {
// N is
// 1. The minimum number of completed dials required before evaluating black hole state
// 2. the minimum number of requests after which we probe the state of the black hole in
Expand All @@ -63,7 +63,7 @@ type BlackHoleFilter struct {
// RecordResult records the outcome of a dial. A successful dial in Blocked state will change the
// state of the filter to Probing. A failed dial only blocks subsequent requests if the success
// fraction over the last n outcomes is less than the minSuccessFraction of the filter.
func (b *BlackHoleFilter) RecordResult(success bool) {
func (b *BlackHoleSuccessCounter) RecordResult(success bool) {
b.mu.Lock()
defer b.mu.Unlock()

Expand Down Expand Up @@ -91,7 +91,7 @@ func (b *BlackHoleFilter) RecordResult(success bool) {
}

// HandleRequest returns the result of applying the black hole filter for the request.
func (b *BlackHoleFilter) HandleRequest() blackHoleState {
func (b *BlackHoleSuccessCounter) HandleRequest() blackHoleState {
b.mu.Lock()
defer b.mu.Unlock()

Expand All @@ -106,14 +106,14 @@ func (b *BlackHoleFilter) HandleRequest() blackHoleState {
}
}

func (b *BlackHoleFilter) reset() {
func (b *BlackHoleSuccessCounter) reset() {
b.successes = 0
b.dialResults = b.dialResults[:0]
b.requests = 0
b.updateState()
}

func (b *BlackHoleFilter) updateState() {
func (b *BlackHoleSuccessCounter) updateState() {
st := b.state

if len(b.dialResults) < b.N {
Expand All @@ -129,7 +129,7 @@ func (b *BlackHoleFilter) updateState() {
}
}

func (b *BlackHoleFilter) State() blackHoleState {
func (b *BlackHoleSuccessCounter) State() blackHoleState {
b.mu.Lock()
defer b.mu.Unlock()

Expand All @@ -143,7 +143,7 @@ type blackHoleInfo struct {
successFraction float64
}

func (b *BlackHoleFilter) info() blackHoleInfo {
func (b *BlackHoleSuccessCounter) info() blackHoleInfo {
b.mu.Lock()
defer b.mu.Unlock()

Expand All @@ -165,8 +165,8 @@ func (b *BlackHoleFilter) info() blackHoleInfo {
}
}

// blackHoleDetector provides UDP and IPv6 black hole detection using a `blackHoleFilter` for each.
// For details of the black hole detection logic see `blackHoleFilter`.
// blackHoleDetector provides UDP and IPv6 black hole detection using a `BlackHoleSuccessCounter` for each.
// For details of the black hole detection logic see `BlackHoleSuccessCounter`.
// In Read Only mode, detector doesn't update the state of underlying filters and refuses requests
// when black hole state is unknown. This is useful for Swarms made specifically for services like
// AutoNAT where we care about accurately reporting the reachability of a peer.
Expand All @@ -175,7 +175,7 @@ func (b *BlackHoleFilter) info() blackHoleInfo {
// of the black hole state are actually dialed and are not skipped because of dial prioritisation
// logic.
type blackHoleDetector struct {
udp, ipv6 *BlackHoleFilter
udp, ipv6 *BlackHoleSuccessCounter
mt MetricsTracer
readOnly bool
}
Expand Down Expand Up @@ -236,7 +236,7 @@ func (d *blackHoleDetector) FilterAddrs(addrs []ma.Multiaddr) (valid []ma.Multia
), blackHoled
}

// RecordResult updates the state of the relevant blackHoleFilters for addr
// RecordResult updates the state of the relevant BlackHoleSuccessCounters for addr
func (d *blackHoleDetector) RecordResult(addr ma.Multiaddr, success bool) {
if d.readOnly || !manet.IsPublicAddr(addr) {
return
Expand All @@ -251,7 +251,7 @@ func (d *blackHoleDetector) RecordResult(addr ma.Multiaddr, success bool) {
}
}

func (d *blackHoleDetector) getFilterState(f *BlackHoleFilter) blackHoleState {
func (d *blackHoleDetector) getFilterState(f *BlackHoleSuccessCounter) blackHoleState {
if d.readOnly {
if f.State() != blackHoleStateAllowed {
return blackHoleStateBlocked
Expand All @@ -261,11 +261,11 @@ func (d *blackHoleDetector) getFilterState(f *BlackHoleFilter) blackHoleState {
return f.HandleRequest()
}

func (d *blackHoleDetector) trackMetrics(f *BlackHoleFilter) {
func (d *blackHoleDetector) trackMetrics(f *BlackHoleSuccessCounter) {
if d.readOnly || d.mt == nil {
return
}
// Track metrics only in non readOnly state
info := f.info()
d.mt.UpdatedBlackHoleFilterState(info.name, info.state, info.nextProbeAfter, info.successFraction)
d.mt.UpdatedBlackHoleSuccessCounter(info.name, info.state, info.nextProbeAfter, info.successFraction)
}
28 changes: 14 additions & 14 deletions p2p/net/swarm/black_hole_detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"github.com/stretchr/testify/require"
)

func TestBlackHoleFilterReset(t *testing.T) {
func TestBlackHoleSuccessCounterReset(t *testing.T) {
n := 10
bhf := &BlackHoleFilter{N: n, MinSuccesses: 2, Name: "test"}
bhf := &BlackHoleSuccessCounter{N: n, MinSuccesses: 2, Name: "test"}
var i = 0
// calls up to n should be probing
for i = 1; i <= n; i++ {
Expand Down Expand Up @@ -55,7 +55,7 @@ func TestBlackHoleFilterReset(t *testing.T) {
}
}

func TestBlackHoleFilterSuccessFraction(t *testing.T) {
func TestBlackHoleSuccessCounterSuccessFraction(t *testing.T) {
n := 10
tests := []struct {
minSuccesses, successes int
Expand All @@ -71,7 +71,7 @@ func TestBlackHoleFilterSuccessFraction(t *testing.T) {
}
for i, tc := range tests {
t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {
bhf := BlackHoleFilter{N: n, MinSuccesses: tc.minSuccesses}
bhf := BlackHoleSuccessCounter{N: n, MinSuccesses: tc.minSuccesses}
for i := 0; i < tc.successes; i++ {
bhf.RecordResult(true)
}
Expand All @@ -87,8 +87,8 @@ func TestBlackHoleFilterSuccessFraction(t *testing.T) {
}

func TestBlackHoleDetectorInApplicableAddress(t *testing.T) {
udpF := &BlackHoleFilter{N: 10, MinSuccesses: 5}
ipv6F := &BlackHoleFilter{N: 10, MinSuccesses: 5}
udpF := &BlackHoleSuccessCounter{N: 10, MinSuccesses: 5}
ipv6F := &BlackHoleSuccessCounter{N: 10, MinSuccesses: 5}
bhd := &blackHoleDetector{udp: udpF, ipv6: ipv6F}
addrs := []ma.Multiaddr{
ma.StringCast("/ip4/1.2.3.4/tcp/1234"),
Expand All @@ -106,7 +106,7 @@ func TestBlackHoleDetectorInApplicableAddress(t *testing.T) {
}

func TestBlackHoleDetectorUDPDisabled(t *testing.T) {
ipv6F := &BlackHoleFilter{N: 10, MinSuccesses: 5}
ipv6F := &BlackHoleSuccessCounter{N: 10, MinSuccesses: 5}
bhd := &blackHoleDetector{ipv6: ipv6F}
publicAddr := ma.StringCast("/ip4/1.2.3.4/udp/1234/quic-v1")
privAddr := ma.StringCast("/ip4/192.168.1.5/udp/1234/quic-v1")
Expand All @@ -122,7 +122,7 @@ func TestBlackHoleDetectorUDPDisabled(t *testing.T) {
}

func TestBlackHoleDetectorIPv6Disabled(t *testing.T) {
udpF := &BlackHoleFilter{N: 10, MinSuccesses: 5}
udpF := &BlackHoleSuccessCounter{N: 10, MinSuccesses: 5}
bhd := &blackHoleDetector{udp: udpF}
publicAddr := ma.StringCast("/ip6/2001::1/tcp/1234")
privAddr := ma.StringCast("/ip6/::1/tcp/1234")
Expand All @@ -140,8 +140,8 @@ func TestBlackHoleDetectorIPv6Disabled(t *testing.T) {

func TestBlackHoleDetectorProbes(t *testing.T) {
bhd := &blackHoleDetector{
udp: &BlackHoleFilter{N: 2, MinSuccesses: 1, Name: "udp"},
ipv6: &BlackHoleFilter{N: 3, MinSuccesses: 1, Name: "ipv6"},
udp: &BlackHoleSuccessCounter{N: 2, MinSuccesses: 1, Name: "udp"},
ipv6: &BlackHoleSuccessCounter{N: 3, MinSuccesses: 1, Name: "ipv6"},
}
udp6Addr := ma.StringCast("/ip6/2001::1/udp/1234/quic-v1")
addrs := []ma.Multiaddr{udp6Addr}
Expand Down Expand Up @@ -175,8 +175,8 @@ func TestBlackHoleDetectorAddrFiltering(t *testing.T) {

makeBHD := func(udpBlocked, ipv6Blocked bool) *blackHoleDetector {
bhd := &blackHoleDetector{
udp: &BlackHoleFilter{N: 100, MinSuccesses: 10, Name: "udp"},
ipv6: &BlackHoleFilter{N: 100, MinSuccesses: 10, Name: "ipv6"},
udp: &BlackHoleSuccessCounter{N: 100, MinSuccesses: 10, Name: "udp"},
ipv6: &BlackHoleSuccessCounter{N: 100, MinSuccesses: 10, Name: "ipv6"},
}
for i := 0; i < 100; i++ {
bhd.RecordResult(udp4Pub, !udpBlocked)
Expand Down Expand Up @@ -213,8 +213,8 @@ func TestBlackHoleDetectorAddrFiltering(t *testing.T) {
}

func TestBlackHoleDetectorReadOnlyMode(t *testing.T) {
udpF := &BlackHoleFilter{N: 10, MinSuccesses: 5}
ipv6F := &BlackHoleFilter{N: 10, MinSuccesses: 5}
udpF := &BlackHoleSuccessCounter{N: 10, MinSuccesses: 5}
ipv6F := &BlackHoleSuccessCounter{N: 10, MinSuccesses: 5}
bhd := &blackHoleDetector{udp: udpF, ipv6: ipv6F, readOnly: true}
publicAddr := ma.StringCast("/ip4/1.2.3.4/udp/1234/quic-v1")
privAddr := ma.StringCast("/ip6/::1/tcp/1234")
Expand Down
16 changes: 8 additions & 8 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,20 @@ func WithDialRanker(d network.DialRanker) Option {
}
}

// WithUDPBlackHoleFilter configures swarm to use the provided config for UDP black hole detection
// WithUDPBlackHoleSuccessCounter configures swarm to use the provided config for UDP black hole detection
// n is the size of the sliding window used to evaluate black hole state
// min is the minimum number of successes out of n required to not block requests
func WithUDPBlackHoleFilter(f *BlackHoleFilter) Option {
func WithUDPBlackHoleSuccessCounter(f *BlackHoleSuccessCounter) Option {
return func(s *Swarm) error {
s.udpBHF = f
return nil
}
}

// WithIPv6BlackHoleFilter configures swarm to use the provided config for IPv6 black hole detection
// WithIPv6BlackHoleSuccessCounter configures swarm to use the provided config for IPv6 black hole detection
// n is the size of the sliding window used to evaluate black hole state
// min is the minimum number of successes out of n required to not block requests
func WithIPv6BlackHoleFilter(f *BlackHoleFilter) Option {
func WithIPv6BlackHoleSuccessCounter(f *BlackHoleSuccessCounter) Option {
return func(s *Swarm) error {
s.ipv6BHF = f
return nil
Expand Down Expand Up @@ -215,8 +215,8 @@ type Swarm struct {
dialRanker network.DialRanker

connectednessEventEmitter *connectednessEventEmitter
udpBHF *BlackHoleFilter
ipv6BHF *BlackHoleFilter
udpBHF *BlackHoleSuccessCounter
ipv6BHF *BlackHoleSuccessCounter
bhd *blackHoleDetector
readOnlyBHD bool
}
Expand All @@ -242,8 +242,8 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
// A black hole is a binary property. On a network if UDP dials are blocked or there is
// no IPv6 connectivity, all dials will fail. So a low success rate of 5 out 100 dials
// is good enough.
udpBHF: &BlackHoleFilter{N: 100, MinSuccesses: 5, Name: "UDP"},
ipv6BHF: &BlackHoleFilter{N: 100, MinSuccesses: 5, Name: "IPv6"},
udpBHF: &BlackHoleSuccessCounter{N: 100, MinSuccesses: 5, Name: "UDP"},
ipv6BHF: &BlackHoleSuccessCounter{N: 100, MinSuccesses: 5, Name: "IPv6"},
}

s.conns.m = make(map[peer.ID][]*Conn)
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm_dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func TestBlackHoledAddrBlocked(t *testing.T) {
defer s.Close()

n := 3
s.bhd.ipv6 = &BlackHoleFilter{N: n, MinSuccesses: 1, Name: "IPv6"}
s.bhd.ipv6 = &BlackHoleSuccessCounter{N: n, MinSuccesses: 1, Name: "IPv6"}

// All dials to this addr will fail.
// manet.IsPublic is aggressive for IPv6 addresses. Use a NAT64 address.
Expand Down
Loading

0 comments on commit 1219f9e

Please sign in to comment.