Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kudu + KEDA publishing for Durable Functions on SQL #191

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Common/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ public static TimeSpan MaxAllowedExecutionTime
public const string HubName = "HubName";
public const string DurableTaskStorageConnection = "connection";
public const string DurableTaskStorageConnectionName = "azureStorageConnectionStringName";
public const string DurableTaskSqlConnectionName = "connectionStringName";
public const string DurableTaskStorageProvider = "storageProvider";
public const string DurableTaskMicrosoftSqlProviderType = "MicrosoftSQL";
public const string DurableTask = "durableTask";
public const string Extensions = "extensions";
public const string SitePackages = "SitePackages";
Expand Down
141 changes: 113 additions & 28 deletions Kudu.Core/Functions/KedaFunctionTriggerProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,19 @@ public IEnumerable<ScaleTrigger> GetFunctionTriggers(string zipFilePath)
return null;
}

List<ScaleTrigger> kedaScaleTriggers = new List<ScaleTrigger>();
string hostJsonText = null;
var triggerBindings = new List<FunctionTrigger>();
using (var zip = ZipFile.OpenRead(zipFilePath))
{
var hostJsonEntry = zip.Entries.FirstOrDefault(e => IsHostJson(e.FullName));
if (hostJsonEntry != null)
{
using (var reader = new StreamReader(hostJsonEntry.Open()))
{
hostJsonText = reader.ReadToEnd();
}
}

var entries = zip.Entries
.Where(e => IsFunctionJson(e.FullName));

Expand All @@ -33,26 +43,44 @@ public IEnumerable<ScaleTrigger> GetFunctionTriggers(string zipFilePath)
{
using (var reader = new StreamReader(stream))
{
var functionTriggers = ParseFunctionJson(GetFunctionName(entry), reader.ReadToEnd());
if (functionTriggers?.Any() == true)
{
kedaScaleTriggers.AddRange(functionTriggers);
}
triggerBindings.AddRange(ParseFunctionJson(GetFunctionName(entry), reader.ReadToEnd()));
}
}
}
}

var durableTriggers = triggerBindings.Where(b => IsDurable(b));
var standardTriggers = triggerBindings.Where(b => !IsDurable(b));

var kedaScaleTriggers = new List<ScaleTrigger>();
kedaScaleTriggers.AddRange(GetStandardScaleTriggers(standardTriggers));

// Durable Functions triggers are treated as a group and get configuration from host.json
if (durableTriggers.Any() && TryGetDurableKedaTrigger(hostJsonText, out ScaleTrigger durableScaleTrigger))
{
kedaScaleTriggers.Add(durableScaleTrigger);
}

bool IsFunctionJson(string fullName)
{
return fullName.EndsWith(Constants.FunctionsConfigFile) &&
fullName.Count(c => c == '/' || c == '\\') == 1;
}

bool IsHostJson(string fullName)
{
return fullName.Equals(Constants.FunctionsHostConfigFile, StringComparison.OrdinalIgnoreCase);
}

bool IsDurable(FunctionTrigger function) =>
function.Type.Equals("orchestrationTrigger", StringComparison.OrdinalIgnoreCase) ||
function.Type.Equals("activityTrigger", StringComparison.OrdinalIgnoreCase) ||
function.Type.Equals("entityTrigger", StringComparison.OrdinalIgnoreCase);

return kedaScaleTriggers;
}

public IEnumerable<ScaleTrigger> ParseFunctionJson(string functionName, string functionJson)
private IEnumerable<FunctionTrigger> ParseFunctionJson(string functionName, string functionJson)
{
var json = JObject.Parse(functionJson);
if (json.TryGetValue("disabled", out JToken value))
Expand All @@ -67,51 +95,56 @@ public IEnumerable<ScaleTrigger> ParseFunctionJson(string functionName, string f

if (disabled)
{
return null;
yield break;
}
}

var excluded = json.TryGetValue("excluded", out value) && (bool)value;
if (excluded)
{
return null;
yield break;
}

var triggers = new List<ScaleTrigger>();
foreach (JObject binding in (JArray)json["bindings"])
{
var type = (string)binding["type"];
if (type.EndsWith("Trigger", StringComparison.OrdinalIgnoreCase))
{
var scaleTrigger = new ScaleTrigger
{
Type = GetKedaTriggerType(type),
Metadata = new Dictionary<string, string>()
};
foreach (var property in binding)
yield return new FunctionTrigger(functionName, binding, type);
}
}
}

private static IEnumerable<ScaleTrigger> GetStandardScaleTriggers(IEnumerable<FunctionTrigger> standardTriggers)
{
foreach (FunctionTrigger function in standardTriggers)
{
var scaleTrigger = new ScaleTrigger
{
Type = GetKedaTriggerType(function.Type),
Metadata = new Dictionary<string, string>()
};
foreach (var property in function.Binding)
{
if (property.Value.Type == JTokenType.String)
{
if (property.Value.Type == JTokenType.String)
{
scaleTrigger.Metadata.Add(property.Key, property.Value.ToString());
}
scaleTrigger.Metadata.Add(property.Key, property.Value.ToString());
}

scaleTrigger.Metadata.Add("functionName", functionName);
triggers.Add(scaleTrigger);
}
}

return triggers;
scaleTrigger.Metadata.Add("functionName", function.FunctionName);
yield return scaleTrigger;
}
}

