From d99bb7d3ba6c659c60fc72a88a335b670d5b298e Mon Sep 17 00:00:00 2001 From: Guillaume Chervet Date: Mon, 26 Jun 2023 16:16:13 +0200 Subject: [PATCH] feat(slimfaas): call only at least when one pod is read --- src/SlimFaas/Kubernetes/KubernetesService.cs | 57 ++++++++++++++- src/SlimFaas/ReplicasSynchronizationWorker.cs | 2 +- src/SlimFaas/SlimProxyMiddleware.cs | 17 ++++- src/SlimFaas/SlimWorker.cs | 3 + .../SlimProxyMiddlewareTests.cs | 36 ++++++++++ tests/SlimFaas.Tests/SlimWorkerShould.cs | 13 +++- tests/SlimFaas.Tests/TestShould.cs | 72 +++++++++++++++++++ 7 files changed, 195 insertions(+), 5 deletions(-) create mode 100644 tests/SlimFaas.Tests/TestShould.cs diff --git a/src/SlimFaas/Kubernetes/KubernetesService.cs b/src/SlimFaas/Kubernetes/KubernetesService.cs index c4328efe..dee7d05f 100644 --- a/src/SlimFaas/Kubernetes/KubernetesService.cs +++ b/src/SlimFaas/Kubernetes/KubernetesService.cs @@ -1,4 +1,5 @@ -using System.Diagnostics.CodeAnalysis; +using System.Collections; +using System.Diagnostics.CodeAnalysis; using k8s; using k8s.Autorest; using k8s.Models; @@ -27,6 +28,7 @@ public record DeploymentsInformations public record DeploymentInformation { public string Deployment { get; set; } + public IList Pods { get; set; } public string Namespace { get; set; } public int? Replicas { get; set; } public int ReplicasMin { get; set; } @@ -36,6 +38,15 @@ public record DeploymentInformation public int NumberParallelRequest { get; set; } } +public record PodInformation +{ + public string Name { get; set; } + public bool? Started { get; set; } + public bool? Ready { get; set; } + public string Ip { get; set; } + public string DeploymentName { get; set; } +} + [ExcludeFromCodeCoverage] public class KubernetesService : IKubernetesService { @@ -82,7 +93,12 @@ public async Task ListFunctionsAsync(string kubeNamespa { IList? deploymentInformationList = new List(); using var client = new Kubernetes(_k8SConfig); - var deploymentList = await client.ListNamespacedDeploymentAsync(kubeNamespace); + var deploymentListTask = client.ListNamespacedDeploymentAsync(kubeNamespace); + var podListTask = client.ListNamespacedPodAsync(kubeNamespace); + + await Task.WhenAll(deploymentListTask, podListTask); + var deploymentList = deploymentListTask.Result; + var podList = MapPodInformations(podListTask.Result); var slimFaasDeploymentInformation = deploymentList.Items.Where(deploymentListItem => deploymentListItem.Metadata.Name == "slimfaas").Select(deploymentListItem => new SlimFaasDeploymentInformation { @@ -97,6 +113,7 @@ public async Task ListFunctionsAsync(string kubeNamespa var deploymentInformation = new DeploymentInformation { Deployment = deploymentListItem.Metadata.Name, + Pods = podList.Where(p => p.DeploymentName == deploymentListItem.Metadata.Name).ToList(), Namespace = kubeNamespace, Replicas = deploymentListItem.Spec.Replicas, ReplicasAtStart = annotations.ContainsKey(ReplicasAtStart) @@ -142,4 +159,40 @@ public async Task ListFunctionsAsync(string kubeNamespa } } + private static IList MapPodInformations(V1PodList list) + { + IList podInformations = new List(); + foreach (var item in list.Items) + { + var containerStatus = item.Status.ContainerStatuses.FirstOrDefault(); + if (containerStatus == null) + { + continue; + } + + var ready = containerStatus.Ready; + var started = containerStatus.Started; + var podIP = item.Status.PodIP; + var podName = item.Metadata.Name; + var deploymentName = string.Empty; + if (item.Metadata.Labels.TryGetValue("app", out var label)) + { + deploymentName = label; + } + + var podInformation = new PodInformation() + { + Started = started, + Ready = ready, + Ip = podIP, + Name = podName, + DeploymentName = deploymentName + }; + + podInformations.Add(podInformation); + } + + return podInformations; + } + } \ No newline at end of file diff --git a/src/SlimFaas/ReplicasSynchronizationWorker.cs b/src/SlimFaas/ReplicasSynchronizationWorker.cs index 7c4cbcf3..6fd66456 100644 --- a/src/SlimFaas/ReplicasSynchronizationWorker.cs +++ b/src/SlimFaas/ReplicasSynchronizationWorker.cs @@ -7,7 +7,7 @@ public class ReplicasSynchronizationWorker: BackgroundService private readonly int _delay; private readonly string _namespace; - public ReplicasSynchronizationWorker(IReplicasService replicasService, ILogger logger, int delay = 10000) + public ReplicasSynchronizationWorker(IReplicasService replicasService, ILogger logger, int delay = 1000) { _replicasService = replicasService; _logger = logger; diff --git a/src/SlimFaas/SlimProxyMiddleware.cs b/src/SlimFaas/SlimProxyMiddleware.cs index 65046451..dc46235b 100644 --- a/src/SlimFaas/SlimProxyMiddleware.cs +++ b/src/SlimFaas/SlimProxyMiddleware.cs @@ -21,7 +21,8 @@ public SlimProxyMiddleware(RequestDelegate next, IQueue queue) _queue = queue; } - public async Task InvokeAsync(HttpContext context, ILogger faasLogger, HistoryHttpMemoryService historyHttpService, ISendClient sendClient) + public async Task InvokeAsync(HttpContext context, ILogger faasLogger, + HistoryHttpMemoryService historyHttpService, ISendClient sendClient, IReplicasService replicasService) { var contextRequest = context.Request; var (functionPath, functionName, functionType) = GetFunctionInfo(faasLogger, contextRequest); @@ -36,6 +37,19 @@ public async Task InvokeAsync(HttpContext context, ILogger contextResponse.StatusCode = 200; return; case FunctionType.Sync: + + var numerLoop = 100; + while (numerLoop > 0) + { + if(replicasService.Deployments.Functions.Count(f => f.Replicas.HasValue && f.Replicas.Value > 0 && f.Pods.Count(p => p.Ready.HasValue && p.Ready.Value ) <= 0) <= 0 ) + { + numerLoop--; + await Task.Delay(200); + continue; + } + numerLoop=0; + } + await BuildSyncResponse(context, historyHttpService, sendClient, functionName, functionPath); return; case FunctionType.Async: @@ -59,6 +73,7 @@ private async Task BuildSyncResponse(HttpContext context, HistoryHttpMemoryServi ISendClient sendClient, string functionName, string functionPath) { historyHttpService.SetTickLastCall(functionName, DateTime.Now.Ticks); + var responseMessagePromise = sendClient.SendHttpRequestSync(context, functionName, functionPath, context.Request.QueryString.ToUriComponent()); var counterLimit = 100; // TODO manage request Aborded diff --git a/src/SlimFaas/SlimWorker.cs b/src/SlimFaas/SlimWorker.cs index 0885fb5b..1baf878c 100644 --- a/src/SlimFaas/SlimWorker.cs +++ b/src/SlimFaas/SlimWorker.cs @@ -53,6 +53,9 @@ private async Task DoOneCycle(CancellationToken stoppingToken, Dictionary p.Ready.HasValue && p.Ready.Value ) <= 0) continue; + if (numberProcessingTasks >= numberLimitProcessingTasks) continue; await SendHttpRequestToFunction(processingTasks, numberLimitProcessingTasks, numberProcessingTasks, functionDeployment); diff --git a/tests/SlimFaas.Tests/SlimProxyMiddlewareTests.cs b/tests/SlimFaas.Tests/SlimProxyMiddlewareTests.cs index 48bfb17b..fb5ed61a 100644 --- a/tests/SlimFaas.Tests/SlimProxyMiddlewareTests.cs +++ b/tests/SlimFaas.Tests/SlimProxyMiddlewareTests.cs @@ -9,6 +9,39 @@ namespace SlimFaas.Tests; + +class MemoryReplicasService : IReplicasService +{ + public DeploymentsInformations Deployments => + new() + { + Functions = new List() + { + new() + { + Replicas = 0, + Deployment = "fibonacci", + Namespace = "default", + Pods = new List { new() { Ready = true } } + } + }, + SlimFaas = new SlimFaasDeploymentInformation + { + Replicas = 1 + } + }; + + public Task SyncDeploymentsAsync(string kubeNamespace) + { + throw new NotImplementedException(); + } + + public Task CheckScaleAsync(string kubeNamespace) + { + throw new NotImplementedException(); + } +} + class MemoryQueue: IQueue { public async Task EnqueueAsync(string key, string message) @@ -66,6 +99,7 @@ public async Task CallFunctionInSyncModeAndReturnOk() services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); }) .Configure(app => { @@ -92,6 +126,7 @@ public async Task CallFunctionInAsyncSyncModeAndReturnOk() services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); }) .Configure(app => { @@ -118,6 +153,7 @@ public async Task JustWakeFunctionAndReturnOk() services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); }) .Configure(app => { diff --git a/tests/SlimFaas.Tests/SlimWorkerShould.cs b/tests/SlimFaas.Tests/SlimWorkerShould.cs index 426c8643..38c86ffd 100644 --- a/tests/SlimFaas.Tests/SlimWorkerShould.cs +++ b/tests/SlimFaas.Tests/SlimWorkerShould.cs @@ -54,7 +54,18 @@ public async Task OnlyCallOneFunctionAsync() ReplicasMin = 0, ReplicasAtStart = 1, TimeoutSecondBeforeSetReplicasMin = 300, - ReplicasStartAsSoonAsOneFunctionRetrieveARequest = true + ReplicasStartAsSoonAsOneFunctionRetrieveARequest = true, + Pods = new List() + { + new() + { + Name = "fibonacci", + DeploymentName = "fibonacci", + Ready = true, + Started = true, + Ip = "" + } + } } } }); diff --git a/tests/SlimFaas.Tests/TestShould.cs b/tests/SlimFaas.Tests/TestShould.cs new file mode 100644 index 00000000..a3d900cb --- /dev/null +++ b/tests/SlimFaas.Tests/TestShould.cs @@ -0,0 +1,72 @@ +using k8s; +using k8s.Models; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Moq; + +namespace SlimFaas.Tests; + +public class TestShould +{ + [Fact] + public async Task SyncLastTicksBetweenDatabaseAndMemory() + { + var inMemorySettings = new Dictionary { + {"UseKubeConfig", "true"}, + }; + + IConfiguration configuration = new ConfigurationBuilder() + .AddInMemoryCollection(inMemorySettings) + .Build(); + //var kubernetesService = new KubernetesService(configuration); + //var functions = await kubernetesService.ListFunctionsAsync("lightfaas-demo"); + + // Load from the default kubeconfig on the machine. + var config = KubernetesClientConfiguration.BuildConfigFromConfigFile(); + using var client = new Kubernetes(config); + var namespaces = client.CoreV1.ListNamespace(); + foreach (var ns in namespaces.Items) { + Console.WriteLine(ns.Metadata.Name); + + + var list = client.CoreV1.ListNamespacedPod(ns.Metadata.Name); + MapPodInformations(list); + } + } + + private static IList MapPodInformations(V1PodList list) + { + IList podInformations = new List(); + foreach (var item in list.Items) + { + var containerStatus = item.Status.ContainerStatuses.FirstOrDefault(); + if (containerStatus == null) + { + continue; + } + + var ready = containerStatus.Ready; + var started = containerStatus.Started; + var podIP = item.Status.PodIP; + var podName = item.Metadata.Name; + var deploymentName = string.Empty; + if (item.Metadata.Labels.TryGetValue("app", out var label)) + { + deploymentName = label; + } + + var podInformation = new PodInformation() + { + Started = started, + Ready = ready, + Ip = podIP, + Name = podName, + DeploymentName = deploymentName + }; + + podInformations.Add(podInformation); + } + + return podInformations; + } +} \ No newline at end of file