Skip to content

Commit

Permalink
filter unworkable peers in queries + enhanced logging (#363)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk authored Jun 26, 2019
1 parent a4cabc7 commit 8fe679a
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 52 deletions.
29 changes: 25 additions & 4 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,20 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
return nil, err
}
}

// print the routing table every minute.
go func() {
tick := time.Tick(1 * time.Minute)
for {
select {
case <-tick:
dht.routingTable.Print()
case <-ctx.Done():
return
}
}
}()

return dht, nil
}

Expand Down Expand Up @@ -341,7 +355,7 @@ func (dht *IpfsDHT) shouldAddPeerToRoutingTable(c network.Conn) bool {
}

ai := dht.host.Peerstore().PeerInfo(c.RemotePeer())
if dht.isPeerLocallyConnected(c) {
if isPeerLocallyConnected(c) {
// TODO: for now, we can't easily tell if the peer on our subnet
// is dialable or not, so don't discriminate.

Expand All @@ -350,10 +364,10 @@ func (dht *IpfsDHT) shouldAddPeerToRoutingTable(c network.Conn) bool {
return len(ai.Addrs) > 0
}

return dht.isPubliclyRoutable(ai)
return isPubliclyRoutable(ai)
}

func (dht *IpfsDHT) isPubliclyRoutable(ai peer.AddrInfo) bool {
func isPubliclyRoutable(ai peer.AddrInfo) bool {
if len(ai.Addrs) == 0 {
return false
}
Expand Down Expand Up @@ -387,7 +401,7 @@ func isRelayAddr(a ma.Multiaddr) bool {
return isRelay
}

func (dht *IpfsDHT) isPeerLocallyConnected(c network.Conn) bool {
func isPeerLocallyConnected(c network.Conn) bool {
addr := c.RemoteMultiaddr()
return manet.IsPrivateAddr(addr) || manet.IsIPLoopback(addr)
}
Expand Down Expand Up @@ -607,6 +621,13 @@ func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...ta
return ctx
}

func (dht *IpfsDHT) connForPeer(p peer.ID) network.Conn {
if cs := dht.host.Network().ConnsToPeer(p); len(cs) > 0 {
return cs[0]
}
return nil
}

func (dht *IpfsDHT) handleProtocolChanges(ctx context.Context) {
// register for event bus protocol ID changes
sub, err := dht.host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated))
Expand Down
87 changes: 83 additions & 4 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ import (
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kbucket/keyspace"
"github.com/multiformats/go-multiaddr"

msmux "github.com/multiformats/go-multistream"

ggio "github.com/gogo/protobuf/io"
"github.com/ipfs/go-cid"

"github.com/libp2p/go-msgio"
"go.opencensus.io/stats"
Expand Down Expand Up @@ -76,6 +79,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
r := msgio.NewVarintReaderSize(s, network.MessageSizeMax)

mPeer := s.Conn().RemotePeer()
addr := s.Conn().RemoteMultiaddr()

timer := time.AfterFunc(dhtStreamIdleTimeout, func() { s.Reset() })
defer timer.Stop()
Expand Down Expand Up @@ -131,14 +135,21 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
metrics.ReceivedBytes.M(int64(req.Size())),
)

c, _ := cid.Cast(req.Key)
pid, _ := peer.IDFromBytes(req.Key)

// note: MessageType implements Stringer.
logger.Debugf("[inbound rpc] handling incoming message; from_peer=%s, type=%s, cid_key=%s, peer_key=%s, raw_key=%x, closer=%d, providers=%d, remote_addr=%s",
mPeer, req.GetType(), c, pid, req.Key, len(req.GetCloserPeers()), len(req.ProviderPeers), addr)

handler := dht.handlerForMsgType(req.GetType())
if handler == nil {
stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
logger.Warningf("can't handle received message of type %v", req.GetType())
return false
}

resp, err := handler(ctx, mPeer, &req)
resp, err := handler(ctx, mPeer, &req, s.Conn())
if err != nil {
stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
logger.Debugf("error handling message: %v", err)
Expand All @@ -151,6 +162,9 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
continue
}

logger.Debugf("[inbound rpc] writing response message; to_peer=%s, type=%s, cid_key=%s, peer_key=%s, raw_key=%x, closer=%d, providers=%d, remote_addr=%s",
mPeer, resp.GetType(), c, pid, req.Key, len(resp.GetCloserPeers()), len(resp.ProviderPeers), addr)

// send out response msg
err = writeMsg(s, resp)
if err != nil {
Expand All @@ -161,6 +175,11 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {

elapsedTime := time.Since(startTime)
latencyMillis := float64(elapsedTime) / float64(time.Millisecond)

// note: MessageType implements Stringer.
logger.Debugf("[inbound rpc] wrote response message; to_peer=%s, type=%s, cid_key=%s, peer_key=%s, raw_key=%x, closer=%d, providers=%d, elapsed_ms=%f, remote_addr=%s",
mPeer, resp.GetType(), c, pid, req.Key, len(resp.GetCloserPeers()), len(resp.ProviderPeers), latencyMillis, addr)

stats.Record(ctx, metrics.InboundRequestLatency.M(latencyMillis))
}
}
Expand Down Expand Up @@ -337,24 +356,52 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro
ms.lk.Lock()
defer ms.lk.Unlock()
retry := false

c, _ := cid.Cast(pmes.Key)
pid, _ := peer.IDFromBytes(pmes.Key)

distance := keyspace.ZeroPrefixLen(keyspace.XORKeySpace.Key(pmes.Key).Distance(keyspace.XORKeySpace.Key([]byte(ms.p))).Bytes())
var addr multiaddr.Multiaddr
if ms.s != nil {
addr = ms.s.Conn().RemoteMultiaddr()
}

for {
if err := ms.prep(ctx); err != nil {
return err
}

// note: MessageType implements Stringer.
logger.Debugf("[outbound rpc] writing fire-and-forget outbound message; to_peer=%s, type=%s, cid_key=%s, peer_key=%s, raw_key=%x, closer=%d, providers=%d, xor_common_zeros=%d, remote_addr=%s",
ms.p, pmes.GetType(), c, pid, pmes.Key, len(pmes.GetCloserPeers()), len(pmes.ProviderPeers), distance, addr)

startTime := time.Now()
if err := ms.writeMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil

elapsedTime := time.Since(startTime)
latencyMillis := float64(elapsedTime) / float64(time.Millisecond)

if retry {
logger.Info("error writing message, bailing: ", err)
logger.Infof("[outbound rpc] error while writing fire-and-forget, bailing; err=%s, to_peer=%s, type=%s, cid_key=%s, peer_key=%s, raw_key=%x, closer=%d, providers=%d, elapsed_ms=%f, xor_common_zeros=%d, remote_addr=%s",
err, ms.p, pmes.GetType(), c, pid, pmes.Key, len(pmes.GetCloserPeers()), len(pmes.ProviderPeers), latencyMillis, distance, addr)
return err
}
logger.Info("error writing message, trying again: ", err)
logger.Infof("[outbound rpc] error while writing fire-and-forget, trying again; err=%s, to_peer=%s, type=%s, cid_key=%s, peer_key=%s, raw_key=%x, closer=%d, providers=%d, elapsed_ms=%f, xor_common_zeros=%d, remote_addr=%s",
err, ms.p, pmes.GetType(), c, pid, pmes.Key, len(pmes.GetCloserPeers()), len(pmes.ProviderPeers), latencyMillis, distance, addr)

retry = true
continue
}

elapsedTime := time.Since(startTime)
latencyMillis := float64(elapsedTime) / float64(time.Millisecond)

// note: MessageType implements Stringer.
logger.Debugf("[outbound rpc] wrote fire-and-forget outbound message; to_peer=%s, type=%s, cid_key=%s, peer_key=%s, raw_key=%x, closer=%d, providers=%d, elapsed_ms=%f, xor_common_zeros=%d, remote_addr=%s",
ms.p, pmes.GetType(), c, pid, pmes.Key, len(pmes.GetCloserPeers()), len(pmes.ProviderPeers), latencyMillis, distance, addr)

logger.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
Expand All @@ -372,11 +419,26 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
ms.lk.Lock()
defer ms.lk.Unlock()
retry := false

c, _ := cid.Cast(pmes.Key)
pid, _ := peer.IDFromBytes(pmes.Key)

distance := keyspace.ZeroPrefixLen(keyspace.XORKeySpace.Key(pmes.Key).Distance(keyspace.XORKeySpace.Key([]byte(ms.p))).Bytes())
var addr multiaddr.Multiaddr
if ms.s != nil {
addr = ms.s.Conn().RemoteMultiaddr()
}

for {
if err := ms.prep(ctx); err != nil {
return nil, err
}

startTime := time.Now()
// note: MessageType implements Stringer.
logger.Debugf("[outbound rpc] writing request outbound message; to_peer=%s, type=%s, cid_key=%s, peer_key=%s, raw_key=%x, closer=%d, providers=%d, xor_common_zeros=%d, remote_addr=%s",
ms.p, pmes.GetType(), c, pid, pmes.Key, len(pmes.GetCloserPeers()), len(pmes.ProviderPeers), distance, addr)

if err := ms.writeMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil
Expand All @@ -390,6 +452,14 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
continue
}

elapsedTime := time.Since(startTime)
latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
// note: MessageType implements Stringer.
logger.Debugf("[outbound rpc] wrote request outbound message; to_peer=%s, type=%s, cid_key=%s, peer_key=%s, raw_key=%x, closer=%d, providers=%d, elapsed_ms=%f, xor_common_zeros=%d, remote_addr=%s",
ms.p, pmes.GetType(), c, pid, pmes.Key, len(pmes.GetCloserPeers()), len(pmes.ProviderPeers), latencyMillis, distance, addr)

startTime = time.Now()

mes := new(pb.Message)
if err := ms.ctxReadMsg(ctx, mes); err != nil {
ms.s.Reset()
Expand All @@ -404,6 +474,15 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
continue
}

elapsedTime = time.Since(startTime)
latencyMillis = float64(elapsedTime) / float64(time.Millisecond)

c, _ = cid.Cast(mes.Key)
pid, _ = peer.IDFromBytes(pmes.Key)
// note: MessageType implements Stringer.
logger.Debugf("[outbound rpc] read response message; from_peer=%s, type=%s, cid_key=%s, peer_key=%s, raw_key=%x, closer=%d, providers=%d, elapsed_ms=%f, xor_common_zeros=%d, remote_addr=%s",
ms.p, mes.GetType(), c, pid, mes.Key, len(mes.GetCloserPeers()), len(mes.ProviderPeers), latencyMillis, distance, addr)

logger.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
Expand Down
2 changes: 1 addition & 1 deletion dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ func TestBadProtoMessages(t *testing.T) {
d := setupDHT(ctx, t, false)

nilrec := new(pb.Message)
if _, err := d.handlePutValue(ctx, "testpeer", nilrec); err == nil {
if _, err := d.handlePutValue(ctx, "testpeer", nilrec, nil); err == nil {
t.Fatal("should have errored on nil record")
}
}
Expand Down
13 changes: 0 additions & 13 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
Expand Down Expand Up @@ -108,15 +107,8 @@ github.com/libp2p/go-eventbus v0.0.2 h1:L9eslON8FjFBJlyUs9fyEZKnxSqZd2AMDUNldPrq
github.com/libp2p/go-eventbus v0.0.2/go.mod h1:Hr/yGlwxA/stuLnpMiu82lpNKpvRy3EaJxPu40XYOwk=
github.com/libp2p/go-flow-metrics v0.0.1 h1:0gxuFd2GuK7IIP5pKljLwps6TvcuYgvG7Atqi3INF5s=
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-libp2p v0.1.3-0.20190625164522-49fcff4f51ef h1:D3p2rC0THm6+xlTQ488m8JuIsejIYXy6osVfNkkKSWw=
github.com/libp2p/go-libp2p v0.1.3-0.20190625164522-49fcff4f51ef/go.mod h1:5nXHmf4Hs+NmkaMsmWcFJgUHTbYNpCfxr20lwus0p1c=
github.com/libp2p/go-libp2p v0.1.3-0.20190626165858-721fd27ceb38 h1:7ZAgEEAEtPEBKCTsR0TMNvktW83ZRu9eyHvb6L6dqJY=
github.com/libp2p/go-libp2p v0.1.3-0.20190626165858-721fd27ceb38/go.mod h1:+TYaPrxDRORO40RdCcuEEHGJhfo17kQVv9prZHiAnjk=
github.com/libp2p/go-libp2p v0.1.3-0.20190626170118-712b2cfb577f h1:tCEJnI783rM+BP1Jk4aftYgGLem1F74CSdXbwkTAZAk=
github.com/libp2p/go-libp2p v0.1.3-0.20190626170118-712b2cfb577f/go.mod h1:+TYaPrxDRORO40RdCcuEEHGJhfo17kQVv9prZHiAnjk=
github.com/libp2p/go-libp2p v0.1.3-0.20190626170235-f299d252e778 h1:S6+XMA5RQX9hlr+M1+8m6IcI6qNg744b5xqWieCeFzQ=
github.com/libp2p/go-libp2p v0.1.3-0.20190626170235-f299d252e778/go.mod h1:+TYaPrxDRORO40RdCcuEEHGJhfo17kQVv9prZHiAnjk=
github.com/libp2p/go-libp2p-autonat v0.1.0 h1:aCWAu43Ri4nU0ZPO7NyLzUvvfqd0nE3dX0R/ZGYVgOU=
github.com/libp2p/go-libp2p-autonat v0.1.0/go.mod h1:1tLf2yXxiE/oKGtDwPYWTSYG3PtvYlJmg7NeVtPRqH8=
github.com/libp2p/go-libp2p-blankhost v0.1.1/go.mod h1:pf2fvdLJPsC1FsVrNP3DUUvMzUts2dsLLBEpo1vW1ro=
github.com/libp2p/go-libp2p-blankhost v0.1.3 h1:0KycuXvPDhmehw0ASsg+s1o3IfXgCUDqfzAl94KEBOg=
Expand All @@ -129,7 +121,6 @@ github.com/libp2p/go-libp2p-core v0.0.6/go.mod h1:0d9xmaYAVY5qmbp/fcgxHT3ZJsLjYe
github.com/libp2p/go-libp2p-core v0.0.7-0.20190626134135-aca080dccfc2 h1:zrJkzQO8t/4rklfCb/t26oEqcRMOShVZiKqZxxvMZn0=
github.com/libp2p/go-libp2p-core v0.0.7-0.20190626134135-aca080dccfc2/go.mod h1:0d9xmaYAVY5qmbp/fcgxHT3ZJsLjYeYPMJAUKpaCHrE=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.1.0 h1:j+R6cokKcGbnZLf4kcNwpx6mDEUPF3N6SrqMymQhmvs=
github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g=
github.com/libp2p/go-libp2p-kbucket v0.2.0 h1:FB2a0VkOTNGTP5gu/I444u4WabNM9V1zCkQcWb7zajI=
github.com/libp2p/go-libp2p-kbucket v0.2.0/go.mod h1:JNymBToym3QXKBMKGy3m29+xprg0EVr/GJFHxFEdgh8=
Expand All @@ -144,7 +135,6 @@ github.com/libp2p/go-libp2p-netutil v0.1.0 h1:zscYDNVEcGxyUpMd0JReUZTrpMfia8PmLK
github.com/libp2p/go-libp2p-netutil v0.1.0/go.mod h1:3Qv/aDqtMLTUyQeundkKsA+YCThNdbQD54k3TqjpbFU=
github.com/libp2p/go-libp2p-peer v0.2.0/go.mod h1:RCffaCvUyW2CJmG2gAWVqwePwW7JMgxjsHm7+J5kjWY=
github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY=
github.com/libp2p/go-libp2p-peerstore v0.1.1/go.mod h1:ojEWnwG7JpJLkJ9REWYXQslyu9ZLrPWPEcCdiZzEbSM=
github.com/libp2p/go-libp2p-peerstore v0.1.2-0.20190621130618-cfa9bb890c1a h1:xW2Q7yiWAQnBpxe6m5Y094bYxxBCaoNruxc1sDlVxs0=
github.com/libp2p/go-libp2p-peerstore v0.1.2-0.20190621130618-cfa9bb890c1a/go.mod h1:DAchSrPUuksotuxrqPcvk5jvifXlxC3oH/65iHFmBns=
github.com/libp2p/go-libp2p-record v0.1.0 h1:wHwBGbFzymoIl69BpgwIu0O6ta3TXGcMPvHUAcodzRc=
Expand All @@ -161,7 +151,6 @@ github.com/libp2p/go-libp2p-testing v0.0.4 h1:Qev57UR47GcLPXWjrunv5aLIQGO4n9mhI/
github.com/libp2p/go-libp2p-testing v0.0.4/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-transport-upgrader v0.1.1 h1:PZMS9lhjK9VytzMCW3tWHAXtKXmlURSc3ZdvwEcKCzw=
github.com/libp2p/go-libp2p-transport-upgrader v0.1.1/go.mod h1:IEtA6or8JUbsV07qPW4r01GnTenLW4oi3lOPbUMGJJA=
github.com/libp2p/go-libp2p-yamux v0.2.0 h1:TSPZ5cMMz/wdoYsye/wU1TE4G3LDGMoeEN0xgnCKU/I=
github.com/libp2p/go-libp2p-yamux v0.2.0/go.mod h1:Db2gU+XfLpm6E4rG5uGCFX6uXA8MEXOxFcRoXUODaK8=
github.com/libp2p/go-libp2p-yamux v0.2.1 h1:Q3XYNiKCC2vIxrvUJL+Jg1kiyeEaIDNKLjgEjo3VQdI=
github.com/libp2p/go-libp2p-yamux v0.2.1/go.mod h1:1FBXiHDk1VyRM1C0aez2bCfHQ4vMZKkAQzZbkSQt5fI=
Expand All @@ -184,9 +173,7 @@ github.com/libp2p/go-stream-muxer-multistream v0.2.0 h1:714bRJ4Zy9mdhyTLJ+ZKiROm
github.com/libp2p/go-stream-muxer-multistream v0.2.0/go.mod h1:j9eyPol/LLRqT+GPLSxvimPhNph4sfYfMoDPd7HkzIc=
github.com/libp2p/go-tcp-transport v0.1.0 h1:IGhowvEqyMFknOar4FWCKSWE0zL36UFKQtiRQD60/8o=
github.com/libp2p/go-tcp-transport v0.1.0/go.mod h1:oJ8I5VXryj493DEJ7OsBieu8fcg2nHGctwtInJVpipc=
github.com/libp2p/go-ws-transport v0.1.0 h1:F+0OvvdmPTDsVc4AjPHjV7L7Pk1B7D5QwtDcKE2oag4=
github.com/libp2p/go-ws-transport v0.1.0/go.mod h1:rjw1MG1LU9YDC6gzmwObkPd/Sqwhw7yT74kj3raBFuo=
github.com/libp2p/go-yamux v1.2.2 h1:s6J6o7+ajoQMjHe7BEnq+EynOj5D2EoG8CuQgL3F2vg=
github.com/libp2p/go-yamux v1.2.2/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow=
github.com/libp2p/go-yamux v1.2.3 h1:xX8A36vpXb59frIzWFdEgptLMsOANMFq2K7fPRlunYI=
github.com/libp2p/go-yamux v1.2.3/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow=
Expand Down
Loading

0 comments on commit 8fe679a

Please sign in to comment.