Skip to content

Commit

Permalink
subscription before consumer (#597)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored May 4, 2022
1 parent 7788937 commit 6b8c68e
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 6 deletions.
32 changes: 28 additions & 4 deletions src/NATS.Client/JetStream/JetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ Subscription CreateSubscription(string subject, string queueName,
inboxDeliver = Conn.NewInbox();
}

// 5. If consumer does not exist, create
// 5. If consumer does not exist, create and settle on the config. Name will have to wait
// If the consumer exists, I know what the settled info is
if (serverCC == null) {
ConsumerConfiguration.ConsumerConfigurationBuilder ccBuilder = ConsumerConfiguration.Builder(userCC);

Expand All @@ -284,9 +285,8 @@ Subscription CreateSubscription(string subject, string queueName,
}

// createOrUpdateConsumer can fail for security reasons, maybe other reasons?
ConsumerInfo ci = AddOrUpdateConsumerInternal(stream, ccBuilder.Build());
consumerName = ci.Name;
serverCC = ci.ConsumerConfiguration;
serverCC = ccBuilder.Build();
consumerName = null;
}

// 6. create the subscription
Expand Down Expand Up @@ -345,6 +345,30 @@ AsyncSubscription CreateAsyncSubDelegate(Connection lConn, string lSubject, stri
}

asm.SetSub(sub);

// 7. The consumer might need to be created, do it here
if (consumerName == null)
{
try
{
ConsumerInfo ci = AddOrUpdateConsumerInternal(stream, serverCC);
if (sub is JetStreamAbstractSyncSubscription syncSub)
{
syncSub.Consumer = ci.Name;
}
else if (sub is JetStreamPushAsyncSubscription asyncSub)
{
asyncSub.Consumer = ci.Name;
}
}
catch
{
// create consumer can fail, unsubscribe and then throw the exception to the user
sub.Unsubscribe();
throw;
}
}

return sub;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class JetStreamAbstractSyncSubscription : SyncSubscription
internal IAutoStatusManager _asm;
public JetStream Context { get; }
public string Stream { get; }
public string Consumer { get; }
public string Consumer { get; internal set; }
public string DeliverSubject { get; }

internal JetStreamAbstractSyncSubscription(Connection conn, string subject, string queue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class JetStreamPushAsyncSubscription : AsyncSubscription, IJetStreamPushA
private IAutoStatusManager _asm;
public JetStream Context { get; }
public string Stream { get; }
public string Consumer { get; }
public string Consumer { get; internal set; }
public string DeliverSubject { get; }

internal JetStreamPushAsyncSubscription(Connection conn, string subject, string queue,
Expand Down

0 comments on commit 6b8c68e

Please sign in to comment.