Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for instanced versions of MayInterleave predicates #8548

Merged
merged 10 commits into from
Jul 24, 2023
79 changes: 65 additions & 14 deletions src/Orleans.Runtime/Activation/IGrainContextActivator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -121,7 +122,7 @@ public interface IGrainContextActivatorProvider
/// <returns><see langword="true"/> if an appropriate activator was found, otherwise <see langword="false"/>.</returns>
bool TryGet(GrainType grainType, [NotNullWhen(true)] out IGrainContextActivator activator);
}

/// <summary>
/// Creates a grain context for the given grain address.
/// </summary>
Expand Down Expand Up @@ -289,7 +290,7 @@ public void Configure(GrainType grainType, GrainProperties properties, GrainType
shared.SetComponent<GrainCanInterleave>(component);
}

component.MayInterleavePredicates.Add(_ => true);
component.MayInterleavePredicates.Add(ReentrantPredicate.Instance);
}
}
}
Expand All @@ -305,11 +306,11 @@ public MayInterleaveConfiguratorProvider(GrainClassMap grainClassMap)

public bool TryGetConfigurator(GrainType grainType, GrainProperties properties, out IConfigureGrainContext configurator)
{
if (properties.Properties.TryGetValue(WellKnownGrainTypeProperties.MayInterleavePredicate, out var value)
if (properties.Properties.TryGetValue(WellKnownGrainTypeProperties.MayInterleavePredicate, out _)
&& _grainClassMap.TryGetGrainClass(grainType, out var grainClass))
{
var predicate = GetMayInterleavePredicate(grainClass);
configurator = new MayInterleaveConfigurator(message => predicate(message.BodyObject as IInvokable));
configurator = new MayInterleaveConfigurator(predicate);
return true;
}

Expand All @@ -321,20 +322,21 @@ public bool TryGetConfigurator(GrainType grainType, GrainProperties properties,
/// Returns interleave predicate depending on whether class is marked with <see cref="MayInterleaveAttribute"/> or not.
/// </summary>
/// <param name="grainType">Grain class.</param>
private static Func<IInvokable, bool> GetMayInterleavePredicate(Type grainType)
private static IMayInterleavePredicate GetMayInterleavePredicate(Type grainType)
{
var attribute = grainType.GetCustomAttribute<MayInterleaveAttribute>();
if (attribute is null)
{
return null;
}

// here
var callbackMethodName = attribute.CallbackMethodName;
var method = grainType.GetMethod(callbackMethodName, BindingFlags.Public | BindingFlags.Static | BindingFlags.FlattenHierarchy);
var method = grainType.GetMethod(callbackMethodName, BindingFlags.Public | BindingFlags.Static | BindingFlags.Instance | BindingFlags.FlattenHierarchy);
if (method == null)
{
throw new InvalidOperationException(
$"Class {grainType.FullName} doesn't declare public static method " +
$"Class {grainType.FullName} doesn't declare public method " +
$"with name {callbackMethodName} specified in MayInterleave attribute");
}

Expand All @@ -345,18 +347,67 @@ private static Func<IInvokable, bool> GetMayInterleavePredicate(Type grainType)
throw new InvalidOperationException(
$"Wrong signature of callback method {callbackMethodName} " +
$"specified in MayInterleave attribute for grain class {grainType.FullName}. \n" +
$"Expected: public static bool {callbackMethodName}(IInvokable req)");
$"Expected: public bool {callbackMethodName}(IInvokable req)");
}

if (method.IsStatic)
{
return new MayInterleaveStaticPredicate(method.CreateDelegate<Func<IInvokable, bool>>());
}

return method.CreateDelegate<Func<IInvokable, bool>>();
var functionType = Expression.GetFuncType(grainType, typeof(IInvokable), typeof(bool));
var functionDelegate = method.CreateDelegate(functionType);
var predicateType = typeof(MayInterleaveInstancedPredicate<>).MakeGenericType(grainType);

return Activator.CreateInstance(predicateType, functionDelegate) as IMayInterleavePredicate;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(side note) at some point we will probably need to make this more AOT friendly, which will involve reworking all of this.

}
}

internal interface IMayInterleavePredicate
{
bool Invoke(object instance, IInvokable bodyObject);
}

internal class ReentrantPredicate : IMayInterleavePredicate
{
private ReentrantPredicate()
{
}

public static ReentrantPredicate Instance { get; } = new();

public bool Invoke(object _, IInvokable bodyObject) => true;
}

internal class MayInterleaveStaticPredicate : IMayInterleavePredicate
{
private readonly Func<IInvokable, bool> _mayInterleavePredicate;

public MayInterleaveStaticPredicate(Func<IInvokable, bool> mayInterleavePredicate)
{
_mayInterleavePredicate = mayInterleavePredicate;
}

public bool Invoke(object _, IInvokable bodyObject) => _mayInterleavePredicate(bodyObject);
}

