diff --git a/README.md b/README.md index 0aa4ca9b..dee2a59e 100644 --- a/README.md +++ b/README.md @@ -257,19 +257,25 @@ spec: # Optional # name : SLIMFAAS_ALLOW_UNSECURE_SSL # value : "false" # default equivalent to false + # Optional + # name: HEALTH_WORKER_DELAY_MILLISECONDS + # value: "1000" # default equivalent to 1 seconds + # Optional + # name: HEALTH_WORKER_DELAY_TO_EXIT_SECONDS + # value: "60" # default equivalent to 10 seconds # name : SLIMDATA_CONFIGURATION # represent SlimData internal configuration, more documentation here: https://dotnet.github.io/dotNext/features/cluster/raft.html # value : | #default values # { # "partitioning":"false", - # "lowerElectionTimeout":"400", - # "upperElectionTimeout":"800", - # "requestTimeout":"00:01:20.0000000", - # "rpcTimeout":"00:00:40.0000000", + # "lowerElectionTimeout":"150", + # "upperElectionTimeout":"300", + # "requestTimeout":"00:00:00.3000000", + # "rpcTimeout":"00:00:00.1500000", # "coldStart":"false", # "requestJournal:memoryLimit":"5", # "requestJournal:expiration":"00:01:00", - # "heartbeatThreshold":"0.4", + # "heartbeatThreshold":"0.5", # } volumeMounts: - name: slimfaas-volume diff --git a/src/SlimData/Commands/AddHashSetCommand.cs b/src/SlimData/Commands/AddHashSetCommand.cs index 93a8364f..da41f6f5 100644 --- a/src/SlimData/Commands/AddHashSetCommand.cs +++ b/src/SlimData/Commands/AddHashSetCommand.cs @@ -38,8 +38,8 @@ await writer.EncodeAsync(command.Key.AsMemory(), context, foreach (var (key, value) in Value) { - await writer.EncodeAsync(key.AsMemory(), context, LengthFormat.LittleEndian, token).ConfigureAwait(false);; - await writer.EncodeAsync(value.AsMemory(), context, LengthFormat.LittleEndian, token).ConfigureAwait(false);; + await writer.EncodeAsync(key.AsMemory(), context, LengthFormat.LittleEndian, token).ConfigureAwait(false); + await writer.EncodeAsync(value.AsMemory(), context, LengthFormat.LittleEndian, token).ConfigureAwait(false); } } diff --git a/src/SlimData/Commands/AddKeyValueCommand.cs b/src/SlimData/Commands/AddKeyValueCommand.cs index 8807fa85..6bb877df 100644 --- a/src/SlimData/Commands/AddKeyValueCommand.cs +++ b/src/SlimData/Commands/AddKeyValueCommand.cs @@ -31,7 +31,7 @@ public static async ValueTask ReadFromAsync(TReader var key = await reader .DecodeAsync(new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token) .ConfigureAwait(false); - using var value = await reader.ReadAsync(LengthFormat.Compressed).ConfigureAwait(false); + using var value = await reader.ReadAsync(LengthFormat.Compressed, token: token).ConfigureAwait(false); return new AddKeyValueCommand { Key = key.ToString(), diff --git a/src/SlimData/Commands/ListLeftPushCommand.cs b/src/SlimData/Commands/ListLeftPushCommand.cs index a2895e8f..8fa9a60f 100644 --- a/src/SlimData/Commands/ListLeftPushCommand.cs +++ b/src/SlimData/Commands/ListLeftPushCommand.cs @@ -29,7 +29,7 @@ public static async ValueTask ReadFromAsync(TReade where TReader : notnull, IAsyncBinaryReader { var key = await reader.DecodeAsync(new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token).ConfigureAwait(false); - using var value = await reader.ReadAsync(LengthFormat.Compressed).ConfigureAwait(false); + using var value = await reader.ReadAsync(LengthFormat.Compressed, token: token).ConfigureAwait(false); return new ListLeftPushCommand { Key = key.ToString(), diff --git a/src/SlimData/Commands/LogSnapshotCommand.cs b/src/SlimData/Commands/LogSnapshotCommand.cs index 5e691f35..f111003d 100644 --- a/src/SlimData/Commands/LogSnapshotCommand.cs +++ b/src/SlimData/Commands/LogSnapshotCommand.cs @@ -100,7 +100,7 @@ public static async ValueTask ReadFromAsync(TReader { var key = await reader.DecodeAsync(new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token) .ConfigureAwait(false); - using var value = await reader.ReadAsync(LengthFormat.Compressed).ConfigureAwait(false); + using var value = await reader.ReadAsync(LengthFormat.Compressed, token: token).ConfigureAwait(false); keysValues.Add(key.ToString(), value.Memory); } @@ -115,7 +115,7 @@ public static async ValueTask ReadFromAsync(TReader var queue = new List>(countQueue); while (countQueue-- > 0) { - using var value = await reader.ReadAsync(LengthFormat.Compressed).ConfigureAwait(false); + using var value = await reader.ReadAsync(LengthFormat.Compressed, token: token).ConfigureAwait(false); queue.Add(value.Memory); } diff --git a/src/SlimData/Endpoints.cs b/src/SlimData/Endpoints.cs index df75ff0b..6e42a42f 100644 --- a/src/SlimData/Endpoints.cs +++ b/src/SlimData/Endpoints.cs @@ -29,8 +29,7 @@ public static async Task DoAsync(HttpContext context, RespondDelegate respondDel var cluster = context.RequestServices.GetRequiredService(); var provider = context.RequestServices.GetRequiredService(); - var source = - CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted, + var source = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted, cluster.LeadershipToken); try { @@ -78,8 +77,7 @@ public static async Task AddHashSetCommand(SlimPersistentState provider, string var logEntry = provider.Interpreter.CreateLogEntry( new AddHashSetCommand { Key = key, Value = dictionary }, cluster.Term); - await provider.AppendAsync(logEntry, source.Token); - await provider.CommitAsync(source.Token); + await cluster.ReplicateAsync(logEntry, source.Token); } public static Task ListRightPop(HttpContext context) @@ -93,8 +91,7 @@ public static Task ListRightPop(HttpContext context) if (string.IsNullOrEmpty(key) || !int.TryParse(value, out var count)) { context.Response.StatusCode = StatusCodes.Status400BadRequest; - await context.Response.WriteAsync("GetKeyValue key is empty or value is not a number", - context.RequestAborted); + await context.Response.WriteAsync("GetKeyValue key is empty or value is not a number", context.RequestAborted); return; } @@ -109,6 +106,11 @@ public static async Task ListRightPopCommand(SlimPersistentState pro { var values = new ListString(); values.Items = new List(); + while (cluster.TryGetLeaseToken(out var leaseToken) && leaseToken.IsCancellationRequested) + { + Console.WriteLine("Master node is waiting for lease token"); + await Task.Delay(10); + } var queues = ((ISupplier)provider).Invoke().Queues; if (queues.TryGetValue(key, out var queue)) { @@ -122,8 +124,7 @@ public static async Task ListRightPopCommand(SlimPersistentState pro provider.Interpreter.CreateLogEntry( new ListRightPopCommand { Key = key, Count = count }, cluster.Term); - await provider.AppendAsync(logEntry, source.Token); - await provider.CommitAsync(source.Token); + await cluster.ReplicateAsync(logEntry, source.Token); } return values; @@ -155,8 +156,7 @@ public static async Task ListLeftPushCommand(SlimPersistentState provider, strin var logEntry = provider.Interpreter.CreateLogEntry(new ListLeftPushCommand { Key = key, Value = value }, cluster.Term); - await provider.AppendAsync(logEntry, source.Token); - await provider.CommitAsync(source.Token); + await cluster.ReplicateAsync(logEntry, source.Token); } private static (string key, string value) GetKeyValue(IFormCollection form) @@ -199,7 +199,6 @@ public static async Task AddKeyValueCommand(SlimPersistentState provider, string var logEntry = provider.Interpreter.CreateLogEntry(new AddKeyValueCommand { Key = key, Value = value }, cluster.Term); - await provider.AppendAsync(logEntry, source.Token); - await provider.CommitAsync(source.Token); + await cluster.ReplicateAsync(logEntry, source.Token); } } \ No newline at end of file diff --git a/src/SlimData/RaftClientHandlerFactory.cs b/src/SlimData/RaftClientHandlerFactory.cs index 6a183c61..070a8fa6 100644 --- a/src/SlimData/RaftClientHandlerFactory.cs +++ b/src/SlimData/RaftClientHandlerFactory.cs @@ -7,21 +7,18 @@ internal sealed class RaftClientHandlerFactory : IHttpMessageHandlerFactory { public HttpMessageHandler CreateHandler(string name) { - Console.WriteLine($"RaftClientHandlerFactory.CreateHandler({name})"); + var slimDataSocketsHttpHandlerTimeoutDefault = + Environment.GetEnvironmentVariable(EnvironmentVariables.SlimDataSocketsHttpHandlerTimeout) ?? + EnvironmentVariables.SlimDataSocketsHttpHandlerTimeoutDefault; + if (!int.TryParse(slimDataSocketsHttpHandlerTimeoutDefault, out int electionTimeout)) { - var slimDataSocketsHttpHandlerTimeoutDefault = - Environment.GetEnvironmentVariable(EnvironmentVariables.SlimDataSocketsHttpHandlerTimeout) ?? - EnvironmentVariables.SlimDataSocketsHttpHandlerTimeoutDefault; - if (!int.TryParse(slimDataSocketsHttpHandlerTimeoutDefault, out int upperElectionTimeout)) - { - throw new Exception("SLIMDATA_SOCKETS_HTTP_HANDLER_TIMEOUT is not an integer"); - } - - var handler = new SocketsHttpHandler { ConnectTimeout = TimeSpan.FromMilliseconds(upperElectionTimeout) }; - handler.SslOptions.RemoteCertificateValidationCallback = AllowCertificate; - handler.UseProxy = false; - return handler; - } + throw new Exception("SLIMDATA_SOCKETS_HTTP_HANDLER_TIMEOUT is not an integer"); + } + Console.WriteLine($"RaftClientHandlerFactory.CreateHandler({name}) with electionTimeout {electionTimeout}"); + var handler = new SocketsHttpHandler { ConnectTimeout = TimeSpan.FromMilliseconds(electionTimeout) }; + handler.SslOptions.RemoteCertificateValidationCallback = AllowCertificate; + handler.UseProxy = false; + return handler; } internal static bool AllowCertificate(object sender, X509Certificate? certificate, X509Chain? chain, diff --git a/src/SlimFaas/Database/SlimDataService.cs b/src/SlimFaas/Database/SlimDataService.cs index d2d73b06..b7ae2c91 100644 --- a/src/SlimFaas/Database/SlimDataService.cs +++ b/src/SlimFaas/Database/SlimDataService.cs @@ -8,34 +8,33 @@ namespace SlimFaas.Database; #pragma warning disable CA2252 -public class SlimDataService(HttpClient httpClient, IServiceProvider serviceProvider, IRaftCluster cluster, ILogger logger) +public class SlimDataService(IHttpClientFactory httpClientFactory, IServiceProvider serviceProvider, IRaftCluster cluster, ILogger logger) : IDatabaseService { + public const string HttpClientName = "SlimDataHttpClient"; + private const int MaxAttemptCount = 3; + private readonly TimeSpan _retryInterval = TimeSpan.FromSeconds(1); + private readonly TimeSpan _timeMaxToWaitForLeader = TimeSpan.FromMilliseconds(3000); + private ISupplier SimplePersistentState => serviceProvider.GetRequiredService>(); public async Task GetAsync(string key) { - return await Retry.Do(() => DoGetAsync(key), TimeSpan.FromSeconds(1), logger, 5); + return await Retry.Do(() => DoGetAsync(key), _retryInterval, logger, MaxAttemptCount); } private async Task DoGetAsync(string key) { await GetAndWaitForLeader(); - if (cluster.LeadershipToken.IsCancellationRequested) - { - if (!cluster.TryGetLeaseToken(out var leaseToken) || leaseToken.IsCancellationRequested) - { - await cluster.ApplyReadBarrierAsync(); - } - } + await MasterWaitForleaseToken(); SlimDataPayload data = SimplePersistentState.Invoke(); return data.KeyValues.TryGetValue(key, out ReadOnlyMemory value) ? value.ToArray() : null; } public async Task SetAsync(string key, byte[] value) { - await Retry.Do(() =>DoSetAsync(key, value), TimeSpan.FromSeconds(1), logger, 5); + await Retry.Do(() =>DoSetAsync(key, value), _retryInterval, logger, MaxAttemptCount); } private async Task DoSetAsync(string key, byte[] value) @@ -49,9 +48,10 @@ private async Task DoSetAsync(string key, byte[] value) } else { - HttpRequestMessage request = new(HttpMethod.Post, new Uri($"{endpoint}SlimData/AddKeyValue?key={key}")); + using HttpRequestMessage request = new(HttpMethod.Post, new Uri($"{endpoint}SlimData/AddKeyValue?key={key}")); request.Content = new ByteArrayContent(value); - HttpResponseMessage response = await httpClient.SendAsync(request); + using var httpClient = httpClientFactory.CreateClient(HttpClientName); + using HttpResponseMessage response = await httpClient.SendAsync(request); if ((int)response.StatusCode >= 500) { throw new DataException("Error in calling SlimData HTTP Service"); @@ -61,7 +61,7 @@ private async Task DoSetAsync(string key, byte[] value) public async Task HashSetAsync(string key, IDictionary values) { - await Retry.Do(() =>DoHashSetAsync(key, values), TimeSpan.FromSeconds(1), logger, 5); + await Retry.Do(() =>DoHashSetAsync(key, values), _retryInterval, logger, MaxAttemptCount); } private async Task DoHashSetAsync(string key, IDictionary values) @@ -80,8 +80,8 @@ private async Task DoHashSetAsync(string key, IDictionary values { multipart.Add(new StringContent(value.Value), value.Key); } - - HttpResponseMessage response = + using var httpClient = httpClientFactory.CreateClient(HttpClientName); + using HttpResponseMessage response = await httpClient.PostAsync(new Uri($"{endpoint}SlimData/AddHashset"), multipart); if ((int)response.StatusCode >= 500) { @@ -90,22 +90,15 @@ private async Task DoHashSetAsync(string key, IDictionary values } } - public async Task> HashGetAllAsync(string key) { - return await Retry.Do(() =>DoHashGetAllAsync(key), TimeSpan.FromSeconds(1), logger, 5); + return await Retry.Do(() =>DoHashGetAllAsync(key), _retryInterval, logger, MaxAttemptCount); } private async Task> DoHashGetAllAsync(string key) { await GetAndWaitForLeader(); - if (cluster.LeadershipToken.IsCancellationRequested) - { - if (!cluster.TryGetLeaseToken(out var leaseToken) || leaseToken.IsCancellationRequested) - { - await cluster.ApplyReadBarrierAsync(); - } - } + await MasterWaitForleaseToken(); SlimDataPayload data = SimplePersistentState.Invoke(); return data.Hashsets.TryGetValue(key, out Dictionary? value) @@ -115,7 +108,7 @@ private async Task> DoHashGetAllAsync(string key) public async Task ListLeftPushAsync(string key, byte[] field) { - await Retry.Do(() =>DoListLeftPushAsync(key, field), TimeSpan.FromSeconds(1), logger, 5); + await Retry.Do(() =>DoListLeftPushAsync(key, field), _retryInterval, logger, MaxAttemptCount); } private async Task DoListLeftPushAsync(string key, byte[] field) @@ -128,8 +121,9 @@ private async Task DoListLeftPushAsync(string key, byte[] field) } else { - HttpRequestMessage request = new(HttpMethod.Post, new Uri($"{endpoint}SlimData/ListLeftPush?key={key}")); + using HttpRequestMessage request = new(HttpMethod.Post, new Uri($"{endpoint}SlimData/ListLeftPush?key={key}")); request.Content = new ByteArrayContent(field); + using var httpClient = httpClientFactory.CreateClient(HttpClientName); HttpResponseMessage response = await httpClient.SendAsync(request); if ((int)response.StatusCode >= 500) { @@ -140,7 +134,7 @@ private async Task DoListLeftPushAsync(string key, byte[] field) public async Task> ListRightPopAsync(string key, int count = 1) { - return await Retry.Do(() =>DoListRightPopAsync(key, count), TimeSpan.FromSeconds(1), logger, 5); + return await Retry.Do(() =>DoListRightPopAsync(key, count), _retryInterval, logger, MaxAttemptCount); } private async Task> DoListRightPopAsync(string key, int count = 1) @@ -154,12 +148,13 @@ private async Task> DoListRightPopAsync(string key, int count = 1) } else { - HttpRequestMessage request = new(HttpMethod.Post, new Uri($"{endpoint}SlimData/ListRightPop")); + using HttpRequestMessage request = new(HttpMethod.Post, new Uri($"{endpoint}SlimData/ListRightPop")); MultipartFormDataContent multipart = new(); multipart.Add(new StringContent(count.ToString()), key); request.Content = multipart; - HttpResponseMessage response = await httpClient.SendAsync(request); + using var httpClient = httpClientFactory.CreateClient(HttpClientName); + using HttpResponseMessage response = await httpClient.SendAsync(request); if ((int)response.StatusCode >= 500) { throw new DataException("Error in calling SlimData HTTP Service"); @@ -173,31 +168,35 @@ private async Task> DoListRightPopAsync(string key, int count = 1) public async Task ListLengthAsync(string key) { - return await Retry.Do(() =>DoListLengthAsync(key), TimeSpan.FromSeconds(1), logger, 5); + return await Retry.Do(() =>DoListLengthAsync(key), _retryInterval, logger, MaxAttemptCount); } private async Task DoListLengthAsync(string key) { await GetAndWaitForLeader(); - if (cluster.LeadershipToken.IsCancellationRequested) - { - if (!cluster.TryGetLeaseToken(out var leaseToken) || leaseToken.IsCancellationRequested) - { - await cluster.ApplyReadBarrierAsync(); - } - } + await MasterWaitForleaseToken(); + SlimDataPayload data = SimplePersistentState.Invoke(); long result = data.Queues.TryGetValue(key, out List>? value) ? value.Count : 0L; return result; } + private async Task MasterWaitForleaseToken() + { + while (cluster.TryGetLeaseToken(out var leaseToken) && leaseToken.IsCancellationRequested) + { + Console.WriteLine("Master node is waiting for lease token"); + await Task.Delay(10); + } + } + private async Task GetAndWaitForLeader() { - int numberWaitMaximum = 10; - while (cluster.Leader == null && numberWaitMaximum > 0) + TimeSpan timeWaited = TimeSpan.Zero; + while (cluster.Leader == null && timeWaited < _timeMaxToWaitForLeader) { await Task.Delay(500); - numberWaitMaximum--; + timeWaited += TimeSpan.FromMilliseconds(500); } if (cluster.Leader == null) diff --git a/src/SlimFaas/EnvironmentVariables.cs b/src/SlimFaas/EnvironmentVariables.cs index 7fc373f9..b38bc3f9 100644 --- a/src/SlimFaas/EnvironmentVariables.cs +++ b/src/SlimFaas/EnvironmentVariables.cs @@ -40,6 +40,12 @@ public static class EnvironmentVariables public const string ScaleReplicasWorkerDelayMilliseconds = "SCALE_REPLICAS_WORKER_DELAY_MILLISECONDS"; public const int ScaleReplicasWorkerDelayMillisecondsDefault = 1000; + public const string HealthWorkerDelayMilliseconds = "HEALTH_WORKER_DELAY_MILLISECONDS"; + public const int HealthWorkerDelayMillisecondsDefault = 1000; + + public const string HealthWorkerDelayToExitSeconds = "HEALTH_WORKER_DELAY_TO_EXIT_SECONDS"; + public const int HealthWorkerDelayToExitSecondsDefault = 60; + public const string PodScaledUpByDefaultWhenInfrastructureHasNeverCalled = "POD_SCALED_UP_BY_DEFAULT_WHEN_INFRASTRUCTURE_HAS_NEVER_CALLED"; diff --git a/src/SlimFaas/HealthWorker.cs b/src/SlimFaas/HealthWorker.cs new file mode 100644 index 00000000..3a76b116 --- /dev/null +++ b/src/SlimFaas/HealthWorker.cs @@ -0,0 +1,48 @@ +using DotNext.Net.Cluster.Consensus.Raft; +using SlimFaas.Database; + +namespace SlimFaas; + +public class HealthWorker(IHostApplicationLifetime hostApplicationLifetime, IRaftCluster raftCluster, ISlimDataStatus slimDataStatus, + ILogger logger, + int delay = EnvironmentVariables.HealthWorkerDelayMillisecondsDefault, + int delayToExitSeconds = EnvironmentVariables.HealthWorkerDelayToExitSecondsDefault) + : BackgroundService +{ + private readonly int _delay = + EnvironmentVariables.ReadInteger(logger, EnvironmentVariables.HealthWorkerDelayMilliseconds, delay); + private readonly int _delayToExitSeconds = + EnvironmentVariables.ReadInteger(logger, EnvironmentVariables.HealthWorkerDelayToExitSeconds, delayToExitSeconds); + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + await slimDataStatus.WaitForReadyAsync(); + TimeSpan timeSpan = TimeSpan.FromSeconds(0); + while (stoppingToken.IsCancellationRequested == false) + { + try + { + await Task.Delay(_delay, stoppingToken); + if (raftCluster.Leader == null) + { + timeSpan = timeSpan.Add(TimeSpan.FromMilliseconds(_delay)); + logger.LogWarning("Raft cluster has no leader"); + } + else + { + timeSpan = TimeSpan.FromSeconds(0); + } + + if (timeSpan.TotalSeconds > _delayToExitSeconds) + { + logger.LogError("Raft cluster has no leader for more than {TotalSeconds} seconds, exist the application ", timeSpan.TotalSeconds); + hostApplicationLifetime.StopApplication(); + } + } + catch (Exception e) + { + logger.LogError(e, "Global Error in HealthWorker"); + } + } + } +} diff --git a/src/SlimFaas/HistorySynchronizationWorker.cs b/src/SlimFaas/HistorySynchronizationWorker.cs index 98e6e317..9ea26561 100644 --- a/src/SlimFaas/HistorySynchronizationWorker.cs +++ b/src/SlimFaas/HistorySynchronizationWorker.cs @@ -32,15 +32,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) if (ticksInDatabase > nowTicks) { logger.LogWarning( - "HistorySynchronizationWorker: ticksInDatabase is superior to now ticks {TimeSpan}", - TimeSpan.FromTicks(ticksInDatabase - nowTicks)); + "HistorySynchronizationWorker: ticksInDatabase is superior to now ticks {TimeSpan} for {Function}", + TimeSpan.FromTicks(ticksInDatabase - nowTicks), function.Deployment); ticksInDatabase = nowTicks; } if (ticksMemory > nowTicks) { logger.LogWarning( - "HistorySynchronizationWorker: ticksMemory is superior to now ticks {TimeSpan}", - TimeSpan.FromTicks(ticksMemory - nowTicks)); + "HistorySynchronizationWorker: ticksMemory is superior to now ticks {TimeSpan} for {Function}", + TimeSpan.FromTicks(ticksMemory - nowTicks), function.Deployment); ticksMemory = nowTicks; } diff --git a/src/SlimFaas/Program.cs b/src/SlimFaas/Program.cs index c7790916..ed1a434d 100644 --- a/src/SlimFaas/Program.cs +++ b/src/SlimFaas/Program.cs @@ -74,6 +74,7 @@ serviceCollectionSlimFaas.AddHostedService(); serviceCollectionSlimFaas.AddHostedService(); serviceCollectionSlimFaas.AddHostedService(); +serviceCollectionSlimFaas.AddHostedService(); serviceCollectionSlimFaas.AddHttpClient(); serviceCollectionSlimFaas.AddSingleton(); serviceCollectionSlimFaas.AddSingleton(); @@ -154,7 +155,7 @@ serviceCollectionSlimFaas.AddHostedService(); serviceCollectionSlimFaas.AddSingleton(); serviceCollectionSlimFaas.AddSingleton(); -serviceCollectionSlimFaas.AddHttpClient() +serviceCollectionSlimFaas.AddHttpClient(SlimDataService.HttpClientName) .SetHandlerLifetime(TimeSpan.FromMinutes(5)) .ConfigureHttpClient(client => { @@ -215,15 +216,15 @@ Dictionary slimDataDefaultConfiguration = new() { { "partitioning", "false" }, - { "lowerElectionTimeout", "400" }, - { "upperElectionTimeout", "800" }, - { "requestTimeout", "00:01:20.0000000" }, - { "rpcTimeout", "00:00:40.0000000" }, + { "lowerElectionTimeout", "150" }, + { "upperElectionTimeout", "300" }, + { "requestTimeout", "00:00:00.3000000" }, + { "rpcTimeout", "00:00:00.1500000" }, { "publicEndPoint", publicEndPoint }, { coldstart, coldStart }, { "requestJournal:memoryLimit", "5" }, { "requestJournal:expiration", "00:01:00" }, - { "heartbeatThreshold", "0.2" } + { "heartbeatThreshold", "0.5" } }; foreach (KeyValuePair keyValuePair in slimDataDefaultConfiguration) { diff --git a/src/SlimFaas/SlimWorker.cs b/src/SlimFaas/SlimWorker.cs index c67f3572..e81c6b46 100644 --- a/src/SlimFaas/SlimWorker.cs +++ b/src/SlimFaas/SlimWorker.cs @@ -44,10 +44,10 @@ private async Task DoOneCycle(CancellationToken stoppingToken, int? numberLimitProcessingTasks = ComputeNumberLimitProcessingTasks(slimFaas, function); setTickLastCallCounterDictionary[functionDeployment]++; int functionReplicas = function.Replicas; - long queueLenght = await UpdateTickLastCallIfRequestStillInProgress(functionReplicas, + long queueLength = await UpdateTickLastCallIfRequestStillInProgress(functionReplicas, setTickLastCallCounterDictionary, functionDeployment, numberProcessingTasks); - if (functionReplicas == 0 || queueLenght <= 0) + if (functionReplicas == 0 || queueLength <= 0) { continue; } diff --git a/tests/SlimData.Tests/RaftClusterTests.cs b/tests/SlimData.Tests/RaftClusterTests.cs index 953fc3d5..9f49e29a 100644 --- a/tests/SlimData.Tests/RaftClusterTests.cs +++ b/tests/SlimData.Tests/RaftClusterTests.cs @@ -147,7 +147,7 @@ private static IHost CreateHost(int port, IDictionary } services.AddSingleton(); - services.AddHttpClient() + services.AddHttpClient(SlimDataService.HttpClientName) .SetHandlerLifetime(TimeSpan.FromMinutes(5)) .ConfigurePrimaryHttpMessageHandler(() => new HttpClientHandler { AllowAutoRedirect = true }); }) @@ -211,7 +211,7 @@ public static async Task MessageExchange() { SlimPersistentState.LogLocation, GetTemporaryDirectory() } }; - LeaderTracker listener = new LeaderTracker(); + LeaderTracker listener = new(); using IHost host1 = CreateHost(3262, config1, listener); await host1.StartAsync(); Assert.True(GetLocalClusterView(host1).Readiness.IsCompletedSuccessfully); @@ -238,17 +238,19 @@ public static async Task MessageExchange() await databaseServiceSlave.SetAsync("key1", MemoryPackSerializer.Serialize("value1") ); Assert.Equal("value1", MemoryPackSerializer.Deserialize(await databaseServiceMaster.GetAsync("key1"))); + await GetLocalClusterView(host1).ForceReplicationAsync(); Assert.Equal("value1", MemoryPackSerializer.Deserialize(await databaseServiceSlave.GetAsync("key1"))); await databaseServiceSlave.HashSetAsync("hashsetKey1", new Dictionary { { "field1", "value1" }, { "field2", "value2" } }); + await GetLocalClusterView(host1).ForceReplicationAsync(); IDictionary hashGet = await databaseServiceSlave.HashGetAllAsync("hashsetKey1"); Assert.Equal("value1", hashGet["field1"]); Assert.Equal("value2", hashGet["field2"]); await databaseServiceSlave.ListLeftPushAsync("listKey1", MemoryPackSerializer.Serialize("value1")); - + await GetLocalClusterView(host1).ForceReplicationAsync(); long listLength = await databaseServiceSlave.ListLengthAsync("listKey1"); Assert.Equal(1, listLength); diff --git a/tests/SlimFaas.Tests/HistorySynchronizationWorkerShould.cs b/tests/SlimFaas.Tests/HistorySynchronizationWorkerShould.cs index f2fcda81..04e7698f 100644 --- a/tests/SlimFaas.Tests/HistorySynchronizationWorkerShould.cs +++ b/tests/SlimFaas.Tests/HistorySynchronizationWorkerShould.cs @@ -40,7 +40,7 @@ public async Task SyncLastTicksBetweenDatabaseAndMemory() historyHttpMemoryService, historyHttpRedisService, logger.Object, slimDataStatus.Object, 100); Task task = service.StartAsync(CancellationToken.None); - await Task.Delay(200); + await Task.Delay(500); long ticksFirstCallAsync = historyHttpMemoryService.GetTicksLastCall("fibonacci1"); Assert.Equal(firstTicks, ticksFirstCallAsync);