Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
count the number of streams on a connection for the stats
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Dec 12, 2021
1 parent 279f539 commit 6026e74
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 25 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ require (
github.com/ipfs/go-log/v2 v2.4.0
github.com/libp2p/go-addr-util v0.1.0
github.com/libp2p/go-conn-security-multistream v0.3.0
github.com/libp2p/go-libp2p-core v0.11.0
github.com/libp2p/go-libp2p-peerstore v0.4.0
github.com/libp2p/go-libp2p-core v0.13.0
github.com/libp2p/go-libp2p-peerstore v0.6.0
github.com/libp2p/go-libp2p-quic-transport v0.13.0
github.com/libp2p/go-libp2p-testing v0.5.0
github.com/libp2p/go-libp2p-transport-upgrader v0.5.0
github.com/libp2p/go-libp2p-transport-upgrader v0.6.0
github.com/libp2p/go-libp2p-yamux v0.5.0
github.com/libp2p/go-maddr-filter v0.1.0
github.com/libp2p/go-stream-muxer-multistream v0.3.0
Expand Down
17 changes: 8 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,10 @@ github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupV
github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
github.com/ipfs/go-log v1.0.4 h1:6nLQdX4W8P9yZZFH7mO+X/PzjN8Laozm/lMJ6esdgzY=
github.com/ipfs/go-log v1.0.4/go.mod h1:oDCg2FkjogeFOhqqb+N39l2RpTNPL6F/StPkB3kPgcs=
github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8=
github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo=
github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw=
github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/ipfs/go-log/v2 v2.3.0/go.mod h1:QqGoj30OTpnKaG/LKTGTxoP2mmQtjVMEnK72gynbe/g=
github.com/ipfs/go-log/v2 v2.4.0 h1:iR/2o9PGWanVJrBgIH5Ff8mPGOwpqLaPIAFqSnsdlzk=
github.com/ipfs/go-log/v2 v2.4.0/go.mod h1:nPZnh7Cj7lwS3LpRU5Mwr2ol1c2gXIEXuF6aywqrtmo=
Expand Down Expand Up @@ -279,14 +277,14 @@ github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL
github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0=
github.com/libp2p/go-libp2p-core v0.5.1/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.6/go.mod h1:dgHr0l0hIKfWpGpqAMbpo19pen9wJfdCGv51mTmdpmM=
github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-core v0.11.0 h1:75jAgdA+IChNa+/mZXogfmrGkgwxkVvxmIC7pV+F6sI=
github.com/libp2p/go-libp2p-core v0.11.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-core v0.12.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-core v0.13.0 h1:IFG/s8dN6JN2OTrXX9eq2wNU/Zlz2KLdwZUp5FplgXI=
github.com/libp2p/go-libp2p-core v0.13.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-mplex v0.4.1 h1:/pyhkP1nLwjG3OM+VuaNJkQT/Pqq73WzB3aDN3Fx1sc=
github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g=
github.com/libp2p/go-libp2p-peerstore v0.4.0 h1:DOhRJLnM9Dc9lIXi3rPDZBf789LXy1BrzwIs7Tj0cKA=
github.com/libp2p/go-libp2p-peerstore v0.4.0/go.mod h1:rDJUFyzEWPpXpEwywkcTYYzDHlwza8riYMaUzaN6hX0=
github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnHd/VBjL4mY5A=
github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-quic-transport v0.13.0 h1:MTVojS4AnGD/rng6rF/HXEqwMHL27rHUEf3DaqSdnUw=
Expand All @@ -297,8 +295,9 @@ github.com/libp2p/go-libp2p-testing v0.5.0 h1:bTjC29TTQ/ODq0ld3+0KLq3irdA5cAH3OM
github.com/libp2p/go-libp2p-testing v0.5.0/go.mod h1:QBk8fqIL1XNcno/l3/hhaIEn4aLRijpYOR+zVjjlh+A=
github.com/libp2p/go-libp2p-tls v0.3.0 h1:8BgvUJiOTcj0Gp6XvEicF0rL5aUtRg/UzEdeZDmDlC8=
github.com/libp2p/go-libp2p-tls v0.3.0/go.mod h1:fwF5X6PWGxm6IDRwF3V8AVCCj/hOd5oFlg+wo2FxJDY=
github.com/libp2p/go-libp2p-transport-upgrader v0.5.0 h1:7SDl3O2+AYOgfE40Mis83ClpfGNkNA6m4FwhbOHs+iI=
github.com/libp2p/go-libp2p-transport-upgrader v0.5.0/go.mod h1:Rc+XODlB3yce7dvFV4q/RmyJGsFcCZRkeZMu/Zdg0mo=
github.com/libp2p/go-libp2p-transport-upgrader v0.6.0 h1:GfMCU+2aGGEm1zW3UcOz6wYSn8tXQalFfVfcww99i5A=
github.com/libp2p/go-libp2p-transport-upgrader v0.6.0/go.mod h1:1e07y1ZSZdHo9HPbuU8IztM1Cj+DR5twgycb4pnRzRo=
github.com/libp2p/go-libp2p-yamux v0.5.0 h1:ZzmUhbQE+X7NuYUT2naxN31JyebZfRmpZVhKtRP13ys=
github.com/libp2p/go-libp2p-yamux v0.5.0/go.mod h1:AyR8k5EzyM2QN9Bbdg6X1SkVVuqLwTGf0L4DFq9g6po=
github.com/libp2p/go-maddr-filter v0.1.0 h1:4ACqZKw8AqiuJfwFGq1CYDFugfXTOos+qQ3DETkhtCE=
Expand Down
2 changes: 1 addition & 1 deletion swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
)

