Skip to content

Commit

Permalink
Merge pull request #456 from jbenet/provides-rewrite
Browse files Browse the repository at this point in the history
rewrite of provides to better select peers to send RPCs to
  • Loading branch information
jbenet committed Jan 2, 2015
2 parents da1387f + da04d26 commit b86101b
Show file tree
Hide file tree
Showing 16 changed files with 231 additions and 188 deletions.
1 change: 1 addition & 0 deletions core/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/blocks/blockstore"
Expand Down
9 changes: 7 additions & 2 deletions crypto/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"io"

"crypto/elliptic"
"crypto/hmac"
Expand Down Expand Up @@ -74,11 +75,15 @@ type PubKey interface {
// Given a public key, generates the shared key.
type GenSharedKey func([]byte) ([]byte, error)

// Generates a keypair of the given type and bitsize
func GenerateKeyPair(typ, bits int) (PrivKey, PubKey, error) {
return GenerateKeyPairWithReader(typ, bits, rand.Reader)
}

// Generates a keypair of the given type and bitsize
func GenerateKeyPairWithReader(typ, bits int, src io.Reader) (PrivKey, PubKey, error) {
switch typ {
case RSA:
priv, err := rsa.GenerateKey(rand.Reader, bits)
priv, err := rsa.GenerateKey(src, bits)
if err != nil {
return nil, nil, err
}
Expand Down
9 changes: 6 additions & 3 deletions crypto/key_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package crypto
package crypto_test

import (
. "github.com/jbenet/go-ipfs/crypto"

"bytes"
tu "github.com/jbenet/go-ipfs/util/testutil"
"testing"
)

func TestRsaKeys(t *testing.T) {
sk, pk, err := GenerateKeyPair(RSA, 512)
sk, pk, err := tu.RandKeyPair(512)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -90,7 +93,7 @@ func testKeyEquals(t *testing.T, k Key) {
t.Fatal("Key not equal to key with same bytes.")
}

sk, pk, err := GenerateKeyPair(RSA, 512)
sk, pk, err := tu.RandKeyPair(512)
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 1 addition & 2 deletions namesys/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package namesys
import (
"testing"

ci "github.com/jbenet/go-ipfs/crypto"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
u "github.com/jbenet/go-ipfs/util"
testutil "github.com/jbenet/go-ipfs/util/testutil"
Expand All @@ -15,7 +14,7 @@ func TestRoutingResolve(t *testing.T) {
resolver := NewRoutingResolver(d)
publisher := NewRoutingPublisher(d)

privk, pubk, err := ci.GenerateKeyPair(ci.RSA, 512)
privk, pubk, err := testutil.RandKeyPair(512)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion net/mock/mock_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func New(ctx context.Context) Mocknet {
}

func (mn *mocknet) GenPeer() (inet.Network, error) {
sk, _, err := testutil.RandKeyPair(512)
sk, _, err := testutil.SeededKeyPair(int64(len(mn.nets)))
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions peer/peer_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package peer
package peer_test

import (
"encoding/base64"
Expand All @@ -7,7 +7,9 @@ import (
"testing"

ic "github.com/jbenet/go-ipfs/crypto"
. "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
tu "github.com/jbenet/go-ipfs/util/testutil"

b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
)
Expand Down Expand Up @@ -39,7 +41,7 @@ type keyset struct {

func (ks *keyset) generate() error {
var err error
ks.sk, ks.pk, err = ic.GenerateKeyPair(ic.RSA, 1024)
ks.sk, ks.pk, err = tu.RandKeyPair(512)
if err != nil {
return err
}
Expand Down
63 changes: 15 additions & 48 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,9 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
return nil
}

// putValueToNetwork stores the given key/value pair at the peer 'p'
// meaning: it sends a PUT_VALUE message to p
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.ID,
key string, rec *pb.Record) error {
// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
key u.Key, rec *pb.Record) error {

pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
pmes.Record = rec
Expand Down Expand Up @@ -238,12 +237,12 @@ func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.PeerInfo, *kb.RoutingTable) {
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.PeerInfo {
p := dht.routingTable.Find(id)
if p != "" {
return dht.peerstore.PeerInfo(p), dht.routingTable
return dht.peerstore.PeerInfo(p)
}
return peer.PeerInfo{}, nil
return peer.PeerInfo{}
}

// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
Expand All @@ -257,26 +256,6 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Ke
return dht.sendRequest(ctx, p, pmes)
}

func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.ID {
peers := pb.PBPeersToPeerInfos(pbps)

var provArr []peer.ID
for _, pi := range peers {
p := pi.ID

// Dont add outselves to the list
if p == dht.self {
continue
}

log.Debugf("%s adding provider: %s for %s", dht.self, p, key)
// TODO(jbenet) ensure providers is idempotent
dht.providers.AddProvider(key, p)
provArr = append(provArr, p)
}
return provArr
}

// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
key := u.Key(pmes.GetKey())
Expand All @@ -285,7 +264,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
}

// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID {
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
closer := dht.nearestPeersToQuery(pmes, count)

// no node? nil
Expand All @@ -302,11 +281,16 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID {
}

var filtered []peer.ID
for _, p := range closer {
for _, clp := range closer {
// Dont send a peer back themselves
if p == clp {
continue
}

// must all be closer than self
key := u.Key(pmes.GetKey())
if !kb.Closer(dht.self, p, key) {
filtered = append(filtered, p)
if !kb.Closer(dht.self, clp, key) {
filtered = append(filtered, clp)
}
}

Expand All @@ -323,23 +307,6 @@ func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error
return dht.network.DialPeer(ctx, p)
}

//TODO: this should be smarter about which keys it selects.
func (dht *IpfsDHT) loadProvidableKeys() error {
kl, err := dht.datastore.KeyList()
if err != nil {
return err
}
for _, dsk := range kl {
k := u.KeyFromDsKey(dsk)
if len(k) == 0 {
log.Errorf("loadProvidableKeys error: %v", dsk)
}

dht.providers.AddProvider(k, dht.self)
}
return nil
}

// PingRoutine periodically pings nearest neighbors.
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
defer dht.Children().Done()
Expand Down
10 changes: 2 additions & 8 deletions routing/dht/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"

// ci "github.com/jbenet/go-ipfs/crypto"
inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
Expand All @@ -35,7 +34,7 @@ func init() {

func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT {

sk, pk, err := testutil.RandKeyPair(512)
sk, pk, err := testutil.SeededKeyPair(time.Now().UnixNano())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -487,12 +486,7 @@ func TestLayeredGet(t *testing.T) {
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[1], dhts[3])

err := dhts[3].putLocal(u.Key("/v/hello"), []byte("world"))
if err != nil {
t.Fatal(err)
}

err = dhts[3].Provide(ctx, u.Key("/v/hello"))
err := dhts[3].Provide(ctx, u.Key("/v/hello"))
if err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 3 additions & 4 deletions routing/dht/ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ func TestGetFailures(t *testing.T) {
t.Fatal("Did not get expected error!")
}

msgs := make(chan *pb.Message, 100)
t.Log("Timeout test passed.")

// u.POut("NotFound Test\n")
// Reply with failures to every message
nets[1].SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
defer s.Close()
Expand All @@ -68,8 +67,6 @@ func TestGetFailures(t *testing.T) {
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}

msgs <- resp
})

// This one should fail with NotFound
Expand All @@ -83,6 +80,8 @@ func TestGetFailures(t *testing.T) {
t.Fatal("expected error, got none.")
}

t.Log("ErrNotFound check passed!")

// Now we test this DHT's handleGetValue failure
{
typ := pb.Message_GET_VALUE
Expand Down
12 changes: 6 additions & 6 deletions routing/dht/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
}

func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey())
log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey())

// setup response
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
Expand Down Expand Up @@ -93,7 +93,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
}

// Find closest peer on given cluster to desired key and reply with that info
closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
closerinfos := peer.PeerInfos(dht.peerstore, closer)
if closer != nil {
for _, pi := range closerinfos {
Expand Down Expand Up @@ -127,7 +127,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
}

err = dht.datastore.Put(dskey, data)
log.Debugf("%s handlePutValue %v\n", dht.self, dskey)
log.Debugf("%s handlePutValue %v", dht.self, dskey)
return pmes, err
}

Expand All @@ -144,11 +144,11 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess
if peer.ID(pmes.GetKey()) == dht.self {
closest = []peer.ID{dht.self}
} else {
closest = dht.betterPeersToQuery(pmes, CloserPeerCount)
closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount)
}

if closest == nil {
log.Debugf("handleFindPeer: could not find anything.")
log.Warningf("handleFindPeer: could not find anything.")
return resp, nil
}

Expand Down Expand Up @@ -189,7 +189,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
}

// Also send closer peers.
closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
if closer != nil {
infos := peer.PeerInfos(dht.peerstore, providers)
resp.CloserPeers = pb.PeerInfosToPBPeers(dht.network, infos)
Expand Down
Loading

0 comments on commit b86101b

Please sign in to comment.