diff --git a/src/Aspire.Hosting/ApplicationModel/ResourceLoggerService.cs b/src/Aspire.Hosting/ApplicationModel/ResourceLoggerService.cs index b9b793df6d..85e0e805e5 100644 --- a/src/Aspire.Hosting/ApplicationModel/ResourceLoggerService.cs +++ b/src/Aspire.Hosting/ApplicationModel/ResourceLoggerService.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections.Concurrent; +using System.Runtime.CompilerServices; using System.Threading.Channels; using Aspire.Dashboard.Otlp.Storage; using Microsoft.Extensions.Logging; @@ -15,11 +16,29 @@ public class ResourceLoggerService { private readonly ConcurrentDictionary _loggers = new(); + private Action<(string, ResourceLoggerState)>? _loggerAdded; + private event Action<(string, ResourceLoggerState)> LoggerAdded + { + add + { + _loggerAdded += value; + + foreach (var logger in _loggers) + { + value((logger.Key, logger.Value)); + } + } + remove + { + _loggerAdded -= value; + } + } + /// /// Gets the logger for the resource to write to. /// /// The resource name - /// An . + /// An which represents the resource. public ILogger GetLogger(IResource resource) { ArgumentNullException.ThrowIfNull(resource); @@ -31,7 +50,7 @@ public ILogger GetLogger(IResource resource) /// Gets the logger for the resource to write to. /// /// The name of the resource from the Aspire application model. - /// An which repesents the named resource. + /// An which represents the named resource. public ILogger GetLogger(string resourceName) { ArgumentNullException.ThrowIfNull(resourceName); @@ -39,11 +58,23 @@ public ILogger GetLogger(string resourceName) return GetResourceLoggerState(resourceName).Logger; } + /// + /// Watch for changes to the log stream for a resource. + /// + /// The resource to watch for logs. + /// An async enumerable that returns the logs as they are written. + public IAsyncEnumerable> WatchAsync(IResource resource) + { + ArgumentNullException.ThrowIfNull(resource); + + return WatchAsync(resource.Name); + } + /// /// Watch for changes to the log stream for a resource. /// /// The resource name - /// + /// An async enumerable that returns the logs as they are written. public IAsyncEnumerable> WatchAsync(string resourceName) { ArgumentNullException.ThrowIfNull(resourceName); @@ -52,15 +83,39 @@ public IAsyncEnumerable> WatchAsync(string resourceName) } /// - /// Watch for changes to the log stream for a resource. + /// Watch for subscribers to the log stream for a resource. /// - /// The resource to watch for logs. - /// - public IAsyncEnumerable> WatchAsync(IResource resource) + /// + /// An async enumerable that returns when the first subscriber is added to a log, + /// or when the last subscriber is removed. + /// + public async IAsyncEnumerable WatchAnySubscribersAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - ArgumentNullException.ThrowIfNull(resource); + var channel = Channel.CreateUnbounded(); - return WatchAsync(resource.Name); + void OnLoggerAdded((string Name, ResourceLoggerState State) loggerItem) + { + var (name, state) = loggerItem; + + state.OnSubscribersChanged += (hasSubscribers) => + { + channel.Writer.TryWrite(new(name, hasSubscribers)); + }; + } + + LoggerAdded += OnLoggerAdded; + + try + { + await foreach (var entry in channel.Reader.ReadAllAsync(cancellationToken)) + { + yield return entry; + } + } + finally + { + LoggerAdded -= OnLoggerAdded; + } } /// @@ -91,8 +146,27 @@ public void Complete(string name) } } + /// + /// Clears the log stream's backlog for the resource. + /// + public void ClearBacklog(string resourceName) + { + ArgumentNullException.ThrowIfNull(resourceName); + + if (_loggers.TryGetValue(resourceName, out var logger)) + { + logger.ClearBacklog(); + } + } + private ResourceLoggerState GetResourceLoggerState(string resourceName) => - _loggers.GetOrAdd(resourceName, _ => new ResourceLoggerState()); + _loggers.GetOrAdd(resourceName, (name, context) => + { + var state = new ResourceLoggerState(); + context._loggerAdded?.Invoke((name, state)); + return state; + }, + this); /// /// A logger for the resource to write to. @@ -102,7 +176,6 @@ private sealed class ResourceLoggerState private readonly ResourceLogger _logger; private readonly CancellationTokenSource _logStreamCts = new(); - // History of logs, capped at 10000 entries. private readonly CircularBuffer _backlog = new(10000); /// @@ -113,21 +186,112 @@ public ResourceLoggerState() _logger = new ResourceLogger(this); } + private Action? _onSubscribersChanged; + public event Action OnSubscribersChanged + { + add + { + _onSubscribersChanged += value; + + var hasSubscribers = false; + + lock (this) + { + if (_onNewLog is not null) // we have subscribers + { + hasSubscribers = true; + } + } + + if (hasSubscribers) + { + value(hasSubscribers); + } + } + remove + { + _onSubscribersChanged -= value; + } + } + /// /// Watch for changes to the log stream for a resource. /// /// The log stream for the resource. - public IAsyncEnumerable> WatchAsync() + public async IAsyncEnumerable> WatchAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - lock (_backlog) + var channel = Channel.CreateUnbounded(); + + using var _ = _logStreamCts.Token.Register(() => channel.Writer.TryComplete()); + + void Log(LogLine log) { - // REVIEW: Performance makes me very sad, but we can optimize this later. - return new LogAsyncEnumerable(this, _backlog.ToList()); + channel.Writer.TryWrite(log); + } + + OnNewLog += Log; + + // ensure the backlog snapshot is taken after subscribing to OnNewLog + // to ensure the backlog snapshot contains the correct logs. The backlog + // can get cleared when there are no subscribers, so we ensure we are subscribing first. + + // REVIEW: Performance makes me very sad, but we can optimize this later. + var backlogSnapshot = GetBacklogSnapshot(); + if (backlogSnapshot.Length > 0) + { + yield return backlogSnapshot; + } + + try + { + await foreach (var entry in channel.GetBatchesAsync(cancellationToken: cancellationToken)) + { + yield return entry; + } + } + finally + { + OnNewLog -= Log; + + channel.Writer.TryComplete(); } } // This provides the fan out to multiple subscribers. - private Action? OnNewLog { get; set; } + private Action? _onNewLog; + private event Action OnNewLog + { + add + { + bool raiseSubscribersChanged; + lock (this) + { + raiseSubscribersChanged = _onNewLog is null; // is this the first subscriber? + + _onNewLog += value; + } + + if (raiseSubscribersChanged) + { + _onSubscribersChanged?.Invoke(true); + } + } + remove + { + bool raiseSubscribersChanged; + lock (this) + { + _onNewLog -= value; + + raiseSubscribersChanged = _onNewLog is null; // is this the last subscriber? + } + + if (raiseSubscribersChanged) + { + _onSubscribersChanged?.Invoke(false); + } + } + } /// /// The logger for the resource to write to. This will write updates to the live log stream for this resource. @@ -143,7 +307,23 @@ public void Complete() _logStreamCts.Cancel(); } - private sealed class ResourceLogger(ResourceLoggerState annotation) : ILogger + public void ClearBacklog() + { + lock (_backlog) + { + _backlog.Clear(); + } + } + + private LogLine[] GetBacklogSnapshot() + { + lock (_backlog) + { + return [.. _backlog]; + } + } + + private sealed class ResourceLogger(ResourceLoggerState loggerState) : ILogger { private int _lineNumber; @@ -153,7 +333,7 @@ private sealed class ResourceLogger(ResourceLoggerState annotation) : ILogger public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) { - if (annotation._logStreamCts.IsCancellationRequested) + if (loggerState._logStreamCts.IsCancellationRequested) { // Noop if logging after completing the stream return; @@ -163,52 +343,23 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except var isErrorMessage = logLevel >= LogLevel.Error; LogLine logLine; - lock (annotation._backlog) + lock (loggerState._backlog) { _lineNumber++; logLine = new LogLine(_lineNumber, log, isErrorMessage); - annotation._backlog.Add(logLine); + loggerState._backlog.Add(logLine); } - annotation.OnNewLog?.Invoke(logLine); - } - } - - private sealed class LogAsyncEnumerable(ResourceLoggerState annotation, List backlogSnapshot) : IAsyncEnumerable> - { - public async IAsyncEnumerator> GetAsyncEnumerator(CancellationToken cancellationToken = default) - { - if (backlogSnapshot.Count > 0) - { - yield return backlogSnapshot; - } - - var channel = Channel.CreateUnbounded(); - - using var _ = annotation._logStreamCts.Token.Register(() => channel.Writer.TryComplete()); - - void Log(LogLine log) - { - channel.Writer.TryWrite(log); - } - - annotation.OnNewLog += Log; - - try - { - await foreach (var entry in channel.GetBatchesAsync(cancellationToken: cancellationToken)) - { - yield return entry; - } - } - finally - { - annotation.OnNewLog -= Log; - - channel.Writer.TryComplete(); - } + loggerState._onNewLog?.Invoke(logLine); } } } } + +/// +/// Represents a log subscriber for a resource. +/// +/// The the resource name. +/// Determines if there are any subscribers. +public readonly record struct LogSubscriber(string Name, bool AnySubscribers); diff --git a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs index 2de2471701..3988eeb26e 100644 --- a/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs +++ b/src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections.Concurrent; +using System.Runtime.CompilerServices; using System.Threading.Channels; using Microsoft.Extensions.Logging; @@ -22,8 +23,40 @@ public class ResourceNotificationService(ILogger lo /// Watch for changes to the state for all resources. /// /// - public IAsyncEnumerable WatchAsync() => - new AllResourceUpdatesAsyncEnumerable(this); + public async IAsyncEnumerable WatchAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) + { + // Return the last snapshot for each resource. + foreach (var state in _resourceNotificationStates) + { + var (resource, resourceId) = state.Key; + + if (state.Value.LastSnapshot is not null) + { + yield return new ResourceEvent(resource, resourceId, state.Value.LastSnapshot); + } + } + + var channel = Channel.CreateUnbounded(); + + void WriteToChannel(ResourceEvent resourceEvent) => + channel.Writer.TryWrite(resourceEvent); + + OnResourceUpdated += WriteToChannel; + + try + { + await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken)) + { + yield return item; + } + } + finally + { + OnResourceUpdated -= WriteToChannel; + + channel.Writer.TryComplete(); + } + } /// /// Updates the snapshot of the for a resource. @@ -37,9 +70,9 @@ public Task PublishUpdateAsync(IResource resource, string resourceId, Func _resourceNotificationStates.GetOrAdd((resource, resourceId), _ => new ResourceNotificationState()); - private sealed class AllResourceUpdatesAsyncEnumerable(ResourceNotificationService resourceNotificationService) : IAsyncEnumerable - { - public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) - { - // Return the last snapshot for each resource. - foreach (var state in resourceNotificationService._resourceNotificationStates) - { - var (resource, resourceId) = state.Key; - - if (state.Value.LastSnapshot is not null) - { - yield return new ResourceEvent(resource, resourceId, state.Value.LastSnapshot); - } - } - - var channel = Channel.CreateUnbounded(); - - void WriteToChannel(ResourceEvent resourceEvent) => - channel.Writer.TryWrite(resourceEvent); - - resourceNotificationService.OnResourceUpdated += WriteToChannel; - - try - { - await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken)) - { - yield return item; - } - } - finally - { - resourceNotificationService.OnResourceUpdated -= WriteToChannel; - - channel.Writer.TryComplete(); - } - } - } - /// /// The annotation that allows publishing and subscribing to changes in the state of a resource. /// diff --git a/src/Aspire.Hosting/Dcp/ApplicationExecutor.cs b/src/Aspire.Hosting/Dcp/ApplicationExecutor.cs index f9f6d395db..49d90a9fd1 100644 --- a/src/Aspire.Hosting/Dcp/ApplicationExecutor.cs +++ b/src/Aspire.Hosting/Dcp/ApplicationExecutor.cs @@ -6,6 +6,7 @@ using System.Diagnostics; using System.Net.Sockets; using System.Text.Json; +using System.Threading.Channels; using Aspire.Dashboard.Model; using Aspire.Hosting.ApplicationModel; using Aspire.Hosting.Dashboard; @@ -92,6 +93,10 @@ internal sealed class ApplicationExecutor(ILogger logger, private readonly ConcurrentDictionary _hiddenResources = new(); private DcpInfo? _dcpInfo; + private readonly record struct LogInformationEntry(string ResourceName, bool? LogsAvailable, bool? HasSubscribers); + private readonly Channel _logInformationChannel = Channel.CreateUnbounded( + new UnboundedChannelOptions { SingleReader = true }); + private string DefaultContainerHostName => configuration["AppHost:ContainerHostname"] ?? _dcpInfo?.Containers?.ContainerHostName ?? "host.docker.internal"; public async Task RunApplicationAsync(CancellationToken cancellationToken = default) @@ -195,6 +200,73 @@ await Task.WhenAll( }, cancellationToken); + Task.Run(async () => + { + await foreach (var subscribers in loggerService.WatchAnySubscribersAsync()) + { + _logInformationChannel.Writer.TryWrite(new(subscribers.Name, LogsAvailable: null, subscribers.AnySubscribers)); + } + }, + cancellationToken); + + // Listen to the "log information channel" - which contains updates when resources have logs available and when they have subscribers. + // A resource needs both logs available and subscribers before it starts streaming its logs. + // We only want to start the log stream for resources when they have subscribers. + // And when there are no more subscribers, we want to stop the stream. + Task.Run(async () => + { + var resourceLogState = new Dictionary(); + + await foreach (var entry in _logInformationChannel.Reader.ReadAllAsync(cancellationToken)) + { + var logsAvailable = false; + var hasSubscribers = false; + if (resourceLogState.TryGetValue(entry.ResourceName, out (bool, bool) stateEntry)) + { + (logsAvailable, hasSubscribers) = stateEntry; + } + + // LogsAvailable can only go from false => true. Once it is true, it can never go back to false. + Debug.Assert(!entry.LogsAvailable.HasValue || entry.LogsAvailable.Value, "entry.LogsAvailable should never be 'false'"); + + logsAvailable = entry.LogsAvailable ?? logsAvailable; + hasSubscribers = entry.HasSubscribers ?? hasSubscribers; + + if (logsAvailable) + { + if (hasSubscribers) + { + if (_containersMap.TryGetValue(entry.ResourceName, out var container)) + { + StartLogStream(container); + } + else if (_executablesMap.TryGetValue(entry.ResourceName, out var executable)) + { + StartLogStream(executable); + } + } + else + { + if (_logStreams.TryRemove(entry.ResourceName, out var cts)) + { + cts.Cancel(); + } + + if (_containersMap.TryGetValue(entry.ResourceName, out var _) || + _executablesMap.TryGetValue(entry.ResourceName, out var _)) + { + // Clear out the backlog for containers and executables after the last subscriber leaves. + // When a new subscriber is added, the full log will be replayed. + loggerService.ClearBacklog(entry.ResourceName); + } + } + } + + resourceLogState[entry.ResourceName] = (logsAvailable, hasSubscribers); + } + }, + cancellationToken); + async Task WatchKubernetesResource(Func handler) where T : CustomResource { var retryUntilCancelled = new RetryStrategyOptions() @@ -273,6 +345,9 @@ private async Task ProcessResourceChange(WatchEventType watchEventType, T res cts.Cancel(); } + // Complete the log stream + loggerService.Complete(resource.Metadata.Name); + // TODO: Handle resource deletion if (_logger.IsEnabled(LogLevel.Trace)) { @@ -295,7 +370,11 @@ private async Task ProcessResourceChange(WatchEventType watchEventType, T res // Notifications are associated with the application model resource, so we need to update with that context await notificationService.PublishUpdateAsync(appModelResource, resource.Metadata.Name, s => snapshotFactory(resource, s)).ConfigureAwait(false); - StartLogStream(resource); + if (resource is Container { LogsAvailable: true } || + resource is Executable { LogsAvailable: true }) + { + _logInformationChannel.Writer.TryWrite(new(resource.Metadata.Name, LogsAvailable: true, HasSubscribers: null)); + } } // Update all child resources of containers @@ -385,11 +464,6 @@ private void StartLogStream(T resource) where T : CustomResource { _logger.LogError(ex, "Error streaming logs for {ResourceName}", resource.Metadata.Name); } - finally - { - // Complete the log stream - loggerService.Complete(resource.Metadata.Name); - } }, cts.Token); diff --git a/src/Aspire.Hosting/Dcp/ResourceLogSource.cs b/src/Aspire.Hosting/Dcp/ResourceLogSource.cs index be8af99685..c61ca894f2 100644 --- a/src/Aspire.Hosting/Dcp/ResourceLogSource.cs +++ b/src/Aspire.Hosting/Dcp/ResourceLogSource.cs @@ -39,12 +39,15 @@ public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken var stderrStreamTask = Task.Run(() => StreamLogsAsync(stderrStream, isError: true), cancellationToken); // End the enumeration when both streams have been read to completion. - _ = Task.WhenAll(stdoutStreamTask, stderrStreamTask).ContinueWith - (_ => { channel.Writer.TryComplete(); }, - cancellationToken, - TaskContinuationOptions.None, - TaskScheduler.Default).ConfigureAwait(false); + async Task WaitForStreamsToCompleteAsync() + { + await Task.WhenAll(stdoutStreamTask, stderrStreamTask).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + channel.Writer.TryComplete(); + } + + _ = WaitForStreamsToCompleteAsync(); + await foreach (var batch in channel.GetBatchesAsync(cancellationToken: cancellationToken)) { yield return batch; diff --git a/tests/Aspire.Hosting.Tests/ResourceLoggerServiceTests.cs b/tests/Aspire.Hosting.Tests/ResourceLoggerServiceTests.cs index dfe2fd1d6b..8eb151b823 100644 --- a/tests/Aspire.Hosting.Tests/ResourceLoggerServiceTests.cs +++ b/tests/Aspire.Hosting.Tests/ResourceLoggerServiceTests.cs @@ -68,6 +68,49 @@ public async Task StreamingLogsCancelledAfterComplete() Assert.False(await backlogEnumerator.MoveNextAsync()); } + [Fact] + public async Task SecondSubscriberGetsBacklog() + { + var service = new ResourceLoggerService(); + var testResource = new TestResource("myResource"); + + var logger = service.GetLogger(testResource); + + var subscriber1 = service.WatchAsync(testResource); + logger.LogInformation("Hello, world!"); + logger.LogInformation("Hello, world2!"); + + await using var subscriber2Enumerator = service.WatchAsync(testResource).GetAsyncEnumerator(); + + Assert.True(await subscriber2Enumerator.MoveNextAsync()); + Assert.Collection(subscriber2Enumerator.Current, + log => Assert.Equal("Hello, world!", log.Content), + log => Assert.Equal("Hello, world2!", log.Content)); + + logger.LogInformation("Hello, again!"); + + service.Complete(testResource); + + var subscriber1Logs = subscriber1.ToBlockingEnumerable().SelectMany(x => x).ToList(); + Assert.Collection(subscriber1Logs, + log => Assert.Equal("Hello, world!", log.Content), + log => Assert.Equal("Hello, world2!", log.Content), + log => Assert.Equal("Hello, again!", log.Content)); + + Assert.True(await subscriber2Enumerator.MoveNextAsync()); + Assert.Collection(subscriber2Enumerator.Current, + log => Assert.Equal("Hello, again!", log.Content)); + + Assert.False(await subscriber2Enumerator.MoveNextAsync()); + + service.ClearBacklog(testResource.Name); + + var backlog = service.WatchAsync(testResource).ToBlockingEnumerable().SelectMany(x => x).ToList(); + + // the backlog should be cleared + Assert.Empty(backlog); + } + private sealed class TestResource(string name) : Resource(name) { diff --git a/tests/Aspire.Hosting.Tests/ResourceNotificationTests.cs b/tests/Aspire.Hosting.Tests/ResourceNotificationTests.cs index 5572a10a17..3990f5dfc6 100644 --- a/tests/Aspire.Hosting.Tests/ResourceNotificationTests.cs +++ b/tests/Aspire.Hosting.Tests/ResourceNotificationTests.cs @@ -48,7 +48,7 @@ async Task> GetValuesAsync(CancellationToken cancellationTok { var values = new List(); - await foreach (var item in notificationService.WatchAsync().WithCancellation(cancellationToken)) + await foreach (var item in notificationService.WatchAsync(cancellationToken)) { values.Add(item); @@ -99,7 +99,7 @@ async Task> GetValuesAsync(CancellationToken cancellation) { var values = new List(); - await foreach (var item in notificationService.WatchAsync().WithCancellation(cancellation)) + await foreach (var item in notificationService.WatchAsync(cancellation)) { values.Add(item);