private static string GetFunctionName(ZipArchiveEntry zipEnetry)
private static string GetFunctionName(ZipArchiveEntry zipEntry)
{
if (string.IsNullOrWhiteSpace(zipEnetry?.FullName))
if (string.IsNullOrWhiteSpace(zipEntry?.FullName))
{
return string.Empty;
}

return zipEnetry.FullName.Split('/').Length == 2 ? zipEnetry.FullName.Split('/')[0] : zipEnetry.FullName.Split('\\')[0];
return zipEntry.FullName.Split('/').Length == 2 ? zipEntry.FullName.Split('/')[0] : zipEntry.FullName.Split('\\')[0];
}

public static string GetKedaTriggerType(string triggerType)
Expand Down Expand Up @@ -150,5 +183,57 @@ public static string GetKedaTriggerType(string triggerType)
return triggerType;
}
}

private static bool TryGetDurableKedaTrigger(string hostJsonText, out ScaleTrigger scaleTrigger)
{
scaleTrigger = null;
if (string.IsNullOrEmpty(hostJsonText))
{
return false;
}

JObject hostJson = JObject.Parse(hostJsonText);

// Reference: https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-bindings#durable-functions-2-0-host-json
string durableStorageProviderPath = $"{Constants.Extensions}.{Constants.DurableTask}.{Constants.DurableTaskStorageProvider}";
JObject storageProviderConfig = hostJson.SelectToken(durableStorageProviderPath) as JObject;
string storageType = storageProviderConfig?["type"]?.ToString();

// Custom storage types are supported starting in Durable Functions v2.4.2
if (string.Equals(storageType, Constants.DurableTaskMicrosoftSqlProviderType, StringComparison.OrdinalIgnoreCase))
{
scaleTrigger = new ScaleTrigger
{
// MSSQL scaler reference: https://keda.sh/docs/2.2/scalers/mssql/
Type = "mssql",
cgillum marked this conversation as resolved.
Show resolved Hide resolved
Metadata = new Dictionary<string, string>
{
["query"] = "SELECT dt.GetScaleMetric()",
["targetValue"] = "1", // super-conservative default
["connectionStringFromEnv"] = storageProviderConfig?[Constants.DurableTaskSqlConnectionName]?.ToString(),
}
};
}
else
{
// TODO: Support for the Azure Storage and Netherite backends
}

return scaleTrigger != null;
}

private class FunctionTrigger
{
public FunctionTrigger(string functionName, JObject binding, string type)
{
this.FunctionName = functionName;
this.Binding = binding;
this.Type = type;
}

public string FunctionName { get; }
public JObject Binding { get; }
public string Type { get; }
}
}
}
64 changes: 64 additions & 0 deletions Kudu.Tests/Core/Function/KedaFunctionTriggersProviderTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using Kudu.Core.Functions;
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using Xunit;

namespace Kudu.Tests.Core.Function
{
public class KedaFunctionTriggersProviderTests
{
[Fact]
public void DurableFunctionApp()
{
// Generate a zip archive with a host.json and the contents of a Durable Function app
string zipFilePath = Path.GetTempFileName();
using (var fileStream = File.OpenWrite(zipFilePath))
using (var archive = new ZipArchive(fileStream, ZipArchiveMode.Create, leaveOpen: true))
{
CreateJsonFileEntry(archive, "host.json", @"{""version"":""2.0"",""extensions"":{""durableTask"":{""hubName"":""DFTest"",""storageProvider"":{""type"":""MicrosoftSQL"",""connectionStringName"":""SQLDB_Connection""}}}}");
CreateJsonFileEntry(archive, "f1/function.json", @"{""bindings"":[{""type"":""orchestrationTrigger"",""name"":""context""}],""disabled"":false}");
CreateJsonFileEntry(archive, "f2/function.json", @"{""bindings"":[{""type"":""entityTrigger"",""name"":""ctx""}],""disabled"":false}");
CreateJsonFileEntry(archive, "f3/function.json", @"{""bindings"":[{""type"":""activityTrigger"",""name"":""input""}],""disabled"":false}");
CreateJsonFileEntry(archive, "f4/function.json", @"{""bindings"":[{""type"":""httpTrigger"",""methods"":[""post""],""authLevel"":""anonymous"",""name"":""req""}],""disabled"":false}");
}

try
{
var provider = new KedaFunctionTriggerProvider();
IEnumerable<ScaleTrigger> result = provider.GetFunctionTriggers(zipFilePath);
Assert.Equal(2, result.Count());

ScaleTrigger mssqlTrigger = Assert.Single(result, trigger => trigger.Type.Equals("mssql", StringComparison.OrdinalIgnoreCase));
string query = Assert.Contains("query", mssqlTrigger.Metadata);
Assert.False(string.IsNullOrEmpty(query));

string targetValue = Assert.Contains("targetValue", mssqlTrigger.Metadata);
Assert.False(string.IsNullOrEmpty(targetValue));
Assert.True(double.TryParse(targetValue, out _));

string connectionStringName = Assert.Contains("connectionStringFromEnv", mssqlTrigger.Metadata);
Assert.Equal("SQLDB_Connection", connectionStringName);

ScaleTrigger httpTrigger = Assert.Single(result, trigger => trigger.Type.Equals("httpTrigger", StringComparison.OrdinalIgnoreCase));
string functionName = Assert.Contains("functionName", httpTrigger.Metadata);
Assert.Equal("f4", functionName);
}
finally
{
File.Delete(zipFilePath);
}
}

private static void CreateJsonFileEntry(ZipArchive archive, string path, string content)
{
using (Stream entryStream = archive.CreateEntry(path).Open())
using (var streamWriter = new StreamWriter(entryStream))
{
streamWriter.Write(content);
}
}
}
}