Add FanOut proposal #166

Merged 10 commits on Dec 20, 2019
Changes from 5 commits
5 changes: 5 additions & 0 deletions message/router.go
@@ -34,6 +34,11 @@ type HandlerFunc func(msg *Message) ([]*Message, error)
// NoPublishHandlerFunc is HandlerFunc alternative, which doesn't produce any messages.
type NoPublishHandlerFunc func(msg *Message) error

// PassthroughHandler is a handler that passes the message unchanged from the subscriber to the publisher.
var PassthroughHandler HandlerFunc = func(msg *Message) ([]*Message, error) {
Contributor:
You could use the alias Messages instead of []*Message, as we've done elsewhere, if you think it improves readability.

Member Author:
HandlerFunc doesn't use it though, so maybe we should change them all at once?

Contributor:
Hmm, I thought I saw some HandlerFuncs using Messages instead; maybe I was wrong.

	return []*Message{msg}, nil
}
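
The exchange above about Messages versus []*Message comes down to Go's assignability rules. The sketch below is a minimal illustration with stand-in types, not the Watermill source: it assumes Messages is a defined slice type (`type Messages []*Message`) rather than a true type alias, and the simplified Message struct and the name passthrough are hypothetical. Under that assumption, a handler can return a Messages value, but it cannot change its declared result type without HandlerFunc changing too, which matches the "change them all at once" remark.

```go
package main

import "fmt"

// Message is a simplified stand-in for message.Message (assumption for this sketch).
type Message struct{ UUID string }

// Messages is assumed to be a defined slice type, not a type alias.
type Messages []*Message

// HandlerFunc has the same shape as the type shown in the diff.
type HandlerFunc func(msg *Message) ([]*Message, error)

// passthrough keeps the []*Message result type, so the function literal is
// assignable to HandlerFunc. Returning a Messages value still works, because
// a defined slice type is assignable to its unnamed underlying type.
var passthrough HandlerFunc = func(msg *Message) ([]*Message, error) {
	return Messages{msg}, nil
}

// The following would not compile: func(*Message) (Messages, error) is a
// different function type from HandlerFunc's underlying type.
//
//	var broken HandlerFunc = func(msg *Message) (Messages, error) {
//		return Messages{msg}, nil
//	}

func main() {
	out, err := passthrough(&Message{UUID: "1"})
	fmt.Println(len(out), out[0].UUID, err)
}
```

If Messages were declared as a real alias (`type Messages = []*Message`), the commented-out form would compile as well, so the answer depends on how the type is declared.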

// HandlerMiddleware allows us to write something like decorators to HandlerFunc.
// It can execute something before handler (for example: modify consumed message)
// or after (modify produced messages, ack/nack on consumed message, handle errors, logging, etc.).
99 changes: 99 additions & 0 deletions pubsub/gochannel/fanout.go
@@ -0,0 +1,99 @@
package gochannel

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
)

// FanOut is a component that receives messages from the subscriber and passes them
// to all publishers. In effect, messages are "multiplied".
//
// A typical use case for using FanOut is having one external subscription and multiple workers
// inside the process.
//
// You need to call AddSubscription method for all topics that you want to listen to.
// This needs to be done *before* the router is started.
//
// FanOut exposes the standard Subscriber interface.
type FanOut struct {
roblaszczak marked this conversation as resolved.
	internalPubSub *GoChannel

	router     *message.Router
	subscriber message.Subscriber

	logger watermill.LoggerAdapter

	subscribedTopics map[string]struct{}
	subscribedLock   sync.Mutex
}

// NewFanOut creates new FanOut.
// The passed router should not be running yet.
Contributor:
In principle, we could check whether the router is already running (router.Running()). Should we return an error if it is?
If so, cover it with an appropriate unit test, of course.

Member Author:
Oh right, that's a good idea. 👍

Member Author:
See 6939c53

func NewFanOut(
	router *message.Router,
	subscriber message.Subscriber,
	logger watermill.LoggerAdapter,
) (*FanOut, error) {
	if router == nil {
Member:
Basically, is there any case where we need to pass the router?

Member Author:
Moved to an internal router in a96ad45

		return nil, errors.New("missing router")
	}
	if subscriber == nil {
		return nil, errors.New("missing subscriber")
	}
	if logger == nil {
		logger = watermill.NopLogger{}
	}

	return &FanOut{
		internalPubSub: NewGoChannel(Config{}, logger),

		router:     router,
		subscriber: subscriber,

		logger: logger,

		subscribedTopics: map[string]struct{}{},
	}, nil
}
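
The review suggestion above, returning an error when the router is already running, could be sketched roughly as follows. This is one possible shape of the check, not necessarily what the referenced commits do. It assumes only that Running() returns a channel that is closed once the router has started, which is consistent with the `<-router.Running()` call in the test. The runningRouter interface, fakeRouter type, and checkNotRunning function are hypothetical names introduced for this illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// runningRouter is a hypothetical interface covering the single method used here.
// Assumption: Running returns a channel that is closed once the router has started.
type runningRouter interface {
	Running() chan struct{}
}

// fakeRouter is a test double for this sketch, not part of Watermill.
type fakeRouter struct{ running chan struct{} }

func (r *fakeRouter) Running() chan struct{} { return r.running }

// checkNotRunning returns an error if the router has already started.
// The select with a default case makes the check non-blocking.
func checkNotRunning(r runningRouter) error {
	select {
	case <-r.Running():
		return errors.New("router is already running")
	default:
		return nil
	}
}

func main() {
	r := &fakeRouter{running: make(chan struct{})}
	fmt.Println(checkNotRunning(r)) // router not started yet

	close(r.running) // simulate the router starting
	fmt.Println(checkNotRunning(r))
}
```

A constructor could call such a check right after the nil checks and return the error to the caller. Once the router is created internally, as the later comment describes, the check is no longer needed.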

// AddSubscription adds an internal subscription for the given topic.
// You need to call this method with all topics that you want to listen to, before the router is started.
// AddSubscription is idempotent.
func (f *FanOut) AddSubscription(topic string) {
	f.subscribedLock.Lock()
	defer f.subscribedLock.Unlock()

	_, ok := f.subscribedTopics[topic]
	if ok {
		// Subscription already exists
		return
	}

	f.logger.Trace("Adding fan-out subscription for topic", watermill.LogFields{
		"topic": topic,
	})

	f.router.AddHandler(
		fmt.Sprintf("fanout-%s", topic),
		topic,
		f.subscriber,
		topic,
		f.internalPubSub,
		message.PassthroughHandler,
	)

	f.subscribedTopics[topic] = struct{}{}
}

func (f *FanOut) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
	return f.internalPubSub.Subscribe(ctx, topic)
}

func (f *FanOut) Close() error {
	return f.internalPubSub.Close()
}
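
The "multiplying" behaviour described in the FanOut doc comment can be illustrated with plain Go channels. This is a conceptual sketch only: it does not use Watermill, and the names fanOut and countAll are hypothetical. Every value read from one input channel is copied to each output channel, so 100 messages delivered to 10 workers produce 1000 deliveries in total.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut copies every value from in to each of n output channels and
// closes all outputs once in is closed.
func fanOut(in <-chan string, n int) []<-chan string {
	outs := make([]chan string, n)
	result := make([]<-chan string, n)
	for i := range outs {
		outs[i] = make(chan string)
		result[i] = outs[i]
	}

	go func() {
		for v := range in {
			for _, out := range outs {
				out <- v // blocks until this worker receives the value
			}
		}
		for _, out := range outs {
			close(out)
		}
	}()

	return result
}

// countAll sends the given number of messages through fanOut to the given
// number of workers and returns how many deliveries the workers saw in total.
func countAll(messages, workers int) int {
	in := make(chan string)
	outs := fanOut(in, workers)

	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	for _, out := range outs {
		wg.Add(1)
		go func(out <-chan string) {
			defer wg.Done()
			for range out {
				mu.Lock()
				total++
				mu.Unlock()
			}
		}(out)
	}

	for i := 0; i < messages; i++ {
		in <- fmt.Sprintf("msg-%d", i)
	}
	close(in)
	wg.Wait()

	return total
}

func main() {
	fmt.Println(countAll(100, 10))
}
```

The sketch waits on a sync.WaitGroup after closing the input, so the total is read only after every worker has finished.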
71 changes: 71 additions & 0 deletions pubsub/gochannel/fanout_test.go
@@ -0,0 +1,71 @@
package gochannel_test

import (
	"context"
	"fmt"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func TestFanOut(t *testing.T) {
	logger := watermill.NopLogger{}

	upstreamPubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
	upstreamTopic := "upstream-topic"

	router, err := message.NewRouter(message.RouterConfig{}, logger)
	require.NoError(t, err)

	fanout, err := gochannel.NewFanOut(router, upstreamPubSub, logger)
	require.NoError(t, err)

	fanout.AddSubscription(upstreamTopic)

	var counter uint64

	workersCount := 10
	messagesCount := 100

	for i := 0; i < workersCount; i++ {
		router.AddNoPublisherHandler(
			fmt.Sprintf("worker-%v", i),
			upstreamTopic,
			fanout,
			func(msg *message.Message) error {
				atomic.AddUint64(&counter, 1)
				return nil
			},
		)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
	defer cancel()

	go func() {
		err := router.Run(ctx)
		require.NoError(t, err)
	}()

	<-router.Running()

	go func() {
		for i := 0; i < messagesCount; i++ {
			msg := message.NewMessage(watermill.NewUUID(), nil)
			err := upstreamPubSub.Publish(upstreamTopic, msg)
			if err != nil {
				panic(err)
			}
		}
	}()

	<-ctx.Done()

	require.Equal(t, uint64(workersCount*messagesCount), counter)
}