Skip to content

Commit

Permalink
Merge pull request #514 from nats-io/close-notifications
Browse files Browse the repository at this point in the history
[ADDED] Allow client to choose whether to execute callbacks on Close()
  • Loading branch information
kozlovic authored Oct 4, 2019
2 parents b659a00 + d2b2945 commit e16cf3a
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 6 deletions.
27 changes: 21 additions & 6 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,11 @@ type Options struct {
// UseOldRequestStyle forces the old method of Requests that utilize
// a new Inbox and a new Subscription for each request.
UseOldRequestStyle bool

// NoCallbacksAfterClientClose allows preventing the invocation of
// callbacks after Close() is called. Client won't receive notifications
// when Close is invoked by user code. Default is to invoke the callbacks.
NoCallbacksAfterClientClose bool
}

const (
Expand Down Expand Up @@ -883,6 +888,16 @@ func UseOldRequestStyle() Option {
}
}

// NoCallbacksAfterClientClose is an Option to disable callbacks when user code
// calls Close(). If close is initiated by any other condition, callbacks
// if any will be invoked.
func NoCallbacksAfterClientClose() Option {
return func(o *Options) error {
o.NoCallbacksAfterClientClose = true
return nil
}
}

// Handler processing

// SetDisconnectHandler will set the disconnect event handler.
Expand Down Expand Up @@ -1929,7 +1944,7 @@ func (nc *Conn) doReconnect(err error) {
nc.err = ErrNoServers
}
nc.mu.Unlock()
nc.Close()
nc.close(CLOSED, true, nil)
}

// processOpErr handles errors from reading or parsing the protocol.
Expand Down Expand Up @@ -1964,7 +1979,7 @@ func (nc *Conn) processOpErr(err error) {
nc.status = DISCONNECTED
nc.err = err
nc.mu.Unlock()
nc.Close()
nc.close(CLOSED, true, nil)
}

// dispatch is responsible for calling any async callbacks
Expand Down Expand Up @@ -2496,7 +2511,7 @@ func (nc *Conn) processErr(ie string) {
nc.mu.Unlock()
}
if close {
nc.Close()
nc.close(CLOSED, true, nil)
}
}

Expand Down Expand Up @@ -3691,7 +3706,7 @@ func (nc *Conn) close(status Status, doCBs bool, err error) {
// all blocking calls, such as Flush() and NextMsg()
func (nc *Conn) Close() {
if nc != nil {
nc.close(CLOSED, true, nil)
nc.close(CLOSED, !nc.Opts.NoCallbacksAfterClientClose, nil)
}
}

Expand Down Expand Up @@ -3770,12 +3785,12 @@ func (nc *Conn) drainConnection() {
err := nc.Flush()
if err != nil {
pushErr(err)
nc.Close()
nc.close(CLOSED, true, nil)
return
}

// Move to closed state.
nc.Close()
nc.close(CLOSED, true, nil)
}

// Drain will put a connection into a drain state. All subscriptions will
Expand Down
47 changes: 47 additions & 0 deletions test/reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,3 +722,50 @@ func TestReconnectTLSHostNoIP(t *testing.T) {
t.Fatalf("ReconnectedCB should have been triggered: %v", nc.LastError())
}
}

func TestConnCloseNoCallback(t *testing.T) {
ts := startReconnectServer(t)
defer ts.Shutdown()

// create a connection that manually sets the options
var conns []*nats.Conn
cch := make(chan string, 2)
opts := reconnectOpts
opts.ClosedCB = func(_ *nats.Conn) {
cch <- "manual"
}
opts.NoCallbacksAfterClientClose = true
nc, err := opts.Connect()
if err != nil {
t.Fatalf("Should have connected ok: %v", err)
}
conns = append(conns, nc)

// and another connection that uses the option
nc2, err := nats.Connect(reconnectOpts.Url, nats.NoCallbacksAfterClientClose(),
nats.ClosedHandler(func(_ *nats.Conn) {
cch <- "opts"
}))
if err != nil {
t.Fatalf("Should have connected ok: %v", err)
}
conns = append(conns, nc2)

// defer close() for safety, flush() and close()
for _, c := range conns {
defer c.Close()
c.Flush()

// Close the connection, we don't expect to get a notification
c.Close()
}

// if the timeout happens we didn't get data from the channel
// if we get a value from the channel that connection type failed.
select {
case <-time.After(500 * time.Millisecond):
// test passed - we timed so no callback was called
case what := <-cch:
t.Fatalf("%s issued a callback and it shouldn't have", what)
}
}

0 comments on commit e16cf3a

Please sign in to comment.