From 85539ce17cf8718e2bbe05758d70d4c1746aa725 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=94=99=E8=90=BD?= Date: Tue, 9 Aug 2022 17:22:45 +0800 Subject: [PATCH 1/4] Add config for subscriber concurrency contorl and remove resend of nacked message --- pubsub/gochannel/pubsub.go | 41 ++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index f47d0ca2b..c13f4cf9d 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -2,10 +2,10 @@ package gochannel import ( "context" - "sync" - + "fmt" "github.com/lithammer/shortuuid/v3" "github.com/pkg/errors" + "sync" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" @@ -26,6 +26,10 @@ type Config struct { // When true, Publish will block until subscriber Ack's the message. // If there are no subscribers, Publish will not block (also when Persistent is true). BlockPublishUntilSubscriberAck bool + + // The concurrent count of subscriber to handle the messages. + // Default is 1, which means the subscriber will be blocked until the last message ack or nack. + SubscriberConcurrentCount int } // GoChannel is the simplest Pub/Sub implementation. @@ -60,7 +64,9 @@ func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel { if logger == nil { logger = watermill.NopLogger{} } - + if config.SubscriberConcurrentCount == 0 { + config.SubscriberConcurrentCount = 1 + } return &GoChannel{ config: config, @@ -69,7 +75,6 @@ func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel { logger: logger.With(watermill.LogFields{ "pubsub_uuid": shortuuid.New(), }), - closing: make(chan struct{}), persistedMessages: map[string][]*message.Message{}, @@ -113,7 +118,6 @@ func (g *GoChannel) Publish(topic string, messages ...*message.Message) error { if err != nil { return err } - if g.config.BlockPublishUntilSubscriberAck { g.waitForAckFromSubscribers(msg, ackedBySubscribers) } @@ -145,6 +149,7 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan g.logger.Info("No subscribers to send message", logFields) return ackedBySubscribers, nil } + fmt.Printf("\nsend message: %v", message) go func(subscribers []*subscriber) { wg := &sync.WaitGroup{} @@ -186,11 +191,12 @@ func (g *GoChannel) Subscribe(ctx context.Context, topic string) (<-chan *messag subLock.(*sync.Mutex).Lock() s := &subscriber{ - ctx: ctx, - uuid: watermill.NewUUID(), - outputChannel: make(chan *message.Message, g.config.OutputChannelBuffer), - logger: g.logger, - closing: make(chan struct{}), + ctx: ctx, + uuid: watermill.NewUUID(), + outputChannel: make(chan *message.Message, g.config.OutputChannelBuffer), + sendingChannel: make(chan struct{}, g.config.SubscriberConcurrentCount), + logger: g.logger, + closing: make(chan struct{}), } go func(s *subscriber, g *GoChannel) { @@ -315,8 +321,9 @@ type subscriber struct { uuid string - sending sync.Mutex - outputChannel chan *message.Message + sending sync.Mutex + sendingChannel chan struct{} + outputChannel chan *message.Message logger watermill.LoggerAdapter closed bool @@ -342,13 +349,14 @@ func (s *subscriber) Close() { } func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields watermill.LogFields) { - s.sending.Lock() - defer s.sending.Unlock() + s.sendingChannel <- struct{}{} + defer func() { + <-s.sendingChannel + }() ctx, cancelCtx := context.WithCancel(s.ctx) defer cancelCtx() -SendToSubscriber: for { // copy the message to prevent ack/nack propagation to other consumers // also allows to make retries on a fresh copy of the original message @@ -375,8 +383,7 @@ SendToSubscriber: s.logger.Trace("Message acked", logFields) return case <-msgToSend.Nacked(): - s.logger.Trace("Nack received, resending message", logFields) - continue SendToSubscriber + s.logger.Trace("Nack received", logFields) case <-s.closing: s.logger.Trace("Closing, message discarded", logFields) return From e2d35f6975fd86e1c2eafd44e553db42195deecf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=94=99=E8=90=BD?= Date: Wed, 24 Aug 2022 16:02:56 +0800 Subject: [PATCH 2/4] update commit and remove useless log --- pubsub/gochannel/pubsub.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index c13f4cf9d..b6e45d7da 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -2,7 +2,6 @@ package gochannel import ( "context" - "fmt" "github.com/lithammer/shortuuid/v3" "github.com/pkg/errors" "sync" @@ -28,7 +27,7 @@ type Config struct { BlockPublishUntilSubscriberAck bool // The concurrent count of subscriber to handle the messages. - // Default is 1, which means the subscriber will be blocked until the last message ack or nack. + // Default value is 1, which means the subscriber will be blocked until the last message ack or nack. SubscriberConcurrentCount int } @@ -83,7 +82,6 @@ func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel { // Publish in GoChannel is NOT blocking until all consumers consume. // Messages will be send in background. -// // Messages may be persisted or not, depending of persistent attribute. func (g *GoChannel) Publish(topic string, messages ...*message.Message) error { if g.isClosed() { @@ -149,7 +147,6 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan g.logger.Info("No subscribers to send message", logFields) return ackedBySubscribers, nil } - fmt.Printf("\nsend message: %v", message) go func(subscribers []*subscriber) { wg := &sync.WaitGroup{} @@ -384,6 +381,7 @@ func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields wat return case <-msgToSend.Nacked(): s.logger.Trace("Nack received", logFields) + return case <-s.closing: s.logger.Trace("Closing, message discarded", logFields) return From 2496a38610ed663f15a19fc0ca53a7c18a9a1232 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=94=99=E8=90=BD?= Date: Wed, 24 Aug 2022 16:16:23 +0800 Subject: [PATCH 3/4] revert non related codes --- pubsub/gochannel/pubsub.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index b6e45d7da..79cb68f7d 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -353,7 +353,7 @@ func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields wat ctx, cancelCtx := context.WithCancel(s.ctx) defer cancelCtx() - +SendToSubscriber: for { // copy the message to prevent ack/nack propagation to other consumers // also allows to make retries on a fresh copy of the original message @@ -381,7 +381,7 @@ func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields wat return case <-msgToSend.Nacked(): s.logger.Trace("Nack received", logFields) - return + continue SendToSubscriber case <-s.closing: s.logger.Trace("Closing, message discarded", logFields) return From e10a66007454536adf08a6825a151cfcb6c30bb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=94=99=E8=90=BD?= Date: Wed, 24 Aug 2022 16:28:09 +0800 Subject: [PATCH 4/4] revert non related codes --- pubsub/gochannel/pubsub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index 79cb68f7d..b18380215 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -380,7 +380,7 @@ SendToSubscriber: s.logger.Trace("Message acked", logFields) return case <-msgToSend.Nacked(): - s.logger.Trace("Nack received", logFields) + s.logger.Trace("Nack received, resending message", logFields) continue SendToSubscriber case <-s.closing: s.logger.Trace("Closing, message discarded", logFields)