Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use external pubsub #65

Merged
merged 1 commit into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.2 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.5
Expand Down Expand Up @@ -33,7 +34,7 @@ require (
github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a // indirect
github.com/smartystreets/assertions v1.0.1 // indirect
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect
github.com/stretchr/testify v1.4.0
github.com/stretchr/testify v1.5.1
github.com/whyrusleeping/cbor-gen v0.0.0-20200402171437-3d27c146c105 // indirect
go.uber.org/multierr v1.4.0 // indirect
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyF
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e h1:3YKHER4nmd7b5qy5t0GWDTwSn4OyRgfAXSmo6VnryBY=
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e/go.mod h1:I8h3MITA53gN9OnWGCgaMa0JWVRdXthWw4M3CPM54OY=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down Expand Up @@ -539,6 +541,8 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
Expand Down
51 changes: 20 additions & 31 deletions requestmanager/hooks/requesthooks.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,40 @@
package hooks

import (
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-graphsync"
"github.com/ipld/go-ipld-prime/traversal"
peer "github.com/libp2p/go-libp2p-core/peer"
)

type requestHook struct {
key uint64
hook graphsync.OnOutgoingRequestHook
}

// OutgoingRequestHooks is a set of incoming request hooks that can be processed
type OutgoingRequestHooks struct {
nextKey uint64
hooksLk sync.RWMutex
hooks []requestHook
pubSub *pubsub.PubSub
}

type internalRequestHookEvent struct {
p peer.ID
request graphsync.RequestData
hookActions *requestHookActions
}

func requestHooksDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalRequestHookEvent)
hook := subscriberFn.(graphsync.OnOutgoingRequestHook)
hook(ie.p, ie.request, ie.hookActions)
return nil
}

// NewRequestHooks returns a new list of incoming request hooks
func NewRequestHooks() *OutgoingRequestHooks {
return &OutgoingRequestHooks{}
return &OutgoingRequestHooks{
pubSub: pubsub.New(requestHooksDispatcher),
}
}

// Register registers an extension to process outgoing requests
func (orh *OutgoingRequestHooks) Register(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc {
orh.hooksLk.Lock()
rh := requestHook{orh.nextKey, hook}
orh.nextKey++
orh.hooks = append(orh.hooks, rh)
orh.hooksLk.Unlock()
return func() {
orh.hooksLk.Lock()
defer orh.hooksLk.Unlock()
for i, matchHook := range orh.hooks {
if rh.key == matchHook.key {
orh.hooks = append(orh.hooks[:i], orh.hooks[i+1:]...)
return
}
}
}
return graphsync.UnregisterHookFunc(orh.pubSub.Subscribe(hook))
}

// RequestResult is the outcome of running requesthooks
Expand All @@ -52,12 +45,8 @@ type RequestResult struct {

// ProcessRequestHooks runs request hooks against an outgoing request
func (orh *OutgoingRequestHooks) ProcessRequestHooks(p peer.ID, request graphsync.RequestData) RequestResult {
orh.hooksLk.RLock()
defer orh.hooksLk.RUnlock()
rha := &requestHookActions{}
for _, requestHook := range orh.hooks {
requestHook.hook(p, request, rha)
}
_ = orh.pubSub.Publish(internalRequestHookEvent{p, request, rha})
return rha.result()
}

Expand Down
56 changes: 18 additions & 38 deletions requestmanager/hooks/responsehooks.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,38 @@
package hooks

import (
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/go-graphsync"
)

type responseHook struct {
key uint64
hook graphsync.OnIncomingResponseHook
}

// IncomingResponseHooks is a set of incoming response hooks that can be processed
type IncomingResponseHooks struct {
nextKey uint64
hooksLk sync.RWMutex
hooks []responseHook
pubSub *pubsub.PubSub
}

type internalResponseHookEvent struct {
p peer.ID
response graphsync.ResponseData
rha *responseHookActions
}

func responseHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalResponseHookEvent)
hook := subscriberFn.(graphsync.OnIncomingResponseHook)
hook(ie.p, ie.response, ie.rha)
return ie.rha.err
}

// NewResponseHooks returns a new list of incoming request hooks
func NewResponseHooks() *IncomingResponseHooks {
return &IncomingResponseHooks{}
return &IncomingResponseHooks{pubSub: pubsub.New(responseHookDispatcher)}
}

// Register registers an extension to process incoming responses
func (irh *IncomingResponseHooks) Register(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
irh.hooksLk.Lock()
rh := responseHook{irh.nextKey, hook}
irh.nextKey++
irh.hooks = append(irh.hooks, rh)
irh.hooksLk.Unlock()
return func() {
irh.hooksLk.Lock()
defer irh.hooksLk.Unlock()
for i, matchHook := range irh.hooks {
if rh.key == matchHook.key {
irh.hooks = append(irh.hooks[:i], irh.hooks[i+1:]...)
return
}
}
}
return graphsync.UnregisterHookFunc(irh.pubSub.Subscribe(hook))
}

// ResponseResult is the outcome of running response hooks
Expand All @@ -52,15 +43,8 @@ type ResponseResult struct {

// ProcessResponseHooks runs response hooks against an incoming response
func (irh *IncomingResponseHooks) ProcessResponseHooks(p peer.ID, response graphsync.ResponseData) ResponseResult {
irh.hooksLk.Lock()
defer irh.hooksLk.Unlock()
rha := &responseHookActions{}
for _, responseHooks := range irh.hooks {
responseHooks.hook(p, response, rha)
if rha.hasError() {
break
}
}
_ = irh.pubSub.Publish(internalResponseHookEvent{p, response, rha})
return rha.result()
}

Expand All @@ -76,10 +60,6 @@ func (rha *responseHookActions) result() ResponseResult {
}
}

