Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #275 and add gochannel concurreny control #295

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions pubsub/gochannel/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package gochannel

import (
"context"
"sync"

"github.com/lithammer/shortuuid/v3"
"github.com/pkg/errors"
"sync"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
Expand All @@ -26,6 +25,10 @@ type Config struct {
// When true, Publish will block until subscriber Ack's the message.
// If there are no subscribers, Publish will not block (also when Persistent is true).
BlockPublishUntilSubscriberAck bool

// The concurrent count of subscriber to handle the messages.
// Default value is 1, which means the subscriber will be blocked until the last message ack or nack.
SubscriberConcurrentCount int
}

// GoChannel is the simplest Pub/Sub implementation.
Expand Down Expand Up @@ -60,7 +63,9 @@ func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel {
if logger == nil {
logger = watermill.NopLogger{}
}

if config.SubscriberConcurrentCount == 0 {
config.SubscriberConcurrentCount = 1
}
return &GoChannel{
config: config,

Expand All @@ -69,7 +74,6 @@ func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel {
logger: logger.With(watermill.LogFields{
"pubsub_uuid": shortuuid.New(),
}),

closing: make(chan struct{}),

persistedMessages: map[string][]*message.Message{},
Expand All @@ -78,7 +82,6 @@ func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel {

// Publish in GoChannel is NOT blocking until all consumers consume.
// Messages will be send in background.
//
// Messages may be persisted or not, depending of persistent attribute.
func (g *GoChannel) Publish(topic string, messages ...*message.Message) error {
if g.isClosed() {
Expand Down Expand Up @@ -113,7 +116,6 @@ func (g *GoChannel) Publish(topic string, messages ...*message.Message) error {
if err != nil {
return err
}

if g.config.BlockPublishUntilSubscriberAck {
g.waitForAckFromSubscribers(msg, ackedBySubscribers)
}
Expand Down Expand Up @@ -186,11 +188,12 @@ func (g *GoChannel) Subscribe(ctx context.Context, topic string) (<-chan *messag
subLock.(*sync.Mutex).Lock()

s := &subscriber{
ctx: ctx,
uuid: watermill.NewUUID(),
outputChannel: make(chan *message.Message, g.config.OutputChannelBuffer),
logger: g.logger,
closing: make(chan struct{}),
ctx: ctx,
uuid: watermill.NewUUID(),
outputChannel: make(chan *message.Message, g.config.OutputChannelBuffer),
sendingChannel: make(chan struct{}, g.config.SubscriberConcurrentCount),
logger: g.logger,
closing: make(chan struct{}),
}

go func(s *subscriber, g *GoChannel) {
Expand Down Expand Up @@ -315,8 +318,9 @@ type subscriber struct {

uuid string

sending sync.Mutex
outputChannel chan *message.Message
sending sync.Mutex
sendingChannel chan struct{}
outputChannel chan *message.Message

logger watermill.LoggerAdapter
closed bool
Expand All @@ -342,12 +346,13 @@ func (s *subscriber) Close() {
}

func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields watermill.LogFields) {
s.sending.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might still need the lock to avoid race conditions with Close.

defer s.sending.Unlock()
s.sendingChannel <- struct{}{}
defer func() {
<-s.sendingChannel
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sendMessageToSubscriber is also called to deliver persisted messages to newly joining subscribers. I'm not sure if this will behave as expected in this case. Can we have some tests to cover it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll check again and add some tests for it.

}()

ctx, cancelCtx := context.WithCancel(s.ctx)
defer cancelCtx()

SendToSubscriber:
for {
// copy the message to prevent ack/nack propagation to other consumers
Expand Down