Skip to content

Commit

Permalink
Merge commits from main to v0.10.x release branch
Browse files Browse the repository at this point in the history
feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests (#284)

* feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests

* fixup! feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests

fix(responsemanager): fix flaky tests

fix(responsemanager): make fix more global

feat: add basic OT tracing for incoming requests

Closes: #271

docs(tests): document tracing test helper utilities

fix(test): increase 1s timeouts to 2s for slow CI (#289)

* fix(test): increase 1s timeouts to 2s for slow CI

* fixup! fix(test): increase 1s timeouts to 2s for slow CI

testutil/chaintypes: simplify maintenance of codegen (#294)

"go generate" now updates the generated code for us.

The separate directory for a main package was unnecessary;
a build-tag-ignored file is enough.

Using gofmt on the resulting source is now unnecessary too,
as upstream has been using go/format on its output for some time.

Finally, re-generate the output source code,
as the last time that was done we were on an older ipld-prime.

ipldutil: use chooser APIs from dagpb and basicnode (#292)

Saves us a bit of extra code, since they were added in summer.
Also avoid making defaultVisitor a variable,
which makes it clearer that it's never a nil func.

While here, replace node/basic with node/basicnode,
as the former has been deprecated in favor of the latter.

Co-authored-by: Hannah Howard <[email protected]>

fix: use sync.Cond to handle no-task blocking wait (#299)

Ref: #284

Peer Stats function (#298)

* feat(graphsync): add impl method for peer stats

add method that gets current request states by request ID for a given peer

* fix(requestmanager): fix tested method

Add a bit of logging (#301)

* chore(responsemanager): add a bit of logging

* fix(responsemanager): remove code change

chore: short-circuit unnecessary message processing

Expose task queue diagnostics (#302)

* feat(impl): expose task queue diagnostics

* refactor(peerstate): put peerstate in its own module

* refactor(peerstate): make diagnostics return array
  • Loading branch information
rvagg authored and hannahhoward committed Dec 9, 2021
1 parent 2925810 commit e24e96a
Show file tree
Hide file tree
Showing 43 changed files with 1,417 additions and 408 deletions.
2 changes: 1 addition & 1 deletion benchmarks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/node/basicnode"
ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
Expand Down
2 changes: 1 addition & 1 deletion cidset/cidset.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime/fluent"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/node/basicnode"

"github.com/ipfs/go-graphsync/ipldutil"
)
Expand Down
2 changes: 1 addition & 1 deletion dedupkey/dedupkey.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package dedupkey

import (
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/node/basicnode"

"github.com/ipfs/go-graphsync/ipldutil"
)
Expand Down
2 changes: 1 addition & 1 deletion donotsendfirstblocks/donotsendfirstblocks.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package donotsendfirstblocks

import (
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/node/basicnode"

"github.com/ipfs/go-graphsync/ipldutil"
)
Expand Down
9 changes: 6 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log/v2 v2.1.1
github.com/ipfs/go-merkledag v0.3.2
github.com/ipfs/go-peertaskqueue v0.6.0
github.com/ipfs/go-peertaskqueue v0.7.1
github.com/ipfs/go-unixfs v0.2.4
github.com/ipld/go-codec-dagpb v1.3.0
github.com/ipld/go-ipld-prime v0.12.3
Expand All @@ -35,9 +35,12 @@ require (
github.com/libp2p/go-msgio v0.0.6
github.com/multiformats/go-multiaddr v0.3.1
github.com/multiformats/go-multihash v0.0.15
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.0
github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
go.opentelemetry.io/otel v1.2.0
go.opentelemetry.io/otel/sdk v1.2.0
go.opentelemetry.io/otel/trace v1.2.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
google.golang.org/protobuf v1.27.1
)
23 changes: 16 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gopacket v1.1.17 h1:rMrlX2ZY2UbvT+sdz3+6J+pp2z+msCq9MxTU6ymxbBY=
github.com/google/gopacket v1.1.17/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
Expand Down Expand Up @@ -228,8 +229,8 @@ github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fG
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-peertaskqueue v0.1.0/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U=
github.com/ipfs/go-peertaskqueue v0.1.1/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U=
github.com/ipfs/go-peertaskqueue v0.6.0 h1:BT1/PuNViVomiz1PnnP5+WmKsTNHrxIDvkZrkj4JhOg=
github.com/ipfs/go-peertaskqueue v0.6.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
github.com/ipfs/go-peertaskqueue v0.7.1 h1:7PLjon3RZwRQMgOTvYccZ+mjzkmds/7YzSWKFlBAypE=
github.com/ipfs/go-peertaskqueue v0.7.1/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
github.com/ipfs/go-unixfs v0.2.4 h1:6NwppOXefWIyysZ4LR/qUBPvXd5//8J3jiMdvpbw6Lo=
github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw=
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
Expand Down Expand Up @@ -616,8 +617,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/urfave/cli/v2 v2.0.0 h1:+HU9SCbu8GnEUFtIBfuUNXN39ofWViIEJIp6SURMpCg=
Expand Down Expand Up @@ -650,6 +652,12 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opentelemetry.io/otel v1.2.0 h1:YOQDvxO1FayUcT9MIhJhgMyNO1WqoduiyvQHzGN0kUQ=
go.opentelemetry.io/otel v1.2.0/go.mod h1:aT17Fk0Z1Nor9e0uisf98LrntPGMnk4frBO9+dkf69I=
go.opentelemetry.io/otel/sdk v1.2.0 h1:wKN260u4DesJYhyjxDa7LRFkuhH7ncEVKU37LWcyNIo=
go.opentelemetry.io/otel/sdk v1.2.0/go.mod h1:jNN8QtpvbsKhgaC6V5lHiejMoKD+V8uadoSafgHPx1U=
go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuNjej0=
go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down Expand Up @@ -708,8 +716,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -732,8 +740,9 @@ golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 h1:46ULzRKLh1CwgRq2dC5SlBzEqqNCi8rreOZnNrbqcIY=
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
Expand Down
28 changes: 28 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,34 @@ type Stats struct {
OutgoingResponses ResponseStats
}

// RequestState describes the current general state of a request
type RequestState uint64

// RequestStates describe a set of request IDs and their current state
type RequestStates map[RequestID]RequestState

const (
// Queued means a request has been received and is queued for processing
Queued RequestState = iota
// Running means a request is actively sending or receiving data
Running
// Paused means a request is paused
Paused
)

func (rs RequestState) String() string {
switch rs {
case Queued:
return "queued"
case Running:
return "running"
case Paused:
return "paused"
default:
return "unrecognized request state"
}
}

// GraphExchange is a protocol that can exchange IPLD graphs based on a selector
type GraphExchange interface {
// Request initiates a new GraphSync request to the given peer using the given selector spec.
Expand Down
40 changes: 38 additions & 2 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"github.com/ipfs/go-peertaskqueue"
ipld "github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/allocator"
Expand All @@ -16,6 +19,7 @@ import (
"github.com/ipfs/go-graphsync/messagequeue"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/peermanager"
"github.com/ipfs/go-graphsync/peerstate"
"github.com/ipfs/go-graphsync/requestmanager"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
"github.com/ipfs/go-graphsync/requestmanager/executor"
Expand Down Expand Up @@ -304,6 +308,15 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,

// Request initiates a new GraphSync request to the given peer using the given selector spec.
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
var extNames []string
for _, ext := range extensions {
extNames = append(extNames, string(ext.Name))
}
ctx, _ = otel.Tracer("graphsync").Start(ctx, "request", trace.WithAttributes(
attribute.String("peerID", p.Pretty()),
attribute.String("root", root.String()),
attribute.StringSlice("extensions", extNames),
))
return gs.requestManager.NewRequest(ctx, p, root, selector, extensions...)
}

Expand Down Expand Up @@ -446,6 +459,20 @@ func (gs *GraphSync) Stats() graphsync.Stats {
}
}

// PeerState describes the state of graphsync for a given peer
type PeerState struct {
OutgoingState peerstate.PeerState
IncomingState peerstate.PeerState
}

// PeerState produces insight on the current state of a given peer
func (gs *GraphSync) PeerState(p peer.ID) PeerState {
return PeerState{
OutgoingState: gs.requestManager.PeerState(p),
IncomingState: gs.responseManager.PeerState(p),
}
}

type graphSyncReceiver GraphSync

func (gsr *graphSyncReceiver) graphSync() *GraphSync {
Expand All @@ -458,8 +485,17 @@ func (gsr *graphSyncReceiver) ReceiveMessage(
ctx context.Context,
sender peer.ID,
incoming gsmsg.GraphSyncMessage) {
gsr.graphSync().responseManager.ProcessRequests(ctx, sender, incoming.Requests())
gsr.graphSync().requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks())

requests := incoming.Requests()
responses := incoming.Responses()
blocks := incoming.Blocks()

if len(requests) > 0 {
gsr.graphSync().responseManager.ProcessRequests(ctx, sender, requests)
}
if len(responses) > 0 || len(blocks) > 0 {
gsr.graphSync().requestManager.ProcessResponses(sender, responses, blocks)
}
}

// ReceiveError is part of the network's Receiver interface and handles incoming
Expand Down
Loading

0 comments on commit e24e96a

Please sign in to comment.