Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FanOut proposal #166

Merged
merged 10 commits into from
Dec 20, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions message/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ type HandlerFunc func(msg *Message) ([]*Message, error)
// NoPublishHandlerFunc is HandlerFunc alternative, which doesn't produce any messages.
type NoPublishHandlerFunc func(msg *Message) error

// ProxyHandler is a handler that passes the message unchanged from the subscriber to the publisher.
var ProxyHandler HandlerFunc = func(msg *Message) ([]*Message, error) {
roblaszczak marked this conversation as resolved.
Show resolved Hide resolved
return []*Message{msg}, nil
}

// HandlerMiddleware allows us to write something like decorators to HandlerFunc.
// It can execute something before handler (for example: modify consumed message)
// or after (modify produced messages, ack/nack on consumed message, handle errors, logging, etc.).
Expand Down
67 changes: 67 additions & 0 deletions pubsub/gochannel/fanout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package gochannel

import (
"context"
"errors"
"fmt"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)

type FanOut struct {
roblaszczak marked this conversation as resolved.
Show resolved Hide resolved
pubSub *GoChannel
roblaszczak marked this conversation as resolved.
Show resolved Hide resolved

router *message.Router
subscriber message.Subscriber

logger watermill.LoggerAdapter
}

func NewFanOut(
router *message.Router,
subscriber message.Subscriber,
logger watermill.LoggerAdapter,
) (FanOut, error) {
if router == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basically, is there any case when we need to pass router?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to internal router in a96ad45

return FanOut{}, errors.New("missing router")
}
if subscriber == nil {
return FanOut{}, errors.New("missing subscriber")
}
if logger == nil {
logger = watermill.NopLogger{}
}

return FanOut{
pubSub: NewGoChannel(Config{}, logger),

router: router,
subscriber: subscriber,

logger: logger,
}, nil
}

func (f FanOut) AddSubscription(topic string) {
f.logger.Trace("Adding fan-out subscription for topic", watermill.LogFields{
"topic": topic,
})

f.router.AddHandler(
fmt.Sprintf("fanout-%s", topic),
topic,
f.subscriber,
topic,
f.pubSub,
message.ProxyHandler,
)
}

func (f FanOut) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
return f.pubSub.Subscribe(ctx, topic)
}

func (f FanOut) Close() error {
return f.pubSub.Close()
}
71 changes: 71 additions & 0 deletions pubsub/gochannel/fanout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package gochannel_test

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func TestFanOut(t *testing.T) {
logger := watermill.NopLogger{}

upstreamPubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
upstreamTopic := "upstream-topic"

router, err := message.NewRouter(message.RouterConfig{}, logger)
require.NoError(t, err)

fanout, err := gochannel.NewFanOut(router, upstreamPubSub, logger)
require.NoError(t, err)

fanout.AddSubscription(upstreamTopic)

var counter uint64

workersCount := 10
messagesCount := 100

for i := 0; i < workersCount; i++ {
router.AddNoPublisherHandler(
fmt.Sprintf("worker-%v", i),
upstreamTopic,
fanout,
func(msg *message.Message) error {
atomic.AddUint64(&counter, 1)
return nil
},
)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()

go func() {
err := router.Run(ctx)
require.NoError(t, err)
}()

<-router.Running()

go func() {
for i := 0; i < messagesCount; i++ {
msg := message.NewMessage(watermill.NewUUID(), nil)
err := upstreamPubSub.Publish(upstreamTopic, msg)
if err != nil {
panic(err)
}
}
}()

<-ctx.Done()

require.Equal(t, uint64(workersCount*messagesCount), counter)
}