Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outgoing Request Hooks, swapping persistence layers #61

Merged
merged 4 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
run:
skip-files:
- testutil/chaintypes/testchain_gen.go
47 changes: 30 additions & 17 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,27 +135,37 @@ type ResponseData interface {
Extension(name ExtensionName) ([]byte, bool)
}

// RequestReceivedHookActions are actions that a request hook can take to change
// IncomingRequestHookActions are actions that a request hook can take to change
// behavior for the response
type RequestReceivedHookActions interface {
type IncomingRequestHookActions interface {
SendExtensionData(ExtensionData)
UseLoader(ipld.Loader)
UsePersistenceOption(name string)
UseNodeBuilderChooser(traversal.NodeBuilderChooser)
TerminateWithError(error)
ValidateRequest()
}

// OnRequestReceivedHook is a hook that runs each time a request is received.
// OutgoingRequestHookActions are actions that an outgoing request hook can take
// to change the execution of this request
type OutgoingRequestHookActions interface {
UsePersistenceOption(name string)
UseNodeBuilderChooser(traversal.NodeBuilderChooser)
}

// OnIncomingRequestHook is a hook that runs each time a new request is received.
// It receives the peer that sent the request and all data about the request.
// It should return:
// extensionData - any extension data to add to the outgoing response
// err - error - if not nil, halt request and return RequestRejected with the responseData
type OnRequestReceivedHook func(p peer.ID, request RequestData, hookActions RequestReceivedHookActions)
// It receives an interface for customizing the response to this request
type OnIncomingRequestHook func(p peer.ID, request RequestData, hookActions IncomingRequestHookActions)

// OnResponseReceivedHook is a hook that runs each time a response is received.
// OnIncomingResponseHook is a hook that runs each time a new response is received.
// It receives the peer that sent the response and all data about the response.
// If it returns an error processing is halted and the original request is cancelled.
type OnResponseReceivedHook func(p peer.ID, responseData ResponseData) error
type OnIncomingResponseHook func(p peer.ID, responseData ResponseData) error

// OnOutgoingRequestHook is a hook that runs immediately prior to sending a request
// It receives the peer we're sending a request to and all the data aobut the request
// It receives an interface for customizing how we handle executing this request
type OnOutgoingRequestHook func(p peer.ID, request RequestData, hookActions OutgoingRequestHookActions)

// UnregisterHookFunc is a function call to unregister a hook that was previously registered
type UnregisterHookFunc func()
Expand All @@ -165,12 +175,15 @@ type GraphExchange interface {
// Request initiates a new GraphSync request to the given peer using the given selector spec.
Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...ExtensionData) (<-chan ResponseProgress, <-chan error)

// RegisterRequestReceivedHook adds a hook that runs when a request is received
// If overrideDefaultValidation is set to true, then if the hook does not error,
// it is considered to have "validated" the request -- and that validation supersedes
// the normal validation of requests Graphsync does (i.e. all selectors can be accepted)
RegisterRequestReceivedHook(hook OnRequestReceivedHook) UnregisterHookFunc
// RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default
RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error

// RegisterIncomingRequestHook adds a hook that runs when a request is received
RegisterIncomingRequestHook(hook OnIncomingRequestHook) UnregisterHookFunc

// RegisterIncomingResponseHook adds a hook that runs when a response is received
RegisterIncomingResponseHook(OnIncomingResponseHook) UnregisterHookFunc

// RegisterResponseReceivedHook adds a hook that runs when a response is received
RegisterResponseReceivedHook(OnResponseReceivedHook) UnregisterHookFunc
// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
RegisterOutgoingRequestHook(hook OnOutgoingRequestHook) UnregisterHookFunc
}
28 changes: 21 additions & 7 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
}
peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue)
unregisterDefaultValidator := responseManager.RegisterHook(selectorvalidator.SelectorValidator(maxRecursionDepth))
unregisterDefaultValidator := responseManager.RegisterRequestHook(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
loader: loader,
Expand Down Expand Up @@ -103,17 +103,31 @@ func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, sel
return gs.requestManager.SendRequest(ctx, p, root, selector, extensions...)
}

// RegisterRequestReceivedHook adds a hook that runs when a request is received
// RegisterIncomingRequestHook adds a hook that runs when a request is received
// If overrideDefaultValidation is set to true, then if the hook does not error,
// it is considered to have "validated" the request -- and that validation supersedes
// the normal validation of requests Graphsync does (i.e. all selectors can be accepted)
func (gs *GraphSync) RegisterRequestReceivedHook(hook graphsync.OnRequestReceivedHook) graphsync.UnregisterHookFunc {
return gs.responseManager.RegisterHook(hook)
func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingRequestHook) graphsync.UnregisterHookFunc {
return gs.responseManager.RegisterRequestHook(hook)
}

