Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: improvements to load generator #177

Merged
merged 1 commit into from
Sep 26, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 83 additions & 35 deletions examples/MomentoLoadGen/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ namespace MomentoLoadGen
{
public record CsharpLoadGeneratorOptions
(
int printStatsEveryNRequests,
uint requestTimeoutMs,
int cacheItemPayloadBytes,
int numberOfConcurrentRequests,
int maxRequestsPerSecond,
int totalNumberOfOperationsToExecute
);

Expand All @@ -35,6 +37,7 @@ internal class CsharpLoadGeneratorContext
public Recorder SetLatencies;

public int GlobalRequestCount;
public int LastWorkerStatsPrintRequestCount;
public int GlobalSuccessCount;
public int GlobalUnavailableCount;
public int GlobalDeadlineExceededCount;
Expand All @@ -48,6 +51,7 @@ public CsharpLoadGeneratorContext()
SetLatencies = HistogramFactory.With64BitBucketSize().WithValuesFrom(1).WithValuesUpTo(TimeStamp.Minutes(1)).WithPrecisionOf(1).WithThreadSafeWrites().WithThreadSafeReads().Create();

GlobalRequestCount = 0;
LastWorkerStatsPrintRequestCount = 0;
GlobalSuccessCount = 0;
GlobalDeadlineExceededCount = 0;
GlobalResourceExhaustedCount = 0;
Expand All @@ -62,7 +66,6 @@ public class CsharpLoadGenerator
const int CACHE_ITEM_TTL_SECONDS = 60;
const string CACHE_NAME = "momento-loadgen";
const int NUM_REQUESTS_PER_OPERATION = 2;
const int PRINT_STATS_EVERY_N_REQUESTS = 5000;

private readonly ILogger<CsharpLoadGenerator> _logger;
private readonly CsharpLoadGeneratorOptions _options;
Expand Down Expand Up @@ -103,6 +106,8 @@ public async Task Run()


var numOperationsPerWorker = _options.totalNumberOfOperationsToExecute / _options.numberOfConcurrentRequests;
var workerDelayBetweenRequests = Convert.ToInt32(Math.Floor((1000.0 * _options.numberOfConcurrentRequests) / (_options.maxRequestsPerSecond * 1)));
Console.WriteLine($"Targeting a max of {_options.maxRequestsPerSecond} requests per second (delay between requests: {workerDelayBetweenRequests})");
var totalNumRequestsExpected = _options.totalNumberOfOperationsToExecute * NUM_REQUESTS_PER_OPERATION;

var context = new CsharpLoadGeneratorContext();
Expand All @@ -113,12 +118,18 @@ public async Task Run()
momento,
context,
workerId + 1,
numOperationsPerWorker
numOperationsPerWorker,
workerDelayBetweenRequests,
_options.printStatsEveryNRequests
)
);
).ToList();

var statsPrinterTask = LaunchStatsPrinterTask(context, totalNumRequestsExpected);
var statsPrinterTask = LaunchStatsPrinterTask(context, _options.printStatsEveryNRequests, totalNumRequestsExpected);

var firstResult = await Task.WhenAny(asyncResults);
// this will ensure that the program exits promptly if one of the async
// tasks throws an uncaught exception.
await firstResult;
await Task.WhenAll(asyncResults);

await statsPrinterTask;
Expand All @@ -130,26 +141,28 @@ private async Task LaunchAndRunWorkers(
SimpleCacheClient client,
CsharpLoadGeneratorContext context,
int workerId,
int numOperations)
int numOperations,
int delayMillisBetweenRequests,
int printStatsEveryNRequests
)
{
for (var i = 1; i <= numOperations; i++)
{
await IssueAsyncSetGet(client, context, workerId, i);

await IssueAsyncSetGet(client, context, workerId, i, delayMillisBetweenRequests, printStatsEveryNRequests);
}
}

