Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename the annotations #2523

Merged
merged 4 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static IResourceBuilder<TestResource> AddTestResource(this IDistributedAp

var rb = builder.AddResource(new TestResource(name))
.WithResourceLogger()
.WithCustomResourceState(() => new()
.WithResourceUpdates(() => new()
{
ResourceType = "Test Resource",
State = "Starting",
Expand All @@ -36,15 +36,15 @@ public Task BeforeStartAsync(DistributedApplicationModel appModel, CancellationT
{
foreach (var item in appModel.Resources.OfType<TestResource>())
{
if (item.TryGetLastAnnotation<CustomResourceAnnotation>(out var customResourceAnnotation) &&
item.TryGetLastAnnotation<CustomResourceLoggerAnnotation>(out var loggerAnnotation))
if (item.TryGetLastAnnotation<ResourceUpdatesAnnotation>(out var resourceUpdates) &&
item.TryGetLastAnnotation<ResourceLoggerAnnotation>(out var loggerAnnotation))
{
var states = new[] { "Starting", "Running", "Finished" };

Task.Run(async () =>
{
// Simulate custom resource state changes
var state = customResourceAnnotation.GetInitialSnapshot();
var state = resourceUpdates.GetInitialSnapshot();
var seconds = Random.Shared.Next(2, 12);

state = state with
Expand All @@ -55,7 +55,7 @@ public Task BeforeStartAsync(DistributedApplicationModel appModel, CancellationT
loggerAnnotation.Logger.LogInformation("Starting test resource {ResourceName} with update interval {Interval} seconds", item.Name, seconds);

// This might run before the dashboard is ready to receive updates, but it will be queued.
await customResourceAnnotation.UpdateStateAsync(state);
await resourceUpdates.UpdateStateAsync(state);

using var timer = new PeriodicTimer(TimeSpan.FromSeconds(seconds));

Expand All @@ -70,7 +70,7 @@ public Task BeforeStartAsync(DistributedApplicationModel appModel, CancellationT

loggerAnnotation.Logger.LogInformation("Test resource {ResourceName} is now in state {State}", item.Name, randomState);

await customResourceAnnotation.UpdateStateAsync(state);
await resourceUpdates.UpdateStateAsync(state);
}
},
cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@ namespace Aspire.Hosting;
public static class CustomResourceExtensions
{
/// <summary>
/// Adds a callback to configure the dashboard context for a resource.
/// Initializes the resource with a <see cref="ResourceUpdatesAnnotation"/> that allows publishing and subscribing to changes in the state of this resource.
/// </summary>
/// <typeparam name="TResource">The resource.</typeparam>
/// <param name="builder">The resource builder.</param>
/// <param name="initialSnapshotFactory">The factory to create the initial <see cref="CustomResourceSnapshot"/> for this resource.</param>
/// <returns>The resource builder.</returns>
public static IResourceBuilder<TResource> WithCustomResourceState<TResource>(this IResourceBuilder<TResource> builder, Func<CustomResourceSnapshot>? initialSnapshotFactory = null)
public static IResourceBuilder<TResource> WithResourceUpdates<TResource>(this IResourceBuilder<TResource> builder, Func<CustomResourceSnapshot>? initialSnapshotFactory = null)
where TResource : IResource
{
initialSnapshotFactory ??= () => CustomResourceSnapshot.Create(builder.Resource);

return builder.WithAnnotation(new CustomResourceAnnotation(initialSnapshotFactory), ResourceAnnotationMutationBehavior.Replace);
return builder.WithAnnotation(new ResourceUpdatesAnnotation(initialSnapshotFactory), ResourceAnnotationMutationBehavior.Replace);
}

/// <summary>
Expand All @@ -34,6 +34,6 @@ public static IResourceBuilder<TResource> WithCustomResourceState<TResource>(thi
public static IResourceBuilder<TResource> WithResourceLogger<TResource>(this IResourceBuilder<TResource> builder)
where TResource : IResource
{
return builder.WithAnnotation(new CustomResourceLoggerAnnotation(), ResourceAnnotationMutationBehavior.Replace);
return builder.WithAnnotation(new ResourceLoggerAnnotation(), ResourceAnnotationMutationBehavior.Replace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Aspire.Hosting.ApplicationModel;
/// <summary>
/// A annotation that exposes a Logger for the resource to write to.
/// </summary>
public sealed class CustomResourceLoggerAnnotation : IResourceAnnotation
public sealed class ResourceLoggerAnnotation : IResourceAnnotation
{
private readonly ResourceLogger _logger;
private readonly CancellationTokenSource _logStreamCts = new();
Expand All @@ -19,9 +19,9 @@ public sealed class CustomResourceLoggerAnnotation : IResourceAnnotation
private readonly CircularBuffer<(string Content, bool IsErrorMessage)> _backlog = new(10000);

/// <summary>
/// Creates a new <see cref="CustomResourceLoggerAnnotation"/>.
/// Creates a new <see cref="ResourceLoggerAnnotation"/>.
/// </summary>
public CustomResourceLoggerAnnotation()
public ResourceLoggerAnnotation()
{
_logger = new ResourceLogger(this);
}
Expand Down Expand Up @@ -49,7 +49,7 @@ public void Complete()
_logStreamCts.Cancel();
}

private sealed class ResourceLogger(CustomResourceLoggerAnnotation annotation) : ILogger
private sealed class ResourceLogger(ResourceLoggerAnnotation annotation) : ILogger
{
IDisposable? ILogger.BeginScope<TState>(TState state) => null;

Expand Down Expand Up @@ -77,7 +77,7 @@ public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Except
}
}

private sealed class LogAsyncEnumerable(CustomResourceLoggerAnnotation annotation) : IAsyncEnumerable<IReadOnlyList<(string, bool)>>
private sealed class LogAsyncEnumerable(ResourceLoggerAnnotation annotation) : IAsyncEnumerable<IReadOnlyList<(string, bool)>>
{
public async IAsyncEnumerator<IReadOnlyList<(string, bool)>> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
Expand All @@ -94,19 +94,18 @@ private sealed class LogAsyncEnumerable(CustomResourceLoggerAnnotation annotatio

var channel = Channel.CreateUnbounded<(string, bool)>();

using var _ = annotation._logStreamCts.Token.Register(() => channel.Writer.TryComplete());

void Log((string Content, bool IsErrorMessage) log)
{
channel.Writer.TryWrite(log);
}

annotation.OnNewLog += Log;

// Connect both tokens so we can stop streaming if the stream was closed
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, annotation._logStreamCts.Token);

try
{
await foreach (var entry in channel.GetBatches(cts.Token))
await foreach (var entry in channel.GetBatches(cancellationToken))
{
yield return entry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@
namespace Aspire.Hosting.ApplicationModel;

/// <summary>
/// The annotation that reflects how a resource shows up in the dashboard.
/// This is a single producer, single consumer channel model for pushing updates to the dashboard.
/// The resource server will be the only caller of WatchAsync.
/// The annotation that allows publishing and subscribing to changes in the state of a resource.
/// </summary>
public sealed class CustomResourceAnnotation(Func<CustomResourceSnapshot> initialSnapshotFactory) : IResourceAnnotation
public sealed class ResourceUpdatesAnnotation(Func<CustomResourceSnapshot> initialSnapshotFactory) : IResourceAnnotation
{
private readonly Channel<CustomResourceSnapshot> _channel = Channel.CreateUnbounded<CustomResourceSnapshot>();
private readonly CancellationTokenSource _streamClosedCts = new();

private Action<CustomResourceSnapshot>? OnSnapshotUpdated { get; set; }

/// <summary>
/// Watch for changes to the dashboard state for a resource.
/// </summary>
public IAsyncEnumerable<CustomResourceSnapshot> WatchAsync(CancellationToken cancellationToken = default) => _channel.Reader.ReadAllAsync(cancellationToken);
public IAsyncEnumerable<CustomResourceSnapshot> WatchAsync() => new ResourceUpdatesAsyncEnumerable(this);

/// <summary>
/// Gets the initial snapshot of the dashboard state for this resource.
Expand All @@ -30,14 +30,58 @@ public sealed class CustomResourceAnnotation(Func<CustomResourceSnapshot> initia
/// Updates the snapshot of the <see cref="CustomResourceSnapshot"/> for a resource.
/// </summary>
/// <param name="state">The new <see cref="CustomResourceSnapshot"/>.</param>
public async Task UpdateStateAsync(CustomResourceSnapshot state)
public Task UpdateStateAsync(CustomResourceSnapshot state)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we leaving this as "Async" in case we want to do something async in the future?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep

{
if (_streamClosedCts.IsCancellationRequested)
{
return Task.CompletedTask;
}

OnSnapshotUpdated?.Invoke(state);

return Task.CompletedTask;
}

/// <summary>
/// Signal that no more updates are expected for this resource.
/// </summary>
public void Complete()
{
_streamClosedCts.Cancel();
}

private sealed class ResourceUpdatesAsyncEnumerable(ResourceUpdatesAnnotation customResourceAnnotation) : IAsyncEnumerable<CustomResourceSnapshot>
{
await _channel.Writer.WriteAsync(state).ConfigureAwait(false);
public async IAsyncEnumerator<CustomResourceSnapshot> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
var channel = Channel.CreateUnbounded<CustomResourceSnapshot>();

void WriteToChannel(CustomResourceSnapshot state)
=> channel.Writer.TryWrite(state);

using var _ = customResourceAnnotation._streamClosedCts.Token.Register(() => channel.Writer.TryComplete());

customResourceAnnotation.OnSnapshotUpdated = WriteToChannel;

try
{
await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
{
yield return item;
}
}
finally
{
customResourceAnnotation.OnSnapshotUpdated -= WriteToChannel;

channel.Writer.TryComplete();
}
}
}
}

/// <summary>
/// The context for a all of the properties and URLs that should show up in the dashboard for a resource.
/// An immutable snapshot of the state of a resource.
/// </summary>
public sealed record CustomResourceSnapshot
{
davidfowl marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -59,7 +103,7 @@ public sealed record CustomResourceSnapshot
/// <summary>
/// The environment variables that should show up in the dashboard for this resource.
/// </summary>
public ImmutableArray<(string Name, string Value)> EnviromentVariables { get; init; } = [];
public ImmutableArray<(string Name, string Value)> EnvironmentVariables { get; init; } = [];

/// <summary>
/// The URLs that should show up in the dashboard for this resource.
Expand Down Expand Up @@ -106,7 +150,7 @@ static string GetUrl(EndpointAnnotation e) =>
return new CustomResourceSnapshot()
{
ResourceType = resource.GetType().Name.Replace("Resource", ""),
EnviromentVariables = environmentVariables,
EnvironmentVariables = environmentVariables,
Urls = urls,
Properties = properties
};
Expand Down
2 changes: 1 addition & 1 deletion src/Aspire.Hosting/Dashboard/ConsoleLogPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ LogsEnumerable SubscribeContainerResource(ContainerSnapshot container)

private static LogsEnumerable SubscribeGenericResource(IResource resource)
{
if (resource.TryGetLastAnnotation<CustomResourceLoggerAnnotation>(out var loggerAnnotation))
if (resource.TryGetLastAnnotation<ResourceLoggerAnnotation>(out var loggerAnnotation))
{
return loggerAnnotation.WatchAsync();
}
Expand Down
8 changes: 4 additions & 4 deletions src/Aspire.Hosting/Dashboard/DcpDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,11 @@ private async Task ProcessInitialResourceAsync(IResource resource, CancellationT

await _onResourceChanged(snapshot, ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
}
else if (resource.TryGetLastAnnotation<CustomResourceAnnotation>(out var dashboardAnnotation))
else if (resource.TryGetLastAnnotation<ResourceUpdatesAnnotation>(out var resourceUpdates))
{
// We have a dashboard annotation, so we want to create a snapshot for the resource
// and update data immediately. We also want to watch for changes to the dashboard state.
var state = dashboardAnnotation.GetInitialSnapshot();
var state = resourceUpdates.GetInitialSnapshot();
var creationTimestamp = DateTime.UtcNow;

var snapshot = CreateResourceSnapshot(resource, creationTimestamp, state);
Expand All @@ -210,7 +210,7 @@ private async Task ProcessInitialResourceAsync(IResource resource, CancellationT

_ = Task.Run(async () =>
{
await foreach (var state in dashboardAnnotation.WatchAsync(cancellationToken))
await foreach (var state in resourceUpdates.WatchAsync().WithCancellation(cancellationToken))
{
try
{
Expand All @@ -231,7 +231,7 @@ private async Task ProcessInitialResourceAsync(IResource resource, CancellationT
private static GenericResourceSnapshot CreateResourceSnapshot(IResource resource, DateTime creationTimestamp, CustomResourceSnapshot dashboardState)
{
ImmutableArray<EnvironmentVariableSnapshot> environmentVariables = [..
dashboardState.EnviromentVariables.Select(e => new EnvironmentVariableSnapshot(e.Name, e.Value, false))];
dashboardState.EnvironmentVariables.Select(e => new EnvironmentVariableSnapshot(e.Name, e.Value, false))];

ImmutableArray<EndpointSnapshot> endpoints = [..
dashboardState.Urls.Select(u => new EndpointSnapshot(u, u))];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ internal static IResourceBuilder<ParameterResource> AddParameter(this IDistribut
{
var resource = new ParameterResource(name, callback, secret);
return builder.AddResource(resource)
.WithCustomResourceState(() =>
.WithResourceUpdates(() =>
{
var state = new CustomResourceSnapshot()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace Aspire.Hosting.Tests;

public class CustomResourceLoggerTests
public class ResourceLoggerTests
{
[Fact]
public void AddingResourceLoggerAnnotationAllowsLogging()
Expand All @@ -16,7 +16,7 @@ public void AddingResourceLoggerAnnotationAllowsLogging()
var testResource = builder.AddResource(new TestResource("myResource"))
.WithResourceLogger();

var annotation = testResource.Resource.Annotations.OfType<CustomResourceLoggerAnnotation>().SingleOrDefault();
var annotation = testResource.Resource.Annotations.OfType<ResourceLoggerAnnotation>().SingleOrDefault();

Assert.NotNull(annotation);

Expand Down Expand Up @@ -48,7 +48,7 @@ public async Task StreamingLogsCancelledAfterComplete()
var testResource = builder.AddResource(new TestResource("myResource"))
.WithResourceLogger();

var annotation = testResource.Resource.Annotations.OfType<CustomResourceLoggerAnnotation>().SingleOrDefault();
var annotation = testResource.Resource.Annotations.OfType<ResourceLoggerAnnotation>().SingleOrDefault();

Assert.NotNull(annotation);

Expand Down
Loading
Loading