From b8e119a7901d650829f3bedc1ac9368dbb31611a Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Fri, 30 Aug 2019 17:46:38 -0400 Subject: [PATCH 1/2] FIX #513. Added connection option `NoCallbacksAfterClientClose` which disables the invocation of callbacks if the client code directly calls Close(). If case of a library initiated close, callbacks will be invoked as they currently are. --- nats.go | 27 ++++++++++++++---- test/reconnect_test.go | 63 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 6 deletions(-) diff --git a/nats.go b/nats.go index b3a87449a..aca5c56c8 100644 --- a/nats.go +++ b/nats.go @@ -347,6 +347,11 @@ type Options struct { // UseOldRequestStyle forces the old method of Requests that utilize // a new Inbox and a new Subscription for each request. UseOldRequestStyle bool + + // NoCallbacksAfterClientClose allows preventing the invocation of + // callbacks after Close() is called. Client won't receive notifications + // when Close is invoked by user code. Default is to invoke the callbacks. + NoCallbacksAfterClientClose bool } const ( @@ -883,6 +888,16 @@ func UseOldRequestStyle() Option { } } +// NoCallbacksAfterClientClose is an Option to disable callbacks when user code +// calls Close(). If close is initiated by any other condition, callbacks +// if any will be invoked. +func NoCallbacksAfterClientClose() Option { + return func(o *Options) error { + o.NoCallbacksAfterClientClose = true + return nil + } +} + // Handler processing // SetDisconnectHandler will set the disconnect event handler. @@ -1929,7 +1944,7 @@ func (nc *Conn) doReconnect(err error) { nc.err = ErrNoServers } nc.mu.Unlock() - nc.Close() + nc.close(CLOSED, true, nil) } // processOpErr handles errors from reading or parsing the protocol. @@ -1964,7 +1979,7 @@ func (nc *Conn) processOpErr(err error) { nc.status = DISCONNECTED nc.err = err nc.mu.Unlock() - nc.Close() + nc.close(CLOSED, true, nil) } // dispatch is responsible for calling any async callbacks @@ -2496,7 +2511,7 @@ func (nc *Conn) processErr(ie string) { nc.mu.Unlock() } if close { - nc.Close() + nc.close(CLOSED, true, nil) } } @@ -3691,7 +3706,7 @@ func (nc *Conn) close(status Status, doCBs bool, err error) { // all blocking calls, such as Flush() and NextMsg() func (nc *Conn) Close() { if nc != nil { - nc.close(CLOSED, true, nil) + nc.close(CLOSED, !nc.Opts.NoCallbacksAfterClientClose, nil) } } @@ -3770,12 +3785,12 @@ func (nc *Conn) drainConnection() { err := nc.Flush() if err != nil { pushErr(err) - nc.Close() + nc.close(CLOSED, true, nil) return } // Move to closed state. - nc.Close() + nc.close(CLOSED, true, nil) } // Drain will put a connection into a drain state. All subscriptions will diff --git a/test/reconnect_test.go b/test/reconnect_test.go index 7fd9f84a5..0f6d5b587 100644 --- a/test/reconnect_test.go +++ b/test/reconnect_test.go @@ -722,3 +722,66 @@ func TestReconnectTLSHostNoIP(t *testing.T) { t.Fatalf("ReconnectedCB should have been triggered: %v", nc.LastError()) } } + +func TestConnCloseNoCallback(t *testing.T) { + ts := startReconnectServer(t) + defer ts.Shutdown() + + cch := make(chan bool) + opts := reconnectOpts + opts.ClosedCB = func(_ *nats.Conn) { + cch <- true + } + opts.NoCallbacksAfterClientClose = true + + nc, err := opts.Connect() + if err != nil { + t.Fatalf("Should have connected ok: %v", err) + } + defer nc.Close() + + nc.Flush() + // Close the connection, we don't expect to get a notification + nc.Close() + // Shutdown the server + ts.Shutdown() + + // Even on Windows (where a createConn takes more than a second) + // we should be able to break the reconnect loop with the following + // timeout. + if err := WaitTime(cch, 3*time.Second); err != nil { + // yay no callback + } else { + t.Fatal("Got a closed callback, but shouldn't have") + } +} + +func TestConnCloseNoCallbackFromOptionsFunc(t *testing.T) { + ts := startReconnectServer(t) + defer ts.Shutdown() + + cch := make(chan bool) + nc, err := nats.Connect(reconnectOpts.Url, nats.NoCallbacksAfterClientClose(), + nats.ClosedHandler(func(_ *nats.Conn) { + cch <- true + })) + if err != nil { + t.Fatalf("Should have connected ok: %v", err) + } + defer nc.Close() + + nc.Flush() + // Close the connection, we don't expect to get a notification + nc.Close() + // Shutdown the server + ts.Shutdown() + + // Even on Windows (where a createConn takes more than a second) + // we should be able to break the reconnect loop with the following + // timeout. + if err := WaitTime(cch, 3*time.Second); err != nil { + // yay no callback + } else { + t.Fatal("Got a closed callback, but shouldn't have") + } +} From d2b2945029e4bcc762922651aa06772a97be7895 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Tue, 3 Sep 2019 16:42:36 -0500 Subject: [PATCH 2/2] review comments --- test/reconnect_test.go | 64 ++++++++++++++++-------------------------- 1 file changed, 24 insertions(+), 40 deletions(-) diff --git a/test/reconnect_test.go b/test/reconnect_test.go index 0f6d5b587..c42862cc7 100644 --- a/test/reconnect_test.go +++ b/test/reconnect_test.go @@ -727,61 +727,45 @@ func TestConnCloseNoCallback(t *testing.T) { ts := startReconnectServer(t) defer ts.Shutdown() - cch := make(chan bool) + // create a connection that manually sets the options + var conns []*nats.Conn + cch := make(chan string, 2) opts := reconnectOpts opts.ClosedCB = func(_ *nats.Conn) { - cch <- true + cch <- "manual" } opts.NoCallbacksAfterClientClose = true - nc, err := opts.Connect() if err != nil { t.Fatalf("Should have connected ok: %v", err) } - defer nc.Close() - - nc.Flush() - // Close the connection, we don't expect to get a notification - nc.Close() - // Shutdown the server - ts.Shutdown() - - // Even on Windows (where a createConn takes more than a second) - // we should be able to break the reconnect loop with the following - // timeout. - if err := WaitTime(cch, 3*time.Second); err != nil { - // yay no callback - } else { - t.Fatal("Got a closed callback, but shouldn't have") - } -} + conns = append(conns, nc) -func TestConnCloseNoCallbackFromOptionsFunc(t *testing.T) { - ts := startReconnectServer(t) - defer ts.Shutdown() - - cch := make(chan bool) - nc, err := nats.Connect(reconnectOpts.Url, nats.NoCallbacksAfterClientClose(), + // and another connection that uses the option + nc2, err := nats.Connect(reconnectOpts.Url, nats.NoCallbacksAfterClientClose(), nats.ClosedHandler(func(_ *nats.Conn) { - cch <- true + cch <- "opts" })) if err != nil { t.Fatalf("Should have connected ok: %v", err) } - defer nc.Close() + conns = append(conns, nc2) - nc.Flush() - // Close the connection, we don't expect to get a notification - nc.Close() - // Shutdown the server - ts.Shutdown() + // defer close() for safety, flush() and close() + for _, c := range conns { + defer c.Close() + c.Flush() - // Even on Windows (where a createConn takes more than a second) - // we should be able to break the reconnect loop with the following - // timeout. - if err := WaitTime(cch, 3*time.Second); err != nil { - // yay no callback - } else { - t.Fatal("Got a closed callback, but shouldn't have") + // Close the connection, we don't expect to get a notification + c.Close() + } + + // if the timeout happens we didn't get data from the channel + // if we get a value from the channel that connection type failed. + select { + case <-time.After(500 * time.Millisecond): + // test passed - we timed so no callback was called + case what := <-cch: + t.Fatalf("%s issued a callback and it shouldn't have", what) } }