[SUB-90011] Subject does not match consumer configuration filter. #715
Answered
by
scottf
Hulkstance
asked this question in
Q&A
-
I know this has already been asked but I'm misunderstanding something. I'm publishing messages to
I saw this example but it uses the pull-based consumer. Snippetprivate const string Stream = "mkt-data";
await _natsPublisher.PublishAsync(stream, "PERSISTENCE.binance", request); private const string Stream = "mkt-data";
private const string Subject = "PERSISTENCE.*";
private const string Durable = "bind-durable";
private const string DeliverSubject = "bind-deliver";
_ = _natsSubscriber.Subscribe(stream, Subject, Durable, DeliverSubject, msg =>
{
var json = Encoding.UTF8.GetString(msg.Data);
_logger.LogInformation("Message arrived: {Message}", json);
}); public sealed class NatsPublisher : INatsPublisher
{
private readonly IConnection _connection;
private readonly Lazy<IJetStream> _jetStreamFactory;
public NatsPublisher(IConnection connection)
{
_connection = connection;
_jetStreamFactory = new Lazy<IJetStream>(() => connection.CreateJetStreamContext());
}
private IJetStream JetStream => _jetStreamFactory.Value;
public async ValueTask<PublishAck> PublishAsync<T>(
string stream,
string subject,
T message)
where T : class
{
JetStreamUtils.CreateStreamOrUpdateSubjects(_connection, stream, subject);
var payload = JsonSerializer.SerializeToUtf8Bytes(message);
var msg = new Msg(subject, null, null, payload);
return await JetStream.PublishAsync(msg);
}
public async ValueTask<PublishAck> PublishAsync<T>(
string stream,
string subject,
T message,
IEnumerable<(string, string)> headers)
where T : class
{
JetStreamUtils.CreateStreamOrUpdateSubjects(_connection, stream, subject);
var payload = JsonSerializer.SerializeToUtf8Bytes(message);
var msg = new Msg(subject, null, null, payload);
foreach (var (header, val) in headers)
{
msg.Header[header] = val;
}
return await JetStream.PublishAsync(msg);
}
public async ValueTask<PublishAck> PublishDeduplicationIdAsync<T>(
string stream,
string subject,
T message,
string id)
where T : class
{
JetStreamUtils.CreateStreamOrUpdateSubjects(_connection, stream, subject);
var payload = JsonSerializer.SerializeToUtf8Bytes(message);
var msg = new Msg(subject, null, null, payload)
{
Header = { ["Nats-Msg-Id"] = id }
};
return await JetStream.PublishAsync(msg);
}
}
public sealed class NatsSubscriber : INatsSubscriber
{
private readonly IConnection _connection;
private readonly Lazy<IJetStream> _jetStreamFactory;
private readonly Lazy<IJetStreamManagement> _jetStreamManagementFactory;
public NatsSubscriber(IConnection connection)
{
_connection = connection;
_jetStreamFactory = new Lazy<IJetStream>(() => connection.CreateJetStreamContext());
_jetStreamManagementFactory = new Lazy<IJetStreamManagement>(() => connection.CreateJetStreamManagementContext());
}
private IJetStream JetStream => _jetStreamFactory.Value;
private IJetStreamManagement JetStreamManagement => _jetStreamManagementFactory.Value;
public IJetStreamSubscription Subscribe(
string stream,
string subject,
string durable,
string deliverSubject,
Action<Msg> onMessageArrived)
{
JetStreamUtils.CreateStreamWhenDoesNotExist(_connection, stream, subject);
var consumerConfiguration = ConsumerConfiguration.Builder()
.WithDurable(durable)
.WithDeliverSubject(deliverSubject)
.Build();
JetStreamManagement.AddOrUpdateConsumer(stream, consumerConfiguration);
var so = PushSubscribeOptions.BindTo(stream, durable);
var subscription = JetStream.PushSubscribeAsync(
subject,
(_, e) => onMessageArrived(e.Message),
true,
so);
return subscription;
}
}
public static class JetStreamUtils
{
// ----------------------------------------------------------------------------------------------------
// STREAM INFO / CREATE / UPDATE
// ----------------------------------------------------------------------------------------------------
public static StreamInfo? GetStreamInfoOrNullWhenNotExist(IJetStreamManagement jsm, string streamName)
{
try
{
return jsm.GetStreamInfo(streamName);
}
catch (NATSJetStreamException e)
{
if (e.ErrorCode == 404)
{
return null;
}
throw;
}
}
public static bool StreamExists(IConnection c, string streamName)
{
return GetStreamInfoOrNullWhenNotExist(c.CreateJetStreamManagementContext(), streamName) != null;
}
public static bool StreamExists(IJetStreamManagement jsm, string streamName)
{
return GetStreamInfoOrNullWhenNotExist(jsm, streamName) != null;
}
public static void ExitIfStreamExists(IJetStreamManagement jsm, string streamName)
{
if (StreamExists(jsm, streamName))
{
Environment.Exit(-1);
}
}
public static void ExitIfStreamNotExists(IConnection c, string streamName)
{
if (!StreamExists(c, streamName))
{
Environment.Exit(-1);
}
}
public static StreamInfo CreateStream(IJetStreamManagement jsm, string streamName, StorageType storageType,
params string[] subjects)
{
var sc = StreamConfiguration.Builder()
.WithName(streamName)
.WithStorageType(storageType)
.WithSubjects(subjects)
.Build();
var si = jsm.AddStream(sc);
return si;
}
public static StreamInfo CreateStream(IJetStreamManagement jsm, string stream, params string[] subjects)
{
return CreateStream(jsm, stream, StorageType.Memory, subjects);
}
public static StreamInfo CreateStream(IConnection c, string stream, params string[] subjects)
{
return CreateStream(c.CreateJetStreamManagementContext(), stream, StorageType.Memory, subjects);
}
public static StreamInfo CreateStreamExitWhenExists(IConnection c, string streamName, params string[] subjects)
{
return CreateStreamExitWhenExists(c.CreateJetStreamManagementContext(), streamName, subjects);
}
public static StreamInfo CreateStreamExitWhenExists(IJetStreamManagement jsm, string streamName,
params string[] subjects)
{
ExitIfStreamExists(jsm, streamName);
return CreateStream(jsm, streamName, StorageType.Memory, subjects);
}
public static void CreateStreamWhenDoesNotExist(IJetStreamManagement jsm, string stream, params string[] subjects)
{
try
{
jsm.GetStreamInfo(stream);
return;
}
catch (NATSJetStreamException)
{
}
var sc = StreamConfiguration.Builder()
.WithName(stream)
.WithStorageType(StorageType.Memory)
.WithSubjects(subjects)
.Build();
jsm.AddStream(sc);
}
public static void CreateStreamWhenDoesNotExist(IConnection c, string stream, params string[] subjects)
{
CreateStreamWhenDoesNotExist(c.CreateJetStreamManagementContext(), stream, subjects);
}
public static StreamInfo CreateStreamOrUpdateSubjects(IJetStreamManagement jsm, string streamName,
StorageType storageType, params string[] subjects)
{
var si = GetStreamInfoOrNullWhenNotExist(jsm, streamName);
if (si == null)
{
return CreateStream(jsm, streamName, storageType, subjects);
}
var sc = si.Config;
var needToUpdate = false;
foreach (var sub in subjects)
{
if (!sc.Subjects.Contains(sub))
{
needToUpdate = true;
sc.Subjects.Add(sub);
}
}
if (needToUpdate)
{
si = jsm.UpdateStream(sc);
}
return si;
}
public static StreamInfo CreateStreamOrUpdateSubjects(IJetStreamManagement jsm, string streamName,
params string[] subjects)
{
return CreateStreamOrUpdateSubjects(jsm, streamName, StorageType.Memory, subjects);
}
public static StreamInfo CreateStreamOrUpdateSubjects(IConnection c, string stream, params string[] subjects)
{
return CreateStreamOrUpdateSubjects(c.CreateJetStreamManagementContext(), stream, StorageType.Memory,
subjects);
}
// ----------------------------------------------------------------------------------------------------
// PUBLISH
// ----------------------------------------------------------------------------------------------------
public static void Publish(IConnection c, string subject, int count)
{
Publish(c.CreateJetStreamContext(), subject, "data", count);
}
public static void Publish(IJetStream js, string subject, int count)
{
Publish(js, subject, "data", count);
}
public static void Publish(IJetStream js, string subject, string prefix, int count)
{
for (var x = 1; x <= count; x++)
{
var data = prefix + x;
js.Publish(subject, Encoding.UTF8.GetBytes(data));
}
}
public static void PublishInBackground(IJetStream js, string subject, string prefix, int count)
{
new Thread(() =>
{
try
{
for (var x = 1; x <= count; x++)
{
js.Publish(subject, Encoding.ASCII.GetBytes(prefix + "-" + x));
}
}
catch (Exception)
{
Environment.Exit(-1);
}
}).Start();
Thread.Sleep(100); // give the publish thread a little time to get going
}
// ----------------------------------------------------------------------------------------------------
// READ MESSAGES
// ----------------------------------------------------------------------------------------------------
public static IList<Msg> ReadMessagesAck(ISyncSubscription sub, int timeout = 1000)
{
IList<Msg> messages = new List<Msg>();
var keepGoing = true;
while (keepGoing)
{
try
{
var msg = sub.NextMessage(timeout);
messages.Add(msg);
msg.Ack();
}
catch (NATSTimeoutException)
{
keepGoing = false;
}
}
return messages;
}
} |
Beta Was this translation helpful? Give feedback.
Answered by
scottf
Nov 21, 2022
Replies: 1 comment 1 reply
-
|
Beta Was this translation helpful? Give feedback.
1 reply
Answer selected by
scottf
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
>
. This does not match the subscribe subject ofPERSISTENCE.*
. AddWithFilterSubject(subject)
to your consumer builder.