Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add helper for long StreamId keys #8578

Merged
merged 3 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/Orleans.Streaming/Providers/IStreamProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,25 @@ public static class StreamProviderExtensions
/// <param name="id">The identifier.</param>
/// <returns>The stream.</returns>
public static IAsyncStream<T> GetStream<T>(this IStreamProvider streamProvider, string ns, string id) => streamProvider.GetStream<T>(StreamId.Create(ns, id));

/// <summary>
/// Gets the stream with the specified identity and namespace.
/// </summary>
/// <typeparam name="T">The stream element type.</typeparam>
/// <param name="streamProvider">The stream provider.</param>
/// <param name="id">The identifier.</param>
/// <returns>The stream.</returns>
public static IAsyncStream<T> GetStream<T>(this IStreamProvider streamProvider, long id) => streamProvider.GetStream<T>(StreamId.Create(null, id));

/// <summary>
/// Gets the stream with the specified identity and namespace.
/// </summary>
/// <typeparam name="T">The stream element type.</typeparam>
/// <param name="streamProvider">The stream provider.</param>
/// <param name="ns">The namespace.</param>
/// <param name="id">The identifier.</param>
/// <returns>The stream.</returns>
public static IAsyncStream<T> GetStream<T>(this IStreamProvider streamProvider, string ns, long id) => streamProvider.GetStream<T>(StreamId.Create(ns, id));
}
}

7 changes: 7 additions & 0 deletions src/Orleans.Streaming/StreamId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ public static StreamId Create(string ns, Guid key)
return new StreamId(buf, (ushort)nsLen);
}
}

/// <summary>
/// Initializes a new instance of the <see cref="StreamId"/> struct.
/// </summary>
/// <param name="ns">The namespace.</param>
/// <param name="key">The key.</param>
public static StreamId Create(string ns, long key) => Create(ns, key.ToString());

/// <summary>
/// Initializes a new instance of the <see cref="StreamId"/> struct.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace UnitTests.GrainInterfaces
{
public interface IImplicitSubscriptionKeyTypeGrain
{
Task<int> GetValue();
}

public interface IImplicitSubscriptionLongKeyGrain : IImplicitSubscriptionKeyTypeGrain, IGrainWithIntegerKey
{ }
}
7 changes: 7 additions & 0 deletions test/Grains/TestGrains/ImplicitStreamTestConstants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace UnitTests.Grains
{
public sealed class ImplicitStreamTestConstants
{
public const string StreamProviderName = "ImplicitStreamProvider";
}
}
37 changes: 37 additions & 0 deletions test/Grains/TestGrains/ImplicitSubscriptionWithKeyTypeGrain.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Microsoft.Extensions.Logging;
using Orleans.Streams;
using UnitTests.GrainInterfaces;

namespace UnitTests.Grains
{
[ImplicitStreamSubscription(nameof(IImplicitSubscriptionLongKeyGrain))]
public class ImplicitSubscriptionWithLongKeyGrain : Grain, IImplicitSubscriptionLongKeyGrain
{
private readonly ILogger logger;
private int value;

public ImplicitSubscriptionWithLongKeyGrain(ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger($"{nameof(ImplicitSubscriptionWithLongKeyGrain)} {IdentityString}");
}

public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
logger.LogInformation("OnActivateAsync");

value = 0;
IStreamProvider streamProvider = this.GetStreamProvider(ImplicitStreamTestConstants.StreamProviderName);
IAsyncStream<int> stream = streamProvider.GetStream<int>(nameof(IImplicitSubscriptionLongKeyGrain), this.GetPrimaryKeyLong());

await stream.SubscribeAsync(
(data, token) =>
{
logger.LogInformation("Received event {Event}", data);
value = data;
return Task.CompletedTask;
});
}

public Task<int> GetValue() => Task.FromResult(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using Microsoft.Extensions.Configuration;
using Orleans.Streams;
using Orleans.TestingHost;
using Orleans.TestingHost.Utils;
using TestExtensions;
using UnitTests.GrainInterfaces;
using UnitTests.Grains;
using Xunit;

namespace UnitTests.StreamingTests
{
public sealed class ImplicitSubscriptionKeyTypeGrainTests : OrleansTestingBase, IClassFixture<ImplicitSubscriptionKeyTypeGrainTests.Fixture>
{
private readonly Fixture fixture;
private readonly IStreamProvider _streamProvider;

public class Fixture : BaseTestClusterFixture
{
public const string StreamProviderName = GeneratedStreamTestConstants.StreamProviderName;

protected override void ConfigureTestCluster(TestClusterBuilder builder)
{
builder.AddSiloBuilderConfigurator<MySiloBuilderConfigurator>();
builder.AddClientBuilderConfigurator<MyClientBuilderConfigurator>();
}

private class MySiloBuilderConfigurator : ISiloConfigurator
{
public void Configure(ISiloBuilder hostBuilder)
{
hostBuilder.AddMemoryGrainStorageAsDefault();

hostBuilder.AddMemoryStreams(ImplicitStreamTestConstants.StreamProviderName)
.AddMemoryGrainStorage("PubSubStore");
}
}

private class MyClientBuilderConfigurator : IClientBuilderConfigurator
{
public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
{
clientBuilder.AddMemoryStreams(ImplicitStreamTestConstants.StreamProviderName);
}
}
}

public ImplicitSubscriptionKeyTypeGrainTests(Fixture fixture)
{
this.fixture = fixture;
_streamProvider = fixture.Client.GetStreamProvider(ImplicitStreamTestConstants.StreamProviderName);
}

[Fact, TestCategory("Functional"), TestCategory("Streaming")]
public async Task LongKey()
{
long grainId = 13;
int value = 87;
IAsyncStream<int> stream = _streamProvider.GetStream<int>(nameof(IImplicitSubscriptionLongKeyGrain), grainId);

await stream.OnNextAsync(value);

var consumer = fixture.GrainFactory.GetGrain<IImplicitSubscriptionLongKeyGrain>(grainId);
await TestingUtils.WaitUntilAsync(lastTry => CheckValue(consumer, value, lastTry), TimeSpan.FromSeconds(30));
}

private async Task<bool> CheckValue(IImplicitSubscriptionKeyTypeGrain consumer, int expectedValue, bool assertIsTrue)
{
int value = await consumer.GetValue();

if (assertIsTrue)
{
Assert.Equal(expectedValue, value);
}

if (expectedValue != value)
{
return false;
}

return true;
}
}
}
Loading