Skip to content

Commit

Permalink
generalize CacheInvalidator to ServerBrodcast to support methodName /…
Browse files Browse the repository at this point in the history
… Argument
  • Loading branch information
olmobrutall committed Dec 15, 2021
1 parent 15116a8 commit 1903c0d
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ namespace Signum.Engine.Cache;
//Never Tested, works only in theory
//https://github.com/briandunnington/SynchronizedCache/blob/master/SynchronizedCache.cs
//https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/MigrationGuide.md
public class AzureServiceBusCacheInvalidator : ICacheMultiServerInvalidator, IAsyncDisposable
public class AzureServiceBusBroadcast : IServerBroadcast, IAsyncDisposable
{
public event Action<string>? ReceiveInvalidation;

public event Action<string, string>? Receive;

ServiceBusAdministrationClient adminClient;
ServiceBusClient client;

Expand All @@ -25,26 +25,26 @@ public class AzureServiceBusCacheInvalidator : ICacheMultiServerInvalidator, IAs

public DateTime StartTime;

public AzureServiceBusCacheInvalidator(string namespaceConnectionString, string topicName = "cache-invalidation")
public AzureServiceBusBroadcast(string namespaceConnectionString, string topicName = "cache-invalidation")
{
this.TopicName = topicName;
this.SubscriptionName = Environment.MachineName + "-" + Schema.Current.ApplicationName;

adminClient = new ServiceBusAdministrationClient(namespaceConnectionString);
client = new ServiceBusClient(namespaceConnectionString);
sender = client.CreateSender(this.TopicName);
}



public void SendInvalidation(string cleanName)
}


public void Send(string methodName, string argument)
{
sender.SendMessageAsync(new ServiceBusMessage(BinaryData.FromObjectAsJson(new AzureInvalidationMessage
{
CreationDate = DateTime.UtcNow,
OriginMachineName = Environment.MachineName,
OriginApplicationName = Schema.Current.ApplicationName,
CleanName = cleanName,
MethodName = methodName,
Argument = argument,
}, EntityJsonContext.FullJsonSerializerOptions))).Wait();
}

Expand Down Expand Up @@ -79,7 +79,7 @@ async Task StartMessageListener()

private Task Processor_ProcessErrorAsync(ProcessErrorEventArgs arg)
{
arg.Exception.LogException(ex => ex.ControllerName = nameof(AzureServiceBusCacheInvalidator));
arg.Exception.LogException(ex => ex.ControllerName = nameof(AzureServiceBusBroadcast));
return Task.CompletedTask;
}

Expand All @@ -96,19 +96,25 @@ private async Task Processor_ProcessMessageAsync(ProcessMessageEventArgs arg)
message.OriginApplicationName == Schema.Current.ApplicationName)
return;

ReceiveInvalidation?.Invoke(message.CleanName);
Receive?.Invoke(message.MethodName, message.Argument);

await arg.CompleteMessageAsync(arg.Message);
}catch (Exception ex)
{
ex.LogException(ex => ex.ControllerName = nameof(AzureServiceBusCacheInvalidator));
ex.LogException(ex => ex.ControllerName = nameof(AzureServiceBusBroadcast));
}
}

public async ValueTask DisposeAsync()
{
await this.client.DisposeAsync();
}
}

public override string ToString()
{
return $"{nameof(AzureServiceBusBroadcast)}(TopicName = {TopicName}, SubscriptionName = {SubscriptionName})";
}

}

#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
Expand All @@ -117,6 +123,8 @@ public class AzureInvalidationMessage
public DateTime CreationDate;
public string OriginMachineName;
public string OriginApplicationName;
public string CleanName;
public string MethodName;

public string Argument { get; internal set; }
}
#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
68 changes: 68 additions & 0 deletions Signum.Engine.Extensions/Cache/Broadcast/PostgresBroadcast.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using Npgsql;
using System.Diagnostics;

namespace Signum.Engine.Cache;

public class PostgresBroadcast : IServerBroadcast
{
public event Action<string, string>? Receive;

public void Send(string methodName, string argument)
{
Executor.ExecuteNonQuery($"NOTIFY table_changed, '{methodName}/{Process.GetCurrentProcess().Id}/{argument}'");
}

public void Start()
{
Task.Run(() =>
{
try
{
var conn = (NpgsqlConnection)Connector.Current.CreateConnection();
conn.Open();
conn.Notification += (o, e) =>
{
try
{
var methodName = e.Payload.Before('/');
var after = e.Payload.After("/");
var pid = int.Parse(after.Before("/"));
var arguments = after.After("/");
if (Process.GetCurrentProcess().Id != pid)
Receive?.Invoke(methodName, arguments);
}
catch (Exception ex)
{
ex.LogException(a => a.ControllerName = nameof(PostgresBroadcast));
}
};
using (var cmd = new NpgsqlCommand("LISTEN table_changed", conn))
{
cmd.ExecuteNonQuery();
}
while (true)
{
conn.Wait(); // Thread will block here
}
}
catch (Exception e)
{
e.LogException(a =>
{
a.ControllerName = nameof(PostgresBroadcast);
a.ActionName = "Fatal";
});
}
});
}


public override string ToString()
{
return $"{nameof(PostgresBroadcast)}()";
}
}
108 changes: 108 additions & 0 deletions Signum.Engine.Extensions/Cache/Broadcast/SimpleHttpBroadcast.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Npgsql;
using Signum.Engine.Json;
using Signum.Entities.Cache;
using Signum.Services;
using System.Diagnostics;
using System.Net.Http;
using System.Net.Http.Json;

