diff --git a/src/Momento.Sdk/Config/Middleware/IMiddleware.cs b/src/Momento.Sdk/Config/Middleware/IMiddleware.cs index 7c39c0ce..f4fc1827 100644 --- a/src/Momento.Sdk/Config/Middleware/IMiddleware.cs +++ b/src/Momento.Sdk/Config/Middleware/IMiddleware.cs @@ -54,6 +54,6 @@ public Task> WrapRequest TRequest request, CallOptions callOptions, Func>> continuation - ); + ) where TRequest : class where TResponse: class; } diff --git a/src/Momento.Sdk/Config/Middleware/LoggingMiddleware.cs b/src/Momento.Sdk/Config/Middleware/LoggingMiddleware.cs index dc69f64e..dd7c9806 100644 --- a/src/Momento.Sdk/Config/Middleware/LoggingMiddleware.cs +++ b/src/Momento.Sdk/Config/Middleware/LoggingMiddleware.cs @@ -36,14 +36,14 @@ public async Task> WrapRequest>> continuation - ) + ) where TRequest : class where TResponse : class { - _logger.LogDebug("Executing request of type: {}", request?.GetType()); + _logger.LogDebug("Executing request of type: {}", request.GetType()); var nextState = await continuation(request, callOptions); return new MiddlewareResponseState( ResponseAsync: nextState.ResponseAsync.ContinueWith(r => { - _logger.LogDebug("Got response for request of type: {}", request?.GetType()); + _logger.LogDebug("Got response for request of type: {}", request.GetType()); return r.Result; }), ResponseHeadersAsync: nextState.ResponseHeadersAsync, diff --git a/src/Momento.Sdk/Config/Middleware/PassThroughMiddleware.cs b/src/Momento.Sdk/Config/Middleware/PassThroughMiddleware.cs index 22f63d37..17972798 100644 --- a/src/Momento.Sdk/Config/Middleware/PassThroughMiddleware.cs +++ b/src/Momento.Sdk/Config/Middleware/PassThroughMiddleware.cs @@ -26,7 +26,11 @@ IMiddleware IMiddleware.WithLoggerFactory(ILoggerFactory loggerFactory) return WithLoggerFactory(loggerFactory); } - public Task> WrapRequest(TRequest request, CallOptions callOptions, Func>> continuation) + public Task> WrapRequest( + TRequest request, + CallOptions callOptions, + Func>> continuation + ) where TRequest : class where TResponse : class { return continuation(request, callOptions); } diff --git a/src/Momento.Sdk/Config/Retry/FixedCountRetryStrategy.cs b/src/Momento.Sdk/Config/Retry/FixedCountRetryStrategy.cs index dba0d8ac..b604efad 100644 --- a/src/Momento.Sdk/Config/Retry/FixedCountRetryStrategy.cs +++ b/src/Momento.Sdk/Config/Retry/FixedCountRetryStrategy.cs @@ -1,22 +1,31 @@ +using System.Net.NetworkInformation; +using Grpc.Core; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Momento.Sdk.Internal.Retry; namespace Momento.Sdk.Config.Retry; public class FixedCountRetryStrategy : IRetryStrategy { public ILoggerFactory? LoggerFactory { get; } + + private ILogger _logger; + private readonly IRetryEligibilityStrategy _eligibilityStrategy; + public int MaxAttempts { get; } - //FixedCountRetryStrategy(retryableStatusCodes = DEFAULT_RETRYABLE_STATUS_CODES, maxAttempts = 3), - public FixedCountRetryStrategy(int maxAttempts, ILoggerFactory? loggerFactory = null) + public FixedCountRetryStrategy(int maxAttempts, IRetryEligibilityStrategy? eligibilityStrategy = null, ILoggerFactory? loggerFactory = null) { LoggerFactory = loggerFactory; + _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(); + _eligibilityStrategy = eligibilityStrategy ?? new DefaultRetryEligibilityStrategy(loggerFactory); MaxAttempts = maxAttempts; } public FixedCountRetryStrategy WithLoggerFactory(ILoggerFactory loggerFactory) { - return new(MaxAttempts, loggerFactory); + return new(MaxAttempts, _eligibilityStrategy.WithLoggerFactory(loggerFactory), loggerFactory); } IRetryStrategy IRetryStrategy.WithLoggerFactory(ILoggerFactory loggerFactory) @@ -26,15 +35,23 @@ IRetryStrategy IRetryStrategy.WithLoggerFactory(ILoggerFactory loggerFactory) public FixedCountRetryStrategy WithMaxAttempts(int maxAttempts) { - return new(maxAttempts, LoggerFactory); + return new(maxAttempts, _eligibilityStrategy, LoggerFactory); } - public int? DetermineWhenToRetryRequest(IGrpcResponse grpcResponse, IGrpcRequest grpcRequest, int attemptNumber) + public int? DetermineWhenToRetryRequest(Status grpcStatus, TRequest grpcRequest, int attemptNumber) where TRequest : class { + _logger.LogDebug($"Determining whether request is eligible for retry; status code: {grpcStatus.StatusCode}, request type: {grpcRequest.GetType()}, attemptNumber: {attemptNumber}, maxAttempts: {MaxAttempts}"); + if (! _eligibilityStrategy.IsEligibleForRetry(grpcStatus, grpcRequest)) + { + return null; + } if (attemptNumber > MaxAttempts) { + _logger.LogDebug($"Exceeded max retry count ({MaxAttempts})"); return null; } + _logger.LogDebug($"Request is eligible for retry (attempt {attemptNumber} of {MaxAttempts}, retrying immediately."); return 0; } + } diff --git a/src/Momento.Sdk/Config/Retry/IGrpcRequest.cs b/src/Momento.Sdk/Config/Retry/IGrpcRequest.cs deleted file mode 100644 index 9f955c2e..00000000 --- a/src/Momento.Sdk/Config/Retry/IGrpcRequest.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace Momento.Sdk.Config.Retry; - -public interface IGrpcRequest -{ - -} diff --git a/src/Momento.Sdk/Config/Retry/IGrpcResponse.cs b/src/Momento.Sdk/Config/Retry/IGrpcResponse.cs deleted file mode 100644 index b070df77..00000000 --- a/src/Momento.Sdk/Config/Retry/IGrpcResponse.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace Momento.Sdk.Config.Retry; - -public interface IGrpcResponse -{ - -} diff --git a/src/Momento.Sdk/Config/Retry/IRetryEligibilityStrategy.cs b/src/Momento.Sdk/Config/Retry/IRetryEligibilityStrategy.cs new file mode 100644 index 00000000..c0ebb211 --- /dev/null +++ b/src/Momento.Sdk/Config/Retry/IRetryEligibilityStrategy.cs @@ -0,0 +1,14 @@ +using System; +using Grpc.Core; +using Microsoft.Extensions.Logging; + +namespace Momento.Sdk.Config.Retry +{ + public interface IRetryEligibilityStrategy + { + public ILoggerFactory? LoggerFactory { get; } + public IRetryEligibilityStrategy WithLoggerFactory(ILoggerFactory loggerFactory); + public bool IsEligibleForRetry(Status status, TRequest request) where TRequest : class; + } +} + diff --git a/src/Momento.Sdk/Config/Retry/IRetryStrategy.cs b/src/Momento.Sdk/Config/Retry/IRetryStrategy.cs index efafd7cc..cbf3fa10 100644 --- a/src/Momento.Sdk/Config/Retry/IRetryStrategy.cs +++ b/src/Momento.Sdk/Config/Retry/IRetryStrategy.cs @@ -1,3 +1,4 @@ +using Grpc.Core; using Microsoft.Extensions.Logging; namespace Momento.Sdk.Config.Retry; @@ -8,16 +9,14 @@ namespace Momento.Sdk.Config.Retry; public interface IRetryStrategy { public ILoggerFactory? LoggerFactory { get; } - + public IRetryStrategy WithLoggerFactory(ILoggerFactory loggerFactory); /// /// Calculates whether or not to retry a request based on the type of request and number of attempts. /// - /// + /// /// /// /// Returns number of milliseconds after which the request should be retried, or if the request should not be retried. - public int? DetermineWhenToRetryRequest(IGrpcResponse grpcResponse, IGrpcRequest grpcRequest, int attemptNumber); - - public IRetryStrategy WithLoggerFactory(ILoggerFactory loggerFactory); + public int? DetermineWhenToRetryRequest(Status grpcStatus, TRequest grpcRequest, int attemptNumber) where TRequest : class; } diff --git a/src/Momento.Sdk/Internal/DataGrpcManager.cs b/src/Momento.Sdk/Internal/DataGrpcManager.cs index d7f9085a..9c3c7f2e 100644 --- a/src/Momento.Sdk/Internal/DataGrpcManager.cs +++ b/src/Momento.Sdk/Internal/DataGrpcManager.cs @@ -10,6 +10,7 @@ using Momento.Protos.CacheClient; using Momento.Sdk.Config; using Momento.Sdk.Config.Middleware; +using Momento.Sdk.Config.Retry; using Momento.Sdk.Internal.Middleware; using static System.Reflection.Assembly; using static Grpc.Core.Interceptors.Interceptor; @@ -167,7 +168,7 @@ public async Task<_ListLengthResponse> ListLengthAsync(_ListLengthRequest reques } } -public class DataGrpcManager : IDisposable +internal class DataGrpcManager : IDisposable { private readonly GrpcChannel channel; @@ -198,8 +199,9 @@ internal DataGrpcManager(IConfiguration config, string authToken, string host) var middlewares = config.Middlewares.Concat( new List { - new MaxConcurrentRequestsMiddleware(config.LoggerFactory, config.TransportStrategy.MaxConcurrentRequests), - new HeaderMiddleware(config.LoggerFactory, headers) + new RetryMiddleware(config.LoggerFactory, config.RetryStrategy), + new HeaderMiddleware(config.LoggerFactory, headers), + new MaxConcurrentRequestsMiddleware(config.LoggerFactory, config.TransportStrategy.MaxConcurrentRequests) } ).ToList(); diff --git a/src/Momento.Sdk/Internal/Middleware/HeaderMiddleware.cs b/src/Momento.Sdk/Internal/Middleware/HeaderMiddleware.cs index 4b68ca77..ec61cd21 100644 --- a/src/Momento.Sdk/Internal/Middleware/HeaderMiddleware.cs +++ b/src/Momento.Sdk/Internal/Middleware/HeaderMiddleware.cs @@ -9,7 +9,7 @@ namespace Momento.Sdk.Internal.Middleware { - class Header + internal class Header { public const string AuthorizationKey = "Authorization"; public const string AgentKey = "Agent"; @@ -24,7 +24,7 @@ public Header(String name, String value) } } - class HeaderMiddleware : IMiddleware + internal class HeaderMiddleware : IMiddleware { public ILoggerFactory? LoggerFactory { get; } @@ -54,7 +54,7 @@ public async Task> WrapRequest>> continuation - ) + ) where TRequest : class where TResponse : class { var callOptionsWithHeaders = callOptions; if (callOptionsWithHeaders.Headers == null) diff --git a/src/Momento.Sdk/Internal/Middleware/MaxConcurrentRequestsMiddleware.cs b/src/Momento.Sdk/Internal/Middleware/MaxConcurrentRequestsMiddleware.cs index 0e82d67f..224bbe70 100644 --- a/src/Momento.Sdk/Internal/Middleware/MaxConcurrentRequestsMiddleware.cs +++ b/src/Momento.Sdk/Internal/Middleware/MaxConcurrentRequestsMiddleware.cs @@ -34,7 +34,7 @@ namespace Momento.Sdk.Internal.Middleware // cases are unaffected. For the degenerate case (5000+ concurrent requests), // this protects the server and actually seems to improve client-side p999 // latencies by quite a bit. - public class MaxConcurrentRequestsMiddleware : IMiddleware + internal class MaxConcurrentRequestsMiddleware : IMiddleware { public ILoggerFactory LoggerFactory { get; } private readonly int _maxConcurrentRequests; @@ -47,8 +47,6 @@ public MaxConcurrentRequestsMiddleware(ILoggerFactory loggerFactory, int maxConc _semaphore = new FairAsyncSemaphore(maxConcurrentRequests); } - - public MaxConcurrentRequestsMiddleware WithLoggerFactory(ILoggerFactory loggerFactory) { return new(loggerFactory, _maxConcurrentRequests); @@ -59,7 +57,11 @@ IMiddleware IMiddleware.WithLoggerFactory(ILoggerFactory loggerFactory) return WithLoggerFactory(loggerFactory); } - public async Task> WrapRequest(TRequest request, CallOptions callOptions, Func>> continuation) + public async Task> WrapRequest( + TRequest request, + CallOptions callOptions, + Func>> continuation + ) where TRequest : class where TResponse : class { await _semaphore.WaitOne(); try @@ -74,8 +76,6 @@ public async Task> WrapRequest> WrapRequest> continuation - ) + ) where TRequest : class where TResponse : class { Func>> continuationWithMiddlewareResponseState = (r, o) => { diff --git a/src/Momento.Sdk/Internal/Middleware/RetryMiddleware.cs b/src/Momento.Sdk/Internal/Middleware/RetryMiddleware.cs new file mode 100644 index 00000000..59f3cbeb --- /dev/null +++ b/src/Momento.Sdk/Internal/Middleware/RetryMiddleware.cs @@ -0,0 +1,89 @@ +using System; +using System.Threading.Tasks; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using Momento.Sdk.Config.Middleware; +using System.Linq; +using Momento.Protos.CacheClient; +using System.Collections.Generic; + +namespace Momento.Sdk.Config.Retry +{ + internal class RetryMiddleware : IMiddleware + { + public ILoggerFactory? LoggerFactory { get; } + + private readonly ILogger _logger; + private readonly IRetryStrategy _retryStrategy; + + public RetryMiddleware(ILoggerFactory loggerFactory, IRetryStrategy retryStrategy) + { + LoggerFactory = loggerFactory; + _logger = loggerFactory.CreateLogger(); + _retryStrategy = retryStrategy; + } + + public RetryMiddleware WithLoggerFactory(ILoggerFactory loggerFactory) + { + return new(loggerFactory, _retryStrategy); + } + + IMiddleware IMiddleware.WithLoggerFactory(ILoggerFactory loggerFactory) + { + return WithLoggerFactory(loggerFactory); + } + + public async Task> WrapRequest( + TRequest request, + CallOptions callOptions, + Func>> continuation + ) where TRequest : class where TResponse : class + { + var foo = request.GetType(); + MiddlewareResponseState nextState; + int attemptNumber = 0; + int? retryAfterMillis = 0; + do + { + var delay = retryAfterMillis ?? 0; + if (delay > 0) + { + await Task.Delay(delay); + } + attemptNumber++; + nextState = await continuation(request, callOptions); + + // NOTE: we need a try/catch block here, because: (a) we cannot call + // `nextState.GetStatus()` until after we `await` the response, or + // it will throw an error. and (b) if the status is anything other + // than "ok", the `await` on the response will throw an exception. + try + { + await nextState.ResponseAsync; + + if (attemptNumber > 1) + { + _logger.LogDebug($"Retry succeeded (attempt {attemptNumber})"); + } + break; + } + catch (Exception) + { + var status = nextState.GetStatus(); + _logger.LogDebug($"Request failed with status {status.StatusCode}, checking to see if we should retry; attempt Number: {attemptNumber}"); + _logger.LogTrace($"Failed request status: {status}"); + retryAfterMillis = _retryStrategy.DetermineWhenToRetryRequest(nextState.GetStatus(), request, attemptNumber); + } + } + while (retryAfterMillis != null); + + return new MiddlewareResponseState( + ResponseAsync: nextState.ResponseAsync, + ResponseHeadersAsync: nextState.ResponseHeadersAsync, + GetStatus: nextState.GetStatus, + GetTrailers: nextState.GetTrailers + ); + } + } +} + diff --git a/src/Momento.Sdk/Internal/Retry/DefaultRetryEligibilityStrategy.cs b/src/Momento.Sdk/Internal/Retry/DefaultRetryEligibilityStrategy.cs new file mode 100644 index 00000000..3a48bacc --- /dev/null +++ b/src/Momento.Sdk/Internal/Retry/DefaultRetryEligibilityStrategy.cs @@ -0,0 +1,78 @@ +using System; +using System.Collections.Generic; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Momento.Protos.CacheClient; +using Momento.Sdk.Config.Retry; + +namespace Momento.Sdk.Internal.Retry +{ + public class DefaultRetryEligibilityStrategy : IRetryEligibilityStrategy + { + private readonly HashSet _retryableStatusCodes = new HashSet + { + //StatusCode.OK, + //StatusCode.Cancelled, + //StatusCode.Unknown, + //StatusCode.InvalidArgument, + //StatusCode.DeadlineExceeded, + //StatusCode.NotFound, + //StatusCode.AlreadyExists, + //StatusCode.PermissionDenied, + //StatusCode.Unauthenticated, + //StatusCode.ResourceExhausted, + //StatusCode.FailedPrecondition, + //StatusCode.Aborted, + //StatusCode.OutOfRange, + //StatusCode.Unimplemented, + StatusCode.Internal, + StatusCode.Unavailable, + //StatusCode.DataLoss, + }; + + private readonly HashSet _retryableRequestTypes = new HashSet + { + typeof(_SetRequest), + typeof(_GetRequest) + }; + + public ILoggerFactory? LoggerFactory { get; } + + private readonly ILogger _logger; + + public DefaultRetryEligibilityStrategy(ILoggerFactory? loggerFactory) + { + _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(); + } + + public DefaultRetryEligibilityStrategy WithLoggerFactory(ILoggerFactory loggerFactory) + { + return new(loggerFactory); + } + + IRetryEligibilityStrategy IRetryEligibilityStrategy.WithLoggerFactory(ILoggerFactory loggerFactory) + { + return WithLoggerFactory(loggerFactory); + } + + public bool IsEligibleForRetry(Status status, TRequest request) + where TRequest : class + { + if (!_retryableStatusCodes.Contains(status.StatusCode)) + { + _logger.LogDebug("Response with status code {} is not retryable.", status.StatusCode); + return false; + } + + if (!_retryableRequestTypes.Contains(request.GetType())) + { + _logger.LogDebug("Request with type {} is not retryable.", request.GetType()); + return false; + } + + return true; + } + } +} + diff --git a/src/Momento.Sdk/Internal/ScsDataClient.cs b/src/Momento.Sdk/Internal/ScsDataClient.cs index 09e0ca88..60f4938a 100644 --- a/src/Momento.Sdk/Internal/ScsDataClient.cs +++ b/src/Momento.Sdk/Internal/ScsDataClient.cs @@ -15,10 +15,10 @@ namespace Momento.Sdk.Internal; public class ScsDataClientBase : IDisposable { - protected readonly DataGrpcManager grpcManager; - protected readonly uint defaultTtlSeconds; - protected readonly uint dataClientOperationTimeoutMilliseconds; - protected readonly ILogger _logger; + internal readonly DataGrpcManager grpcManager; + private readonly uint defaultTtlSeconds; + private readonly uint dataClientOperationTimeoutMilliseconds; + private readonly ILogger _logger; protected readonly CacheExceptionMapper _exceptionMapper; public ScsDataClientBase(IConfiguration config, string authToken, string endpoint, uint defaultTtlSeconds) diff --git a/src/Momento.Sdk/Momento.Sdk.csproj b/src/Momento.Sdk/Momento.Sdk.csproj index 0a53e658..2f29c95a 100644 --- a/src/Momento.Sdk/Momento.Sdk.csproj +++ b/src/Momento.Sdk/Momento.Sdk.csproj @@ -39,9 +39,11 @@ + +