diff --git a/playground/CustomResources/CustomResources.AppHost/TestResource.cs b/playground/CustomResources/CustomResources.AppHost/TestResource.cs index f95b3fa488..7c5f2d4c5f 100644 --- a/playground/CustomResources/CustomResources.AppHost/TestResource.cs +++ b/playground/CustomResources/CustomResources.AppHost/TestResource.cs @@ -13,7 +13,7 @@ public static IResourceBuilder AddTestResource(this IDistributedAp var rb = builder.AddResource(new TestResource(name)) .WithResourceLogger() - .WithCustomResourceState(() => new() + .WithResourceUpdates(() => new() { ResourceType = "Test Resource", State = "Starting", @@ -36,15 +36,15 @@ public Task BeforeStartAsync(DistributedApplicationModel appModel, CancellationT { foreach (var item in appModel.Resources.OfType()) { - if (item.TryGetLastAnnotation(out var customResourceAnnotation) && - item.TryGetLastAnnotation(out var loggerAnnotation)) + if (item.TryGetLastAnnotation(out var resourceUpdates) && + item.TryGetLastAnnotation(out var loggerAnnotation)) { var states = new[] { "Starting", "Running", "Finished" }; Task.Run(async () => { // Simulate custom resource state changes - var state = customResourceAnnotation.GetInitialSnapshot(); + var state = resourceUpdates.GetInitialSnapshot(); var seconds = Random.Shared.Next(2, 12); state = state with @@ -55,7 +55,7 @@ public Task BeforeStartAsync(DistributedApplicationModel appModel, CancellationT loggerAnnotation.Logger.LogInformation("Starting test resource {ResourceName} with update interval {Interval} seconds", item.Name, seconds); // This might run before the dashboard is ready to receive updates, but it will be queued. - await customResourceAnnotation.UpdateStateAsync(state); + await resourceUpdates.UpdateStateAsync(state); using var timer = new PeriodicTimer(TimeSpan.FromSeconds(seconds)); @@ -70,7 +70,7 @@ public Task BeforeStartAsync(DistributedApplicationModel appModel, CancellationT loggerAnnotation.Logger.LogInformation("Test resource {ResourceName} is now in state {State}", item.Name, randomState); - await customResourceAnnotation.UpdateStateAsync(state); + await resourceUpdates.UpdateStateAsync(state); } }, cancellationToken); diff --git a/src/Aspire.Hosting/ApplicationModel/CustomResourceExtensions.cs b/src/Aspire.Hosting/ApplicationModel/CustomResourceExtensions.cs index 812b97c354..49a9cb3092 100644 --- a/src/Aspire.Hosting/ApplicationModel/CustomResourceExtensions.cs +++ b/src/Aspire.Hosting/ApplicationModel/CustomResourceExtensions.cs @@ -11,18 +11,18 @@ namespace Aspire.Hosting; public static class CustomResourceExtensions { /// - /// Adds a callback to configure the dashboard context for a resource. + /// Initializes the resource with a that allows publishing and subscribing to changes in the state of this resource. /// /// The resource. /// The resource builder. /// The factory to create the initial for this resource. /// The resource builder. - public static IResourceBuilder WithCustomResourceState(this IResourceBuilder builder, Func? initialSnapshotFactory = null) + public static IResourceBuilder WithResourceUpdates(this IResourceBuilder builder, Func? initialSnapshotFactory = null) where TResource : IResource { initialSnapshotFactory ??= () => CustomResourceSnapshot.Create(builder.Resource); - return builder.WithAnnotation(new CustomResourceAnnotation(initialSnapshotFactory), ResourceAnnotationMutationBehavior.Replace); + return builder.WithAnnotation(new ResourceUpdatesAnnotation(initialSnapshotFactory), ResourceAnnotationMutationBehavior.Replace); } /// @@ -34,6 +34,6 @@ public static IResourceBuilder WithCustomResourceState(thi public static IResourceBuilder WithResourceLogger(this IResourceBuilder builder) where TResource : IResource { - return builder.WithAnnotation(new CustomResourceLoggerAnnotation(), ResourceAnnotationMutationBehavior.Replace); + return builder.WithAnnotation(new ResourceLoggerAnnotation(), ResourceAnnotationMutationBehavior.Replace); } } diff --git a/src/Aspire.Hosting/ApplicationModel/CustomResourceLoggerAnnotation.cs b/src/Aspire.Hosting/ApplicationModel/ResourceLoggerAnnotation.cs similarity index 84% rename from src/Aspire.Hosting/ApplicationModel/CustomResourceLoggerAnnotation.cs rename to src/Aspire.Hosting/ApplicationModel/ResourceLoggerAnnotation.cs index 86fe8b3a07..c34c2d5185 100644 --- a/src/Aspire.Hosting/ApplicationModel/CustomResourceLoggerAnnotation.cs +++ b/src/Aspire.Hosting/ApplicationModel/ResourceLoggerAnnotation.cs @@ -10,7 +10,7 @@ namespace Aspire.Hosting.ApplicationModel; /// /// A annotation that exposes a Logger for the resource to write to. /// -public sealed class CustomResourceLoggerAnnotation : IResourceAnnotation +public sealed class ResourceLoggerAnnotation : IResourceAnnotation { private readonly ResourceLogger _logger; private readonly CancellationTokenSource _logStreamCts = new(); @@ -19,9 +19,9 @@ public sealed class CustomResourceLoggerAnnotation : IResourceAnnotation private readonly CircularBuffer<(string Content, bool IsErrorMessage)> _backlog = new(10000); /// - /// Creates a new . + /// Creates a new . /// - public CustomResourceLoggerAnnotation() + public ResourceLoggerAnnotation() { _logger = new ResourceLogger(this); } @@ -49,7 +49,7 @@ public void Complete() _logStreamCts.Cancel(); } - private sealed class ResourceLogger(CustomResourceLoggerAnnotation annotation) : ILogger + private sealed class ResourceLogger(ResourceLoggerAnnotation annotation) : ILogger { IDisposable? ILogger.BeginScope(TState state) => null; @@ -77,7 +77,7 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except } } - private sealed class LogAsyncEnumerable(CustomResourceLoggerAnnotation annotation) : IAsyncEnumerable> + private sealed class LogAsyncEnumerable(ResourceLoggerAnnotation annotation) : IAsyncEnumerable> { public async IAsyncEnumerator> GetAsyncEnumerator(CancellationToken cancellationToken = default) { @@ -94,6 +94,8 @@ private sealed class LogAsyncEnumerable(CustomResourceLoggerAnnotation annotatio var channel = Channel.CreateUnbounded<(string, bool)>(); + using var _ = annotation._logStreamCts.Token.Register(() => channel.Writer.TryComplete()); + void Log((string Content, bool IsErrorMessage) log) { channel.Writer.TryWrite(log); @@ -101,12 +103,9 @@ void Log((string Content, bool IsErrorMessage) log) annotation.OnNewLog += Log; - // Connect both tokens so we can stop streaming if the stream was closed - using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, annotation._logStreamCts.Token); - try { - await foreach (var entry in channel.GetBatches(cts.Token)) + await foreach (var entry in channel.GetBatches(cancellationToken)) { yield return entry; } diff --git a/src/Aspire.Hosting/ApplicationModel/CustomResourceAnnotation.cs b/src/Aspire.Hosting/ApplicationModel/ResourceUpdatesAnnotation.cs similarity index 63% rename from src/Aspire.Hosting/ApplicationModel/CustomResourceAnnotation.cs rename to src/Aspire.Hosting/ApplicationModel/ResourceUpdatesAnnotation.cs index 2fbf3a2408..e8ab844329 100644 --- a/src/Aspire.Hosting/ApplicationModel/CustomResourceAnnotation.cs +++ b/src/Aspire.Hosting/ApplicationModel/ResourceUpdatesAnnotation.cs @@ -8,18 +8,18 @@ namespace Aspire.Hosting.ApplicationModel; /// -/// The annotation that reflects how a resource shows up in the dashboard. -/// This is a single producer, single consumer channel model for pushing updates to the dashboard. -/// The resource server will be the only caller of WatchAsync. +/// The annotation that allows publishing and subscribing to changes in the state of a resource. /// -public sealed class CustomResourceAnnotation(Func initialSnapshotFactory) : IResourceAnnotation +public sealed class ResourceUpdatesAnnotation(Func initialSnapshotFactory) : IResourceAnnotation { - private readonly Channel _channel = Channel.CreateUnbounded(); + private readonly CancellationTokenSource _streamClosedCts = new(); + + private Action? OnSnapshotUpdated { get; set; } /// /// Watch for changes to the dashboard state for a resource. /// - public IAsyncEnumerable WatchAsync(CancellationToken cancellationToken = default) => _channel.Reader.ReadAllAsync(cancellationToken); + public IAsyncEnumerable WatchAsync() => new ResourceUpdatesAsyncEnumerable(this); /// /// Gets the initial snapshot of the dashboard state for this resource. @@ -30,14 +30,58 @@ public sealed class CustomResourceAnnotation(Func initia /// Updates the snapshot of the for a resource. /// /// The new . - public async Task UpdateStateAsync(CustomResourceSnapshot state) + public Task UpdateStateAsync(CustomResourceSnapshot state) + { + if (_streamClosedCts.IsCancellationRequested) + { + return Task.CompletedTask; + } + + OnSnapshotUpdated?.Invoke(state); + + return Task.CompletedTask; + } + + /// + /// Signal that no more updates are expected for this resource. + /// + public void Complete() + { + _streamClosedCts.Cancel(); + } + + private sealed class ResourceUpdatesAsyncEnumerable(ResourceUpdatesAnnotation customResourceAnnotation) : IAsyncEnumerable { - await _channel.Writer.WriteAsync(state).ConfigureAwait(false); + public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + var channel = Channel.CreateUnbounded(); + + void WriteToChannel(CustomResourceSnapshot state) + => channel.Writer.TryWrite(state); + + using var _ = customResourceAnnotation._streamClosedCts.Token.Register(() => channel.Writer.TryComplete()); + + customResourceAnnotation.OnSnapshotUpdated = WriteToChannel; + + try + { + await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken)) + { + yield return item; + } + } + finally + { + customResourceAnnotation.OnSnapshotUpdated -= WriteToChannel; + + channel.Writer.TryComplete(); + } + } } } /// -/// The context for a all of the properties and URLs that should show up in the dashboard for a resource. +/// An immutable snapshot of the state of a resource. /// public sealed record CustomResourceSnapshot { @@ -59,7 +103,7 @@ public sealed record CustomResourceSnapshot /// /// The environment variables that should show up in the dashboard for this resource. /// - public ImmutableArray<(string Name, string Value)> EnviromentVariables { get; init; } = []; + public ImmutableArray<(string Name, string Value)> EnvironmentVariables { get; init; } = []; /// /// The URLs that should show up in the dashboard for this resource. @@ -106,7 +150,7 @@ static string GetUrl(EndpointAnnotation e) => return new CustomResourceSnapshot() { ResourceType = resource.GetType().Name.Replace("Resource", ""), - EnviromentVariables = environmentVariables, + EnvironmentVariables = environmentVariables, Urls = urls, Properties = properties }; diff --git a/src/Aspire.Hosting/Dashboard/ConsoleLogPublisher.cs b/src/Aspire.Hosting/Dashboard/ConsoleLogPublisher.cs index 002c8f011c..119da9b2a4 100644 --- a/src/Aspire.Hosting/Dashboard/ConsoleLogPublisher.cs +++ b/src/Aspire.Hosting/Dashboard/ConsoleLogPublisher.cs @@ -83,7 +83,7 @@ LogsEnumerable SubscribeContainerResource(ContainerSnapshot container) private static LogsEnumerable SubscribeGenericResource(IResource resource) { - if (resource.TryGetLastAnnotation(out var loggerAnnotation)) + if (resource.TryGetLastAnnotation(out var loggerAnnotation)) { return loggerAnnotation.WatchAsync(); } diff --git a/src/Aspire.Hosting/Dashboard/DcpDataSource.cs b/src/Aspire.Hosting/Dashboard/DcpDataSource.cs index 09fddbc587..0d12f2ef67 100644 --- a/src/Aspire.Hosting/Dashboard/DcpDataSource.cs +++ b/src/Aspire.Hosting/Dashboard/DcpDataSource.cs @@ -197,11 +197,11 @@ private async Task ProcessInitialResourceAsync(IResource resource, CancellationT await _onResourceChanged(snapshot, ResourceSnapshotChangeType.Upsert).ConfigureAwait(false); } - else if (resource.TryGetLastAnnotation(out var dashboardAnnotation)) + else if (resource.TryGetLastAnnotation(out var resourceUpdates)) { // We have a dashboard annotation, so we want to create a snapshot for the resource // and update data immediately. We also want to watch for changes to the dashboard state. - var state = dashboardAnnotation.GetInitialSnapshot(); + var state = resourceUpdates.GetInitialSnapshot(); var creationTimestamp = DateTime.UtcNow; var snapshot = CreateResourceSnapshot(resource, creationTimestamp, state); @@ -210,7 +210,7 @@ private async Task ProcessInitialResourceAsync(IResource resource, CancellationT _ = Task.Run(async () => { - await foreach (var state in dashboardAnnotation.WatchAsync(cancellationToken)) + await foreach (var state in resourceUpdates.WatchAsync().WithCancellation(cancellationToken)) { try { @@ -231,7 +231,7 @@ private async Task ProcessInitialResourceAsync(IResource resource, CancellationT private static GenericResourceSnapshot CreateResourceSnapshot(IResource resource, DateTime creationTimestamp, CustomResourceSnapshot dashboardState) { ImmutableArray environmentVariables = [.. - dashboardState.EnviromentVariables.Select(e => new EnvironmentVariableSnapshot(e.Name, e.Value, false))]; + dashboardState.EnvironmentVariables.Select(e => new EnvironmentVariableSnapshot(e.Name, e.Value, false))]; ImmutableArray endpoints = [.. dashboardState.Urls.Select(u => new EndpointSnapshot(u, u))]; diff --git a/src/Aspire.Hosting/Extensions/ParameterResourceBuilderExtensions.cs b/src/Aspire.Hosting/Extensions/ParameterResourceBuilderExtensions.cs index ab2897e7b5..8347571f2f 100644 --- a/src/Aspire.Hosting/Extensions/ParameterResourceBuilderExtensions.cs +++ b/src/Aspire.Hosting/Extensions/ParameterResourceBuilderExtensions.cs @@ -37,7 +37,7 @@ internal static IResourceBuilder AddParameter(this IDistribut { var resource = new ParameterResource(name, callback, secret); return builder.AddResource(resource) - .WithCustomResourceState(() => + .WithResourceUpdates(() => { var state = new CustomResourceSnapshot() { diff --git a/tests/Aspire.Hosting.Tests/CustomResourceLoggerTests.cs b/tests/Aspire.Hosting.Tests/ResourceLoggerTests.cs similarity index 94% rename from tests/Aspire.Hosting.Tests/CustomResourceLoggerTests.cs rename to tests/Aspire.Hosting.Tests/ResourceLoggerTests.cs index 673d9e963e..628bc92cd0 100644 --- a/tests/Aspire.Hosting.Tests/CustomResourceLoggerTests.cs +++ b/tests/Aspire.Hosting.Tests/ResourceLoggerTests.cs @@ -6,7 +6,7 @@ namespace Aspire.Hosting.Tests; -public class CustomResourceLoggerTests +public class ResourceLoggerTests { [Fact] public void AddingResourceLoggerAnnotationAllowsLogging() @@ -16,7 +16,7 @@ public void AddingResourceLoggerAnnotationAllowsLogging() var testResource = builder.AddResource(new TestResource("myResource")) .WithResourceLogger(); - var annotation = testResource.Resource.Annotations.OfType().SingleOrDefault(); + var annotation = testResource.Resource.Annotations.OfType().SingleOrDefault(); Assert.NotNull(annotation); @@ -48,7 +48,7 @@ public async Task StreamingLogsCancelledAfterComplete() var testResource = builder.AddResource(new TestResource("myResource")) .WithResourceLogger(); - var annotation = testResource.Resource.Annotations.OfType().SingleOrDefault(); + var annotation = testResource.Resource.Annotations.OfType().SingleOrDefault(); Assert.NotNull(annotation); diff --git a/tests/Aspire.Hosting.Tests/CustomResourceStateTests.cs b/tests/Aspire.Hosting.Tests/ResourceUpdatesTests.cs similarity index 69% rename from tests/Aspire.Hosting.Tests/CustomResourceStateTests.cs rename to tests/Aspire.Hosting.Tests/ResourceUpdatesTests.cs index d5d89bbc99..2fb617d17b 100644 --- a/tests/Aspire.Hosting.Tests/CustomResourceStateTests.cs +++ b/tests/Aspire.Hosting.Tests/ResourceUpdatesTests.cs @@ -5,7 +5,7 @@ namespace Aspire.Hosting.Tests; -public class CustomResourceStateTests +public class ResourceUpdatesTests { [Fact] public void CreatePopulatesStateFromResource() @@ -15,9 +15,9 @@ public void CreatePopulatesStateFromResource() var custom = builder.AddResource(new CustomResource("myResource")) .WithEndpoint(name: "ep", scheme: "http", hostPort: 8080) .WithEnvironment("x", "1000") - .WithCustomResourceState(); + .WithResourceUpdates(); - var annotation = custom.Resource.Annotations.OfType().SingleOrDefault(); + var annotation = custom.Resource.Annotations.OfType().SingleOrDefault(); Assert.NotNull(annotation); @@ -25,7 +25,7 @@ public void CreatePopulatesStateFromResource() Assert.Equal("Custom", state.ResourceType); - Assert.Collection(state.EnviromentVariables, a => + Assert.Collection(state.EnvironmentVariables, a => { Assert.Equal("x", a.Name); Assert.Equal("1000", a.Value); @@ -51,20 +51,20 @@ public void InitialStateCanBeSpecified() var custom = builder.AddResource(new CustomResource("myResource")) .WithEndpoint(name: "ep", scheme: "http", hostPort: 8080) .WithEnvironment("x", "1000") - .WithCustomResourceState(() => new() + .WithResourceUpdates(() => new() { ResourceType = "MyResource", Properties = [("A", "B")], }); - var annotation = custom.Resource.Annotations.OfType().SingleOrDefault(); + var annotation = custom.Resource.Annotations.OfType().SingleOrDefault(); Assert.NotNull(annotation); var state = annotation.GetInitialSnapshot(); Assert.Equal("MyResource", state.ResourceType); - Assert.Empty(state.EnviromentVariables); + Assert.Empty(state.EnvironmentVariables); Assert.Collection(state.Properties, c => { Assert.Equal("A", c.Key); @@ -80,12 +80,26 @@ public async Task ResourceUpdatesAreQueued() var custom = builder.AddResource(new CustomResource("myResource")) .WithEndpoint(name: "ep", scheme: "http", hostPort: 8080) .WithEnvironment("x", "1000") - .WithCustomResourceState(); + .WithResourceUpdates(); - var annotation = custom.Resource.Annotations.OfType().SingleOrDefault(); + var annotation = custom.Resource.Annotations.OfType().SingleOrDefault(); Assert.NotNull(annotation); + async Task> GetValuesAsync() + { + var values = new List(); + + await foreach (var item in annotation.WatchAsync()) + { + values.Add(item); + } + + return values; + } + + var enumerableTask = GetValuesAsync(); + var state = annotation.GetInitialSnapshot(); state = state with { Properties = state.Properties.Add(("A", "value")) }; @@ -96,15 +110,12 @@ public async Task ResourceUpdatesAreQueued() await annotation.UpdateStateAsync(state); - var enumerator = annotation.WatchAsync().GetAsyncEnumerator(); - - await enumerator.MoveNextAsync(); - - Assert.Equal("value", enumerator.Current.Properties.Single(p => p.Key == "A").Value); + annotation.Complete(); - await enumerator.MoveNextAsync(); + var values = await enumerableTask; - Assert.Equal("value", enumerator.Current.Properties.Single(p => p.Key == "B").Value); + Assert.Equal("value", values[0].Properties.Single(p => p.Key == "A").Value); + Assert.Equal("value", values[1].Properties.Single(p => p.Key == "B").Value); } private sealed class CustomResource(string name) : Resource(name),