Skip to content

Commit

Permalink
chore: improvements to load generator
Browse files Browse the repository at this point in the history
This makes the following improvements to the load generator:
* Handles throttles without crashing
* Handles timeouts without crashing
* Adds a very simplistic implementation of a "max requests per second" setting,
  which can be used to rate-limit the load generator on the client side
  • Loading branch information
cprice404 committed Sep 23, 2022
1 parent e56969f commit fa254be
Showing 1 changed file with 83 additions and 36 deletions.
119 changes: 83 additions & 36 deletions examples/MomentoLoadGen/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ namespace MomentoLoadGen
{
public record CsharpLoadGeneratorOptions
(
int printStatsEveryNRequests,
uint requestTimeoutMs,
int cacheItemPayloadBytes,
int numberOfConcurrentRequests,
int maxRequestsPerSecond,
int totalNumberOfOperationsToExecute
);

Expand All @@ -35,6 +37,7 @@ internal class CsharpLoadGeneratorContext
public Recorder SetLatencies;

public int GlobalRequestCount;
public int LastWorkerStatsPrintRequestCount;
public int GlobalSuccessCount;
public int GlobalUnavailableCount;
public int GlobalDeadlineExceededCount;
Expand All @@ -48,6 +51,7 @@ public CsharpLoadGeneratorContext()
SetLatencies = HistogramFactory.With64BitBucketSize().WithValuesFrom(1).WithValuesUpTo(TimeStamp.Minutes(1)).WithPrecisionOf(1).WithThreadSafeWrites().WithThreadSafeReads().Create();

GlobalRequestCount = 0;
LastWorkerStatsPrintRequestCount = 0;
GlobalSuccessCount = 0;
GlobalDeadlineExceededCount = 0;
GlobalResourceExhaustedCount = 0;
Expand All @@ -62,7 +66,6 @@ public class CsharpLoadGenerator
const int CACHE_ITEM_TTL_SECONDS = 60;
const string CACHE_NAME = "momento-loadgen";
const int NUM_REQUESTS_PER_OPERATION = 2;
const int PRINT_STATS_EVERY_N_REQUESTS = 5000;

private readonly ILogger<CsharpLoadGenerator> _logger;
private readonly CsharpLoadGeneratorOptions _options;
Expand Down Expand Up @@ -103,6 +106,8 @@ public async Task Run()


var numOperationsPerWorker = _options.totalNumberOfOperationsToExecute / _options.numberOfConcurrentRequests;
var workerDelayBetweenRequests = Convert.ToInt32(Math.Floor((1000.0 * _options.numberOfConcurrentRequests) / (_options.maxRequestsPerSecond * 1)));
Console.WriteLine($"Targeting a max of {_options.maxRequestsPerSecond} requests per second (delay between requests: {workerDelayBetweenRequests})");
var totalNumRequestsExpected = _options.totalNumberOfOperationsToExecute * NUM_REQUESTS_PER_OPERATION;

var context = new CsharpLoadGeneratorContext();
Expand All @@ -113,12 +118,18 @@ public async Task Run()
momento,
context,
workerId + 1,
numOperationsPerWorker
numOperationsPerWorker,
workerDelayBetweenRequests,
_options.printStatsEveryNRequests
)
);
).ToList();

var statsPrinterTask = LaunchStatsPrinterTask(context, totalNumRequestsExpected);
var statsPrinterTask = LaunchStatsPrinterTask(context, _options.printStatsEveryNRequests, totalNumRequestsExpected);

var firstResult = await Task.WhenAny(asyncResults);
// this will ensure that the program exits promptly if one of the async
// tasks throws an uncaught exception.
await firstResult;
await Task.WhenAll(asyncResults);

await statsPrinterTask;
Expand All @@ -130,26 +141,28 @@ private async Task LaunchAndRunWorkers(
SimpleCacheClient client,
CsharpLoadGeneratorContext context,
int workerId,
int numOperations)
int numOperations,
int delayMillisBetweenRequests,
int printStatsEveryNRequests
)
{
for (var i = 1; i <= numOperations; i++)
{
await IssueAsyncSetGet(client, context, workerId, i);

await IssueAsyncSetGet(client, context, workerId, i, delayMillisBetweenRequests, printStatsEveryNRequests);
}
}