namespace Signum.Engine.Cache;


public class SimpleHttpBroadcast : IServerBroadcast
{

HttpClient client = new HttpClient();
readonly string bordcastSecretHash;
readonly string[] broadcastUrls;

public SimpleHttpBroadcast(string broadcastSecret, string broadcastUrls)
{
this.bordcastSecretHash = Convert.ToBase64String(Security.EncodePassword(broadcastSecret));
this.broadcastUrls = broadcastUrls
.SplitNoEmpty(new char[] { ';', ',' } /*In theory ; and , are valid in a URL, but since we talk only domain names or IPs...*/)
.Select(a => a.Trim())
.Where(a => a.HasText())
.ToArray();
}

public event Action<string, string>? Receive;

public void Start()
{
}

//Called from Controller
public void InvalidateTable(InvalidateTableRequest request)
{
if (this.bordcastSecretHash != request.SecretHash)
throw new InvalidOperationException("invalidationSecret does not match");

if (request.OriginMachineName == Environment.MachineName ||
request.OriginApplicationName == Schema.Current.ApplicationName)
return;

Receive?.Invoke(request.MethodName, request.Argument);
}

public void Send(string methodName, string argument)
{
var request = new InvalidateTableRequest
{
MethodName = methodName,
Argument = argument,
SecretHash = this.bordcastSecretHash,
OriginMachineName = Environment.MachineName,
OriginApplicationName = Schema.Current.ApplicationName,
};

foreach (var url in broadcastUrls)
{
string? errorBody = null;
try
{
var fullUrl = url.TrimEnd('/') + "/api/cache/invalidateTable";

var json = JsonContent.Create(request, options: EntityJsonContext.FullJsonSerializerOptions /*SignumServer.JsonSerializerOptions*/);

var response = client.PostAsync(fullUrl, json).Result;

if (!response.IsSuccessStatusCode)
{
errorBody = response.Content.ReadAsStringAsync().Result;
}

}

catch (Exception e)
{
e.LogException(a =>
{
a.ControllerName = nameof(SimpleHttpBroadcast);
a.Data.Text = errorBody;
});
}
}
}

public override string ToString()
{
return $"{nameof(SimpleHttpBroadcast)}(Urls={broadcastUrls.ToString(", ")})";
}


}

#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
public class InvalidateTableRequest
{
public string OriginMachineName;
public string OriginApplicationName;
public string SecretHash;
public string Argument;
public string MethodName;
}
#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
36 changes: 24 additions & 12 deletions Signum.Engine.Extensions/Cache/CacheLogic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@

namespace Signum.Engine.Cache;

public interface ICacheMultiServerInvalidator
public interface IServerBroadcast
{
void Start();
void SendInvalidation(string cleanName);
event Action<string>? ReceiveInvalidation;
void Send(string methodName, string argument);
event Action<string, string>? Receive;
}

