Skip to content

Commit

Permalink
Fix logging an error when router is closed due to context cancellation (
Browse files Browse the repository at this point in the history
  • Loading branch information
hlubek authored Jul 2, 2023
1 parent 7cb0e23 commit 7c01195
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 3 deletions.
11 changes: 8 additions & 3 deletions message/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (r *Router) Run(ctx context.Context) (err error) {

close(r.running)

go r.closeWhenAllHandlersStopped()
go r.closeWhenAllHandlersStopped(ctx)

<-r.closingInProgressCh
cancel()
Expand Down Expand Up @@ -449,7 +449,7 @@ func (r *Router) RunHandlers(ctx context.Context) error {

// closeWhenAllHandlersStopped closed router, when all handlers has stopped,
// because for example all subscriptions are closed.
func (r *Router) closeWhenAllHandlersStopped() {
func (r *Router) closeWhenAllHandlersStopped(ctx context.Context) {
r.handlersLock.RLock()
hasHandlers := len(r.handlers) == 0
r.handlersLock.RUnlock()
Expand All @@ -472,7 +472,12 @@ func (r *Router) closeWhenAllHandlersStopped() {
return
}

r.logger.Error("All handlers stopped, closing router", errors.New("all router handlers stopped"), nil)
// Only log an error if the context was not canceled, but handlers were stopped.
select {
case <-ctx.Done():
default:
r.logger.Error("All handlers stopped, closing router", errors.New("all router handlers stopped"), nil)
}

if err := r.Close(); err != nil {
r.logger.Error("Cannot close router", err, nil)
Expand Down
102 changes: 102 additions & 0 deletions message/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,3 +1319,105 @@ func TestRouter_wait_for_handlers_before_shutdown_timeout(t *testing.T) {

assert.EqualError(t, r.Close(), "router close timeout")
}

func TestRouter_context_cancel_does_not_log_error(t *testing.T) {
t.Parallel()

pub, sub := createPubSub()
defer func() {
assert.NoError(t, pub.Close())
assert.NoError(t, sub.Close())
}()

logger := watermill.NewCaptureLogger()

r, err := message.NewRouter(message.RouterConfig{}, logger)
require.NoError(t, err)

r.AddNoPublisherHandler(
"foo",
"subscribe_topic",
sub,
func(msg *message.Message) error {
return nil
},
)

ctx, cancel := context.WithCancel(context.Background())

go func() {
err := r.Run(ctx)
assert.NoError(t, err)
}()
<-r.Running()

// Cancel the context
cancel()

require.Eventually(t, func() bool {
return r.IsClosed()
}, 1*time.Second, 1*time.Millisecond, "Router should be closed after all handlers are stopped")

assert.Empty(t, logger.Captured()[watermill.ErrorLogLevel], "No error should be logged when context is canceled")
}

func TestRouter_stopping_all_handlers_logs_error(t *testing.T) {
t.Parallel()

pub, sub := createPubSub()
defer func() {
assert.NoError(t, pub.Close())
assert.NoError(t, sub.Close())
}()

logger := watermill.NewCaptureLogger()

r, err := message.NewRouter(message.RouterConfig{}, logger)
require.NoError(t, err)

r.AddNoPublisherHandler(
"foo",
"subscribe_topic",
sub,
func(msg *message.Message) error {
return nil
},
)

ctx := context.Background()

go func() {
err := r.Run(ctx)
assert.NoError(t, err)
}()
<-r.Running()

// Stop the subscriber - this should close the router with an error
err = sub.Close()
require.NoError(t, err)

require.Eventually(t, func() bool {
return r.IsClosed()
}, 1*time.Second, 1*time.Millisecond, "Router should be closed after all handlers are stopped")

expectedLogMessage := watermill.CapturedMessage{
Level: watermill.ErrorLogLevel,
Msg: "All handlers stopped, closing router",
Err: errors.New("all router handlers stopped"),
}

// Note: using logger.Has does not work here, since the error is not exposed (and thus not deep equal-able)
for _, capturedMessage := range logger.Captured()[watermill.ErrorLogLevel] {
if capturedMessage.Level == expectedLogMessage.Level &&
capturedMessage.Msg == expectedLogMessage.Msg &&
capturedMessage.Err.Error() == expectedLogMessage.Err.Error() {
return
}
}

assert.Fail(
t,
"expected log message not found, logs: %#v",
logger.Captured(),
)
}

0 comments on commit 7c01195

Please sign in to comment.