Skip to content

Commit

Permalink
fix(slimfaas): fix event openshift (#64)
Browse files Browse the repository at this point in the history
* fix(slimfaas): event not throw on openshift

* update

* fix

* test

* update

* update doc

* fix

* update

* update

* test

* update

* Update
  • Loading branch information
guillaume-chervet authored Jun 13, 2024
1 parent d584322 commit f8cc3a7
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 27 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ spec:
- name: BASE_FUNCTION_URL
value: "http://{function_name}.{namespace}.svc.cluster.local:8080"
- name: BASE_FUNCTION_POD_URL # require for publish route
value: "http://{pod_name}.{function_name}.{namespace}.svc.cluster.local:8080"
value: "http://{pod_ip}.svc.cluster.local:8080"
- name: BASE_SLIMDATA_URL
value: "http://{pod_name}.slimfaas.{namespace}.svc.cluster.local:3262/" # Don't expose this port, it can also be like "http://{pod_ip}:3262/" but if you can use DNS it's better
- name: SLIMFAAS_PORTS
Expand Down Expand Up @@ -331,6 +331,9 @@ spec:
- **SlimFaas/PathsStartWithVisibility** : ""
- Comma separated list of path prefixed by the default visibility. example: "Private:/mypath/subpath,Public:/mypath2"
- "Public:" or "Private:" are prefix that set the path visibility, if not set, "SlimFaas/DefaultVisibility" is used
- **SlimFaas/ExcludeDeploymentsFromVisibilityPrivate** : ""
- Comma separated list of deployment names or statefulset names
- Message from that pods will be considered as public. It is useful if you want to exclude some pods from the private visibility, for example for a backend for frontend.
- **SlimFaas/Schedule** : "" #json configuration
- Allows you to define a schedule for your functions. If you want to wake up your infrastructure at 07:00 or for example scale down after 60 seconds of inactivity after 07:00 and scale down after 10 seconds of inactivity after 21:00

Expand Down
18 changes: 12 additions & 6 deletions src/SlimFaas/Kubernetes/KubernetesService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public record DeploymentInformation(string Deployment, string Namespace, IList<P
ScheduleConfig? Schedule = null,
IList<string>? SubscribeEvents = null,
FunctionVisibility Visibility = FunctionVisibility.Public,
IList<string>? PathsStartWithVisibility = null);
IList<string>? PathsStartWithVisibility = null,
IList<string>? ExcludeDeploymentsFromVisibilityPrivate = null
);

public record PodInformation(string Name, bool? Started, bool? Ready, string Ip, string DeploymentName);

Expand All @@ -81,6 +83,7 @@ public class KubernetesService : IKubernetesService
private const string SubscribeEvents = "SlimFaas/SubscribeEvents";
private const string DefaultVisibility = "SlimFaas/DefaultVisibility";
private const string PathsStartWithVisibility = "SlimFaas/PathsStartWithVisibility";
private const string ExcludeDeploymentsFromVisibilityPrivate = "SlimFaas/ExcludeDeploymentsFromVisibilityPrivate";

private const string ReplicasStartAsSoonAsOneFunctionRetrieveARequest =
"SlimFaas/ReplicasStartAsSoonAsOneFunctionRetrieveARequest";
Expand Down Expand Up @@ -182,7 +185,7 @@ public async Task<DeploymentsInformations> ListFunctionsAsync(string kubeNamespa
podList.Where(p => p.Name.StartsWith(deploymentListItem.Metadata.Name)).ToList()))
.FirstOrDefault();

IEnumerable<PodInformation> podInformations = podList as PodInformation[] ?? podList.ToArray();
IEnumerable<PodInformation> podInformations = podList.ToArray();
AddDeployments(kubeNamespace, deploymentList, podInformations, deploymentInformationList, _logger);
AddStatefulSets(kubeNamespace, statefulSetList, podInformations, deploymentInformationList, _logger);

