Skip to content

Commit

Permalink
feat: add basic retry middleware
Browse files Browse the repository at this point in the history
This commit adds an initial implementation of a retry middleware,
along with fleshing out the FixedCountRetryStrategy for compatibility
with the new middleware.  As of this commit, any Get/Set requests
that fail with an Unavailable or Internal error gRPC status code
will be retried up to 3 attempts.

There is no backoff or jitter yet; we should implement that in the
future.

The determination of which gRPC status codes and request types are
eligible for retries is implemented via a new IRetryEligibilityStrategy
interface.  This way, individual retry strategies have the ability
to override the behavior, but in the 90% case they can just re-use
the default eligibility strategy.

NOTE: we implement retries as a bespoke middleware rather than using
the retry configuration that is built into the .NET gRPC library
because the built-in implementation has some strict restrictions
that aren't compatible with our goals here (e.g. no response that
includes any headers is eligible for retry in the built-in implementation,
and we always get headers back from envoy.)
  • Loading branch information
cprice404 committed Oct 7, 2022
1 parent 3d5ef7c commit 3ef2590
Show file tree
Hide file tree
Showing 16 changed files with 238 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/Momento.Sdk/Config/Middleware/IMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,6 @@ public Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TResponse>
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation
);
) where TRequest : class where TResponse: class;

}
6 changes: 3 additions & 3 deletions src/Momento.Sdk/Config/Middleware/LoggingMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TRes
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation
)
) where TRequest : class where TResponse : class
{
_logger.LogDebug("Executing request of type: {}", request?.GetType());
_logger.LogDebug("Executing request of type: {}", request.GetType());
var nextState = await continuation(request, callOptions);
return new MiddlewareResponseState<TResponse>(
ResponseAsync: nextState.ResponseAsync.ContinueWith(r =>
{
_logger.LogDebug("Got response for request of type: {}", request?.GetType());
_logger.LogDebug("Got response for request of type: {}", request.GetType());
return r.Result;
}),
ResponseHeadersAsync: nextState.ResponseHeadersAsync,
Expand Down
6 changes: 5 additions & 1 deletion src/Momento.Sdk/Config/Middleware/PassThroughMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ IMiddleware IMiddleware.WithLoggerFactory(ILoggerFactory loggerFactory)
return WithLoggerFactory(loggerFactory);
}

public Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TResponse>(TRequest request, CallOptions callOptions, Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation)
public Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TResponse>(
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation
) where TRequest : class where TResponse : class
{
return continuation(request, callOptions);
}
Expand Down
27 changes: 22 additions & 5 deletions src/Momento.Sdk/Config/Retry/FixedCountRetryStrategy.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
using System.Net.NetworkInformation;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Momento.Sdk.Internal.Retry;

namespace Momento.Sdk.Config.Retry;

public class FixedCountRetryStrategy : IRetryStrategy
{
public ILoggerFactory? LoggerFactory { get; }

private ILogger _logger;
private readonly IRetryEligibilityStrategy _eligibilityStrategy;

public int MaxAttempts { get; }

//FixedCountRetryStrategy(retryableStatusCodes = DEFAULT_RETRYABLE_STATUS_CODES, maxAttempts = 3),
public FixedCountRetryStrategy(int maxAttempts, ILoggerFactory? loggerFactory = null)
public FixedCountRetryStrategy(int maxAttempts, IRetryEligibilityStrategy? eligibilityStrategy = null, ILoggerFactory? loggerFactory = null)
{
LoggerFactory = loggerFactory;
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<FixedCountRetryStrategy>();
_eligibilityStrategy = eligibilityStrategy ?? new DefaultRetryEligibilityStrategy(loggerFactory);
MaxAttempts = maxAttempts;
}

public FixedCountRetryStrategy WithLoggerFactory(ILoggerFactory loggerFactory)
{
return new(MaxAttempts, loggerFactory);
return new(MaxAttempts, _eligibilityStrategy.WithLoggerFactory(loggerFactory), loggerFactory);
}

IRetryStrategy IRetryStrategy.WithLoggerFactory(ILoggerFactory loggerFactory)
Expand All @@ -26,15 +35,23 @@ IRetryStrategy IRetryStrategy.WithLoggerFactory(ILoggerFactory loggerFactory)

public FixedCountRetryStrategy WithMaxAttempts(int maxAttempts)
{
return new(maxAttempts, LoggerFactory);
return new(maxAttempts, _eligibilityStrategy, LoggerFactory);
}

public int? DetermineWhenToRetryRequest(IGrpcResponse grpcResponse, IGrpcRequest grpcRequest, int attemptNumber)
public int? DetermineWhenToRetryRequest<TRequest>(Status grpcStatus, TRequest grpcRequest, int attemptNumber) where TRequest : class
{
_logger.LogDebug($"Determining whether request is eligible for retry; status code: {grpcStatus.StatusCode}, request type: {grpcRequest.GetType()}, attemptNumber: {attemptNumber}, maxAttempts: {MaxAttempts}");
if (! _eligibilityStrategy.IsEligibleForRetry(grpcStatus, grpcRequest))
{
return null;
}
if (attemptNumber > MaxAttempts)
{
_logger.LogDebug($"Exceeded max retry count ({MaxAttempts})");
return null;
}
_logger.LogDebug($"Request is eligible for retry (attempt {attemptNumber} of {MaxAttempts}, retrying immediately.");
return 0;
}

}
6 changes: 0 additions & 6 deletions src/Momento.Sdk/Config/Retry/IGrpcRequest.cs

This file was deleted.

6 changes: 0 additions & 6 deletions src/Momento.Sdk/Config/Retry/IGrpcResponse.cs

This file was deleted.

14 changes: 14 additions & 0 deletions src/Momento.Sdk/Config/Retry/IRetryEligibilityStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using Grpc.Core;
using Microsoft.Extensions.Logging;

namespace Momento.Sdk.Config.Retry
{
public interface IRetryEligibilityStrategy
{
public ILoggerFactory? LoggerFactory { get; }
public IRetryEligibilityStrategy WithLoggerFactory(ILoggerFactory loggerFactory);
public bool IsEligibleForRetry<TRequest>(Status status, TRequest request) where TRequest : class;
}
}

9 changes: 4 additions & 5 deletions src/Momento.Sdk/Config/Retry/IRetryStrategy.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Grpc.Core;
using Microsoft.Extensions.Logging;

namespace Momento.Sdk.Config.Retry;
Expand All @@ -8,16 +9,14 @@ namespace Momento.Sdk.Config.Retry;
public interface IRetryStrategy
{
public ILoggerFactory? LoggerFactory { get; }

public IRetryStrategy WithLoggerFactory(ILoggerFactory loggerFactory);

/// <summary>
/// Calculates whether or not to retry a request based on the type of request and number of attempts.
/// </summary>
/// <param name="grpcResponse"></param>
/// <param name="grpcStatus"></param>
/// <param name="grpcRequest"></param>
/// <param name="attemptNumber"></param>
/// <returns>Returns number of milliseconds after which the request should be retried, or <see langword="null"/> if the request should not be retried.</returns>
public int? DetermineWhenToRetryRequest(IGrpcResponse grpcResponse, IGrpcRequest grpcRequest, int attemptNumber);

public IRetryStrategy WithLoggerFactory(ILoggerFactory loggerFactory);
public int? DetermineWhenToRetryRequest<TRequest>(Status grpcStatus, TRequest grpcRequest, int attemptNumber) where TRequest : class;
}
2 changes: 1 addition & 1 deletion src/Momento.Sdk/Internal/ControlGrpcManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private async Task<MiddlewareResponseState<TResponse>> WrapWithMiddleware<TReque
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, AsyncUnaryCall<TResponse>> continuation
)
) where TRequest : class where TResponse : class
{
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuationWithMiddlewareResponseState = (r, o) =>
{
Expand Down
10 changes: 6 additions & 4 deletions src/Momento.Sdk/Internal/DataGrpcManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Momento.Protos.CacheClient;
using Momento.Sdk.Config;
using Momento.Sdk.Config.Middleware;
using Momento.Sdk.Config.Retry;
using Momento.Sdk.Internal.Middleware;
using static System.Reflection.Assembly;
using static Grpc.Core.Interceptors.Interceptor;
Expand Down Expand Up @@ -170,7 +171,7 @@ private async Task<MiddlewareResponseState<TResponse>> WrapWithMiddleware<TReque
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, AsyncUnaryCall<TResponse>> continuation
)
) where TRequest : class where TResponse : class
{
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuationWithMiddlewareResponseState = (r, o) =>
{
Expand All @@ -191,7 +192,7 @@ Func<TRequest, CallOptions, AsyncUnaryCall<TResponse>> continuation
}
}

