Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Requests #63

Merged
merged 7 commits into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/ipfs/go-graphsync
go 1.12

require (
github.com/filecoin-project/go-data-transfer v0.0.0-20200408061858-82c58b423ca6
github.com/filecoin-project/go-fil-markets v0.0.0-20200408062434-d92f329a6428
github.com/gogo/protobuf v1.3.1
github.com/ipfs/go-block-format v0.0.2
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ github.com/filecoin-project/go-amt-ipld/v2 v2.0.1-0.20200131012142-05d80eeccc5e
github.com/filecoin-project/go-amt-ipld/v2 v2.0.1-0.20200131012142-05d80eeccc5e/go.mod h1:boRtQhzmxNocrMxOXo1NYn4oUc1NGvR8tEa79wApNXg=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v0.0.0-20200408061858-82c58b423ca6 h1:CIQ7RlW7I3E+JBxfKiK0ZWO9HPSBqlI5aeA/sdwyVTc=
github.com/filecoin-project/go-data-transfer v0.0.0-20200408061858-82c58b423ca6/go.mod h1:7b5/sG9Jj33aWqft8XsH8yIdxZBACqS5tx9hv4uj2Ck=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5/go.mod h1:JbkIgFF/Z9BDlvrJO1FuKkaWsH673/UdFaiVS6uIHlA=
github.com/filecoin-project/go-fil-markets v0.0.0-20200408062434-d92f329a6428 h1:y8P10ZwfmsKMVHrqcU1L8Bgj2q42O6LzaySI+XaogXE=
Expand Down
40 changes: 33 additions & 7 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ const (
// PartialResponse may include blocks and metadata about the in progress response
// in extra.
PartialResponse = ResponseStatusCode(14)
// RequestPaused indicates a request is paused and will not send any more data
// until unpaused
RequestPaused = ResponseStatusCode(15)

// Success Response Codes (request terminated)

Expand Down Expand Up @@ -157,19 +160,34 @@ type IncomingRequestHookActions interface {
ValidateRequest()
}

// OutgoingBlockHookActions are actions that an outgoing block hook can take to
// change the execution of a request
type OutgoingBlockHookActions interface {
SendExtensionData(ExtensionData)
TerminateWithError(error)
PauseResponse()
}

// OutgoingRequestHookActions are actions that an outgoing request hook can take
// to change the execution of this request
// to change the execution of a request
type OutgoingRequestHookActions interface {
UsePersistenceOption(name string)
UseNodeBuilderChooser(traversal.NodeBuilderChooser)
}

// OutgoingBlockHookActions are actions that an outgoing block hook can take to
// change the execution of this request
type OutgoingBlockHookActions interface {
SendExtensionData(ExtensionData)
// IncomingResponseHookActions are actions that incoming response hook can take
// to change the execution of a request
type IncomingResponseHookActions interface {
TerminateWithError(error)
PauseResponse()
UpdateRequestWithExtensions(...ExtensionData)
}

// RequestUpdatedHookActions are actions that can be taken in a request updated hook to
// change execution of the response
type RequestUpdatedHookActions interface {
TerminateWithError(error)
SendExtensionData(ExtensionData)
UnpauseResponse()
}

// OnIncomingRequestHook is a hook that runs each time a new request is received.
Expand All @@ -180,7 +198,7 @@ type OnIncomingRequestHook func(p peer.ID, request RequestData, hookActions Inco
// OnIncomingResponseHook is a hook that runs each time a new response is received.
// It receives the peer that sent the response and all data about the response.
// If it returns an error processing is halted and the original request is cancelled.
type OnIncomingResponseHook func(p peer.ID, responseData ResponseData) error
type OnIncomingResponseHook func(p peer.ID, responseData ResponseData, hookActions IncomingResponseHookActions)

// OnOutgoingRequestHook is a hook that runs immediately prior to sending a request
// It receives the peer we're sending a request to and all the data aobut the request
Expand All @@ -194,6 +212,11 @@ type OnOutgoingRequestHook func(p peer.ID, request RequestData, hookActions Outg
// It receives an interface for taking further action on the response
type OnOutgoingBlockHook func(p peer.ID, request RequestData, block BlockData, hookActions OutgoingBlockHookActions)

// OnRequestUpdatedHook is a hook that runs when an update to a request is received
// It receives the peer we're sending to, the original request, the request update
// It receives an interface to taking further action on the response
type OnRequestUpdatedHook func(p peer.ID, request RequestData, updateRequest RequestData, hookActions RequestUpdatedHookActions)

// UnregisterHookFunc is a function call to unregister a hook that was previously registered
type UnregisterHookFunc func()

Expand All @@ -217,6 +240,9 @@ type GraphExchange interface {
// RegisterOutgoingBlockHook adds a hook that runs every time a block is sent from a responder
RegisterOutgoingBlockHook(hook OnOutgoingBlockHook) UnregisterHookFunc

// RegisterRequestUpdatedHook adds a hook that runs every time an update to a request is received
RegisterRequestUpdatedHook(hook OnRequestUpdatedHook) UnregisterHookFunc

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
UnpauseResponse(peer.ID, RequestID) error
}
34 changes: 24 additions & 10 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"github.com/ipfs/go-graphsync/peermanager"
"github.com/ipfs/go-graphsync/requestmanager"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
requestorhooks "github.com/ipfs/go-graphsync/requestmanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager"
"github.com/ipfs/go-graphsync/responsemanager/blockhooks"
responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
"github.com/ipfs/go-graphsync/responsemanager/requesthooks"
"github.com/ipfs/go-graphsync/selectorvalidator"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-peertaskqueue"
Expand All @@ -38,8 +38,11 @@ type GraphSync struct {
peerResponseManager *peerresponsemanager.PeerResponseManager
peerTaskQueue *peertaskqueue.PeerTaskQueue
peerManager *peermanager.PeerMessageManager
incomingRequestHooks *requesthooks.IncomingRequestHooks
outgoingBlockHooks *blockhooks.OutgoingBlockHooks
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
persistenceOptions *persistenceoptions.PersistenceOptions
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -69,16 +72,19 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
}
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
asyncLoader := asyncloader.New(ctx, loader, storer)
requestManager := requestmanager.New(ctx, asyncLoader)
incomingResponseHooks := requestorhooks.NewResponseHooks()
outgoingRequestHooks := requestorhooks.NewRequestHooks()
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks)
peerTaskQueue := peertaskqueue.New()
createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
return peerresponsemanager.NewResponseSender(ctx, p, peerManager)
}
peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
persistenceOptions := persistenceoptions.New()
incomingRequestHooks := requesthooks.New(persistenceOptions)
outgoingBlockHooks := blockhooks.New()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks)
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
outgoingBlockHooks := responderhooks.NewBlockHooks()
requestUpdatedHooks := responderhooks.NewUpdateHooks()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks)
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
Expand All @@ -90,6 +96,9 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
persistenceOptions: persistenceOptions,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
peerTaskQueue: peerTaskQueue,
peerResponseManager: peerResponseManager,
responseManager: responseManager,
Expand Down Expand Up @@ -125,12 +134,12 @@ func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingReques

// RegisterIncomingResponseHook adds a hook that runs when a response is received
func (gs *GraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
return gs.requestManager.RegisterResponseHook(hook)
return gs.incomingResponseHooks.Register(hook)
}

// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
func (gs *GraphSync) RegisterOutgoingRequestHook(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc {
return gs.requestManager.RegisterRequestHook(hook)
return gs.outgoingRequestHooks.Register(hook)
}

// RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default
Expand All @@ -147,6 +156,11 @@ func (gs *GraphSync) RegisterOutgoingBlockHook(hook graphsync.OnOutgoingBlockHoo
return gs.outgoingBlockHooks.Register(hook)
}

// RegisterRequestUpdatedHook registers a hook that runs when an update to a request is received
func (gs *GraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedHook) graphsync.UnregisterHookFunc {
return gs.requestUpdatedHooks.Register(hook)
}

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.UnpauseResponse(p, requestID)
Expand Down
69 changes: 67 additions & 2 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,11 @@ func TestGraphsyncRoundTrip(t *testing.T) {
var receivedRequestData []byte

requestor.RegisterIncomingResponseHook(
func(p peer.ID, responseData graphsync.ResponseData) error {
func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
data, has := responseData.Extension(td.extensionName)
if has {
receivedResponseData = data
}
return nil
})

responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
Expand Down Expand Up @@ -272,6 +271,65 @@ func TestPauseResume(t *testing.T) {

}

func TestPauseResumeViaUpdate(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

var receivedReponseData []byte
var receivedUpdateData []byte
// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

requestor.RegisterIncomingResponseHook(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
if response.Status() == graphsync.RequestPaused {
var has bool
receivedReponseData, has = response.Extension(td.extensionName)
if has {
hookActions.UpdateRequestWithExtensions(td.extensionUpdate)
}
}
})

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
stopPoint := 50
blocksSent := 0
responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
_, has := requestData.Extension(td.extensionName)
if has {
blocksSent++
if blocksSent == stopPoint {
hookActions.SendExtensionData(td.extensionResponse)
hookActions.PauseResponse()
}
} else {
hookActions.TerminateWithError(errors.New("should have sent extension"))
}
})
responder.RegisterRequestUpdatedHook(func(p peer.ID, request graphsync.RequestData, update graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
var has bool
receivedUpdateData, has = update.Extension(td.extensionName)
if has {
hookActions.UnpauseResponse()
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

require.Equal(t, td.extensionResponseData, receivedReponseData, "did not receive correct extension response data")
require.Equal(t, td.extensionUpdateData, receivedUpdateData, "did not receive correct extension update data")
}

func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
// create network
ctx := context.Background()
Expand Down Expand Up @@ -539,6 +597,8 @@ type gsTestData struct {
extension graphsync.ExtensionData
extensionResponseData []byte
extensionResponse graphsync.ExtensionData
extensionUpdateData []byte
extensionUpdate graphsync.ExtensionData
}

func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
Expand Down Expand Up @@ -571,6 +631,11 @@ func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
Name: td.extensionName,
Data: td.extensionResponseData,
}
td.extensionUpdateData = testutil.RandomBytes(100)
td.extensionUpdate = graphsync.ExtensionData{
Name: td.extensionName,
Data: td.extensionUpdateData,
}

return td
}
Expand Down
38 changes: 29 additions & 9 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type GraphSyncRequest struct {
id graphsync.RequestID
extensions map[string][]byte
isCancel bool
isUpdate bool
}

// GraphSyncResponse is an struct to capture data on a response sent back
Expand Down Expand Up @@ -110,12 +111,17 @@ func NewRequest(id graphsync.RequestID,
priority graphsync.Priority,
extensions ...graphsync.ExtensionData) GraphSyncRequest {

return newRequest(id, root, selector, priority, false, toExtensionsMap(extensions))
return newRequest(id, root, selector, priority, false, false, toExtensionsMap(extensions))
}

// CancelRequest request generates a request to cancel an in progress request
func CancelRequest(id graphsync.RequestID) GraphSyncRequest {
return newRequest(id, cid.Cid{}, nil, 0, true, nil)
return newRequest(id, cid.Cid{}, nil, 0, true, false, nil)
}

// UpdateRequest generates a new request to update an in progress request with the given extensions
func UpdateRequest(id graphsync.RequestID, extensions ...graphsync.ExtensionData) GraphSyncRequest {
return newRequest(id, cid.Cid{}, nil, 0, false, true, toExtensionsMap(extensions))
}

func toExtensionsMap(extensions []graphsync.ExtensionData) (extensionsMap map[string][]byte) {
Expand All @@ -133,13 +139,15 @@ func newRequest(id graphsync.RequestID,
selector ipld.Node,
priority graphsync.Priority,
isCancel bool,
isUpdate bool,
extensions map[string][]byte) GraphSyncRequest {
return GraphSyncRequest{
id: id,
root: root,
selector: selector,
priority: priority,
isCancel: isCancel,
isUpdate: isUpdate,
extensions: extensions,
}
}
Expand All @@ -162,15 +170,23 @@ func newResponse(requestID graphsync.RequestID,
func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
gsm := newMsg()
for _, req := range pbm.Requests {
root, err := cid.Cast(req.Root)
if err != nil {
return nil, err
var root cid.Cid
var err error
if !req.Cancel && !req.Update {
root, err = cid.Cast(req.Root)
if err != nil {
return nil, err
}
}
selector, err := ipldutil.DecodeNode(req.Selector)
if err != nil {
return nil, err

var selector ipld.Node
if !req.Cancel && !req.Update {
selector, err = ipldutil.DecodeNode(req.Selector)
if err != nil {
return nil, err
}
}
gsm.AddRequest(newRequest(graphsync.RequestID(req.Id), root, selector, graphsync.Priority(req.Priority), req.Cancel, req.GetExtensions()))
gsm.AddRequest(newRequest(graphsync.RequestID(req.Id), root, selector, graphsync.Priority(req.Priority), req.Cancel, req.Update, req.GetExtensions()))
}

for _, res := range pbm.Responses {
Expand Down Expand Up @@ -273,6 +289,7 @@ func (gsm *graphSyncMessage) ToProto() (*pb.Message, error) {
Selector: selector,
Priority: int32(request.priority),
Cancel: request.isCancel,
Update: request.isUpdate,
Extensions: request.extensions,
})
}
Expand Down Expand Up @@ -349,6 +366,9 @@ func (gsr GraphSyncRequest) Extension(name graphsync.ExtensionName) ([]byte, boo
// IsCancel returns true if this particular request is being cancelled
func (gsr GraphSyncRequest) IsCancel() bool { return gsr.isCancel }

// IsUpdate returns true if this particular request is being updated
func (gsr GraphSyncRequest) IsUpdate() bool { return gsr.isUpdate }

// RequestID returns the request ID for this response
func (gsr GraphSyncResponse) RequestID() graphsync.RequestID { return gsr.requestID }

Expand Down
Loading