rewrite of provides to better select peers to send RPCs to #456

Merged: 8 commits, Jan 2, 2015
1 change: 1 addition & 0 deletions core/mock.go
@@ -2,6 +2,7 @@ package core

 import (
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+
 	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
 	syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
 	"github.com/jbenet/go-ipfs/blocks/blockstore"
9 changes: 7 additions & 2 deletions crypto/key.go
@@ -8,6 +8,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"io"

"crypto/elliptic"
"crypto/hmac"
Expand Down Expand Up @@ -74,11 +75,15 @@ type PubKey interface {
 // Given a public key, generates the shared key.
 type GenSharedKey func([]byte) ([]byte, error)

 // Generates a keypair of the given type and bitsize
 func GenerateKeyPair(typ, bits int) (PrivKey, PubKey, error) {
+	return GenerateKeyPairWithReader(typ, bits, rand.Reader)
+}
+
+// Generates a keypair of the given type and bitsize
+func GenerateKeyPairWithReader(typ, bits int, src io.Reader) (PrivKey, PubKey, error) {
 	switch typ {
 	case RSA:
-		priv, err := rsa.GenerateKey(rand.Reader, bits)
+		priv, err := rsa.GenerateKey(src, bits)
 		if err != nil {
 			return nil, nil, err
 		}
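Threading an io.Reader through key generation is what lets tests swap crypto/rand.Reader for a deterministic source. The testutil helpers used throughout this PR are not shown in the diff; below is a minimal sketch of what SeededKeyPair could look like on top of GenerateKeyPairWithReader. The seededReader type is hypothetical, not part of this PR.

    package testutil

    import (
        mrand "math/rand"

        ci "github.com/jbenet/go-ipfs/crypto"
    )

    // seededReader adapts a deterministic PRNG into an io.Reader so it can
    // stand in for crypto/rand.Reader in tests. Not cryptographically secure.
    type seededReader struct{ r *mrand.Rand }

    func (s *seededReader) Read(p []byte) (int, error) {
        for i := range p {
            p[i] = byte(s.r.Intn(256))
        }
        return len(p), nil
    }

    // SeededKeyPair derives a keypair from a fixed seed, so the same seed
    // yields the same peer identity across runs (on toolchains where
    // rsa.GenerateKey consumes its reader deterministically).
    func SeededKeyPair(bits int, seed int64) (ci.PrivKey, ci.PubKey, error) {
        src := &seededReader{r: mrand.New(mrand.NewSource(seed))}
        return ci.GenerateKeyPairWithReader(ci.RSA, bits, src)
    }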
9 changes: 6 additions & 3 deletions crypto/key_test.go
@@ -1,12 +1,15 @@
-package crypto
+package crypto_test

 import (
+	. "github.com/jbenet/go-ipfs/crypto"
+
 	"bytes"
+	tu "github.com/jbenet/go-ipfs/util/testutil"
 	"testing"
 )

 func TestRsaKeys(t *testing.T) {
-	sk, pk, err := GenerateKeyPair(RSA, 512)
+	sk, pk, err := tu.RandKeyPair(512)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -90,7 +93,7 @@ func testKeyEquals(t *testing.T, k Key) {
t.Fatal("Key not equal to key with same bytes.")
}

sk, pk, err := GenerateKeyPair(RSA, 512)
sk, pk, err := tu.RandKeyPair(512)
if err != nil {
t.Fatal(err)
}
3 changes: 1 addition & 2 deletions namesys/resolve_test.go
@@ -3,7 +3,6 @@ package namesys
 import (
 	"testing"

-	ci "github.com/jbenet/go-ipfs/crypto"
 	mockrouting "github.com/jbenet/go-ipfs/routing/mock"
 	u "github.com/jbenet/go-ipfs/util"
 	testutil "github.com/jbenet/go-ipfs/util/testutil"
@@ -15,7 +14,7 @@ func TestRoutingResolve(t *testing.T) {
 	resolver := NewRoutingResolver(d)
 	publisher := NewRoutingPublisher(d)

-	privk, pubk, err := ci.GenerateKeyPair(ci.RSA, 512)
+	privk, pubk, err := testutil.RandKeyPair(512)
 	if err != nil {
 		t.Fatal(err)
 	}
2 changes: 1 addition & 1 deletion net/mock/mock_net.go
@@ -41,7 +41,7 @@ func New(ctx context.Context) Mocknet {
 }

 func (mn *mocknet) GenPeer() (inet.Network, error) {
-	sk, _, err := testutil.RandKeyPair(512)
+	sk, _, err := testutil.SeededKeyPair(512, int64(len(mn.nets)))
 	if err != nil {
 		return nil, err
 	}
6 changes: 4 additions & 2 deletions peer/peer_test.go
@@ -1,4 +1,4 @@
-package peer
+package peer_test
Contributor: peer_test intentional?

Member (author): Yeap, prevents circular dependencies in testing.

Member (author): note the import: . "github.com/jbenet/go-ipfs/peer"


 import (
 	"encoding/base64"
@@ -7,7 +7,9 @@ import (
 	"testing"

 	ic "github.com/jbenet/go-ipfs/crypto"
+	. "github.com/jbenet/go-ipfs/peer"
 	u "github.com/jbenet/go-ipfs/util"
+	tu "github.com/jbenet/go-ipfs/util/testutil"

 	b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
 )
@@ -39,7 +41,7 @@ type keyset struct {

 func (ks *keyset) generate() error {
 	var err error
-	ks.sk, ks.pk, err = ic.GenerateKeyPair(ic.RSA, 1024)
+	ks.sk, ks.pk, err = tu.RandKeyPair(512)
 	if err != nil {
 		return err
 	}
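For reference, the external-test-package pattern discussed above breaks the cycle peer -> testutil -> peer: the test file compiles as its own package, dot-imports peer so identifiers stay unqualified, and is then free to import util/testutil. A schematic sketch (the test body is illustrative only, not taken from this PR):

    package peer_test

    import (
        "testing"

        . "github.com/jbenet/go-ipfs/peer" // peer's exported names, unqualified
        tu "github.com/jbenet/go-ipfs/util/testutil"
    )

    // ID resolves via the dot import; no import cycle, since peer_test is a
    // distinct package that merely links against peer and testutil.
    var _ ID

    func TestKeyPairGeneration(t *testing.T) {
        if _, _, err := tu.RandKeyPair(512); err != nil {
            t.Fatal(err)
        }
    }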
63 changes: 15 additions & 48 deletions routing/dht/dht.go
@@ -102,10 +102,9 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
 	return nil
 }

-// putValueToNetwork stores the given key/value pair at the peer 'p'
-// meaning: it sends a PUT_VALUE message to p
-func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.ID,
-	key string, rec *pb.Record) error {
+// putValueToPeer stores the given key/value pair at the peer 'p'
+func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
+	key u.Key, rec *pb.Record) error {

 	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
 	pmes.Record = rec
@@ -238,12 +237,12 @@ func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
 }

 // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
-func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.PeerInfo, *kb.RoutingTable) {
+func (dht *IpfsDHT) FindLocal(id peer.ID) peer.PeerInfo {
 	p := dht.routingTable.Find(id)
Contributor: low priority but the latter would make more sense than the former

    func (rt *RoutingTable) Find(id peer.ID) peer.ID
    func (rt *RoutingTable) Exists(id peer.ID) bool

Member (author): Yeah, that's leftover from when it returned a peer.Peer, fairly pointless after the refactor

if p != "" {
return dht.peerstore.PeerInfo(p), dht.routingTable
return dht.peerstore.PeerInfo(p)
}
return peer.PeerInfo{}, nil
return peer.PeerInfo{}
}

 // findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
@@ -257,26 +256,6 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Key
 	return dht.sendRequest(ctx, p, pmes)
 }

-func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.ID {
-	peers := pb.PBPeersToPeerInfos(pbps)
-
-	var provArr []peer.ID
-	for _, pi := range peers {
-		p := pi.ID
-
-		// Dont add outselves to the list
-		if p == dht.self {
-			continue
-		}
-
-		log.Debugf("%s adding provider: %s for %s", dht.self, p, key)
-		// TODO(jbenet) ensure providers is idempotent
-		dht.providers.AddProvider(key, p)
-		provArr = append(provArr, p)
-	}
-	return provArr
-}
-
 // nearestPeersToQuery returns the routing tables closest peers.
 func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
 	key := u.Key(pmes.GetKey())
@@ -285,7 +264,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
 }

 // betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
-func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID {
+func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
 	closer := dht.nearestPeersToQuery(pmes, count)

 	// no node? nil
@@ -302,11 +281,16 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID {
 	}

 	var filtered []peer.ID
-	for _, p := range closer {
+	for _, clp := range closer {
+		// Dont send a peer back themselves
+		if p == clp {
+			continue
+		}
+
 		// must all be closer than self
 		key := u.Key(pmes.GetKey())
-		if !kb.Closer(dht.self, p, key) {
-			filtered = append(filtered, p)
+		if !kb.Closer(dht.self, clp, key) {
+			filtered = append(filtered, clp)
 		}
 	}

@@ -323,23 +307,6 @@ func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error
 	return dht.network.DialPeer(ctx, p)
 }

-//TODO: this should be smarter about which keys it selects.
-func (dht *IpfsDHT) loadProvidableKeys() error {
-	kl, err := dht.datastore.KeyList()
-	if err != nil {
-		return err
-	}
-	for _, dsk := range kl {
-		k := u.KeyFromDsKey(dsk)
-		if len(k) == 0 {
-			log.Errorf("loadProvidableKeys error: %v", dsk)
-		}
-
-		dht.providers.AddProvider(k, dht.self)
-	}
-	return nil
-}
-
 // PingRoutine periodically pings nearest neighbors.
 func (dht *IpfsDHT) PingRoutine(t time.Duration) {
 	defer dht.Children().Done()
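The behavioral core of this PR is the extra peer.ID parameter above: when answering a query, the DHT now skips the requester and keeps only candidates strictly closer to the key than itself. Pulled out of the method for clarity, the selection reads roughly as follows (a sketch; import paths and the kb.Closer semantics are assumed from the surrounding diff):

    import (
        peer "github.com/jbenet/go-ipfs/peer"
        kb "github.com/jbenet/go-ipfs/routing/kbucket"
        u "github.com/jbenet/go-ipfs/util"
    )

    // filterBetterPeers mirrors the new betterPeersToQuery filtering: never
    // suggest the requester p back to themselves, and only forward peers
    // that improve on self's distance to the key.
    func filterBetterPeers(self, p peer.ID, key u.Key, closer []peer.ID) []peer.ID {
        var filtered []peer.ID
        for _, clp := range closer {
            if clp == p {
                continue // don't send a peer back to themselves
            }
            // kb.Closer(a, b, key) reports whether a is closer to key than b,
            // so keep clp only when self is not the closer of the two
            if !kb.Closer(self, clp, key) {
                filtered = append(filtered, clp)
            }
        }
        return filtered
    }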
26 changes: 10 additions & 16 deletions routing/dht/dht_test.go
@@ -14,7 +14,6 @@ import (
 	dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
 	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"

-	// ci "github.com/jbenet/go-ipfs/crypto"
 	inet "github.com/jbenet/go-ipfs/net"
 	peer "github.com/jbenet/go-ipfs/peer"
 	routing "github.com/jbenet/go-ipfs/routing"
@@ -33,9 +32,9 @@ func init() {
 	}
 }

-func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT {
+func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr, seed int64) *IpfsDHT {
Member: it's painful to put the seeds everywhere in the interface, the testutil package is there to have all the sane defaults for tests. Let's use a shared seed (the test shared time-seeded rand, or the current time itself), and move the bits # into testutil as a default.
-	sk, pk, err := testutil.RandKeyPair(512)
+	sk, pk, err := testutil.SeededKeyPair(512, seed)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -71,7 +70,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer

 	for i := 0; i < n; i++ {
 		addrs[i] = testutil.RandLocalTCPAddress()
-		dhts[i] = setupDHT(ctx, t, addrs[i])
+		dhts[i] = setupDHT(ctx, t, addrs[i], int64(i))
 		peers[i] = dhts[i].self
 	}

@@ -120,8 +119,8 @@ func TestPing(t *testing.T) {
 	addrA := testutil.RandLocalTCPAddress()
 	addrB := testutil.RandLocalTCPAddress()

-	dhtA := setupDHT(ctx, t, addrA)
-	dhtB := setupDHT(ctx, t, addrB)
+	dhtA := setupDHT(ctx, t, addrA, 1)
+	dhtB := setupDHT(ctx, t, addrB, 2)

 	peerA := dhtA.self
 	peerB := dhtB.self
@@ -153,8 +152,8 @@ func TestValueGetSet(t *testing.T) {
 	addrA := testutil.RandLocalTCPAddress()
 	addrB := testutil.RandLocalTCPAddress()

-	dhtA := setupDHT(ctx, t, addrA)
-	dhtB := setupDHT(ctx, t, addrB)
+	dhtA := setupDHT(ctx, t, addrA, 1)
+	dhtB := setupDHT(ctx, t, addrB, 2)

 	defer dhtA.Close()
 	defer dhtB.Close()
@@ -487,12 +486,7 @@ func TestLayeredGet(t *testing.T) {
 	connect(t, ctx, dhts[1], dhts[2])
 	connect(t, ctx, dhts[1], dhts[3])

-	err := dhts[3].putLocal(u.Key("/v/hello"), []byte("world"))
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	err = dhts[3].Provide(ctx, u.Key("/v/hello"))
+	err := dhts[3].Provide(ctx, u.Key("/v/hello"))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -642,8 +636,8 @@ func TestConnectCollision(t *testing.T) {
 	addrA := testutil.RandLocalTCPAddress()
 	addrB := testutil.RandLocalTCPAddress()

-	dhtA := setupDHT(ctx, t, addrA)
-	dhtB := setupDHT(ctx, t, addrB)
+	dhtA := setupDHT(ctx, t, addrA, int64((rtime*2)+1))
+	dhtB := setupDHT(ctx, t, addrB, int64((rtime*2)+2))

 	peerA := dhtA.self
 	peerB := dhtB.self
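One way to address the review comment above without widening setupDHT's signature would be to give testutil a default bit size and a shared seed source. A sketch of the reviewer's suggestion, not code from this PR; all names here are hypothetical:

    package testutil

    import (
        "sync/atomic"
        "time"
    )

    // TestBits is a default key size for tests: small enough to keep RSA
    // generation fast, large enough to be a valid key.
    const TestBits = 512

    var (
        seedBase = time.Now().UnixNano() // time-seeded once per test process
        seedSeq  int64
    )

    // NextTestSeed hands out distinct seeds derived from a single
    // time-based source, so callers never have to pass seeds around.
    func NextTestSeed() int64 {
        return seedBase + atomic.AddInt64(&seedSeq, 1)
    }

With helpers like these, setupDHT could call testutil.SeededKeyPair(testutil.TestBits, testutil.NextTestSeed()) and drop the seed parameter entirely.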
7 changes: 3 additions & 4 deletions routing/dht/ext_test.go
@@ -47,9 +47,8 @@ func TestGetFailures(t *testing.T) {
t.Fatal("Did not get expected error!")
}

msgs := make(chan *pb.Message, 100)
t.Log("Timeout test passed.")

// u.POut("NotFound Test\n")
// Reply with failures to every message
nets[1].SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
defer s.Close()
Expand All @@ -68,8 +67,6 @@ func TestGetFailures(t *testing.T) {
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}

msgs <- resp
})

 	// This one should fail with NotFound
@@ -83,6 +80,8 @@ func TestGetFailures(t *testing.T) {
t.Fatal("expected error, got none.")
}

t.Log("ErrNotFound check passed!")

// Now we test this DHT's handleGetValue failure
{
typ := pb.Message_GET_VALUE
12 changes: 6 additions & 6 deletions routing/dht/handlers.go
@@ -39,7 +39,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
 }

 func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
-	log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey())
+	log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey())

 	// setup response
 	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
@@ -93,7 +93,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
 	}

 	// Find closest peer on given cluster to desired key and reply with that info
-	closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
+	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
 	closerinfos := peer.PeerInfos(dht.peerstore, closer)
 	if closer != nil {
 		for _, pi := range closerinfos {
@@ -127,7 +127,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
 	}

 	err = dht.datastore.Put(dskey, data)
-	log.Debugf("%s handlePutValue %v\n", dht.self, dskey)
+	log.Debugf("%s handlePutValue %v", dht.self, dskey)
 	return pmes, err
 }

@@ -144,11 +144,11 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
 	if peer.ID(pmes.GetKey()) == dht.self {
 		closest = []peer.ID{dht.self}
 	} else {
-		closest = dht.betterPeersToQuery(pmes, CloserPeerCount)
+		closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount)
 	}

 	if closest == nil {
-		log.Debugf("handleFindPeer: could not find anything.")
+		log.Warningf("handleFindPeer: could not find anything.")
 		return resp, nil
 	}

@@ -189,7 +189,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
 	}

 	// Also send closer peers.
-	closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
+	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
 	if closer != nil {
 		infos := peer.PeerInfos(dht.peerstore, providers)
 		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.network, infos)