Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Client Telemetry: Refactors code to run client telemetry data processing task in background. #3783

Merged
merged 13 commits into from
Apr 10, 2023
78 changes: 48 additions & 30 deletions Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace Microsoft.Azure.Cosmos.Telemetry
using Microsoft.Azure.Cosmos.Tracing.TraceData;
using Microsoft.Azure.Documents;
using Util;
using static Microsoft.Azure.Cosmos.Tracing.TraceData.ClientSideRequestStatisticsTraceDatum;

/// <summary>
/// This class collects and send all the telemetry information.
Expand All @@ -29,8 +28,6 @@ namespace Microsoft.Azure.Cosmos.Telemetry
/// </summary>
internal class ClientTelemetry : IDisposable
{
private const int allowedNumberOfFailures = 3;

private static readonly TimeSpan observingWindow = ClientTelemetryOptions.GetScheduledTimeSpan();

private readonly ClientTelemetryProperties clientTelemetryInfo;
Expand All @@ -39,7 +36,6 @@ internal class ClientTelemetry : IDisposable
private readonly NetworkDataRecorder networkDataRecorder;

private readonly CancellationTokenSource cancellationTokenSource;

private readonly GlobalEndpointManager globalEndpointManager;

private Task telemetryTask;
Expand All @@ -50,8 +46,6 @@ internal class ClientTelemetry : IDisposable
private ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoMap
= new ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram>();

private int numberOfFailures = 0;

/// <summary>
/// Only for Mocking in tests
/// </summary>
Expand Down Expand Up @@ -123,7 +117,6 @@ internal ClientTelemetry(
aggregationIntervalInSec: (int)observingWindow.TotalSeconds);

this.networkDataRecorder = new NetworkDataRecorder();

this.cancellationTokenSource = new CancellationTokenSource();
}

Expand All @@ -137,9 +130,9 @@ private void StartObserverTask()

/// <summary>
/// Task which does below operations , periodically
/// 1. Set Account information (one time at the time of initialization)
/// 2. Load VM metedata information (one time at the time of initialization)
/// 3. Calculate and Send telemetry Information to juno service (never ending task)/// </summary>
/// 1. Set Account information (one time during initialization)
/// 2. Load VM metedata information (one time during initialization)
/// 3. Calculate and Send telemetry Information to Client Telemetry Service (never ending task)/// </summary>
/// <returns>Async Task</returns>
private async Task EnrichAndSendAsync()
{
Expand All @@ -149,18 +142,12 @@ private async Task EnrichAndSendAsync()
{
while (!this.cancellationTokenSource.IsCancellationRequested)
{
if (this.numberOfFailures == allowedNumberOfFailures)
sourabh1007 marked this conversation as resolved.
Show resolved Hide resolved
sourabh1007 marked this conversation as resolved.
Show resolved Hide resolved
{
this.Dispose();
break;
}

if (string.IsNullOrEmpty(this.clientTelemetryInfo.GlobalDatabaseAccountName))
{
AccountProperties accountProperties = await ClientTelemetryHelper.SetAccountNameAsync(this.globalEndpointManager);
this.clientTelemetryInfo.GlobalDatabaseAccountName = accountProperties.Id;
}

await Task.Delay(observingWindow, this.cancellationTokenSource.Token);

this.clientTelemetryInfo.DateTimeUtc = DateTime.UtcNow.ToString(ClientTelemetryOptions.DateFormat);
Expand All @@ -180,24 +167,28 @@ private async Task EnrichAndSendAsync()

ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoSnapshot
= Interlocked.Exchange(ref this.cacheRefreshInfoMap, new ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram>());


List<RequestInfo> requestInfoSnapshot = this.networkDataRecorder.GetRequests();

try
{
await this.processor
.ProcessAndSendAsync(
clientTelemetryInfo: this.clientTelemetryInfo,
operationInfoSnapshot: operationInfoSnapshot,
cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot,
requestInfoSnapshot: this.networkDataRecorder.GetRequests(),
cancellationToken: this.cancellationTokenSource.Token);

this.numberOfFailures = 0;
CancellationTokenSource cancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut);
sourabh1007 marked this conversation as resolved.
Show resolved Hide resolved
Task processorTask = Task.Run(() => this.processor
.ProcessAndSendAsync(
clientTelemetryInfo: this.clientTelemetryInfo,
operationInfoSnapshot: operationInfoSnapshot,
cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot,
requestInfoSnapshot: requestInfoSnapshot,
cancellationToken: cancellationToken.Token), cancellationToken.Token);

// Initiating Telemetry Data Processor task which will serialize and send telemetry information to Client Telemetry Service
// Not disposing this task. If we dispose a client then, telemetry job(telemetryTask) should stop but processor task(processorTask) should make best effort to finish the job in background.
_ = ClientTelemetry.RunProcessorTaskAsync(this.clientTelemetryInfo.DateTimeUtc, processorTask, ClientTelemetryOptions.ClientTelemetryProcessorTimeOut);

}
catch (Exception ex)
{
this.numberOfFailures++;

DefaultTrace.TraceError("Telemetry Job Processor failed with error : {0}", ex);
DefaultTrace.TraceError("Exception while initiating processing task : {0} with telemetry date as {1}", ex.Message, this.clientTelemetryInfo.DateTimeUtc);
}
}
}
Expand All @@ -209,6 +200,33 @@ await this.processor
DefaultTrace.TraceInformation("Telemetry Job Stopped.");
}

