From f35cd565f8c43cea369a615f6080555a7894bd7c Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 23 Apr 2020 15:59:17 -0700 Subject: [PATCH] refactor(hooks): use external pubsub --- go.mod | 3 +- go.sum | 4 ++ requestmanager/hooks/requesthooks.go | 51 ++++++++---------- requestmanager/hooks/responsehooks.go | 56 +++++++------------- responsemanager/hooks/blockhooks.go | 56 +++++++------------- responsemanager/hooks/completedlisteners.go | 49 +++++++----------- responsemanager/hooks/requesthook.go | 54 +++++++------------ responsemanager/hooks/requestupdatehooks.go | 57 +++++++-------------- 8 files changed, 118 insertions(+), 212 deletions(-) diff --git a/go.mod b/go.mod index 408e4590..fb603f14 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.3.2 // indirect github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect + github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-blockservice v0.1.3 github.com/ipfs/go-cid v0.0.5 @@ -33,7 +34,7 @@ require ( github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a // indirect github.com/smartystreets/assertions v1.0.1 // indirect github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect - github.com/stretchr/testify v1.4.0 + github.com/stretchr/testify v1.5.1 github.com/whyrusleeping/cbor-gen v0.0.0-20200402171437-3d27c146c105 // indirect go.uber.org/multierr v1.4.0 // indirect golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect diff --git a/go.sum b/go.sum index eb60701a..daf4e10c 100644 --- a/go.sum +++ b/go.sum @@ -91,6 +91,8 @@ github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyF github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU= github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc= github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= +github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e h1:3YKHER4nmd7b5qy5t0GWDTwSn4OyRgfAXSmo6VnryBY= +github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e/go.mod h1:I8h3MITA53gN9OnWGCgaMa0JWVRdXthWw4M3CPM54OY= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -539,6 +541,8 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= diff --git a/requestmanager/hooks/requesthooks.go b/requestmanager/hooks/requesthooks.go index a8329c64..f85f29e1 100644 --- a/requestmanager/hooks/requesthooks.go +++ b/requestmanager/hooks/requesthooks.go @@ -1,47 +1,40 @@ package hooks import ( - "sync" - + "github.com/hannahhoward/go-pubsub" "github.com/ipfs/go-graphsync" "github.com/ipld/go-ipld-prime/traversal" peer "github.com/libp2p/go-libp2p-core/peer" ) -type requestHook struct { - key uint64 - hook graphsync.OnOutgoingRequestHook -} - // OutgoingRequestHooks is a set of incoming request hooks that can be processed type OutgoingRequestHooks struct { - nextKey uint64 - hooksLk sync.RWMutex - hooks []requestHook + pubSub *pubsub.PubSub +} + +type internalRequestHookEvent struct { + p peer.ID + request graphsync.RequestData + hookActions *requestHookActions +} + +func requestHooksDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error { + ie := event.(internalRequestHookEvent) + hook := subscriberFn.(graphsync.OnOutgoingRequestHook) + hook(ie.p, ie.request, ie.hookActions) + return nil } // NewRequestHooks returns a new list of incoming request hooks func NewRequestHooks() *OutgoingRequestHooks { - return &OutgoingRequestHooks{} + return &OutgoingRequestHooks{ + pubSub: pubsub.New(requestHooksDispatcher), + } } // Register registers an extension to process outgoing requests func (orh *OutgoingRequestHooks) Register(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc { - orh.hooksLk.Lock() - rh := requestHook{orh.nextKey, hook} - orh.nextKey++ - orh.hooks = append(orh.hooks, rh) - orh.hooksLk.Unlock() - return func() { - orh.hooksLk.Lock() - defer orh.hooksLk.Unlock() - for i, matchHook := range orh.hooks { - if rh.key == matchHook.key { - orh.hooks = append(orh.hooks[:i], orh.hooks[i+1:]...) - return - } - } - } + return graphsync.UnregisterHookFunc(orh.pubSub.Subscribe(hook)) } // RequestResult is the outcome of running requesthooks @@ -52,12 +45,8 @@ type RequestResult struct { // ProcessRequestHooks runs request hooks against an outgoing request func (orh *OutgoingRequestHooks) ProcessRequestHooks(p peer.ID, request graphsync.RequestData) RequestResult { - orh.hooksLk.RLock() - defer orh.hooksLk.RUnlock() rha := &requestHookActions{} - for _, requestHook := range orh.hooks { - requestHook.hook(p, request, rha) - } + _ = orh.pubSub.Publish(internalRequestHookEvent{p, request, rha}) return rha.result() } diff --git a/requestmanager/hooks/responsehooks.go b/requestmanager/hooks/responsehooks.go index fad43068..9da9fd8a 100644 --- a/requestmanager/hooks/responsehooks.go +++ b/requestmanager/hooks/responsehooks.go @@ -1,47 +1,38 @@ package hooks import ( - "sync" - + "github.com/hannahhoward/go-pubsub" "github.com/libp2p/go-libp2p-core/peer" "github.com/ipfs/go-graphsync" ) -type responseHook struct { - key uint64 - hook graphsync.OnIncomingResponseHook -} - // IncomingResponseHooks is a set of incoming response hooks that can be processed type IncomingResponseHooks struct { - nextKey uint64 - hooksLk sync.RWMutex - hooks []responseHook + pubSub *pubsub.PubSub +} + +type internalResponseHookEvent struct { + p peer.ID + response graphsync.ResponseData + rha *responseHookActions +} + +func responseHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error { + ie := event.(internalResponseHookEvent) + hook := subscriberFn.(graphsync.OnIncomingResponseHook) + hook(ie.p, ie.response, ie.rha) + return ie.rha.err } // NewResponseHooks returns a new list of incoming request hooks func NewResponseHooks() *IncomingResponseHooks { - return &IncomingResponseHooks{} + return &IncomingResponseHooks{pubSub: pubsub.New(responseHookDispatcher)} } // Register registers an extension to process incoming responses func (irh *IncomingResponseHooks) Register(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc { - irh.hooksLk.Lock() - rh := responseHook{irh.nextKey, hook} - irh.nextKey++ - irh.hooks = append(irh.hooks, rh) - irh.hooksLk.Unlock() - return func() { - irh.hooksLk.Lock() - defer irh.hooksLk.Unlock() - for i, matchHook := range irh.hooks { - if rh.key == matchHook.key { - irh.hooks = append(irh.hooks[:i], irh.hooks[i+1:]...) - return - } - } - } + return graphsync.UnregisterHookFunc(irh.pubSub.Subscribe(hook)) } // ResponseResult is the outcome of running response hooks @@ -52,15 +43,8 @@ type ResponseResult struct { // ProcessResponseHooks runs response hooks against an incoming response func (irh *IncomingResponseHooks) ProcessResponseHooks(p peer.ID, response graphsync.ResponseData) ResponseResult { - irh.hooksLk.Lock() - defer irh.hooksLk.Unlock() rha := &responseHookActions{} - for _, responseHooks := range irh.hooks { - responseHooks.hook(p, response, rha) - if rha.hasError() { - break - } - } + _ = irh.pubSub.Publish(internalResponseHookEvent{p, response, rha}) return rha.result() } @@ -76,10 +60,6 @@ func (rha *responseHookActions) result() ResponseResult { } } -func (rha *responseHookActions) hasError() bool { - return rha.err != nil -} - func (rha *responseHookActions) TerminateWithError(err error) { rha.err = err } diff --git a/responsemanager/hooks/blockhooks.go b/responsemanager/hooks/blockhooks.go index 13b15060..45d55dc5 100644 --- a/responsemanager/hooks/blockhooks.go +++ b/responsemanager/hooks/blockhooks.go @@ -2,8 +2,8 @@ package hooks import ( "errors" - "sync" + "github.com/hannahhoward/go-pubsub" "github.com/ipfs/go-graphsync" peer "github.com/libp2p/go-libp2p-core/peer" ) @@ -11,40 +11,33 @@ import ( // ErrPaused indicates a request should stop processing, but only cause it's paused var ErrPaused = errors.New("request has been paused") -type blockHook struct { - key uint64 - hook graphsync.OnOutgoingBlockHook -} - // OutgoingBlockHooks is a set of outgoing block hooks that can be processed type OutgoingBlockHooks struct { - hooksLk sync.RWMutex - nextKey uint64 - hooks []blockHook + pubSub *pubsub.PubSub +} + +type internalBlockHookEvent struct { + p peer.ID + request graphsync.RequestData + block graphsync.BlockData + bha *blockHookActions +} + +func blockHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error { + ie := event.(internalBlockHookEvent) + hook := subscriberFn.(graphsync.OnOutgoingBlockHook) + hook(ie.p, ie.request, ie.block, ie.bha) + return ie.bha.err } // NewBlockHooks returns a new list of outgoing block hooks func NewBlockHooks() *OutgoingBlockHooks { - return &OutgoingBlockHooks{} + return &OutgoingBlockHooks{pubSub: pubsub.New(blockHookDispatcher)} } // Register registers an hook to process outgoing blocks in a response func (obh *OutgoingBlockHooks) Register(hook graphsync.OnOutgoingBlockHook) graphsync.UnregisterHookFunc { - obh.hooksLk.Lock() - bh := blockHook{obh.nextKey, hook} - obh.nextKey++ - obh.hooks = append(obh.hooks, bh) - obh.hooksLk.Unlock() - return func() { - obh.hooksLk.Lock() - defer obh.hooksLk.Unlock() - for i, matchHook := range obh.hooks { - if bh.key == matchHook.key { - obh.hooks = append(obh.hooks[:i], obh.hooks[i+1:]...) - return - } - } - } + return graphsync.UnregisterHookFunc(obh.pubSub.Subscribe(hook)) } // BlockResult is the result of processing block hooks @@ -55,15 +48,8 @@ type BlockResult struct { // ProcessBlockHooks runs block hooks against a request and block data func (obh *OutgoingBlockHooks) ProcessBlockHooks(p peer.ID, request graphsync.RequestData, blockData graphsync.BlockData) BlockResult { - obh.hooksLk.RLock() - defer obh.hooksLk.RUnlock() bha := &blockHookActions{} - for _, bh := range obh.hooks { - bh.hook(p, request, blockData, bha) - if bha.hasError() { - break - } - } + _ = obh.pubSub.Publish(internalBlockHookEvent{p, request, blockData, bha}) return bha.result() } @@ -72,10 +58,6 @@ type blockHookActions struct { extensions []graphsync.ExtensionData } -func (bha *blockHookActions) hasError() bool { - return bha.err != nil -} - func (bha *blockHookActions) result() BlockResult { return BlockResult{bha.err, bha.extensions} } diff --git a/responsemanager/hooks/completedlisteners.go b/responsemanager/hooks/completedlisteners.go index 754e6b5a..5326704b 100644 --- a/responsemanager/hooks/completedlisteners.go +++ b/responsemanager/hooks/completedlisteners.go @@ -1,53 +1,40 @@ package hooks import ( - "sync" - + "github.com/hannahhoward/go-pubsub" "github.com/ipfs/go-graphsync" peer "github.com/libp2p/go-libp2p-core/peer" ) -type completedListener struct { - key uint64 - listener graphsync.OnResponseCompletedListener -} - // CompletedResponseListeners is a set of listeners for completed responses type CompletedResponseListeners struct { - listenersLk sync.RWMutex - nextKey uint64 - listeners []completedListener + pubSub *pubsub.PubSub +} + +type internalCompletedResponseEvent struct { + p peer.ID + request graphsync.RequestData + status graphsync.ResponseStatusCode +} + +func completedResponseDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error { + ie := event.(internalCompletedResponseEvent) + listener := subscriberFn.(graphsync.OnResponseCompletedListener) + listener(ie.p, ie.request, ie.status) + return nil } // NewCompletedResponseListeners returns a new list of completed response listeners func NewCompletedResponseListeners() *CompletedResponseListeners { - return &CompletedResponseListeners{} + return &CompletedResponseListeners{pubSub: pubsub.New(completedResponseDispatcher)} } // Register registers an listener for completed responses func (crl *CompletedResponseListeners) Register(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc { - crl.listenersLk.Lock() - cl := completedListener{crl.nextKey, listener} - crl.nextKey++ - crl.listeners = append(crl.listeners, cl) - crl.listenersLk.Unlock() - return func() { - crl.listenersLk.Lock() - defer crl.listenersLk.Unlock() - for i, matchListener := range crl.listeners { - if cl.key == matchListener.key { - crl.listeners = append(crl.listeners[:i], crl.listeners[i+1:]...) - return - } - } - } + return graphsync.UnregisterHookFunc(crl.pubSub.Subscribe(listener)) } // NotifyCompletedListeners runs notifies all completed listeners that a response has completed func (crl *CompletedResponseListeners) NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) { - crl.listenersLk.RLock() - defer crl.listenersLk.RUnlock() - for _, listener := range crl.listeners { - listener.listener(p, request, status) - } + _ = crl.pubSub.Publish(internalCompletedResponseEvent{p, request, status}) } diff --git a/responsemanager/hooks/requesthook.go b/responsemanager/hooks/requesthook.go index 2de99cd5..ba52eeb3 100644 --- a/responsemanager/hooks/requesthook.go +++ b/responsemanager/hooks/requesthook.go @@ -2,19 +2,14 @@ package hooks import ( "errors" - "sync" + "github.com/hannahhoward/go-pubsub" "github.com/ipfs/go-graphsync" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/traversal" peer "github.com/libp2p/go-libp2p-core/peer" ) -type requestHook struct { - key uint64 - hook graphsync.OnIncomingRequestHook -} - // PersistenceOptions is an interface for getting loaders by name type PersistenceOptions interface { GetLoader(name string) (ipld.Loader, bool) @@ -23,35 +18,33 @@ type PersistenceOptions interface { // IncomingRequestHooks is a set of incoming request hooks that can be processed type IncomingRequestHooks struct { persistenceOptions PersistenceOptions - hooksLk sync.RWMutex - nextKey uint64 - hooks []requestHook + pubSub *pubsub.PubSub +} + +type internalRequestHookEvent struct { + p peer.ID + request graphsync.RequestData + rha *requestHookActions +} + +func requestHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error { + ie := event.(internalRequestHookEvent) + hook := subscriberFn.(graphsync.OnIncomingRequestHook) + hook(ie.p, ie.request, ie.rha) + return ie.rha.err } // NewRequestHooks returns a new list of incoming request hooks func NewRequestHooks(persistenceOptions PersistenceOptions) *IncomingRequestHooks { return &IncomingRequestHooks{ persistenceOptions: persistenceOptions, + pubSub: pubsub.New(requestHookDispatcher), } } // Register registers an extension to process new incoming requests func (irh *IncomingRequestHooks) Register(hook graphsync.OnIncomingRequestHook) graphsync.UnregisterHookFunc { - irh.hooksLk.Lock() - rh := requestHook{irh.nextKey, hook} - irh.nextKey++ - irh.hooks = append(irh.hooks, rh) - irh.hooksLk.Unlock() - return func() { - irh.hooksLk.Lock() - defer irh.hooksLk.Unlock() - for i, matchHook := range irh.hooks { - if rh.key == matchHook.key { - irh.hooks = append(irh.hooks[:i], irh.hooks[i+1:]...) - return - } - } - } + return graphsync.UnregisterHookFunc(irh.pubSub.Subscribe(hook)) } // RequestResult is the outcome of running requesthooks @@ -65,17 +58,10 @@ type RequestResult struct { // ProcessRequestHooks runs request hooks against an incoming request func (irh *IncomingRequestHooks) ProcessRequestHooks(p peer.ID, request graphsync.RequestData) RequestResult { - irh.hooksLk.RLock() - defer irh.hooksLk.RUnlock() ha := &requestHookActions{ persistenceOptions: irh.persistenceOptions, } - for _, requestHook := range irh.hooks { - requestHook.hook(p, request, ha) - if ha.hasError() { - break - } - } + _ = irh.pubSub.Publish(internalRequestHookEvent{p, request, ha}) return ha.result() } @@ -88,10 +74,6 @@ type requestHookActions struct { extensions []graphsync.ExtensionData } -func (ha *requestHookActions) hasError() bool { - return ha.err != nil -} - func (ha *requestHookActions) result() RequestResult { return RequestResult{ IsValidated: ha.isValidated, diff --git a/responsemanager/hooks/requestupdatehooks.go b/responsemanager/hooks/requestupdatehooks.go index 0999a09f..b53f727e 100644 --- a/responsemanager/hooks/requestupdatehooks.go +++ b/responsemanager/hooks/requestupdatehooks.go @@ -1,46 +1,38 @@ package hooks import ( - "sync" - + "github.com/hannahhoward/go-pubsub" "github.com/ipfs/go-graphsync" peer "github.com/libp2p/go-libp2p-core/peer" ) -type requestUpdatedHook struct { - key uint64 - hook graphsync.OnRequestUpdatedHook -} - // RequestUpdatedHooks manages and runs hooks for request updates type RequestUpdatedHooks struct { - nextKey uint64 - hooksLk sync.RWMutex - hooks []requestUpdatedHook + pubSub *pubsub.PubSub +} + +type internalRequestUpdateEvent struct { + p peer.ID + request graphsync.RequestData + update graphsync.RequestData + uha *updateHookActions +} + +func updateHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error { + ie := event.(internalRequestUpdateEvent) + hook := subscriberFn.(graphsync.OnRequestUpdatedHook) + hook(ie.p, ie.request, ie.update, ie.uha) + return ie.uha.err } // NewUpdateHooks returns a new list of request updated hooks func NewUpdateHooks() *RequestUpdatedHooks { - return &RequestUpdatedHooks{} + return &RequestUpdatedHooks{pubSub: pubsub.New(updateHookDispatcher)} } // Register registers an hook to process updates to requests func (ruh *RequestUpdatedHooks) Register(hook graphsync.OnRequestUpdatedHook) graphsync.UnregisterHookFunc { - ruh.hooksLk.Lock() - rh := requestUpdatedHook{ruh.nextKey, hook} - ruh.nextKey++ - ruh.hooks = append(ruh.hooks, rh) - ruh.hooksLk.Unlock() - return func() { - ruh.hooksLk.Lock() - defer ruh.hooksLk.Unlock() - for i, matchHook := range ruh.hooks { - if rh.key == matchHook.key { - ruh.hooks = append(ruh.hooks[:i], ruh.hooks[i+1:]...) - return - } - } - } + return graphsync.UnregisterHookFunc(ruh.pubSub.Subscribe(hook)) } // UpdateResult is the result of running update hooks @@ -52,15 +44,8 @@ type UpdateResult struct { // ProcessUpdateHooks runs request hooks against an incoming request func (ruh *RequestUpdatedHooks) ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) UpdateResult { - ruh.hooksLk.RLock() - defer ruh.hooksLk.RUnlock() ha := &updateHookActions{} - for _, updateHook := range ruh.hooks { - updateHook.hook(p, request, update, ha) - if ha.hasError() { - break - } - } + _ = ruh.pubSub.Publish(internalRequestUpdateEvent{p, request, update, ha}) return ha.result() } @@ -70,10 +55,6 @@ type updateHookActions struct { extensions []graphsync.ExtensionData } -func (uha *updateHookActions) hasError() bool { - return uha.err != nil -} - func (uha *updateHookActions) result() UpdateResult { return UpdateResult{uha.err, uha.unpause, uha.extensions} }