diff --git a/dht.go b/dht.go
index a1a496791..5e8c28704 100644
--- a/dht.go
+++ b/dht.go
@@ -63,8 +63,7 @@ type IpfsDHT struct {
 	strmap map[peer.ID]*messageSender
 	smlk   sync.Mutex
 
-	plk   sync.Mutex
-	peers map[peer.ID]*peerTracker
+	plk sync.Mutex
 }
 
 // NewDHT creates a new DHT object with the given peer as the 'local' host
@@ -119,7 +118,6 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
 		providers:    providers.NewProviderManager(ctx, h.ID(), dstore),
 		birth:        time.Now(),
 		routingTable: rt,
-		peers:        make(map[peer.ID]*peerTracker),
 
 		Validator: make(record.Validator),
 		Selector:  make(record.Selector),
diff --git a/notif.go b/notif.go
index c44f5fb66..b90ae719c 100644
--- a/notif.go
+++ b/notif.go
@@ -1,9 +1,6 @@
 package dht
 
 import (
-	"context"
-	"io"
-
 	inet "github.com/libp2p/go-libp2p-net"
 	ma "github.com/multiformats/go-multiaddr"
 	mstream "github.com/multiformats/go-multistream"
@@ -12,15 +9,12 @@ import (
 // netNotifiee defines methods to be used with the IpfsDHT
 type netNotifiee IpfsDHT
 
+var dhtProtocols = []string{string(ProtocolDHT), string(ProtocolDHTOld)}
+
 func (nn *netNotifiee) DHT() *IpfsDHT {
 	return (*IpfsDHT)(nn)
 }
 
-type peerTracker struct {
-	refcount int
-	cancel   func()
-}
-
 func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
 	dht := nn.DHT()
 	select {
@@ -29,61 +23,56 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
 	default:
 	}
 
-	dht.plk.Lock()
-	defer dht.plk.Unlock()
-
-	conn, ok := nn.peers[v.RemotePeer()]
-	if ok {
-		conn.refcount++
+	p := v.RemotePeer()
+	protos, err := dht.peerstore.SupportsProtocols(p, dhtProtocols...)
+	if err == nil && len(protos) != 0 {
+		// We lock here for consistency with the lock in testConnection.
+		// This probably isn't necessary because (dis)connect
+		// notifications are serialized but it's nice to be consistent.
+		dht.plk.Lock()
+		defer dht.plk.Unlock()
+		if dht.host.Network().Connectedness(p) == inet.Connected {
+			dht.Update(dht.Context(), p)
+		}
 		return
 	}
 
-	ctx, cancel := context.WithCancel(dht.Context())
-
-	nn.peers[v.RemotePeer()] = &peerTracker{
-		refcount: 1,
-		cancel:   cancel,
-	}
-
-	// Note: We *could* just check the peerstore to see if the remote side supports the dht
-	// protocol, but its not clear that that information will make it into the peerstore
-	// by the time this notification is sent. So just to be very careful, we brute force this
-	// and open a new stream
-	go nn.testConnection(ctx, v)
-
+	// Note: Unfortunately, the peerstore may not yet know that this peer is
+	// a DHT server. So, if it didn't return a positive response above, test
+	// manually.
+	go nn.testConnection(v)
 }
 
-func (nn *netNotifiee) testConnection(ctx context.Context, v inet.Conn) {
+func (nn *netNotifiee) testConnection(v inet.Conn) {
 	dht := nn.DHT()
-	for {
-		s, err := dht.host.NewStream(ctx, v.RemotePeer(), ProtocolDHT, ProtocolDHTOld)
-
-		switch err {
-		case nil:
-			s.Close()
-			dht.plk.Lock()
-
-			// Check if canceled under the lock.
-			if ctx.Err() == nil {
-				dht.Update(dht.Context(), v.RemotePeer())
-			}
-
-			dht.plk.Unlock()
-		case io.EOF:
-			if ctx.Err() == nil {
-				// Connection died but we may still have *an* open connection (context not canceled) so try again.
-				continue
-			}
-		case context.Canceled:
-			// Context canceled while connecting.
-		case mstream.ErrNotSupported:
-			// Client mode only, don't bother adding them to our routing table
-		default:
-			// real error? thats odd
-			log.Warningf("checking dht client type: %s", err)
-		}
+	p := v.RemotePeer()
+
+	// Forcibly use *this* connection. Otherwise, if we have two connections, we could:
+	// 1. Test it twice.
+	// 2. Have it closed from under us leaving the second (open) connection untested.
+	s, err := v.NewStream()
+	if err != nil {
+		// Connection error
 		return
 	}
+	defer s.Close()
+
+	selected, err := mstream.SelectOneOf(dhtProtocols, s)
+	if err != nil {
+		// Doesn't support the protocol
+		return
+	}
+	// Remember this choice (makes subsequent negotiations faster)
+	dht.peerstore.AddProtocols(p, selected)
+
+	// We lock here as we race with disconnect. If we didn't lock, we could
+	// finish processing a connect after handling the associated disconnect
+	// event and add the peer to the routing table after removing it.
+	dht.plk.Lock()
+	defer dht.plk.Unlock()
+	if dht.host.Network().Connectedness(p) == inet.Connected {
+		dht.Update(dht.Context(), p)
+	}
 }
 
 func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
@@ -96,23 +85,16 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
 
 	p := v.RemotePeer()
 
-	func() {
-		dht.plk.Lock()
-		defer dht.plk.Unlock()
+	// Lock and check to see if we're still connected. We lock to make sure
+	// we don't concurrently process a connect event.
+	dht.plk.Lock()
+	defer dht.plk.Unlock()
+	if dht.host.Network().Connectedness(p) == inet.Connected {
+		// We're still connected.
+		return
+	}
 
-		conn, ok := nn.peers[p]
-		if !ok {
-			// Unmatched disconnects are fine. It just means that we were
-			// already connected when we registered the listener.
-			return
-		}
-		conn.refcount -= 1
-		if conn.refcount == 0 {
-			delete(nn.peers, p)
-			conn.cancel()
-			dht.routingTable.Remove(p)
-		}
-	}()
+	dht.routingTable.Remove(p)
 
 	dht.smlk.Lock()
 	defer dht.smlk.Unlock()