From f041ae845e96ebc8ead3bc66cacd42c5f53c0579 Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Thu, 30 Mar 2023 01:00:12 +0530 Subject: [PATCH 01/13] first draft --- .../src/Telemetry/ClientTelemetry.cs | 37 ++++++++++--------- .../src/Telemetry/ClientTelemetryOptions.cs | 2 +- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs index 8a83778fdb..90ffa2e750 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs @@ -19,7 +19,6 @@ namespace Microsoft.Azure.Cosmos.Telemetry using Microsoft.Azure.Cosmos.Tracing.TraceData; using Microsoft.Azure.Documents; using Util; - using static Microsoft.Azure.Cosmos.Tracing.TraceData.ClientSideRequestStatisticsTraceDatum; /// /// This class collects and send all the telemetry information. @@ -39,19 +38,20 @@ internal class ClientTelemetry : IDisposable private readonly NetworkDataRecorder networkDataRecorder; private readonly CancellationTokenSource cancellationTokenSource; - + private readonly CancellationTokenSource processorCancellationTokenSource = new CancellationTokenSource(ClientTelemetryOptions.ProcessorTimeOutInMs); // 5 min + private readonly GlobalEndpointManager globalEndpointManager; private Task telemetryTask; - + // Not disposing this task. If we dispose a client then, telemetry job(telemetryTask) should stop but processor task(processorTask) should make best effort to finish the job in background. + private Task processorTask; + private ConcurrentDictionary operationInfoMap = new ConcurrentDictionary(); private ConcurrentDictionary cacheRefreshInfoMap = new ConcurrentDictionary(); - private int numberOfFailures = 0; - /// /// Only for Mocking in tests /// @@ -149,12 +149,6 @@ private async Task EnrichAndSendAsync() { while (!this.cancellationTokenSource.IsCancellationRequested) { - if (this.numberOfFailures == allowedNumberOfFailures) - { - this.Dispose(); - break; - } - if (string.IsNullOrEmpty(this.clientTelemetryInfo.GlobalDatabaseAccountName)) { AccountProperties accountProperties = await ClientTelemetryHelper.SetAccountNameAsync(this.globalEndpointManager); @@ -180,23 +174,32 @@ private async Task EnrichAndSendAsync() ConcurrentDictionary cacheRefreshInfoSnapshot = Interlocked.Exchange(ref this.cacheRefreshInfoMap, new ConcurrentDictionary()); + + List requestInfoSnapshot = this.networkDataRecorder.GetRequests(); try { - await this.processor + // Use the Wait method with a CancellationToken to wait for the task to complete. + // If anyhow prev task was not finished sucessfully in 5 min then ,Do not trigger a new task. + this.processorTask?.Wait(this.processorCancellationTokenSource.Token); + + Task tempProcessorTask = Task.Run(() => this.processor .ProcessAndSendAsync( clientTelemetryInfo: this.clientTelemetryInfo, operationInfoSnapshot: operationInfoSnapshot, cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, - requestInfoSnapshot: this.networkDataRecorder.GetRequests(), - cancellationToken: this.cancellationTokenSource.Token); + requestInfoSnapshot: requestInfoSnapshot, + cancellationToken: this.cancellationTokenSource.Token)); + tempProcessorTask.Start(); // run it in background - this.numberOfFailures = 0; + this.processorTask = tempProcessorTask; + } + catch (OperationCanceledException) + { + DefaultTrace.TraceError("Telemetry Job Processor failed due to timeout in 5 min"); } catch (Exception ex) { - this.numberOfFailures++; - DefaultTrace.TraceError("Telemetry Job Processor failed with error : {0}", ex); } } diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs index 47232a187e..6341836e79 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs @@ -79,7 +79,7 @@ internal static class ClientTelemetryOptions internal const double Percentile99 = 99.0; internal const double Percentile999 = 99.9; internal const string DateFormat = "yyyy-MM-ddTHH:mm:ssZ"; - + internal const int ProcessorTimeOutInMs = 5 * 60 * 1000; // 5 minutes internal const string EnvPropsClientTelemetrySchedulingInSeconds = "COSMOS.CLIENT_TELEMETRY_SCHEDULING_IN_SECONDS"; internal const string EnvPropsClientTelemetryEnabled = "COSMOS.CLIENT_TELEMETRY_ENABLED"; internal const string EnvPropsClientTelemetryVmMetadataUrl = "COSMOS.VM_METADATA_URL"; From 190d4305ee0516909af3da803f8e2cc240839e25 Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Thu, 30 Mar 2023 01:42:43 +0530 Subject: [PATCH 02/13] remove failure count test --- .../ClientTelemetryTests.cs | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTelemetryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTelemetryTests.cs index 7e924ffd53..30edc7ebf7 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTelemetryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTelemetryTests.cs @@ -1075,33 +1075,6 @@ private static void AssertCacheRefreshInfoInformation( } } - [TestMethod] - public async Task CheckMisconfiguredTelemetryEndpoint_should_stop_the_job() - { - int retryCounter = 0; - HttpClientHandlerHelper customHttpHandler = new HttpClientHandlerHelper - { - RequestCallBack = (request, cancellation) => - { - if (request.RequestUri.AbsoluteUri.Equals(ClientTelemetryOptions.GetClientTelemetryEndpoint().AbsoluteUri)) - { - retryCounter++; - throw new Exception("Exception while sending telemetry"); - } - - return null; - } - }; - - Container container = await this.CreateClientAndContainer( - mode: ConnectionMode.Direct, - customHttpHandler: customHttpHandler); - - await Task.Delay(TimeSpan.FromMilliseconds(5000)); // wait for 5 sec, ideally telemetry would be sent 5 times but client telemetry endpoint is not functional (in this test), it should try 3 times maximum and after that client telemetry job should be stopped. - - Assert.AreEqual(3, retryCounter); - } - private static ItemBatchOperation CreateItem(string itemId) { var testItem = new { id = itemId, Status = itemId }; From 47e493aad01e7bbd8f2e353e1a54da3398005dd7 Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Tue, 4 Apr 2023 00:07:29 +0530 Subject: [PATCH 03/13] refactporing --- .../src/Telemetry/ClientTelemetry.cs | 52 ++++++++----------- .../src/Telemetry/ClientTelemetryOptions.cs | 4 +- .../src/Telemetry/ClientTelemetryProcessor.cs | 10 ++-- 3 files changed, 31 insertions(+), 35 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs index 90ffa2e750..8e08b9421d 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs @@ -28,8 +28,6 @@ namespace Microsoft.Azure.Cosmos.Telemetry /// internal class ClientTelemetry : IDisposable { - private const int allowedNumberOfFailures = 3; - private static readonly TimeSpan observingWindow = ClientTelemetryOptions.GetScheduledTimeSpan(); private readonly ClientTelemetryProperties clientTelemetryInfo; @@ -37,8 +35,8 @@ internal class ClientTelemetry : IDisposable private readonly DiagnosticsHandlerHelper diagnosticsHelper; private readonly NetworkDataRecorder networkDataRecorder; - private readonly CancellationTokenSource cancellationTokenSource; - private readonly CancellationTokenSource processorCancellationTokenSource = new CancellationTokenSource(ClientTelemetryOptions.ProcessorTimeOutInMs); // 5 min + private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); + private readonly CancellationTokenSource processorCancellationTokenSource = new CancellationTokenSource(ClientTelemetryOptions.ProcessorTimeOut); // 5 min private readonly GlobalEndpointManager globalEndpointManager; @@ -57,7 +55,6 @@ private ConcurrentDictionary cacheRef /// internal ClientTelemetry() { - this.cancellationTokenSource = new CancellationTokenSource(); } /// @@ -123,8 +120,6 @@ internal ClientTelemetry( aggregationIntervalInSec: (int)observingWindow.TotalSeconds); this.networkDataRecorder = new NetworkDataRecorder(); - - this.cancellationTokenSource = new CancellationTokenSource(); } /// @@ -137,9 +132,9 @@ private void StartObserverTask() /// /// Task which does below operations , periodically - /// 1. Set Account information (one time at the time of initialization) - /// 2. Load VM metedata information (one time at the time of initialization) - /// 3. Calculate and Send telemetry Information to juno service (never ending task)/// + /// 1. Set Account information (one time during initialization) + /// 2. Load VM metedata information (one time during initialization) + /// 3. Calculate and Send telemetry Information to Client Telemetry Service (never ending task)/// /// Async Task private async Task EnrichAndSendAsync() { @@ -179,28 +174,27 @@ ConcurrentDictionary cacheRefreshInfo try { - // Use the Wait method with a CancellationToken to wait for the task to complete. - // If anyhow prev task was not finished sucessfully in 5 min then ,Do not trigger a new task. - this.processorTask?.Wait(this.processorCancellationTokenSource.Token); - - Task tempProcessorTask = Task.Run(() => this.processor - .ProcessAndSendAsync( - clientTelemetryInfo: this.clientTelemetryInfo, - operationInfoSnapshot: operationInfoSnapshot, - cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, - requestInfoSnapshot: requestInfoSnapshot, - cancellationToken: this.cancellationTokenSource.Token)); - tempProcessorTask.Start(); // run it in background - - this.processorTask = tempProcessorTask; - } - catch (OperationCanceledException) - { - DefaultTrace.TraceError("Telemetry Job Processor failed due to timeout in 5 min"); + // Initiating Telemetry Data Processor task which will serialize and send telemetry information to Client Telemetry Service service + this.processorTask = Task.Run( + function: () => this.processor + .ProcessAndSendAsync( + clientTelemetryInfo: this.clientTelemetryInfo, + operationInfoSnapshot: operationInfoSnapshot, + cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, + requestInfoSnapshot: requestInfoSnapshot), + cancellationToken: this.processorCancellationTokenSource.Token) + .ContinueWith((i) => + { + Exception ex = i.Exception?.GetBaseException(); + if (ex != null) + { + DefaultTrace.TraceError($"Client Telemetry data processing task faulted and stopped running. ErrorType={ex.GetType()} ErrorMessage={ex.Message}"); + } + }, TaskContinuationOptions.OnlyOnFaulted); } catch (Exception ex) { - DefaultTrace.TraceError("Telemetry Job Processor failed with error : {0}", ex); + DefaultTrace.TraceError("Telemetry Job Processor failed to run with error : {0}", ex); } } } diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs index 6341836e79..2ff4f0a309 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs @@ -79,13 +79,15 @@ internal static class ClientTelemetryOptions internal const double Percentile99 = 99.0; internal const double Percentile999 = 99.9; internal const string DateFormat = "yyyy-MM-ddTHH:mm:ssZ"; - internal const int ProcessorTimeOutInMs = 5 * 60 * 1000; // 5 minutes + internal const string EnvPropsClientTelemetrySchedulingInSeconds = "COSMOS.CLIENT_TELEMETRY_SCHEDULING_IN_SECONDS"; internal const string EnvPropsClientTelemetryEnabled = "COSMOS.CLIENT_TELEMETRY_ENABLED"; internal const string EnvPropsClientTelemetryVmMetadataUrl = "COSMOS.VM_METADATA_URL"; internal const string EnvPropsClientTelemetryEndpoint = "COSMOS.CLIENT_TELEMETRY_ENDPOINT"; internal const string EnvPropsClientTelemetryEnvironmentName = "COSMOS.ENVIRONMENT_NAME"; + internal static readonly TimeSpan ClientTelemetryServiceTimeOut = TimeSpan.FromMinutes(1); + internal static readonly TimeSpan ProcessorTimeOut = TimeSpan.FromMinutes(5); // 5 minutes internal static readonly ResourceType AllowedResourceTypes = ResourceType.Document; // Why 5 sec? As of now, if any network request is taking more than 5 millisecond sec, we will consider it slow request this value can be revisited in future internal static readonly TimeSpan NetworkLatencyThreshold = TimeSpan.FromMilliseconds(5); diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs index c9511cb4e4..5f790f9b72 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs @@ -23,7 +23,8 @@ internal class ClientTelemetryProcessor private readonly AuthorizationTokenProvider tokenProvider; private readonly CosmosHttpClient httpClient; - + private readonly CancellationTokenSource serviceCancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryServiceTimeOut); + internal ClientTelemetryProcessor(CosmosHttpClient httpClient, AuthorizationTokenProvider tokenProvider) { this.httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient)); @@ -38,8 +39,7 @@ internal async Task ProcessAndSendAsync( ClientTelemetryProperties clientTelemetryInfo, ConcurrentDictionary operationInfoSnapshot, ConcurrentDictionary cacheRefreshInfoSnapshot, - IReadOnlyList requestInfoSnapshot, - CancellationToken cancellationToken) + IReadOnlyList requestInfoSnapshot) { try { @@ -48,7 +48,7 @@ await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync( operationInfoSnapshot: operationInfoSnapshot, cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, sampledRequestInfo: requestInfoSnapshot, - callback: async (payload) => await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, payload, cancellationToken)); + callback: async (payload) => await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, payload, this.serviceCancellationToken.Token)); } catch (Exception ex) { @@ -59,7 +59,7 @@ await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync( } /// - /// Task to send telemetry information to configured Juno endpoint. + /// Task to send telemetry information to configured Client Telemetry Service endpoint. /// If endpoint is not configured then it won't even try to send information. It will just trace an error message. /// In any case it resets the telemetry information to collect the latest one. /// From 773a70e797e3210db76bee2fb96e3ac6e9845a84 Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Tue, 4 Apr 2023 02:23:37 +0530 Subject: [PATCH 04/13] code refactor --- .../src/Telemetry/ClientTelemetry.cs | 38 ++++++++++--------- .../src/Telemetry/ClientTelemetryOptions.cs | 2 +- .../src/Telemetry/ClientTelemetryProcessor.cs | 6 ++- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs index 8e08b9421d..02f50fe933 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs @@ -35,8 +35,7 @@ internal class ClientTelemetry : IDisposable private readonly DiagnosticsHandlerHelper diagnosticsHelper; private readonly NetworkDataRecorder networkDataRecorder; - private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); - private readonly CancellationTokenSource processorCancellationTokenSource = new CancellationTokenSource(ClientTelemetryOptions.ProcessorTimeOut); // 5 min + private readonly CancellationTokenSource cancellationTokenSource; private readonly GlobalEndpointManager globalEndpointManager; @@ -120,6 +119,7 @@ internal ClientTelemetry( aggregationIntervalInSec: (int)observingWindow.TotalSeconds); this.networkDataRecorder = new NetworkDataRecorder(); + this.cancellationTokenSource = new CancellationTokenSource(); } /// @@ -175,22 +175,12 @@ ConcurrentDictionary cacheRefreshInfo try { // Initiating Telemetry Data Processor task which will serialize and send telemetry information to Client Telemetry Service service - this.processorTask = Task.Run( - function: () => this.processor - .ProcessAndSendAsync( - clientTelemetryInfo: this.clientTelemetryInfo, - operationInfoSnapshot: operationInfoSnapshot, - cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, - requestInfoSnapshot: requestInfoSnapshot), - cancellationToken: this.processorCancellationTokenSource.Token) - .ContinueWith((i) => - { - Exception ex = i.Exception?.GetBaseException(); - if (ex != null) - { - DefaultTrace.TraceError($"Client Telemetry data processing task faulted and stopped running. ErrorType={ex.GetType()} ErrorMessage={ex.Message}"); - } - }, TaskContinuationOptions.OnlyOnFaulted); + this.processorTask = Task.Run(() => ClientTelemetry.RunProcessorTaskAsync(this.processor + .ProcessAndSendAsync( + clientTelemetryInfo: this.clientTelemetryInfo, + operationInfoSnapshot: operationInfoSnapshot, + cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, + requestInfoSnapshot: requestInfoSnapshot)), new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut).Token); } catch (Exception ex) { @@ -206,6 +196,18 @@ ConcurrentDictionary cacheRefreshInfo DefaultTrace.TraceInformation("Telemetry Job Stopped."); } + private static async Task RunProcessorTaskAsync(Task processingTask) + { + try + { + await processingTask; + } + catch (Exception ex) + { + DefaultTrace.TraceError($"Client Telemetry data processing task faulted. {0}", ex); + } + } + /// /// Collects Cache Telemetry Information. /// diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs index 2ff4f0a309..1df38194f7 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs @@ -87,7 +87,7 @@ internal static class ClientTelemetryOptions internal const string EnvPropsClientTelemetryEnvironmentName = "COSMOS.ENVIRONMENT_NAME"; internal static readonly TimeSpan ClientTelemetryServiceTimeOut = TimeSpan.FromMinutes(1); - internal static readonly TimeSpan ProcessorTimeOut = TimeSpan.FromMinutes(5); // 5 minutes + internal static readonly TimeSpan ClientTelemetryProcessorTimeOut = TimeSpan.FromMinutes(5); // 5 minutes internal static readonly ResourceType AllowedResourceTypes = ResourceType.Document; // Why 5 sec? As of now, if any network request is taking more than 5 millisecond sec, we will consider it slow request this value can be revisited in future internal static readonly TimeSpan NetworkLatencyThreshold = TimeSpan.FromMilliseconds(5); diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs index 5f790f9b72..15c45369b3 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs @@ -23,7 +23,6 @@ internal class ClientTelemetryProcessor private readonly AuthorizationTokenProvider tokenProvider; private readonly CosmosHttpClient httpClient; - private readonly CancellationTokenSource serviceCancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryServiceTimeOut); internal ClientTelemetryProcessor(CosmosHttpClient httpClient, AuthorizationTokenProvider tokenProvider) { @@ -41,6 +40,7 @@ internal async Task ProcessAndSendAsync( ConcurrentDictionary cacheRefreshInfoSnapshot, IReadOnlyList requestInfoSnapshot) { + CancellationTokenSource serviceCancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryServiceTimeOut); try { await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync( @@ -48,10 +48,12 @@ await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync( operationInfoSnapshot: operationInfoSnapshot, cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, sampledRequestInfo: requestInfoSnapshot, - callback: async (payload) => await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, payload, this.serviceCancellationToken.Token)); + callback: async (payload) => await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, payload, serviceCancellationToken.Token)); } catch (Exception ex) { + serviceCancellationToken.Dispose(); + DefaultTrace.TraceError($"Exception while serializing telemetry payload: {ex}"); throw; } From f7cda908c428544b53bf47afd88e546086f512f3 Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Tue, 4 Apr 2023 03:28:22 +0530 Subject: [PATCH 05/13] create task with timeout --- .../src/Telemetry/ClientTelemetry.cs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs index 02f50fe933..0e3c050ca6 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs @@ -174,13 +174,15 @@ ConcurrentDictionary cacheRefreshInfo try { + CancellationTokenSource cancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut); + // Initiating Telemetry Data Processor task which will serialize and send telemetry information to Client Telemetry Service service this.processorTask = Task.Run(() => ClientTelemetry.RunProcessorTaskAsync(this.processor .ProcessAndSendAsync( clientTelemetryInfo: this.clientTelemetryInfo, operationInfoSnapshot: operationInfoSnapshot, cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, - requestInfoSnapshot: requestInfoSnapshot)), new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut).Token); + requestInfoSnapshot: requestInfoSnapshot)), cancellationToken.Token); } catch (Exception ex) { @@ -200,7 +202,12 @@ private static async Task RunProcessorTaskAsync(Task processingTask) { try { - await processingTask; + Task completedTask = await Task.WhenAny(processingTask, Task.Delay(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut)); + + if (completedTask != processingTask) + { + throw new TimeoutException($"Telemetry Processor Task timed out after {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut.TotalMilliseconds}ms"); + } } catch (Exception ex) { From aa438408a37c90c42e07d632142fd160b5f07461 Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Tue, 4 Apr 2023 03:42:39 +0530 Subject: [PATCH 06/13] fix test --- .../Telemetry/ClientTelemetryTests.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs index 4a6b077baa..d3d38aed63 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs @@ -245,8 +245,7 @@ await processor.ProcessAndSendAsync( clientTelemetryProperties, operationInfoSnapshot, cacheRefreshInfoSnapshot, - requestInfoList, - new CancellationToken()); + requestInfoList); Assert.AreEqual(expectedOperationInfoSize, actualOperationInfoSize, "Operation Info is not correct"); Assert.AreEqual(expectedCacheRefreshInfoSize, actualCacheRefreshInfoSize, "Cache Refresh Info is not correct"); From 58ee92fb10652c5ce6e6d7df90c004e7beb8dce0 Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Wed, 5 Apr 2023 01:48:37 +0530 Subject: [PATCH 07/13] code refactoring --- .../src/Telemetry/ClientTelemetry.cs | 43 +++++++++---------- .../src/Telemetry/ClientTelemetryOptions.cs | 4 +- .../src/Telemetry/ClientTelemetryProcessor.cs | 7 ++- .../Telemetry/ClientTelemetryTests.cs | 39 ++++++++++++++++- 4 files changed, 65 insertions(+), 28 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs index 0e3c050ca6..1bce6efdb2 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs @@ -40,9 +40,7 @@ internal class ClientTelemetry : IDisposable private readonly GlobalEndpointManager globalEndpointManager; private Task telemetryTask; - // Not disposing this task. If we dispose a client then, telemetry job(telemetryTask) should stop but processor task(processorTask) should make best effort to finish the job in background. - private Task processorTask; - + private ConcurrentDictionary operationInfoMap = new ConcurrentDictionary(); @@ -54,6 +52,7 @@ private ConcurrentDictionary cacheRef /// internal ClientTelemetry() { + this.cancellationTokenSource = new CancellationTokenSource(); } /// @@ -175,14 +174,16 @@ ConcurrentDictionary cacheRefreshInfo try { CancellationTokenSource cancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut); - - // Initiating Telemetry Data Processor task which will serialize and send telemetry information to Client Telemetry Service service - this.processorTask = Task.Run(() => ClientTelemetry.RunProcessorTaskAsync(this.processor - .ProcessAndSendAsync( - clientTelemetryInfo: this.clientTelemetryInfo, - operationInfoSnapshot: operationInfoSnapshot, - cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, - requestInfoSnapshot: requestInfoSnapshot)), cancellationToken.Token); + Task processorTask = Task.Run(() => this.processor + .ProcessAndSendAsync( + clientTelemetryInfo: this.clientTelemetryInfo, + operationInfoSnapshot: operationInfoSnapshot, + cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, + requestInfoSnapshot: requestInfoSnapshot), cancellationToken.Token); + + // Initiating Telemetry Data Processor task which will serialize and send telemetry information to Client Telemetry Service + // Not disposing this task. If we dispose a client then, telemetry job(telemetryTask) should stop but processor task(processorTask) should make best effort to finish the job in background. + _ = ClientTelemetry.RunProcessorTaskAsync(this.clientTelemetryInfo.DateTimeUtc, processorTask); } catch (Exception ex) { @@ -198,20 +199,18 @@ ConcurrentDictionary cacheRefreshInfo DefaultTrace.TraceInformation("Telemetry Job Stopped."); } - private static async Task RunProcessorTaskAsync(Task processingTask) + internal static async Task RunProcessorTaskAsync(string telemetryDate, Task processingTask) { - try + Task[] tasks = new Task[] { Task.Delay(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut), processingTask }; + Task completedTask = await Task.WhenAny(tasks); + + if (!object.ReferenceEquals(completedTask, processingTask)) { - Task completedTask = await Task.WhenAny(processingTask, Task.Delay(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut)); + DefaultTrace.TraceError($"Telemetry Processor Task timed out after {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut.TotalMilliseconds}ms with telemetry date as {telemetryDate}"); + + processingTask.Dispose(); - if (completedTask != processingTask) - { - throw new TimeoutException($"Telemetry Processor Task timed out after {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut.TotalMilliseconds}ms"); - } - } - catch (Exception ex) - { - DefaultTrace.TraceError($"Client Telemetry data processing task faulted. {0}", ex); + throw new TimeoutException($"Telemetry Processor Task timed out after {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut.TotalMilliseconds}ms with telemetry date as {telemetryDate}"); } } diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs index 1df38194f7..f4ab9c70d2 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs @@ -87,7 +87,6 @@ internal static class ClientTelemetryOptions internal const string EnvPropsClientTelemetryEnvironmentName = "COSMOS.ENVIRONMENT_NAME"; internal static readonly TimeSpan ClientTelemetryServiceTimeOut = TimeSpan.FromMinutes(1); - internal static readonly TimeSpan ClientTelemetryProcessorTimeOut = TimeSpan.FromMinutes(5); // 5 minutes internal static readonly ResourceType AllowedResourceTypes = ResourceType.Document; // Why 5 sec? As of now, if any network request is taking more than 5 millisecond sec, we will consider it slow request this value can be revisited in future internal static readonly TimeSpan NetworkLatencyThreshold = TimeSpan.FromMilliseconds(5); @@ -103,7 +102,8 @@ internal static class ClientTelemetryOptions internal static readonly int NetworkTelemetrySampleSize = 200; internal static int PayloadSizeThreshold = 1024 * 1024 * 2; // 2MB - + internal static TimeSpan ClientTelemetryProcessorTimeOut = TimeSpan.FromMinutes(5); // 5 minutes + private static Uri clientTelemetryEndpoint; private static string environmentName; private static TimeSpan scheduledTimeSpan = TimeSpan.Zero; diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs index 15c45369b3..2ba8d24a6b 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos.Telemetry using System; using System.Collections.Concurrent; using System.Collections.Generic; + using System.Diagnostics; using System.Net.Http; using System.Text; using System.Threading; @@ -52,11 +53,13 @@ await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync( } catch (Exception ex) { - serviceCancellationToken.Dispose(); - DefaultTrace.TraceError($"Exception while serializing telemetry payload: {ex}"); throw; } + finally + { + serviceCancellationToken.Dispose(); + } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs index d3d38aed63..7cfe8efac9 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs @@ -147,7 +147,6 @@ public async Task CheckIfPayloadIsDividedCorrectlyAsync(int expectedOperationInf string payloadJson = request.Content.ReadAsStringAsync().Result; Assert.IsTrue(payloadJson.Length <= ClientTelemetryOptions.PayloadSizeThreshold, "Payload Size is " + payloadJson.Length); - Console.WriteLine(payloadJson); ClientTelemetryProperties propertiesToSend = JsonConvert.DeserializeObject(payloadJson); Assert.AreEqual(7, propertiesToSend.SystemInfo.Count, "System Info is not correct"); @@ -251,7 +250,43 @@ await processor.ProcessAndSendAsync( Assert.AreEqual(expectedCacheRefreshInfoSize, actualCacheRefreshInfoSize, "Cache Refresh Info is not correct"); Assert.AreEqual(expectedRequestInfoSize, actualRequestInfoSize, "Request Info is not correct"); } - + + [TestMethod] + public async Task ClientTelmetryProcessor_should_TimeOut() + { + Environment.SetEnvironmentVariable(ClientTelemetryOptions.EnvPropsClientTelemetryEndpoint, "http://dummy.telemetry.endpoint/"); + ClientTelemetryOptions.ClientTelemetryProcessorTimeOut = TimeSpan.FromTicks(1); + + string data = File.ReadAllText("Telemetry/ClientTelemetryPayloadWithoutMetrics.json", Encoding.UTF8); + ClientTelemetryProperties clientTelemetryProperties = JsonConvert.DeserializeObject(data); + + Mock mockHttpHandler = new Mock(); + _ = mockHttpHandler.Setup(x => x.SendAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK))); + + ClientTelemetryProcessor processor = new ClientTelemetryProcessor( + MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(new HttpHandlerHelper(mockHttpHandler.Object))), + Mock.Of()); + try + { + Task processorTask = Task.Run(() => processor.ProcessAndSendAsync( + clientTelemetryProperties, + default, + default, + default)); + + await ClientTelemetry.RunProcessorTaskAsync(DateTime.Now.ToString(), processorTask); + + Assert.Fail("Expected TimeoutException"); + } + catch(TimeoutException ex) + { + Assert.IsTrue(ex is TimeoutException, "TimeoutException is not thrown"); + } + } + [TestMethod] [ExpectedException(typeof(FormatException))] public void CheckMisconfiguredTelemetry_should_fail() From 6e397a2f6d798d5654b6caaffe6a8f26d0c76cbf Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Wed, 5 Apr 2023 18:23:38 +0530 Subject: [PATCH 08/13] fix timeout code --- .../src/Telemetry/ClientTelemetry.cs | 37 +++++++++---------- .../Telemetry/ClientTelemetryPayloadWriter.cs | 17 +++++++++ .../src/Telemetry/ClientTelemetryProcessor.cs | 9 ++++- .../Telemetry/ClientTelemetryTests.cs | 8 ++-- 4 files changed, 47 insertions(+), 24 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs index 1bce6efdb2..eda6379d00 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs @@ -148,7 +148,7 @@ private async Task EnrichAndSendAsync() AccountProperties accountProperties = await ClientTelemetryHelper.SetAccountNameAsync(this.globalEndpointManager); this.clientTelemetryInfo.GlobalDatabaseAccountName = accountProperties.Id; } - + await Task.Delay(observingWindow, this.cancellationTokenSource.Token); this.clientTelemetryInfo.DateTimeUtc = DateTime.UtcNow.ToString(ClientTelemetryOptions.DateFormat); @@ -170,24 +170,24 @@ ConcurrentDictionary cacheRefreshInfo = Interlocked.Exchange(ref this.cacheRefreshInfoMap, new ConcurrentDictionary()); List requestInfoSnapshot = this.networkDataRecorder.GetRequests(); - + try { CancellationTokenSource cancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut); - Task processorTask = Task.Run(() => this.processor - .ProcessAndSendAsync( - clientTelemetryInfo: this.clientTelemetryInfo, - operationInfoSnapshot: operationInfoSnapshot, - cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, - requestInfoSnapshot: requestInfoSnapshot), cancellationToken.Token); // Initiating Telemetry Data Processor task which will serialize and send telemetry information to Client Telemetry Service // Not disposing this task. If we dispose a client then, telemetry job(telemetryTask) should stop but processor task(processorTask) should make best effort to finish the job in background. - _ = ClientTelemetry.RunProcessorTaskAsync(this.clientTelemetryInfo.DateTimeUtc, processorTask); + _ = Task.Run(() => ClientTelemetry + .RunProcessorTaskAsync(this.processor.ProcessAndSendAsync( + clientTelemetryInfo: this.clientTelemetryInfo, + operationInfoSnapshot: operationInfoSnapshot, + cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, + requestInfoSnapshot: requestInfoSnapshot, + cancellationToken: cancellationToken)), cancellationToken.Token); } catch (Exception ex) { - DefaultTrace.TraceError("Telemetry Job Processor failed to run with error : {0}", ex); + DefaultTrace.TraceError("Exception while processing the data : {0} with telemetry date as {1}", ex.Message, this.clientTelemetryInfo.DateTimeUtc); } } } @@ -199,18 +199,15 @@ ConcurrentDictionary cacheRefreshInfo DefaultTrace.TraceInformation("Telemetry Job Stopped."); } - internal static async Task RunProcessorTaskAsync(string telemetryDate, Task processingTask) + internal static async Task RunProcessorTaskAsync(Task processingTask) { - Task[] tasks = new Task[] { Task.Delay(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut), processingTask }; - Task completedTask = await Task.WhenAny(tasks); - - if (!object.ReferenceEquals(completedTask, processingTask)) + try { - DefaultTrace.TraceError($"Telemetry Processor Task timed out after {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut.TotalMilliseconds}ms with telemetry date as {telemetryDate}"); - - processingTask.Dispose(); - - throw new TimeoutException($"Telemetry Processor Task timed out after {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut.TotalMilliseconds}ms with telemetry date as {telemetryDate}"); + await processingTask; + } + finally + { + processingTask.Dispose(); // release all resources } } diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs index 370d576fcc..6bc10069f7 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs @@ -10,6 +10,7 @@ namespace Microsoft.Azure.Cosmos.Telemetry using System.IO; using System.Linq; using System.Text; + using System.Threading; using System.Threading.Tasks; using HdrHistogram; using Microsoft.Azure.Cosmos.Telemetry.Models; @@ -22,6 +23,7 @@ public static async Task SerializedPayloadChunksAsync( ConcurrentDictionary operationInfoSnapshot, ConcurrentDictionary cacheRefreshInfoSnapshot, IReadOnlyList sampledRequestInfo, + CancellationToken cancellationToken, Func callback) { if (properties == null) @@ -37,6 +39,11 @@ public static async Task SerializedPayloadChunksAsync( { foreach (KeyValuePair entry in operationInfoSnapshot) { + if (cancellationToken.IsCancellationRequested) + { + throw new TimeoutException("Operation data Processing is cancelled due to timeout"); + } + long lengthNow = stringBuilder.Length; OperationInfo payloadForLatency = entry.Key; @@ -75,6 +82,11 @@ public static async Task SerializedPayloadChunksAsync( foreach (KeyValuePair entry in cacheRefreshInfoSnapshot) { + if (cancellationToken.IsCancellationRequested) + { + throw new TimeoutException("Cache data Processing is cancelled due to timeout"); + } + long lengthNow = stringBuilder.Length; CacheRefreshInfo payloadForLatency = entry.Key; @@ -106,6 +118,11 @@ public static async Task SerializedPayloadChunksAsync( foreach (RequestInfo entry in sampledRequestInfo) { + if (cancellationToken.IsCancellationRequested) + { + throw new TimeoutException("Request data Processing is cancelled due to timeout"); + } + long lengthNow = stringBuilder.Length; string latencyMetrics = JsonConvert.SerializeObject(entry); diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs index 2ba8d24a6b..ac3056ce86 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs @@ -39,16 +39,23 @@ internal async Task ProcessAndSendAsync( ClientTelemetryProperties clientTelemetryInfo, ConcurrentDictionary operationInfoSnapshot, ConcurrentDictionary cacheRefreshInfoSnapshot, - IReadOnlyList requestInfoSnapshot) + IReadOnlyList requestInfoSnapshot, + CancellationTokenSource cancellationToken) { CancellationTokenSource serviceCancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryServiceTimeOut); try { + if (cancellationToken.IsCancellationRequested) + { + throw new TimeoutException("Processing is cancelled due to timeout"); + } + await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync( properties: clientTelemetryInfo, operationInfoSnapshot: operationInfoSnapshot, cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, sampledRequestInfo: requestInfoSnapshot, + cancellationToken: cancellationToken.Token, callback: async (payload) => await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, payload, serviceCancellationToken.Token)); } catch (Exception ex) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs index 7cfe8efac9..ed23faaad8 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs @@ -244,7 +244,8 @@ await processor.ProcessAndSendAsync( clientTelemetryProperties, operationInfoSnapshot, cacheRefreshInfoSnapshot, - requestInfoList); + requestInfoList, + new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut)); Assert.AreEqual(expectedOperationInfoSize, actualOperationInfoSize, "Operation Info is not correct"); Assert.AreEqual(expectedCacheRefreshInfoSize, actualCacheRefreshInfoSize, "Cache Refresh Info is not correct"); @@ -275,9 +276,10 @@ public async Task ClientTelmetryProcessor_should_TimeOut() clientTelemetryProperties, default, default, - default)); + default, + new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut))); - await ClientTelemetry.RunProcessorTaskAsync(DateTime.Now.ToString(), processorTask); + await ClientTelemetry.RunProcessorTaskAsync(processorTask); Assert.Fail("Expected TimeoutException"); } From f9e529944e0c71a7f5024edcf47d3dd631ceaa51 Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Thu, 6 Apr 2023 11:22:19 +0530 Subject: [PATCH 09/13] space fix --- Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs | 1 - Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs | 1 - 2 files changed, 2 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs index eda6379d00..ba00f9c56f 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs @@ -36,7 +36,6 @@ internal class ClientTelemetry : IDisposable private readonly NetworkDataRecorder networkDataRecorder; private readonly CancellationTokenSource cancellationTokenSource; - private readonly GlobalEndpointManager globalEndpointManager; private Task telemetryTask; diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs index f4ab9c70d2..e619972d9f 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs @@ -79,7 +79,6 @@ internal static class ClientTelemetryOptions internal const double Percentile99 = 99.0; internal const double Percentile999 = 99.9; internal const string DateFormat = "yyyy-MM-ddTHH:mm:ssZ"; - internal const string EnvPropsClientTelemetrySchedulingInSeconds = "COSMOS.CLIENT_TELEMETRY_SCHEDULING_IN_SECONDS"; internal const string EnvPropsClientTelemetryEnabled = "COSMOS.CLIENT_TELEMETRY_ENABLED"; internal const string EnvPropsClientTelemetryVmMetadataUrl = "COSMOS.VM_METADATA_URL"; From 583354465e6c2e7cbce008b34741c237a4f9bba1 Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Thu, 6 Apr 2023 22:06:19 +0530 Subject: [PATCH 10/13] not failing if processor is taking time --- .../Telemetry/ClientTelemetryPayloadWriter.cs | 13 ++- .../src/Telemetry/ClientTelemetryProcessor.cs | 5 +- .../Telemetry/ClientTelemetryTests.cs | 107 +++++++++++++++--- 3 files changed, 105 insertions(+), 20 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs index 6bc10069f7..7b03262fca 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs @@ -13,6 +13,7 @@ namespace Microsoft.Azure.Cosmos.Telemetry using System.Threading; using System.Threading.Tasks; using HdrHistogram; + using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Cosmos.Telemetry.Models; using Newtonsoft.Json; @@ -41,7 +42,9 @@ public static async Task SerializedPayloadChunksAsync( { if (cancellationToken.IsCancellationRequested) { - throw new TimeoutException("Operation data Processing is cancelled due to timeout"); + DefaultTrace.TraceError($"Client Telemetry Processor took more than {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut} to process the data. Skipped while processing operation data."); + // if it took more than allowed time, then go ahead with the data whatever is processed + continue; } long lengthNow = stringBuilder.Length; @@ -84,7 +87,9 @@ public static async Task SerializedPayloadChunksAsync( { if (cancellationToken.IsCancellationRequested) { - throw new TimeoutException("Cache data Processing is cancelled due to timeout"); + DefaultTrace.TraceError($"Client Telemetry Processor took more than {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut} to process the data. Skipped while processing cache refresh data."); + // if it took more than allowed time, then go ahead with the data whatever is processed + continue; } long lengthNow = stringBuilder.Length; @@ -120,7 +125,9 @@ public static async Task SerializedPayloadChunksAsync( { if (cancellationToken.IsCancellationRequested) { - throw new TimeoutException("Request data Processing is cancelled due to timeout"); + DefaultTrace.TraceError($"Client Telemetry Processor took more than {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut} to process the data. Skipped while processing RequestInfo data."); + // if it took more than allowed time, then go ahead with the data whatever is processed + continue; } long lengthNow = stringBuilder.Length; diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs index ac3056ce86..86bac6f97c 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs @@ -45,10 +45,7 @@ internal async Task ProcessAndSendAsync( CancellationTokenSource serviceCancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryServiceTimeOut); try { - if (cancellationToken.IsCancellationRequested) - { - throw new TimeoutException("Processing is cancelled due to timeout"); - } + cancellationToken.Token.ThrowIfCancellationRequested(); await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync( properties: clientTelemetryInfo, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs index ed23faaad8..7a09c4eb76 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs @@ -253,40 +253,121 @@ await processor.ProcessAndSendAsync( } [TestMethod] - public async Task ClientTelmetryProcessor_should_TimeOut() + [DataRow(1)] // 1 tick smallest value for a timespan to get timeout exception + [DataRow(10000)] // 1/2 Millisecond + public async Task ClientTelmetryProcessor_should_timeOut_or_skip_records(int timeInTicks) { Environment.SetEnvironmentVariable(ClientTelemetryOptions.EnvPropsClientTelemetryEndpoint, "http://dummy.telemetry.endpoint/"); - ClientTelemetryOptions.ClientTelemetryProcessorTimeOut = TimeSpan.FromTicks(1); + ClientTelemetryOptions.ClientTelemetryProcessorTimeOut = TimeSpan.FromTicks(timeInTicks); string data = File.ReadAllText("Telemetry/ClientTelemetryPayloadWithoutMetrics.json", Encoding.UTF8); ClientTelemetryProperties clientTelemetryProperties = JsonConvert.DeserializeObject(data); + + ConcurrentDictionary operationInfoSnapshot + = new ConcurrentDictionary(); + int actualOperationInfoSize = 0; + int actualCacheRefreshInfoSize = 0; + Mock mockHttpHandler = new Mock(); _ = mockHttpHandler.Setup(x => x.SendAsync( It.IsAny(), It.IsAny())) + .Callback( + (request, cancellationToken) => + { + string payloadJson = request.Content.ReadAsStringAsync().Result; + Assert.IsTrue(payloadJson.Length <= ClientTelemetryOptions.PayloadSizeThreshold, "Payload Size is " + payloadJson.Length); + + ClientTelemetryProperties propertiesToSend = JsonConvert.DeserializeObject(payloadJson); + + actualOperationInfoSize += propertiesToSend.OperationInfo?.Count ?? 0; + actualCacheRefreshInfoSize += propertiesToSend.CacheRefreshInfo?.Count ?? 0; + }) .Returns(Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK))); ClientTelemetryProcessor processor = new ClientTelemetryProcessor( MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(new HttpHandlerHelper(mockHttpHandler.Object))), Mock.Of()); - try + + for (int i = 0; i < 20; i++) { - Task processorTask = Task.Run(() => processor.ProcessAndSendAsync( - clientTelemetryProperties, - default, - default, - default, - new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut))); + OperationInfo opeInfo = new OperationInfo(Regions.WestUS, + 0, + Documents.ConsistencyLevel.Session.ToString(), + "databaseName" + i, + "containerName", + Documents.OperationType.Read, + Documents.ResourceType.Document, + 200, + 0); - await ClientTelemetry.RunProcessorTaskAsync(processorTask); + LongConcurrentHistogram latency = new LongConcurrentHistogram(ClientTelemetryOptions.RequestLatencyMin, + ClientTelemetryOptions.RequestLatencyMax, + ClientTelemetryOptions.RequestLatencyPrecision); + latency.RecordValue(10); - Assert.Fail("Expected TimeoutException"); + LongConcurrentHistogram requestcharge = new LongConcurrentHistogram(ClientTelemetryOptions.RequestChargeMin, + ClientTelemetryOptions.RequestChargeMax, + ClientTelemetryOptions.RequestChargePrecision); + requestcharge.RecordValue(11); + + operationInfoSnapshot.TryAdd(opeInfo, (latency, requestcharge)); } - catch(TimeoutException ex) + + ConcurrentDictionary cacheRefreshInfoSnapshot + = new ConcurrentDictionary(); + for (int i = 0; i < 10; i++) { - Assert.IsTrue(ex is TimeoutException, "TimeoutException is not thrown"); + CacheRefreshInfo crInfo = new CacheRefreshInfo(Regions.WestUS, + 10, + Documents.ConsistencyLevel.Session.ToString(), + "databaseName" + i, + "containerName", + Documents.OperationType.Read, + Documents.ResourceType.Document, + 200, + 1002, + "dummycache"); + + LongConcurrentHistogram latency = new LongConcurrentHistogram(ClientTelemetryOptions.RequestLatencyMin, + ClientTelemetryOptions.RequestLatencyMax, + ClientTelemetryOptions.RequestLatencyPrecision); + latency.RecordValue(10); + + cacheRefreshInfoSnapshot.TryAdd(crInfo, latency); } + + try + { + await processor.ProcessAndSendAsync( + clientTelemetryProperties, + operationInfoSnapshot, + cacheRefreshInfoSnapshot, + default, + new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut)); + if (timeInTicks > 1) + { + Assert.IsTrue(actualOperationInfoSize > 0, "actualOperationInfoSize is " + actualOperationInfoSize); + Assert.AreEqual(0, actualCacheRefreshInfoSize, "Cache Refresh Info is not correct"); + } + else + { + Assert.Fail("should have thrown exception"); + } + } + catch (OperationCanceledException ex) + { + if (timeInTicks == 1) + { + Assert.IsTrue(ex is OperationCanceledException); + } + else + { + Assert.Fail("should not throw exception"); + } + } + } [TestMethod] From 74bec8f5eb149ec0838c9c39094e24a0fccb1031 Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Fri, 7 Apr 2023 02:32:17 +0530 Subject: [PATCH 11/13] fix procrsser test --- .../src/Telemetry/ClientTelemetry.cs | 45 +++++++++++----- .../Telemetry/ClientTelemetryPayloadWriter.cs | 22 -------- .../src/Telemetry/ClientTelemetryProcessor.cs | 14 ++--- .../Telemetry/ClientTelemetryTests.cs | 52 +++++++------------ 4 files changed, 54 insertions(+), 79 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs index ba00f9c56f..0059da6eb9 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs @@ -173,16 +173,18 @@ ConcurrentDictionary cacheRefreshInfo try { CancellationTokenSource cancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut); + Task processorTask = Task.Run(() => this.processor + .ProcessAndSendAsync( + clientTelemetryInfo: this.clientTelemetryInfo, + operationInfoSnapshot: operationInfoSnapshot, + cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, + requestInfoSnapshot: requestInfoSnapshot, + processorCancelToken: cancellationToken), cancellationToken.Token); // Initiating Telemetry Data Processor task which will serialize and send telemetry information to Client Telemetry Service // Not disposing this task. If we dispose a client then, telemetry job(telemetryTask) should stop but processor task(processorTask) should make best effort to finish the job in background. - _ = Task.Run(() => ClientTelemetry - .RunProcessorTaskAsync(this.processor.ProcessAndSendAsync( - clientTelemetryInfo: this.clientTelemetryInfo, - operationInfoSnapshot: operationInfoSnapshot, - cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, - requestInfoSnapshot: requestInfoSnapshot, - cancellationToken: cancellationToken)), cancellationToken.Token); + _ = ClientTelemetry.RunProcessorTaskAsync(this.clientTelemetryInfo.DateTimeUtc, processorTask, ClientTelemetryOptions.ClientTelemetryProcessorTimeOut); + } catch (Exception ex) { @@ -198,15 +200,30 @@ ConcurrentDictionary cacheRefreshInfo DefaultTrace.TraceInformation("Telemetry Job Stopped."); } - internal static async Task RunProcessorTaskAsync(Task processingTask) + /// + /// This Task makes sure, processing task is timing out after 5 minute of timeout + /// + /// + /// + /// + internal static async Task RunProcessorTaskAsync(string telemetryDate, Task processingTask, TimeSpan timeout) { - try - { - await processingTask; - } - finally + using (CancellationTokenSource tokenForDelayTask = new CancellationTokenSource()) { - processingTask.Dispose(); // release all resources + Task delayTask = Task.Delay(timeout, tokenForDelayTask.Token); + + Task resultTask = await Task.WhenAny(processingTask, delayTask); + if (resultTask == delayTask) + { + DefaultTrace.TraceVerbose($"Processor task with date as {telemetryDate} is cancelled as it did not finish in {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut} milliseconds."); + // Operation cancelled + throw new TimeoutException($"Processor task with date as {telemetryDate} is cancelled as it did not finish in {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut} milliseconds."); + } + else + { + // Cancel the timer task so that it does not fire + tokenForDelayTask.Cancel(); + } } } diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs index 7b03262fca..a0207a6a19 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs @@ -24,7 +24,6 @@ public static async Task SerializedPayloadChunksAsync( ConcurrentDictionary operationInfoSnapshot, ConcurrentDictionary cacheRefreshInfoSnapshot, IReadOnlyList sampledRequestInfo, - CancellationToken cancellationToken, Func callback) { if (properties == null) @@ -40,13 +39,6 @@ public static async Task SerializedPayloadChunksAsync( { foreach (KeyValuePair entry in operationInfoSnapshot) { - if (cancellationToken.IsCancellationRequested) - { - DefaultTrace.TraceError($"Client Telemetry Processor took more than {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut} to process the data. Skipped while processing operation data."); - // if it took more than allowed time, then go ahead with the data whatever is processed - continue; - } - long lengthNow = stringBuilder.Length; OperationInfo payloadForLatency = entry.Key; @@ -85,13 +77,6 @@ public static async Task SerializedPayloadChunksAsync( foreach (KeyValuePair entry in cacheRefreshInfoSnapshot) { - if (cancellationToken.IsCancellationRequested) - { - DefaultTrace.TraceError($"Client Telemetry Processor took more than {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut} to process the data. Skipped while processing cache refresh data."); - // if it took more than allowed time, then go ahead with the data whatever is processed - continue; - } - long lengthNow = stringBuilder.Length; CacheRefreshInfo payloadForLatency = entry.Key; @@ -123,13 +108,6 @@ public static async Task SerializedPayloadChunksAsync( foreach (RequestInfo entry in sampledRequestInfo) { - if (cancellationToken.IsCancellationRequested) - { - DefaultTrace.TraceError($"Client Telemetry Processor took more than {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut} to process the data. Skipped while processing RequestInfo data."); - // if it took more than allowed time, then go ahead with the data whatever is processed - continue; - } - long lengthNow = stringBuilder.Length; string latencyMetrics = JsonConvert.SerializeObject(entry); diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs index 86bac6f97c..0c56268640 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs @@ -40,30 +40,24 @@ internal async Task ProcessAndSendAsync( ConcurrentDictionary operationInfoSnapshot, ConcurrentDictionary cacheRefreshInfoSnapshot, IReadOnlyList requestInfoSnapshot, - CancellationTokenSource cancellationToken) + CancellationTokenSource processorCancelToken) { - CancellationTokenSource serviceCancellationToken = new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryServiceTimeOut); try { - cancellationToken.Token.ThrowIfCancellationRequested(); - + using CancellationTokenSource cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(processorCancelToken.Token); + await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync( properties: clientTelemetryInfo, operationInfoSnapshot: operationInfoSnapshot, cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, sampledRequestInfo: requestInfoSnapshot, - cancellationToken: cancellationToken.Token, - callback: async (payload) => await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, payload, serviceCancellationToken.Token)); + callback: async (payload) => await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, payload, cancellationToken.Token)); } catch (Exception ex) { DefaultTrace.TraceError($"Exception while serializing telemetry payload: {ex}"); throw; } - finally - { - serviceCancellationToken.Dispose(); - } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs index 7a09c4eb76..0d17482708 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs @@ -254,18 +254,14 @@ await processor.ProcessAndSendAsync( [TestMethod] [DataRow(1)] // 1 tick smallest value for a timespan to get timeout exception - [DataRow(10000)] // 1/2 Millisecond - public async Task ClientTelmetryProcessor_should_timeOut_or_skip_records(int timeInTicks) + public async Task ClientTelmetryProcessor_should_timeout(int timeOutInTicks) { Environment.SetEnvironmentVariable(ClientTelemetryOptions.EnvPropsClientTelemetryEndpoint, "http://dummy.telemetry.endpoint/"); - ClientTelemetryOptions.ClientTelemetryProcessorTimeOut = TimeSpan.FromTicks(timeInTicks); + ClientTelemetryOptions.ClientTelemetryProcessorTimeOut = TimeSpan.FromTicks(timeOutInTicks); string data = File.ReadAllText("Telemetry/ClientTelemetryPayloadWithoutMetrics.json", Encoding.UTF8); ClientTelemetryProperties clientTelemetryProperties = JsonConvert.DeserializeObject(data); - - ConcurrentDictionary operationInfoSnapshot - = new ConcurrentDictionary(); - + int actualOperationInfoSize = 0; int actualCacheRefreshInfoSize = 0; @@ -289,7 +285,10 @@ public async Task ClientTelmetryProcessor_should_timeOut_or_skip_records(int tim ClientTelemetryProcessor processor = new ClientTelemetryProcessor( MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(new HttpHandlerHelper(mockHttpHandler.Object))), Mock.Of()); - + + ConcurrentDictionary operationInfoSnapshot + = new ConcurrentDictionary(); + for (int i = 0; i < 20; i++) { OperationInfo opeInfo = new OperationInfo(Regions.WestUS, @@ -340,34 +339,21 @@ ConcurrentDictionary cacheRefreshInfo try { - await processor.ProcessAndSendAsync( - clientTelemetryProperties, - operationInfoSnapshot, - cacheRefreshInfoSnapshot, - default, - new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut)); - if (timeInTicks > 1) - { - Assert.IsTrue(actualOperationInfoSize > 0, "actualOperationInfoSize is " + actualOperationInfoSize); - Assert.AreEqual(0, actualCacheRefreshInfoSize, "Cache Refresh Info is not correct"); - } - else - { - Assert.Fail("should have thrown exception"); - } + Task processorTask = Task.Run(() => processor.ProcessAndSendAsync( + clientTelemetryProperties, + operationInfoSnapshot, + cacheRefreshInfoSnapshot, + default, + new CancellationTokenSource(TimeSpan.FromSeconds(1)))); + + await ClientTelemetry.RunProcessorTaskAsync(DateTime.Now.ToString(), processorTask, TimeSpan.FromTicks(1)); + + Assert.Fail("should have thrown exception"); } - catch (OperationCanceledException ex) + catch (TimeoutException ex) { - if (timeInTicks == 1) - { - Assert.IsTrue(ex is OperationCanceledException); - } - else - { - Assert.Fail("should not throw exception"); - } + Assert.IsTrue(ex is TimeoutException); } - } [TestMethod] From d96f38c325f0417fe138e3ce2cf40efbb802a34d Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Fri, 7 Apr 2023 03:46:27 +0530 Subject: [PATCH 12/13] code refactor --- .../src/Telemetry/ClientTelemetry.cs | 8 ++++---- .../src/Telemetry/ClientTelemetryOptions.cs | 5 ++--- .../src/Telemetry/ClientTelemetryPayloadWriter.cs | 2 -- .../src/Telemetry/ClientTelemetryProcessor.cs | 11 ++++------- .../Telemetry/ClientTelemetryTests.cs | 12 +++++------- 5 files changed, 15 insertions(+), 23 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs index 0059da6eb9..25a98b2be7 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs @@ -179,7 +179,7 @@ ConcurrentDictionary cacheRefreshInfo operationInfoSnapshot: operationInfoSnapshot, cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, requestInfoSnapshot: requestInfoSnapshot, - processorCancelToken: cancellationToken), cancellationToken.Token); + cancellationToken: cancellationToken.Token), cancellationToken.Token); // Initiating Telemetry Data Processor task which will serialize and send telemetry information to Client Telemetry Service // Not disposing this task. If we dispose a client then, telemetry job(telemetryTask) should stop but processor task(processorTask) should make best effort to finish the job in background. @@ -188,7 +188,7 @@ ConcurrentDictionary cacheRefreshInfo } catch (Exception ex) { - DefaultTrace.TraceError("Exception while processing the data : {0} with telemetry date as {1}", ex.Message, this.clientTelemetryInfo.DateTimeUtc); + DefaultTrace.TraceError("Exception while initiating processing task : {0} with telemetry date as {1}", ex.Message, this.clientTelemetryInfo.DateTimeUtc); } } } @@ -215,9 +215,9 @@ internal static async Task RunProcessorTaskAsync(string telemetryDate, Task proc Task resultTask = await Task.WhenAny(processingTask, delayTask); if (resultTask == delayTask) { - DefaultTrace.TraceVerbose($"Processor task with date as {telemetryDate} is cancelled as it did not finish in {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut} milliseconds."); + DefaultTrace.TraceVerbose($"Processor task with date as {telemetryDate} is canceled as it did not finish in {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut}"); // Operation cancelled - throw new TimeoutException($"Processor task with date as {telemetryDate} is cancelled as it did not finish in {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut} milliseconds."); + throw new OperationCanceledException($"Processor task with date as {telemetryDate} is canceled as it did not finish in {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut}"); } else { diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs index e619972d9f..2aeaadca63 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs @@ -84,8 +84,7 @@ internal static class ClientTelemetryOptions internal const string EnvPropsClientTelemetryVmMetadataUrl = "COSMOS.VM_METADATA_URL"; internal const string EnvPropsClientTelemetryEndpoint = "COSMOS.CLIENT_TELEMETRY_ENDPOINT"; internal const string EnvPropsClientTelemetryEnvironmentName = "COSMOS.ENVIRONMENT_NAME"; - - internal static readonly TimeSpan ClientTelemetryServiceTimeOut = TimeSpan.FromMinutes(1); + internal static readonly ResourceType AllowedResourceTypes = ResourceType.Document; // Why 5 sec? As of now, if any network request is taking more than 5 millisecond sec, we will consider it slow request this value can be revisited in future internal static readonly TimeSpan NetworkLatencyThreshold = TimeSpan.FromMilliseconds(5); @@ -101,7 +100,7 @@ internal static class ClientTelemetryOptions internal static readonly int NetworkTelemetrySampleSize = 200; internal static int PayloadSizeThreshold = 1024 * 1024 * 2; // 2MB - internal static TimeSpan ClientTelemetryProcessorTimeOut = TimeSpan.FromMinutes(5); // 5 minutes + internal static TimeSpan ClientTelemetryProcessorTimeOut = TimeSpan.FromMinutes(5); private static Uri clientTelemetryEndpoint; private static string environmentName; diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs index a0207a6a19..370d576fcc 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs @@ -10,10 +10,8 @@ namespace Microsoft.Azure.Cosmos.Telemetry using System.IO; using System.Linq; using System.Text; - using System.Threading; using System.Threading.Tasks; using HdrHistogram; - using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Cosmos.Telemetry.Models; using Newtonsoft.Json; diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs index 0c56268640..c9511cb4e4 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs @@ -7,7 +7,6 @@ namespace Microsoft.Azure.Cosmos.Telemetry using System; using System.Collections.Concurrent; using System.Collections.Generic; - using System.Diagnostics; using System.Net.Http; using System.Text; using System.Threading; @@ -24,7 +23,7 @@ internal class ClientTelemetryProcessor private readonly AuthorizationTokenProvider tokenProvider; private readonly CosmosHttpClient httpClient; - + internal ClientTelemetryProcessor(CosmosHttpClient httpClient, AuthorizationTokenProvider tokenProvider) { this.httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient)); @@ -40,18 +39,16 @@ internal async Task ProcessAndSendAsync( ConcurrentDictionary operationInfoSnapshot, ConcurrentDictionary cacheRefreshInfoSnapshot, IReadOnlyList requestInfoSnapshot, - CancellationTokenSource processorCancelToken) + CancellationToken cancellationToken) { try { - using CancellationTokenSource cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(processorCancelToken.Token); - await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync( properties: clientTelemetryInfo, operationInfoSnapshot: operationInfoSnapshot, cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot, sampledRequestInfo: requestInfoSnapshot, - callback: async (payload) => await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, payload, cancellationToken.Token)); + callback: async (payload) => await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, payload, cancellationToken)); } catch (Exception ex) { @@ -62,7 +59,7 @@ await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync( } /// - /// Task to send telemetry information to configured Client Telemetry Service endpoint. + /// Task to send telemetry information to configured Juno endpoint. /// If endpoint is not configured then it won't even try to send information. It will just trace an error message. /// In any case it resets the telemetry information to collect the latest one. /// diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs index 0d17482708..3c644d34e6 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs @@ -245,7 +245,7 @@ await processor.ProcessAndSendAsync( operationInfoSnapshot, cacheRefreshInfoSnapshot, requestInfoList, - new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut)); + new CancellationTokenSource(ClientTelemetryOptions.ClientTelemetryProcessorTimeOut).Token); Assert.AreEqual(expectedOperationInfoSize, actualOperationInfoSize, "Operation Info is not correct"); Assert.AreEqual(expectedCacheRefreshInfoSize, actualCacheRefreshInfoSize, "Cache Refresh Info is not correct"); @@ -253,11 +253,9 @@ await processor.ProcessAndSendAsync( } [TestMethod] - [DataRow(1)] // 1 tick smallest value for a timespan to get timeout exception - public async Task ClientTelmetryProcessor_should_timeout(int timeOutInTicks) + public async Task ClientTelmetryProcessor_should_timeout() { Environment.SetEnvironmentVariable(ClientTelemetryOptions.EnvPropsClientTelemetryEndpoint, "http://dummy.telemetry.endpoint/"); - ClientTelemetryOptions.ClientTelemetryProcessorTimeOut = TimeSpan.FromTicks(timeOutInTicks); string data = File.ReadAllText("Telemetry/ClientTelemetryPayloadWithoutMetrics.json", Encoding.UTF8); ClientTelemetryProperties clientTelemetryProperties = JsonConvert.DeserializeObject(data); @@ -344,15 +342,15 @@ ConcurrentDictionary cacheRefreshInfo operationInfoSnapshot, cacheRefreshInfoSnapshot, default, - new CancellationTokenSource(TimeSpan.FromSeconds(1)))); + new CancellationTokenSource(TimeSpan.FromSeconds(1)).Token)); await ClientTelemetry.RunProcessorTaskAsync(DateTime.Now.ToString(), processorTask, TimeSpan.FromTicks(1)); Assert.Fail("should have thrown exception"); } - catch (TimeoutException ex) + catch (OperationCanceledException ex) { - Assert.IsTrue(ex is TimeoutException); + Assert.IsTrue(ex is OperationCanceledException); } } From 62d1f9cb27bc7cbb68b540f92175c320394807bf Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Sun, 9 Apr 2023 08:59:04 +0530 Subject: [PATCH 13/13] refactor and test fix --- .../src/Telemetry/ClientTelemetry.cs | 4 +-- .../Telemetry/ClientTelemetryTests.cs | 33 +++++++++---------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs index 25a98b2be7..dde7d5a7c5 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs @@ -215,9 +215,9 @@ internal static async Task RunProcessorTaskAsync(string telemetryDate, Task proc Task resultTask = await Task.WhenAny(processingTask, delayTask); if (resultTask == delayTask) { - DefaultTrace.TraceVerbose($"Processor task with date as {telemetryDate} is canceled as it did not finish in {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut}"); + DefaultTrace.TraceVerbose($"Processor task with date as {telemetryDate} is canceled as it did not finish in {timeout}"); // Operation cancelled - throw new OperationCanceledException($"Processor task with date as {telemetryDate} is canceled as it did not finish in {ClientTelemetryOptions.ClientTelemetryProcessorTimeOut}"); + throw new OperationCanceledException($"Processor task with date as {telemetryDate} is canceled as it did not finish in {timeout}"); } else { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs index 3c644d34e6..9308d22806 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Telemetry/ClientTelemetryTests.cs @@ -30,6 +30,7 @@ public class ClientTelemetryTests public void Cleanup() { Environment.SetEnvironmentVariable(ClientTelemetryOptions.EnvPropsClientTelemetryEnabled, null); + Environment.SetEnvironmentVariable(ClientTelemetryOptions.EnvPropsClientTelemetryEndpoint, null); } [TestMethod] @@ -334,24 +335,22 @@ ConcurrentDictionary cacheRefreshInfo cacheRefreshInfoSnapshot.TryAdd(crInfo, latency); } - - try - { - Task processorTask = Task.Run(() => processor.ProcessAndSendAsync( - clientTelemetryProperties, - operationInfoSnapshot, - cacheRefreshInfoSnapshot, - default, - new CancellationTokenSource(TimeSpan.FromSeconds(1)).Token)); - - await ClientTelemetry.RunProcessorTaskAsync(DateTime.Now.ToString(), processorTask, TimeSpan.FromTicks(1)); - - Assert.Fail("should have thrown exception"); - } - catch (OperationCanceledException ex) + + Task processorTask = Task.Run(async () => { - Assert.IsTrue(ex is OperationCanceledException); - } + CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(1)); + await Task.Delay(1000, cts.Token); // Making this task wait to ensure that processir is taking more time. + await processor.ProcessAndSendAsync(clientTelemetryProperties, + operationInfoSnapshot, + cacheRefreshInfoSnapshot, + default, + cts.Token); + }); + + await Assert.ThrowsExceptionAsync(() => ClientTelemetry.RunProcessorTaskAsync( + telemetryDate: DateTime.Now.ToString(), + processingTask: processorTask, + timeout: TimeSpan.FromTicks(1))); } [TestMethod]