internal class MayInterleaveInstancedPredicate<T> : IMayInterleavePredicate where T : class
{
private readonly Func<T, IInvokable, bool> _mayInterleavePredicate;
blazknuplez marked this conversation as resolved.
Show resolved Hide resolved

public MayInterleaveInstancedPredicate(Delegate mayInterleavePredicate)
{
_mayInterleavePredicate = mayInterleavePredicate as Func<T, IInvokable, bool>;
}

public bool Invoke(object instance, IInvokable bodyObject) => _mayInterleavePredicate(instance as T, bodyObject);
}

internal class MayInterleaveConfigurator : IConfigureGrainContext
{
private readonly Func<Message, bool> _mayInterleavePredicate;
private readonly IMayInterleavePredicate _mayInterleavePredicate;

public MayInterleaveConfigurator(Func<Message, bool> mayInterleavePredicate)
public MayInterleaveConfigurator(IMayInterleavePredicate mayInterleavePredicate)
{
_mayInterleavePredicate = mayInterleavePredicate;
}
Expand All @@ -376,12 +427,12 @@ public void Configure(IGrainContext context)

internal class GrainCanInterleave
{
public List<Func<Message, bool>> MayInterleavePredicates { get; } = new List<Func<Message, bool>>();
public bool MayInterleave(Message message)
public List<IMayInterleavePredicate> MayInterleavePredicates { get; } = new List<IMayInterleavePredicate>();
public bool MayInterleave(object instance, Message message)
{
foreach (var predicate in this.MayInterleavePredicates)
{
if (predicate(message)) return true;
if (predicate.Invoke(instance, message.BodyObject as IInvokable)) return true;
}

return false;
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ bool MayInvokeRequest(Message incoming)
{
try
{
return canInterleave.MayInterleave(incoming);
return canInterleave.MayInterleave(GrainInstance, incoming);
}
catch (Exception exception)
{
Expand Down
19 changes: 17 additions & 2 deletions test/Grains/TestGrainInterfaces/IReentrancyGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public interface INonReentrantGrain : IGrainWithIntegerKey
Task SetSelf(INonReentrantGrain self);
}

public interface IMayInterleavePredicateGrain : IGrainWithIntegerKey
public interface IMayInterleaveStaticPredicateGrain : IGrainWithIntegerKey
{
Task<string> One(string arg); // this interleaves only when arg == "reentrant"

Expand All @@ -35,7 +35,22 @@ public interface IMayInterleavePredicateGrain : IGrainWithIntegerKey
Task SubscribeToStream();
Task PushToStream(string item);

Task SetSelf(IMayInterleavePredicateGrain self);
Task SetSelf(IMayInterleaveStaticPredicateGrain self);
}

public interface IMayInterleaveInstancedPredicateGrain : IGrainWithIntegerKey
{
Task<string> One(string arg); // this interleaves only when arg == "reentrant"

Task<string> Two();
Task<string> TwoReentrant();

Task Exceptional();

Task SubscribeToStream();
Task PushToStream(string item);

Task SetSelf(IMayInterleaveInstancedPredicateGrain self);
}

[Unordered]
Expand Down
96 changes: 90 additions & 6 deletions test/Grains/TestGrains/ReentrantGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ public Task SetSelf(INonReentrantGrain self)
}

[MayInterleave(nameof(MayInterleave))]
public class MayInterleavePredicateGrain : Grain, IMayInterleavePredicateGrain
public class MayInterleaveStaticPredicateGrain : Grain, IMayInterleaveStaticPredicateGrain
{
private readonly ILogger logger;

public MayInterleavePredicateGrain(ILoggerFactory loggerFactory)
public MayInterleaveStaticPredicateGrain(ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger($"{this.GetType().Name}-{this.IdentityString}");
}
Expand Down Expand Up @@ -111,9 +111,9 @@ public static bool MayInterleave(IInvokable req)

static object UnwrapImmutable(object item) => item is Immutable<object> ? ((Immutable<object>)item).Value : item;

private IMayInterleavePredicateGrain Self { get; set; }
private IMayInterleaveStaticPredicateGrain Self { get; set; }

// this interleaves only when arg == "reentrant"
// this interleaves only when arg == "reentrant"
// and test predicate will throw when arg = "err"
public Task<string> One(string arg)
{
Expand Down Expand Up @@ -151,10 +151,94 @@ public Task PushToStream(string item)
return GetStream().OnNextAsync(item);
}

IAsyncStream<string> GetStream() =>
IAsyncStream<string> GetStream() =>
this.GetStreamProvider("sms").GetStream<string>("test-stream-interleave", Guid.Empty);

public Task SetSelf(IMayInterleavePredicateGrain self)
public Task SetSelf(IMayInterleaveStaticPredicateGrain self)
{
Self = self;
return Task.CompletedTask;
}
}

[MayInterleave(nameof(MayInterleave))]
public class MayInterleaveInstancedPredicateGrain : Grain, IMayInterleaveInstancedPredicateGrain
{
private readonly ILogger logger;

public MayInterleaveInstancedPredicateGrain(ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger($"{this.GetType().Name}-{this.IdentityString}");
}

public static bool MayInterleave(IInvokable req)
{
// not interested
if (req.GetArgumentCount() == 0)
return false;

string arg = null;

// assume single argument message
if (req.GetArgumentCount() == 1)
arg = (string)UnwrapImmutable(req.GetArgument(0));

// assume stream message
if (req.GetArgumentCount() == 2)
arg = (string)UnwrapImmutable(req.GetArgument(1));

if (arg == "err")
throw new ApplicationException("boom");

return arg == "reentrant";
}

static object UnwrapImmutable(object item) => item is Immutable<object> ? ((Immutable<object>)item).Value : item;

private IMayInterleaveInstancedPredicateGrain Self { get; set; }

// this interleaves only when arg == "reentrant"
// and test predicate will throw when arg = "err"
public Task<string> One(string arg)
{
return Task.FromResult("one");
}

public async Task<string> Two()
{
return await Self.One("") + " two";
}

public async Task<string> TwoReentrant()
{
return await Self.One("reentrant") + " two";
}

public Task Exceptional()
{
return Self.One("err");
}

public async Task SubscribeToStream()
{
var stream = GetStream();

await stream.SubscribeAsync((item, _) =>
{
logger.LogInformation("Received stream item: {Item}", item);
return Task.CompletedTask;
});
}

public Task PushToStream(string item)
{
return GetStream().OnNextAsync(item);
}

IAsyncStream<string> GetStream() =>
this.GetStreamProvider("sms").GetStream<string>("test-stream-interleave", Guid.Empty);

public Task SetSelf(IMayInterleaveInstancedPredicateGrain self)
{
Self = self;
return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public void NonReentrantGrain(bool performDeadlockDetection)
this.logger.LogInformation("Reentrancy NonReentrantGrain Test finished OK.");
}

public void NonReentrantGrain_WithMayInterleavePredicate_WhenPredicateReturnsFalse(bool performDeadlockDetection)
public void NonReentrantGrain_WithMayInterleaveStaticPredicate_WhenPredicateReturnsFalse(bool performDeadlockDetection)
{
var grain = this.grainFactory.GetGrain<IMayInterleavePredicateGrain>(OrleansTestingBase.GetRandomGrainId());
var grain = this.grainFactory.GetGrain<IMayInterleaveStaticPredicateGrain>(OrleansTestingBase.GetRandomGrainId());
grain.SetSelf(grain).Wait();
bool timeout = false;
bool deadlock = false;
Expand All @@ -69,6 +69,31 @@ public void NonReentrantGrain_WithMayInterleavePredicate_WhenPredicateReturnsFal
this.logger.LogInformation("Reentrancy NonReentrantGrain_WithMayInterleavePredicate_WhenPredicateReturnsFalse Test finished OK.");
}

public void NonReentrantGrain_WithMayInterleaveInstancedPredicate_WhenPredicateReturnsFalse(bool performDeadlockDetection)
{
var grain = this.grainFactory.GetGrain<IMayInterleaveInstancedPredicateGrain>(OrleansTestingBase.GetRandomGrainId());
grain.SetSelf(grain).Wait();
bool timeout = false;
bool deadlock = false;
try
{
timeout = !grain.Two().Wait(2000);
}
catch (Exception exc)
{
Assert.True(false, string.Format("Unexpected exception {0}: {1}", exc.Message, exc.StackTrace));
}
if (performDeadlockDetection)
{
Assert.True(deadlock, "Non-reentrant grain should deadlock when MayInterleave predicate returns false");
}
else
{
Assert.True(timeout, "Non-reentrant grain should timeout when MayInterleave predicate returns false");
}
this.logger.LogInformation("Reentrancy NonReentrantGrain_WithMayInterleaveInstancedPredicate_WhenPredicateReturnsFalse Test finished OK.");
}

public void UnorderedNonReentrantGrain(bool performDeadlockDetection)
{
IUnorderedNonReentrantGrain unonreentrant = this.grainFactory.GetGrain<IUnorderedNonReentrantGrain>(OrleansTestingBase.GetRandomGrainId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ public void NonReentrantGrain()
}

[Fact, TestCategory("Functional"), TestCategory("Tasks"), TestCategory("Reentrancy")]
public void NonReentrantGrain_WithMayInterleavePredicate_WhenPredicateReturnsFalse()
public void NonReentrantGrain_WithMayInterleaveStaticPredicate_WhenPredicateReturnsFalse()
{
this.runner.NonReentrantGrain_WithMayInterleavePredicate_WhenPredicateReturnsFalse(false);
this.runner.NonReentrantGrain_WithMayInterleaveStaticPredicate_WhenPredicateReturnsFalse(false);
}

[Fact, TestCategory("Functional"), TestCategory("Tasks"), TestCategory("Reentrancy")]
public void NonReentrantGrain_WithMayInterleaveInstancedPredicate_WhenPredicateReturnsFalse()
{
this.runner.NonReentrantGrain_WithMayInterleaveInstancedPredicate_WhenPredicateReturnsFalse(false);
}

[Fact, TestCategory("Functional"), TestCategory("Tasks"), TestCategory("Reentrancy")]
Expand Down
Loading
Loading