// create the Stat object, initializing with the underlying connection Stat if available
var stat network.Stat
var stat network.ConnStats
if cs, ok := tc.(network.ConnStat); ok {
stat = cs.Stat()
}
Expand Down
19 changes: 11 additions & 8 deletions swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Conn struct {
m map[*Stream]struct{}
}

stat network.Stat
stat network.ConnStats
}

func (c *Conn) ID() string {
Expand Down Expand Up @@ -90,6 +90,7 @@ func (c *Conn) doClose() {

func (c *Conn) removeStream(s *Stream) {
c.streams.Lock()
c.stat.NumStreams--
delete(c.streams.m, s)
c.streams.Unlock()
}
Expand Down Expand Up @@ -171,7 +172,9 @@ func (c *Conn) RemotePublicKey() ic.PubKey {
}

// Stat returns metadata pertaining to this connection
func (c *Conn) Stat() network.Stat {
func (c *Conn) Stat() network.ConnStats {
c.streams.Lock()
defer c.streams.Unlock()
return c.stat
}

Expand Down Expand Up @@ -201,16 +204,16 @@ func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction) (*Stream, er
}

// Wrap and register the stream.
stat := network.Stat{
Direction: dir,
Opened: time.Now(),
}
s := &Stream{
stream: ts,
conn: c,
stat: stat,
id: atomic.AddUint64(&c.swarm.nextStreamID, 1),
stat: network.Stats{
Direction: dir,
Opened: time.Now(),
},
id: atomic.AddUint64(&c.swarm.nextStreamID, 1),
}
c.stat.NumStreams++
c.streams.m[s] = struct{}{}

// Released once the stream disconnect notifications have finished
Expand Down
4 changes: 2 additions & 2 deletions swarm_net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func TestNetworkOpenStream(t *testing.T) {
t.Fatal(err)
}

numStreams := 0
var numStreams int
for _, conn := range nets[0].ConnsToPeer(nets[1].LocalPeer()) {
numStreams += len(conn.GetStreams())
numStreams += conn.Stat().NumStreams
}

if numStreams != 1 {
Expand Down
4 changes: 2 additions & 2 deletions swarm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Stream struct {

protocol atomic.Value

stat network.Stat
stat network.Stats
}

func (s *Stream) ID() string {
Expand Down Expand Up @@ -151,6 +151,6 @@ func (s *Stream) SetWriteDeadline(t time.Time) error {
}

// Stat returns metadata information for this stream.
func (s *Stream) Stat() network.Stat {
func (s *Stream) Stat() network.Stats {
return s.stat
}
43 changes: 43 additions & 0 deletions swarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,3 +424,46 @@ func TestPreventDialListenAddr(t *testing.T) {
t.Fatal("expected dial to fail: %w", err)
}
}

func TestStreamCount(t *testing.T) {
s1 := GenSwarm(t)
s2 := GenSwarm(t)
connectSwarms(t, context.Background(), []*swarm.Swarm{s2, s1})
time.Sleep(100 * time.Millisecond)

countStreams := func() (n int) {
var num int
for _, c := range s1.ConnsToPeer(s2.LocalPeer()) {
n += c.Stat().NumStreams
num += len(c.GetStreams())
}
require.Equal(t, n, num, "inconsistent stream count")
return
}

streams := make(chan network.Stream, 20)
streamAccepted := make(chan struct{}, 1)
s1.SetStreamHandler(func(str network.Stream) {
// fmt.Println("accepted stream")
streams <- str
streamAccepted <- struct{}{}
})

for i := 0; i < 10; i++ {
str, err := s2.NewStream(context.Background(), s1.LocalPeer())
require.NoError(t, err)
str.Write([]byte("foobar"))
<-streamAccepted
}
require.Eventually(t, func() bool { return len(streams) == 10 }, 5*time.Second, 10*time.Millisecond)
require.Equal(t, countStreams(), 10)
(<-streams).Reset()
(<-streams).Close()
require.Equal(t, countStreams(), 8)

str, err := s1.NewStream(context.Background(), s2.LocalPeer())
require.NoError(t, err)
require.Equal(t, countStreams(), 9)
str.Close()
require.Equal(t, countStreams(), 8)
}

0 comments on commit 6026e74

Please sign in to comment.