Skip to content

Commit

Permalink
Add Message.Respond implementing #281 (#283)
Browse files Browse the repository at this point in the history
* Add Msg.Respond

- Adds Respond to Msg to simplify responses to Request-Reply
- No longer clears Subscription.conn, until disposing
- Add tests for Msg.Respond with both closed connections and lost servers
- Throw NATSConnectionClosedException when appropriate
  • Loading branch information
watfordgnf authored and ColinSullivan1 committed Aug 27, 2019
1 parent d2be064 commit d2cfbe6
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 19 deletions.
24 changes: 15 additions & 9 deletions NATS.Client/AsyncSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,16 @@ internal void enableAsyncProcessing()

internal void disableAsyncProcessing()
{
if (msgFeeder != null)
lock (mu)
{
mch.close();
msgFeeder = null;
if (msgFeeder != null)
{
mch.close();
msgFeeder = null;
}
MessageHandler = null;
started = false;
}
MessageHandler = null;
started = false;
}

/// <summary>
Expand All @@ -155,11 +158,14 @@ public void Start()
if (started)
return;

if (conn == null)
throw new NATSBadSubscriptionException();
lock (mu)
{
if (conn == null)
throw new NATSBadSubscriptionException();

conn.sendSubscriptionMessage(this);
enableAsyncProcessing();
conn.sendSubscriptionMessage(this);
enableAsyncProcessing();
}
}

/// <summary>
Expand Down
6 changes: 3 additions & 3 deletions NATS.Client/Conn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3439,8 +3439,9 @@ internal Task unsubscribe(Subscription sub, int max, bool drain, int timeout)
if (isClosed())
throw new NATSConnectionClosedException();

Subscription s = subs[sub.sid];
if (s == null)
Subscription s;
if (!subs.TryGetValue(sub.sid, out s)
|| s == null)
{
// already unsubscribed
return null;
Expand Down Expand Up @@ -3485,7 +3486,6 @@ internal virtual void removeSub(Subscription s)
s.mch = null;
}

s.conn = null;
s.closed = true;
}

Expand Down
25 changes: 25 additions & 0 deletions NATS.Client/Msg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,31 @@ public ISubscription ArrivalSubcription
get { return sub; }
}

/// <summary>
/// Send a response to the message on the arrival subscription.
/// </summary>
/// <param name="data">The response payload to send.</param>
/// <exception cref="NATSException">
/// <para><see cref="Reply"/> is null or empty.</para>
/// <para>-or-</para>
/// <para><see cref="ArrivalSubcription"/> is null.</para>
/// </exception>
public void Respond(byte[] data)
{
if (String.IsNullOrEmpty(Reply))
{
throw new NATSException("No Reply subject");
}

Connection conn = ArrivalSubcription?.Connection;
if (conn == null)
{
throw new NATSException("Message is not bound to a subscription");
}

conn.Publish(this.Reply, data);
}

/// <summary>
/// Generates a string representation of the messages.
/// </summary>
Expand Down
33 changes: 27 additions & 6 deletions NATS.Client/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,17 +222,19 @@ public bool IsValid
{
lock (mu)
{
return (conn != null);
return (conn != null) && !closed;
}
}
}

internal void unsubscribe(bool throwEx)
{
Connection c;
bool isClosed;
lock (mu)
{
c = this.conn;
isClosed = this.closed;
}

if (c == null)
Expand All @@ -243,6 +245,18 @@ internal void unsubscribe(bool throwEx)
return;
}

if (c.IsClosed())
{
if (throwEx)
throw new NATSConnectionClosedException();
}

if (isClosed)
{
if (throwEx)
throw new NATSBadSubscriptionException();
}

if (c.IsDraining())
{
if (throwEx)
Expand Down Expand Up @@ -284,6 +298,12 @@ public virtual void AutoUnsubscribe(int max)
if (conn == null)
throw new NATSBadSubscriptionException();

if (conn.IsClosed())
throw new NATSConnectionClosedException();

if (closed)
throw new NATSBadSubscriptionException();

c = conn;
}

Expand All @@ -301,7 +321,7 @@ public int QueuedMessageCount
{
lock (mu)
{
if (conn == null)
if (conn == null || closed)
throw new NATSBadSubscriptionException();

return mch.Count;
Expand Down Expand Up @@ -332,6 +352,9 @@ protected virtual void Dispose(bool disposing)
// auto unsubscribing, so ignore.
}

conn = null;
closed = true;

disposedValue = true;
}
}
Expand Down Expand Up @@ -370,9 +393,8 @@ public override string ToString()

