Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve 175 race condition, no change to hook timing #178

Merged
merged 9 commits into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ require (
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-ipld-cbor v0.0.4 // indirect
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log v1.0.4
github.com/ipfs/go-log/v2 v2.1.1
github.com/ipfs/go-merkledag v0.3.1
github.com/ipfs/go-peertaskqueue v0.2.0
Expand Down
2 changes: 1 addition & 1 deletion impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package graphsync
import (
"context"

logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue"
ipld "github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down
2 changes: 1 addition & 1 deletion messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"time"

logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"

gsmsg "github.com/ipfs/go-graphsync/message"
Expand Down
2 changes: 1 addition & 1 deletion network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"io"
"time"

logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/asyncloader/responsecache/responsecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"

blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"

Expand Down
17 changes: 9 additions & 8 deletions requestmanager/requestmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"

"github.com/hannahhoward/go-pubsub"
"golang.org/x/xerrors"
"sync/atomic"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -66,12 +67,12 @@ type AsyncLoader interface {
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
ctx context.Context
cancel func()
messages chan requestManagerMessage
peerHandler PeerHandler
rc *responseCollector
asyncLoader AsyncLoader
ctx context.Context
cancel func()
messages chan requestManagerMessage
peerHandler PeerHandler
rc *responseCollector
asyncLoader AsyncLoader
disconnectNotif *pubsub.PubSub
// dont touch out side of run loop
nextRequestID graphsync.RequestID
Expand Down
130 changes: 4 additions & 126 deletions responsemanager/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ import (
"strings"
"time"

"github.com/ipfs/go-cid"
ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
Expand All @@ -26,12 +22,10 @@ var errCancelledByCommand = errors.New("response cancelled by responder")

// TODO: Move this into a seperate module and fully seperate from the ResponseManager
type queryExecutor struct {
requestHooks RequestHooks
blockHooks BlockHooks
updateHooks UpdateHooks
cancelledListeners CancelledListeners
responseAssembler ResponseAssembler
loader ipld.Loader
queryQueue QueryQueue
messages chan responseManagerMessage
ctx context.Context
Expand All @@ -57,9 +51,8 @@ func (qe *queryExecutor) processQueriesWorker() {
}
}
for _, task := range tasks {
key := task.Topic.(responseKey)
select {
case qe.messages <- &responseDataRequest{key, taskDataChan}:
case qe.messages <- &startTaskRequest{task, taskDataChan}:
case <-qe.ctx.Done():
return
}
Expand All @@ -72,132 +65,17 @@ func (qe *queryExecutor) processQueriesWorker() {
log.Info("Empty task on peer request stack")
continue
}
status, err := qe.executeTask(key, taskData)
status, err := qe.executeQuery(pid, taskData.request, taskData.loader, taskData.traverser, taskData.signals, taskData.subscriber)
isCancelled := err != nil && isContextErr(err)
if isCancelled {
qe.cancelledListeners.NotifyCancelledListeners(key.p, taskData.request)
qe.cancelledListeners.NotifyCancelledListeners(pid, taskData.request)
}
select {
case qe.messages <- &finishTaskRequest{key, status, err}:
case qe.messages <- &finishTaskRequest{task, status, err}:
case <-qe.ctx.Done():
}
}
qe.queryQueue.TasksDone(pid, tasks...)

}

}

func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) (graphsync.ResponseStatusCode, error) {
var err error
loader := taskData.loader
traverser := taskData.traverser
if loader == nil || traverser == nil {
var isPaused bool
loader, traverser, isPaused, err = qe.prepareQuery(taskData.ctx, key.p, taskData.request, taskData.signals, taskData.subscriber)
if err != nil {
return graphsync.RequestFailedUnknown, err
}
select {
case <-qe.ctx.Done():
return graphsync.RequestFailedUnknown, errors.New("context cancelled")
case qe.messages <- &setResponseDataRequest{key, loader, traverser}:
}
if isPaused {
return graphsync.RequestPaused, hooks.ErrPaused{}
}
}
return qe.executeQuery(key.p, taskData.request, loader, traverser, taskData.signals, taskData.subscriber)
}

func (qe *queryExecutor) prepareQuery(ctx context.Context,
p peer.ID,
request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) {
result := qe.requestHooks.ProcessRequestHooks(p, request)
var transactionError error
var isPaused bool
failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub}
err := qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
for _, extension := range result.Extensions {
rb.SendExtensionData(extension)
}
if result.Err != nil || !result.IsValidated {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
transactionError = errors.New("request not valid")
} else if result.IsPaused {
rb.PauseRequest()
isPaused = true
}
return nil
})
if err != nil {
return nil, nil, false, err
}
if transactionError != nil {
return nil, nil, false, transactionError
}
if err := qe.processDedupByKey(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
rootLink := cidlink.Link{Cid: request.Root()}
traverser := ipldutil.TraversalBuilder{
Root: rootLink,
Selector: request.Selector(),
Chooser: result.CustomChooser,
}.Start(ctx)
loader := result.CustomLoader
if loader == nil {
loader = qe.loader
}
return loader, traverser, isPaused, nil
}

