Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release/8.0] Dashboard should request log streams only when the user gestures to display them #3343

Merged
merged 1 commit into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 208 additions & 57 deletions src/Aspire.Hosting/ApplicationModel/ResourceLoggerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Aspire.Dashboard.Otlp.Storage;
using Microsoft.Extensions.Logging;
Expand All @@ -15,11 +16,29 @@ public class ResourceLoggerService
{
private readonly ConcurrentDictionary<string, ResourceLoggerState> _loggers = new();

private Action<(string, ResourceLoggerState)>? _loggerAdded;
private event Action<(string, ResourceLoggerState)> LoggerAdded
{
add
{
_loggerAdded += value;

foreach (var logger in _loggers)
{
value((logger.Key, logger.Value));
}
}
remove
{
_loggerAdded -= value;
}
}

/// <summary>
/// Gets the logger for the resource to write to.
/// </summary>
/// <param name="resource">The resource name</param>
/// <returns>An <see cref="ILogger"/>.</returns>
/// <returns>An <see cref="ILogger"/> which represents the resource.</returns>
public ILogger GetLogger(IResource resource)
{
ArgumentNullException.ThrowIfNull(resource);
Expand All @@ -31,19 +50,31 @@ public ILogger GetLogger(IResource resource)
/// Gets the logger for the resource to write to.
/// </summary>
/// <param name="resourceName">The name of the resource from the Aspire application model.</param>
/// <returns>An <see cref="ILogger"/> which repesents the named resource.</returns>
/// <returns>An <see cref="ILogger"/> which represents the named resource.</returns>
public ILogger GetLogger(string resourceName)
{
ArgumentNullException.ThrowIfNull(resourceName);

return GetResourceLoggerState(resourceName).Logger;
}

/// <summary>
/// Watch for changes to the log stream for a resource.
/// </summary>
/// <param name="resource">The resource to watch for logs.</param>
/// <returns>An async enumerable that returns the logs as they are written.</returns>
public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(IResource resource)
{
ArgumentNullException.ThrowIfNull(resource);

return WatchAsync(resource.Name);
}

/// <summary>
/// Watch for changes to the log stream for a resource.
/// </summary>
/// <param name="resourceName">The resource name</param>
/// <returns></returns>
/// <returns>An async enumerable that returns the logs as they are written.</returns>
public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(string resourceName)
{
ArgumentNullException.ThrowIfNull(resourceName);
Expand All @@ -52,15 +83,39 @@ public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(string resourceName)
}

/// <summary>
/// Watch for changes to the log stream for a resource.
/// Watch for subscribers to the log stream for a resource.
/// </summary>
/// <param name="resource">The resource to watch for logs.</param>
/// <returns></returns>
public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(IResource resource)
/// <returns>
/// An async enumerable that returns when the first subscriber is added to a log,
/// or when the last subscriber is removed.
/// </returns>
public async IAsyncEnumerable<LogSubscriber> WatchAnySubscribersAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(resource);
var channel = Channel.CreateUnbounded<LogSubscriber>();

return WatchAsync(resource.Name);
void OnLoggerAdded((string Name, ResourceLoggerState State) loggerItem)
{
var (name, state) = loggerItem;

state.OnSubscribersChanged += (hasSubscribers) =>
{
channel.Writer.TryWrite(new(name, hasSubscribers));
};
}

LoggerAdded += OnLoggerAdded;

try
{
await foreach (var entry in channel.Reader.ReadAllAsync(cancellationToken))
{
yield return entry;
}
}
finally
{
LoggerAdded -= OnLoggerAdded;
}
}

/// <summary>
Expand Down Expand Up @@ -91,8 +146,27 @@ public void Complete(string name)
}
}

/// <summary>
/// Clears the log stream's backlog for the resource.
/// </summary>
public void ClearBacklog(string resourceName)
{
ArgumentNullException.ThrowIfNull(resourceName);

if (_loggers.TryGetValue(resourceName, out var logger))
{
logger.ClearBacklog();
}
}

private ResourceLoggerState GetResourceLoggerState(string resourceName) =>
_loggers.GetOrAdd(resourceName, _ => new ResourceLoggerState());
_loggers.GetOrAdd(resourceName, (name, context) =>
{
var state = new ResourceLoggerState();
context._loggerAdded?.Invoke((name, state));
return state;
},
this);

/// <summary>
/// A logger for the resource to write to.
Expand All @@ -102,7 +176,6 @@ private sealed class ResourceLoggerState
private readonly ResourceLogger _logger;
private readonly CancellationTokenSource _logStreamCts = new();

// History of logs, capped at 10000 entries.
private readonly CircularBuffer<LogLine> _backlog = new(10000);

/// <summary>
Expand All @@ -113,21 +186,112 @@ public ResourceLoggerState()
_logger = new ResourceLogger(this);
}

