diff --git a/config/config.go b/config/config.go index dd3d3996b7..c503509465 100644 --- a/config/config.go +++ b/config/config.go @@ -5,10 +5,13 @@ import ( "fmt" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" + relay "github.com/libp2p/go-libp2p/p2p/host/relay" + routed "github.com/libp2p/go-libp2p/p2p/host/routed" logging "github.com/ipfs/go-log" circuit "github.com/libp2p/go-libp2p-circuit" crypto "github.com/libp2p/go-libp2p-crypto" + discovery "github.com/libp2p/go-libp2p-discovery" host "github.com/libp2p/go-libp2p-host" ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr" pnet "github.com/libp2p/go-libp2p-interface-pnet" @@ -16,6 +19,7 @@ import ( inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" + routing "github.com/libp2p/go-libp2p-routing" swarm "github.com/libp2p/go-libp2p-swarm" tptu "github.com/libp2p/go-libp2p-transport-upgrader" filter "github.com/libp2p/go-maddr-filter" @@ -31,6 +35,8 @@ type AddrsFactory = bhost.AddrsFactory // NATManagerC is a NATManager constructor. type NATManagerC func(inet.Network) bhost.NATManager +type RoutingC func(host.Host) (routing.PeerRouting, error) + // Config describes a set of settings for a libp2p node // // This is *not* a stable interface. Use the options defined in the root @@ -58,6 +64,8 @@ type Config struct { Reporter metrics.Reporter DisablePing bool + + Routing RoutingC } // NewNode constructs a new libp2p Host from the Config. @@ -99,8 +107,8 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) { swrm.Filters = cfg.Filters } - // TODO: make host implementation configurable. - h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{ + var h host.Host + h, err = bhost.NewHost(ctx, swrm, &bhost.HostOpts{ ConnManager: cfg.ConnManager, AddrsFactory: cfg.AddrsFactory, NATManager: cfg.NATManager, @@ -158,7 +166,37 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) { return nil, err } - // TODO: Configure routing (it's a pain to setup). + if cfg.Routing != nil { + router, err := cfg.Routing(h) + if err != nil { + h.Close() + return nil, err + } + + crouter, ok := router.(routing.ContentRouting) + if ok { + if cfg.Relay { + discovery := discovery.NewRoutingDiscovery(crouter) + + hop := false + for _, opt := range cfg.RelayOpts { + if opt == circuit.OptHop { + hop = true + break + } + } + + if hop { + h = relay.NewRelayHost(swrm.Context(), h.(*bhost.BasicHost), discovery) + } else { + h = relay.NewAutoRelayHost(swrm.Context(), h.(*bhost.BasicHost), discovery) + } + } + } + + h = routed.Wrap(h, router) + } + // TODO: Bootstrapping. return h, nil diff --git a/options.go b/options.go index 4ab4f39c73..0015c80818 100644 --- a/options.go +++ b/options.go @@ -260,6 +260,17 @@ func Ping(enable bool) Option { } } +// Routing will configure libp2p to use routing. +func Routing(rt config.RoutingC) Option { + return func(cfg *Config) error { + if cfg.Routing != nil { + return fmt.Errorf("cannot specify multiple routing options") + } + cfg.Routing = rt + return nil + } +} + // NoListenAddrs will configure libp2p to not listen by default. // // This will both clear any configured listen addrs and prevent libp2p from diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index d1bd0d653a..13452e2f9c 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -59,10 +59,11 @@ type BasicHost struct { ids *identify.IDService pings *ping.PingService natmgr NATManager - addrs AddrsFactory maResolver *madns.Resolver cmgr ifconnmgr.ConnManager + AddrsFactory AddrsFactory + negtimeout time.Duration proc goprocess.Process @@ -106,11 +107,11 @@ type HostOpts struct { // NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network. func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost, error) { h := &BasicHost{ - network: net, - mux: msmux.NewMultistreamMuxer(), - negtimeout: DefaultNegotiationTimeout, - addrs: DefaultAddrsFactory, - maResolver: madns.DefaultResolver, + network: net, + mux: msmux.NewMultistreamMuxer(), + negtimeout: DefaultNegotiationTimeout, + AddrsFactory: DefaultAddrsFactory, + maResolver: madns.DefaultResolver, } h.proc = goprocessctx.WithContextAndTeardown(ctx, func() error { @@ -136,7 +137,7 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost, } if opts.AddrsFactory != nil { - h.addrs = opts.AddrsFactory + h.AddrsFactory = opts.AddrsFactory } if opts.NATManager != nil { @@ -256,6 +257,12 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { go handle(protoID, s) } +// PushIdentify pushes an identify update through the identify push protocol +// Warning: this interface is unstable and may disappear in the future. +func (h *BasicHost) PushIdentify() { + h.ids.Push() +} + // ID returns the (local) peer.ID associated with this Host func (h *BasicHost) ID() peer.ID { return h.Network().LocalPeer() @@ -474,7 +481,7 @@ func (h *BasicHost) ConnManager() ifconnmgr.ConnManager { // Addrs returns listening addresses that are safe to announce to the network. // The output is the same as AllAddrs, but processed by AddrsFactory. func (h *BasicHost) Addrs() []ma.Multiaddr { - return h.addrs(h.AllAddrs()) + return h.AddrsFactory(h.AllAddrs()) } // mergeAddrs merges input address lists, leave only unique addresses diff --git a/p2p/host/relay/autorelay.go b/p2p/host/relay/autorelay.go new file mode 100644 index 0000000000..e97ce88321 --- /dev/null +++ b/p2p/host/relay/autorelay.go @@ -0,0 +1,304 @@ +package relay + +import ( + "context" + "fmt" + "math/rand" + "sync" + "time" + + basic "github.com/libp2p/go-libp2p/p2p/host/basic" + + autonat "github.com/libp2p/go-libp2p-autonat" + _ "github.com/libp2p/go-libp2p-circuit" + discovery "github.com/libp2p/go-libp2p-discovery" + host "github.com/libp2p/go-libp2p-host" + inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + pstore "github.com/libp2p/go-libp2p-peerstore" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr-net" +) + +const ( + RelayRendezvous = "/libp2p/relay" +) + +var ( + DesiredRelays = 3 + + BootDelay = 60 * time.Second + + unspecificRelay ma.Multiaddr +) + +func init() { + var err error + unspecificRelay, err = ma.NewMultiaddr("/p2p-circuit") + if err != nil { + panic(err) + } +} + +// AutoRelayHost is a Host that uses relays for connectivity when a NAT is detected. +type AutoRelayHost struct { + *basic.BasicHost + discover discovery.Discoverer + autonat autonat.AutoNAT + addrsF basic.AddrsFactory + + disconnect chan struct{} + + mx sync.Mutex + relays map[peer.ID]pstore.PeerInfo + addrs []ma.Multiaddr +} + +func NewAutoRelayHost(ctx context.Context, bhost *basic.BasicHost, discover discovery.Discoverer) *AutoRelayHost { + h := &AutoRelayHost{ + BasicHost: bhost, + discover: discover, + addrsF: bhost.AddrsFactory, + relays: make(map[peer.ID]pstore.PeerInfo), + disconnect: make(chan struct{}, 1), + } + h.autonat = autonat.NewAutoNAT(ctx, bhost, h.baseAddrs) + bhost.AddrsFactory = h.hostAddrs + bhost.Network().Notify(h) + go h.background(ctx) + return h +} + +func (h *AutoRelayHost) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { + h.mx.Lock() + defer h.mx.Unlock() + if h.addrs != nil && h.autonat.Status() == autonat.NATStatusPrivate { + return h.addrs + } else { + return filterUnspecificRelay(h.addrsF(addrs)) + } +} + +func (h *AutoRelayHost) baseAddrs() []ma.Multiaddr { + return filterUnspecificRelay(h.addrsF(h.AllAddrs())) +} + +func (h *AutoRelayHost) background(ctx context.Context) { + select { + case <-time.After(autonat.AutoNATBootDelay + BootDelay): + case <-ctx.Done(): + return + } + + for { + wait := autonat.AutoNATRefreshInterval + switch h.autonat.Status() { + case autonat.NATStatusUnknown: + wait = autonat.AutoNATRetryInterval + case autonat.NATStatusPublic: + case autonat.NATStatusPrivate: + h.findRelays(ctx) + } + + select { + case <-h.disconnect: + // invalidate addrs + h.mx.Lock() + h.addrs = nil + h.mx.Unlock() + case <-time.After(wait): + case <-ctx.Done(): + return + } + } +} + +func (h *AutoRelayHost) findRelays(ctx context.Context) { + h.mx.Lock() + if len(h.relays) >= DesiredRelays { + h.mx.Unlock() + return + } + need := DesiredRelays - len(h.relays) + h.mx.Unlock() + + limit := 20 + if need > limit/2 { + limit = 2 * need + } + + dctx, cancel := context.WithTimeout(ctx, 60*time.Second) + pis, err := discovery.FindPeers(dctx, h.discover, RelayRendezvous, limit) + cancel() + if err != nil { + log.Debugf("error discovering relays: %s", err.Error()) + return + } + + pis = h.selectRelays(pis) + + update := 0 + + for _, pi := range pis { + h.mx.Lock() + if _, ok := h.relays[pi.ID]; ok { + h.mx.Unlock() + continue + } + h.mx.Unlock() + + cctx, cancel := context.WithTimeout(ctx, 60*time.Second) + err = h.Connect(cctx, pi) + cancel() + if err != nil { + log.Debugf("error connecting to relay %s: %s", pi.ID, err.Error()) + continue + } + + log.Debugf("connected to relay %s", pi.ID) + h.mx.Lock() + h.relays[pi.ID] = pi + h.mx.Unlock() + + // tag the connection as very important + h.ConnManager().TagPeer(pi.ID, "relay", 42) + + update++ + need-- + if need == 0 { + break + } + } + + if update > 0 || h.addrs == nil { + h.updateAddrs() + } +} + +func (h *AutoRelayHost) selectRelays(pis []pstore.PeerInfo) []pstore.PeerInfo { + // TODO better relay selection strategy; this just selects random relays + // but we should probably use ping latency as the selection metric + shuffleRelays(pis) + return pis +} + +func (h *AutoRelayHost) updateAddrs() { + h.doUpdateAddrs() + h.PushIdentify() +} + +// This function updates our NATed advertised addrs (h.addrs) +// The public addrs are rewritten so that they only retain the public IP part; they +// become undialable but are useful as a hint to the dialer to determine whether or not +// to dial private addrs. +// The non-public addrs are included verbatim so that peers behind the same NAT/firewall +// can still dial us directly. +// On top of those, we add the relay-specific addrs for the relays to which we are +// connected. For each non-private relay addr, we encapsulate the p2p-circuit addr +// through which we can be dialed. +func (h *AutoRelayHost) doUpdateAddrs() { + h.mx.Lock() + defer h.mx.Unlock() + + addrs := h.baseAddrs() + raddrs := make([]ma.Multiaddr, 0, len(addrs)+len(h.relays)) + + // remove our public addresses from the list and replace them by just the public IP + for _, addr := range addrs { + if manet.IsPublicAddr(addr) { + ip, err := addr.ValueForProtocol(ma.P_IP4) + if err == nil { + pub, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s", ip)) + if err != nil { + panic(err) + } + + if !containsAddr(raddrs, pub) { + raddrs = append(raddrs, pub) + } + continue + } + + ip, err = addr.ValueForProtocol(ma.P_IP6) + if err == nil { + pub, err := ma.NewMultiaddr(fmt.Sprintf("/ip6/%s", ip)) + if err != nil { + panic(err) + } + if !containsAddr(raddrs, pub) { + raddrs = append(raddrs, pub) + } + continue + } + } else { + raddrs = append(raddrs, addr) + } + } + + // add relay specific addrs to the list + for _, pi := range h.relays { + circuit, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit", pi.ID.Pretty())) + if err != nil { + panic(err) + } + + for _, addr := range pi.Addrs { + if !manet.IsPrivateAddr(addr) { + pub := addr.Encapsulate(circuit) + raddrs = append(raddrs, pub) + } + } + } + + h.addrs = raddrs +} + +func filterUnspecificRelay(addrs []ma.Multiaddr) []ma.Multiaddr { + res := make([]ma.Multiaddr, 0, len(addrs)) + for _, addr := range addrs { + if addr.Equal(unspecificRelay) { + continue + } + res = append(res, addr) + } + return res +} + +func shuffleRelays(pis []pstore.PeerInfo) { + for i := range pis { + j := rand.Intn(i + 1) + pis[i], pis[j] = pis[j], pis[i] + } +} + +func containsAddr(lst []ma.Multiaddr, addr ma.Multiaddr) bool { + for _, xaddr := range lst { + if xaddr.Equal(addr) { + return true + } + } + return false +} + +// notify +func (h *AutoRelayHost) Listen(inet.Network, ma.Multiaddr) {} +func (h *AutoRelayHost) ListenClose(inet.Network, ma.Multiaddr) {} +func (h *AutoRelayHost) Connected(inet.Network, inet.Conn) {} + +func (h *AutoRelayHost) Disconnected(_ inet.Network, c inet.Conn) { + p := c.RemotePeer() + h.mx.Lock() + defer h.mx.Unlock() + if _, ok := h.relays[p]; ok { + delete(h.relays, p) + select { + case h.disconnect <- struct{}{}: + default: + } + } +} + +func (h *AutoRelayHost) OpenedStream(inet.Network, inet.Stream) {} +func (h *AutoRelayHost) ClosedStream(inet.Network, inet.Stream) {} + +var _ host.Host = (*AutoRelayHost)(nil) diff --git a/p2p/host/relay/autorelay_test.go b/p2p/host/relay/autorelay_test.go new file mode 100644 index 0000000000..ce50ea8c09 --- /dev/null +++ b/p2p/host/relay/autorelay_test.go @@ -0,0 +1,219 @@ +package relay_test + +import ( + "context" + "net" + "sync" + "testing" + "time" + + libp2p "github.com/libp2p/go-libp2p" + relay "github.com/libp2p/go-libp2p/p2p/host/relay" + + ggio "github.com/gogo/protobuf/io" + cid "github.com/ipfs/go-cid" + autonat "github.com/libp2p/go-libp2p-autonat" + autonatpb "github.com/libp2p/go-libp2p-autonat/pb" + circuit "github.com/libp2p/go-libp2p-circuit" + host "github.com/libp2p/go-libp2p-host" + inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + pstore "github.com/libp2p/go-libp2p-peerstore" + routing "github.com/libp2p/go-libp2p-routing" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr-net" +) + +// test specific parameters +func init() { + autonat.AutoNATIdentifyDelay = 100 * time.Millisecond + autonat.AutoNATBootDelay = 1 * time.Second + relay.BootDelay = 1 * time.Second + manet.Private4 = []*net.IPNet{} +} + +// mock routing +type mockRoutingTable struct { + mx sync.Mutex + providers map[string]map[peer.ID]pstore.PeerInfo +} + +type mockRouting struct { + h host.Host + tab *mockRoutingTable +} + +func newMockRoutingTable() *mockRoutingTable { + return &mockRoutingTable{providers: make(map[string]map[peer.ID]pstore.PeerInfo)} +} + +func newMockRouting(h host.Host, tab *mockRoutingTable) *mockRouting { + return &mockRouting{h: h, tab: tab} +} + +func (m *mockRouting) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) { + return pstore.PeerInfo{}, routing.ErrNotFound +} + +func (m *mockRouting) Provide(ctx context.Context, cid cid.Cid, bcast bool) error { + m.tab.mx.Lock() + defer m.tab.mx.Unlock() + + pmap, ok := m.tab.providers[cid.String()] + if !ok { + pmap = make(map[peer.ID]pstore.PeerInfo) + m.tab.providers[cid.String()] = pmap + } + + pmap[m.h.ID()] = pstore.PeerInfo{ID: m.h.ID(), Addrs: m.h.Addrs()} + + return nil +} + +func (m *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, limit int) <-chan pstore.PeerInfo { + ch := make(chan pstore.PeerInfo) + go func() { + defer close(ch) + m.tab.mx.Lock() + defer m.tab.mx.Unlock() + + pmap, ok := m.tab.providers[cid.String()] + if !ok { + return + } + + for _, pi := range pmap { + select { + case ch <- pi: + case <-ctx.Done(): + return + } + } + }() + + return ch +} + +// mock autonat +func makeAutoNATServicePrivate(ctx context.Context, t *testing.T) host.Host { + h, err := libp2p.New(ctx) + if err != nil { + t.Fatal(err) + } + h.SetStreamHandler(autonat.AutoNATProto, sayAutoNATPrivate) + return h +} + +func sayAutoNATPrivate(s inet.Stream) { + defer s.Close() + w := ggio.NewDelimitedWriter(s) + res := autonatpb.Message{ + Type: autonatpb.Message_DIAL_RESPONSE.Enum(), + DialResponse: newDialResponseError(autonatpb.Message_E_DIAL_ERROR, "no dialable addresses"), + } + w.WriteMsg(&res) +} + +func newDialResponseError(status autonatpb.Message_ResponseStatus, text string) *autonatpb.Message_DialResponse { + dr := new(autonatpb.Message_DialResponse) + dr.Status = status.Enum() + dr.StatusText = &text + return dr +} + +// connector +func connect(t *testing.T, a, b host.Host) { + pinfo := pstore.PeerInfo{ID: a.ID(), Addrs: a.Addrs()} + err := b.Connect(context.Background(), pinfo) + if err != nil { + t.Fatal(err) + } +} + +// and the actual test! +func TestAutoRelay(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mtab := newMockRoutingTable() + makeRouting := func(h host.Host) (routing.PeerRouting, error) { + mr := newMockRouting(h, mtab) + return mr, nil + } + + h1 := makeAutoNATServicePrivate(ctx, t) + _, err := libp2p.New(ctx, libp2p.EnableRelay(circuit.OptHop), libp2p.Routing(makeRouting)) + if err != nil { + t.Fatal(err) + } + h3, err := libp2p.New(ctx, libp2p.EnableRelay(), libp2p.Routing(makeRouting)) + if err != nil { + t.Fatal(err) + } + h4, err := libp2p.New(ctx, libp2p.EnableRelay()) + + // verify that we don't advertise relay addrs initially + for _, addr := range h3.Addrs() { + _, err := addr.ValueForProtocol(circuit.P_CIRCUIT) + if err == nil { + t.Fatal("relay addr advertised before auto detection") + } + } + + // connect to AutoNAT and let detection/discovery work its magic + connect(t, h1, h3) + time.Sleep(3 * time.Second) + + // verify that we now advertise relay addrs (but not unspecific relay addrs) + unspecificRelay, err := ma.NewMultiaddr("/p2p-circuit") + if err != nil { + t.Fatal(err) + } + + haveRelay := false + for _, addr := range h3.Addrs() { + if addr.Equal(unspecificRelay) { + t.Fatal("unspecific relay addr advertised") + } + + _, err := addr.ValueForProtocol(circuit.P_CIRCUIT) + if err == nil { + haveRelay = true + } + } + + if !haveRelay { + t.Fatal("No relay addrs advertised") + } + + // verify that we can connect through the relay + var raddrs []ma.Multiaddr + for _, addr := range h3.Addrs() { + _, err := addr.ValueForProtocol(circuit.P_CIRCUIT) + if err == nil { + raddrs = append(raddrs, addr) + } + } + + err = h4.Connect(ctx, pstore.PeerInfo{ID: h3.ID(), Addrs: raddrs}) + if err != nil { + t.Fatal(err) + } + + // verify that we have pushed relay addrs to connected peers + haveRelay = false + for _, addr := range h1.Peerstore().Addrs(h3.ID()) { + if addr.Equal(unspecificRelay) { + t.Fatal("unspecific relay addr advertised") + } + + _, err := addr.ValueForProtocol(circuit.P_CIRCUIT) + if err == nil { + haveRelay = true + } + } + + if !haveRelay { + t.Fatal("No relay addrs pushed") + } +} diff --git a/p2p/host/relay/doc.go b/p2p/host/relay/doc.go new file mode 100644 index 0000000000..c0a5878113 --- /dev/null +++ b/p2p/host/relay/doc.go @@ -0,0 +1,30 @@ +/* +The relay package contains host implementations that automatically +advertise relay addresses when the presence of NAT is detected. This +feature is dubbed `autorelay`. + +Warning: the internal interfaces are unstable. + +System Components: +- AutoNATService instances -- see https://github.com/libp2p/go-libp2p-autonat-svc +- One or more relays, instances of `RelayHost` +- The autorelayed hosts, instances of `AutoRelayHost`. + +How it works: +- `AutoNATService` instances are instantiated in the + bootstrappers (or other well known publicly reachable hosts) + +- `RelayHost`s are constructed with + `libp2p.New(libp2p.EnableRelay(circuit.OptHop), libp2p.Routing(makeDHT))`. + They provide Relay Hop services, and advertise through the DHT + in the `/libp2p/relay` namespace + +- `AutoRelayHost`s are constructed with `libp2p.New(libp2p.Routing(makeDHT))` + They passively discover autonat service instances and test dialability of + their listen address set through them. When the presence of NAT is detected, + they discover relays through the DHT, connect to some of them and begin + advertising relay addresses. The new set of addresses is propagated to + connected peers through the `identify/push` protocol. + +*/ +package relay diff --git a/p2p/host/relay/log.go b/p2p/host/relay/log.go new file mode 100644 index 0000000000..eca0fa45a8 --- /dev/null +++ b/p2p/host/relay/log.go @@ -0,0 +1,7 @@ +package relay + +import ( + logging "github.com/ipfs/go-log" +) + +var log = logging.Logger("autorelay") diff --git a/p2p/host/relay/relay.go b/p2p/host/relay/relay.go new file mode 100644 index 0000000000..69b6737078 --- /dev/null +++ b/p2p/host/relay/relay.go @@ -0,0 +1,36 @@ +package relay + +import ( + "context" + + basic "github.com/libp2p/go-libp2p/p2p/host/basic" + + discovery "github.com/libp2p/go-libp2p-discovery" + host "github.com/libp2p/go-libp2p-host" + ma "github.com/multiformats/go-multiaddr" +) + +// RelayHost is a Host that provides Relay services. +type RelayHost struct { + *basic.BasicHost + advertise discovery.Advertiser + addrsF basic.AddrsFactory +} + +// New constructs a new RelayHost +func NewRelayHost(ctx context.Context, bhost *basic.BasicHost, advertise discovery.Advertiser) *RelayHost { + h := &RelayHost{ + BasicHost: bhost, + addrsF: bhost.AddrsFactory, + advertise: advertise, + } + bhost.AddrsFactory = h.hostAddrs + discovery.Advertise(ctx, advertise, RelayRendezvous) + return h +} + +func (h *RelayHost) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { + return filterUnspecificRelay(h.addrsF(addrs)) +} + +var _ host.Host = (*RelayHost)(nil) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 2d32d1f418..5991fe6950 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -3,6 +3,7 @@ package identify import ( "context" "sync" + "time" pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb" @@ -23,6 +24,9 @@ var log = logging.Logger("net/identify") // ID is the protocol.ID of the Identify Service. const ID = "/ipfs/id/1.0.0" +// IDPush is the protocol.ID of the Identify push protocol +const IDPush = "/ipfs/id/push/1.0.0" + // LibP2PVersion holds the current protocol version for a client running this code // TODO(jbenet): fix the versioning mess. const LibP2PVersion = "ipfs/0.1.0" @@ -60,6 +64,7 @@ func NewIDService(h host.Host) *IDService { currid: make(map[inet.Conn]chan struct{}), } h.SetStreamHandler(ID, s.requestHandler) + h.SetStreamHandler(IDPush, s.pushHandler) h.Network().Notify((*netNotifiee)(s)) return s } @@ -138,6 +143,26 @@ func (ids *IDService) responseHandler(s inet.Stream) { go inet.FullClose(s) } +func (ids *IDService) pushHandler(s inet.Stream) { + ids.responseHandler(s) +} + +func (ids *IDService) Push() { + for _, p := range ids.Host.Network().Peers() { + go func(p peer.ID) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + s, err := ids.Host.NewStream(ctx, p, IDPush) + if err != nil { + log.Debugf("error opening push stream: %s", err.Error()) + return + } + + ids.requestHandler(s) + }(p) + } +} + func (ids *IDService) populateMessage(mes *pb.Identify, c inet.Conn) { // set protocols this node is currently handling diff --git a/package.json b/package.json index c83cc672a5..65d8edec28 100644 --- a/package.json +++ b/package.json @@ -226,6 +226,18 @@ "hash": "QmdxUuburamoF6zF9qjeQC4WYcWGbWuRmdLacMEsW8ioD8", "name": "gogo-protobuf", "version": "0.0.0" + }, + { + "author": "vyzo", + "hash": "QmT8eNT96scr6wcrARzyxrCcsBAPKn9GkEx2TeXZniHiMP", + "name": "go-libp2p-discovery", + "version": "1.0.2" + }, + { + "author": "vyzo", + "hash": "QmU2XQcwPxikg1jZqDBMFgLQVan7zjT2HtQWrWui3vbUVS", + "name": "go-libp2p-autonat", + "version": "1.0.2" } ], "gxVersion": "0.4.0",