func (qe *queryExecutor) processDedupByKey(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
dedupData, has := request.Extension(graphsync.ExtensionDeDupByKey)
if !has {
return nil
}
key, err := dedupkey.DecodeDedupKey(dedupData)
if err != nil {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
return nil
})
return err
}
qe.responseAssembler.DedupKey(p, request.ID(), key)
return nil
}

func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
if !has {
return nil
}
cidSet, err := cidset.DecodeCidSet(doNotSendCidsData)
if err != nil {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
return nil
})
return err
}
links := make([]ipld.Link, 0, cidSet.Len())
err = cidSet.ForEach(func(c cid.Cid) error {
links = append(links, cidlink.Link{Cid: c})
return nil
})
if err != nil {
return err
}
qe.responseAssembler.IgnoreBlocks(p, request.ID(), links)
return nil
}

func (qe *queryExecutor) executeQuery(
Expand Down
115 changes: 115 additions & 0 deletions responsemanager/querypreparer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package responsemanager

import (
"context"
"errors"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/responsemanager/responseassembler"
ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/peer"
)

type queryPreparer struct {
requestHooks RequestHooks
responseAssembler ResponseAssembler
loader ipld.Loader
}

func (qe *queryPreparer) prepareQuery(ctx context.Context,
p peer.ID,
request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) {
log.Infof("Processing request hooks for request: %d", request.ID())
result := qe.requestHooks.ProcessRequestHooks(p, request)
var transactionError error
var isPaused bool
failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub}
err := qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
for _, extension := range result.Extensions {
rb.SendExtensionData(extension)
}
if result.Err != nil || !result.IsValidated {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
transactionError = errors.New("request not valid")
} else if result.IsPaused {
rb.PauseRequest()
isPaused = true
}
return nil
})
if err != nil {
return nil, nil, false, err
}
if transactionError != nil {
return nil, nil, false, transactionError
}
if err := qe.processDedupByKey(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
rootLink := cidlink.Link{Cid: request.Root()}
traverser := ipldutil.TraversalBuilder{
Root: rootLink,
Selector: request.Selector(),
Chooser: result.CustomChooser,
}.Start(ctx)
loader := result.CustomLoader
if loader == nil {
loader = qe.loader
}
return loader, traverser, isPaused, nil
}

func (qe *queryPreparer) processDedupByKey(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
dedupData, has := request.Extension(graphsync.ExtensionDeDupByKey)
if !has {
return nil
}
key, err := dedupkey.DecodeDedupKey(dedupData)
if err != nil {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
return nil
})
return err
}
qe.responseAssembler.DedupKey(p, request.ID(), key)
return nil
}

func (qe *queryPreparer) processDoNoSendCids(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
if !has {
return nil
}
cidSet, err := cidset.DecodeCidSet(doNotSendCidsData)
if err != nil {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
return nil
})
return err
}
links := make([]ipld.Link, 0, cidSet.Len())
err = cidSet.ForEach(func(c cid.Cid) error {
links = append(links, cidlink.Link{Cid: c})
return nil
})
if err != nil {
return err
}
qe.responseAssembler.IgnoreBlocks(p, request.ID(), links)
return nil
}
2 changes: 1 addition & 1 deletion responsemanager/responseassembler/responseBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package responseassembler

import (
blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"

Expand Down
Loading