/// <summary>
/// This Task makes sure, processing task is timing out after 5 minute of timeout
/// </summary>
/// <param name="telemetryDate"></param>
/// <param name="processingTask"></param>
/// <param name="timeout"></param>
internal static async Task RunProcessorTaskAsync(string telemetryDate, Task processingTask, TimeSpan timeout)
{
using (CancellationTokenSource tokenForDelayTask = new CancellationTokenSource())
{
Task delayTask = Task.Delay(timeout, tokenForDelayTask.Token);

Task resultTask = await Task.WhenAny(processingTask, delayTask);
if (resultTask == delayTask)
{
DefaultTrace.TraceVerbose($"Processor task with date as {telemetryDate} is canceled as it did not finish in {timeout}");
// Operation cancelled
throw new OperationCanceledException($"Processor task with date as {telemetryDate} is canceled as it did not finish in {timeout}");
}
else
{
// Cancel the timer task so that it does not fire
tokenForDelayTask.Cancel();
}
}
}

/// <summary>
/// Collects Cache Telemetry Information.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,12 @@ internal static class ClientTelemetryOptions
internal const double Percentile99 = 99.0;
internal const double Percentile999 = 99.9;
internal const string DateFormat = "yyyy-MM-ddTHH:mm:ssZ";

internal const string EnvPropsClientTelemetrySchedulingInSeconds = "COSMOS.CLIENT_TELEMETRY_SCHEDULING_IN_SECONDS";
internal const string EnvPropsClientTelemetryEnabled = "COSMOS.CLIENT_TELEMETRY_ENABLED";
internal const string EnvPropsClientTelemetryVmMetadataUrl = "COSMOS.VM_METADATA_URL";
internal const string EnvPropsClientTelemetryEndpoint = "COSMOS.CLIENT_TELEMETRY_ENDPOINT";
internal const string EnvPropsClientTelemetryEnvironmentName = "COSMOS.ENVIRONMENT_NAME";

internal static readonly ResourceType AllowedResourceTypes = ResourceType.Document;
// Why 5 sec? As of now, if any network request is taking more than 5 millisecond sec, we will consider it slow request this value can be revisited in future
internal static readonly TimeSpan NetworkLatencyThreshold = TimeSpan.FromMilliseconds(5);
Expand All @@ -101,7 +100,8 @@ internal static class ClientTelemetryOptions

internal static readonly int NetworkTelemetrySampleSize = 200;
internal static int PayloadSizeThreshold = 1024 * 1024 * 2; // 2MB

internal static TimeSpan ClientTelemetryProcessorTimeOut = TimeSpan.FromMinutes(5);

private static Uri clientTelemetryEndpoint;
private static string environmentName;
private static TimeSpan scheduledTimeSpan = TimeSpan.Zero;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1075,33 +1075,6 @@ private static void AssertCacheRefreshInfoInformation(
}
}

[TestMethod]
public async Task CheckMisconfiguredTelemetryEndpoint_should_stop_the_job()
{
int retryCounter = 0;
HttpClientHandlerHelper customHttpHandler = new HttpClientHandlerHelper
{
RequestCallBack = (request, cancellation) =>
{
if (request.RequestUri.AbsoluteUri.Equals(ClientTelemetryOptions.GetClientTelemetryEndpoint().AbsoluteUri))
{
retryCounter++;
throw new Exception("Exception while sending telemetry");
}

return null;
}
};

Container container = await this.CreateClientAndContainer(
mode: ConnectionMode.Direct,
customHttpHandler: customHttpHandler);

await Task.Delay(TimeSpan.FromMilliseconds(5000)); // wait for 5 sec, ideally telemetry would be sent 5 times but client telemetry endpoint is not functional (in this test), it should try 3 times maximum and after that client telemetry job should be stopped.

Assert.AreEqual(3, retryCounter);
}

private static ItemBatchOperation CreateItem(string itemId)
{
var testItem = new { id = itemId, Status = itemId };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class ClientTelemetryTests
public void Cleanup()
{
Environment.SetEnvironmentVariable(ClientTelemetryOptions.EnvPropsClientTelemetryEnabled, null);
Environment.SetEnvironmentVariable(ClientTelemetryOptions.EnvPropsClientTelemetryEndpoint, null);
}

[TestMethod]
Expand Down Expand Up @@ -147,7 +148,6 @@ public async Task CheckIfPayloadIsDividedCorrectlyAsync(int expectedOperationInf
string payloadJson = request.Content.ReadAsStringAsync().Result;
Assert.IsTrue(payloadJson.Length <= ClientTelemetryOptions.PayloadSizeThreshold, "Payload Size is " + payloadJson.Length);

Console.WriteLine(payloadJson);
ClientTelemetryProperties propertiesToSend = JsonConvert.DeserializeObject<ClientTelemetryProperties>(payloadJson);

Assert.AreEqual(7, propertiesToSend.SystemInfo.Count, "System Info is not correct");
Expand Down Expand Up @@ -245,14 +245,114 @@ await processor.ProcessAndSendAsync(
clientTelemetryProperties,
operationInfoSnapshot,
cacheRefreshInfoSnapshot,
requestInfoList,
new CancellationToken());
requestInfoList,
new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut).Token);