// RegisterResponseReceivedHook adds a hook that runs when a response is received
func (gs *GraphSync) RegisterResponseReceivedHook(hook graphsync.OnResponseReceivedHook) graphsync.UnregisterHookFunc {
return gs.requestManager.RegisterHook(hook)
// RegisterIncomingResponseHook adds a hook that runs when a response is received
func (gs *GraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
return gs.requestManager.RegisterResponseHook(hook)
}

// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
func (gs *GraphSync) RegisterOutgoingRequestHook(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc {
return gs.requestManager.RegisterRequestHook(hook)
}

// RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default
func (gs *GraphSync) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error {
err := gs.asyncLoader.RegisterPersistenceOption(name, loader, storer)
if err != nil {
return err
}
return gs.responseManager.RegisterPersistenceOption(name, loader)
}

type graphSyncReceiver GraphSync
Expand Down
73 changes: 68 additions & 5 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func TestSendResponseToIncomingRequest(t *testing.T) {
var receivedRequestData []byte
// initialize graphsync on second node to response to requests
gsnet := td.GraphSyncHost2()
gsnet.RegisterRequestReceivedHook(
func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) {
gsnet.RegisterIncomingRequestHook(
func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
var has bool
receivedRequestData, has = requestData.Extension(td.extensionName)
require.True(t, has, "did not have expected extension")
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestGraphsyncRoundTrip(t *testing.T) {
var receivedResponseData []byte
var receivedRequestData []byte

requestor.RegisterResponseReceivedHook(
requestor.RegisterIncomingResponseHook(
func(p peer.ID, responseData graphsync.ResponseData) error {
data, has := responseData.Extension(td.extensionName)
if has {
Expand All @@ -199,7 +199,7 @@ func TestGraphsyncRoundTrip(t *testing.T) {
return nil
})

responder.RegisterRequestReceivedHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) {
responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
var has bool
receivedRequestData, has = requestData.Extension(td.extensionName)
if !has {
Expand All @@ -220,6 +220,69 @@ func TestGraphsyncRoundTrip(t *testing.T) {
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")
}

func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()

// alternate storing location for responder
altStore1 := make(map[ipld.Link][]byte)
altLoader1, altStorer1 := testutil.NewTestStore(altStore1)

// alternate storing location for requestor
altStore2 := make(map[ipld.Link][]byte)
altLoader2, altStorer2 := testutil.NewTestStore(altStore2)

err := requestor.RegisterPersistenceOption("chainstore", altLoader1, altStorer1)
require.NoError(t, err)

err = responder.RegisterPersistenceOption("chainstore", altLoader2, altStorer2)
require.NoError(t, err)

blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, altLoader1, altStorer2, 100, blockChainLength)

extensionName := graphsync.ExtensionName("blockchain")
extension := graphsync.ExtensionData{
Name: extensionName,
Data: nil,
}

requestor.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
_, has := requestData.Extension(extensionName)
if has {
hookActions.UseNodeBuilderChooser(blockChain.Chooser)
hookActions.UsePersistenceOption("chainstore")
}
})
responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
_, has := requestData.Extension(extensionName)
if has {
hookActions.UseNodeBuilderChooser(blockChain.Chooser)
hookActions.UsePersistenceOption("chainstore")
}
})

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector())
testutil.VerifyEmptyResponse(ctx, t, progressChan)
testutil.VerifySingleTerminalError(ctx, t, errChan)

progressChan, errChan = requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension)

blockChain.VerifyWholeChainWithTypes(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, 0, "should store no blocks in normal store")
require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store")
}

// TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work
// under a specific of adverse conditions:
// -- large blocks being returned by a query
Expand Down Expand Up @@ -360,7 +423,7 @@ func TestUnixFSFetch(t *testing.T) {
requestor := New(ctx, td.gsnet1, loader1, storer1)
responder := New(ctx, td.gsnet2, loader2, storer2)
extensionName := graphsync.ExtensionName("Free for all")
responder.RegisterRequestReceivedHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.RequestReceivedHookActions) {
responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
hookActions.SendExtensionData(graphsync.ExtensionData{
Name: extensionName,
Expand Down
Loading