Skip to content

Commit

Permalink
Update Requests (#63)
Browse files Browse the repository at this point in the history
* feat(graphsync): add update to message

add update to message protobuf. also fix deserialization error with cancel requests

* feat(requestmanager): support updates for response hooks

Support sending request updates in incoming response hooks. Also factor hooks out of request manager

* refactor(hooks): make hooks approach consistent

for ease of use and learning in library, make hooks structure consistent across requestor and
responder

* feat(responsemanager): update request processing

Add the ability to processing update requests as needed

* test(responsemanager): add more tests for update behavior

* test(integration):add update integration test

Add test to verify the whole update flow

* fix(lint): fix lint error
  • Loading branch information
hannahhoward authored Apr 21, 2020
1 parent 79c195c commit 9ada784
Show file tree
Hide file tree
Showing 25 changed files with 1,720 additions and 759 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/ipfs/go-graphsync
go 1.12

require (
github.com/filecoin-project/go-data-transfer v0.0.0-20200408061858-82c58b423ca6
github.com/filecoin-project/go-fil-markets v0.0.0-20200408062434-d92f329a6428
github.com/gogo/protobuf v1.3.1
github.com/ipfs/go-block-format v0.0.2
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ github.com/filecoin-project/go-amt-ipld/v2 v2.0.1-0.20200131012142-05d80eeccc5e
github.com/filecoin-project/go-amt-ipld/v2 v2.0.1-0.20200131012142-05d80eeccc5e/go.mod h1:boRtQhzmxNocrMxOXo1NYn4oUc1NGvR8tEa79wApNXg=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v0.0.0-20200408061858-82c58b423ca6 h1:CIQ7RlW7I3E+JBxfKiK0ZWO9HPSBqlI5aeA/sdwyVTc=
github.com/filecoin-project/go-data-transfer v0.0.0-20200408061858-82c58b423ca6/go.mod h1:7b5/sG9Jj33aWqft8XsH8yIdxZBACqS5tx9hv4uj2Ck=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5/go.mod h1:JbkIgFF/Z9BDlvrJO1FuKkaWsH673/UdFaiVS6uIHlA=
github.com/filecoin-project/go-fil-markets v0.0.0-20200408062434-d92f329a6428 h1:y8P10ZwfmsKMVHrqcU1L8Bgj2q42O6LzaySI+XaogXE=
Expand Down
40 changes: 33 additions & 7 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ const (
// PartialResponse may include blocks and metadata about the in progress response
// in extra.
PartialResponse = ResponseStatusCode(14)
// RequestPaused indicates a request is paused and will not send any more data
// until unpaused
RequestPaused = ResponseStatusCode(15)

// Success Response Codes (request terminated)

Expand Down Expand Up @@ -157,19 +160,34 @@ type IncomingRequestHookActions interface {
ValidateRequest()
}

// OutgoingBlockHookActions are actions that an outgoing block hook can take to
// change the execution of a request
type OutgoingBlockHookActions interface {
SendExtensionData(ExtensionData)
TerminateWithError(error)
PauseResponse()
}

// OutgoingRequestHookActions are actions that an outgoing request hook can take
// to change the execution of this request
// to change the execution of a request
type OutgoingRequestHookActions interface {
UsePersistenceOption(name string)
UseNodeBuilderChooser(traversal.NodeBuilderChooser)
}

// OutgoingBlockHookActions are actions that an outgoing block hook can take to
// change the execution of this request
type OutgoingBlockHookActions interface {
SendExtensionData(ExtensionData)
// IncomingResponseHookActions are actions that incoming response hook can take
// to change the execution of a request
type IncomingResponseHookActions interface {
TerminateWithError(error)
PauseResponse()
UpdateRequestWithExtensions(...ExtensionData)
}

// RequestUpdatedHookActions are actions that can be taken in a request updated hook to
// change execution of the response
type RequestUpdatedHookActions interface {
TerminateWithError(error)
SendExtensionData(ExtensionData)
UnpauseResponse()
}

// OnIncomingRequestHook is a hook that runs each time a new request is received.
Expand All @@ -180,7 +198,7 @@ type OnIncomingRequestHook func(p peer.ID, request RequestData, hookActions Inco
// OnIncomingResponseHook is a hook that runs each time a new response is received.
// It receives the peer that sent the response and all data about the response.
// If it returns an error processing is halted and the original request is cancelled.
type OnIncomingResponseHook func(p peer.ID, responseData ResponseData) error
type OnIncomingResponseHook func(p peer.ID, responseData ResponseData, hookActions IncomingResponseHookActions)

// OnOutgoingRequestHook is a hook that runs immediately prior to sending a request
// It receives the peer we're sending a request to and all the data aobut the request
Expand All @@ -194,6 +212,11 @@ type OnOutgoingRequestHook func(p peer.ID, request RequestData, hookActions Outg
// It receives an interface for taking further action on the response
type OnOutgoingBlockHook func(p peer.ID, request RequestData, block BlockData, hookActions OutgoingBlockHookActions)

// OnRequestUpdatedHook is a hook that runs when an update to a request is received
// It receives the peer we're sending to, the original request, the request update
// It receives an interface to taking further action on the response
type OnRequestUpdatedHook func(p peer.ID, request RequestData, updateRequest RequestData, hookActions RequestUpdatedHookActions)

// UnregisterHookFunc is a function call to unregister a hook that was previously registered
type UnregisterHookFunc func()

Expand All @@ -217,6 +240,9 @@ type GraphExchange interface {
// RegisterOutgoingBlockHook adds a hook that runs every time a block is sent from a responder
RegisterOutgoingBlockHook(hook OnOutgoingBlockHook) UnregisterHookFunc

// RegisterRequestUpdatedHook adds a hook that runs every time an update to a request is received
RegisterRequestUpdatedHook(hook OnRequestUpdatedHook) UnregisterHookFunc

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
UnpauseResponse(peer.ID, RequestID) error
}
34 changes: 24 additions & 10 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"github.com/ipfs/go-graphsync/peermanager"
"github.com/ipfs/go-graphsync/requestmanager"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
requestorhooks "github.com/ipfs/go-graphsync/requestmanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager"
"github.com/ipfs/go-graphsync/responsemanager/blockhooks"
responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
"github.com/ipfs/go-graphsync/responsemanager/requesthooks"
"github.com/ipfs/go-graphsync/selectorvalidator"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-peertaskqueue"
Expand All @@ -38,8 +38,11 @@ type GraphSync struct {
peerResponseManager *peerresponsemanager.PeerResponseManager
peerTaskQueue *peertaskqueue.PeerTaskQueue
peerManager *peermanager.PeerMessageManager
incomingRequestHooks *requesthooks.IncomingRequestHooks
outgoingBlockHooks *blockhooks.OutgoingBlockHooks
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
persistenceOptions *persistenceoptions.PersistenceOptions
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -69,16 +72,19 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
}
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
asyncLoader := asyncloader.New(ctx, loader, storer)
requestManager := requestmanager.New(ctx, asyncLoader)
incomingResponseHooks := requestorhooks.NewResponseHooks()
outgoingRequestHooks := requestorhooks.NewRequestHooks()
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks)
peerTaskQueue := peertaskqueue.New()
createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
return peerresponsemanager.NewResponseSender(ctx, p, peerManager)
}
peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
persistenceOptions := persistenceoptions.New()
incomingRequestHooks := requesthooks.New(persistenceOptions)
outgoingBlockHooks := blockhooks.New()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks)
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
outgoingBlockHooks := responderhooks.NewBlockHooks()
requestUpdatedHooks := responderhooks.NewUpdateHooks()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks)
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
Expand All @@ -90,6 +96,9 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
persistenceOptions: persistenceOptions,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
peerTaskQueue: peerTaskQueue,
peerResponseManager: peerResponseManager,
responseManager: responseManager,
Expand Down Expand Up @@ -125,12 +134,12 @@ func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingReques

// RegisterIncomingResponseHook adds a hook that runs when a response is received
func (gs *GraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
return gs.requestManager.RegisterResponseHook(hook)
return gs.incomingResponseHooks.Register(hook)
}

// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
func (gs *GraphSync) RegisterOutgoingRequestHook(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc {
return gs.requestManager.RegisterRequestHook(hook)
return gs.outgoingRequestHooks.Register(hook)
}

// RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default
Expand All @@ -147,6 +156,11 @@ func (gs *GraphSync) RegisterOutgoingBlockHook(hook graphsync.OnOutgoingBlockHoo
return gs.outgoingBlockHooks.Register(hook)
}

// RegisterRequestUpdatedHook registers a hook that runs when an update to a request is received
func (gs *GraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedHook) graphsync.UnregisterHookFunc {
return gs.requestUpdatedHooks.Register(hook)
}

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.UnpauseResponse(p, requestID)
Expand Down
69 changes: 67 additions & 2 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,11 @@ func TestGraphsyncRoundTrip(t *testing.T) {
var receivedRequestData []byte

requestor.RegisterIncomingResponseHook(
func(p peer.ID, responseData graphsync.ResponseData) error {
func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
data, has := responseData.Extension(td.extensionName)
if has {
receivedResponseData = data
}
return nil
})

responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
Expand Down Expand Up @@ -272,6 +271,65 @@ func TestPauseResume(t *testing.T) {

}

func TestPauseResumeViaUpdate(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

var receivedReponseData []byte
var receivedUpdateData []byte
// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

requestor.RegisterIncomingResponseHook(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
if response.Status() == graphsync.RequestPaused {
var has bool
receivedReponseData, has = response.Extension(td.extensionName)
if has {
hookActions.UpdateRequestWithExtensions(td.extensionUpdate)
}
}
})

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
stopPoint := 50
blocksSent := 0
responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
_, has := requestData.Extension(td.extensionName)
if has {
blocksSent++
if blocksSent == stopPoint {
hookActions.SendExtensionData(td.extensionResponse)
hookActions.PauseResponse()
}
} else {
hookActions.TerminateWithError(errors.New("should have sent extension"))
}
})
responder.RegisterRequestUpdatedHook(func(p peer.ID, request graphsync.RequestData, update graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
var has bool
receivedUpdateData, has = update.Extension(td.extensionName)
if has {
hookActions.UnpauseResponse()
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

require.Equal(t, td.extensionResponseData, receivedReponseData, "did not receive correct extension response data")
require.Equal(t, td.extensionUpdateData, receivedUpdateData, "did not receive correct extension update data")
}

func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
// create network
ctx := context.Background()
Expand Down Expand Up @@ -539,6 +597,8 @@ type gsTestData struct {
extension graphsync.ExtensionData
extensionResponseData []byte
extensionResponse graphsync.ExtensionData
extensionUpdateData []byte
extensionUpdate graphsync.ExtensionData
}

func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
Expand Down Expand Up @@ -571,6 +631,11 @@ func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
Name: td.extensionName,
Data: td.extensionResponseData,
}
td.extensionUpdateData = testutil.RandomBytes(100)
td.extensionUpdate = graphsync.ExtensionData{
Name: td.extensionName,
Data: td.extensionUpdateData,
}

return td
}
Expand Down
38 changes: 29 additions & 9 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type GraphSyncRequest struct {
id graphsync.RequestID
extensions map[string][]byte
isCancel bool
isUpdate bool
}

// GraphSyncResponse is an struct to capture data on a response sent back
Expand Down Expand Up @@ -110,12 +111,17 @@ func NewRequest(id graphsync.RequestID,
priority graphsync.Priority,
extensions ...graphsync.ExtensionData) GraphSyncRequest {

return newRequest(id, root, selector, priority, false, toExtensionsMap(extensions))
return newRequest(id, root, selector, priority, false, false, toExtensionsMap(extensions))
}

// CancelRequest request generates a request to cancel an in progress request
func CancelRequest(id graphsync.RequestID) GraphSyncRequest {
return newRequest(id, cid.Cid{}, nil, 0, true, nil)
return newRequest(id, cid.Cid{}, nil, 0, true, false, nil)
}

// UpdateRequest generates a new request to update an in progress request with the given extensions
func UpdateRequest(id graphsync.RequestID, extensions ...graphsync.ExtensionData) GraphSyncRequest {
return newRequest(id, cid.Cid{}, nil, 0, false, true, toExtensionsMap(extensions))
}

func toExtensionsMap(extensions []graphsync.ExtensionData) (extensionsMap map[string][]byte) {
Expand All @@ -133,13 +139,15 @@ func newRequest(id graphsync.RequestID,
selector ipld.Node,
priority graphsync.Priority,
isCancel bool,
isUpdate bool,
extensions map[string][]byte) GraphSyncRequest {
return GraphSyncRequest{
id: id,
root: root,
selector: selector,
priority: priority,
isCancel: isCancel,
isUpdate: isUpdate,
extensions: extensions,
}
}
Expand All @@ -162,15 +170,23 @@ func newResponse(requestID graphsync.RequestID,
func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
gsm := newMsg()
for _, req := range pbm.Requests {
root, err := cid.Cast(req.Root)
if err != nil {
return nil, err
var root cid.Cid
var err error
if !req.Cancel && !req.Update {
root, err = cid.Cast(req.Root)
if err != nil {
return nil, err
}
}
selector, err := ipldutil.DecodeNode(req.Selector)
if err != nil {
return nil, err

var selector ipld.Node
if !req.Cancel && !req.Update {
selector, err = ipldutil.DecodeNode(req.Selector)
if err != nil {
return nil, err
}
}
gsm.AddRequest(newRequest(graphsync.RequestID(req.Id), root, selector, graphsync.Priority(req.Priority), req.Cancel, req.GetExtensions()))
gsm.AddRequest(newRequest(graphsync.RequestID(req.Id), root, selector, graphsync.Priority(req.Priority), req.Cancel, req.Update, req.GetExtensions()))
}

for _, res := range pbm.Responses {
Expand Down Expand Up @@ -273,6 +289,7 @@ func (gsm *graphSyncMessage) ToProto() (*pb.Message, error) {
Selector: selector,
Priority: int32(request.priority),
Cancel: request.isCancel,
Update: request.isUpdate,
Extensions: request.extensions,
})
}
Expand Down Expand Up @@ -349,6 +366,9 @@ func (gsr GraphSyncRequest) Extension(name graphsync.ExtensionName) ([]byte, boo
// IsCancel returns true if this particular request is being cancelled
func (gsr GraphSyncRequest) IsCancel() bool { return gsr.isCancel }

// IsUpdate returns true if this particular request is being updated
func (gsr GraphSyncRequest) IsUpdate() bool { return gsr.isUpdate }

// RequestID returns the request ID for this response
func (gsr GraphSyncResponse) RequestID() graphsync.RequestID { return gsr.requestID }

Expand Down
Loading

0 comments on commit 9ada784

Please sign in to comment.