Skip to content

Commit

Permalink
Update graphsync & fix in-progress request memory leak by consuming r…
Browse files Browse the repository at this point in the history
…esponses (#109)

* feat(deps): update to graphsync v0.4.1

* feat(graphsync): consume response channel

consume response channel so graphsync does not buffer responses in memory

* fix(deps): update graphsync to fix bug
  • Loading branch information
hannahhoward authored Oct 29, 2020
1 parent 75d8843 commit 501c1db
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 24 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.1
github.com/ipfs/go-graphsync v0.3.1
github.com/ipfs/go-graphsync v0.4.2
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand All @@ -26,7 +26,7 @@ require (
github.com/ipfs/go-log/v2 v2.0.3
github.com/ipfs/go-merkledag v0.3.2
github.com/ipfs/go-unixfs v0.2.4
github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/jpillora/backoff v1.0.0
github.com/libp2p/go-libp2p v0.6.0
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ github.com/ipfs/go-ds-badger v0.2.1 h1:RsC9DDlwFhFdfT+s2PeC8joxbSp2YMufK8w/RBOxK
github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.3.1 h1:dJLYrck4oyJDfMVhGEKiWHxaY8oYMWko4m2Fi+4bofo=
github.com/ipfs/go-graphsync v0.3.1/go.mod h1:bw4LiLM5Oq/uLdzEtih9LK8GrwSijv+XqYiWCTxHMqs=
github.com/ipfs/go-graphsync v0.4.2 h1:Y/jt5r619yj0LI7OLtGKh4jYm8goYUcuJ09y7TZ3zMo=
github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=
Expand Down Expand Up @@ -290,10 +290,10 @@ github.com/ipfs/go-unixfs v0.2.4 h1:6NwppOXefWIyysZ4LR/qUBPvXd5//8J3jiMdvpbw6Lo=
github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw=
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f h1:XpOuNQ5GbXxUcSukbQcW9jkE7REpaFGJU2/T00fo9kA=
github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM=
github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6 h1:6Mq+tZGSEMEoJJ1NbJRhddeelkXZcU8yfH/ZRYUo/Es=
github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1:3pHYooM9Ea65jewRwrb2u5uHZCNkNTe9ABsVB+SrkH0=
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018 h1:RbRHv8epkmvBYA5cGfz68GUSbOgx5j/7ObLIl4Rsif0=
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM=
github.com/ipld/go-ipld-prime-proto v0.1.0 h1:j7gjqrfwbT4+gXpHwEx5iMssma3mnctC7YaCimsFP70=
github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE=
github.com/jackpal/gateway v1.0.5 h1:qzXWUJfuMdlLMtt0a3Dgt+xkWQiA5itDEITVJtuSwMc=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/go-nat-pmp v1.0.1 h1:i0LektDkO1QlrTm/cSuP+PyBCDnYvjPLGl4LdWEMiaA=
Expand Down
3 changes: 2 additions & 1 deletion testutil/fakegraphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type ReceivedGraphSyncRequest struct {
Root ipld.Link
Selector ipld.Node
Extensions []graphsync.ExtensionData
ResponseChan chan graphsync.ResponseProgress
ResponseErrChan chan error
}

Expand Down Expand Up @@ -247,8 +248,8 @@ func (fgs *FakeGraphSync) AssertDoesNotHavePersistenceOption(t *testing.T, name
// Request initiates a new GraphSync request to the given peer using the given selector spec.
func (fgs *FakeGraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
errors := make(chan error)
fgs.requests <- ReceivedGraphSyncRequest{ctx, p, root, selector, extensions, errors}
responses := make(chan graphsync.ResponseProgress)
fgs.requests <- ReceivedGraphSyncRequest{ctx, p, root, selector, extensions, responses, errors}
if !fgs.leaveRequestsOpen {
close(responses)
close(errors)
Expand Down
25 changes: 10 additions & 15 deletions transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,29 +126,24 @@ func (t *Transport) OpenChannel(ctx context.Context,
Data: bz}
exts = append(exts, doNotSendExt)
}
_, errChan := t.gs.Request(internalCtx, dataSender, root, stor, exts...)
responseChan, errChan := t.gs.Request(internalCtx, dataSender, root, stor, exts...)

go t.executeGsRequest(ctx, channelID, errChan)
go t.executeGsRequest(ctx, channelID, responseChan, errChan)
return nil
}

func (t *Transport) consumeResponses(ctx context.Context, errChan <-chan error) error {
func (t *Transport) consumeResponses(responseChan <-chan graphsync.ResponseProgress, errChan <-chan error) error {
var lastError error
for {
select {
case <-ctx.Done():
return errContextCancelled
case err, ok := <-errChan:
if !ok {
return lastError
}
lastError = err
}
for range responseChan {
}
for err := range errChan {
lastError = err
}
return lastError
}

func (t *Transport) executeGsRequest(ctx context.Context, channelID datatransfer.ChannelID, errChan <-chan error) {
lastError := t.consumeResponses(ctx, errChan)
func (t *Transport) executeGsRequest(ctx context.Context, channelID datatransfer.ChannelID, responseChan <-chan graphsync.ResponseProgress, errChan <-chan error) {
lastError := t.consumeResponses(responseChan, errChan)

if _, ok := lastError.(graphsync.RequestContextCancelledErr); ok {
log.Warnf("graphsync request context cancelled, channel Id: %v", channelID)
Expand Down
3 changes: 3 additions & 0 deletions transport/graphsync/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ func TestManager(t *testing.T) {
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t)
close(requestReceived.ResponseChan)
close(requestReceived.ResponseErrChan)

require.Eventually(t, func() bool {
Expand All @@ -734,6 +735,7 @@ func TestManager(t *testing.T) {
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t)
close(requestReceived.ResponseChan)
requestReceived.ResponseErrChan <- graphsync.RequestFailedUnknownErr{}
close(requestReceived.ResponseErrChan)

Expand Down Expand Up @@ -789,6 +791,7 @@ func TestManager(t *testing.T) {
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t)
close(requestReceived.ResponseChan)
requestReceived.ResponseErrChan <- graphsync.RequestContextCancelledErr{}
close(requestReceived.ResponseErrChan)

Expand Down

0 comments on commit 501c1db

Please sign in to comment.