Skip to content

Commit

Permalink
Low-level APIs and last clean-up before first release (#65)
Browse files Browse the repository at this point in the history
* Implemented Pub/Sub/INatsSub low-level APIs

* INatsConnection rename and namespace changes

Also removed flush, observable and sharding.

* NatsMsg as struct

* Made low-level api internal
  • Loading branch information
mtmk committed Jun 28, 2023
1 parent 7d0ecad commit 3d43304
Show file tree
Hide file tree
Showing 26 changed files with 344 additions and 564 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@ public class WeatherForecastService : IHostedService, IAsyncDisposable
};

private readonly ILogger<WeatherForecastService> _logger;
private readonly INatsCommand _natsCommand;
private readonly INatsConnection _natsConnection;
private NatsReplyHandle? _replyHandle;

public WeatherForecastService(ILogger<WeatherForecastService> logger, INatsCommand natsCommand)
public WeatherForecastService(ILogger<WeatherForecastService> logger, INatsConnection natsConnection)
{
_logger = logger;
_natsCommand = natsCommand;
_natsConnection = natsConnection;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
_replyHandle = await _natsCommand.ReplyAsync<object, WeatherForecast[]>("weather", req =>
_replyHandle = await _natsConnection.ReplyAsync<object, WeatherForecast[]>("weather", req =>
{
return Enumerable.Range(1, 5).Select(index => new WeatherForecast
{
Expand Down
12 changes: 6 additions & 6 deletions sandbox/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,17 @@ public record Person(int Age, string Name);

public class Runner : ConsoleAppBase
{
private readonly INatsCommand _command;
private readonly INatsConnection _connection;

public Runner(INatsCommand command)
public Runner(INatsConnection connection)
{
_command = command;
_connection = connection;
}

[RootCommand]
public async Task Run()
{
var subscription = await _command.SubscribeAsync("foo");
var subscription = await _connection.SubscribeAsync("foo");

_ = Task.Run(async () =>
{
Expand All @@ -84,8 +84,8 @@ public async Task Run()
}
});

await _command.PingAsync();
await _command.PublishAsync("foo");
await _connection.PingAsync();
await _connection.PublishAsync("foo");
}
}

Expand Down
4 changes: 2 additions & 2 deletions sandbox/MinimumWebApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

var app = builder.Build();

app.MapGet("/subscribe", async (INatsCommand command) =>
app.MapGet("/subscribe", async (INatsConnection command) =>
{
var subscription = await command.SubscribeAsync("foo");
Expand All @@ -22,6 +22,6 @@
});
});

app.MapGet("/publish", async (INatsCommand command) => await command.PublishAsync("foo", 99));
app.MapGet("/publish", async (INatsConnection command) => await command.PublishAsync("foo", 99));

app.Run();
30 changes: 0 additions & 30 deletions src/NATS.Client.Core/Commands/FlushCommand.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
using System.Diagnostics;
using System.Text;
using Microsoft.Extensions.Primitives;
using NATS.Client.Core.Internal;

namespace NATS.Client.Core.Internal;
namespace NATS.Client.Core;

internal class HeaderParser
public class HeaderParser
{
private const byte ByteCR = (byte)'\r';
private const byte ByteLF = (byte)'\n';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

namespace NATS.Client.Core;

public interface INatsCommand
public interface INatsConnection
{
ValueTask FlushAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Send PING command and await PONG. Return value is similar as Round Trip Time (RTT).
/// </summary>
Expand Down Expand Up @@ -69,6 +67,4 @@ public interface INatsCommand
/// <typeparam name="T">Specifies the type of data that may be received from the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous send operation.</returns>
ValueTask<NatsSub<T>> SubscribeAsync<T>(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

IObservable<T> AsObservable<T>(string subject);
}
49 changes: 49 additions & 0 deletions src/NATS.Client.Core/INatsSub.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System.Buffers;
using System.Collections.Concurrent;

namespace NATS.Client.Core;

internal interface INatsSub : IAsyncDisposable
{
string Subject { get; }

string? QueueGroup { get; }

int Sid { get; }

ValueTask ReceiveAsync(string subject, string? replyTo, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer);
}

internal interface INatsSubBuilder<out T>
where T : INatsSub
{
T Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager);
}

internal class NatsSubBuilder : INatsSubBuilder<NatsSub>
{
public static readonly NatsSubBuilder Default = new();

public NatsSub Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager)
{
var sid = manager.GetNextSid();
return new NatsSub(connection, manager, subject, queueGroup, sid);
}
}

internal class NatsSubModelBuilder<T> : INatsSubBuilder<NatsSub<T>>
{
private static readonly ConcurrentDictionary<INatsSerializer, NatsSubModelBuilder<T>> Cache = new();
private readonly INatsSerializer _serializer;

public NatsSubModelBuilder(INatsSerializer serializer) => _serializer = serializer;

public static NatsSubModelBuilder<T> For(INatsSerializer serializer) =>
Cache.GetOrAdd(serializer, static s => new NatsSubModelBuilder<T>(s));

public NatsSub<T> Build(string subject, string? queueGroup, NatsConnection connection, SubscriptionManager manager)
{
var sid = manager.GetNextSid();
return new NatsSub<T>(connection, manager, subject, queueGroup, sid, _serializer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using NATS.Client.Core.Commands;
using NATS.Client.Core.Internal;

namespace NATS.Client.Core;
namespace NATS.Client.Core.Internal;

internal sealed class NatsPipeliningWriteProtocolProcessor : IAsyncDisposable
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
using System.Text.Json;
using Microsoft.Extensions.Logging;
using NATS.Client.Core.Commands;
using NATS.Client.Core.Internal;

namespace NATS.Client.Core;
namespace NATS.Client.Core.Internal;

internal sealed class NatsReadProtocolProcessor : IAsyncDisposable
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NATS.Client.Core;
namespace NATS.Client.Core.Internal;

internal sealed class NatsUri : IEquatable<NatsUri>
{
Expand Down
76 changes: 76 additions & 0 deletions src/NATS.Client.Core/NatsConnection.LowLevelApi.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using System.Buffers;
using NATS.Client.Core.Commands;

namespace NATS.Client.Core;

public partial class NatsConnection
{
internal ValueTask PubAsync(string subject, string? replyTo = default, ReadOnlySequence<byte> payload = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default)
{
headers?.SetReadOnly();

if (ConnectionState == NatsConnectionState.Open)
{
var command = AsyncPublishBytesCommand.Create(_pool, GetCommandTimer(cancellationToken), subject, replyTo, headers, payload);
if (TryEnqueueCommand(command))
{
return command.AsValueTask();
}
else
{
return EnqueueAndAwaitCommandAsync(command);
}
}
else
{
return WithConnectAsync(subject, replyTo, headers, payload, cancellationToken, static (self, s, r, h, p, token) =>
{
var command = AsyncPublishBytesCommand.Create(self._pool, self.GetCommandTimer(token), s, r, h, p);
return self.EnqueueAndAwaitCommandAsync(command);
});
}
}

internal ValueTask PubModelAsync<T>(string subject, T data, INatsSerializer serializer, string? replyTo = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default)
{
headers?.SetReadOnly();

if (ConnectionState == NatsConnectionState.Open)
{
var command = AsyncPublishCommand<T>.Create(_pool, GetCommandTimer(cancellationToken), subject, replyTo, headers, data, serializer);
if (TryEnqueueCommand(command))
{
return command.AsValueTask();
}
else
{
return EnqueueAndAwaitCommandAsync(command);
}
}
else
{
return WithConnectAsync(subject, replyTo, headers, data, serializer, cancellationToken, static (self, s, r, h, v, ser, token) =>
{
var command = AsyncPublishCommand<T>.Create(self._pool, self.GetCommandTimer(token), s, r, h, v, ser);
return self.EnqueueAndAwaitCommandAsync(command);
});
}
}

internal ValueTask<T> SubAsync<T>(string subject, string? queueGroup, INatsSubBuilder<T> builder, CancellationToken cancellationToken = default)
where T : INatsSub
{
var sub = builder.Build(subject, queueGroup, this, _subscriptionManager);
if (ConnectionState == NatsConnectionState.Open)
{
return _subscriptionManager.SubscribeAsync(subject, queueGroup, sub, cancellationToken);
}
else
{
return WithConnectAsync(subject, queueGroup, sub, cancellationToken, static (self, s, qg, sb, token) =>
{
return self._subscriptionManager.SubscribeAsync(s, qg, sb, token);
});
}
}
}
35 changes: 0 additions & 35 deletions src/NATS.Client.Core/NatsConnection.Other.cs

This file was deleted.

Loading

0 comments on commit 3d43304

Please sign in to comment.