feat: add basic retry middleware #283

Merged (1 commit), Oct 7, 2022
2 changes: 1 addition & 1 deletion src/Momento.Sdk/Config/Middleware/IMiddleware.cs
@@ -54,6 +54,6 @@ public Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TResponse>
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation
);
) where TRequest : class where TResponse: class;

}
6 changes: 3 additions & 3 deletions src/Momento.Sdk/Config/Middleware/LoggingMiddleware.cs
@@ -36,14 +36,14 @@ public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TRes
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation
)
) where TRequest : class where TResponse : class
{
_logger.LogDebug("Executing request of type: {}", request?.GetType());
_logger.LogDebug("Executing request of type: {}", request.GetType());
var nextState = await continuation(request, callOptions);
return new MiddlewareResponseState<TResponse>(
ResponseAsync: nextState.ResponseAsync.ContinueWith(r =>
{
_logger.LogDebug("Got response for request of type: {}", request?.GetType());
_logger.LogDebug("Got response for request of type: {}", request.GetType());
return r.Result;
}),
ResponseHeadersAsync: nextState.ResponseHeadersAsync,
6 changes: 5 additions & 1 deletion src/Momento.Sdk/Config/Middleware/PassThroughMiddleware.cs
@@ -26,7 +26,11 @@ IMiddleware IMiddleware.WithLoggerFactory(ILoggerFactory loggerFactory)
return WithLoggerFactory(loggerFactory);
}

public Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TResponse>(TRequest request, CallOptions callOptions, Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation)
public Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TResponse>(
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation
) where TRequest : class where TResponse : class
{
return continuation(request, callOptions);
}
27 changes: 22 additions & 5 deletions src/Momento.Sdk/Config/Retry/FixedCountRetryStrategy.cs
@@ -1,22 +1,31 @@
using System.Net.NetworkInformation;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Momento.Sdk.Internal.Retry;

namespace Momento.Sdk.Config.Retry;

public class FixedCountRetryStrategy : IRetryStrategy
{
public ILoggerFactory? LoggerFactory { get; }

private ILogger _logger;
private readonly IRetryEligibilityStrategy _eligibilityStrategy;

public int MaxAttempts { get; }

//FixedCountRetryStrategy(retryableStatusCodes = DEFAULT_RETRYABLE_STATUS_CODES, maxAttempts = 3),
public FixedCountRetryStrategy(int maxAttempts, ILoggerFactory? loggerFactory = null)
public FixedCountRetryStrategy(int maxAttempts, IRetryEligibilityStrategy? eligibilityStrategy = null, ILoggerFactory? loggerFactory = null)
{
LoggerFactory = loggerFactory;
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<FixedCountRetryStrategy>();
_eligibilityStrategy = eligibilityStrategy ?? new DefaultRetryEligibilityStrategy(loggerFactory);
MaxAttempts = maxAttempts;
}

public FixedCountRetryStrategy WithLoggerFactory(ILoggerFactory loggerFactory)
{
return new(MaxAttempts, loggerFactory);
return new(MaxAttempts, _eligibilityStrategy.WithLoggerFactory(loggerFactory), loggerFactory);
}

IRetryStrategy IRetryStrategy.WithLoggerFactory(ILoggerFactory loggerFactory)
@@ -26,15 +35,23 @@ IRetryStrategy IRetryStrategy.WithLoggerFactory(ILoggerFactory loggerFactory)

public FixedCountRetryStrategy WithMaxAttempts(int maxAttempts)
{
return new(maxAttempts, LoggerFactory);
return new(maxAttempts, _eligibilityStrategy, LoggerFactory);
}

public int? DetermineWhenToRetryRequest(IGrpcResponse grpcResponse, IGrpcRequest grpcRequest, int attemptNumber)
public int? DetermineWhenToRetryRequest<TRequest>(Status grpcStatus, TRequest grpcRequest, int attemptNumber) where TRequest : class
{
_logger.LogDebug($"Determining whether request is eligible for retry; status code: {grpcStatus.StatusCode}, request type: {grpcRequest.GetType()}, attemptNumber: {attemptNumber}, maxAttempts: {MaxAttempts}");
Contributor:

Here and elsewhere I notice you're making users' runtimes pay for string interpolation allocations unconditionally, even when debug logging is disabled. I think you should rewrite all these LogDebugs like:

Suggested change
_logger.LogDebug($"Determining whether request is eligible for retry; status code: {grpcStatus.StatusCode}, request type: {grpcRequest.GetType()}, attemptNumber: {attemptNumber}, maxAttempts: {MaxAttempts}");
_logger.LogDebug(
"Determining whether request is eligible for retry; status code: {StatusCode}, request type: {RequestType}, attemptNumber: {AttemptNumber}, maxAttempts: {MaxAttempts}",
grpcStatus.StatusCode,
grpcRequest.GetType(),
attemptNumber,
MaxAttempts
);

See the docs for more examples and detail: https://learn.microsoft.com/en-us/dotnet/api/microsoft.extensions.logging.loggerextensions.logdebug?view=dotnet-plat-ext-6.0

Contributor Author:

good call, agreed, it was my intent to write them that way but I always forget. thx

Contributor:

kotlin-structured-logging is an addictive library. It's so nice to not have to care about anything w.r.t. logging invocation cost inside of those logging blocks. I catch myself writing logs like this too so I look out for them

Contributor Author:

I hope we will have a rust-structured-logging (or maybe one already exists in the Rust universe)
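To make the cost discussed in this thread concrete, here is a minimal sketch, assuming the Microsoft.Extensions.Logging and Microsoft.Extensions.Logging.Abstractions packages are referenced. `FormatProbe` and `LoggingCostDemo` are hypothetical names used only for illustration; they are not part of the SDK. With a logger that has debug output disabled (here `NullLogger.Instance`), the interpolated string is still built before the call, while the message-template form never formats its argument.

```csharp
using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

// Hypothetical probe: counts how many times it is formatted into a string.
public sealed class FormatProbe
{
    public int FormatCount { get; private set; }

    public override string ToString()
    {
        FormatCount++;
        return "probe";
    }
}

public static class LoggingCostDemo
{
    // Returns the probe's format count after each style of logging call.
    public static (int afterInterpolated, int afterTemplate) Run()
    {
        // NullLogger discards everything, standing in for "debug logging is off".
        ILogger logger = NullLogger.Instance;
        var probe = new FormatProbe();

        // Interpolation happens at the call site, before LogDebug runs,
        // so ToString() is invoked even though nothing is logged.
        logger.LogDebug($"value: {probe}");
        int afterInterpolated = probe.FormatCount;

        // The template and its arguments are passed through unformatted;
        // a disabled logger never calls ToString() on them.
        logger.LogDebug("value: {Value}", probe);
        int afterTemplate = probe.FormatCount;

        return (afterInterpolated, afterTemplate);
    }
}
```

The template form is not entirely free, since the `params object[]` argument array is still allocated, but the string formatting work is skipped when the level is disabled.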

if (! _eligibilityStrategy.IsEligibleForRetry(grpcStatus, grpcRequest))
{
return null;
}
if (attemptNumber > MaxAttempts)
{
_logger.LogDebug($"Exceeded max retry count ({MaxAttempts})");
return null;
}
_logger.LogDebug($"Request is eligible for retry (attempt {attemptNumber} of {MaxAttempts}, retrying immediately.");
return 0;
}

}
6 changes: 0 additions & 6 deletions src/Momento.Sdk/Config/Retry/IGrpcRequest.cs

This file was deleted.

6 changes: 0 additions & 6 deletions src/Momento.Sdk/Config/Retry/IGrpcResponse.cs

This file was deleted.

14 changes: 14 additions & 0 deletions src/Momento.Sdk/Config/Retry/IRetryEligibilityStrategy.cs
@@ -0,0 +1,14 @@
using System;
using Grpc.Core;
using Microsoft.Extensions.Logging;

namespace Momento.Sdk.Config.Retry
{
public interface IRetryEligibilityStrategy
{
public ILoggerFactory? LoggerFactory { get; }
public IRetryEligibilityStrategy WithLoggerFactory(ILoggerFactory loggerFactory);
public bool IsEligibleForRetry<TRequest>(Status status, TRequest request) where TRequest : class;
}
}

9 changes: 4 additions & 5 deletions src/Momento.Sdk/Config/Retry/IRetryStrategy.cs
@@ -1,3 +1,4 @@
using Grpc.Core;
using Microsoft.Extensions.Logging;

namespace Momento.Sdk.Config.Retry;
@@ -8,16 +9,14 @@ namespace Momento.Sdk.Config.Retry;
public interface IRetryStrategy
{
public ILoggerFactory? LoggerFactory { get; }

public IRetryStrategy WithLoggerFactory(ILoggerFactory loggerFactory);

/// <summary>
/// Calculates whether or not to retry a request based on the type of request and number of attempts.
/// </summary>
/// <param name="grpcResponse"></param>
/// <param name="grpcStatus"></param>
/// <param name="grpcRequest"></param>
/// <param name="attemptNumber"></param>
/// <returns>Returns number of milliseconds after which the request should be retried, or <see langword="null"/> if the request should not be retried.</returns>
public int? DetermineWhenToRetryRequest(IGrpcResponse grpcResponse, IGrpcRequest grpcRequest, int attemptNumber);

public IRetryStrategy WithLoggerFactory(ILoggerFactory loggerFactory);
public int? DetermineWhenToRetryRequest<TRequest>(Status grpcStatus, TRequest grpcRequest, int attemptNumber) where TRequest : class;
}
8 changes: 5 additions & 3 deletions src/Momento.Sdk/Internal/DataGrpcManager.cs
@@ -10,6 +10,7 @@
using Momento.Protos.CacheClient;
using Momento.Sdk.Config;
using Momento.Sdk.Config.Middleware;
using Momento.Sdk.Config.Retry;
using Momento.Sdk.Internal.Middleware;
using static System.Reflection.Assembly;
using static Grpc.Core.Interceptors.Interceptor;
@@ -167,7 +168,7 @@ public async Task<_ListLengthResponse> ListLengthAsync(_ListLengthRequest reques
}
}

public class DataGrpcManager : IDisposable
internal class DataGrpcManager : IDisposable
{
private readonly GrpcChannel channel;

@@ -198,8 +199,9 @@ internal DataGrpcManager(IConfiguration config, string authToken, string host)

var middlewares = config.Middlewares.Concat(
new List<IMiddleware> {
new MaxConcurrentRequestsMiddleware(config.LoggerFactory, config.TransportStrategy.MaxConcurrentRequests),
new HeaderMiddleware(config.LoggerFactory, headers)
new RetryMiddleware(config.LoggerFactory, config.RetryStrategy),
new HeaderMiddleware(config.LoggerFactory, headers),
new MaxConcurrentRequestsMiddleware(config.LoggerFactory, config.TransportStrategy.MaxConcurrentRequests)
}
).ToList();

6 changes: 3 additions & 3 deletions src/Momento.Sdk/Internal/Middleware/HeaderMiddleware.cs
@@ -9,7 +9,7 @@

namespace Momento.Sdk.Internal.Middleware
{
class Header
internal class Header
{
public const string AuthorizationKey = "Authorization";
public const string AgentKey = "Agent";
@@ -24,7 +24,7 @@ public Header(String name, String value)
}
}

class HeaderMiddleware : IMiddleware
internal class HeaderMiddleware : IMiddleware
{
public ILoggerFactory? LoggerFactory { get; }

@@ -54,7 +54,7 @@ public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TRes
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation
)
) where TRequest : class where TResponse : class
{
var callOptionsWithHeaders = callOptions;
if (callOptionsWithHeaders.Headers == null)
@@ -34,7 +34,7 @@ namespace Momento.Sdk.Internal.Middleware
// cases are unaffected. For the degenerate case (5000+ concurrent requests),
// this protects the server and actually seems to improve client-side p999
// latencies by quite a bit.
public class MaxConcurrentRequestsMiddleware : IMiddleware
internal class MaxConcurrentRequestsMiddleware : IMiddleware
{
public ILoggerFactory LoggerFactory { get; }
private readonly int _maxConcurrentRequests;
@@ -47,8 +47,6 @@ public MaxConcurrentRequestsMiddleware(ILoggerFactory loggerFactory, int maxConc
_semaphore = new FairAsyncSemaphore(maxConcurrentRequests);
}



public MaxConcurrentRequestsMiddleware WithLoggerFactory(ILoggerFactory loggerFactory)
{
return new(loggerFactory, _maxConcurrentRequests);
@@ -59,7 +57,11 @@ IMiddleware IMiddleware.WithLoggerFactory(ILoggerFactory loggerFactory)
return WithLoggerFactory(loggerFactory);
}

public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TResponse>(TRequest request, CallOptions callOptions, Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation)
public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TResponse>(
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation
) where TRequest : class where TResponse : class
{
await _semaphore.WaitOne();
try
@@ -74,8 +76,6 @@ public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TRes
_semaphore.Release();
}
}


}
}

@@ -15,7 +15,7 @@ public static async Task<MiddlewareResponseState<TResponse>> WrapRequest<TReques
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, AsyncUnaryCall<TResponse>> continuation
)
) where TRequest : class where TResponse : class
{
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuationWithMiddlewareResponseState = (r, o) =>
{
89 changes: 89 additions & 0 deletions src/Momento.Sdk/Internal/Middleware/RetryMiddleware.cs
@@ -0,0 +1,89 @@
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Momento.Sdk.Config.Middleware;
using System.Linq;
using Momento.Protos.CacheClient;
using System.Collections.Generic;

namespace Momento.Sdk.Config.Retry
{
internal class RetryMiddleware : IMiddleware
{
public ILoggerFactory? LoggerFactory { get; }

private readonly ILogger _logger;
private readonly IRetryStrategy _retryStrategy;

public RetryMiddleware(ILoggerFactory loggerFactory, IRetryStrategy retryStrategy)
{
LoggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<RetryMiddleware>();
_retryStrategy = retryStrategy;
}

public RetryMiddleware WithLoggerFactory(ILoggerFactory loggerFactory)
{
return new(loggerFactory, _retryStrategy);
}

IMiddleware IMiddleware.WithLoggerFactory(ILoggerFactory loggerFactory)
{
return WithLoggerFactory(loggerFactory);
}

public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TResponse>(
TRequest request,
CallOptions callOptions,
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation
) where TRequest : class where TResponse : class
{
var foo = request.GetType();
Contributor:

this should be removed

MiddlewareResponseState<TResponse> nextState;
int attemptNumber = 0;
int? retryAfterMillis = 0;
do
{
var delay = retryAfterMillis ?? 0;
if (delay > 0)
{
await Task.Delay(delay);
}
attemptNumber++;
nextState = await continuation(request, callOptions);
Comment on lines +53 to +54
Contributor:

I'm not sure I see how this will disambiguate between the likes of ListPushBack() or DictionaryIncrement() (non-idempotent methods) and Get() or Set() (idempotent methods). What's your plan there?

Contributor Author:

See the code in DefaultRetryEligibilityStrategy, which is called as part of the FixedCountRetryStrategy.

Contributor:

I completely missed _retryableRequestTypes, thanks.
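The `DefaultRetryEligibilityStrategy` referred to in this thread is not shown in the diff on this page. As a rough sketch of the idea only, an allow-list over request types could look like the following, assuming the Grpc.Core.Api package for `Status` and `StatusCode`. The class name, the stand-in request types, and the particular status codes chosen are all hypothetical and are not taken from the SDK.

```csharp
using System;
using System.Collections.Generic;
using Grpc.Core;

// Hypothetical stand-ins for generated protobuf request types.
public sealed class FakeGetRequest { }          // idempotent: safe to repeat
public sealed class FakeListPushBackRequest { } // non-idempotent: repeating may duplicate data

// Hypothetical sketch; the status codes and types listed here are assumptions.
public sealed class AllowListEligibilitySketch
{
    // Status codes treated as transient in this sketch.
    private readonly HashSet<StatusCode> _retryableStatusCodes = new()
    {
        StatusCode.Unavailable,
        StatusCode.Internal,
    };

    // Only request types listed here are ever retried.
    private readonly HashSet<Type> _retryableRequestTypes = new()
    {
        typeof(FakeGetRequest),
    };

    public bool IsEligibleForRetry<TRequest>(Status status, TRequest request)
        where TRequest : class
    {
        // A non-transient failure is never retried, whatever the request type.
        if (!_retryableStatusCodes.Contains(status.StatusCode))
        {
            return false;
        }
        // A transient failure is retried only for allow-listed request types.
        return _retryableRequestTypes.Contains(request.GetType());
    }
}
```

Keying on the runtime type of the request means a non-idempotent call is excluded by default: a new request type is not retried until someone adds it to the list.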


// NOTE: we need a try/catch block here, because: (a) we cannot call
// `nextState.GetStatus()` until after we `await` the response, or
// it will throw an error. and (b) if the status is anything other
// than "ok", the `await` on the response will throw an exception.
try
{
await nextState.ResponseAsync;

if (attemptNumber > 1)
{
_logger.LogDebug($"Retry succeeded (attempt {attemptNumber})");
}
break;
}
catch (Exception)
{
var status = nextState.GetStatus();
_logger.LogDebug($"Request failed with status {status.StatusCode}, checking to see if we should retry; attempt Number: {attemptNumber}");
_logger.LogTrace($"Failed request status: {status}");
retryAfterMillis = _retryStrategy.DetermineWhenToRetryRequest(nextState.GetStatus(), request, attemptNumber);
}
}
while (retryAfterMillis != null);

return new MiddlewareResponseState<TResponse>(
ResponseAsync: nextState.ResponseAsync,
ResponseHeadersAsync: nextState.ResponseHeadersAsync,
GetStatus: nextState.GetStatus,
GetTrailers: nextState.GetTrailers
);
}
}
}
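To trace how the loop above interacts with the `attemptNumber > MaxAttempts` check in `FixedCountRetryStrategy`, here is a simplified, synchronous sketch. `RetryLoopSketch` and `CountAttempts` are hypothetical names; gRPC, logging, delays, and the eligibility check are left out, and a `Func<bool>` stands in for the request (returning `true` on success).

```csharp
using System;

public static class RetryLoopSketch
{
    // Runs the same do/while shape as the middleware and returns how many
    // times the call was attempted in total.
    public static int CountAttempts(int maxAttempts, Func<bool> call)
    {
        int attemptNumber = 0;
        int? retryAfterMillis = 0;
        do
        {
            attemptNumber++;
            if (call())
            {
                break; // success: stop retrying
            }
            // Same rule as the fixed-count strategy: stop once the attempt
            // number exceeds maxAttempts, otherwise retry immediately.
            retryAfterMillis = attemptNumber > maxAttempts ? (int?)null : 0;
        }
        while (retryAfterMillis != null);

        return attemptNumber;
    }
}
```

With `maxAttempts` set to 3 and a call that always fails, this returns 4: the initial attempt plus three retries. Under the `>` comparison, then, `MaxAttempts` acts as a cap on retries rather than on total attempts.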
