Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make reading Kubeconfig from DCP resilient #3132

Merged
merged 12 commits into from
Mar 26, 2024
7 changes: 7 additions & 0 deletions src/Aspire.Hosting/Dcp/DcpOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ internal sealed class DcpOptions
/// </summary>
public bool? RandomizePorts { get; set; }

public int KubernetesConfigReadRetryCount { get; set; } = 300;

public int KubernetesConfigReadRetryIntervalMilliseconds { get; set; } = 100;

public void ApplyApplicationConfiguration(DistributedApplicationOptions appOptions, IConfiguration dcpPublisherConfiguration, IConfiguration publishingConfiguration, IConfiguration coreConfiguration)
{
string? publisher = publishingConfiguration[nameof(PublishingOptions.Publisher)];
Expand Down Expand Up @@ -125,6 +129,9 @@ public void ApplyApplicationConfiguration(DistributedApplicationOptions appOptio
DependencyCheckTimeout = coreConfiguration.GetValue<int>("DOTNET_ASPIRE_DEPENDENCY_CHECK_TIMEOUT", DependencyCheckTimeout);
}

KubernetesConfigReadRetryCount = dcpPublisherConfiguration.GetValue<int>(nameof(KubernetesConfigReadRetryCount), KubernetesConfigReadRetryCount);
KubernetesConfigReadRetryIntervalMilliseconds = dcpPublisherConfiguration.GetValue<int>(nameof(KubernetesConfigReadRetryIntervalMilliseconds), KubernetesConfigReadRetryIntervalMilliseconds);

if (!string.IsNullOrEmpty(dcpPublisherConfiguration[nameof(ResourceNameSuffix)]))
{
ResourceNameSuffix = dcpPublisherConfiguration[nameof(ResourceNameSuffix)];
Expand Down
86 changes: 78 additions & 8 deletions src/Aspire.Hosting/Dcp/KubernetesService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Immutable;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using Aspire.Hosting.Dcp.Model;
using k8s;
using k8s.Exceptions;
using k8s.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using Polly.Retry;

namespace Aspire.Hosting.Dcp;

Expand Down Expand Up @@ -39,7 +44,7 @@ Task<Stream> GetLogStreamAsync<T>(
CancellationToken cancellationToken = default) where T : CustomResource;
}

internal sealed class KubernetesService(Locations locations) : IKubernetesService, IDisposable
internal sealed class KubernetesService(ILogger<KubernetesService> logger, IOptions<DcpOptions> dcpOptions, Locations locations) : IKubernetesService, IDisposable
{
private static readonly TimeSpan s_initialRetryDelay = TimeSpan.FromMilliseconds(100);
private static GroupVersion GroupVersion => Model.Dcp.GroupVersion;
Expand Down Expand Up @@ -212,6 +217,7 @@ public Task<Stream> GetLogStreamAsync<T>(

public void Dispose()
{
_kubeconfigReadSemaphore?.Dispose();
_kubernetes?.Dispose();
}

Expand Down Expand Up @@ -254,7 +260,7 @@ private async Task<TResult> ExecuteWithRetry<TResult>(
{
try
{
EnsureKubernetes();
await EnsureKubernetesAsync(cancellationToken).ConfigureAwait(false);
return await operation(_kubernetes!).ConfigureAwait(false);
}
catch (Exception e) when (IsRetryable(e))
Expand All @@ -280,16 +286,80 @@ private async Task<TResult> ExecuteWithRetry<TResult>(

private static bool IsRetryable(Exception ex) => ex is HttpRequestException || ex is KubeConfigException;

private void EnsureKubernetes()
private readonly SemaphoreSlim _kubeconfigReadSemaphore = new(1);

private ResiliencePipeline? _resiliencePipeline;

private ResiliencePipeline GetReadKubeconfigResiliencePipeline()
{
if (_kubernetes != null) { return; }
if (_resiliencePipeline == null)
{
var configurationReadRetry = new RetryStrategyOptions()
mitchdenny marked this conversation as resolved.
Show resolved Hide resolved
{
ShouldHandle = new PredicateBuilder().Handle<KubeConfigException>(),
BackoffType = DelayBackoffType.Constant,
mitchdenny marked this conversation as resolved.
Show resolved Hide resolved
MaxRetryAttempts = dcpOptions.Value.KubernetesConfigReadRetryCount,
MaxDelay = TimeSpan.FromMilliseconds(dcpOptions.Value.KubernetesConfigReadRetryIntervalMilliseconds),
OnRetry = (retry) =>
{
logger.LogDebug(
mitchdenny marked this conversation as resolved.
Show resolved Hide resolved
"Waiting for Kubernetes configuration file at '{DcpKubeconfigPath}' (attempt {Iteration}).",
locations.DcpKubeconfigPath,
retry.AttemptNumber
);
return ValueTask.CompletedTask;
}
};

lock (Model.Dcp.Schema)
_resiliencePipeline = new ResiliencePipelineBuilder().AddRetry(configurationReadRetry).Build();
}

return _resiliencePipeline;
}

private async Task EnsureKubernetesAsync(CancellationToken cancellationToken = default)
{
// Return early before waiting for the semaphore if we can.
if (_kubernetes != null)
{
if (_kubernetes != null) { return; }
return;
}

var config = KubernetesClientConfiguration.BuildConfigFromConfigFile(kubeconfigPath: locations.DcpKubeconfigPath, useRelativePaths: false);
_kubernetes = new DcpKubernetesClient(config);
await _kubeconfigReadSemaphore.WaitAsync(-1, cancellationToken).ConfigureAwait(false);

try
{
// Second chance shortcut if multiple threads got caught.
if (_kubernetes != null)
{
return;
}

// We retry reading the kubeconfig file because DCP takes a few moments to write
// it to disk. This retry pipeline will only be invoked by a single thread the
// rest will be held at the semaphore.
var readStopwatch = new Stopwatch();
readStopwatch.Start();

var pipeline = GetReadKubeconfigResiliencePipeline();
_kubernetes = await pipeline.ExecuteAsync<DcpKubernetesClient>(async (cancellationToken) =>
{
var fileInfo = new FileInfo(locations.DcpKubeconfigPath);
var config = await KubernetesClientConfiguration.BuildConfigFromConfigFileAsync(kubeconfig: fileInfo, useRelativePaths: false).ConfigureAwait(false);
readStopwatch.Stop();

logger.LogDebug(
"Successfully read Kubernetes configuration from '{DcpKubeconfigPath}' after {DurationMs} milliseconds.",
locations.DcpKubeconfigPath,
readStopwatch.ElapsedMilliseconds
);

return new DcpKubernetesClient(config);
}, cancellationToken).ConfigureAwait(false);
}
finally
{
_kubeconfigReadSemaphore.Release();
}
}
}