private Action<bool>? _onSubscribersChanged;
public event Action<bool> OnSubscribersChanged
{
add
{
_onSubscribersChanged += value;

var hasSubscribers = false;

lock (this)
{
if (_onNewLog is not null) // we have subscribers
{
hasSubscribers = true;
}
}

if (hasSubscribers)
{
value(hasSubscribers);
}
}
remove
{
_onSubscribersChanged -= value;
}
}

/// <summary>
/// Watch for changes to the log stream for a resource.
/// </summary>
/// <returns>The log stream for the resource.</returns>
public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync()
public async IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
lock (_backlog)
var channel = Channel.CreateUnbounded<LogLine>();

using var _ = _logStreamCts.Token.Register(() => channel.Writer.TryComplete());

void Log(LogLine log)
{
// REVIEW: Performance makes me very sad, but we can optimize this later.
return new LogAsyncEnumerable(this, _backlog.ToList());
channel.Writer.TryWrite(log);
}

OnNewLog += Log;

// ensure the backlog snapshot is taken after subscribing to OnNewLog
// to ensure the backlog snapshot contains the correct logs. The backlog
// can get cleared when there are no subscribers, so we ensure we are subscribing first.

// REVIEW: Performance makes me very sad, but we can optimize this later.
var backlogSnapshot = GetBacklogSnapshot();
if (backlogSnapshot.Length > 0)
{
yield return backlogSnapshot;
}

try
{
await foreach (var entry in channel.GetBatchesAsync(cancellationToken: cancellationToken))
{
yield return entry;
}
}
finally
{
OnNewLog -= Log;

channel.Writer.TryComplete();
}
}

// This provides the fan out to multiple subscribers.
private Action<LogLine>? OnNewLog { get; set; }
private Action<LogLine>? _onNewLog;
private event Action<LogLine> OnNewLog
{
add
{
bool raiseSubscribersChanged;
lock (this)
{
raiseSubscribersChanged = _onNewLog is null; // is this the first subscriber?

_onNewLog += value;
}

if (raiseSubscribersChanged)
{
_onSubscribersChanged?.Invoke(true);
}
}
remove
{
bool raiseSubscribersChanged;
lock (this)
{
_onNewLog -= value;

raiseSubscribersChanged = _onNewLog is null; // is this the last subscriber?
}

if (raiseSubscribersChanged)
{
_onSubscribersChanged?.Invoke(false);
}
}
}

/// <summary>
/// The logger for the resource to write to. This will write updates to the live log stream for this resource.
Expand All @@ -143,7 +307,23 @@ public void Complete()
_logStreamCts.Cancel();
}

private sealed class ResourceLogger(ResourceLoggerState annotation) : ILogger
public void ClearBacklog()
{
lock (_backlog)
{
_backlog.Clear();
}
}

private LogLine[] GetBacklogSnapshot()
{
lock (_backlog)
{
return [.. _backlog];
}
}

private sealed class ResourceLogger(ResourceLoggerState loggerState) : ILogger
{
private int _lineNumber;

Expand All @@ -153,7 +333,7 @@ private sealed class ResourceLogger(ResourceLoggerState annotation) : ILogger

public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter)
{
if (annotation._logStreamCts.IsCancellationRequested)
if (loggerState._logStreamCts.IsCancellationRequested)
{
// Noop if logging after completing the stream
return;
Expand All @@ -163,52 +343,23 @@ public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Except
var isErrorMessage = logLevel >= LogLevel.Error;

LogLine logLine;
lock (annotation._backlog)
lock (loggerState._backlog)
{
_lineNumber++;
logLine = new LogLine(_lineNumber, log, isErrorMessage);

annotation._backlog.Add(logLine);
loggerState._backlog.Add(logLine);
}

annotation.OnNewLog?.Invoke(logLine);
}
}

private sealed class LogAsyncEnumerable(ResourceLoggerState annotation, List<LogLine> backlogSnapshot) : IAsyncEnumerable<IReadOnlyList<LogLine>>
{
public async IAsyncEnumerator<IReadOnlyList<LogLine>> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
if (backlogSnapshot.Count > 0)
{
yield return backlogSnapshot;
}

var channel = Channel.CreateUnbounded<LogLine>();

using var _ = annotation._logStreamCts.Token.Register(() => channel.Writer.TryComplete());

void Log(LogLine log)
{
channel.Writer.TryWrite(log);
}

annotation.OnNewLog += Log;

try
{
await foreach (var entry in channel.GetBatchesAsync(cancellationToken: cancellationToken))
{
yield return entry;
}
}
finally
{
annotation.OnNewLog -= Log;

channel.Writer.TryComplete();
}
loggerState._onNewLog?.Invoke(logLine);
}
}
}
}

/// <summary>
/// Represents a log subscriber for a resource.
/// </summary>
/// <param name="Name">The the resource name.</param>
/// <param name="AnySubscribers">Determines if there are any subscribers.</param>
public readonly record struct LogSubscriber(string Name, bool AnySubscribers);
Loading