-
Notifications
You must be signed in to change notification settings - Fork 228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix connection tracking race #111
Merged
Changes from 1 commit
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,6 @@ | ||
package dht | ||
|
||
import ( | ||
"context" | ||
"io" | ||
|
||
inet "github.com/libp2p/go-libp2p-net" | ||
ma "github.com/multiformats/go-multiaddr" | ||
mstream "github.com/multiformats/go-multistream" | ||
|
@@ -12,15 +9,12 @@ import ( | |
// netNotifiee defines methods to be used with the IpfsDHT | ||
type netNotifiee IpfsDHT | ||
|
||
var dhtProtocols = []string{string(ProtocolDHT), string(ProtocolDHTOld)} | ||
|
||
func (nn *netNotifiee) DHT() *IpfsDHT { | ||
return (*IpfsDHT)(nn) | ||
} | ||
|
||
type peerTracker struct { | ||
refcount int | ||
cancel func() | ||
} | ||
|
||
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { | ||
dht := nn.DHT() | ||
select { | ||
|
@@ -29,61 +23,51 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { | |
default: | ||
} | ||
|
||
dht.plk.Lock() | ||
defer dht.plk.Unlock() | ||
|
||
conn, ok := nn.peers[v.RemotePeer()] | ||
if ok { | ||
conn.refcount++ | ||
p := v.RemotePeer() | ||
protos, err := dht.peerstore.SupportsProtocols(p, dhtProtocols...) | ||
if err == nil && len(protos) != 0 { | ||
dht.plk.Lock() | ||
defer dht.plk.Unlock() | ||
if dht.host.Network().Connectedness(p) == inet.Connected { | ||
dht.Update(dht.Context(), p) | ||
} | ||
return | ||
} | ||
|
||
ctx, cancel := context.WithCancel(dht.Context()) | ||
|
||
nn.peers[v.RemotePeer()] = &peerTracker{ | ||
refcount: 1, | ||
cancel: cancel, | ||
} | ||
|
||
// Note: We *could* just check the peerstore to see if the remote side supports the dht | ||
// protocol, but its not clear that that information will make it into the peerstore | ||
// by the time this notification is sent. So just to be very careful, we brute force this | ||
// and open a new stream | ||
go nn.testConnection(ctx, v) | ||
|
||
// Note: Unfortunately, the peerstore may not yet know that this peer is | ||
// a DHT server. So, if it didn't return a positive response above, test | ||
// manually. | ||
go nn.testConnection(v) | ||
} | ||
|
||
func (nn *netNotifiee) testConnection(ctx context.Context, v inet.Conn) { | ||
func (nn *netNotifiee) testConnection(v inet.Conn) { | ||
dht := nn.DHT() | ||
for { | ||
s, err := dht.host.NewStream(ctx, v.RemotePeer(), ProtocolDHT, ProtocolDHTOld) | ||
|
||
switch err { | ||
case nil: | ||
s.Close() | ||
dht.plk.Lock() | ||
|
||
// Check if canceled under the lock. | ||
if ctx.Err() == nil { | ||
dht.Update(dht.Context(), v.RemotePeer()) | ||
} | ||
|
||
dht.plk.Unlock() | ||
case io.EOF: | ||
if ctx.Err() == nil { | ||
// Connection died but we may still have *an* open connection (context not canceled) so try again. | ||
continue | ||
} | ||
case context.Canceled: | ||
// Context canceled while connecting. | ||
case mstream.ErrNotSupported: | ||
// Client mode only, don't bother adding them to our routing table | ||
default: | ||
// real error? thats odd | ||
log.Warningf("checking dht client type: %s", err) | ||
} | ||
p := v.RemotePeer() | ||
|
||
// Forcibly use *this* connection. Otherwise, if we have two connections, we could: | ||
// 1. Test it twice. | ||
// 2. Have it closed from under us leaving the second (open) connection untested. | ||
s, err := v.NewStream() | ||
if err != nil { | ||
// Connection error | ||
return | ||
} | ||
defer s.Close() | ||
|
||
selected, err := mstream.SelectOneOf(dhtProtocols, s) | ||
if err != nil { | ||
// Doesn't support the protocol | ||
return | ||
} | ||
// Remember this choice (makes subsequent negotiations faster) | ||
dht.peerstore.AddProtocols(p, selected) | ||
|
||
dht.plk.Lock() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. "and here again with the lock!" There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. "See above." |
||
defer dht.plk.Unlock() | ||
// Make sure we're still connected under the lock (race with disconnect) | ||
if dht.host.Network().Connectedness(p) == inet.Connected { | ||
dht.Update(dht.Context(), p) | ||
} | ||
} | ||
|
||
func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { | ||
|
@@ -100,16 +84,7 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { | |
dht.plk.Lock() | ||
defer dht.plk.Unlock() | ||
|
||
conn, ok := nn.peers[p] | ||
if !ok { | ||
// Unmatched disconnects are fine. It just means that we were | ||
// already connected when we registered the listener. | ||
return | ||
} | ||
conn.refcount -= 1 | ||
if conn.refcount == 0 { | ||
delete(nn.peers, p) | ||
conn.cancel() | ||
if dht.host.Network().Connectedness(p) != inet.Connected { | ||
dht.routingTable.Remove(p) | ||
} | ||
}() | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what exactly does this lock protect here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems to protect Update -- but that's a public interface function.
It has no business being both public and requiring the lock to be held.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This particular lock is probably unnecessary because `Connect` and `Disconnect` notifications are synchronous (although I still want to leave it as it doesn't hurt and I like being consistent). However, we do need to take it below in the `Disconnect` handler and in `testConnection`. Otherwise, we could end up with the following interleaving: (the example interleaving is missing from this capture)
This is an alternative to reference counting open connections (what we did before) that doesn't require keeping a bunch of additional state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, fair enough. Can we add ~~a comment~~ some comments to that effect -- it looks totally out of place otherwise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would be the effect of calling `Update` without this lock? I am concerned about the public interface uses of it.