Expand Down Expand Up @@ -217,7 +220,7 @@ private static void AddDeployments(string kubeNamespace, V1DeploymentList deploy
DeploymentInformation deploymentInformation = new(
name,
kubeNamespace,
podList.Where(p => p.DeploymentName == deploymentListItem.Metadata.Name).ToList(),
podList.Where(p => p.DeploymentName.StartsWith(name)).ToList(),
deploymentListItem.Spec.Replicas ?? 0,
annotations.TryGetValue(ReplicasAtStart, out string? annotationReplicasAtStart)
? int.Parse(annotationReplicasAtStart)
Expand Down Expand Up @@ -245,7 +248,9 @@ private static void AddDeployments(string kubeNamespace, V1DeploymentList deploy
: FunctionVisibility.Public,
annotations.TryGetValue(PathsStartWithVisibility, out string? valueUrlsStartWithVisibility)
? valueUrlsStartWithVisibility.Split(',').ToList()
: new List<string>());
: new List<string>(),
annotations.TryGetValue(ExcludeDeploymentsFromVisibilityPrivate, out string? valueExcludeDeploymentsFromVisibilityPrivate) ? valueExcludeDeploymentsFromVisibilityPrivate.Split(',').ToList() : new List<string>()
);
deploymentInformationList.Add(deploymentInformation);
}
catch (Exception e)
Expand Down Expand Up @@ -292,7 +297,7 @@ private static void AddStatefulSets(string kubeNamespace, V1StatefulSetList depl
DeploymentInformation deploymentInformation = new(
name,
kubeNamespace,
podList.Where(p => p.DeploymentName == deploymentListItem.Metadata.Name).ToList(),
podList.Where(p => p.DeploymentName.StartsWith(name)).ToList(),
deploymentListItem.Spec.Replicas ?? 0,
annotations.TryGetValue(ReplicasAtStart, out string? annotationReplicasAtStart)
? int.Parse(annotationReplicasAtStart)
Expand All @@ -317,7 +322,8 @@ private static void AddStatefulSets(string kubeNamespace, V1StatefulSetList depl
: new List<string>(),
annotations.TryGetValue(DefaultVisibility, out string? visibility)
? Enum.Parse<FunctionVisibility>(visibility)
: FunctionVisibility.Public);
: FunctionVisibility.Public,
annotations.TryGetValue(ExcludeDeploymentsFromVisibilityPrivate, out string? valueExcludeDeploymentsFromVisibilityPrivate) ? valueExcludeDeploymentsFromVisibilityPrivate.Split(',').ToList() : new List<string>());

deploymentInformationList.Add(deploymentInformation);
}
Expand Down
8 changes: 5 additions & 3 deletions src/SlimFaas/SendClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Task<HttpResponseMessage> SendHttpRequestSync(HttpContext httpContext, string fu
string functionQuery, string? baseUrl = null);
}

