Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve resources page performance with many resources #2556

Merged
merged 3 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Aspire.Dashboard/Components/Pages/ConsoleLogs.razor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected override async Task OnInitializedAsync()

var loadingTcs = new TaskCompletionSource();

TrackResourceSnapshots();
await TrackResourceSnapshotsAsync();

// Wait for resource to be selected. If selected resource isn't available after a few seconds then stop waiting.
try
Expand All @@ -74,14 +74,14 @@ protected override async Task OnInitializedAsync()
Logger.LogWarning(ex, "Load timeout while waiting for resource {ResourceName}.", ResourceName);
}

void TrackResourceSnapshots()
async Task TrackResourceSnapshotsAsync()
{
if (!DashboardClient.IsEnabled)
{
return;
}

var (snapshot, subscription) = DashboardClient.SubscribeResources();
var (snapshot, subscription) = await DashboardClient.SubscribeResourcesAsync(_resourceSubscriptionCancellation.Token);

Logger.LogDebug("Received initial resource snapshot with {ResourceCount} resources.", snapshot.Length);

Expand Down
8 changes: 4 additions & 4 deletions src/Aspire.Dashboard/Components/Pages/Resources.razor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ static bool UnionWithKeys(ConcurrentDictionary<string, bool> left, ConcurrentDic
private readonly GridSort<ResourceViewModel> _stateSort = GridSort<ResourceViewModel>.ByAscending(p => p.State);
private readonly GridSort<ResourceViewModel> _startTimeSort = GridSort<ResourceViewModel>.ByDescending(p => p.CreationTimeStamp);

protected override void OnInitialized()
protected override async Task OnInitializedAsync()
{
_applicationUnviewedErrorCounts = TelemetryRepository.GetApplicationUnviewedErrorLogsCount();

if (DashboardClient.IsEnabled)
{
SubscribeResources();
await SubscribeResourcesAsync();
}

_logsSubscription = TelemetryRepository.OnNewLogs(null, SubscriptionType.Other, async () =>
Expand All @@ -115,9 +115,9 @@ protected override void OnInitialized()
await InvokeAsync(StateHasChanged);
});

void SubscribeResources()
async Task SubscribeResourcesAsync()
{
var (snapshot, subscription) = DashboardClient.SubscribeResources();
var (snapshot, subscription) = await DashboardClient.SubscribeResourcesAsync(_watchTaskCancellationTokenSource.Token);

// Apply snapshot.
foreach (var resource in snapshot)
Expand Down
80 changes: 53 additions & 27 deletions src/Aspire.Dashboard/Model/DashboardClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace Aspire.Dashboard.Model;
/// <para>
/// If the <c>DOTNET_RESOURCE_SERVICE_ENDPOINT_URL</c> environment variable is not specified, then there's
/// no known endpoint to connect to, and this dashboard client will be disabled. Calls to
/// <see cref="IDashboardClient.SubscribeResources"/> and <see cref="IDashboardClient.SubscribeConsoleLogs"/>
/// <see cref="IDashboardClient.SubscribeResourcesAsync"/> and <see cref="IDashboardClient.SubscribeConsoleLogs"/>
/// will throw if <see cref="IDashboardClient.IsEnabled"/> is <see langword="false"/>. Callers should
/// check this property first, before calling these methods.
/// </para>
Expand All @@ -37,7 +37,8 @@ internal sealed class DashboardClient : IDashboardClient
private readonly Dictionary<string, ResourceViewModel> _resourceByName = new(StringComparers.ResourceName);
private readonly CancellationTokenSource _cts = new();
private readonly CancellationToken _clientCancellationToken;
private readonly TaskCompletionSource _whenConnected = new();
private readonly TaskCompletionSource _whenConnectedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource _initialDataReceivedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly object _lock = new();

private readonly ILoggerFactory _loggerFactory;
Expand Down Expand Up @@ -75,7 +76,7 @@ public DashboardClient(ILoggerFactory loggerFactory, IConfiguration configuratio
_state = StateDisabled;
_logger.LogDebug($"{ResourceServiceUrlVariableName} is not specified. Dashboard client services are unavailable.");
_cts.Cancel();
_whenConnected.TrySetCanceled();
_whenConnectedTcs.TrySetCanceled();
return;
}

Expand Down Expand Up @@ -164,11 +165,11 @@ async Task ConnectAsync()

_applicationName = response.ApplicationName;

_whenConnected.TrySetResult();
_whenConnectedTcs.TrySetResult();
}
catch (Exception ex)
{
_whenConnected.TrySetException(ex);
_whenConnectedTcs.TrySetException(ex);
}
}

Expand Down Expand Up @@ -241,6 +242,8 @@ async Task WatchResourcesAsync()
changes ??= [];
changes.Add(new(ResourceViewModelChangeType.Upsert, viewModel));
}