public class DataGrpcManager : IDisposable
internal class DataGrpcManager : IDisposable
{
private readonly GrpcChannel channel;

Expand Down Expand Up @@ -222,8 +223,9 @@ internal DataGrpcManager(IConfiguration config, string authToken, string host)

var middlewares = config.Middlewares.Concat(
new List<IMiddleware> {
new MaxConcurrentRequestsMiddleware(config.LoggerFactory, config.TransportStrategy.MaxConcurrentRequests),
new HeaderMiddleware(config.LoggerFactory, headers)
new RetryMiddleware(config.LoggerFactory, config.RetryStrategy),
new HeaderMiddleware(config.LoggerFactory, headers),
new MaxConcurrentRequestsMiddleware(config.LoggerFactory, config.TransportStrategy.MaxConcurrentRequests)
}
).ToList();

Expand Down
6 changes: 3 additions & 3 deletions src/Momento.Sdk/Internal/Middleware/HeaderMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace Momento.Sdk.Internal.Middleware
{
class Header
internal class Header
{
public const string AuthorizationKey = "Authorization";
public const string AgentKey = "Agent";
Expand All @@ -24,7 +24,7 @@ public Header(String name, String value)
}
}

class HeaderMiddleware : IMiddleware
internal class HeaderMiddleware : IMiddleware
{
public ILoggerFactory? LoggerFactory { get; }

Expand Down Expand Up @@ -54,7 +54,7 @@ public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TRes
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation
)
) where TRequest : class where TResponse : class
{
var callOptionsWithHeaders = callOptions;
if (callOptionsWithHeaders.Headers == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace Momento.Sdk.Internal.Middleware
// cases are unaffected. For the degenerate case (5000+ concurrent requests),
// this protects the server and actually seems to improve client-side p999
// latencies by quite a bit.
public class MaxConcurrentRequestsMiddleware : IMiddleware
internal class MaxConcurrentRequestsMiddleware : IMiddleware
{
public ILoggerFactory LoggerFactory { get; }
private readonly int _maxConcurrentRequests;
Expand All @@ -47,8 +47,6 @@ public MaxConcurrentRequestsMiddleware(ILoggerFactory loggerFactory, int maxConc
_semaphore = new FairAsyncSemaphore(maxConcurrentRequests);
}



public MaxConcurrentRequestsMiddleware WithLoggerFactory(ILoggerFactory loggerFactory)
{
return new(loggerFactory, _maxConcurrentRequests);
Expand All @@ -59,7 +57,11 @@ IMiddleware IMiddleware.WithLoggerFactory(ILoggerFactory loggerFactory)
return WithLoggerFactory(loggerFactory);
}

public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TResponse>(TRequest request, CallOptions callOptions, Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation)
public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TResponse>(
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation
) where TRequest : class where TResponse : class
{
await _semaphore.WaitOne();
try
Expand All @@ -74,8 +76,6 @@ public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TRes
_semaphore.Release();
}
}


}
}