private void checkState()
{
if (conn == null)
if (conn == null || closed)
throw new NATSBadSubscriptionException();

}

/// <summary>
Expand Down Expand Up @@ -573,7 +595,7 @@ internal Task InternalDrain(int timeout)

lock (mu)
{
if (conn == null)
if (conn == null || closed)
throw new NATSBadSubscriptionException();

c = conn;
Expand All @@ -595,7 +617,6 @@ public Task DrainAsync(int timeout)
return InternalDrain(timeout);
}


public void Drain()
{
Drain(Defaults.DefaultDrainTimeout);
Expand Down
132 changes: 131 additions & 1 deletion NATSUnitTests/UnitTestSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,6 @@ public void TestAsyncSubscriptionPending()
}
}


[Fact]
public void TestAsyncPendingSubscriptionBatchSizeExactlyOne()
{
Expand Down Expand Up @@ -1040,6 +1039,137 @@ public void TestInvalidSubjects()
}
}
}

[Fact]
public void TestRespond()
{
using (new NATSServer())
{
using (IConnection c = utils.DefaultTestConnection)
using (ISyncSubscription s = c.SubscribeSync("foo"))
{
string replyTo = c.NewInbox();
using (ISyncSubscription r = c.SubscribeSync(replyTo))
{
c.Publish("foo", replyTo, Encoding.UTF8.GetBytes("message"));

Msg m = s.NextMessage(1000);
Assert.NotNull(m);
Assert.Equal(replyTo, m.Reply);

byte[] reply = Encoding.UTF8.GetBytes("reply");
m.Respond(reply);

m = r.NextMessage(1000);
Assert.NotNull(m);
Assert.Equal(replyTo, m.Subject);
Assert.Equal(reply, m.Data);

s.Unsubscribe();
r.Unsubscribe();
}
}
}
}

[Fact]
public void TestRespondWithAutoUnsubscribe()
{
using (new NATSServer())
{
using (IConnection c = utils.DefaultTestConnection)
using (ISyncSubscription s = c.SubscribeSync("foo"))
{
s.AutoUnsubscribe(1);

string replyTo = c.NewInbox();
using (ISyncSubscription r = c.SubscribeSync(replyTo))
{
c.Publish("foo", replyTo, Encoding.UTF8.GetBytes("message"));

Msg m = s.NextMessage(1000);
Assert.NotNull(m);
Assert.Equal(replyTo, m.Reply);

byte[] reply = Encoding.UTF8.GetBytes("reply");
m.Respond(reply);

m = r.NextMessage(1000);
Assert.NotNull(m);
Assert.Equal(replyTo, m.Subject);
Assert.Equal(reply, m.Data);

r.Unsubscribe();
}
}
}
}

[Fact]
public void TestRespondFailsWithClosedConnection()
{
using (new NATSServer())
{
using (IConnection c = utils.DefaultTestConnection)
{
ISyncSubscription s = c.SubscribeSync("foo");

string replyTo = c.NewInbox();
c.Publish("foo", replyTo, Encoding.UTF8.GetBytes("message"));

Msg m = s.NextMessage(1000);
Assert.NotNull(m);
Assert.Equal(replyTo, m.Reply);

c.Close();

byte[] reply = Encoding.UTF8.GetBytes("reply");
Assert.ThrowsAny<NATSConnectionClosedException>(() => m.Respond(reply));

s.Dispose();
}
}
}

[Fact]
public void TestRespondFailsWithServerClosed()
{
IConnection c = null;
ISyncSubscription s = null;
try
{
Msg m;
using (NATSServer ns = new NATSServer())
{
Options options = utils.DefaultTestOptions;
options.AllowReconnect = false;

c = new ConnectionFactory().CreateConnection(options);
s = c.SubscribeSync("foo");

string replyTo = c.NewInbox();

c.Publish("foo", replyTo, Encoding.UTF8.GetBytes("message"));

m = s.NextMessage(1000);
Assert.NotNull(m);
Assert.Equal(replyTo, m.Reply);

ns.Shutdown();
}

// Give the server time to close
Thread.Sleep(2000);

byte[] reply = Encoding.UTF8.GetBytes("reply");
Assert.ThrowsAny<NATSConnectionClosedException>(() => m.Respond(reply));
}
finally
{
c?.Dispose();
s?.Dispose();
}
}
}
}

0 comments on commit d2cfbe6

Please sign in to comment.