Skip to content

Commit

Permalink
Merge pull request ipfs#384 from aarshkshah1992/feature/correct-boots…
Browse files Browse the repository at this point in the history
…trapping

Feature/correct bootstrapping
  • Loading branch information
Stebalien committed Oct 11, 2019
2 parents fed99af + 00fffba commit 315504e
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 151 deletions.
84 changes: 61 additions & 23 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,22 @@ import (
"github.com/libp2p/go-libp2p-kad-dht/metrics"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
providers "github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p-kad-dht/providers"

proto "github.com/gogo/protobuf/proto"
cid "github.com/ipfs/go-cid"
"github.com/gogo/protobuf/proto"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
"github.com/jbenet/goprocess"
"github.com/jbenet/goprocess/context"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
base32 "github.com/whyrusleeping/base32"
"github.com/whyrusleeping/base32"
)

var logger = logging.Logger("dht")

// NumBootstrapQueries defines the number of random dht queries to do to
// collect members of the routing table.
const NumBootstrapQueries = 5

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand Down Expand Up @@ -70,6 +66,10 @@ type IpfsDHT struct {
protocols []protocol.ID // DHT protocols

bucketSize int

bootstrapCfg opts.BootstrapConfig

triggerBootstrap chan struct{}
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand All @@ -90,6 +90,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
return nil, err
}
dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)
dht.bootstrapCfg = cfg.BootstrapConfig

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
Expand Down Expand Up @@ -136,34 +137,71 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT

func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT {
rt := kb.NewRoutingTable(bucketSize, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())

cmgr := h.ConnManager()

rt.PeerAdded = func(p peer.ID) {
cmgr.TagPeer(p, "kbucket", 5)
}

rt.PeerRemoved = func(p peer.ID) {
cmgr.UntagPeer(p, "kbucket")
}

dht := &IpfsDHT{
datastore: dstore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
ctx: ctx,
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: rt,
protocols: protocols,
bucketSize: bucketSize,
datastore: dstore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
ctx: ctx,
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: rt,
protocols: protocols,
bucketSize: bucketSize,
triggerBootstrap: make(chan struct{}),
}

dht.ctx = dht.newContextWithLocalTags(ctx)

return dht
}

// TODO Implement RT seeding as described in https://github.com/libp2p/go-libp2p-kad-dht/pull/384#discussion_r320994340 OR
// come up with an alternative solution.
// issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387
/*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
writeResp := func(errorChan chan error, err error) {
select {
case <-proc.Closing():
case errorChan <- err:
}
close(errorChan)
}
for {
select {
case req := <-dht.rtRecoveryChan:
if dht.routingTable.Size() == 0 {
logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty. initiating recovery", req.id)
// TODO Call Seeder with default bootstrap peers here once #383 is merged
if dht.routingTable.Size() > 0 {
logger.Infof("rt recovery proc: successfully recovered RT for reqID=%s, RT size is now %d", req.id, dht.routingTable.Size())
go writeResp(req.errorChan, nil)
} else {
logger.Errorf("rt recovery proc: failed to recover RT for reqID=%s, RT is still empty", req.id)
go writeResp(req.errorChan, errors.New("RT empty after seed attempt"))
}
} else {
logger.Infof("rt recovery proc: RT is not empty, no need to act on request with reqID=%s", req.id)
go writeResp(req.errorChan, nil)
}
case <-proc.Closing():
return
}
}
}*/

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {

Expand Down
Loading

0 comments on commit 315504e

Please sign in to comment.