Skip to content

Commit

Permalink
Outgoing Request Hooks, swapping persistence layers (#61)
Browse files Browse the repository at this point in the history
* feat(graphsync): define hook interfaces, test infrastructure

rename hooks, add outgoing request hook, define chain types for node builder chooser tests

* feat(graphsync): define persistence option

add persistence options to asyncloader & responsemanager + handling

* feat(graphsync): complete persistence / node builder chooser roundtrip

* fix(lint): fix lint errors
  • Loading branch information
hannahhoward authored Apr 8, 2020
1 parent 40a6408 commit e1a98fc
Show file tree
Hide file tree
Showing 15 changed files with 2,300 additions and 384 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
run:
skip-files:
- testutil/chaintypes/testchain_gen.go
47 changes: 30 additions & 17 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,27 +135,37 @@ type ResponseData interface {
Extension(name ExtensionName) ([]byte, bool)
}

// RequestReceivedHookActions are actions that a request hook can take to change
// IncomingRequestHookActions are actions that a request hook can take to change
// behavior for the response
type RequestReceivedHookActions interface {
type IncomingRequestHookActions interface {
SendExtensionData(ExtensionData)
UseLoader(ipld.Loader)
UsePersistenceOption(name string)
UseNodeBuilderChooser(traversal.NodeBuilderChooser)
TerminateWithError(error)
ValidateRequest()
}

// OnRequestReceivedHook is a hook that runs each time a request is received.
// OutgoingRequestHookActions are actions that an outgoing request hook can take
// to change the execution of this request
type OutgoingRequestHookActions interface {
UsePersistenceOption(name string)
UseNodeBuilderChooser(traversal.NodeBuilderChooser)
}

// OnIncomingRequestHook is a hook that runs each time a new request is received.
// It receives the peer that sent the request and all data about the request.
// It should return:
// extensionData - any extension data to add to the outgoing response
// err - error - if not nil, halt request and return RequestRejected with the responseData
type OnRequestReceivedHook func(p peer.ID, request RequestData, hookActions RequestReceivedHookActions)
// It receives an interface for customizing the response to this request
type OnIncomingRequestHook func(p peer.ID, request RequestData, hookActions IncomingRequestHookActions)

// OnResponseReceivedHook is a hook that runs each time a response is received.
// OnIncomingResponseHook is a hook that runs each time a new response is received.
// It receives the peer that sent the response and all data about the response.
// If it returns an error processing is halted and the original request is cancelled.
type OnResponseReceivedHook func(p peer.ID, responseData ResponseData) error
type OnIncomingResponseHook func(p peer.ID, responseData ResponseData) error

// OnOutgoingRequestHook is a hook that runs immediately prior to sending a request
// It receives the peer we're sending a request to and all the data aobut the request
// It receives an interface for customizing how we handle executing this request
type OnOutgoingRequestHook func(p peer.ID, request RequestData, hookActions OutgoingRequestHookActions)

// UnregisterHookFunc is a function call to unregister a hook that was previously registered
type UnregisterHookFunc func()
Expand All @@ -165,12 +175,15 @@ type GraphExchange interface {
// Request initiates a new GraphSync request to the given peer using the given selector spec.
Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...ExtensionData) (<-chan ResponseProgress, <-chan error)

// RegisterRequestReceivedHook adds a hook that runs when a request is received
// If overrideDefaultValidation is set to true, then if the hook does not error,
// it is considered to have "validated" the request -- and that validation supersedes
// the normal validation of requests Graphsync does (i.e. all selectors can be accepted)
RegisterRequestReceivedHook(hook OnRequestReceivedHook) UnregisterHookFunc
// RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default
RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error

// RegisterIncomingRequestHook adds a hook that runs when a request is received
RegisterIncomingRequestHook(hook OnIncomingRequestHook) UnregisterHookFunc

// RegisterIncomingResponseHook adds a hook that runs when a response is received
RegisterIncomingResponseHook(OnIncomingResponseHook) UnregisterHookFunc

// RegisterResponseReceivedHook adds a hook that runs when a response is received
RegisterResponseReceivedHook(OnResponseReceivedHook) UnregisterHookFunc
// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
RegisterOutgoingRequestHook(hook OnOutgoingRequestHook) UnregisterHookFunc
}
28 changes: 21 additions & 7 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
}
peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue)
unregisterDefaultValidator := responseManager.RegisterHook(selectorvalidator.SelectorValidator(maxRecursionDepth))
unregisterDefaultValidator := responseManager.RegisterRequestHook(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
loader: loader,
Expand Down Expand Up @@ -103,17 +103,31 @@ func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, sel
return gs.requestManager.SendRequest(ctx, p, root, selector, extensions...)
}

