Skip to content

Commit

Permalink
Incoming Block Hooks (#68)
Browse files Browse the repository at this point in the history
* feat(graphsync): add incoming block hook types

add types (not implemented) for incoming block hook

* feat(asyncloader): identify local loads

add a boolean value to AsyncLoadResult to determine if a request load was local

* fixup! feat(graphsync): add incoming block hook types

* feat(hooks): add block hooks plumbing

* feat(requestmanager): process block hooks

Process block hooks on each block, and connect last response to blocks

* feat(peerresponsemanager): add transactions

add the ability to do multple response operations with a gaurantee they will end up in a single
response

* refactor(lint): fix lint errors
  • Loading branch information
hannahhoward authored May 27, 2020
1 parent 9d5f2c2 commit 430b4dc
Show file tree
Hide file tree
Showing 20 changed files with 1,112 additions and 434 deletions.
21 changes: 20 additions & 1 deletion graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,13 @@ type IncomingResponseHookActions interface {
UpdateRequestWithExtensions(...ExtensionData)
}

// IncomingBlockHookActions are actions that incoming block hook can take
// to change the execution of a request
type IncomingBlockHookActions interface {
TerminateWithError(error)
UpdateRequestWithExtensions(...ExtensionData)
}

// RequestUpdatedHookActions are actions that can be taken in a request updated hook to
// change execution of the response
type RequestUpdatedHookActions interface {
Expand All @@ -197,9 +204,18 @@ type OnIncomingRequestHook func(p peer.ID, request RequestData, hookActions Inco

// OnIncomingResponseHook is a hook that runs each time a new response is received.
// It receives the peer that sent the response and all data about the response.
// If it returns an error processing is halted and the original request is cancelled.
// It receives an interface for customizing how we handle the ongoing execution of the request
type OnIncomingResponseHook func(p peer.ID, responseData ResponseData, hookActions IncomingResponseHookActions)

// OnIncomingBlockHook is a hook that runs each time a new block is validated as
// part of the response, regardless of whether it came locally or over the network
// It receives that sent the response, the most recent response, a link for the block received,
// and the size of the block received
// The difference between BlockSize & BlockSizeOnWire can be used to determine
// where the block came from (Local vs remote)
// It receives an interface for customizing how we handle the ongoing execution of the request
type OnIncomingBlockHook func(p peer.ID, responseData ResponseData, blockData BlockData, hookActions IncomingBlockHookActions)

// OnOutgoingRequestHook is a hook that runs immediately prior to sending a request
// It receives the peer we're sending a request to and all the data aobut the request
// It receives an interface for customizing how we handle executing this request
Expand Down Expand Up @@ -237,6 +253,9 @@ type GraphExchange interface {
// RegisterIncomingResponseHook adds a hook that runs when a response is received
RegisterIncomingResponseHook(OnIncomingResponseHook) UnregisterHookFunc

// RegisterIncomingBlockHook adds a hook that runs when a block is received and validated (put in block store)
RegisterIncomingBlockHook(OnIncomingBlockHook) UnregisterHookFunc

// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
RegisterOutgoingRequestHook(hook OnOutgoingRequestHook) UnregisterHookFunc

Expand Down
10 changes: 9 additions & 1 deletion impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type GraphSync struct {
completedResponseListeners *responderhooks.CompletedResponseListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
incomingBlockHooks *requestorhooks.IncomingBlockHooks
persistenceOptions *persistenceoptions.PersistenceOptions
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -75,7 +76,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
asyncLoader := asyncloader.New(ctx, loader, storer)
incomingResponseHooks := requestorhooks.NewResponseHooks()
outgoingRequestHooks := requestorhooks.NewRequestHooks()
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks)
incomingBlockHooks := requestorhooks.NewBlockHooks()
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks)
peerTaskQueue := peertaskqueue.New()
createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
return peerresponsemanager.NewResponseSender(ctx, p, peerManager)
Expand All @@ -102,6 +104,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
completedResponseListeners: completedResponseListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
incomingBlockHooks: incomingBlockHooks,
peerTaskQueue: peerTaskQueue,
peerResponseManager: peerResponseManager,
responseManager: responseManager,
Expand Down Expand Up @@ -169,6 +172,11 @@ func (gs *GraphSync) RegisterCompletedResponseListener(listener graphsync.OnResp
return gs.completedResponseListeners.Register(listener)
}

// RegisterIncomingBlockHook adds a hook that runs when a block is received and validated (put in block store)
func (gs *GraphSync) RegisterIncomingBlockHook(hook graphsync.OnIncomingBlockHook) graphsync.UnregisterHookFunc {
return gs.incomingBlockHooks.Register(hook)
}

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.UnpauseResponse(p, requestID)
Expand Down
61 changes: 61 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,67 @@ func TestPauseResumeViaUpdate(t *testing.T) {
require.Equal(t, td.extensionUpdateData, receivedUpdateData, "did not receive correct extension update data")
}

func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

var receivedReponseData []byte
var receivedUpdateData []byte
// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)

stopPoint := 50
blocksReceived := 0
requestor.RegisterIncomingBlockHook(func(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
blocksReceived++
if response.Status() == graphsync.RequestPaused && blocksReceived == stopPoint {
var has bool
receivedReponseData, has = response.Extension(td.extensionName)
if has {
hookActions.UpdateRequestWithExtensions(td.extensionUpdate)
}
}
})

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
blocksSent := 0
responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
_, has := requestData.Extension(td.extensionName)
if has {
blocksSent++
if blocksSent == stopPoint {
hookActions.SendExtensionData(td.extensionResponse)
hookActions.PauseResponse()
}
} else {
hookActions.TerminateWithError(errors.New("should have sent extension"))
}
})
responder.RegisterRequestUpdatedHook(func(p peer.ID, request graphsync.RequestData, update graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
var has bool
receivedUpdateData, has = update.Extension(td.extensionName)
if has {
hookActions.UnpauseResponse()
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

require.Equal(t, td.extensionResponseData, receivedReponseData, "did not receive correct extension response data")
require.Equal(t, td.extensionUpdateData, receivedUpdateData, "did not receive correct extension update data")
}

func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
// create network
ctx := context.Background()
Expand Down
14 changes: 11 additions & 3 deletions requestmanager/asyncloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer) (*responsecache.R

unverifiedBlockStore := unverifiedblockstore.New(storer)
responseCache := responsecache.New(unverifiedBlockStore)
loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) ([]byte, error) {
loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult {
// load from response cache
data, err := responseCache.AttemptLoad(requestID, link)
if data == nil && err == nil {
Expand All @@ -326,11 +326,19 @@ func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer) (*responsecache.R
if stream != nil && loadErr == nil {
localData, loadErr := ioutil.ReadAll(stream)
if loadErr == nil && localData != nil {
return localData, nil
return types.AsyncLoadResult{
Data: localData,
Err: nil,
Local: true,
}
}
}
}
return data, err
return types.AsyncLoadResult{
Data: data,
Err: err,
Local: false,
}
})

return responseCache, loadAttemptQueue
Expand Down
18 changes: 5 additions & 13 deletions requestmanager/asyncloader/loadattemptqueue/loadattemptqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ func NewLoadRequest(requestID graphsync.RequestID,
}

// LoadAttempter attempts to load a link to an array of bytes
// it has three results:
// bytes present, error nil = success
// bytes nil, error present = error
// bytes nil, error nil = did not load, but try again later
type LoadAttempter func(graphsync.RequestID, ipld.Link) ([]byte, error)
// and returns an async load result
type LoadAttempter func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult

// LoadAttemptQueue attempts to load using the load attempter, and then can
// place requests on a retry queue
Expand All @@ -48,14 +45,9 @@ func New(loadAttempter LoadAttempter) *LoadAttemptQueue {
// AttemptLoad attempts to loads the given load request, and if retry is true
// it saves the loadrequest for retrying later
func (laq *LoadAttemptQueue) AttemptLoad(lr LoadRequest, retry bool) {
response, err := laq.loadAttempter(lr.requestID, lr.link)
if err != nil {
lr.resultChan <- types.AsyncLoadResult{Data: nil, Err: err}
close(lr.resultChan)
return
}
if response != nil {
lr.resultChan <- types.AsyncLoadResult{Data: response, Err: nil}
response := laq.loadAttempter(lr.requestID, lr.link)
if response.Err != nil || response.Data != nil {
lr.resultChan <- response
close(lr.resultChan)
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ func TestAsyncLoadInitialLoadSucceeds(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
callCount := 0
loadAttempter := func(graphsync.RequestID, ipld.Link) ([]byte, error) {
loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
callCount++
return testutil.RandomBytes(100), nil
return types.AsyncLoadResult{
Data: testutil.RandomBytes(100),
}
}
loadAttemptQueue := New(loadAttempter)

Expand All @@ -45,9 +47,11 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
callCount := 0
loadAttempter := func(graphsync.RequestID, ipld.Link) ([]byte, error) {
loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
callCount++
return nil, fmt.Errorf("something went wrong")
return types.AsyncLoadResult{
Err: fmt.Errorf("something went wrong"),
}
}
loadAttemptQueue := New(loadAttempter)

Expand All @@ -69,13 +73,15 @@ func TestAsyncLoadInitialLoadIndeterminateRetryFalse(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
callCount := 0
loadAttempter := func(graphsync.RequestID, ipld.Link) ([]byte, error) {
loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
var result []byte
if callCount > 0 {
result = testutil.RandomBytes(100)
}
callCount++
return result, nil
return types.AsyncLoadResult{
Data: result,
}
}

loadAttemptQueue := New(loadAttempter)
Expand All @@ -99,14 +105,16 @@ func TestAsyncLoadInitialLoadIndeterminateRetryTrueThenRetriedSuccess(t *testing
defer cancel()
callCount := 0
called := make(chan struct{}, 2)
loadAttempter := func(graphsync.RequestID, ipld.Link) ([]byte, error) {
loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
var result []byte
called <- struct{}{}
if callCount > 0 {
result = testutil.RandomBytes(100)
}
callCount++
return result, nil
return types.AsyncLoadResult{
Data: result,
}
}
loadAttemptQueue := New(loadAttempter)

Expand All @@ -132,14 +140,16 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) {
defer cancel()
callCount := 0
called := make(chan struct{}, 2)
loadAttempter := func(graphsync.RequestID, ipld.Link) ([]byte, error) {
loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
var result []byte
called <- struct{}{}
if callCount > 0 {
result = testutil.RandomBytes(100)
}
callCount++
return result, nil
return types.AsyncLoadResult{
Data: result,
}
}
loadAttemptQueue := New(loadAttempter)

Expand Down
44 changes: 44 additions & 0 deletions requestmanager/hooks/blockhooks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package hooks

import (
"github.com/hannahhoward/go-pubsub"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/go-graphsync"
)

// IncomingBlockHooks is a set of incoming block hooks that can be processed
type IncomingBlockHooks struct {
pubSub *pubsub.PubSub
}

type internalBlockHookEvent struct {
p peer.ID
response graphsync.ResponseData
block graphsync.BlockData
rha *updateHookActions
}

func blockHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalBlockHookEvent)
hook := subscriberFn.(graphsync.OnIncomingBlockHook)
hook(ie.p, ie.response, ie.block, ie.rha)
return ie.rha.err
}

// NewBlockHooks returns a new list of incoming request hooks
func NewBlockHooks() *IncomingBlockHooks {
return &IncomingBlockHooks{pubSub: pubsub.New(blockHookDispatcher)}
}

// Register registers an extension to process incoming responses
func (ibh *IncomingBlockHooks) Register(hook graphsync.OnIncomingBlockHook) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(ibh.pubSub.Subscribe(hook))
}

// ProcessBlockHooks runs response hooks against an incoming response
func (ibh *IncomingBlockHooks) ProcessBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) UpdateResult {
rha := &updateHookActions{}
_ = ibh.pubSub.Publish(internalBlockHookEvent{p, response, block, rha})
return rha.result()
}
Loading

0 comments on commit 430b4dc

Please sign in to comment.