From ba0ef9544b0f4aae8c52967c12e5c964f7de42f7 Mon Sep 17 00:00:00 2001 From: Pratik Agarwal Date: Wed, 2 Aug 2023 12:08:03 -0700 Subject: [PATCH] fix: fix configuration settings for maxConn and gRPC channels --- src/Momento.Sdk/CacheClient.cs | 16 +++++++++++++++- src/Momento.Sdk/Config/Configurations.cs | 4 ++-- .../Config/Transport/ITransportStrategy.cs | 2 ++ .../Internal/Middleware/FairAsyncSemaphore.cs | 13 ++++++++++++- .../MaxConcurrentRequestsMiddleware.cs | 9 +++++++++ .../CacheEagerConnectionTest.cs | 16 ++++++++++++++++ tests/Unit/Momento.Sdk.Tests/ConfigTest.cs | 2 ++ 7 files changed, 58 insertions(+), 4 deletions(-) diff --git a/src/Momento.Sdk/CacheClient.cs b/src/Momento.Sdk/CacheClient.cs index 049694e4..08171e11 100644 --- a/src/Momento.Sdk/CacheClient.cs +++ b/src/Momento.Sdk/CacheClient.cs @@ -6,6 +6,7 @@ using Microsoft.Extensions.Logging; using Momento.Sdk.Auth; using Momento.Sdk.Config; +using Momento.Sdk.Config.Transport; using Momento.Sdk.Exceptions; using Momento.Sdk.Internal; using Momento.Sdk.Internal.ExtensionMethods; @@ -52,7 +53,20 @@ public CacheClient(IConfiguration config, ICredentialProvider authProvider, Time Utils.ArgumentStrictlyPositive(defaultTtl, "defaultTtl"); this.controlClient = new(_loggerFactory, authProvider.AuthToken, authProvider.ControlEndpoint); this.dataClients = new List(); - for (var i = 1; i <= config.TransportStrategy.GrpcConfig.MinNumGrpcChannels; i++) + int minNumGrpcChannels = this.config.TransportStrategy.GrpcConfig.MinNumGrpcChannels; + int currentMaxConcurrentRequests = this.config.TransportStrategy.MaxConcurrentRequests; + if (minNumGrpcChannels > 1) + { + + int newMaxConcurrentRequests = (currentMaxConcurrentRequests / minNumGrpcChannels); + ITransportStrategy transportStrategy = this.config.TransportStrategy; + transportStrategy = transportStrategy.WithMaxConcurrentRequests(newMaxConcurrentRequests); + this.config = this.config.WithTransportStrategy(transportStrategy); + _logger.LogWarning("Overriding maxConcurrentRequests for each gRPC channel to {}." + + " Min gRPC channels: {}, total maxConcurrentRequests: {}", newMaxConcurrentRequests, + minNumGrpcChannels, currentMaxConcurrentRequests); + } + for (var i = 1; i <= minNumGrpcChannels; i++) { this.dataClients.Add(new(config, authProvider.AuthToken, authProvider.CacheEndpoint, defaultTtl)); } diff --git a/src/Momento.Sdk/Config/Configurations.cs b/src/Momento.Sdk/Config/Configurations.cs index 65b70fa7..2d7e8a08 100644 --- a/src/Momento.Sdk/Config/Configurations.cs +++ b/src/Momento.Sdk/Config/Configurations.cs @@ -54,7 +54,7 @@ public static IConfiguration V1(ILoggerFactory? loggerFactory = null) IRetryStrategy retryStrategy = new FixedCountRetryStrategy(finalLoggerFactory, maxAttempts: 3); ITransportStrategy transportStrategy = new StaticTransportStrategy( loggerFactory: finalLoggerFactory, - maxConcurrentRequests: 100, + maxConcurrentRequests: 200, // max of 2 connections https://github.com/momentohq/client-sdk-dotnet/issues/460 grpcConfig: new StaticGrpcConfiguration(deadline: TimeSpan.FromMilliseconds(15000)) ); return new Laptop(finalLoggerFactory, retryStrategy, transportStrategy); @@ -107,7 +107,7 @@ public static IConfiguration V1(ILoggerFactory? loggerFactory = null) IRetryStrategy retryStrategy = new FixedCountRetryStrategy(finalLoggerFactory, maxAttempts: 3); ITransportStrategy transportStrategy = new StaticTransportStrategy( loggerFactory: finalLoggerFactory, - maxConcurrentRequests: 200, + maxConcurrentRequests: 400, // max of 4 connections https://github.com/momentohq/client-sdk-dotnet/issues/460 grpcConfig: new StaticGrpcConfiguration(deadline: TimeSpan.FromMilliseconds(1100))); return new Default(finalLoggerFactory, retryStrategy, transportStrategy); } diff --git a/src/Momento.Sdk/Config/Transport/ITransportStrategy.cs b/src/Momento.Sdk/Config/Transport/ITransportStrategy.cs index 1c0e6149..0d894552 100644 --- a/src/Momento.Sdk/Config/Transport/ITransportStrategy.cs +++ b/src/Momento.Sdk/Config/Transport/ITransportStrategy.cs @@ -30,6 +30,8 @@ public interface ITransportStrategy /// /// Copy constructor to update the maximum number of concurrent requests. + /// For every 100 concurrent requests, a new gRPC connection gets created. This value essentially limits + /// the number of connections you will get for your gRPC client. /// /// /// A new ITransportStrategy with the specified maxConccurrentRequests diff --git a/src/Momento.Sdk/Internal/Middleware/FairAsyncSemaphore.cs b/src/Momento.Sdk/Internal/Middleware/FairAsyncSemaphore.cs index 9b49f850..a6880af7 100644 --- a/src/Momento.Sdk/Internal/Middleware/FairAsyncSemaphore.cs +++ b/src/Momento.Sdk/Internal/Middleware/FairAsyncSemaphore.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -12,7 +13,8 @@ namespace Momento.Sdk.Internal.Middleware internal class FairAsyncSemaphore { private readonly Channel _ticketChannel; - + private int _currentTicketCount; + internal FairAsyncSemaphore(int numTickets) { _ticketChannel = Channel.CreateBounded(numTickets); @@ -25,11 +27,14 @@ internal FairAsyncSemaphore(int numTickets) throw new ApplicationException("Unable to initialize async channel"); } } + + _currentTicketCount = numTickets; } public async Task WaitOne() { await _ticketChannel.Reader.ReadAsync(); + Interlocked.Decrement(ref _currentTicketCount); } public void Release() @@ -39,6 +44,12 @@ public void Release() { throw new ApplicationException("more releases than waits! These must be 1:1"); } + Interlocked.Increment(ref _currentTicketCount); + } + + public int GetCurrentTicketCount() + { + return Interlocked.CompareExchange(ref _currentTicketCount, 0, 0); } } } diff --git a/src/Momento.Sdk/Internal/Middleware/MaxConcurrentRequestsMiddleware.cs b/src/Momento.Sdk/Internal/Middleware/MaxConcurrentRequestsMiddleware.cs index 1503ceae..0e8a61f4 100644 --- a/src/Momento.Sdk/Internal/Middleware/MaxConcurrentRequestsMiddleware.cs +++ b/src/Momento.Sdk/Internal/Middleware/MaxConcurrentRequestsMiddleware.cs @@ -38,11 +38,13 @@ internal class MaxConcurrentRequestsMiddleware : IMiddleware { private readonly int _maxConcurrentRequests; private readonly FairAsyncSemaphore _semaphore; + private readonly ILogger _logger; public MaxConcurrentRequestsMiddleware(ILoggerFactory loggerFactory, int maxConcurrentRequests) { _maxConcurrentRequests = maxConcurrentRequests; _semaphore = new FairAsyncSemaphore(maxConcurrentRequests); + _logger = loggerFactory.CreateLogger(); } public async Task> WrapRequest( @@ -51,7 +53,14 @@ public async Task> WrapRequest>> continuation ) where TRequest : class where TResponse : class { + if (_semaphore.GetCurrentTicketCount() == 0) + { + _logger.LogDebug("Max concurrent requests reached. The client will wait until one or more requests " + + " have completed."); + } + await _semaphore.WaitOne(); + try { var result = await continuation(request, callOptions); diff --git a/tests/Integration/Momento.Sdk.Tests/CacheEagerConnectionTest.cs b/tests/Integration/Momento.Sdk.Tests/CacheEagerConnectionTest.cs index a717e6a0..98dde944 100644 --- a/tests/Integration/Momento.Sdk.Tests/CacheEagerConnectionTest.cs +++ b/tests/Integration/Momento.Sdk.Tests/CacheEagerConnectionTest.cs @@ -1,6 +1,7 @@ using Microsoft.Extensions.Logging; using Momento.Sdk.Auth; using Momento.Sdk.Config; +using Momento.Sdk.Config.Transport; namespace Momento.Sdk.Tests.Integration; @@ -37,6 +38,21 @@ public void CacheClientConstructor_EagerConnection_Success() var client = new CacheClient(config, authProvider, defaultTtl); } + [Fact] + public void CacheClientConstructor_WithChannelsAndMaxConn_Success() + { + var config = Configurations.Laptop.Latest(loggerFactory); + IGrpcConfiguration grpcConfiguration = config.TransportStrategy.GrpcConfig; + grpcConfiguration = grpcConfiguration.WithMinNumGrpcChannels(10); + config = config.WithTransportStrategy(config.TransportStrategy + .WithGrpcConfig(grpcConfiguration) + .WithMaxConcurrentRequests(500)); + // still 500; clients shouldn't know we are doing 500/10 magic internally + Assert.Equal(500, config.TransportStrategy.MaxConcurrentRequests); + Assert.Equal(10, config.TransportStrategy.GrpcConfig.MinNumGrpcChannels); + // just validating that we can construct the client wh + var client = new CacheClient(config, authProvider, defaultTtl); + } [Fact] public void CacheClientConstructor_EagerConnection_BadEndpoint() diff --git a/tests/Unit/Momento.Sdk.Tests/ConfigTest.cs b/tests/Unit/Momento.Sdk.Tests/ConfigTest.cs index 9b86fecd..45c26b25 100644 --- a/tests/Unit/Momento.Sdk.Tests/ConfigTest.cs +++ b/tests/Unit/Momento.Sdk.Tests/ConfigTest.cs @@ -2,6 +2,8 @@ using Momento.Sdk.Config; using Momento.Sdk.Config.Transport; using Xunit; +using Xunit.Abstractions; +using Xunit.Sdk; public class ConfigTest {