private async Task LaunchStatsPrinterTask(CsharpLoadGeneratorContext context, int totalNumRequests)
private async Task LaunchStatsPrinterTask(CsharpLoadGeneratorContext context, int printStatsEveryNRequests, int totalNumRequests)
{
var setsAccumulatingHistogram = new LongHistogram(TimeStamp.Minutes(1), 1);
var getsAccumulatingHistogram = new LongHistogram(TimeStamp.Minutes(1), 1);

var nextStatsUpdateRequestCount = PRINT_STATS_EVERY_N_REQUESTS;
var nextStatsUpdateRequestCount = printStatsEveryNRequests;
while (context.GlobalRequestCount < totalNumRequests)
{
if (context.GlobalRequestCount >= nextStatsUpdateRequestCount)
{
nextStatsUpdateRequestCount += PRINT_STATS_EVERY_N_REQUESTS;
nextStatsUpdateRequestCount += printStatsEveryNRequests;
PrintStats(setsAccumulatingHistogram, getsAccumulatingHistogram, context);
}

Expand Down Expand Up @@ -183,9 +196,10 @@ private void PrintStats(LongHistogram setsAccumulatingHistogram, LongHistogram g
");
}

private async Task IssueAsyncSetGet(SimpleCacheClient client, CsharpLoadGeneratorContext context, int workerId, int operationId)
private async Task IssueAsyncSetGet(SimpleCacheClient client, CsharpLoadGeneratorContext context, int workerId, int operationId, int delayMillisBetweenRequests, int printStatsEveryNRequests)
{
var cacheKey = $"worker{workerId}operation{operationId}";

var setStartTime = System.Diagnostics.Stopwatch.StartNew();
var result = await ExecuteRequestAndUpdateContextCounts(
context,
Expand All @@ -195,6 +209,10 @@ private async Task IssueAsyncSetGet(SimpleCacheClient client, CsharpLoadGenerato
{
var setDuration = setStartTime.ElapsedMilliseconds;
context.SetLatencies.RecordValue(setDuration);
if (setDuration < delayMillisBetweenRequests)
{
await Task.Delay((int)(delayMillisBetweenRequests - setDuration));
}
}

var getStartTime = System.Diagnostics.Stopwatch.StartNew();
Expand All @@ -207,6 +225,10 @@ private async Task IssueAsyncSetGet(SimpleCacheClient client, CsharpLoadGenerato
{
var getDuration = getStartTime.ElapsedMilliseconds;
context.GetLatencies.RecordValue(getDuration);
if (getDuration < delayMillisBetweenRequests)
{
await Task.Delay((int)(delayMillisBetweenRequests - getDuration));
}

string valueString;

Expand All @@ -220,9 +242,15 @@ private async Task IssueAsyncSetGet(SimpleCacheClient client, CsharpLoadGenerato
valueString = "n/a";
}

if (context.GlobalRequestCount % PRINT_STATS_EVERY_N_REQUESTS == 0)
var globalRequestCount = context.GlobalRequestCount;
if (globalRequestCount % printStatsEveryNRequests == 0)
{
_logger.LogInformation($"worker: {workerId}, worker request: {operationId}, global request: {context.GlobalRequestCount}, status: ${getResult.Status}, val: ${valueString}");
var lastPrintCount = Interlocked.Exchange(ref context.LastWorkerStatsPrintRequestCount, globalRequestCount);
if (lastPrintCount != globalRequestCount)
{
Console.WriteLine($"worker: {workerId} last print count: {lastPrintCount} global request count: {globalRequestCount}");
_logger.LogInformation($"worker: {workerId}, worker request: {operationId}, global request: {context.GlobalRequestCount}, status: {getResult.Status}, val: {valueString}");
}
}
}
}
Expand Down Expand Up @@ -260,6 +288,14 @@ Func<Task<T>> block
throw e;
}
}
catch (Momento.Sdk.Exceptions.TimeoutException)
{
return Tuple.Create(AsyncSetGetResult.DEADLINE_EXCEEDED, default(T));
}
catch (LimitExceededException)
{
return Tuple.Create(AsyncSetGetResult.RESOURCE_EXHAUSTED, default(T));
}
catch (Exception e)
{
_logger.LogError("CAUGHT AN EXCEPTION WHILE EXECUTING REQUEST: {0}", e);
Expand Down Expand Up @@ -348,31 +384,43 @@ static async Task Main(string[] args)
using ILoggerFactory loggerFactory = InitializeLogging();

CsharpLoadGeneratorOptions loadGeneratorOptions = new CsharpLoadGeneratorOptions(
/**
* Configures the Momento client to timeout if a request exceeds this limit.
* Momento client default is 5 seconds.
*/
///
/// Each time the load generator has executed this many requests, it will
/// print out some statistics about throughput and latency.
///
printStatsEveryNRequests: 1000,
///
/// Configures the Momento client to timeout if a request exceeds this limit.
/// Momento client default is 5 seconds.
///
requestTimeoutMs: 5 * 1000,
/**
* Controls the size of the payload that will be used for the cache items in
* the load test. Smaller payloads will generally provide lower latencies than
* larger payloads.
*/
///
/// Controls the size of the payload that will be used for the cache items in
/// the load test. Smaller payloads will generally provide lower latencies than
/// larger payloads.
///
cacheItemPayloadBytes: 100,
/**
* Controls the number of concurrent requests that will be made (via asynchronous
* function calls) by the load test. Increasing this number may improve throughput,
* but it will also increase CPU consumption. As CPU usage increases and there
* is more contention between the concurrent function calls, client-side latencies
* may increase.
*/
///
/// Controls the number of concurrent requests that will be made (via asynchronous
/// function calls) by the load test. Increasing this number may improve throughput,
/// but it will also increase CPU consumption. As CPU usage increases and there
/// is more contention between the concurrent function calls, client-side latencies
/// may increase.
///
numberOfConcurrentRequests: 50,
/**
* Controls how long the load test will run. We will execute this many operations
* (1 cache 'set' followed immediately by 1 'get') across all of our concurrent
* workers before exiting. Statistics will be logged every 1000 operations.
*/
totalNumberOfOperationsToExecute: 50_000
///
/// Sets an upper bound on how many requests per second will be sent to the server.
/// Momento caches have a default throttling limit of 100 requests per second,
/// so if you raise this, you may observe throttled requests. Contact
/// [email protected] to inquire about raising your limits.
///
maxRequestsPerSecond: 100,
///
/// Controls how long the load test will run. We will execute this many operations
/// (1 cache 'set' followed immediately by 1 'get') across all of our concurrent
/// workers before exiting. Statistics will be logged every 1000 operations.
///
totalNumberOfOperationsToExecute: 500_000
);

CsharpLoadGenerator loadGenerator = new CsharpLoadGenerator(
Expand Down