Skip to content

Commit

Permalink
Merge pull request #463 from momentohq/configuration
Browse files Browse the repository at this point in the history
fix: fix configuration settings for maxConn and gRPC channels

The dotnet SDK exposes two configurations to clients MaxConcurrentRequests and MinNumGRPCChannels. Both these configurations essentially impacts the number of connections that we open to the Momento server. Before this PR, a user could provide 10 gRPC channels as minimum, and 20 as maxConn, assuming that 20 maxConn will be a global setting. However, because how we wire our middleware, each channel would get 20 max concurrent requests as the setting. This PR resolves the conflict between the interplay of these settings (more details inline on code comments).
  • Loading branch information
pratik151192 authored Aug 8, 2023
2 parents 7e246b3 + bb793b9 commit 0022707
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 4 deletions.
42 changes: 41 additions & 1 deletion src/Momento.Sdk/CacheClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Microsoft.Extensions.Logging;
using Momento.Sdk.Auth;
using Momento.Sdk.Config;
using Momento.Sdk.Config.Transport;
using Momento.Sdk.Exceptions;
using Momento.Sdk.Internal;
using Momento.Sdk.Internal.ExtensionMethods;
Expand Down Expand Up @@ -52,7 +53,46 @@ public CacheClient(IConfiguration config, ICredentialProvider authProvider, Time
Utils.ArgumentStrictlyPositive(defaultTtl, "defaultTtl");
this.controlClient = new(_loggerFactory, authProvider.AuthToken, authProvider.ControlEndpoint);
this.dataClients = new List<ScsDataClient>();
for (var i = 1; i <= config.TransportStrategy.GrpcConfig.MinNumGrpcChannels; i++)
int minNumGrpcChannels = this.config.TransportStrategy.GrpcConfig.MinNumGrpcChannels;
int currentMaxConcurrentRequests = this.config.TransportStrategy.MaxConcurrentRequests;
/**

Check warning on line 58 in src/Momento.Sdk/CacheClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

XML comment is not placed on a valid language element

Check warning on line 58 in src/Momento.Sdk/CacheClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

XML comment is not placed on a valid language element

Check warning on line 58 in src/Momento.Sdk/CacheClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

XML comment is not placed on a valid language element

Check warning on line 58 in src/Momento.Sdk/CacheClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

XML comment is not placed on a valid language element

Check warning on line 58 in src/Momento.Sdk/CacheClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

XML comment is not placed on a valid language element

Check warning on line 58 in src/Momento.Sdk/CacheClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

XML comment is not placed on a valid language element

Check warning on line 58 in src/Momento.Sdk/CacheClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

XML comment is not placed on a valid language element

Check warning on line 58 in src/Momento.Sdk/CacheClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

XML comment is not placed on a valid language element
* Client Configuration Logic:
*
* At the time of writing, customers have two client configurations affecting the number of gRPC connections spawned:
*
* 1. MinNumGrpcChannels: Determines the number of unique data clients to create based on the provided value.
* Each client eagerly creates one unique connection.
*
* 2. MaxConcurrentRequests: Configures each channel or data client to create a unique connection dynamically/lazily
* when 100 client concurrent requests are hit.
*
* For example, if we have 2 channels and a client provides a value of 200 for MaxConcurrentRequests, we can create a
* maximum of 2 * (200 / 100) ≈ 4 unique connections.
*
* Understanding Client Expectations:
*
* The client presumes that MaxConcurrentRequests is applied at a global level rather than per channel or data client.
* While some clients might utilize minNumGrpcChannels, it is expected that such clients are few and not the majority.
*
* Logic Implementation:
*
* This logic ensures that we honor the maxConcurrentRequests provided by a client if they also provide minNumGrpcChannels,
* and we internally "distribute" the max concurrent requests evenly over all the channels. This makes sure that
* we honor client's maxConcurrentRequests at a global level, and also create the number of channels they request.
* If they do not explicitly provide the number of channels, we default to 1 with the max concurrent requests
* applied to that single channel.
*/
if (minNumGrpcChannels > 1)
{
int newMaxConcurrentRequests = Math.Max(1, currentMaxConcurrentRequests / minNumGrpcChannels);
ITransportStrategy transportStrategy = this.config.TransportStrategy
.WithMaxConcurrentRequests(newMaxConcurrentRequests);
this.config = this.config.WithTransportStrategy(transportStrategy);
_logger.LogWarning("Overriding maxConcurrentRequests for each gRPC channel to {}." +
" Min gRPC channels: {}, total maxConcurrentRequests: {}", newMaxConcurrentRequests,
minNumGrpcChannels, currentMaxConcurrentRequests);
}
for (var i = 1; i <= minNumGrpcChannels; i++)
{
this.dataClients.Add(new(config, authProvider.AuthToken, authProvider.CacheEndpoint, defaultTtl));
}
Expand Down
4 changes: 2 additions & 2 deletions src/Momento.Sdk/Config/Configurations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static IConfiguration V1(ILoggerFactory? loggerFactory = null)
IRetryStrategy retryStrategy = new FixedCountRetryStrategy(finalLoggerFactory, maxAttempts: 3);
ITransportStrategy transportStrategy = new StaticTransportStrategy(
loggerFactory: finalLoggerFactory,
maxConcurrentRequests: 100,
maxConcurrentRequests: 200, // max of 2 connections https://github.com/momentohq/client-sdk-dotnet/issues/460
grpcConfig: new StaticGrpcConfiguration(deadline: TimeSpan.FromMilliseconds(15000))
);
return new Laptop(finalLoggerFactory, retryStrategy, transportStrategy);
Expand Down Expand Up @@ -107,7 +107,7 @@ public static IConfiguration V1(ILoggerFactory? loggerFactory = null)
IRetryStrategy retryStrategy = new FixedCountRetryStrategy(finalLoggerFactory, maxAttempts: 3);
ITransportStrategy transportStrategy = new StaticTransportStrategy(
loggerFactory: finalLoggerFactory,
maxConcurrentRequests: 200,
maxConcurrentRequests: 400, // max of 4 connections https://github.com/momentohq/client-sdk-dotnet/issues/460
grpcConfig: new StaticGrpcConfiguration(deadline: TimeSpan.FromMilliseconds(1100)));
return new Default(finalLoggerFactory, retryStrategy, transportStrategy);
}
Expand Down
2 changes: 2 additions & 0 deletions src/Momento.Sdk/Config/Transport/ITransportStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public interface ITransportStrategy

/// <summary>
/// Copy constructor to update the maximum number of concurrent requests.
/// For every 100 concurrent requests, a new gRPC connection gets created. This value essentially limits
/// the number of connections you will get for your gRPC client.
/// </summary>
/// <param name="maxConcurrentRequests"></param>
/// <returns>A new ITransportStrategy with the specified maxConccurrentRequests</returns>
Expand Down
13 changes: 12 additions & 1 deletion src/Momento.Sdk/Internal/Middleware/FairAsyncSemaphore.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

Expand All @@ -12,7 +13,8 @@ namespace Momento.Sdk.Internal.Middleware
internal class FairAsyncSemaphore
{
private readonly Channel<bool> _ticketChannel;

private int _currentTicketCount;

internal FairAsyncSemaphore(int numTickets)
{
_ticketChannel = Channel.CreateBounded<bool>(numTickets);
Expand All @@ -25,11 +27,14 @@ internal FairAsyncSemaphore(int numTickets)
throw new ApplicationException("Unable to initialize async channel");
}
}

_currentTicketCount = numTickets;
}

public async Task WaitOne()
{
await _ticketChannel.Reader.ReadAsync();
Interlocked.Decrement(ref _currentTicketCount);
}

public void Release()
Expand All @@ -39,6 +44,12 @@ public void Release()
{
throw new ApplicationException("more releases than waits! These must be 1:1");
}
Interlocked.Increment(ref _currentTicketCount);
}

public int GetCurrentTicketCount()
{
return Interlocked.CompareExchange(ref _currentTicketCount, 0, 0);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ internal class MaxConcurrentRequestsMiddleware : IMiddleware
{
private readonly int _maxConcurrentRequests;
private readonly FairAsyncSemaphore _semaphore;
private readonly ILogger _logger;

public MaxConcurrentRequestsMiddleware(ILoggerFactory loggerFactory, int maxConcurrentRequests)
{
_maxConcurrentRequests = maxConcurrentRequests;
_semaphore = new FairAsyncSemaphore(maxConcurrentRequests);
_logger = loggerFactory.CreateLogger<MaxConcurrentRequestsMiddleware>();
}

public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TResponse>(
Expand All @@ -51,7 +53,14 @@ public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TRes
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation
) where TRequest : class where TResponse : class
{
if (_semaphore.GetCurrentTicketCount() == 0)
{
_logger.LogDebug("Max concurrent requests reached. The client will wait until one or more requests " +
" have completed.");
}

await _semaphore.WaitOne();

try
{
var result = await continuation(request, callOptions);
Expand Down
17 changes: 17 additions & 0 deletions tests/Integration/Momento.Sdk.Tests/CacheEagerConnectionTest.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.Logging;
using Momento.Sdk.Auth;
using Momento.Sdk.Config;
using Momento.Sdk.Config.Transport;

namespace Momento.Sdk.Tests.Integration;

Expand Down Expand Up @@ -37,6 +38,22 @@ public void CacheClientConstructor_EagerConnection_Success()
var client = new CacheClient(config, authProvider, defaultTtl);
}

[Fact]
public void CacheClientConstructor_WithChannelsAndMaxConn_Success()
{
var config = Configurations.Laptop.Latest(loggerFactory);
IGrpcConfiguration grpcConfiguration = config.TransportStrategy.GrpcConfig;
grpcConfiguration = grpcConfiguration.WithMinNumGrpcChannels(10);
config = config.WithTransportStrategy(config.TransportStrategy
.WithGrpcConfig(grpcConfiguration)
.WithMaxConcurrentRequests(2));

// just validating that we can construct the client wh
var client = new CacheClient(config, authProvider, defaultTtl);
// still 2; clients shouldn't know we are doing 2/10 magic internally
Assert.Equal(2, config.TransportStrategy.MaxConcurrentRequests);
Assert.Equal(10, config.TransportStrategy.GrpcConfig.MinNumGrpcChannels);
}

[Fact]
public void CacheClientConstructor_EagerConnection_BadEndpoint()
Expand Down

0 comments on commit 0022707

Please sign in to comment.