Skip to content

Commit

Permalink
moved nesting dht into a separate package. removed ability to be a se…
Browse files Browse the repository at this point in the history
…rver of the inner dht and a client of the outer dht from the nesting dht.
  • Loading branch information
aschmahmann committed May 1, 2020
1 parent b81a3e9 commit 018fa25
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 192 deletions.
19 changes: 2 additions & 17 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"bufio"
"context"
"fmt"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol"
"io"
"sync"
"time"
Expand Down Expand Up @@ -270,14 +268,6 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
return nil
}

func (dht *IpfsDHT) getHost() host.Host {
return dht.host
}

func (dht *IpfsDHT) getProtocols() []protocol.ID {
return append([]protocol.ID{},dht.protocols...)
}

func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
dht.smlk.Lock()
ms, ok := dht.strmap[p]
Expand Down Expand Up @@ -310,17 +300,12 @@ func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messa
return ms, nil
}

type protocolSender interface {
getHost() host.Host
getProtocols() []protocol.ID
}

type messageSender struct {
s network.Stream
r msgio.ReadCloser
lk ctxMutex
p peer.ID
dht protocolSender
dht *IpfsDHT

invalid bool
singleMes int
Expand Down Expand Up @@ -361,7 +346,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
// We only want to speak to peers using our primary protocols. We do not want to query any peer that only speaks
// one of the secondary "server" protocols that we happen to support (e.g. older nodes that we can respond to for
// backwards compatibility reasons).
nstr, err := ms.dht.getHost().NewStream(ctx, ms.p, ms.dht.getProtocols()...)
nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
if err != nil {
return err
}
Expand Down
175 changes: 0 additions & 175 deletions nesting.go

This file was deleted.

123 changes: 123 additions & 0 deletions nesting/nesting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package nesting

import (
"context"
"fmt"
"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
ci "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/pkg/errors"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/libp2p/go-libp2p-kad-dht"
)

// DHT implements the routing interface to
type DHT struct {
Inner, Outer *dht.IpfsDHT
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
_ routing.ContentRouting = (*DHT)(nil)
_ routing.Routing = (*DHT)(nil)
_ routing.PeerRouting = (*DHT)(nil)
_ routing.PubKeyFetcher = (*DHT)(nil)
_ routing.ValueStore = (*DHT)(nil)
)

func New(ctx context.Context, h host.Host, innerOptions []dht.Option, outerOptions []dht.Option) (*DHT, error) {
inner, err := dht.New(ctx, h, innerOptions...)
if err != nil {
return nil, err
}

outer, err := dht.New(ctx, h, outerOptions...)
if err != nil {
return nil, err
}

d := &DHT{
Inner: inner,
Outer: outer,
}

return d, nil
}

func (dht *DHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
var innerResult []peer.ID
peerCh, err := dht.Inner.GetClosestPeersSeeded(ctx, key, nil)
if err == nil {
innerResult = getPeersFromCh(peerCh)
}

outerResultCh, err := dht.Outer.GetClosestPeersSeeded(ctx, key, innerResult)
if err != nil {
return nil, err
}

return getPeersFromCh(outerResultCh), nil
}

func getPeersFromCh(peerCh <-chan peer.ID) []peer.ID {
var peers []peer.ID
for p := range peerCh {
peers = append(peers, p)
}
return peers
}

func (dht *DHT) GetPublicKey(ctx context.Context, id peer.ID) (ci.PubKey, error) {
panic("implement me")
}

func (dht *DHT) Provide(ctx context.Context, cid cid.Cid, b bool) error {
panic("implement me")
}

func (dht *DHT) FindProvidersAsync(ctx context.Context, cid cid.Cid, i int) <-chan peer.AddrInfo {
panic("implement me")
}

func (dht *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
panic("implement me")
}

func (dht *DHT) PutValue(ctx context.Context, s string, bytes []byte, option ...routing.Option) error {
panic("implement me")
}

func (dht *DHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) {
panic("implement me")
}

func (dht *DHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) {
panic("implement me")
}

func (dht *DHT) Bootstrap(ctx context.Context) error {
errI := dht.Inner.Bootstrap(ctx)
errO := dht.Outer.Bootstrap(ctx)

errs := make([]error, 0, 2)
if errI != nil {
errs = append(errs, errors.Wrap(errI, fmt.Sprintf("failed to bootstrap inner dht")))
}
if errO != nil {
errs = append(errs, errors.Wrap(errI, fmt.Sprintf("failed to bootstrap outer dht")))
}

switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return multierror.Append(errs[0], errs[1:]...)
}
}

0 comments on commit 018fa25

Please sign in to comment.