From a96ad451dc7bc6c5258ef8c684dbc6d7270af7a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Wed, 18 Dec 2019 21:45:45 +0100 Subject: [PATCH] Replace router with internal router --- pubsub/gochannel/fanout.go | 34 ++++++++++++++++++--------------- pubsub/gochannel/fanout_test.go | 22 ++++++++++++++------- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/pubsub/gochannel/fanout.go b/pubsub/gochannel/fanout.go index 537c9d24d..8c8878cb0 100644 --- a/pubsub/gochannel/fanout.go +++ b/pubsub/gochannel/fanout.go @@ -17,13 +17,13 @@ import ( // inside the process. // // You need to call AddSubscription method for all topics that you want to listen to. -// This needs to be done *before* the router is started. +// This needs to be done *before* starting the FanOut. // // FanOut exposes the standard Subscriber interface. type FanOut struct { internalPubSub *GoChannel + internalRouter *message.Router - router *message.Router subscriber message.Subscriber logger watermill.LoggerAdapter @@ -32,16 +32,11 @@ type FanOut struct { subscribedLock sync.Mutex } -// NewFanOut creates new FanOut. -// The passed router should not be running yet. +// NewFanOut creates a new FanOut. func NewFanOut( - router *message.Router, subscriber message.Subscriber, logger watermill.LoggerAdapter, ) (*FanOut, error) { - if router == nil { - return nil, errors.New("missing router") - } if subscriber == nil { return nil, errors.New("missing subscriber") } @@ -49,16 +44,15 @@ func NewFanOut( logger = watermill.NopLogger{} } - select { - case <-router.Running(): - return nil, errors.New("the router is already running") - default: + router, err := message.NewRouter(message.RouterConfig{}, logger) + if err != nil { + return nil, err } return &FanOut{ internalPubSub: NewGoChannel(Config{}, logger), + internalRouter: router, - router: router, subscriber: subscriber, logger: logger, @@ -68,7 +62,7 @@ func NewFanOut( } // AddSubscription add an internal subscription for the given topic. -// You need to call this method with all topics that you want to listen to, before the router is started. +// You need to call this method with all topics that you want to listen to, before the FanOut is started. // AddSubscription is idempotent. func (f *FanOut) AddSubscription(topic string) { f.subscribedLock.Lock() @@ -84,7 +78,7 @@ func (f *FanOut) AddSubscription(topic string) { "topic": topic, }) - f.router.AddHandler( + f.internalRouter.AddHandler( fmt.Sprintf("fanout-%s", topic), topic, f.subscriber, @@ -96,6 +90,16 @@ func (f *FanOut) AddSubscription(topic string) { f.subscribedTopics[topic] = struct{}{} } +// Run runs the FanOut. +func (f *FanOut) Run(ctx context.Context) error { + return f.internalRouter.Run(ctx) +} + +// Running is closed when FanOut is running. +func (f *FanOut) Running() chan struct{} { + return f.internalRouter.Running() +} + func (f *FanOut) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) { return f.internalPubSub.Subscribe(ctx, topic) } diff --git a/pubsub/gochannel/fanout_test.go b/pubsub/gochannel/fanout_test.go index 6f363cb75..5381fc12f 100644 --- a/pubsub/gochannel/fanout_test.go +++ b/pubsub/gochannel/fanout_test.go @@ -23,7 +23,7 @@ func TestFanOut(t *testing.T) { router, err := message.NewRouter(message.RouterConfig{}, logger) require.NoError(t, err) - fanout, err := gochannel.NewFanOut(router, upstreamPubSub, logger) + fanout, err := gochannel.NewFanOut(upstreamPubSub, logger) require.NoError(t, err) fanout.AddSubscription(upstreamTopic) @@ -53,7 +53,13 @@ func TestFanOut(t *testing.T) { require.NoError(t, err) }() + go func() { + err := fanout.Run(ctx) + require.NoError(t, err) + }() + <-router.Running() + <-fanout.Running() go func() { for i := 0; i < messagesCount; i++ { @@ -70,20 +76,22 @@ func TestFanOut(t *testing.T) { require.Equal(t, uint64(workersCount*messagesCount), counter) } -func TestFanOut_RouterRunning(t *testing.T) { +func TestFanOut_RouterClosed(t *testing.T) { logger := watermill.NopLogger{} pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger) - router, err := message.NewRouter(message.RouterConfig{}, logger) + fanout, err := gochannel.NewFanOut(pubSub, logger) require.NoError(t, err) + fanout.AddSubscription("some-topic") + go func() { - err := router.Run(context.Background()) + err := fanout.Run(context.Background()) require.NoError(t, err) }() - <-router.Running() + <-fanout.Running() - _, err = gochannel.NewFanOut(router, pubSub, logger) - require.Error(t, err) + err = fanout.Close() + require.NoError(t, err) }