Skip to content

Commit

Permalink
feat(slimfaas): add health worker and fix write/read database conflict (
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaume-chervet committed Jul 27, 2024
1 parent 63e12b8 commit 4c8d2e0
Show file tree
Hide file tree
Showing 15 changed files with 151 additions and 93 deletions.
16 changes: 11 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,19 +257,25 @@ spec:
# Optional
# name : SLIMFAAS_ALLOW_UNSECURE_SSL
# value : "false" # default equivalent to false
# Optional
# name: HEALTH_WORKER_DELAY_MILLISECONDS
# value: "1000" # default equivalent to 1 seconds
# Optional
# name: HEALTH_WORKER_DELAY_TO_EXIT_SECONDS
# value: "60" # default equivalent to 10 seconds

# name : SLIMDATA_CONFIGURATION # represent SlimData internal configuration, more documentation here: https://dotnet.github.io/dotNext/features/cluster/raft.html
# value : | #default values
# {
# "partitioning":"false",
# "lowerElectionTimeout":"400",
# "upperElectionTimeout":"800",
# "requestTimeout":"00:01:20.0000000",
# "rpcTimeout":"00:00:40.0000000",
# "lowerElectionTimeout":"150",
# "upperElectionTimeout":"300",
# "requestTimeout":"00:00:00.3000000",
# "rpcTimeout":"00:00:00.1500000",
# "coldStart":"false",
# "requestJournal:memoryLimit":"5",
# "requestJournal:expiration":"00:01:00",
# "heartbeatThreshold":"0.4",
# "heartbeatThreshold":"0.5",
# }
volumeMounts:
- name: slimfaas-volume
Expand Down
4 changes: 2 additions & 2 deletions src/SlimData/Commands/AddHashSetCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ await writer.EncodeAsync(command.Key.AsMemory(), context,

foreach (var (key, value) in Value)
{
await writer.EncodeAsync(key.AsMemory(), context, LengthFormat.LittleEndian, token).ConfigureAwait(false);;
await writer.EncodeAsync(value.AsMemory(), context, LengthFormat.LittleEndian, token).ConfigureAwait(false);;
await writer.EncodeAsync(key.AsMemory(), context, LengthFormat.LittleEndian, token).ConfigureAwait(false);
await writer.EncodeAsync(value.AsMemory(), context, LengthFormat.LittleEndian, token).ConfigureAwait(false);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/SlimData/Commands/AddKeyValueCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static async ValueTask<AddKeyValueCommand> ReadFromAsync<TReader>(TReader
var key = await reader
.DecodeAsync(new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token)
.ConfigureAwait(false);
using var value = await reader.ReadAsync(LengthFormat.Compressed).ConfigureAwait(false);
using var value = await reader.ReadAsync(LengthFormat.Compressed, token: token).ConfigureAwait(false);
return new AddKeyValueCommand
{
Key = key.ToString(),
Expand Down
2 changes: 1 addition & 1 deletion src/SlimData/Commands/ListLeftPushCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static async ValueTask<ListLeftPushCommand> ReadFromAsync<TReader>(TReade
where TReader : notnull, IAsyncBinaryReader
{
var key = await reader.DecodeAsync(new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token).ConfigureAwait(false);
using var value = await reader.ReadAsync(LengthFormat.Compressed).ConfigureAwait(false);
using var value = await reader.ReadAsync(LengthFormat.Compressed, token: token).ConfigureAwait(false);
return new ListLeftPushCommand
{
Key = key.ToString(),
Expand Down
4 changes: 2 additions & 2 deletions src/SlimData/Commands/LogSnapshotCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public static async ValueTask<LogSnapshotCommand> ReadFromAsync<TReader>(TReader
{
var key = await reader.DecodeAsync(new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token)
.ConfigureAwait(false);
using var value = await reader.ReadAsync(LengthFormat.Compressed).ConfigureAwait(false);
using var value = await reader.ReadAsync(LengthFormat.Compressed, token: token).ConfigureAwait(false);
keysValues.Add(key.ToString(), value.Memory);
}

Expand All @@ -115,7 +115,7 @@ public static async ValueTask<LogSnapshotCommand> ReadFromAsync<TReader>(TReader
var queue = new List<ReadOnlyMemory<byte>>(countQueue);
while (countQueue-- > 0)
{
using var value = await reader.ReadAsync(LengthFormat.Compressed).ConfigureAwait(false);
using var value = await reader.ReadAsync(LengthFormat.Compressed, token: token).ConfigureAwait(false);
queue.Add(value.Memory);
}

Expand Down
23 changes: 11 additions & 12 deletions src/SlimData/Endpoints.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ public static async Task DoAsync(HttpContext context, RespondDelegate respondDel

var cluster = context.RequestServices.GetRequiredService<IRaftCluster>();
var provider = context.RequestServices.GetRequiredService<SlimPersistentState>();
var source =
CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted,
var source = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted,
cluster.LeadershipToken);
try
{
Expand Down Expand Up @@ -78,8 +77,7 @@ public static async Task AddHashSetCommand(SlimPersistentState provider, string
var logEntry =
provider.Interpreter.CreateLogEntry(
new AddHashSetCommand { Key = key, Value = dictionary }, cluster.Term);
await provider.AppendAsync(logEntry, source.Token);
await provider.CommitAsync(source.Token);
await cluster.ReplicateAsync(logEntry, source.Token);
}

public static Task ListRightPop(HttpContext context)
Expand All @@ -93,8 +91,7 @@ public static Task ListRightPop(HttpContext context)
if (string.IsNullOrEmpty(key) || !int.TryParse(value, out var count))
{
context.Response.StatusCode = StatusCodes.Status400BadRequest;
await context.Response.WriteAsync("GetKeyValue key is empty or value is not a number",
context.RequestAborted);
await context.Response.WriteAsync("GetKeyValue key is empty or value is not a number", context.RequestAborted);
return;
}
Expand All @@ -109,6 +106,11 @@ public static async Task<ListString> ListRightPopCommand(SlimPersistentState pro
{
var values = new ListString();
values.Items = new List<byte[]>();
while (cluster.TryGetLeaseToken(out var leaseToken) && leaseToken.IsCancellationRequested)
{
Console.WriteLine("Master node is waiting for lease token");
await Task.Delay(10);
}
var queues = ((ISupplier<SlimDataPayload>)provider).Invoke().Queues;
if (queues.TryGetValue(key, out var queue))
{
Expand All @@ -122,8 +124,7 @@ public static async Task<ListString> ListRightPopCommand(SlimPersistentState pro
provider.Interpreter.CreateLogEntry(
new ListRightPopCommand { Key = key, Count = count },
cluster.Term);
await provider.AppendAsync(logEntry, source.Token);
await provider.CommitAsync(source.Token);
await cluster.ReplicateAsync(logEntry, source.Token);
}

return values;
Expand Down Expand Up @@ -155,8 +156,7 @@ public static async Task ListLeftPushCommand(SlimPersistentState provider, strin
var logEntry =
provider.Interpreter.CreateLogEntry(new ListLeftPushCommand { Key = key, Value = value },
cluster.Term);
await provider.AppendAsync(logEntry, source.Token);
await provider.CommitAsync(source.Token);
await cluster.ReplicateAsync(logEntry, source.Token);
}

private static (string key, string value) GetKeyValue(IFormCollection form)
Expand Down Expand Up @@ -199,7 +199,6 @@ public static async Task AddKeyValueCommand(SlimPersistentState provider, string
var logEntry =
provider.Interpreter.CreateLogEntry(new AddKeyValueCommand { Key = key, Value = value },
cluster.Term);
await provider.AppendAsync(logEntry, source.Token);
await provider.CommitAsync(source.Token);
await cluster.ReplicateAsync(logEntry, source.Token);
}
}
25 changes: 11 additions & 14 deletions src/SlimData/RaftClientHandlerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,18 @@ internal sealed class RaftClientHandlerFactory : IHttpMessageHandlerFactory
{
public HttpMessageHandler CreateHandler(string name)
{
Console.WriteLine($"RaftClientHandlerFactory.CreateHandler({name})");
var slimDataSocketsHttpHandlerTimeoutDefault =
Environment.GetEnvironmentVariable(EnvironmentVariables.SlimDataSocketsHttpHandlerTimeout) ??
EnvironmentVariables.SlimDataSocketsHttpHandlerTimeoutDefault;
if (!int.TryParse(slimDataSocketsHttpHandlerTimeoutDefault, out int electionTimeout))
{
var slimDataSocketsHttpHandlerTimeoutDefault =
Environment.GetEnvironmentVariable(EnvironmentVariables.SlimDataSocketsHttpHandlerTimeout) ??
EnvironmentVariables.SlimDataSocketsHttpHandlerTimeoutDefault;
if (!int.TryParse(slimDataSocketsHttpHandlerTimeoutDefault, out int upperElectionTimeout))
{
throw new Exception("SLIMDATA_SOCKETS_HTTP_HANDLER_TIMEOUT is not an integer");
}

var handler = new SocketsHttpHandler { ConnectTimeout = TimeSpan.FromMilliseconds(upperElectionTimeout) };
handler.SslOptions.RemoteCertificateValidationCallback = AllowCertificate;
handler.UseProxy = false;
return handler;
}
throw new Exception("SLIMDATA_SOCKETS_HTTP_HANDLER_TIMEOUT is not an integer");
}
Console.WriteLine($"RaftClientHandlerFactory.CreateHandler({name}) with electionTimeout {electionTimeout}");
var handler = new SocketsHttpHandler { ConnectTimeout = TimeSpan.FromMilliseconds(electionTimeout) };
handler.SslOptions.RemoteCertificateValidationCallback = AllowCertificate;
handler.UseProxy = false;
return handler;
}

internal static bool AllowCertificate(object sender, X509Certificate? certificate, X509Chain? chain,
Expand Down
79 changes: 39 additions & 40 deletions src/SlimFaas/Database/SlimDataService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,33 @@

namespace SlimFaas.Database;
#pragma warning disable CA2252
public class SlimDataService(HttpClient httpClient, IServiceProvider serviceProvider, IRaftCluster cluster, ILogger<SlimDataService> logger)
public class SlimDataService(IHttpClientFactory httpClientFactory, IServiceProvider serviceProvider, IRaftCluster cluster, ILogger<SlimDataService> logger)
: IDatabaseService
{
public const string HttpClientName = "SlimDataHttpClient";
private const int MaxAttemptCount = 3;
private readonly TimeSpan _retryInterval = TimeSpan.FromSeconds(1);
private readonly TimeSpan _timeMaxToWaitForLeader = TimeSpan.FromMilliseconds(3000);

private ISupplier<SlimDataPayload> SimplePersistentState =>
serviceProvider.GetRequiredService<ISupplier<SlimDataPayload>>();

public async Task<byte[]?> GetAsync(string key)
{
return await Retry.Do(() => DoGetAsync(key), TimeSpan.FromSeconds(1), logger, 5);
return await Retry.Do(() => DoGetAsync(key), _retryInterval, logger, MaxAttemptCount);
}

private async Task<byte[]?> DoGetAsync(string key)
{
await GetAndWaitForLeader();
if (cluster.LeadershipToken.IsCancellationRequested)
{
if (!cluster.TryGetLeaseToken(out var leaseToken) || leaseToken.IsCancellationRequested)
{
await cluster.ApplyReadBarrierAsync();
}
}
await MasterWaitForleaseToken();
SlimDataPayload data = SimplePersistentState.Invoke();
return data.KeyValues.TryGetValue(key, out ReadOnlyMemory<byte> value) ? value.ToArray() : null;
}

public async Task SetAsync(string key, byte[] value)
{
await Retry.Do(() =>DoSetAsync(key, value), TimeSpan.FromSeconds(1), logger, 5);
await Retry.Do(() =>DoSetAsync(key, value), _retryInterval, logger, MaxAttemptCount);
}

private async Task DoSetAsync(string key, byte[] value)
Expand All @@ -49,9 +48,10 @@ private async Task DoSetAsync(string key, byte[] value)
}
else
{
HttpRequestMessage request = new(HttpMethod.Post, new Uri($"{endpoint}SlimData/AddKeyValue?key={key}"));
using HttpRequestMessage request = new(HttpMethod.Post, new Uri($"{endpoint}SlimData/AddKeyValue?key={key}"));
request.Content = new ByteArrayContent(value);
HttpResponseMessage response = await httpClient.SendAsync(request);
using var httpClient = httpClientFactory.CreateClient(HttpClientName);
using HttpResponseMessage response = await httpClient.SendAsync(request);
if ((int)response.StatusCode >= 500)
{
throw new DataException("Error in calling SlimData HTTP Service");
Expand All @@ -61,7 +61,7 @@ private async Task DoSetAsync(string key, byte[] value)

public async Task HashSetAsync(string key, IDictionary<string, string> values)
{
await Retry.Do(() =>DoHashSetAsync(key, values), TimeSpan.FromSeconds(1), logger, 5);
await Retry.Do(() =>DoHashSetAsync(key, values), _retryInterval, logger, MaxAttemptCount);
}

private async Task DoHashSetAsync(string key, IDictionary<string, string> values)
Expand All @@ -80,8 +80,8 @@ private async Task DoHashSetAsync(string key, IDictionary<string, string> values
{
multipart.Add(new StringContent(value.Value), value.Key);
}

HttpResponseMessage response =
using var httpClient = httpClientFactory.CreateClient(HttpClientName);
using HttpResponseMessage response =
await httpClient.PostAsync(new Uri($"{endpoint}SlimData/AddHashset"), multipart);
if ((int)response.StatusCode >= 500)
{
Expand All @@ -90,22 +90,15 @@ private async Task DoHashSetAsync(string key, IDictionary<string, string> values
}
}


public async Task<IDictionary<string, string>> HashGetAllAsync(string key)
{
return await Retry.Do(() =>DoHashGetAllAsync(key), TimeSpan.FromSeconds(1), logger, 5);
return await Retry.Do(() =>DoHashGetAllAsync(key), _retryInterval, logger, MaxAttemptCount);
}

private async Task<IDictionary<string, string>> DoHashGetAllAsync(string key)
{
await GetAndWaitForLeader();
if (cluster.LeadershipToken.IsCancellationRequested)
{
if (!cluster.TryGetLeaseToken(out var leaseToken) || leaseToken.IsCancellationRequested)
{
await cluster.ApplyReadBarrierAsync();
}
}
await MasterWaitForleaseToken();

SlimDataPayload data = SimplePersistentState.Invoke();
return data.Hashsets.TryGetValue(key, out Dictionary<string, string>? value)
Expand All @@ -115,7 +108,7 @@ private async Task<IDictionary<string, string>> DoHashGetAllAsync(string key)

public async Task ListLeftPushAsync(string key, byte[] field)
{
await Retry.Do(() =>DoListLeftPushAsync(key, field), TimeSpan.FromSeconds(1), logger, 5);
await Retry.Do(() =>DoListLeftPushAsync(key, field), _retryInterval, logger, MaxAttemptCount);
}

private async Task DoListLeftPushAsync(string key, byte[] field)
Expand All @@ -128,8 +121,9 @@ private async Task DoListLeftPushAsync(string key, byte[] field)
}
else
{
HttpRequestMessage request = new(HttpMethod.Post, new Uri($"{endpoint}SlimData/ListLeftPush?key={key}"));
using HttpRequestMessage request = new(HttpMethod.Post, new Uri($"{endpoint}SlimData/ListLeftPush?key={key}"));
request.Content = new ByteArrayContent(field);
using var httpClient = httpClientFactory.CreateClient(HttpClientName);
HttpResponseMessage response = await httpClient.SendAsync(request);
if ((int)response.StatusCode >= 500)
{
Expand All @@ -140,7 +134,7 @@ private async Task DoListLeftPushAsync(string key, byte[] field)

public async Task<IList<byte[]>> ListRightPopAsync(string key, int count = 1)
{
return await Retry.Do(() =>DoListRightPopAsync(key, count), TimeSpan.FromSeconds(1), logger, 5);
return await Retry.Do(() =>DoListRightPopAsync(key, count), _retryInterval, logger, MaxAttemptCount);
}

private async Task<IList<byte[]>> DoListRightPopAsync(string key, int count = 1)
Expand All @@ -154,12 +148,13 @@ private async Task<IList<byte[]>> DoListRightPopAsync(string key, int count = 1)
}
else
{
HttpRequestMessage request = new(HttpMethod.Post, new Uri($"{endpoint}SlimData/ListRightPop"));
using HttpRequestMessage request = new(HttpMethod.Post, new Uri($"{endpoint}SlimData/ListRightPop"));
MultipartFormDataContent multipart = new();
multipart.Add(new StringContent(count.ToString()), key);

request.Content = multipart;
HttpResponseMessage response = await httpClient.SendAsync(request);
using var httpClient = httpClientFactory.CreateClient(HttpClientName);
using HttpResponseMessage response = await httpClient.SendAsync(request);
if ((int)response.StatusCode >= 500)
{
throw new DataException("Error in calling SlimData HTTP Service");
Expand All @@ -173,31 +168,35 @@ private async Task<IList<byte[]>> DoListRightPopAsync(string key, int count = 1)

public async Task<long> ListLengthAsync(string key)
{
return await Retry.Do(() =>DoListLengthAsync(key), TimeSpan.FromSeconds(1), logger, 5);
return await Retry.Do(() =>DoListLengthAsync(key), _retryInterval, logger, MaxAttemptCount);
}

private async Task<long> DoListLengthAsync(string key)
{
await GetAndWaitForLeader();
if (cluster.LeadershipToken.IsCancellationRequested)
{
if (!cluster.TryGetLeaseToken(out var leaseToken) || leaseToken.IsCancellationRequested)
{
await cluster.ApplyReadBarrierAsync();
}
}
await MasterWaitForleaseToken();

SlimDataPayload data = SimplePersistentState.Invoke();
long result = data.Queues.TryGetValue(key, out List<ReadOnlyMemory<byte>>? value) ? value.Count : 0L;
return result;
}

private async Task MasterWaitForleaseToken()
{
while (cluster.TryGetLeaseToken(out var leaseToken) && leaseToken.IsCancellationRequested)
{
Console.WriteLine("Master node is waiting for lease token");
await Task.Delay(10);
}
}

private async Task<EndPoint> GetAndWaitForLeader()
{
int numberWaitMaximum = 10;
while (cluster.Leader == null && numberWaitMaximum > 0)
TimeSpan timeWaited = TimeSpan.Zero;
while (cluster.Leader == null && timeWaited < _timeMaxToWaitForLeader)
{
await Task.Delay(500);
numberWaitMaximum--;
timeWaited += TimeSpan.FromMilliseconds(500);
}

if (cluster.Leader == null)
Expand Down
6 changes: 6 additions & 0 deletions src/SlimFaas/EnvironmentVariables.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public static class EnvironmentVariables
public const string ScaleReplicasWorkerDelayMilliseconds = "SCALE_REPLICAS_WORKER_DELAY_MILLISECONDS";
public const int ScaleReplicasWorkerDelayMillisecondsDefault = 1000;

public const string HealthWorkerDelayMilliseconds = "HEALTH_WORKER_DELAY_MILLISECONDS";
public const int HealthWorkerDelayMillisecondsDefault = 1000;

public const string HealthWorkerDelayToExitSeconds = "HEALTH_WORKER_DELAY_TO_EXIT_SECONDS";
public const int HealthWorkerDelayToExitSecondsDefault = 60;

public const string PodScaledUpByDefaultWhenInfrastructureHasNeverCalled =
"POD_SCALED_UP_BY_DEFAULT_WHEN_INFRASTRUCTURE_HAS_NEVER_CALLED";

Expand Down
Loading

0 comments on commit 4c8d2e0

Please sign in to comment.