Skip to content

Commit

Permalink
Dashboard should request log streams only when the user gestures to d…
Browse files Browse the repository at this point in the history
…isplay them (#3235) (#3343)

Change the AppHost to only subscribe to DCP's logs when there is a subscriber for the logs.

Fix #2789

* Show logs

* Remove the "StreamingLogger" because that causes subsequent subscribers to not see what was already written.

Fix duplicate logs issue by clearing out the backlog when the last subscriber leaves.

* Fix a concurrency issue with the backlog. Ensure the backlog is only snap shotted after subscribing to the log.

Fix existing tests for new functionality and add an additional test.

* PR feedback

Only clear the backlog on containers and executables.

* PR feedback.

Move lock out of async iterator.

* Clean up code.

- Remove count and instead check if the delegate is null to indicate whether there are subscribers or not.
- Remove the separate IAsyncEnumerable classes and just use `async IAsyncEnumerable` methods.

* Fix a race at startup when logs aren't available.

Employ a 2nd loop that listens for both "has subscribers" and "logs available".

* Simplify ResourceNotificationService.WatchAsync.

* Fix test build

* Address PR feedback

- Set SingleReader=true on the logInformationChannel.
- Add comment and assert that LogsAvailable can only turn true and can't go back to false.

---------

Co-authored-by: David Fowler <[email protected]>
  • Loading branch information
eerhardt and davidfowl authored Apr 4, 2024
1 parent af97f4d commit 14b23a1
Show file tree
Hide file tree
Showing 6 changed files with 379 additions and 113 deletions.
265 changes: 208 additions & 57 deletions src/Aspire.Hosting/ApplicationModel/ResourceLoggerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Aspire.Dashboard.Otlp.Storage;
using Microsoft.Extensions.Logging;
Expand All @@ -15,11 +16,29 @@ public class ResourceLoggerService
{
private readonly ConcurrentDictionary<string, ResourceLoggerState> _loggers = new();

private Action<(string, ResourceLoggerState)>? _loggerAdded;
private event Action<(string, ResourceLoggerState)> LoggerAdded
{
add
{
_loggerAdded += value;

foreach (var logger in _loggers)
{
value((logger.Key, logger.Value));
}
}
remove
{
_loggerAdded -= value;
}
}

/// <summary>
/// Gets the logger for the resource to write to.
/// </summary>
/// <param name="resource">The resource name</param>
/// <returns>An <see cref="ILogger"/>.</returns>
/// <returns>An <see cref="ILogger"/> which represents the resource.</returns>
public ILogger GetLogger(IResource resource)
{
ArgumentNullException.ThrowIfNull(resource);
Expand All @@ -31,19 +50,31 @@ public ILogger GetLogger(IResource resource)
/// Gets the logger for the resource to write to.
/// </summary>
/// <param name="resourceName">The name of the resource from the Aspire application model.</param>
/// <returns>An <see cref="ILogger"/> which repesents the named resource.</returns>
/// <returns>An <see cref="ILogger"/> which represents the named resource.</returns>
public ILogger GetLogger(string resourceName)
{
ArgumentNullException.ThrowIfNull(resourceName);

return GetResourceLoggerState(resourceName).Logger;
}

/// <summary>
/// Watch for changes to the log stream for a resource.
/// </summary>
/// <param name="resource">The resource to watch for logs.</param>
/// <returns>An async enumerable that returns the logs as they are written.</returns>
public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(IResource resource)
{
ArgumentNullException.ThrowIfNull(resource);

return WatchAsync(resource.Name);
}

/// <summary>
/// Watch for changes to the log stream for a resource.
/// </summary>
/// <param name="resourceName">The resource name</param>
/// <returns></returns>
/// <returns>An async enumerable that returns the logs as they are written.</returns>
public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(string resourceName)
{
ArgumentNullException.ThrowIfNull(resourceName);
Expand All @@ -52,15 +83,39 @@ public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(string resourceName)
}

/// <summary>
/// Watch for changes to the log stream for a resource.
/// Watch for subscribers to the log stream for a resource.
/// </summary>
/// <param name="resource">The resource to watch for logs.</param>
/// <returns></returns>
public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(IResource resource)
/// <returns>
/// An async enumerable that returns when the first subscriber is added to a log,
/// or when the last subscriber is removed.
/// </returns>
public async IAsyncEnumerable<LogSubscriber> WatchAnySubscribersAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(resource);
var channel = Channel.CreateUnbounded<LogSubscriber>();

return WatchAsync(resource.Name);
void OnLoggerAdded((string Name, ResourceLoggerState State) loggerItem)
{
var (name, state) = loggerItem;

state.OnSubscribersChanged += (hasSubscribers) =>
{
channel.Writer.TryWrite(new(name, hasSubscribers));
};
}

LoggerAdded += OnLoggerAdded;

try
{
await foreach (var entry in channel.Reader.ReadAllAsync(cancellationToken))
{
yield return entry;
}
}
finally
{
LoggerAdded -= OnLoggerAdded;
}
}

/// <summary>
Expand Down Expand Up @@ -91,8 +146,27 @@ public void Complete(string name)
}
}

/// <summary>
/// Clears the log stream's backlog for the resource.
/// </summary>
public void ClearBacklog(string resourceName)
{
ArgumentNullException.ThrowIfNull(resourceName);

if (_loggers.TryGetValue(resourceName, out var logger))
{
logger.ClearBacklog();
}
}

private ResourceLoggerState GetResourceLoggerState(string resourceName) =>
_loggers.GetOrAdd(resourceName, _ => new ResourceLoggerState());
_loggers.GetOrAdd(resourceName, (name, context) =>
{
var state = new ResourceLoggerState();
context._loggerAdded?.Invoke((name, state));
return state;
},
this);

