Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
Merge branch 'feat/stat'
Browse files Browse the repository at this point in the history
  • Loading branch information
Stebalien committed Aug 28, 2018
2 parents 6970287 + a13e3ec commit 4bf3943
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .gx/lastpubver
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.0.8: QmPWNZRUybw3nwJH3mpkrwB97YEQmXRkzvyh34rpJiih6Q
3.0.9: QmYSM6PKnCe9YVPNMisfpoBmczzHkA7h5Wrnc36DtdJhGo
26 changes: 13 additions & 13 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
"gxDependencies": [
{
"author": "whyrusleeping",
"hash": "QmX5J1q63BrrDTbpcHifrFbxH3cMZsvaNajy6u3zCpzBXs",
"hash": "QmQSbtGXCyNrj34LWL8EgXyNNYDZ8r3SwQcpW5pPxVhLnM",
"name": "go-libp2p-net",
"version": "3.0.7"
"version": "3.0.8"
},
{
"author": "whyrusleeping",
Expand All @@ -33,9 +33,9 @@
},
{
"author": "whyrusleeping",
"hash": "QmYr9RHifaqHTFZdAsUPLmiMAi2oNeEqA48AFKxXJAsLpJ",
"hash": "QmcDUyb52N62J8ZamGgUWUyWc1MtuCBce7WFA4D9xA6cwF",
"name": "go-libp2p-transport",
"version": "3.0.7"
"version": "3.0.8"
},
{
"author": "whyrusleeping",
Expand Down Expand Up @@ -63,9 +63,9 @@
},
{
"author": "whyrusleeping",
"hash": "QmcK89iqkFV8TqpRUgx1481YZbhjPFnBjqkpBQJfJqmSfm",
"hash": "Qmcw9fndogcYwyGs4a5TPDbnZPBLxvtrBZzpvyyVDzxDWT",
"name": "go-tcp-transport",
"version": "2.0.7"
"version": "2.0.8"
},
{
"author": "whyrusleeping",
Expand Down Expand Up @@ -134,28 +134,28 @@
},
{
"author": "stebalien",
"hash": "Qma6UXLMHjdVFExQZLYqdb5KAesbnoXuthQzovrwRZ64fG",
"hash": "QmSbkqfiFmJCdczVQ7mkFZf5FUUNpuP5Ne2LxY2htXGtrZ",
"name": "go-conn-security-multistream",
"version": "0.1.6"
"version": "0.1.7"
},
{
"author": "steb",
"hash": "QmfNvpHX396fhMeauERV6eFnSJg78rUjhjpFf1JvbjxaYM",
"hash": "QmefQrpDSYX6jQRtUyhcASFVBDkoAsDTPXemyxGMzA3phK",
"name": "go-libp2p-transport-upgrader",
"version": "0.1.7"
"version": "0.1.8"
},
{
"author": "whyrusleeping",
"hash": "QmWKKkNLFRcznF5vDqt2eeRsnQqQhwbjVf8zJ9KC2RXrzN",
"hash": "QmWri2HWdxHjWBUermhWy7QWJqN1cV8Gd1QbDiB5m86f1H",
"name": "go-libp2p-secio",
"version": "2.0.7"
"version": "2.0.8"
}
],
"gxVersion": "0.9.1",
"language": "go",
"license": "MIT",
"name": "go-libp2p-swarm",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "3.0.8"
"version": "3.0.9"
}

4 changes: 3 additions & 1 deletion swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *Swarm) Process() goprocess.Process {
return s.proc
}

func (s *Swarm) addConn(tc transport.Conn) (*Conn, error) {
func (s *Swarm) addConn(tc transport.Conn, dir inet.Direction) (*Conn, error) {
// The underlying transport (or the dialer) *should* filter it's own
// connections but we should double check anyways.
raddr := tc.RemoteMultiaddr()
Expand Down Expand Up @@ -194,9 +194,11 @@ func (s *Swarm) addConn(tc transport.Conn) (*Conn, error) {
}

// Wrap and register the connection.
stat := inet.Stat{Direction: dir}
c := &Conn{
conn: tc,
swarm: s,
stat: stat,
}
c.streams.m = make(map[*Stream]struct{})
s.conns.m[p] = append(s.conns.m[p], c)
Expand Down
15 changes: 12 additions & 3 deletions swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Conn struct {
sync.Mutex
m map[*Stream]struct{}
}

stat inet.Stat
}

// Close closes this connection.
Expand Down Expand Up @@ -98,7 +100,7 @@ func (c *Conn) start() {
}
c.swarm.refs.Add(1)
go func() {
s, err := c.addStream(ts)
s, err := c.addStream(ts, inet.DirInbound)

// Don't defer this. We don't want to block
// swarm shutdown on the connection handler.
Expand Down Expand Up @@ -158,16 +160,21 @@ func (c *Conn) RemotePublicKey() ic.PubKey {
return c.conn.RemotePublicKey()
}

// Stat returns metadata pertaining to this connection
func (c *Conn) Stat() inet.Stat {
return c.stat
}

// NewStream returns a new Stream from this connection
func (c *Conn) NewStream() (inet.Stream, error) {
ts, err := c.conn.OpenStream()
if err != nil {
return nil, err
}
return c.addStream(ts)
return c.addStream(ts, inet.DirOutbound)
}

func (c *Conn) addStream(ts smux.Stream) (*Stream, error) {
func (c *Conn) addStream(ts smux.Stream, dir inet.Direction) (*Stream, error) {
c.streams.Lock()
// Are we still online?
if c.streams.m == nil {
Expand All @@ -177,9 +184,11 @@ func (c *Conn) addStream(ts smux.Stream) (*Stream, error) {
}

// Wrap and register the stream.
stat := inet.Stat{Direction: dir}
s := &Stream{
stream: ts,
conn: c,
stat: stat,
}
c.streams.m[s] = struct{}{}

Expand Down
2 changes: 1 addition & 1 deletion swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
"localAddr": connC.LocalMultiaddr(),
"remoteAddr": connC.RemoteMultiaddr(),
}
swarmC, err := s.addConn(connC)
swarmC, err := s.addConn(connC, inet.DirOutbound)
if err != nil {
logdial["error"] = err.Error()
connC.Close() // close the connection. didn't work out :(
Expand Down
2 changes: 1 addition & 1 deletion swarm_listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
s.refs.Add(1)
go func() {
defer s.refs.Done()
_, err := s.addConn(c)
_, err := s.addConn(c, inet.DirInbound)
if err != nil {
// Probably just means that the swarm has been closed.
log.Warningf("add conn failed: ", err)
Expand Down
10 changes: 10 additions & 0 deletions swarm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ const (
streamReset
)

// Validate Stream conforms to the go-libp2p-net Stream interface
var _ inet.Stream = &Stream{}

// Stream is the stream type used by swarm. In general, you won't use this type
// directly.
type Stream struct {
Expand All @@ -36,6 +39,8 @@ type Stream struct {
notifyLk sync.Mutex

protocol atomic.Value

stat inet.Stat
}

func (s *Stream) String() string {
Expand Down Expand Up @@ -165,3 +170,8 @@ func (s *Stream) SetReadDeadline(t time.Time) error {
func (s *Stream) SetWriteDeadline(t time.Time) error {
return s.stream.SetWriteDeadline(t)
}

// Stat returns metadata information for this stream.
func (s *Stream) Stat() inet.Stat {
return s.stat
}

0 comments on commit 4bf3943

Please sign in to comment.