From 8509117a12cf9781b7b0ba40f4fc4145d7338438 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 10 Jul 2019 21:07:09 -0700 Subject: [PATCH] Allow sync subs to allow m.Respond with auto-unsub Signed-off-by: Derek Collison --- nats.go | 26 +++++++++++++++----------- test/sub_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/nats.go b/nats.go index 4b3cfb3c4..ee2ca0cf1 100644 --- a/nats.go +++ b/nats.go @@ -2904,7 +2904,6 @@ func (nc *Conn) removeSub(s *Subscription) { s.mch = nil // Mark as invalid - s.conn = nil s.closed = true if s.pCond != nil { s.pCond.Broadcast() @@ -2941,7 +2940,7 @@ func (s *Subscription) IsValid() bool { } s.mu.Lock() defer s.mu.Unlock() - return s.conn != nil + return s.conn != nil && !s.closed } // Drain will remove interest but continue callbacks until all messages @@ -2966,8 +2965,12 @@ func (s *Subscription) Unsubscribe() error { } s.mu.Lock() conn := s.conn + closed := s.closed s.mu.Unlock() - if conn == nil { + if conn == nil || conn.IsClosed() { + return ErrConnectionClosed + } + if closed { return ErrBadSubscription } if conn.IsDraining() { @@ -3023,8 +3026,9 @@ func (s *Subscription) AutoUnsubscribe(max int) error { } s.mu.Lock() conn := s.conn + closed := s.closed s.mu.Unlock() - if conn == nil { + if conn == nil || closed { return ErrBadSubscription } return conn.unsubscribe(s, max, false) @@ -3199,7 +3203,7 @@ func (s *Subscription) Pending() (int, int, error) { } s.mu.Lock() defer s.mu.Unlock() - if s.conn == nil { + if s.conn == nil || s.closed { return -1, -1, ErrBadSubscription } if s.typ == ChanSubscription { @@ -3215,7 +3219,7 @@ func (s *Subscription) MaxPending() (int, int, error) { } s.mu.Lock() defer s.mu.Unlock() - if s.conn == nil { + if s.conn == nil || s.closed { return -1, -1, ErrBadSubscription } if s.typ == ChanSubscription { @@ -3231,7 +3235,7 @@ func (s *Subscription) ClearMaxPending() error { } s.mu.Lock() defer s.mu.Unlock() - if s.conn == nil { + if s.conn == nil || s.closed { return ErrBadSubscription } if s.typ == ChanSubscription { @@ -3256,7 +3260,7 @@ func (s *Subscription) PendingLimits() (int, int, error) { } s.mu.Lock() defer s.mu.Unlock() - if s.conn == nil { + if s.conn == nil || s.closed { return -1, -1, ErrBadSubscription } if s.typ == ChanSubscription { @@ -3273,7 +3277,7 @@ func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error { } s.mu.Lock() defer s.mu.Unlock() - if s.conn == nil { + if s.conn == nil || s.closed { return ErrBadSubscription } if s.typ == ChanSubscription { @@ -3293,7 +3297,7 @@ func (s *Subscription) Delivered() (int64, error) { } s.mu.Lock() defer s.mu.Unlock() - if s.conn == nil { + if s.conn == nil || s.closed { return -1, ErrBadSubscription } return int64(s.delivered), nil @@ -3309,7 +3313,7 @@ func (s *Subscription) Dropped() (int, error) { } s.mu.Lock() defer s.mu.Unlock() - if s.conn == nil { + if s.conn == nil || s.closed { return -1, ErrBadSubscription } return s.dropped, nil diff --git a/test/sub_test.go b/test/sub_test.go index bbd3882e8..abc7ef723 100644 --- a/test/sub_test.go +++ b/test/sub_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/nats-io/nats.go" + "github.com/nats-io/nuid" ) // More advanced tests on subscriptions @@ -1495,5 +1496,37 @@ func TestSubscriptionTypes(t *testing.T) { if _, _, err := sub.PendingLimits(); err == nil { t.Fatalf("We should NOT be able to call PendingLimits() on ChanSubscriber") } +} + +func TestAutoUnsubOnSyncSubCanStillRespond(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc := NewDefaultConnection(t) + defer nc.Close() + + subj := nuid.Next() + sub, err := nc.SubscribeSync(subj) + if err != nil { + t.Fatalf("Error susbscribing: %v", err) + } + // When the single message is delivered, the + // auto unsub will reap the subscription removing + // the connection, make sure Respond still works. + if err := sub.AutoUnsubscribe(1); err != nil { + t.Fatalf("Error autounsub: %v", err) + } + + inbox := nats.NewInbox() + if err = nc.PublishRequest(subj, inbox, nil); err != nil { + t.Fatalf("Error making request: %v", err) + } + m, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error getting next message") + } + if err := m.Respond(nil); err != nil { + t.Fatalf("Error responding: %v", err) + } }