Skip to content

Commit

Permalink
Add IClusterConnectionStatusObserver support (#9145)
Browse files Browse the repository at this point in the history
* Add in-process test cluster to simplify testing of services which are hosted entirely in-process

* Add IClusterConnectionStatusObserver support
---------

Co-authored-by: Reuben Bond <[email protected]>
  • Loading branch information
galvesribeiro and ReubenBond authored Oct 1, 2024
1 parent fdc1c5e commit 546b739
Show file tree
Hide file tree
Showing 15 changed files with 1,604 additions and 96 deletions.
41 changes: 41 additions & 0 deletions src/Orleans.Core/Core/ClientBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,35 @@ public static IClientBuilder AddGatewayCountChangedHandler(this IClientBuilder b
return builder;
}

/// <summary>
/// Registers a <see cref="GatewayCountChangedHandler"/> event handler.
/// </summary>
public static IClientBuilder AddGatewayCountChangedHandler(this IClientBuilder builder, Func<IServiceProvider, GatewayCountChangedHandler> handlerFactory)
{
builder.ConfigureServices(services => services.AddSingleton(handlerFactory));
return builder;
}

/// <summary>
/// Registers a cluster connection status observer.
/// </summary>
public static IClientBuilder AddClusterConnectionStatusObserver<TObserver>(this IClientBuilder builder, TObserver observer)
where TObserver : IClusterConnectionStatusObserver
{
builder.Services.AddSingleton<IClusterConnectionStatusObserver>(observer);
return builder;
}

/// <summary>
/// Registers a cluster connection status observer.
/// </summary>
public static IClientBuilder AddClusterConnectionStatusObserver<TObserver>(this IClientBuilder builder)
where TObserver : class, IClusterConnectionStatusObserver
{
builder.Services.AddSingleton<IClusterConnectionStatusObserver, TObserver>();
return builder;
}

/// <summary>
/// Registers a <see cref="ConnectionToClusterLostHandler"/> event handler.
/// </summary>
Expand All @@ -116,6 +145,18 @@ public static IClientBuilder AddClusterConnectionLostHandler(this IClientBuilder
return builder;
}

/// <summary>
/// Registers a <see cref="ConnectionToClusterLostHandler"/> event handler.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="handlerFactory">The handler factory.</param>
/// <returns>The builder.</returns>
public static IClientBuilder AddClusterConnectionLostHandler(this IClientBuilder builder, Func<IServiceProvider, ConnectionToClusterLostHandler> handlerFactory)
{
builder.ConfigureServices(services => services.AddSingleton(handlerFactory));
return builder;
}

/// <summary>
/// Add <see cref="Activity.Current"/> propagation through grain calls.
/// Note: according to <see cref="ActivitySource.StartActivity(string, ActivityKind)"/> activity will be created only when any listener for activity exists <see cref="ActivitySource.HasListeners()"/> and <see cref="ActivityListener.Sample"/> returns <see cref="ActivitySamplingResult.PropagationData"/>.
Expand Down
1 change: 1 addition & 0 deletions src/Orleans.Core/Core/DefaultClientServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public static void AddDefaultServices(IClientBuilder builder)
services.TryAddSingleton<OutsideRuntimeClient>();
services.TryAddSingleton<InterfaceToImplementationMappingCache>();
services.TryAddSingleton<ClientGrainContext>();
services.AddSingleton<IClusterConnectionStatusObserver, ClusterConnectionStatusObserverAdaptor>();
services.AddFromExisting<IGrainContextAccessor, ClientGrainContext>();
services.TryAddFromExisting<IRuntimeClient, OutsideRuntimeClient>();
services.TryAddFromExisting<IClusterConnectionStatusListener, OutsideRuntimeClient>();
Expand Down
24 changes: 24 additions & 0 deletions src/Orleans.Core/Core/IClusterConnectionStatusObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace Orleans;

/// <summary>
/// Interface that receives notifications about the status of the cluster connection.
/// </summary>
public interface IClusterConnectionStatusObserver
{
/// <summary>
/// Notifies this observer that the number of connected gateways has changed.
/// </summary>
/// <param name="currentNumberOfGateways">
/// The current number of gateways.
/// </param>
/// <param name="previousNumberOfGateways">
/// The previous number of gateways.
/// </param>
/// <param name="connectionRecovered">Indicates whether a loss of connectivity has been resolved.</param>
void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered);

/// <summary>
/// Notifies this observer that the connection to the cluster has been lost.
/// </summary>
void NotifyClusterConnectionLost();
}
46 changes: 46 additions & 0 deletions src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using Microsoft.Extensions.Logging;

namespace Orleans.Runtime;