// RegisterRequestReceivedHook adds a hook that runs when a request is received
// RegisterIncomingRequestHook adds a hook that runs when a request is received
// If overrideDefaultValidation is set to true, then if the hook does not error,
// it is considered to have "validated" the request -- and that validation supersedes
// the normal validation of requests Graphsync does (i.e. all selectors can be accepted)
func (gs *GraphSync) RegisterRequestReceivedHook(hook graphsync.OnRequestReceivedHook) graphsync.UnregisterHookFunc {
return gs.responseManager.RegisterHook(hook)
func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingRequestHook) graphsync.UnregisterHookFunc {
return gs.responseManager.RegisterRequestHook(hook)
}

// RegisterResponseReceivedHook adds a hook that runs when a response is received
func (gs *GraphSync) RegisterResponseReceivedHook(hook graphsync.OnResponseReceivedHook) graphsync.UnregisterHookFunc {
return gs.requestManager.RegisterHook(hook)
// RegisterIncomingResponseHook adds a hook that runs when a response is received
func (gs *GraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
return gs.requestManager.RegisterResponseHook(hook)
}

// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
func (gs *GraphSync) RegisterOutgoingRequestHook(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc {
return gs.requestManager.RegisterRequestHook(hook)
}

// RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default
func (gs *GraphSync) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error {
err := gs.asyncLoader.RegisterPersistenceOption(name, loader, storer)
if err != nil {
return err
}
return gs.responseManager.RegisterPersistenceOption(name, loader)
}

type graphSyncReceiver GraphSync
Expand Down
73 changes: 68 additions & 5 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func TestSendResponseToIncomingRequest(t *testing.T) {
var receivedRequestData []byte
// initialize graphsync on second node to response to requests
gsnet := td.GraphSyncHost2()
gsnet.RegisterRequestReceivedHook(
func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) {
gsnet.RegisterIncomingRequestHook(
func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
var has bool
receivedRequestData, has = requestData.Extension(td.extensionName)
require.True(t, has, "did not have expected extension")
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestGraphsyncRoundTrip(t *testing.T) {
var receivedResponseData []byte
var receivedRequestData []byte

requestor.RegisterResponseReceivedHook(
requestor.RegisterIncomingResponseHook(
func(p peer.ID, responseData graphsync.ResponseData) error {
data, has := responseData.Extension(td.extensionName)
if has {
Expand All @@ -199,7 +199,7 @@ func TestGraphsyncRoundTrip(t *testing.T) {
return nil
})

responder.RegisterRequestReceivedHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) {
responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
var has bool
receivedRequestData, has = requestData.Extension(td.extensionName)
if !has {
Expand All @@ -220,6 +220,69 @@ func TestGraphsyncRoundTrip(t *testing.T) {
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")
}

func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()

// alternate storing location for responder
altStore1 := make(map[ipld.Link][]byte)
altLoader1, altStorer1 := testutil.NewTestStore(altStore1)

// alternate storing location for requestor
altStore2 := make(map[ipld.Link][]byte)
altLoader2, altStorer2 := testutil.NewTestStore(altStore2)

err := requestor.RegisterPersistenceOption("chainstore", altLoader1, altStorer1)
require.NoError(t, err)

err = responder.RegisterPersistenceOption("chainstore", altLoader2, altStorer2)
require.NoError(t, err)

blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, altLoader1, altStorer2, 100, blockChainLength)

extensionName := graphsync.ExtensionName("blockchain")
extension := graphsync.ExtensionData{
Name: extensionName,
Data: nil,
}

requestor.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
_, has := requestData.Extension(extensionName)
if has {
hookActions.UseNodeBuilderChooser(blockChain.Chooser)
hookActions.UsePersistenceOption("chainstore")
}
})
responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
_, has := requestData.Extension(extensionName)
if has {
hookActions.UseNodeBuilderChooser(blockChain.Chooser)
hookActions.UsePersistenceOption("chainstore")
}
})

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector())
testutil.VerifyEmptyResponse(ctx, t, progressChan)
testutil.VerifySingleTerminalError(ctx, t, errChan)

progressChan, errChan = requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension)

blockChain.VerifyWholeChainWithTypes(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, 0, "should store no blocks in normal store")
require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store")
}

// TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work
// under a specific of adverse conditions:
// -- large blocks being returned by a query
Expand Down Expand Up @@ -360,7 +423,7 @@ func TestUnixFSFetch(t *testing.T) {
requestor := New(ctx, td.gsnet1, loader1, storer1)
responder := New(ctx, td.gsnet2, loader2, storer2)
extensionName := graphsync.ExtensionName("Free for all")
responder.RegisterRequestReceivedHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) {
responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
hookActions.SendExtensionData(graphsync.ExtensionData{
Name: extensionName,
Expand Down
Loading

0 comments on commit e1a98fc

Please sign in to comment.