/// <summary>
/// A logger for the resource to write to.
Expand All @@ -102,7 +176,6 @@ private sealed class ResourceLoggerState
private readonly ResourceLogger _logger;
private readonly CancellationTokenSource _logStreamCts = new();

// History of logs, capped at 10000 entries.
private readonly CircularBuffer<LogLine> _backlog = new(10000);

/// <summary>
Expand All @@ -113,21 +186,112 @@ public ResourceLoggerState()
_logger = new ResourceLogger(this);
}

private Action<bool>? _onSubscribersChanged;
public event Action<bool> OnSubscribersChanged
{
add
{
_onSubscribersChanged += value;

var hasSubscribers = false;

lock (this)
{
if (_onNewLog is not null) // we have subscribers
{
hasSubscribers = true;
}
}

if (hasSubscribers)
{
value(hasSubscribers);
}
}
remove
{
_onSubscribersChanged -= value;
}
}

/// <summary>
/// Watch for changes to the log stream for a resource.
/// </summary>
/// <returns>The log stream for the resource.</returns>
public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync()
public async IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
lock (_backlog)
var channel = Channel.CreateUnbounded<LogLine>();

using var _ = _logStreamCts.Token.Register(() => channel.Writer.TryComplete());

void Log(LogLine log)
{
// REVIEW: Performance makes me very sad, but we can optimize this later.
return new LogAsyncEnumerable(this, _backlog.ToList());
channel.Writer.TryWrite(log);
}

OnNewLog += Log;

// ensure the backlog snapshot is taken after subscribing to OnNewLog
// to ensure the backlog snapshot contains the correct logs. The backlog
// can get cleared when there are no subscribers, so we ensure we are subscribing first.

// REVIEW: Performance makes me very sad, but we can optimize this later.
var backlogSnapshot = GetBacklogSnapshot();
if (backlogSnapshot.Length > 0)
{
yield return backlogSnapshot;
}

try
{
await foreach (var entry in channel.GetBatchesAsync(cancellationToken: cancellationToken))
{
yield return entry;
}
}
finally
{
OnNewLog -= Log;

channel.Writer.TryComplete();
}
}

// This provides the fan out to multiple subscribers.
private Action<LogLine>? OnNewLog { get; set; }
private Action<LogLine>? _onNewLog;
private event Action<LogLine> OnNewLog
{
add
{
bool raiseSubscribersChanged;
lock (this)
{
raiseSubscribersChanged = _onNewLog is null; // is this the first subscriber?

_onNewLog += value;
}

if (raiseSubscribersChanged)
{
_onSubscribersChanged?.Invoke(true);
}
}
remove
{
bool raiseSubscribersChanged;
lock (this)
{
_onNewLog -= value;

raiseSubscribersChanged = _onNewLog is null; // is this the last subscriber?
}

if (raiseSubscribersChanged)
{
_onSubscribersChanged?.Invoke(false);
}
}
}

/// <summary>
/// The logger for the resource to write to. This will write updates to the live log stream for this resource.
Expand All @@ -143,7 +307,23 @@ public void Complete()
_logStreamCts.Cancel();
}

private sealed class ResourceLogger(ResourceLoggerState annotation) : ILogger
public void ClearBacklog()
{
lock (_backlog)
{
_backlog.Clear();
}
}

private LogLine[] GetBacklogSnapshot()
{
lock (_backlog)
{
return [.. _backlog];
}
}

private sealed class ResourceLogger(ResourceLoggerState loggerState) : ILogger
{
private int _lineNumber;

Expand All @@ -153,7 +333,7 @@ private sealed class ResourceLogger(ResourceLoggerState annotation) : ILogger

public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter)
{
if (annotation._logStreamCts.IsCancellationRequested)
if (loggerState._logStreamCts.IsCancellationRequested)
{
// Noop if logging after completing the stream
return;
Expand All @@ -163,52 +343,23 @@ public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Except
var isErrorMessage = logLevel >= LogLevel.Error;

LogLine logLine;
lock (annotation._backlog)
lock (loggerState._backlog)
{
_lineNumber++;
logLine = new LogLine(_lineNumber, log, isErrorMessage);

annotation._backlog.Add(logLine);
loggerState._backlog.Add(logLine);
}

annotation.OnNewLog?.Invoke(logLine);
}
}

private sealed class LogAsyncEnumerable(ResourceLoggerState annotation, List<LogLine> backlogSnapshot) : IAsyncEnumerable<IReadOnlyList<LogLine>>
{
public async IAsyncEnumerator<IReadOnlyList<LogLine>> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
if (backlogSnapshot.Count > 0)
{
yield return backlogSnapshot;
}

var channel = Channel.CreateUnbounded<LogLine>();

using var _ = annotation._logStreamCts.Token.Register(() => channel.Writer.TryComplete());

void Log(LogLine log)
{
channel.Writer.TryWrite(log);
}

annotation.OnNewLog += Log;

try
{
await foreach (var entry in channel.GetBatchesAsync(cancellationToken: cancellationToken))
{
yield return entry;
}
}
finally
{
annotation.OnNewLog -= Log;

channel.Writer.TryComplete();
}
loggerState._onNewLog?.Invoke(logLine);
}
}
}
}

/// <summary>
/// Represents a log subscriber for a resource.
/// </summary>
/// <param name="Name">The the resource name.</param>
/// <param name="AnySubscribers">Determines if there are any subscribers.</param>
public readonly record struct LogSubscriber(string Name, bool AnySubscribers);
Loading

0 comments on commit 14b23a1

Please sign in to comment.