Assert.AreEqual(expectedOperationInfoSize, actualOperationInfoSize, "Operation Info is not correct");
Assert.AreEqual(expectedCacheRefreshInfoSize, actualCacheRefreshInfoSize, "Cache Refresh Info is not correct");
Assert.AreEqual(expectedRequestInfoSize, actualRequestInfoSize, "Request Info is not correct");
}


[TestMethod]
public async Task ClientTelmetryProcessor_should_timeout()
{
Environment.SetEnvironmentVariable(ClientTelemetryOptions.EnvPropsClientTelemetryEndpoint, "http://dummy.telemetry.endpoint/");
sourabh1007 marked this conversation as resolved.
Show resolved Hide resolved

string data = File.ReadAllText("Telemetry/ClientTelemetryPayloadWithoutMetrics.json", Encoding.UTF8);
ClientTelemetryProperties clientTelemetryProperties = JsonConvert.DeserializeObject<ClientTelemetryProperties>(data);

int actualOperationInfoSize = 0;
int actualCacheRefreshInfoSize = 0;

Mock<IHttpHandler> mockHttpHandler = new Mock<IHttpHandler>();
_ = mockHttpHandler.Setup(x => x.SendAsync(
It.IsAny<HttpRequestMessage>(),
It.IsAny<CancellationToken>()))
.Callback<HttpRequestMessage, CancellationToken>(
(request, cancellationToken) =>
{
string payloadJson = request.Content.ReadAsStringAsync().Result;
Assert.IsTrue(payloadJson.Length <= ClientTelemetryOptions.PayloadSizeThreshold, "Payload Size is " + payloadJson.Length);

ClientTelemetryProperties propertiesToSend = JsonConvert.DeserializeObject<ClientTelemetryProperties>(payloadJson);

actualOperationInfoSize += propertiesToSend.OperationInfo?.Count ?? 0;
actualCacheRefreshInfoSize += propertiesToSend.CacheRefreshInfo?.Count ?? 0;
})
.Returns(Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK)));

ClientTelemetryProcessor processor = new ClientTelemetryProcessor(
MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(new HttpHandlerHelper(mockHttpHandler.Object))),
Mock.Of<AuthorizationTokenProvider>());

ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)> operationInfoSnapshot
= new ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)>();

for (int i = 0; i < 20; i++)
{
OperationInfo opeInfo = new OperationInfo(Regions.WestUS,
0,
Documents.ConsistencyLevel.Session.ToString(),
"databaseName" + i,
"containerName",
Documents.OperationType.Read,
Documents.ResourceType.Document,
200,
0);

LongConcurrentHistogram latency = new LongConcurrentHistogram(ClientTelemetryOptions.RequestLatencyMin,
ClientTelemetryOptions.RequestLatencyMax,
ClientTelemetryOptions.RequestLatencyPrecision);
latency.RecordValue(10);

LongConcurrentHistogram requestcharge = new LongConcurrentHistogram(ClientTelemetryOptions.RequestChargeMin,
ClientTelemetryOptions.RequestChargeMax,
ClientTelemetryOptions.RequestChargePrecision);
requestcharge.RecordValue(11);

operationInfoSnapshot.TryAdd(opeInfo, (latency, requestcharge));
}

ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoSnapshot
= new ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram>();
for (int i = 0; i < 10; i++)
{
CacheRefreshInfo crInfo = new CacheRefreshInfo(Regions.WestUS,
10,
Documents.ConsistencyLevel.Session.ToString(),
"databaseName" + i,
"containerName",
Documents.OperationType.Read,
Documents.ResourceType.Document,
200,
1002,
"dummycache");

LongConcurrentHistogram latency = new LongConcurrentHistogram(ClientTelemetryOptions.RequestLatencyMin,
ClientTelemetryOptions.RequestLatencyMax,
ClientTelemetryOptions.RequestLatencyPrecision);
latency.RecordValue(10);

cacheRefreshInfoSnapshot.TryAdd(crInfo, latency);
}

Task processorTask = Task.Run(async () =>
{
CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(1));
await Task.Delay(1000, cts.Token); // Making this task wait to ensure that processir is taking more time.
await processor.ProcessAndSendAsync(clientTelemetryProperties,
operationInfoSnapshot,
cacheRefreshInfoSnapshot,
default,
cts.Token);
});

await Assert.ThrowsExceptionAsync<OperationCanceledException>(() => ClientTelemetry.RunProcessorTaskAsync(
telemetryDate: DateTime.Now.ToString(),
processingTask: processorTask,
timeout: TimeSpan.FromTicks(1)));
}

[TestMethod]
[ExpectedException(typeof(FormatException))]
public void CheckMisconfiguredTelemetry_should_fail()
Expand Down