public static class CacheLogic
{
public static ICacheMultiServerInvalidator? CacheInvalidator { get; private set; }
public static IServerBroadcast? ServerBroadcast { get; private set; }

public static bool WithSqlDependency { get; internal set; }

Expand All @@ -50,7 +50,7 @@ public static void AssertStarted(SchemaBuilder sb)
/// Change Server Authentication mode and enable SA: http://msdn.microsoft.com/en-us/library/ms188670.aspx
/// Change Database ownership to sa: ALTER AUTHORIZATION ON DATABASE::yourDatabase TO sa
/// </summary>
public static void Start(SchemaBuilder sb, bool? withSqlDependency = null, ICacheMultiServerInvalidator? cacheInvalidator = null)
public static void Start(SchemaBuilder sb, bool? withSqlDependency = null, IServerBroadcast? serverBroadcast = null)
{
if (sb.NotDefined(MethodInfo.GetCurrentMethod()))
{
Expand All @@ -63,14 +63,14 @@ public static void Start(SchemaBuilder sb, bool? withSqlDependency = null, ICach

WithSqlDependency = withSqlDependency ?? Connector.Current.SupportsSqlDependency;

if (cacheInvalidator != null && WithSqlDependency)
if (serverBroadcast != null && WithSqlDependency)
throw new InvalidOperationException("cacheInvalidator is only necessary if SqlDependency is not enabled");

CacheInvalidator = cacheInvalidator;
if(CacheInvalidator != null)
ServerBroadcast = serverBroadcast;
if(ServerBroadcast != null)
{
CacheInvalidator!.ReceiveInvalidation += CacheInvalidator_ReceiveInvalidation;
sb.Schema.BeforeDatabaseAccess += () => CacheInvalidator!.Start();
ServerBroadcast!.Receive += ServerBroadcast_Receive;
sb.Schema.BeforeDatabaseAccess += () => ServerBroadcast!.Start();
}

sb.Schema.SchemaCompleted += () => Schema_SchemaCompleted(sb);
Expand Down Expand Up @@ -115,7 +115,17 @@ static void Schema_SchemaCompleted(SchemaBuilder sb)
}
}

static void CacheInvalidator_ReceiveInvalidation(string cleanName)
static void ServerBroadcast_Receive(string methodName, string argument)
{
BroadcastReceivers.TryGetC(methodName)?.Invoke(argument);
}

public static Dictionary<string, Action<string>> BroadcastReceivers = new Dictionary<string, Action<string>>
{
{ InvalidateTable, ServerBroadcast_InvalidateTable}
};

static void ServerBroadcast_InvalidateTable(string cleanName)
{
Type type = TypeEntity.TryGetType(cleanName)!;

Expand Down Expand Up @@ -707,6 +717,8 @@ This may be because SchemaCompleted is not yet called and you are accesing some
}
}

const string InvalidateTable = "InvalidateTable";

internal static void NotifyInvalidateAllConnectedTypes(Type type)
{
var connected = inverseDependencies.IndirectlyRelatedTo(type, includeInitialNode: true);
Expand All @@ -717,7 +729,7 @@ internal static void NotifyInvalidateAllConnectedTypes(Type type)
if (controller != null)
controller.NotifyInvalidated();

CacheInvalidator?.SendInvalidation(TypeLogic.GetCleanName(stype));
ServerBroadcast?.Send(InvalidateTable, TypeLogic.GetCleanName(stype));
}
}

Expand Down
Loading

3 comments on commit 1903c0d

@olmobrutall
Copy link
Collaborator Author

@olmobrutall olmobrutall commented on 1903c0d Dec 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CacheLogic.ServerBrodcast

In yesterday's post we saw how Alerts modules uses SignalR so that the server can communicate with the clients in real-time... but what if we have more than one server?

Traditionally the solution of Signum Framework to communicate Cache Invalidations was SqlDependency but it has some problems:

  • Doesn't work in SqlAzure
  • Only works for cache invalidation but not for other transactional data (like Alerts)
  • It's error prone.

Alternatively Signum had a ICacheInvalidator that allows server-to-server cache invalidation using a SendInvalidation method and a ReceiveInvalidation event.

The only implementation of this interface till now was PostgresCacheInvalidation, using the native NOTIFY functionality.

Three changes

This change does three things:

  • Generalize ICacheInvalidator to IServerBroadcast that has a generic Send method and Receive event, each with two arguments: methodName and argument. This way the infrastructure can be used for sending other types of notifications, not only cache invalidation. PostgresCacheInvalidation has been renamed to PostgresBroadcast.

  • Implement AzureServiceBusBroadcast : IServerBroadcast that uses Azure Service Bus pub/sub functionality (topics and subscriptions) to implement this interface. This is the recommended solution if you have a farm of servers, but it cost about 8€/month

  • Implement SimpleHttpBroadcast : IServerBroadcast this is a simpler implementation where each server know the URLs of all the other servers and just makes an HTTP request. It automatically ignores his own requests. This is the recommended solution when you just have 2 or 3 servers, like using Azure Slot Deployment.

How to use it

This is how you receive messages form peer servers: https://github.com/signumsoftware/framework/blob/master/Signum.React.Extensions/Alerts/AlertsServer.cs#L36
This is how you sent messages: https://github.com/signumsoftware/framework/blob/master/Signum.React.Extensions/Alerts/AlertsServer.cs#L44

Useful for Azure Slot Deployments

One use case where this is useful is in Azure Slot Deployments where you have two servers to ensure 0 downtime deployments.

In this case, I typically chose one server to run the scheduled tasks / background process, (let's say the green one), but this server is not always in the front side, and when is not, it needs to refresh the caches / notify the front-end server.

@MehdyKarimpour
Copy link
Contributor

@MehdyKarimpour MehdyKarimpour commented on 1903c0d Dec 18, 2021 via email

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rezanos
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent design, iterative/constant/great improvements, as always
Bravo! 🎉

Please sign in to comment.