Skip to content

Commit

Permalink
Add helper for long StreamId keys (#8578)
Browse files Browse the repository at this point in the history
* Add helper for long StreamId keys

* Add GetStream extensions for long keys

* Add unit tests
  • Loading branch information
AdrianoAE authored Aug 21, 2023
1 parent 6a4cc90 commit b63d948
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/Orleans.Streaming/Providers/IStreamProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,25 @@ public static class StreamProviderExtensions
/// <param name="id">The identifier.</param>
/// <returns>The stream.</returns>
public static IAsyncStream<T> GetStream<T>(this IStreamProvider streamProvider, string ns, string id) => streamProvider.GetStream<T>(StreamId.Create(ns, id));

/// <summary>
/// Gets the stream with the specified identity and namespace.
/// </summary>
/// <typeparam name="T">The stream element type.</typeparam>
/// <param name="streamProvider">The stream provider.</param>
/// <param name="id">The identifier.</param>
/// <returns>The stream.</returns>
public static IAsyncStream<T> GetStream<T>(this IStreamProvider streamProvider, long id) => streamProvider.GetStream<T>(StreamId.Create(null, id));

/// <summary>
/// Gets the stream with the specified identity and namespace.
/// </summary>
/// <typeparam name="T">The stream element type.</typeparam>
/// <param name="streamProvider">The stream provider.</param>
/// <param name="ns">The namespace.</param>
/// <param name="id">The identifier.</param>
/// <returns>The stream.</returns>
public static IAsyncStream<T> GetStream<T>(this IStreamProvider streamProvider, string ns, long id) => streamProvider.GetStream<T>(StreamId.Create(ns, id));
}
}

7 changes: 7 additions & 0 deletions src/Orleans.Streaming/StreamId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ public static StreamId Create(string ns, Guid key)
return new StreamId(buf, (ushort)nsLen);
}
}

/// <summary>
/// Initializes a new instance of the <see cref="StreamId"/> struct.
/// </summary>
/// <param name="ns">The namespace.</param>
/// <param name="key">The key.</param>
public static StreamId Create(string ns, long key) => Create(ns, key.ToString());

/// <summary>
/// Initializes a new instance of the <see cref="StreamId"/> struct.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace UnitTests.GrainInterfaces
{
public interface IImplicitSubscriptionKeyTypeGrain
{
Task<int> GetValue();
}

public interface IImplicitSubscriptionLongKeyGrain : IImplicitSubscriptionKeyTypeGrain, IGrainWithIntegerKey
{ }
}
7 changes: 7 additions & 0 deletions test/Grains/TestGrains/ImplicitStreamTestConstants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace UnitTests.Grains
{
public sealed class ImplicitStreamTestConstants
{
public const string StreamProviderName = "ImplicitStreamProvider";
}
}
37 changes: 37 additions & 0 deletions test/Grains/TestGrains/ImplicitSubscriptionWithKeyTypeGrain.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Microsoft.Extensions.Logging;
using Orleans.Streams;
using UnitTests.GrainInterfaces;

namespace UnitTests.Grains
{
[ImplicitStreamSubscription(nameof(IImplicitSubscriptionLongKeyGrain))]
public class ImplicitSubscriptionWithLongKeyGrain : Grain, IImplicitSubscriptionLongKeyGrain
{
private readonly ILogger logger;
private int value;

public ImplicitSubscriptionWithLongKeyGrain(ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger($"{nameof(ImplicitSubscriptionWithLongKeyGrain)} {IdentityString}");
}

public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
logger.LogInformation("OnActivateAsync");

value = 0;
IStreamProvider streamProvider = this.GetStreamProvider(ImplicitStreamTestConstants.StreamProviderName);
IAsyncStream<int> stream = streamProvider.GetStream<int>(nameof(IImplicitSubscriptionLongKeyGrain), this.GetPrimaryKeyLong());

await stream.SubscribeAsync(
(data, token) =>
{
logger.LogInformation("Received event {Event}", data);
value = data;
return Task.CompletedTask;
});
}

public Task<int> GetValue() => Task.FromResult(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using Microsoft.Extensions.Configuration;
using Orleans.Streams;
using Orleans.TestingHost;
using Orleans.TestingHost.Utils;
using TestExtensions;
using UnitTests.GrainInterfaces;
using UnitTests.Grains;
using Xunit;

namespace UnitTests.StreamingTests
{
public sealed class ImplicitSubscriptionKeyTypeGrainTests : OrleansTestingBase, IClassFixture<ImplicitSubscriptionKeyTypeGrainTests.Fixture>
{
private readonly Fixture fixture;
private readonly IStreamProvider _streamProvider;

public class Fixture : BaseTestClusterFixture
{
public const string StreamProviderName = GeneratedStreamTestConstants.StreamProviderName;

protected override void ConfigureTestCluster(TestClusterBuilder builder)
{
builder.AddSiloBuilderConfigurator<MySiloBuilderConfigurator>();
builder.AddClientBuilderConfigurator<MyClientBuilderConfigurator>();
}

private class MySiloBuilderConfigurator : ISiloConfigurator
{
public void Configure(ISiloBuilder hostBuilder)
{
hostBuilder.AddMemoryGrainStorageAsDefault();

hostBuilder.AddMemoryStreams(ImplicitStreamTestConstants.StreamProviderName)
.AddMemoryGrainStorage("PubSubStore");
}
}

private class MyClientBuilderConfigurator : IClientBuilderConfigurator
{
public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
{
clientBuilder.AddMemoryStreams(ImplicitStreamTestConstants.StreamProviderName);
}
}
}

public ImplicitSubscriptionKeyTypeGrainTests(Fixture fixture)
{
this.fixture = fixture;
_streamProvider = fixture.Client.GetStreamProvider(ImplicitStreamTestConstants.StreamProviderName);
}

[Fact, TestCategory("Functional"), TestCategory("Streaming")]
public async Task LongKey()
{
long grainId = 13;
int value = 87;
IAsyncStream<int> stream = _streamProvider.GetStream<int>(nameof(IImplicitSubscriptionLongKeyGrain), grainId);

await stream.OnNextAsync(value);

var consumer = fixture.GrainFactory.GetGrain<IImplicitSubscriptionLongKeyGrain>(grainId);
await TestingUtils.WaitUntilAsync(lastTry => CheckValue(consumer, value, lastTry), TimeSpan.FromSeconds(30));
}

private async Task<bool> CheckValue(IImplicitSubscriptionKeyTypeGrain consumer, int expectedValue, bool assertIsTrue)
{
int value = await consumer.GetValue();

if (assertIsTrue)
{
Assert.Equal(expectedValue, value);
}

if (expectedValue != value)
{
return false;
}

return true;
}
}
}

0 comments on commit b63d948

Please sign in to comment.