func (rha *responseHookActions) hasError() bool {
return rha.err != nil
}

func (rha *responseHookActions) TerminateWithError(err error) {
rha.err = err
}
Expand Down
56 changes: 19 additions & 37 deletions responsemanager/hooks/blockhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,42 @@ package hooks

import (
"errors"
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-graphsync"
peer "github.com/libp2p/go-libp2p-core/peer"
)

// ErrPaused indicates a request should stop processing, but only cause it's paused
var ErrPaused = errors.New("request has been paused")

type blockHook struct {
key uint64
hook graphsync.OnOutgoingBlockHook
}

// OutgoingBlockHooks is a set of outgoing block hooks that can be processed
type OutgoingBlockHooks struct {
hooksLk sync.RWMutex
nextKey uint64
hooks []blockHook
pubSub *pubsub.PubSub
}

type internalBlockHookEvent struct {
p peer.ID
request graphsync.RequestData
block graphsync.BlockData
bha *blockHookActions
}

func blockHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalBlockHookEvent)
hook := subscriberFn.(graphsync.OnOutgoingBlockHook)
hook(ie.p, ie.request, ie.block, ie.bha)
return ie.bha.err
}

// NewBlockHooks returns a new list of outgoing block hooks
func NewBlockHooks() *OutgoingBlockHooks {
return &OutgoingBlockHooks{}
return &OutgoingBlockHooks{pubSub: pubsub.New(blockHookDispatcher)}
}

// Register registers an hook to process outgoing blocks in a response
func (obh *OutgoingBlockHooks) Register(hook graphsync.OnOutgoingBlockHook) graphsync.UnregisterHookFunc {
obh.hooksLk.Lock()
bh := blockHook{obh.nextKey, hook}
obh.nextKey++
obh.hooks = append(obh.hooks, bh)
obh.hooksLk.Unlock()
return func() {
obh.hooksLk.Lock()
defer obh.hooksLk.Unlock()
for i, matchHook := range obh.hooks {
if bh.key == matchHook.key {
obh.hooks = append(obh.hooks[:i], obh.hooks[i+1:]...)
return
}
}
}
return graphsync.UnregisterHookFunc(obh.pubSub.Subscribe(hook))
}

// BlockResult is the result of processing block hooks
Expand All @@ -55,15 +48,8 @@ type BlockResult struct {

// ProcessBlockHooks runs block hooks against a request and block data
func (obh *OutgoingBlockHooks) ProcessBlockHooks(p peer.ID, request graphsync.RequestData, blockData graphsync.BlockData) BlockResult {
obh.hooksLk.RLock()
defer obh.hooksLk.RUnlock()
bha := &blockHookActions{}
for _, bh := range obh.hooks {
bh.hook(p, request, blockData, bha)
if bha.hasError() {
break
}
}
_ = obh.pubSub.Publish(internalBlockHookEvent{p, request, blockData, bha})
return bha.result()
}

Expand All @@ -72,10 +58,6 @@ type blockHookActions struct {
extensions []graphsync.ExtensionData
}

func (bha *blockHookActions) hasError() bool {
return bha.err != nil
}

func (bha *blockHookActions) result() BlockResult {
return BlockResult{bha.err, bha.extensions}
}
Expand Down
49 changes: 18 additions & 31 deletions responsemanager/hooks/completedlisteners.go
Original file line number Diff line number Diff line change
@@ -1,53 +1,40 @@
package hooks

import (
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-graphsync"
peer "github.com/libp2p/go-libp2p-core/peer"
)

type completedListener struct {
key uint64
listener graphsync.OnResponseCompletedListener
}

// CompletedResponseListeners is a set of listeners for completed responses
type CompletedResponseListeners struct {
listenersLk sync.RWMutex
nextKey uint64
listeners []completedListener
pubSub *pubsub.PubSub
}

type internalCompletedResponseEvent struct {
p peer.ID
request graphsync.RequestData
status graphsync.ResponseStatusCode
}

func completedResponseDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalCompletedResponseEvent)
listener := subscriberFn.(graphsync.OnResponseCompletedListener)
listener(ie.p, ie.request, ie.status)
return nil
}

// NewCompletedResponseListeners returns a new list of completed response listeners
func NewCompletedResponseListeners() *CompletedResponseListeners {
return &CompletedResponseListeners{}
return &CompletedResponseListeners{pubSub: pubsub.New(completedResponseDispatcher)}
}

// Register registers an listener for completed responses
func (crl *CompletedResponseListeners) Register(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
crl.listenersLk.Lock()
cl := completedListener{crl.nextKey, listener}
crl.nextKey++
crl.listeners = append(crl.listeners, cl)
crl.listenersLk.Unlock()
return func() {
crl.listenersLk.Lock()
defer crl.listenersLk.Unlock()
for i, matchListener := range crl.listeners {
if cl.key == matchListener.key {
crl.listeners = append(crl.listeners[:i], crl.listeners[i+1:]...)
return
}
}
}
return graphsync.UnregisterHookFunc(crl.pubSub.Subscribe(listener))
}

// NotifyCompletedListeners runs notifies all completed listeners that a response has completed
func (crl *CompletedResponseListeners) NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
crl.listenersLk.RLock()
defer crl.listenersLk.RUnlock()
for _, listener := range crl.listeners {
listener.listener(p, request, status)
}
_ = crl.pubSub.Publish(internalCompletedResponseEvent{p, request, status})
}
Loading