From 9b323be0244ae7e4280c57995a6ef102b56cc326 Mon Sep 17 00:00:00 2001 From: krhubert Date: Wed, 17 Apr 2019 14:39:46 +0200 Subject: [PATCH] Replace custom pubsub --- api/api.go | 10 +++++--- api/listen_event.go | 7 +++-- api/listen_result.go | 7 +++-- api/listen_task.go | 7 +++-- api/submit_result.go | 3 +-- event/event.go | 7 ----- go.mod | 1 + go.sum | 2 ++ pubsub/pubsub.go | 49 ----------------------------------- pubsub/pubsub_test.go | 59 ------------------------------------------- 10 files changed, 20 insertions(+), 132 deletions(-) delete mode 100644 pubsub/pubsub.go delete mode 100644 pubsub/pubsub_test.go diff --git a/api/api.go b/api/api.go index 4b679966a..2608386b7 100644 --- a/api/api.go +++ b/api/api.go @@ -3,17 +3,19 @@ package api import ( "fmt" + "github.com/cskr/pubsub" "github.com/mesg-foundation/core/container" "github.com/mesg-foundation/core/database" "github.com/mesg-foundation/core/event" "github.com/mesg-foundation/core/execution" - "github.com/mesg-foundation/core/pubsub" "github.com/mesg-foundation/core/service" uuid "github.com/satori/go.uuid" ) // API exposes all functionalities of MESG core. type API struct { + ps *pubsub.PubSub + db database.ServiceDB execDB database.ExecutionDB container container.Container @@ -22,6 +24,7 @@ type API struct { // New creates a new API with given options. func New(c container.Container, db database.ServiceDB, execDB database.ExecutionDB) *API { return &API{ + ps: pubsub.New(0), container: c, db: db, execDB: execDB, @@ -94,7 +97,8 @@ func (a *API) EmitEvent(token, eventKey string, eventData map[string]interface{} if err != nil { return err } - event.Publish() + + go a.ps.Pub(event, s.EventSubscriptionChannel()) return nil } @@ -126,7 +130,7 @@ func (a *API) ExecuteTask(serviceID, taskKey string, inputData map[string]interf if err = a.execDB.Save(exec); err != nil { return "", err } - go pubsub.Publish(s.TaskSubscriptionChannel(), exec) + go a.ps.Pub(exec, s.TaskSubscriptionChannel()) return exec.ID, nil } diff --git a/api/listen_event.go b/api/listen_event.go index b2e644589..2992d8483 100644 --- a/api/listen_event.go +++ b/api/listen_event.go @@ -2,7 +2,6 @@ package api import ( "github.com/mesg-foundation/core/event" - "github.com/mesg-foundation/core/pubsub" "github.com/mesg-foundation/core/service" "github.com/mesg-foundation/core/x/xstrings" ) @@ -73,9 +72,9 @@ func (l *EventListener) listen(serviceID string) error { } func (l *EventListener) listenLoop(service *service.Service) { - channel := service.EventSubscriptionChannel() - subscription := pubsub.Subscribe(channel) - defer pubsub.Unsubscribe(channel, subscription) + topic := service.EventSubscriptionChannel() + subscription := l.api.ps.Sub(topic) + defer l.api.ps.Unsub(subscription, topic) close(l.listening) for { diff --git a/api/listen_result.go b/api/listen_result.go index dd221e8f4..829198abd 100644 --- a/api/listen_result.go +++ b/api/listen_result.go @@ -2,7 +2,6 @@ package api import ( "github.com/mesg-foundation/core/execution" - "github.com/mesg-foundation/core/pubsub" "github.com/mesg-foundation/core/service" "github.com/mesg-foundation/core/x/xstrings" ) @@ -89,9 +88,9 @@ func (l *ResultListener) listen(serviceID string) error { } func (l *ResultListener) listenLoop(service *service.Service) { - channel := service.ResultSubscriptionChannel() - subscription := pubsub.Subscribe(channel) - defer pubsub.Unsubscribe(channel, subscription) + topic := service.ResultSubscriptionChannel() + subscription := l.api.ps.Sub(topic) + defer l.api.ps.Unsub(subscription, topic) close(l.listening) for { diff --git a/api/listen_task.go b/api/listen_task.go index a98b8f661..3d38b0b7d 100644 --- a/api/listen_task.go +++ b/api/listen_task.go @@ -2,7 +2,6 @@ package api import ( "github.com/mesg-foundation/core/execution" - "github.com/mesg-foundation/core/pubsub" "github.com/mesg-foundation/core/service" ) @@ -53,9 +52,9 @@ func (l *TaskListener) listen(token string) error { } func (l *TaskListener) listenLoop(service *service.Service) { - channel := service.TaskSubscriptionChannel() - subscription := pubsub.Subscribe(channel) - defer pubsub.Unsubscribe(channel, subscription) + topic := service.TaskSubscriptionChannel() + subscription := l.api.ps.Sub(topic) + defer l.api.ps.Unsub(subscription, topic) close(l.listening) for { diff --git a/api/submit_result.go b/api/submit_result.go index f77375994..5b81f04a3 100644 --- a/api/submit_result.go +++ b/api/submit_result.go @@ -6,7 +6,6 @@ import ( "github.com/mesg-foundation/core/database" "github.com/mesg-foundation/core/execution" - "github.com/mesg-foundation/core/pubsub" ) // SubmitResult submits results for executionID. @@ -31,7 +30,7 @@ func (s *resultSubmitter) Submit(executionID string, outputKey string, outputDat exec, stateChanged, err := s.processExecution(executionID, outputKey, outputData) if stateChanged { // only publish to listeners when the execution's state changed. - go pubsub.Publish(exec.Service.ResultSubscriptionChannel(), exec) + go s.api.ps.Pub(exec, exec.Service.ResultSubscriptionChannel()) } // always return any error to the service. return err diff --git a/event/event.go b/event/event.go index aaf6432b7..4b3fb2a72 100644 --- a/event/event.go +++ b/event/event.go @@ -3,7 +3,6 @@ package event import ( "time" - "github.com/mesg-foundation/core/pubsub" "github.com/mesg-foundation/core/service" ) @@ -31,9 +30,3 @@ func Create(s *service.Service, eventKey string, eventData map[string]interface{ CreatedAt: time.Now(), }, nil } - -// Publish publishes an event for every listener. -func (event *Event) Publish() { - channel := event.Service.EventSubscriptionChannel() - go pubsub.Publish(channel, event) -} diff --git a/go.mod b/go.mod index 3e92d897c..db65a2d6a 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/briandowns/spinner v0.0.0-20180822135157-9f016caa1359 github.com/cnf/structhash v0.0.0-20180104161610-62a607eb0224 github.com/containerd/continuity v0.0.0-20180712174259-0377f7d76720 // indirect + github.com/cskr/pubsub v1.0.2 github.com/docker/cli v0.0.0-20190129171106-b258f458cc8d github.com/docker/distribution v0.0.0-20180720172123-0dae0957e5fe // indirect github.com/docker/docker v0.0.0-20180803200506-eeea12db7a65 diff --git a/go.sum b/go.sum index 104c793d1..aa43ab0f6 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/cnf/structhash v0.0.0-20180104161610-62a607eb0224 h1:rnCKRrdSBqc061l0 github.com/cnf/structhash v0.0.0-20180104161610-62a607eb0224/go.mod h1:pCxVEbcm3AMg7ejXyorUXi6HQCzOIBf7zEDVPtw0/U4= github.com/containerd/continuity v0.0.0-20180712174259-0377f7d76720 h1:T5LkgEMACPq7+VPRnkh51WhyV+Q0waOGGtp37Y82org= github.com/containerd/continuity v0.0.0-20180712174259-0377f7d76720/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= +github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= +github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go deleted file mode 100644 index 30dcb96f2..000000000 --- a/pubsub/pubsub.go +++ /dev/null @@ -1,49 +0,0 @@ -package pubsub - -import ( - "sync" -) - -var ( - listeners = make(map[string][]chan Message) - mu sync.Mutex -) - -// Message sends subscribe/publish messages. -type Message interface{} - -// Publish publishes a message to a channel. -func Publish(channel string, data Message) { - mu.Lock() - defer mu.Unlock() - for _, listener := range listeners[channel] { - listener <- data - } -} - -// Subscribe subscribes to the channel and returns listener for it. -func Subscribe(channel string) chan Message { - mu.Lock() - defer mu.Unlock() - listener := make(chan Message) - if listeners[channel] == nil { - listeners[channel] = make([]chan Message, 0) - } - listeners[channel] = append(listeners[channel], listener) - return listener -} - -// Unsubscribe unsubscribes a listener from listening channel. -func Unsubscribe(channel string, listener chan Message) { - mu.Lock() - defer mu.Unlock() - for i, l := range listeners[channel] { - if l == listener { - listeners[channel] = append(listeners[channel][:i], listeners[channel][i+1:]...) - if len(listeners[channel]) == 0 { - listeners[channel] = nil - } - return - } - } -} diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go deleted file mode 100644 index e4df9f297..000000000 --- a/pubsub/pubsub_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package pubsub - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -type messageStructTest struct { - test string -} - -func TestPublish(t *testing.T) { - key := "TestPublish" - data := messageStructTest{test: "TestPublish"} - - res := Subscribe(key) - go Publish(key, data) - x := <-res - require.Equal(t, x, data) -} - -func TestPublishMultipleListeners(t *testing.T) { - key := "TestPublishMultipleListeners" - data := messageStructTest{test: "TestPublishMultipleListeners"} - res1 := Subscribe(key) - res2 := Subscribe(key) - go Publish(key, data) - x := <-res1 - y := <-res2 - require.Equal(t, x, data) - require.Equal(t, y, data) -} - -func TestSubscribe(t *testing.T) { - key := "TestSubscribe" - res := Subscribe(key) - require.NotNil(t, res) - require.Len(t, listeners[key], 1) -} - -func TestSubscribeMultipleTimes(t *testing.T) { - key := "TestSubscribeMultipleTimes" - Subscribe(key) - Subscribe(key) - require.Len(t, listeners[key], 2) -} - -func TestUnsubscribe(t *testing.T) { - key := "TestUnsubscribe" - channel := Subscribe(key) - channel1 := Subscribe(key) - Unsubscribe(key, channel) - require.Len(t, listeners[key], 1) - require.NotNil(t, listeners[key]) - Unsubscribe(key, channel1) - require.Len(t, listeners[key], 0) - require.Nil(t, listeners[key]) -}