89 changes: 89 additions & 0 deletions src/Momento.Sdk/Internal/Middleware/RetryMiddleware.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Momento.Sdk.Config.Middleware;
using System.Linq;
using Momento.Protos.CacheClient;
using System.Collections.Generic;

namespace Momento.Sdk.Config.Retry
{
internal class RetryMiddleware : IMiddleware
{
public ILoggerFactory? LoggerFactory { get; }

private readonly ILogger _logger;
private readonly IRetryStrategy _retryStrategy;

public RetryMiddleware(ILoggerFactory loggerFactory, IRetryStrategy retryStrategy)
{
LoggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<RetryMiddleware>();
_retryStrategy = retryStrategy;
}

public RetryMiddleware WithLoggerFactory(ILoggerFactory loggerFactory)
{
return new(loggerFactory, _retryStrategy);
}

IMiddleware IMiddleware.WithLoggerFactory(ILoggerFactory loggerFactory)
{
return WithLoggerFactory(loggerFactory);
}

public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TResponse>(
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation
) where TRequest : class where TResponse : class
{
var foo = request.GetType();
MiddlewareResponseState<TResponse> nextState;
int attemptNumber = 0;
int? retryAfterMillis = 0;
do
{
var delay = retryAfterMillis ?? 0;
if (delay > 0)
{
await Task.Delay(delay);
}
attemptNumber++;
nextState = await continuation(request, callOptions);

// NOTE: we need a try/catch block here, because: (a) we cannot call
// `nextState.GetStatus()` until after we `await` the response, or
// it will throw an error. and (b) if the status is anything other
// than "ok", the `await` on the response will throw an exception.
try
{
await nextState.ResponseAsync;

if (attemptNumber > 1)
{
_logger.LogDebug($"Retry succeeded (attempt {attemptNumber})");
}
break;
}
catch (Exception)
{
var status = nextState.GetStatus();
_logger.LogDebug($"Request failed with status {status.StatusCode}, checking to see if we should retry; attempt Number: {attemptNumber}");
_logger.LogTrace($"Failed request status: {status}");
retryAfterMillis = _retryStrategy.DetermineWhenToRetryRequest(nextState.GetStatus(), request, attemptNumber);
}
}
while (retryAfterMillis != null);

return new MiddlewareResponseState<TResponse>(
ResponseAsync: nextState.ResponseAsync,
ResponseHeadersAsync: nextState.ResponseHeadersAsync,
GetStatus: nextState.GetStatus,
GetTrailers: nextState.GetTrailers
);
}
}
}

Loading

0 comments on commit 3ef2590

Please sign in to comment.