Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Router closes publisher before message processing is done #446

Open
olivierjacob opened this issue May 30, 2024 · 0 comments
Open

Router closes publisher before message processing is done #446

olivierjacob opened this issue May 30, 2024 · 0 comments

Comments

@olivierjacob
Copy link

olivierjacob commented May 30, 2024

In router.go, line 615, we have this

for msg := range h.messagesCh {
	h.runningHandlersWgLock.Lock()
	h.runningHandlersWg.Add(1)
	h.runningHandlersWgLock.Unlock()

	go h.handleMessage(msg, middlewareHandler)
}

if h.publisher != nil {
	h.logger.Debug("Waiting for publisher to close", nil)
	if err := h.publisher.Close(); err != nil {
		h.logger.Error("Failed to close publisher", err, nil)
	}
	h.logger.Debug("Publisher closed", nil)
}

Whenever a message is received, a new goroutine is created to process it.
If the messagesCh is closed while the message is being processed, then we immediately proceed to closing the publisher, without waiting on the goroutine to return.

In my use case, the processing logic can take some time, and requires the publisher to be open to send the results back. In the event the router is shutdown, the publisher will be closed when the goroutine tries to publish the results.

It seems that the runningHandlersWg is waited on outside of this scope (the router uses it to wait on handlers). But it could also be waited on before closing the publisher in each handler.

for msg := range h.messagesCh {
	h.runningHandlersWgLock.Lock()
	h.runningHandlersWg.Add(1)
	h.runningHandlersWgLock.Unlock()

	go h.handleMessage(msg, middlewareHandler)
}

if h.publisher != nil {
	h.logger.Debug("Waiting on message processing go routines", nil)
	h.runningHandlersWg.Wait()
	
	h.logger.Debug("Waiting for publisher to close", nil)
	if err := h.publisher.Close(); err != nil {
		h.logger.Error("Failed to close publisher", err, nil)
	}
	h.logger.Debug("Publisher closed", nil)
}

In a pub/sub setup, it makes sense to close the subscriber to avoid processing more messages. But the publisher should remain open until all the goroutines processing messages terminate. Or is there something I misunderstand? Or wrongly assuming?

EDIT: it is not correct to use the h.runningHandlersWg waitGroup, because it is shared between all the handlers. Instead, it would require a per-handler wait group, but the idea is the same.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant