Skip to content
This repository has been archived by the owner on Sep 9, 2022. It is now read-only.

implement circuit v2 #136

Merged
merged 5 commits into from
Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/ipfs/go-log v1.0.4
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210211060025-0db24c10d3bd
github.com/libp2p/go-libp2p-blankhost v0.2.0
github.com/libp2p/go-libp2p-core v0.7.0
github.com/libp2p/go-libp2p-swarm v0.3.0
github.com/libp2p/go-libp2p-transport-upgrader v0.3.0
github.com/libp2p/go-libp2p-core v0.8.3
github.com/libp2p/go-libp2p-peerstore v0.2.6
github.com/libp2p/go-libp2p-swarm v0.4.3
github.com/libp2p/go-libp2p-transport-upgrader v0.4.2
github.com/libp2p/go-msgio v0.0.6
github.com/libp2p/go-tcp-transport v0.2.0
github.com/multiformats/go-multiaddr v0.3.1
github.com/multiformats/go-varint v0.0.6
)
249 changes: 224 additions & 25 deletions go.sum

Large diffs are not rendered by default.

66 changes: 66 additions & 0 deletions v2/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package client

import (
"context"
"sync"

"github.com/libp2p/go-libp2p-circuit/v2/proto"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"

logging "github.com/ipfs/go-log"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
)

var log = logging.Logger("p2p-circuit")

// Client implements the client-side of the p2p-circuit/v2 protocol:
// - it implements dialing through v2 relays
// - it listens for incoming connections through v2 relays.
//
// For backwards compatibility with v1 relays and older nodes, the client will
// also accept relay connections through v1 relays and fallback dial peers using p2p-circuit/v1.
// This allows us to use the v2 code as drop in replacement for v1 in a host without breaking
// existing code and interoperability with older nodes.
type Client struct {
ctx context.Context
host host.Host
upgrader *tptu.Upgrader

incoming chan accept

mx sync.Mutex
activeDials map[peer.ID]*completion
hopCount map[peer.ID]int
}

type accept struct {
conn *Conn
writeResponse func() error
}

type completion struct {
ch chan struct{}
relay peer.ID
err error
}

// New constructs a new p2p-circuit/v2 client, attached to the given host and using the given
// upgrader to perform connection upgrades.
func New(ctx context.Context, h host.Host, upgrader *tptu.Upgrader) (*Client, error) {
return &Client{
ctx: ctx,
host: h,
upgrader: upgrader,
incoming: make(chan accept),
activeDials: make(map[peer.ID]*completion),
hopCount: make(map[peer.ID]int),
}, nil
}

// Start registers the circuit (client) protocol stream handlers
func (c *Client) Start() {
c.host.SetStreamHandler(proto.ProtoIDv1, c.handleStreamV1)
c.host.SetStreamHandler(proto.ProtoIDv2Stop, c.handleStreamV2)
}
145 changes: 145 additions & 0 deletions v2/client/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package client

import (
"fmt"
"net"
"time"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

// HopTagWeight is the connection manager weight for connections carrying relay hop streams
var HopTagWeight = 5

type statLimitDuration struct{}
type statLimitData struct{}

var (
StatLimitDuration = statLimitDuration{}
StatLimitData = statLimitData{}
)

type Conn struct {
stream network.Stream
remote peer.AddrInfo
stat network.Stat

client *Client
}

type NetAddr struct {
Relay string
Remote string
}

var _ net.Addr = (*NetAddr)(nil)

func (n *NetAddr) Network() string {
return "libp2p-circuit-relay"
}

func (n *NetAddr) String() string {
return fmt.Sprintf("relay[%s-%s]", n.Remote, n.Relay)
}

// Conn interface
var _ manet.Conn = (*Conn)(nil)

func (c *Conn) Close() error {
c.untagHop()
return c.stream.Reset()
}

func (c *Conn) Read(buf []byte) (int, error) {
return c.stream.Read(buf)
}

func (c *Conn) Write(buf []byte) (int, error) {
return c.stream.Write(buf)
}

func (c *Conn) SetDeadline(t time.Time) error {
return c.stream.SetDeadline(t)
}

func (c *Conn) SetReadDeadline(t time.Time) error {
return c.stream.SetReadDeadline(t)
}

func (c *Conn) SetWriteDeadline(t time.Time) error {
return c.stream.SetWriteDeadline(t)
}

// TODO: is it okay to cast c.Conn().RemotePeer() into a multiaddr? might be "user input"
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
// TODO: We should be able to do this directly without converting to/from a string.
relayAddr, err := ma.NewComponent(
ma.ProtocolWithCode(ma.P_P2P).Name,
c.stream.Conn().RemotePeer().Pretty(),
)
if err != nil {
panic(err)
}
return ma.Join(c.stream.Conn().RemoteMultiaddr(), relayAddr, circuitAddr)
}

func (c *Conn) LocalMultiaddr() ma.Multiaddr {
return c.stream.Conn().LocalMultiaddr()
}

func (c *Conn) LocalAddr() net.Addr {
na, err := manet.ToNetAddr(c.stream.Conn().LocalMultiaddr())
if err != nil {
log.Error("failed to convert local multiaddr to net addr:", err)
return nil
}
return na
}

func (c *Conn) RemoteAddr() net.Addr {
return &NetAddr{
Relay: c.stream.Conn().RemotePeer().Pretty(),
Remote: c.remote.ID.Pretty(),
}
}

// ConnStat interface
var _ network.ConnStat = (*Conn)(nil)

func (c *Conn) Stat() network.Stat {
return c.stat
}

// tagHop tags the underlying relay connection so that it can be (somewhat) protected from the
// connection manager as it is an important connection that proxies other connections.
// This is handled here so that the user code doesnt need to bother with this and avoid
// clown shoes situations where a high value peer connection is behind a relayed connection and it is
// implicitly because the connection manager closed the underlying relay connection.
func (c *Conn) tagHop() {
c.client.mx.Lock()
defer c.client.mx.Unlock()

p := c.stream.Conn().RemotePeer()
c.client.hopCount[p]++
if c.client.hopCount[p] == 1 {
c.client.host.ConnManager().TagPeer(p, "relay-hop-stream", HopTagWeight)
}
}

// untagHop removes the relay-hop-stream tag if necessary; it is invoked when a relayed connection
// is closed.
func (c *Conn) untagHop() {
c.client.mx.Lock()
defer c.client.mx.Unlock()

p := c.stream.Conn().RemotePeer()
c.client.hopCount[p]--
if c.client.hopCount[p] == 0 {
c.client.host.ConnManager().UntagPeer(p, "relay-hop-stream")
delete(c.client.hopCount, p)
}
}
Loading