Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace original context in pub/sub GoChannel (SendToSubscriber) #464

Open
bi0dread opened this issue Aug 5, 2024 · 6 comments
Open

Replace original context in pub/sub GoChannel (SendToSubscriber) #464

bi0dread opened this issue Aug 5, 2024 · 6 comments

Comments

@bi0dread
Copy link

bi0dread commented Aug 5, 2024

Hi
you have replaced original msg context with msgToSend.SetContext(ctx) and we lost the original context!
so to speak :

when we call msg.Copy() we don't set original context on new msg

func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields watermill.LogFields) {
	s.sending.Lock()
	defer s.sending.Unlock()

	ctx, cancelCtx := context.WithCancel(s.ctx)
	defer cancelCtx()

SendToSubscriber:
	for {
		// copy the message to prevent ack/nack propagation to other consumers
		// also allows to make retries on a fresh copy of the original message
		msgToSend := msg.Copy()
		msgToSend.SetContext(ctx)

		s.logger.Trace("Sending msg to subscriber", logFields)

		if s.closed {
			s.logger.Info("Pub/Sub closed, discarding msg", logFields)
			return
		}

		select {
		case s.outputChannel <- msgToSend:
			s.logger.Trace("Sent message to subscriber", logFields)
		case <-s.closing:
			s.logger.Trace("Closing, message discarded", logFields)
			return
		}

		select {
		case <-msgToSend.Acked():
			s.logger.Trace("Message acked", logFields)
			return
		case <-msgToSend.Nacked():
			s.logger.Trace("Nack received, resending message", logFields)
			continue SendToSubscriber
		case <-s.closing:
			s.logger.Trace("Closing, message discarded", logFields)
			return
		}
	}
}
@bi0dread bi0dread changed the title replace original context in pub/sub GoChannel (SendToSubscriber) Replace original context in pub/sub GoChannel (SendToSubscriber) Aug 5, 2024
@m110
Copy link
Member

m110 commented Aug 12, 2024

Hey @bi0dread. This is to maintain the same behavior as other Pubs/Subs. Other backends transfer the message over the network, so they can't keep the context. I guess we could make it an option in the config to keep the context, though, as a special case.

@yashb042
Copy link

@m110 This issue is what I am also facing. At least in the local pub-sub case, the context must be propagated (even though it doesn't match the behaviour of other pub-subs.

Let me know if you want me to add this feature and push.

@m110
Copy link
Member

m110 commented Aug 29, 2024

Hey @yashb042. I think it makes sense, just please keep it as a config option with proper disclaimer that it doesn't match the generic behavior. :)

@yashb042
Copy link

yashb042 commented Sep 3, 2024

Okay, picking this up

@yashb042
Copy link

yashb042 commented Sep 3, 2024

https://github.com/ThreeDotsLabs/watermill/pull/487/files

Adding test cases soon
Please let me know if the variable addition is okay.
Have not added extra variables in Message struct, because the Message struct is used by every publisher-subscriber

@yashb042
Copy link

yashb042 commented Sep 4, 2024

Added test cases as well, please review.

Couldn't test the func - TestMessageCtx in test_pubsub.go
It's mentioned that "ExactlyOnceDelivery test is not supported yet"

Please let me know if anything to be done there

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants