Skip to content

Commit

Permalink
fix: fix configuration settings for maxConn and gRPC channels
Browse files Browse the repository at this point in the history
  • Loading branch information
pratik151192 committed Aug 2, 2023
1 parent 3479dde commit ba0ef95
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 4 deletions.
16 changes: 15 additions & 1 deletion src/Momento.Sdk/CacheClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Microsoft.Extensions.Logging;
using Momento.Sdk.Auth;
using Momento.Sdk.Config;
using Momento.Sdk.Config.Transport;
using Momento.Sdk.Exceptions;
using Momento.Sdk.Internal;
using Momento.Sdk.Internal.ExtensionMethods;
Expand Down Expand Up @@ -52,7 +53,20 @@ public CacheClient(IConfiguration config, ICredentialProvider authProvider, Time
Utils.ArgumentStrictlyPositive(defaultTtl, "defaultTtl");
this.controlClient = new(_loggerFactory, authProvider.AuthToken, authProvider.ControlEndpoint);
this.dataClients = new List<ScsDataClient>();
for (var i = 1; i <= config.TransportStrategy.GrpcConfig.MinNumGrpcChannels; i++)
int minNumGrpcChannels = this.config.TransportStrategy.GrpcConfig.MinNumGrpcChannels;
int currentMaxConcurrentRequests = this.config.TransportStrategy.MaxConcurrentRequests;
if (minNumGrpcChannels > 1)
{

int newMaxConcurrentRequests = (currentMaxConcurrentRequests / minNumGrpcChannels);
ITransportStrategy transportStrategy = this.config.TransportStrategy;
transportStrategy = transportStrategy.WithMaxConcurrentRequests(newMaxConcurrentRequests);
this.config = this.config.WithTransportStrategy(transportStrategy);
_logger.LogWarning("Overriding maxConcurrentRequests for each gRPC channel to {}." +
" Min gRPC channels: {}, total maxConcurrentRequests: {}", newMaxConcurrentRequests,
minNumGrpcChannels, currentMaxConcurrentRequests);
}
for (var i = 1; i <= minNumGrpcChannels; i++)
{
this.dataClients.Add(new(config, authProvider.AuthToken, authProvider.CacheEndpoint, defaultTtl));
}
Expand Down
4 changes: 2 additions & 2 deletions src/Momento.Sdk/Config/Configurations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static IConfiguration V1(ILoggerFactory? loggerFactory = null)
IRetryStrategy retryStrategy = new FixedCountRetryStrategy(finalLoggerFactory, maxAttempts: 3);
ITransportStrategy transportStrategy = new StaticTransportStrategy(
loggerFactory: finalLoggerFactory,
maxConcurrentRequests: 100,
maxConcurrentRequests: 200, // max of 2 connections https://github.com/momentohq/client-sdk-dotnet/issues/460
grpcConfig: new StaticGrpcConfiguration(deadline: TimeSpan.FromMilliseconds(15000))
);
return new Laptop(finalLoggerFactory, retryStrategy, transportStrategy);
Expand Down Expand Up @@ -107,7 +107,7 @@ public static IConfiguration V1(ILoggerFactory? loggerFactory = null)
IRetryStrategy retryStrategy = new FixedCountRetryStrategy(finalLoggerFactory, maxAttempts: 3);
ITransportStrategy transportStrategy = new StaticTransportStrategy(
loggerFactory: finalLoggerFactory,
maxConcurrentRequests: 200,
maxConcurrentRequests: 400, // max of 4 connections https://github.com/momentohq/client-sdk-dotnet/issues/460
grpcConfig: new StaticGrpcConfiguration(deadline: TimeSpan.FromMilliseconds(1100)));
return new Default(finalLoggerFactory, retryStrategy, transportStrategy);
}
Expand Down
2 changes: 2 additions & 0 deletions src/Momento.Sdk/Config/Transport/ITransportStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public interface ITransportStrategy

/// <summary>
/// Copy constructor to update the maximum number of concurrent requests.
/// For every 100 concurrent requests, a new gRPC connection gets created. This value essentially limits
/// the number of connections you will get for your gRPC client.
/// </summary>
/// <param name="maxConcurrentRequests"></param>
/// <returns>A new ITransportStrategy with the specified maxConccurrentRequests</returns>
Expand Down
13 changes: 12 additions & 1 deletion src/Momento.Sdk/Internal/Middleware/FairAsyncSemaphore.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

Expand All @@ -12,7 +13,8 @@ namespace Momento.Sdk.Internal.Middleware
internal class FairAsyncSemaphore
{
private readonly Channel<bool> _ticketChannel;

private int _currentTicketCount;

internal FairAsyncSemaphore(int numTickets)
{
_ticketChannel = Channel.CreateBounded<bool>(numTickets);
Expand All @@ -25,11 +27,14 @@ internal FairAsyncSemaphore(int numTickets)
throw new ApplicationException("Unable to initialize async channel");
}
}

_currentTicketCount = numTickets;
}

public async Task WaitOne()
{
await _ticketChannel.Reader.ReadAsync();
Interlocked.Decrement(ref _currentTicketCount);
}

public void Release()
Expand All @@ -39,6 +44,12 @@ public void Release()
{
throw new ApplicationException("more releases than waits! These must be 1:1");
}
Interlocked.Increment(ref _currentTicketCount);
}

public int GetCurrentTicketCount()
{
return Interlocked.CompareExchange(ref _currentTicketCount, 0, 0);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ internal class MaxConcurrentRequestsMiddleware : IMiddleware
{
private readonly int _maxConcurrentRequests;
private readonly FairAsyncSemaphore _semaphore;
private readonly ILogger _logger;

public MaxConcurrentRequestsMiddleware(ILoggerFactory loggerFactory, int maxConcurrentRequests)
{
_maxConcurrentRequests = maxConcurrentRequests;
_semaphore = new FairAsyncSemaphore(maxConcurrentRequests);
_logger = loggerFactory.CreateLogger<MaxConcurrentRequestsMiddleware>();
}

public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TResponse>(
Expand All @@ -51,7 +53,14 @@ public async Task<MiddlewareResponseState<TResponse>> WrapRequest<TRequest, TRes
Func<TRequest, CallOptions, Task<MiddlewareResponseState<TResponse>>> continuation
) where TRequest : class where TResponse : class
{
if (_semaphore.GetCurrentTicketCount() == 0)
{
_logger.LogDebug("Max concurrent requests reached. The client will wait until one or more requests " +
" have completed.");
}

await _semaphore.WaitOne();

try
{
var result = await continuation(request, callOptions);
Expand Down
16 changes: 16 additions & 0 deletions tests/Integration/Momento.Sdk.Tests/CacheEagerConnectionTest.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.Logging;
using Momento.Sdk.Auth;
using Momento.Sdk.Config;
using Momento.Sdk.Config.Transport;

namespace Momento.Sdk.Tests.Integration;

Expand Down Expand Up @@ -37,6 +38,21 @@ public void CacheClientConstructor_EagerConnection_Success()
var client = new CacheClient(config, authProvider, defaultTtl);
}

[Fact]
public void CacheClientConstructor_WithChannelsAndMaxConn_Success()
{
var config = Configurations.Laptop.Latest(loggerFactory);
IGrpcConfiguration grpcConfiguration = config.TransportStrategy.GrpcConfig;
grpcConfiguration = grpcConfiguration.WithMinNumGrpcChannels(10);
config = config.WithTransportStrategy(config.TransportStrategy
.WithGrpcConfig(grpcConfiguration)
.WithMaxConcurrentRequests(500));
// still 500; clients shouldn't know we are doing 500/10 magic internally
Assert.Equal(500, config.TransportStrategy.MaxConcurrentRequests);
Assert.Equal(10, config.TransportStrategy.GrpcConfig.MinNumGrpcChannels);
// just validating that we can construct the client wh
var client = new CacheClient(config, authProvider, defaultTtl);
}

[Fact]
public void CacheClientConstructor_EagerConnection_BadEndpoint()
Expand Down
2 changes: 2 additions & 0 deletions tests/Unit/Momento.Sdk.Tests/ConfigTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using Momento.Sdk.Config;
using Momento.Sdk.Config.Transport;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;

public class ConfigTest
{
Expand Down

0 comments on commit ba0ef95

Please sign in to comment.