private async Task LaunchStatsPrinterTask(CsharpLoadGeneratorContext context, int totalNumRequests)
private async Task LaunchStatsPrinterTask(CsharpLoadGeneratorContext context, int printStatsEveryNRequests, int totalNumRequests)
{
var setsAccumulatingHistogram = new LongHistogram(TimeStamp.Minutes(1), 1);
var getsAccumulatingHistogram = new LongHistogram(TimeStamp.Minutes(1), 1);

var nextStatsUpdateRequestCount = PRINT_STATS_EVERY_N_REQUESTS;
var nextStatsUpdateRequestCount = printStatsEveryNRequests;
while (context.GlobalRequestCount < totalNumRequests)
{
if (context.GlobalRequestCount >= nextStatsUpdateRequestCount)
{
nextStatsUpdateRequestCount += PRINT_STATS_EVERY_N_REQUESTS;
nextStatsUpdateRequestCount += printStatsEveryNRequests;
PrintStats(setsAccumulatingHistogram, getsAccumulatingHistogram, context);
}

Expand Down Expand Up @@ -183,18 +196,23 @@ private void PrintStats(LongHistogram setsAccumulatingHistogram, LongHistogram g
");
}

private async Task IssueAsyncSetGet(SimpleCacheClient client, CsharpLoadGeneratorContext context, int workerId, int operationId)
private async Task IssueAsyncSetGet(SimpleCacheClient client, CsharpLoadGeneratorContext context, int workerId, int operationId, int delayMillisBetweenRequests, int printStatsEveryNRequests)
{
var cacheKey = $"worker{workerId}operation{operationId}";

var setStartTime = System.Diagnostics.Stopwatch.StartNew();
var result = await ExecuteRequestAndUpdateContextCounts(
context,
() => client.SetAsync(CACHE_NAME, cacheKey, _cacheValue)
);
if (result != null)
{
var setDuration = setStartTime.ElapsedMilliseconds;
var setDuration = setStartTime.ElapsedMilliseconds;
context.SetLatencies.RecordValue(setDuration);
if (setDuration < delayMillisBetweenRequests)
{
await Task.Delay((int)(delayMillisBetweenRequests - setDuration));
}
}

var getStartTime = System.Diagnostics.Stopwatch.StartNew();
Expand All @@ -207,6 +225,10 @@ private async Task IssueAsyncSetGet(SimpleCacheClient client, CsharpLoadGenerato
{
var getDuration = getStartTime.ElapsedMilliseconds;
context.GetLatencies.RecordValue(getDuration);
if (getDuration < delayMillisBetweenRequests)
{
await Task.Delay((int)(delayMillisBetweenRequests - getDuration));
}

string valueString;

Expand All @@ -220,9 +242,14 @@ private async Task IssueAsyncSetGet(SimpleCacheClient client, CsharpLoadGenerato
valueString = "n/a";
}

if (context.GlobalRequestCount % PRINT_STATS_EVERY_N_REQUESTS == 0)
var globalRequestCount = context.GlobalRequestCount;
if (globalRequestCount % printStatsEveryNRequests == 0)
{
_logger.LogInformation($"worker: {workerId}, worker request: {operationId}, global request: {context.GlobalRequestCount}, status: ${getResult.Status}, val: ${valueString}");
var lastPrintCount = Interlocked.Exchange(ref context.LastWorkerStatsPrintRequestCount, globalRequestCount);
if (lastPrintCount != globalRequestCount) {
Console.WriteLine($"worker: {workerId} last print count: {lastPrintCount} global request count: {globalRequestCount}");
_logger.LogInformation($"worker: {workerId}, worker request: {operationId}, global request: {context.GlobalRequestCount}, status: {getResult.Status}, val: {valueString}");
}
}
}
}
Expand Down Expand Up @@ -260,6 +287,14 @@ Func<Task<T>> block
throw e;
}
}
catch (Momento.Sdk.Exceptions.TimeoutException)
{
return Tuple.Create(AsyncSetGetResult.DEADLINE_EXCEEDED, default(T));
}
catch (LimitExceededException)
{
return Tuple.Create(AsyncSetGetResult.RESOURCE_EXHAUSTED, default(T));
}
catch (Exception e)
{
_logger.LogError("CAUGHT AN EXCEPTION WHILE EXECUTING REQUEST: {0}", e);
Expand Down Expand Up @@ -348,31 +383,43 @@ static async Task Main(string[] args)
using ILoggerFactory loggerFactory = InitializeLogging();

CsharpLoadGeneratorOptions loadGeneratorOptions = new CsharpLoadGeneratorOptions(
/**
* Configures the Momento client to timeout if a request exceeds this limit.
* Momento client default is 5 seconds.
*/
///
/// Each time the load generator has executed this many requests, it will
/// print out some statistics about throughput and latency.
///
printStatsEveryNRequests: 1000,
///
/// Configures the Momento client to timeout if a request exceeds this limit.
/// Momento client default is 5 seconds.
///
requestTimeoutMs: 5 * 1000,
/**
* Controls the size of the payload that will be used for the cache items in
* the load test. Smaller payloads will generally provide lower latencies than
* larger payloads.
*/
///
/// Controls the size of the payload that will be used for the cache items in
/// the load test. Smaller payloads will generally provide lower latencies than
/// larger payloads.
///
cacheItemPayloadBytes: 100,
/**
* Controls the number of concurrent requests that will be made (via asynchronous
* function calls) by the load test. Increasing this number may improve throughput,
* but it will also increase CPU consumption. As CPU usage increases and there
* is more contention between the concurrent function calls, client-side latencies
* may increase.
*/
///
/// Controls the number of concurrent requests that will be made (via asynchronous
/// function calls) by the load test. Increasing this number may improve throughput,
/// but it will also increase CPU consumption. As CPU usage increases and there
/// is more contention between the concurrent function calls, client-side latencies
/// may increase.
///
numberOfConcurrentRequests: 50,
/**
* Controls how long the load test will run. We will execute this many operations
* (1 cache 'set' followed immediately by 1 'get') across all of our concurrent
* workers before exiting. Statistics will be logged every 1000 operations.
*/
totalNumberOfOperationsToExecute: 50_000
///
/// Sets an upper bound on how many requests per second will be sent to the server.
/// Momento caches have a default throttling limit of 100 requests per second,
/// so if you raise this, you may observe throttled requests. Contact
/// [email protected] to inquire about raising your limits.
///
maxRequestsPerSecond: 100,
///
/// Controls how long the load test will run. We will execute this many operations
/// (1 cache 'set' followed immediately by 1 'get') across all of our concurrent
/// workers before exiting. Statistics will be logged every 1000 operations.
///
totalNumberOfOperationsToExecute: 500_000
);

CsharpLoadGenerator loadGenerator = new CsharpLoadGenerator(
Expand Down

0 comments on commit fa254be

Please sign in to comment.