Skip to content

Commit

Permalink
fdfsf
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaume-chervet committed Jun 28, 2023
1 parent d273ec2 commit 04ee5a7
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 48 deletions.
29 changes: 8 additions & 21 deletions src/SlimFaas/Kubernetes/KubernetesService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Collections;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.CodeAnalysis;
using System.Text;
using k8s;
using k8s.Autorest;
Expand Down Expand Up @@ -39,22 +38,11 @@ public record DeploymentInformation
public int NumberParallelRequest { get; set; }
}

public record PodInformation
public record PodInformation(string Name, bool? Started, bool? Ready, string Ip, string DeploymentName)
{
public PodInformation(string name, bool? started, bool? ready, string ip, string deploymentName)
{
Name = name;
Started = started;
Ready = ready;
Ip = ip;
DeploymentName = deploymentName;
}

public string Name { get; set; }
public bool? Started { get; set; }
public bool? Ready { get; set; }
public string Ip { get; init; }
public string DeploymentName { get; init; }
public string Name { get; set; } = Name;
public bool? Started { get; set; } = Started;
public bool? Ready { get; set; } = Ready;
}

[ExcludeFromCodeCoverage]
Expand All @@ -77,14 +65,13 @@ public KubernetesService(IConfiguration config, ILogger<KubernetesService> logge
try
{
using var client = new Kubernetes(_k8SConfig);
var patchString = "{\"spec\": {\"replicas\": " + request.Replicas + "}}";
var patchString = $"{{\"spec\": {{\"replicas\": {request?.Replicas}}}}}";
var patch = new V1Patch(patchString, V1Patch.PatchType.MergePatch);
await client.PatchNamespacedDeploymentScaleAsync(patch, request.Deployment, request.Namespace);
await client.PatchNamespacedDeploymentScaleAsync(patch, request?.Deployment, request?.Namespace);
}
catch (HttpOperationException e)
{
var empty = "";
_logger.LogError(e, $"{empty}Error while scaling kubernetes deployment" + request.Deployment);
_logger.LogError(e, "Error while scaling kubernetes deployment {RequestDeployment}", request?.Deployment);
return request;
}

Expand Down
69 changes: 42 additions & 27 deletions src/SlimFaas/SlimProxyMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ public class SlimProxyMiddleware
{
private readonly RequestDelegate _next;
private readonly IQueue _queue;
private readonly int _timeoutMaximumWaitWakeSyncFunctionMilliSecond;

public SlimProxyMiddleware(RequestDelegate next, IQueue queue)
public SlimProxyMiddleware(RequestDelegate next, IQueue queue, int timeoutWaitWakeSyncFunctionMilliSecond = 20000)
{
_next = next;
_queue = queue;
_timeoutMaximumWaitWakeSyncFunctionMilliSecond = int.Parse(Environment.GetEnvironmentVariable("TIMEOUT_MAXIMUM_WAIT_WAKE_SYNC_FUNCTION") ?? timeoutWaitWakeSyncFunctionMilliSecond.ToString());
}

public async Task InvokeAsync(HttpContext context, ILogger<SlimProxyMiddleware> faasLogger,
Expand Down Expand Up @@ -91,43 +93,56 @@ private async Task BuildSyncResponseAsync(HttpContext context, HistoryHttpMemory
context.Response.StatusCode = 404;
return;
}

historyHttpService.SetTickLastCall(functionName, DateTime.Now.Ticks);
var numerLoop = 100;
while (numerLoop > 0)
{
var isAnyContainerStarted = replicasService.Deployments.Functions.Any(f => f.Replicas is > 0 && f.Pods.Any(p => p.Ready.HasValue && p.Ready.Value));
if(!isAnyContainerStarted)
{
numerLoop--;
await Task.Delay(200);
continue;
}
numerLoop=0;
}

await WaitForAnyPodStartedAsync(context, historyHttpService, replicasService, functionName);

var responseMessagePromise = sendClient.SendHttpRequestSync(context, functionName, functionPath, context.Request.QueryString.ToUriComponent());
var counterLimit = 100;
// TODO manage request Aborded

var lastSetTicks = DateTime.Now.Ticks;
historyHttpService.SetTickLastCall(functionName, lastSetTicks);
while (!responseMessagePromise.IsCompleted)
{
await Task.Delay(10);
counterLimit--;
if (counterLimit <= 0)
{
historyHttpService.SetTickLastCall(functionName, DateTime.Now.Ticks);
}

counterLimit = 100;
await Task.Delay(10, context.RequestAborted);
var isOneSecondElapsed = new DateTime(lastSetTicks) < DateTime.Now.AddSeconds(-1);
if (!isOneSecondElapsed) continue;
lastSetTicks = DateTime.Now.Ticks;
historyHttpService.SetTickLastCall(functionName, lastSetTicks);
}

historyHttpService.SetTickLastCall(functionName, DateTime.Now.Ticks);
using var responseMessage = responseMessagePromise.Result;
context.Response.StatusCode = (int)responseMessage.StatusCode;
CopyFromTargetResponseHeaders(context, responseMessage);
await responseMessage.Content.CopyToAsync(context.Response.Body);
}



private async Task WaitForAnyPodStartedAsync(HttpContext context, HistoryHttpMemoryService historyHttpService,
IReplicasService replicasService, string functionName)
{
var numberLoop = _timeoutMaximumWaitWakeSyncFunctionMilliSecond / 10;
var lastSetTicks = DateTime.Now.Ticks;
historyHttpService.SetTickLastCall(functionName, lastSetTicks);
while (numberLoop > 0)
{
var isAnyContainerStarted = replicasService.Deployments.Functions.Any(f =>
f.Replicas is > 0 && f.Pods.Any(p => p.Ready.HasValue && p.Ready.Value));
if (!isAnyContainerStarted && !context.RequestAborted.IsCancellationRequested)
{
numberLoop--;
await Task.Delay(10, context.RequestAborted);
var isOneSecondElapsed = new DateTime(lastSetTicks) < DateTime.Now.AddSeconds(-1);
if (isOneSecondElapsed)
{
lastSetTicks = DateTime.Now.Ticks;
historyHttpService.SetTickLastCall(functionName, lastSetTicks);
}
continue;
}

numberLoop = 0;
}
}

private void CopyFromTargetResponseHeaders(HttpContext context, HttpResponseMessage responseMessage)
{
foreach (var header in responseMessage.Headers)
Expand Down

0 comments on commit 04ee5a7

Please sign in to comment.