diff --git a/src/Momento.Sdk/CacheClient.cs b/src/Momento.Sdk/CacheClient.cs index 049694e4..3d7e3781 100644 --- a/src/Momento.Sdk/CacheClient.cs +++ b/src/Momento.Sdk/CacheClient.cs @@ -6,6 +6,7 @@ using Microsoft.Extensions.Logging; using Momento.Sdk.Auth; using Momento.Sdk.Config; +using Momento.Sdk.Config.Transport; using Momento.Sdk.Exceptions; using Momento.Sdk.Internal; using Momento.Sdk.Internal.ExtensionMethods; @@ -52,7 +53,46 @@ public CacheClient(IConfiguration config, ICredentialProvider authProvider, Time Utils.ArgumentStrictlyPositive(defaultTtl, "defaultTtl"); this.controlClient = new(_loggerFactory, authProvider.AuthToken, authProvider.ControlEndpoint); this.dataClients = new List(); - for (var i = 1; i <= config.TransportStrategy.GrpcConfig.MinNumGrpcChannels; i++) + int minNumGrpcChannels = this.config.TransportStrategy.GrpcConfig.MinNumGrpcChannels; + int currentMaxConcurrentRequests = this.config.TransportStrategy.MaxConcurrentRequests; + /** + * Client Configuration Logic: + * + * At the time of writing, customers have two client configurations affecting the number of gRPC connections spawned: + * + * 1. MinNumGrpcChannels: Determines the number of unique data clients to create based on the provided value. + * Each client eagerly creates one unique connection. + * + * 2. MaxConcurrentRequests: Configures each channel or data client to create a unique connection dynamically/lazily + * when 100 client concurrent requests are hit. + * + * For example, if we have 2 channels and a client provides a value of 200 for MaxConcurrentRequests, we can create a + * maximum of 2 * (200 / 100) ≈ 4 unique connections. + * + * Understanding Client Expectations: + * + * The client presumes that MaxConcurrentRequests is applied at a global level rather than per channel or data client. + * While some clients might utilize minNumGrpcChannels, it is expected that such clients are few and not the majority. + * + * Logic Implementation: + * + * This logic ensures that we honor the maxConcurrentRequests provided by a client if they also provide minNumGrpcChannels, + * and we internally "distribute" the max concurrent requests evenly over all the channels. This makes sure that + * we honor client's maxConcurrentRequests at a global level, and also create the number of channels they request. + * If they do not explicitly provide the number of channels, we default to 1 with the max concurrent requests + * applied to that single channel. + */ + if (minNumGrpcChannels > 1) + { + int newMaxConcurrentRequests = Math.Max(1, currentMaxConcurrentRequests / minNumGrpcChannels); + ITransportStrategy transportStrategy = this.config.TransportStrategy + .WithMaxConcurrentRequests(newMaxConcurrentRequests); + this.config = this.config.WithTransportStrategy(transportStrategy); + _logger.LogWarning("Overriding maxConcurrentRequests for each gRPC channel to {}." + + " Min gRPC channels: {}, total maxConcurrentRequests: {}", newMaxConcurrentRequests, + minNumGrpcChannels, currentMaxConcurrentRequests); + } + for (var i = 1; i <= minNumGrpcChannels; i++) { this.dataClients.Add(new(config, authProvider.AuthToken, authProvider.CacheEndpoint, defaultTtl)); } diff --git a/src/Momento.Sdk/Config/Configurations.cs b/src/Momento.Sdk/Config/Configurations.cs index 65b70fa7..2d7e8a08 100644 --- a/src/Momento.Sdk/Config/Configurations.cs +++ b/src/Momento.Sdk/Config/Configurations.cs @@ -54,7 +54,7 @@ public static IConfiguration V1(ILoggerFactory? loggerFactory = null) IRetryStrategy retryStrategy = new FixedCountRetryStrategy(finalLoggerFactory, maxAttempts: 3); ITransportStrategy transportStrategy = new StaticTransportStrategy( loggerFactory: finalLoggerFactory, - maxConcurrentRequests: 100, + maxConcurrentRequests: 200, // max of 2 connections https://github.com/momentohq/client-sdk-dotnet/issues/460 grpcConfig: new StaticGrpcConfiguration(deadline: TimeSpan.FromMilliseconds(15000)) ); return new Laptop(finalLoggerFactory, retryStrategy, transportStrategy); @@ -107,7 +107,7 @@ public static IConfiguration V1(ILoggerFactory? loggerFactory = null) IRetryStrategy retryStrategy = new FixedCountRetryStrategy(finalLoggerFactory, maxAttempts: 3); ITransportStrategy transportStrategy = new StaticTransportStrategy( loggerFactory: finalLoggerFactory, - maxConcurrentRequests: 200, + maxConcurrentRequests: 400, // max of 4 connections https://github.com/momentohq/client-sdk-dotnet/issues/460 grpcConfig: new StaticGrpcConfiguration(deadline: TimeSpan.FromMilliseconds(1100))); return new Default(finalLoggerFactory, retryStrategy, transportStrategy); } diff --git a/src/Momento.Sdk/Config/Transport/ITransportStrategy.cs b/src/Momento.Sdk/Config/Transport/ITransportStrategy.cs index 1c0e6149..0d894552 100644 --- a/src/Momento.Sdk/Config/Transport/ITransportStrategy.cs +++ b/src/Momento.Sdk/Config/Transport/ITransportStrategy.cs @@ -30,6 +30,8 @@ public interface ITransportStrategy /// /// Copy constructor to update the maximum number of concurrent requests. + /// For every 100 concurrent requests, a new gRPC connection gets created. This value essentially limits + /// the number of connections you will get for your gRPC client. /// /// /// A new ITransportStrategy with the specified maxConccurrentRequests diff --git a/src/Momento.Sdk/Internal/Middleware/FairAsyncSemaphore.cs b/src/Momento.Sdk/Internal/Middleware/FairAsyncSemaphore.cs index 9b49f850..a6880af7 100644 --- a/src/Momento.Sdk/Internal/Middleware/FairAsyncSemaphore.cs +++ b/src/Momento.Sdk/Internal/Middleware/FairAsyncSemaphore.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -12,7 +13,8 @@ namespace Momento.Sdk.Internal.Middleware internal class FairAsyncSemaphore { private readonly Channel _ticketChannel; - + private int _currentTicketCount; + internal FairAsyncSemaphore(int numTickets) { _ticketChannel = Channel.CreateBounded(numTickets); @@ -25,11 +27,14 @@ internal FairAsyncSemaphore(int numTickets) throw new ApplicationException("Unable to initialize async channel"); } } + + _currentTicketCount = numTickets; } public async Task WaitOne() { await _ticketChannel.Reader.ReadAsync(); + Interlocked.Decrement(ref _currentTicketCount); } public void Release() @@ -39,6 +44,12 @@ public void Release() { throw new ApplicationException("more releases than waits! These must be 1:1"); } + Interlocked.Increment(ref _currentTicketCount); + } + + public int GetCurrentTicketCount() + { + return Interlocked.CompareExchange(ref _currentTicketCount, 0, 0); } } } diff --git a/src/Momento.Sdk/Internal/Middleware/MaxConcurrentRequestsMiddleware.cs b/src/Momento.Sdk/Internal/Middleware/MaxConcurrentRequestsMiddleware.cs index 1503ceae..0e8a61f4 100644 --- a/src/Momento.Sdk/Internal/Middleware/MaxConcurrentRequestsMiddleware.cs +++ b/src/Momento.Sdk/Internal/Middleware/MaxConcurrentRequestsMiddleware.cs @@ -38,11 +38,13 @@ internal class MaxConcurrentRequestsMiddleware : IMiddleware { private readonly int _maxConcurrentRequests; private readonly FairAsyncSemaphore _semaphore; + private readonly ILogger _logger; public MaxConcurrentRequestsMiddleware(ILoggerFactory loggerFactory, int maxConcurrentRequests) { _maxConcurrentRequests = maxConcurrentRequests; _semaphore = new FairAsyncSemaphore(maxConcurrentRequests); + _logger = loggerFactory.CreateLogger(); } public async Task> WrapRequest( @@ -51,7 +53,14 @@ public async Task> WrapRequest>> continuation ) where TRequest : class where TResponse : class { + if (_semaphore.GetCurrentTicketCount() == 0) + { + _logger.LogDebug("Max concurrent requests reached. The client will wait until one or more requests " + + " have completed."); + } + await _semaphore.WaitOne(); + try { var result = await continuation(request, callOptions); diff --git a/tests/Integration/Momento.Sdk.Tests/CacheEagerConnectionTest.cs b/tests/Integration/Momento.Sdk.Tests/CacheEagerConnectionTest.cs index a717e6a0..c8e3f286 100644 --- a/tests/Integration/Momento.Sdk.Tests/CacheEagerConnectionTest.cs +++ b/tests/Integration/Momento.Sdk.Tests/CacheEagerConnectionTest.cs @@ -1,6 +1,7 @@ using Microsoft.Extensions.Logging; using Momento.Sdk.Auth; using Momento.Sdk.Config; +using Momento.Sdk.Config.Transport; namespace Momento.Sdk.Tests.Integration; @@ -37,6 +38,22 @@ public void CacheClientConstructor_EagerConnection_Success() var client = new CacheClient(config, authProvider, defaultTtl); } + [Fact] + public void CacheClientConstructor_WithChannelsAndMaxConn_Success() + { + var config = Configurations.Laptop.Latest(loggerFactory); + IGrpcConfiguration grpcConfiguration = config.TransportStrategy.GrpcConfig; + grpcConfiguration = grpcConfiguration.WithMinNumGrpcChannels(10); + config = config.WithTransportStrategy(config.TransportStrategy + .WithGrpcConfig(grpcConfiguration) + .WithMaxConcurrentRequests(2)); + + // just validating that we can construct the client wh + var client = new CacheClient(config, authProvider, defaultTtl); + // still 2; clients shouldn't know we are doing 2/10 magic internally + Assert.Equal(2, config.TransportStrategy.MaxConcurrentRequests); + Assert.Equal(10, config.TransportStrategy.GrpcConfig.MinNumGrpcChannels); + } [Fact] public void CacheClientConstructor_EagerConnection_BadEndpoint()