_initialDataReceivedTcs.TrySetResult();
}
else if (response.KindCase == WatchResourcesUpdate.KindOneofCase.Changes)
{
Expand Down Expand Up @@ -287,7 +290,8 @@ async Task WatchResourcesAsync()
// TODO send batches over the channel instead of individual items? They are batched downstream however
foreach (var change in changes)
{
await channel.Writer.WriteAsync(change, cancellationToken).ConfigureAwait(false);
// Channel is unbound so TryWrite always succeeds.
channel.Writer.TryWrite(change);
}
}
}
Expand All @@ -305,7 +309,7 @@ Task IDashboardClient.WhenConnected
// If someone is waiting for the connection, we need to ensure connection is starting.
EnsureInitialized();

return _whenConnected.Task;
return _whenConnectedTcs.Task;
}
}

Expand All @@ -316,40 +320,44 @@ string IDashboardClient.ApplicationName
?? "Aspire";
}

ResourceViewModelSubscription IDashboardClient.SubscribeResources()
async Task<ResourceViewModelSubscription> IDashboardClient.SubscribeResourcesAsync(CancellationToken cancellationToken)
{
EnsureInitialized();

var cts = CancellationTokenSource.CreateLinkedTokenSource(_clientCancellationToken, cancellationToken);

// Wait for initial data to be received from the server. This allows initial data to be returned with subscription when client is starting.
await _initialDataReceivedTcs.Task.WaitAsync(cts.Token).ConfigureAwait(false);

// There are two types of channel in this class. This is not a gRPC channel.
// It's a producer-consumer queue channel, used to push updates to subscribers
// without blocking the producer here.
var channel = Channel.CreateUnbounded<ResourceViewModelChange>(
new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = true });

lock (_lock)
{
// There are two types of channel in this class. This is not a gRPC channel.
// It's a producer-consumer queue channel, used to push updates to subscribers
// without blocking the producer here.
var channel = Channel.CreateUnbounded<ResourceViewModelChange>(
new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = true });

ImmutableInterlocked.Update(ref _outgoingChannels, static (set, channel) => set.Add(channel), channel);

return new ResourceViewModelSubscription(
InitialState: _resourceByName.Values.ToImmutableArray(),
Subscription: StreamUpdates());
Subscription: StreamUpdatesAsync(cts.Token));
}

async IAsyncEnumerable<ResourceViewModelChange> StreamUpdates([EnumeratorCancellation] CancellationToken enumeratorCancellationToken = default)
async IAsyncEnumerable<ResourceViewModelChange> StreamUpdatesAsync([EnumeratorCancellation] CancellationToken enumeratorCancellationToken = default)
{
try
{
try
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(_clientCancellationToken, enumeratorCancellationToken);

await foreach (var batch in channel.Reader.ReadAllAsync(cts.Token).ConfigureAwait(false))
{
yield return batch;
}
}
finally
await foreach (var batch in channel.Reader.ReadAllAsync(enumeratorCancellationToken).ConfigureAwait(false))
{
ImmutableInterlocked.Update(ref _outgoingChannels, static (set, channel) => set.Remove(channel), channel);
yield return batch;
}
}
finally
{
cts.Dispose();
ImmutableInterlocked.Update(ref _outgoingChannels, static (set, channel) => set.Remove(channel), channel);
}
}
}

Expand Down Expand Up @@ -391,4 +399,22 @@ public async ValueTask DisposeAsync()
await TaskHelpers.WaitIgnoreCancelAsync(_connection, _logger, "Unexpected error from connection task.").ConfigureAwait(false);
}
}

// Internal for testing.
// TODO: Improve this in the future by making the client injected with DI and have it return data.
internal void SetInitialDataReceived(IList<Resource>? initialData = null)
{
if (initialData != null)
{
lock (_lock)
{
foreach (var data in initialData)
{
_resourceByName[data.Name] = data.ToViewModel();
}
}
}

_initialDataReceivedTcs.TrySetResult();
}
}
2 changes: 1 addition & 1 deletion src/Aspire.Dashboard/Model/IDashboardClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface IDashboardClient : IAsyncDisposable
/// Callers are required to manage the lifetime of the subscription,
/// using cancellation during enumeration.
/// </remarks>
ResourceViewModelSubscription SubscribeResources();
Task<ResourceViewModelSubscription> SubscribeResourcesAsync(CancellationToken cancellationToken);

/// <summary>
/// Gets a stream of console log messages for the specified resource.
Expand Down
16 changes: 8 additions & 8 deletions src/Aspire.Dashboard/Model/ResourceOutgoingPeerResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ public ResourceOutgoingPeerResolver(IDashboardClient resourceService)
return;
}

