Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Double Buffering for Live Metrics Document. #39828

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using Azure.Monitor.OpenTelemetry.LiveMetrics.Models;

namespace Azure.Monitor.OpenTelemetry.LiveMetrics
{
/// <summary>
/// Manages a thread-safe collection of DocumentIngress objects with a fixed capacity.
/// </summary>
internal class DocumentBuffer
{
private readonly ConcurrentQueue<DocumentIngress> _documents = new();
private readonly int _capacity = 20;
// ConcurrentQueue<T>.Count is not used because it is not an O(1) operation. Instead, we use a separate counter.
// Atomic counter for the number of documents in the queue.
private int _count = 0;

public void Add(DocumentIngress document)
rajkumar-rangaraj marked this conversation as resolved.
Show resolved Hide resolved
{
// Ensure the queue does not exceed capacity.
if (Interlocked.CompareExchange(ref _count, 0, 0) < _capacity)
{
_documents.Enqueue(document);
Interlocked.Increment(ref _count);
}
}

public IEnumerable<DocumentIngress> ReadAllAndClear()
{
// There is no need to decrement the count since we are clearing the queue. After this operation, the instance will not be used anymore.
// The method 'Add' is not called while this method is running; therefore, the count will remain unchanged.
while (_documents.TryDequeue(out DocumentIngress item))
{
yield return item;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.Threading;
using Azure.Monitor.OpenTelemetry.LiveMetrics.Models;

namespace Azure.Monitor.OpenTelemetry.LiveMetrics
{
/// <summary>
/// Implements a double buffering mechanism for handling DocumentIngress objects.
/// This allows for concurrent writes to one buffer while the other can be read from or processed.
/// The 'WriteDocument' method is used to add documents to the current active buffer.
/// The 'FlipDocumentBuffers' method swaps the current buffer with a new one, allowing the
/// consumer to process the documents in the returned buffer without interference from ongoing writes.
/// </summary>
internal class DoubleBuffer
{
private DocumentBuffer _currentBuffer = new();

public void WriteDocument(DocumentIngress document)
{
_currentBuffer.Add(document);
}

public DocumentBuffer FlipDocumentBuffers()
{
// Atomically exchange the current buffer with a new empty buffer and return the old buffer
return Interlocked.Exchange(ref _currentBuffer, new DocumentBuffer());
rajkumar-rangaraj marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using Azure.Monitor.OpenTelemetry.LiveMetrics.Models;
using OpenTelemetry;
using OpenTelemetry.Metrics;

Expand All @@ -9,18 +10,28 @@ namespace Azure.Monitor.OpenTelemetry.LiveMetrics
internal sealed class LiveMetricsExporter : BaseExporter<Metric>
{
private readonly string _instrumentationKey;
private readonly DoubleBuffer _doubleBuffer;
private LiveMetricsResource? _resource;
private bool _disposed;

public LiveMetricsExporter(LiveMetricsExporterOptions options)
public LiveMetricsExporter(DoubleBuffer doubleBuffer, LiveMetricsExporterOptions options)
{
_instrumentationKey = "";
_doubleBuffer = doubleBuffer;
}

internal LiveMetricsResource? MetricResource => _resource ??= ParentProvider?.GetResource().CreateAzureMonitorResource(_instrumentationKey);

public override ExportResult Export(in Batch<Metric> batch)
{
MonitoringDataPoint monitoringDataPoint = new MonitoringDataPoint();
rajkumar-rangaraj marked this conversation as resolved.
Show resolved Hide resolved
DocumentBuffer filledBuffer = _doubleBuffer.FlipDocumentBuffers();

foreach (var item in filledBuffer.ReadAllAndClear())
{
monitoringDataPoint.Documents.Add(item);
}

return ExportResult.Success;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public static TracerProviderBuilder AddLiveMetrics(
exporterOptions = sp.GetRequiredService<IOptionsMonitor<LiveMetricsExporterOptions>>().Get(finalOptionsName);
}

return new LiveMetricsExtractionProcessor(new LiveMetricsExporter(exporterOptions));
DoubleBuffer doubleBuffer = new();

return new LiveMetricsExtractionProcessor(doubleBuffer, new LiveMetricsExporter(doubleBuffer, exporterOptions));
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Globalization;
Expand All @@ -29,12 +28,11 @@ internal sealed class LiveMetricsExtractionProcessor : BaseProcessor<Activity>
private readonly Counter<long> _dependencySucceededPerSecond;
private readonly Counter<long> _dependencyFailedPerSecond;
private readonly Counter<long> _exceptionsPerSecond;
// TODO: Explore concurrent collections.
private readonly List<DocumentIngress> _documentIngress = new();
private readonly DoubleBuffer _doubleBuffer;

internal LiveMetricsResource? LiveMetricsResource => _resource ??= ParentProvider?.GetResource().CreateAzureMonitorResource();

internal LiveMetricsExtractionProcessor(LiveMetricsExporter liveMetricExporter)
internal LiveMetricsExtractionProcessor(DoubleBuffer doubleBuffer, LiveMetricsExporter liveMetricExporter)
{
_meterProvider = Sdk.CreateMeterProviderBuilder()
.AddMeter(LiveMetricConstants.LiveMetricMeterName)
Expand All @@ -54,6 +52,7 @@ internal LiveMetricsExtractionProcessor(LiveMetricsExporter liveMetricExporter)
_dependencySucceededPerSecond = _meter.CreateCounter<long>(LiveMetricConstants.DependencySucceededPerSecondInstrumentName);
_dependencyFailedPerSecond = _meter.CreateCounter<long>(LiveMetricConstants.DependencyFailedPerSecondInstrumentName);
_exceptionsPerSecond = _meter.CreateCounter<long>(LiveMetricConstants.ExceptionsPerSecondInstrumentName);
_doubleBuffer = doubleBuffer;
}

public override void OnEnd(Activity activity)
Expand Down Expand Up @@ -176,7 +175,8 @@ private void AddExceptionDocument(string? exceptionType, string? exceptionMessag
// TODO: DocumentStreamIds = new List<string>(),
// TODO: Properties = new Dictionary<string, string>(), - Validate with UX team if this is needed.
};
_documentIngress.Add(exceptionDocumentIngress);

_doubleBuffer.WriteDocument(exceptionDocumentIngress);
}

private void AddRemoteDependencyDocument(Activity activity)
Expand All @@ -196,7 +196,8 @@ private void AddRemoteDependencyDocument(Activity activity)
// TODO: DocumentStreamIds = new List<string>(),
// TODO: Properties = new Dictionary<string, string>(), - Validate with UX team if this is needed.
};
_documentIngress.Add(remoteDependencyDocumentIngress);

_doubleBuffer.WriteDocument(remoteDependencyDocumentIngress);
}

private void AddRequestDocument(Activity activity, string? statusCodeAttributeValue)
Expand All @@ -215,7 +216,8 @@ private void AddRequestDocument(Activity activity, string? statusCodeAttributeVa
// TODO: DocumentStreamIds = new List<string>(),
// TODO: Properties = new Dictionary<string, string>(), - Validate with UX team if this is needed.
};
_documentIngress.Add(requestDocumentIngress);

_doubleBuffer.WriteDocument(requestDocumentIngress);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
Loading