internal sealed class ClusterConnectionStatusObserverAdaptor(
IEnumerable<GatewayCountChangedHandler> gatewayCountChangedHandlers,
IEnumerable<ConnectionToClusterLostHandler> connectionLostHandlers,
ILogger<ClusterClient> logger) : IClusterConnectionStatusObserver
{
private readonly ImmutableArray<GatewayCountChangedHandler> _gatewayCountChangedHandlers = gatewayCountChangedHandlers.ToImmutableArray();
private readonly ImmutableArray<ConnectionToClusterLostHandler> _connectionLostHandler = connectionLostHandlers.ToImmutableArray();

public void NotifyClusterConnectionLost()
{
foreach (var handler in _connectionLostHandler)
{
try
{
handler(null, EventArgs.Empty);
}
catch (Exception ex)
{
logger.LogError((int)ErrorCode.ClientError, ex, "Error sending cluster connection lost notification.");
}
}
}

public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered)
{
var args = new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways);
foreach (var handler in _gatewayCountChangedHandlers)
{
try
{
handler(null, args);
}
catch (Exception ex)
{
logger.LogError((int)ErrorCode.ClientError, ex, "Error sending gateway count changed notification.");
}
}
}
}
60 changes: 26 additions & 34 deletions src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
Expand All @@ -13,11 +14,11 @@
using Orleans.Runtime;
using Orleans.Serialization;
using Orleans.Serialization.Invocation;
using Orleans.Serialization.Serializers;
using static Orleans.Internal.StandardExtensions;

namespace Orleans
{

internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConnectionStatusListener
{
internal static bool TestOnlyThrowExceptionDuringInit { get; set; }
Expand All @@ -32,6 +33,7 @@ internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConne

private readonly MessagingTrace messagingTrace;
private readonly InterfaceToImplementationMappingCache _interfaceToImplementationMapping;
private IClusterConnectionStatusObserver[] _statusObservers;

public IInternalGrainFactory InternalGrainFactory { get; private set; }

Expand Down Expand Up @@ -95,17 +97,7 @@ internal void ConsumeServices()
{
try
{
var connectionLostHandlers = this.ServiceProvider.GetServices<ConnectionToClusterLostHandler>();
foreach (var handler in connectionLostHandlers)
{
this.ClusterConnectionLost += handler;
}

var gatewayCountChangedHandlers = this.ServiceProvider.GetServices<GatewayCountChangedHandler>();
foreach (var handler in gatewayCountChangedHandlers)
{
this.GatewayCountChanged += handler;
}
_statusObservers = this.ServiceProvider.GetServices<IClusterConnectionStatusObserver>().ToArray();

this.InternalGrainFactory = this.ServiceProvider.GetRequiredService<IInternalGrainFactory>();
this.messageFactory = this.ServiceProvider.GetService<MessageFactory>();
Expand Down Expand Up @@ -272,7 +264,7 @@ public void SendRequest(GrainReference target, IInvokable request, IResponseComp
{
// don't set expiration for system target messages.
var ttl = request.GetDefaultResponseTimeout() ?? this.clientMessagingOptions.ResponseTimeout;
message.TimeToLive = ttl;
message.TimeToLive = ttl;
}

if (!oneWay)
Expand Down Expand Up @@ -401,9 +393,6 @@ public void Dispose()

Utils.SafeExecute(() => MessageCenter?.Dispose());

this.ClusterConnectionLost = null;
this.GatewayCountChanged = null;

GC.SuppressFinalize(this);
disposed = true;
}
Expand All @@ -422,35 +411,38 @@ public void BreakOutstandingMessagesToDeadSilo(SiloAddress deadSilo)
public int GetRunningRequestsCount(GrainInterfaceType grainInterfaceType)
=> this.callbacks.Count(c => c.Value.Message.InterfaceType == grainInterfaceType);

/// <inheritdoc />
public event ConnectionToClusterLostHandler ClusterConnectionLost;

/// <inheritdoc />
public event GatewayCountChangedHandler GatewayCountChanged;

/// <inheritdoc />
public void NotifyClusterConnectionLost()
{
try
foreach (var observer in _statusObservers)
{
this.ClusterConnectionLost?.Invoke(this, EventArgs.Empty);
}
catch (Exception ex)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error when sending cluster disconnection notification");
try
{
observer.NotifyClusterConnectionLost();
}
catch (Exception ex)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error sending cluster disconnection notification.");
}
}
}

/// <inheritdoc />
public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways)
{
try
{
this.GatewayCountChanged?.Invoke(this, new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways));
}
catch (Exception ex)
foreach (var observer in _statusObservers)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error when sending gateway count changed notification");
try
{
observer.NotifyGatewayCountChanged(
currentNumberOfGateways,
previousNumberOfGateways,
currentNumberOfGateways > 0 && previousNumberOfGateways <= 0);
}
catch (Exception ex)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error sending gateway count changed notification.");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,6 @@ public override async ValueTask DisposeAsync()

_connectionClosedTokenSource.Dispose();
}

public override string ToString() => $"InMem({LocalEndPoint}<->{RemoteEndPoint})";
}
Loading

0 comments on commit 546b739

Please sign in to comment.