var (snapshot, subscription) = resourceService.SubscribeResources();

foreach (var resource in snapshot)
{
var added = _resourceByName.TryAdd(resource.Name, resource);
Debug.Assert(added, "Should not receive duplicate resources in initial snapshot data.");
}

_watchTask = Task.Run(async () =>
{
var (snapshot, subscription) = await resourceService.SubscribeResourcesAsync(_watchContainersTokenSource.Token).ConfigureAwait(false);

foreach (var resource in snapshot)
{
var added = _resourceByName.TryAdd(resource.Name, resource);
Debug.Assert(added, "Should not receive duplicate resources in initial snapshot data.");
}

await foreach (var (changeType, resource) in subscription.WithCancellation(_watchContainersTokenSource.Token))
{
if (changeType == ResourceViewModelChangeType.Upsert)
Expand Down
2 changes: 1 addition & 1 deletion src/Aspire.Dashboard/Protos/Partials.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public ResourceViewModel ToViewModel()
ResourceType = ValidateNotNull(ResourceType),
DisplayName = ValidateNotNull(DisplayName),
Uid = ValidateNotNull(Uid),
CreationTimeStamp = CreatedAt.ToDateTime(),
CreationTimeStamp = ValidateNotNull(CreatedAt).ToDateTime(),
Properties = Properties.ToFrozenDictionary(property => ValidateNotNull(property.Name), property => ValidateNotNull(property.Value), StringComparers.ResourcePropertyName),
Endpoints = GetEndpoints(),
Environment = GetEnvironment(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ void Log((string Content, bool IsErrorMessage) log)

try
{
await foreach (var entry in channel.GetBatches(cancellationToken))
await foreach (var entry in channel.GetBatchesAsync(cancellationToken))
{
yield return entry;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Aspire.Hosting/Dashboard/DockerContainerLogSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ internal sealed class DockerContainerLogSource(string containerId) : IAsyncEnume
// Don't forward cancellationToken here, because it's handled internally in WaitForExit
_ = Task.Run(() => WaitForExit(tcs, ctr), CancellationToken.None);

await foreach (var batch in channel.GetBatches(cancellationToken))
await foreach (var batch in channel.GetBatchesAsync(cancellationToken))
{
yield return batch;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Aspire.Hosting/Dashboard/FileLogSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal sealed partial class FileLogSource(string stdOutPath, string stdErrPath
var stdOut = Task.Run(() => WatchFileAsync(stdOutPath, isError: false), cancellationToken);
var stdErr = Task.Run(() => WatchFileAsync(stdErrPath, isError: true), cancellationToken);

await foreach (var batch in channel.GetBatches(cancellationToken))
await foreach (var batch in channel.GetBatchesAsync(cancellationToken))
{
yield return batch;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Aspire.Hosting/Dashboard/ResourceLogSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public async IAsyncEnumerator<LogEntryList> GetAsyncEnumerator(CancellationToken
TaskContinuationOptions.None,
TaskScheduler.Default).ConfigureAwait(false);

await foreach (var batch in channel.GetBatches(cancellationToken))
await foreach (var batch in channel.GetBatchesAsync(cancellationToken))
{
yield return batch;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Aspire.Hosting/Dashboard/ResourcePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async IAsyncEnumerable<IReadOnlyList<ResourceSnapshotChange>> StreamUpdates([Enu

try
{
await foreach (var batch in channel.GetBatches(linked.Token).ConfigureAwait(false))
await foreach (var batch in channel.GetBatchesAsync(linked.Token).ConfigureAwait(false))
{
yield return batch;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Shared/ChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal static class ChannelExtensions
/// <param name="channel">The channel to read values from.</param>
/// <param name="cancellationToken">A token that signals a loss of interest in the operation.</param>
/// <returns></returns>
public static async IAsyncEnumerable<IReadOnlyList<T>> GetBatches<T>(
public static async IAsyncEnumerable<IReadOnlyList<T>> GetBatchesAsync<T>(
this Channel<T> channel,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ private sealed class MockDashboardClient : IDashboardClient
public string ApplicationName => "<marquee>An HTML title!</marquee>";
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
public IAsyncEnumerable<IReadOnlyList<(string Content, bool IsErrorMessage)>>? SubscribeConsoleLogs(string resourceName, CancellationToken cancellationToken) => throw new NotImplementedException();
public ResourceViewModelSubscription SubscribeResources() => throw new NotImplementedException();
public Task<ResourceViewModelSubscription> SubscribeResourcesAsync(CancellationToken cancellationToken) => throw new NotImplementedException();
}
}
Loading