From 018fa259c19d6b216a00c58553d101a688309217 Mon Sep 17 00:00:00 2001
From: Adin Schmahmann
Date: Fri, 1 May 2020 03:48:46 -0400
Subject: [PATCH] Moved the nesting DHT into a separate package; removed the
 nesting DHT's ability to be a server of the inner DHT and a client of the
 outer DHT.

---
 dht_net.go         |  19 +----
 nesting.go         | 175 ---------------------------------------------
 nesting/nesting.go | 123 +++++++++++++++++++++++++++++++
 3 files changed, 125 insertions(+), 192 deletions(-)
 delete mode 100644 nesting.go
 create mode 100644 nesting/nesting.go

diff --git a/dht_net.go b/dht_net.go
index 8745c3069..9bb1aed6b 100644
--- a/dht_net.go
+++ b/dht_net.go
@@ -4,8 +4,6 @@ import (
 	"bufio"
 	"context"
 	"fmt"
-	"github.com/libp2p/go-libp2p-core/host"
-	"github.com/libp2p/go-libp2p-core/protocol"
 	"io"
 	"sync"
 	"time"
@@ -270,14 +268,6 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
 	return nil
 }
 
-func (dht *IpfsDHT) getHost() host.Host {
-	return dht.host
-}
-
-func (dht *IpfsDHT) getProtocols() []protocol.ID {
-	return append([]protocol.ID{},dht.protocols...)
-}
-
 func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
 	dht.smlk.Lock()
 	ms, ok := dht.strmap[p]
@@ -310,17 +300,12 @@ func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messa
 	return ms, nil
 }
 
-type protocolSender interface {
-	getHost() host.Host
-	getProtocols() []protocol.ID
-}
-
 type messageSender struct {
 	s   network.Stream
 	r   msgio.ReadCloser
 	lk  ctxMutex
 	p   peer.ID
-	dht protocolSender
+	dht *IpfsDHT
 
 	invalid   bool
 	singleMes int
@@ -361,7 +346,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
 	// We only want to speak to peers using our primary protocols. We do not want to query any peer that only speaks
 	// one of the secondary "server" protocols that we happen to support (e.g. older nodes that we can respond to for
 	// backwards compatibility reasons).
-	nstr, err := ms.dht.getHost().NewStream(ctx, ms.p, ms.dht.getProtocols()...)
+	nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
 	if err != nil {
 		return err
 	}
diff --git a/nesting.go b/nesting.go
deleted file mode 100644
index d1d0aa192..000000000
--- a/nesting.go
+++ /dev/null
@@ -1,175 +0,0 @@
-package dht
-
-import (
-	"context"
-	"github.com/libp2p/go-libp2p-core/network"
-	"github.com/libp2p/go-libp2p-core/peer"
-	pstore "github.com/libp2p/go-libp2p-core/peerstore"
-	"github.com/libp2p/go-libp2p-core/protocol"
-	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
-	"github.com/libp2p/go-msgio"
-	"io"
-	"time"
-
-	"github.com/libp2p/go-libp2p-core/host"
-)
-
-type NestedDHT struct {
-	Inner, Outer *IpfsDHT
-}
-
-const transferProto protocol.ID = "/adin/ipfs/kad/1.0.0"
-
-func NewNested(ctx context.Context, h host.Host, innerOptions []Option, outerOptions []Option) (*NestedDHT, error) {
-	inner, err := New(ctx, h, innerOptions...)
-	if err != nil {
-		return nil, err
-	}
-
-	outer, err := New(ctx, h, outerOptions...)
-	if err != nil {
-		return nil, err
-	}
-
-	d := &NestedDHT{
-		Inner: inner,
-		Outer: outer,
-	}
-
-	h.SetStreamHandler(transferProto, func(s network.Stream) {
-		defer s.Reset() //nolint
-		if d.handleNewMessage(s) {
-			// Gracefully close the stream for writes.
-			s.Close()
-		}
-	})
-
-	return d, nil
-}
-
-// Returns true on orderly completion of writes (so we can Close the stream).
-func (dht *NestedDHT) handleNewMessage(s network.Stream) bool {
-	ctx := dht.Inner.ctx
-	r := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
-
-	mPeer := s.Conn().RemotePeer()
-
-	timer := time.AfterFunc(dhtStreamIdleTimeout, func() { _ = s.Reset() })
-	defer timer.Stop()
-
-	for {
-		if dht.Inner.getMode() != modeServer {
-			logger.Errorf("ignoring incoming dht message while not in server mode")
-			return false
-		}
-
-		var req pb.Message
-		msgbytes, err := r.ReadMsg()
-		msgLen := len(msgbytes)
-		if err != nil {
-			r.ReleaseMsg(msgbytes)
-			if err == io.EOF {
-				return true
-			}
-			// This string test is necessary because there isn't a single stream reset error
-			// instance in use.
-			if err.Error() != "stream reset" {
-				logger.Debugf("error reading message: %#v", err)
-			}
-			if msgLen > 0 {
-			}
-			return false
-		}
-		err = req.Unmarshal(msgbytes)
-		r.ReleaseMsg(msgbytes)
-		if err != nil {
-			return false
-		}
-
-		timer.Reset(dhtStreamIdleTimeout)
-
-		var handler dhtHandler
-		if req.GetType() == pb.Message_FIND_NODE {
-			handler = dht.Outer.handlerForMsgType(req.GetType())
-		}
-
-		if handler == nil {
-			return false
-		}
-
-		resp, err := handler(ctx, mPeer, &req)
-		if err != nil {
-			return false
-		}
-
-		if resp == nil {
-			continue
-		}
-
-		// send out response msg
-		err = writeMsg(s, resp)
-		if err != nil {
-			return false
-		}
-	}
-}
-
-type nestedProtocolSender struct {
-	host  host.Host
-	proto protocol.ID
-}
-
-func (ps *nestedProtocolSender) getHost() host.Host {
-	return ps.host
-}
-
-func (ps *nestedProtocolSender) getProtocols() []protocol.ID {
-	return []protocol.ID{ps.proto}
-}
-
-func (dht *NestedDHT) transferGCP(ctx context.Context, p peer.ID, key string) ([]*peer.AddrInfo, error){
-	pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(key), 0)
-	nps := &nestedProtocolSender{host: dht.Inner.host, proto: transferProto}
-	ms := &messageSender{p: p, dht: nps, lk: newCtxMutex()}
-	resp, err := ms.SendRequest(ctx,pmes)
-	if err != nil {
-		return nil, err
-	}
-	peers := pb.PBPeersToPeerInfos(resp.GetCloserPeers())
-	return peers, nil
-}
-
-func (dht *NestedDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
-	var innerResult []peer.ID
-	peerCh, err := dht.Inner.GetClosestPeersSeeded(ctx, key, nil)
-	if err == nil {
-		innerResult = getPeersFromCh(peerCh)
-	}
-
-	seedPeerSet := peer.NewSet()
-	for _, p := range innerResult {
-		innerTransferPeers, err := dht.transferGCP(ctx, p, key)
-		if err == nil {
-			for _, outerPeer := range innerTransferPeers {
-				if seedPeerSet.TryAdd(outerPeer.ID) {
-					dht.Inner.host.Peerstore().AddAddrs(outerPeer.ID, outerPeer.Addrs, pstore.TempAddrTTL)
-				}
-			}
-		}
-	}
-
-	outerResultCh, err := dht.Outer.GetClosestPeersSeeded(ctx, key, seedPeerSet.Peers())
-	if err != nil {
-		return nil, err
-	}
-
-	return getPeersFromCh(outerResultCh), nil
-}
-
-func getPeersFromCh(peerCh <-chan peer.ID) []peer.ID {
-	var peers []peer.ID
-	for p := range peerCh {
-		peers = append(peers, p)
-	}
-	return peers
-}
diff --git a/nesting/nesting.go b/nesting/nesting.go
new file mode 100644
index 000000000..52c8ad40e
--- /dev/null
+++ b/nesting/nesting.go
@@ -0,0 +1,123 @@
+package nesting
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/hashicorp/go-multierror"
+	"github.com/ipfs/go-cid"
+	ci "github.com/libp2p/go-libp2p-core/crypto"
+	"github.com/libp2p/go-libp2p-core/host"
+	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/libp2p/go-libp2p-core/routing"
+	"github.com/pkg/errors"
+
+	"github.com/libp2p/go-libp2p-kad-dht"
+)
+
+// DHT implements the libp2p routing interfaces by composing an inner and an outer DHT.
+type DHT struct {
+	Inner, Outer *dht.IpfsDHT
+}
+
+// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
+// guarantee, but we can use them to aid refactoring.
+var (
+	_ routing.ContentRouting = (*DHT)(nil)
+	_ routing.Routing        = (*DHT)(nil)
+	_ routing.PeerRouting    = (*DHT)(nil)
+	_ routing.PubKeyFetcher  = (*DHT)(nil)
+	_ routing.ValueStore     = (*DHT)(nil)
+)
+
+func New(ctx context.Context, h host.Host, innerOptions []dht.Option, outerOptions []dht.Option) (*DHT, error) {
+	inner, err := dht.New(ctx, h, innerOptions...)
+	if err != nil {
+		return nil, err
+	}
+
+	outer, err := dht.New(ctx, h, outerOptions...)
+	if err != nil {
+		return nil, err
+	}
+
+	d := &DHT{
+		Inner: inner,
+		Outer: outer,
+	}
+
+	return d, nil
+}
+
+func (dht *DHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
+	var innerResult []peer.ID
+	peerCh, err := dht.Inner.GetClosestPeersSeeded(ctx, key, nil)
+	if err == nil {
+		innerResult = getPeersFromCh(peerCh)
+	}
+
+	outerResultCh, err := dht.Outer.GetClosestPeersSeeded(ctx, key, innerResult)
+	if err != nil {
+		return nil, err
+	}
+
+	return getPeersFromCh(outerResultCh), nil
+}
+
+func getPeersFromCh(peerCh <-chan peer.ID) []peer.ID {
+	var peers []peer.ID
+	for p := range peerCh {
+		peers = append(peers, p)
+	}
+	return peers
+}
+
+func (dht *DHT) GetPublicKey(ctx context.Context, id peer.ID) (ci.PubKey, error) {
+	panic("implement me")
+}
+
+func (dht *DHT) Provide(ctx context.Context, cid cid.Cid, b bool) error {
+	panic("implement me")
+}
+
+func (dht *DHT) FindProvidersAsync(ctx context.Context, cid cid.Cid, i int) <-chan peer.AddrInfo {
+	panic("implement me")
+}
+
+func (dht *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
+	panic("implement me")
+}
+
+func (dht *DHT) PutValue(ctx context.Context, s string, bytes []byte, option ...routing.Option) error {
+	panic("implement me")
+}
+
+func (dht *DHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) {
+	panic("implement me")
+}
+
+func (dht *DHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) {
+	panic("implement me")
+}
+
+func (dht *DHT) Bootstrap(ctx context.Context) error {
+	errI := dht.Inner.Bootstrap(ctx)
+	errO := dht.Outer.Bootstrap(ctx)
+
+	errs := make([]error, 0, 2)
+	if errI != nil {
+		errs = append(errs, errors.Wrap(errI, fmt.Sprintf("failed to bootstrap inner dht")))
+	}
+	if errO != nil {
+		errs = append(errs, errors.Wrap(errO, fmt.Sprintf("failed to bootstrap outer dht")))
+	}
+
+	switch len(errs) {
+	case 0:
+		return nil
+	case 1:
+		return errs[0]
+	default:
+		return multierror.Append(errs[0], errs[1:]...)
+	}
+}
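
For reference, a minimal sketch of how a caller might wire up the new nesting package. This
assumes the go-libp2p API of this era (libp2p.New still took a context) and uses the dht.Mode
option from go-libp2p-kad-dht purely for illustration; the inner-server/outer-client split shown
here is the caller's choice, not something this patch prescribes.

	package main

	import (
		"context"
		"fmt"

		"github.com/libp2p/go-libp2p"
		dht "github.com/libp2p/go-libp2p-kad-dht"
		"github.com/libp2p/go-libp2p-kad-dht/nesting"
	)

	func main() {
		ctx := context.Background()

		// A bare libp2p host; real callers would configure keys, transports, etc.
		h, err := libp2p.New(ctx)
		if err != nil {
			panic(err)
		}

		// The inner and outer DHTs share one host but take independent option
		// lists (the mode choices here are illustrative only).
		d, err := nesting.New(ctx, h,
			[]dht.Option{dht.Mode(dht.ModeServer)},
			[]dht.Option{dht.Mode(dht.ModeClient)},
		)
		if err != nil {
			panic(err)
		}

		// Bootstrap both DHTs; per this patch, inner and outer errors are
		// wrapped and combined into a multierror.
		if err := d.Bootstrap(ctx); err != nil {
			fmt.Println("bootstrap:", err)
		}

		// GetClosestPeers runs the inner query first, then seeds the outer
		// query with the inner results.
		peers, err := d.GetClosestPeers(ctx, "some-key")
		if err != nil {
			panic(err)
		}
		fmt.Println(peers)
	}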