public class SendClient(HttpClient httpClient) : ISendClient
public class SendClient(HttpClient httpClient, ILogger<SendClient> logger) : ISendClient
{
private readonly string _baseFunctionUrl =
Environment.GetEnvironmentVariable(EnvironmentVariables.BaseFunctionUrl) ??
Expand All @@ -27,6 +27,7 @@ public async Task<HttpResponseMessage> SendHttpRequestAsync(CustomRequest custom
string customRequestQuery = customRequest.Query;
string targetUrl =
ComputeTargetUrl(functionUrl, customRequestFunctionName, customRequestPath, customRequestQuery, _namespaceSlimFaas);
logger.LogDebug("Sending async request to {TargetUrl}", targetUrl);
HttpRequestMessage targetRequestMessage = CreateTargetMessage(customRequest, new Uri(targetUrl));
if (context != null)
{
Expand All @@ -41,8 +42,9 @@ public async Task<HttpResponseMessage> SendHttpRequestAsync(CustomRequest custom
public async Task<HttpResponseMessage> SendHttpRequestSync(HttpContext context, string functionName,
string functionPath, string functionQuery, string? baseUrl = null)
{
string targetUri = ComputeTargetUrl(baseUrl ?? _baseFunctionUrl, functionName, functionPath, functionQuery, _namespaceSlimFaas);
HttpRequestMessage targetRequestMessage = CreateTargetMessage(context, new Uri(targetUri));
string targetUrl = ComputeTargetUrl(baseUrl ?? _baseFunctionUrl, functionName, functionPath, functionQuery, _namespaceSlimFaas);
logger.LogDebug("Sending sync request to {TargetUrl}", targetUrl);
HttpRequestMessage targetRequestMessage = CreateTargetMessage(context, new Uri(targetUrl));
HttpResponseMessage responseMessage = await httpClient.SendAsync(targetRequestMessage,
HttpCompletionOption.ResponseHeadersRead, context.RequestAborted);
return responseMessage;
Expand Down
1 change: 1 addition & 0 deletions src/SlimFaas/SlimDataEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public static string Get(PodInformation podInformation, string? baseUrl = null)
baseSlimDataUrl = baseSlimDataUrl.Replace("{pod_name}", podInformation.Name);
baseSlimDataUrl = baseSlimDataUrl.Replace("{pod_ip}", podInformation.Ip);
baseSlimDataUrl = baseSlimDataUrl.Replace("{namespace}", namespaceSlimFaas);
baseSlimDataUrl = baseSlimDataUrl.Replace("{function_name}", podInformation.DeploymentName);
}

return baseSlimDataUrl;
Expand Down
60 changes: 49 additions & 11 deletions src/SlimFaas/SlimProxyMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,29 +83,53 @@ await BuildPublishResponseAsync(context, historyHttpService, sendClient, replica
case FunctionType.Async:
default:
{
await BuildAsyncResponseAsync(context, replicasService, functionName, functionPath);
await BuildAsyncResponseAsync(logger, context, replicasService, functionName, functionPath);
break;
}
}
}

private static Boolean MessageComeFromNamepaceInternal(HttpContext context, IReplicasService replicasService)
private static Boolean MessageComeFromNamespaceInternal(ILogger<SlimProxyMiddleware> logger, HttpContext context, IReplicasService replicasService, DeploymentInformation currentFunction)
{
IList<string> podIps = replicasService.Deployments.Pods.Select(p => p.Ip).ToList();
IList<string> podIps = replicasService.Deployments.Functions.Select(p => p.Pods).SelectMany(p => p).Where(p => currentFunction?.ExcludeDeploymentsFromVisibilityPrivate?.Contains(p.DeploymentName) == false).Select(p => p.Ip).ToList();
var forwardedFor = context.Request.Headers["X-Forwarded-For"].FirstOrDefault();
var remoteIp = context.Connection.RemoteIpAddress?.ToString();
logger.LogDebug("ForwardedFor: {ForwardedFor}, RemoteIp: {RemoteIp}", forwardedFor, remoteIp);
if(logger.IsEnabled(LogLevel.Debug))
{
foreach (var podIp in podIps)
{
logger.LogDebug("PodIp: {PodIp}", podIp);
}
}

if (IsInternalIp(forwardedFor, podIps) || IsInternalIp(remoteIp, podIps))
{
logger.LogDebug("Request come from internal namespace ForwardedFor: {ForwardedFor}, RemoteIp: {RemoteIp}", forwardedFor, remoteIp);
return true;
}
logger.LogDebug("Request come from external namespace ForwardedFor: {ForwardedFor}, RemoteIp: {RemoteIp}", forwardedFor, remoteIp);

return false;
}

private static bool IsInternalIp(string? ipAddress, IList<string> podIps)
{
return ipAddress != null && podIps.Contains(ipAddress);

if (string.IsNullOrEmpty(ipAddress))
{
return false;
}

foreach (string podIp in podIps)
{
if (ipAddress.Contains(podIp))
{
return true;
}
}

return false;
}

private static void BuildStatusResponse(IReplicasService replicasService,
Expand Down Expand Up @@ -148,7 +172,7 @@ private static void BuildWakeResponse(IReplicasService replicasService, IWakeUpF
}
}

private static List<DeploymentInformation> SearchFunctions(HttpContext context, IReplicasService replicasService, string eventName)
private static List<DeploymentInformation> SearchFunctions(ILogger<SlimProxyMiddleware> logger, HttpContext context, IReplicasService replicasService, string eventName)
{
// example: "Public:my-event-name1,Private:my-event-name2,my-event-name3"
var result = new List<DeploymentInformation>();
Expand All @@ -164,7 +188,7 @@ private static List<DeploymentInformation> SearchFunctions(HttpContext context,
if (splits.Length == 1 && splits[0] == eventName)
{
if (deploymentInformation.Visibility == FunctionVisibility.Public ||
MessageComeFromNamepaceInternal(context, replicasService))
MessageComeFromNamespaceInternal(logger, context, replicasService, deploymentInformation))
{
result.Add(deploymentInformation);
}
Expand All @@ -173,7 +197,7 @@ private static List<DeploymentInformation> SearchFunctions(HttpContext context,
{
var visibility = splits[0];
var visibilityEnum = Enum.Parse<FunctionVisibility>(visibility, true);
if(visibilityEnum == FunctionVisibility.Private && MessageComeFromNamepaceInternal(context, replicasService))
if(visibilityEnum == FunctionVisibility.Private && MessageComeFromNamespaceInternal(logger, context, replicasService, deploymentInformation))
{
result.Add(deploymentInformation);
} else if(visibilityEnum == FunctionVisibility.Public)
Expand Down Expand Up @@ -215,7 +239,7 @@ private static FunctionVisibility GetFunctionVisibility(ILogger<SlimProxyMiddlew
return function.Visibility;
}

private async Task BuildAsyncResponseAsync(HttpContext context, IReplicasService replicasService, string functionName,
private async Task BuildAsyncResponseAsync(ILogger<SlimProxyMiddleware> logger, HttpContext context, IReplicasService replicasService, string functionName,
string functionPath)
{
DeploymentInformation? function = SearchFunction(replicasService, functionName);
Expand All @@ -227,7 +251,7 @@ private async Task BuildAsyncResponseAsync(HttpContext context, IReplicasService

var visibility = GetFunctionVisibility(logger, function, functionPath);

if (visibility == FunctionVisibility.Private && !MessageComeFromNamepaceInternal(context, replicasService))
if (visibility == FunctionVisibility.Private && !MessageComeFromNamespaceInternal(logger, context, replicasService, function))
{
context.Response.StatusCode = 404;
return;
Expand All @@ -245,10 +269,11 @@ private async Task BuildPublishResponseAsync(HttpContext context, HistoryHttpMem
ISendClient sendClient, IReplicasService replicasService, string eventName, string functionPath)
{
logger.LogDebug("Receiving event: {EventName}", eventName);
var functions = SearchFunctions(context, replicasService, eventName);
var functions = SearchFunctions(logger, context, replicasService, eventName);
var slimFaasSubscribeEvents = _slimFaasSubscribeEvents.Where(s => s.Key == eventName);
if (functions.Count <= 0 && !slimFaasSubscribeEvents.Any())
{
logger.LogDebug("Return 404 from event: {EventName}", eventName);
context.Response.StatusCode = 404;
return;
}
Expand All @@ -260,6 +285,7 @@ private async Task BuildPublishResponseAsync(HttpContext context, HistoryHttpMem
historyHttpService.SetTickLastCall(function.Deployment, lastSetTicks);
foreach (var pod in function.Pods)
{
logger.LogDebug("Pod {PodName} is ready: {PodReady}", pod.Name, pod.Ready);
if (pod.Ready != true)
{
continue;
Expand Down Expand Up @@ -304,6 +330,18 @@ private async Task BuildPublishResponseAsync(HttpContext context, HistoryHttpMem
}
}

if (logger.IsEnabled(LogLevel.Debug))
{
foreach (Task<HttpResponseMessage> task in tasks)
{
if (task.IsCompleted)
{
using HttpResponseMessage responseMessage = task.Result;
logger.LogDebug("Response from event {EventName} with status code {StatusCode}", eventName, responseMessage.StatusCode);
}
}
}

context.Response.StatusCode = 204;
}

Expand All @@ -319,7 +357,7 @@ private async Task BuildSyncResponseAsync(HttpContext context, HistoryHttpMemory

var visibility = GetFunctionVisibility(logger, function, functionPath);

if (visibility == FunctionVisibility.Private && !MessageComeFromNamepaceInternal(context, replicasService))
if (visibility == FunctionVisibility.Private && !MessageComeFromNamespaceInternal(logger, context, replicasService, function))
{
context.Response.StatusCode = 404;
return;
Expand Down
3 changes: 2 additions & 1 deletion src/SlimFaas/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"Default": "Warning",
"Microsoft.AspNetCore": "Error",
"DotNext.Net.Cluster": "Error",
"SlimData": "Error"
"SlimData": "Error",
"SlimFaas": "Error"
}
},
"UseKubeConfig": false,
Expand Down
10 changes: 7 additions & 3 deletions tests/SlimFaas.Tests/SendClientShould.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Net;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Moq;

namespace SlimFaas.Tests;

Expand Down Expand Up @@ -28,7 +30,9 @@ public async Task CallFunctionAsync(string httpMethod)
return await Task.FromResult(responseMessage);
}));

SendClient sendClient = new SendClient(httpClient);
var mockLogger = new Mock<ILogger<SendClient>>();

SendClient sendClient = new(httpClient, mockLogger.Object);
CustomRequest customRequest =
new CustomRequest(new List<CustomHeader> { new() { Key = "key", Values = new[] { "value1" } } },
new byte[1], "fibonacci", "health", httpMethod, "");
Expand Down Expand Up @@ -62,8 +66,8 @@ public async Task CallFunctionSync(string httpMethod)
sendedRequest = request;
return await Task.FromResult(responseMessage);
}));

SendClient sendClient = new SendClient(httpClient);
var mockLogger = new Mock<ILogger<SendClient>>();
SendClient sendClient = new SendClient(httpClient, mockLogger.Object);

DefaultHttpContext httpContext = new DefaultHttpContext();
HttpRequest httpContextRequest = httpContext.Request;
Expand Down
4 changes: 2 additions & 2 deletions tests/SlimFaas.Tests/SlimProxyMiddlewareTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ public class ProxyMiddlewareTests

[Theory]
[InlineData("/publish-event/toto/hello", HttpStatusCode.NoContent, "http://localhost:5002/hello" )]
[InlineData("/publish-event/reload/hello", HttpStatusCode.NoContent, "http://fibonacci-2.{function_name}:8080//hello,http://fibonacci-1.{function_name}:8080//hello,http://localhost:5002/hello" )]
[InlineData("/publish-event/reloadnoprefix/hello", HttpStatusCode.NoContent, "http://fibonacci-2.{function_name}:8080//hello,http://fibonacci-1.{function_name}:8080//hello")]
[InlineData("/publish-event/reload/hello", HttpStatusCode.NoContent, "http://fibonacci-2.fibonacci:8080//hello,http://fibonacci-1.fibonacci:8080//hello,http://localhost:5002/hello" )]
[InlineData("/publish-event/reloadnoprefix/hello", HttpStatusCode.NoContent, "http://fibonacci-2.fibonacci:8080//hello,http://fibonacci-1.fibonacci:8080//hello")]
[InlineData("/publish-event/wrong/download", HttpStatusCode.NotFound, null)]
[InlineData("/publish-event/reloadprivate/hello", HttpStatusCode.NotFound, null)]
public async Task CallPublishInSyncModeAndReturnOk(string path, HttpStatusCode expected, string? times)
Expand Down

0 comments on commit f8cc3a7

Please sign in to comment.