From 2aeaef2d8b866801afd4fb471ae3baed8b58911d Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Fri, 1 Nov 2024 12:28:51 -0700 Subject: [PATCH 1/3] Wait for acknowledgment when sending message to gRPC channel --- .../Agents/GrpcAgentWorkerRuntime.cs | 48 +++++++++++++++---- 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs b/dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs index c52509876ff..193f9dd2b63 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs @@ -19,7 +19,7 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent private readonly ConcurrentDictionary _agentTypes = new(); private readonly ConcurrentDictionary<(string Type, string Key), IAgentBase> _agents = new(); private readonly ConcurrentDictionary _pendingRequests = new(); - private readonly Channel _outboundMessagesChannel = Channel.CreateBounded(new BoundedChannelOptions(1024) + private readonly Channel<(Message Message, TaskCompletionSource WriteCompletionSource)> _outboundMessagesChannel = Channel.CreateBounded<(Message, TaskCompletionSource)>(new BoundedChannelOptions(1024) { AllowSynchronousContinuations = true, SingleReader = true, @@ -138,30 +138,34 @@ private async Task RunWritePump() var outboundMessages = _outboundMessagesChannel.Reader; while (!_shutdownCts.IsCancellationRequested) { + (Message Message, TaskCompletionSource WriteCompletionSource) item = default; try { await outboundMessages.WaitToReadAsync().ConfigureAwait(false); // Read the next message if we don't already have an unsent message // waiting to be sent. - if (!outboundMessages.TryRead(out var message)) + if (!outboundMessages.TryRead(out item)) { break; } while (!_shutdownCts.IsCancellationRequested) { - await channel.RequestStream.WriteAsync(message, _shutdownCts.Token).ConfigureAwait(false); + await channel.RequestStream.WriteAsync(item.Message, _shutdownCts.Token).ConfigureAwait(false); + item.WriteCompletionSource.TrySetResult(); break; } } catch (OperationCanceledException) { // Time to shut down. + item.WriteCompletionSource?.TrySetCanceled(); break; } catch (Exception ex) when (!_shutdownCts.IsCancellationRequested) { + item.WriteCompletionSource?.TrySetException(ex); _logger.LogError(ex, "Error writing to channel."); channel = RecreateChannel(channel); continue; @@ -169,9 +173,15 @@ private async Task RunWritePump() catch { // Shutdown requested. + item.WriteCompletionSource?.TrySetCanceled(); break; } } + + while (outboundMessages.TryRead(out var item)) + { + item.WriteCompletionSource.TrySetCanceled(); + } } private IAgentBase GetOrActivateAgent(AgentId agentId) @@ -213,7 +223,8 @@ await WriteChannelAsync(new Message //StateType = state?.Name, //Events = { events } } - }).ConfigureAwait(false); + }, + _shutdownCts.Token).ConfigureAwait(false); } } @@ -229,17 +240,36 @@ public async ValueTask SendRequest(IAgentBase agent, RpcRequest request) var requestId = Guid.NewGuid().ToString(); _pendingRequests[requestId] = (agent, request.RequestId); request.RequestId = requestId; - await WriteChannelAsync(new Message { Request = request }).ConfigureAwait(false); + try + { + await WriteChannelAsync(new Message { Request = request }).ConfigureAwait(false); + } + catch (Exception exception) + { + if (_pendingRequests.TryRemove(requestId, out _)) + { + agent.ReceiveMessage(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = exception.Message } }); + } + } } public async ValueTask PublishEvent(CloudEvent @event) { - await WriteChannelAsync(new Message { CloudEvent = @event }).ConfigureAwait(false); + try + { + await WriteChannelAsync(new Message { CloudEvent = @event }).ConfigureAwait(false); + } + catch (Exception exception) + { + _logger.LogWarning(exception, "Failed to publish event '{Event}'.", @event); + } } - private async Task WriteChannelAsync(Message message) + private async Task WriteChannelAsync(Message message, CancellationToken cancellationToken = default) { - await _outboundMessagesChannel.Writer.WriteAsync(message).ConfigureAwait(false); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + await _outboundMessagesChannel.Writer.WriteAsync((message, tcs), cancellationToken).ConfigureAwait(false); + await tcs.Task.WaitAsync(cancellationToken); } private AsyncDuplexStreamingCall GetChannel() @@ -269,7 +299,7 @@ private AsyncDuplexStreamingCall RecreateChannel(AsyncDuplexSt if (_channel is null || _channel == channel) { _channel?.Dispose(); - _channel = _client.OpenChannel(); + _channel = _client.OpenChannel(cancellationToken: _shutdownCts.Token); } } } From 7d4874efc08f3d5c9568e35c76597f7961b5ce67 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Fri, 1 Nov 2024 12:32:21 -0700 Subject: [PATCH 2/3] Add CancellationToken parameters to API surface --- .../Abstractions/IAgentContext.cs | 10 ++++----- .../Abstractions/IAgentWorkerRuntime.cs | 10 ++++----- .../src/Microsoft.AutoGen/Agents/AgentBase.cs | 5 ++--- .../Microsoft.AutoGen/Agents/AgentContext.cs | 20 ++++++++--------- .../Agents/GrpcAgentWorkerRuntime.cs | 22 +++++++++---------- 5 files changed, 33 insertions(+), 34 deletions(-) diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentContext.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentContext.cs index d93b6246765..ab5972730fb 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentContext.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentContext.cs @@ -12,9 +12,9 @@ public interface IAgentContext IAgentBase? AgentInstance { get; set; } DistributedContextPropagator DistributedContextPropagator { get; } // TODO: Remove this. An abstraction should not have a dependency on DistributedContextPropagator. ILogger Logger { get; } // TODO: Remove this. An abstraction should not have a dependency on ILogger. - ValueTask Store(AgentState value); - ValueTask Read(AgentId agentId); - ValueTask SendResponseAsync(RpcRequest request, RpcResponse response); - ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request); - ValueTask PublishEventAsync(CloudEvent @event); + ValueTask Store(AgentState value, CancellationToken cancellationToken = default); + ValueTask Read(AgentId agentId, CancellationToken cancellationToken = default); + ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken = default); + ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default); + ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default); } diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorkerRuntime.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorkerRuntime.cs index 1a255e13234..c03259f722f 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorkerRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorkerRuntime.cs @@ -5,9 +5,9 @@ namespace Microsoft.AutoGen.Abstractions; public interface IAgentWorkerRuntime { - ValueTask PublishEvent(CloudEvent evt); - ValueTask SendRequest(IAgentBase agent, RpcRequest request); - ValueTask SendResponse(RpcResponse response); - ValueTask Store(AgentState value); - ValueTask Read(AgentId agentId); + ValueTask PublishEvent(CloudEvent evt, CancellationToken cancellationToken); + ValueTask SendRequest(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken); + ValueTask SendResponse(RpcResponse response, CancellationToken cancellationToken); + ValueTask Store(AgentState value, CancellationToken cancellationToken); + ValueTask Read(AgentId agentId, CancellationToken cancellationToken); } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs index af06c84e9ba..baa7ee201ed 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs @@ -202,15 +202,14 @@ public async ValueTask PublishEvent(CloudEvent item) var activity = s_source.StartActivity($"PublishEvent '{item.Type}'", ActivityKind.Client, Activity.Current?.Context ?? default); activity?.SetTag("peer.service", $"{item.Type}/{item.Source}"); - var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); // TODO: fix activity Context.DistributedContextPropagator.Inject(activity, item.Metadata, static (carrier, key, value) => ((IDictionary)carrier!)[key] = value); await this.InvokeWithActivityAsync( - static async ((AgentBase Agent, CloudEvent Event, TaskCompletionSource) state) => + static async ((AgentBase Agent, CloudEvent Event) state) => { await state.Agent._context.PublishEventAsync(state.Event).ConfigureAwait(false); }, - (this, item, completion), + (this, item), activity, item.Type).ConfigureAwait(false); } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs index 325bc33a11d..7de1e6565d3 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs @@ -15,25 +15,25 @@ internal sealed class AgentContext(AgentId agentId, IAgentWorkerRuntime runtime, public ILogger Logger { get; } = logger; public IAgentBase? AgentInstance { get; set; } public DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator; - public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response) + public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken) { response.RequestId = request.RequestId; - await _runtime.SendResponse(response); + await _runtime.SendResponse(response, cancellationToken).ConfigureAwait(false); } - public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request) + public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken) { - await _runtime.SendRequest(agent, request).ConfigureAwait(false); + await _runtime.SendRequest(agent, request, cancellationToken).ConfigureAwait(false); } - public async ValueTask PublishEventAsync(CloudEvent @event) + public async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken) { - await _runtime.PublishEvent(@event).ConfigureAwait(false); + await _runtime.PublishEvent(@event, cancellationToken).ConfigureAwait(false); } - public async ValueTask Store(AgentState value) + public async ValueTask Store(AgentState value, CancellationToken cancellationToken) { - await _runtime.Store(value).ConfigureAwait(false); + await _runtime.Store(value, cancellationToken).ConfigureAwait(false); } - public ValueTask Read(AgentId agentId) + public ValueTask Read(AgentId agentId, CancellationToken cancellationToken) { - return _runtime.Read(agentId); + return _runtime.Read(agentId, cancellationToken); } } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs b/dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs index 193f9dd2b63..b0550c1fb71 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs @@ -228,13 +228,13 @@ await WriteChannelAsync(new Message } } - public async ValueTask SendResponse(RpcResponse response) + public async ValueTask SendResponse(RpcResponse response, CancellationToken cancellationToken) { _logger.LogInformation("Sending response '{Response}'.", response); - await WriteChannelAsync(new Message { Response = response }).ConfigureAwait(false); + await WriteChannelAsync(new Message { Response = response }, cancellationToken).ConfigureAwait(false); } - public async ValueTask SendRequest(IAgentBase agent, RpcRequest request) + public async ValueTask SendRequest(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken) { _logger.LogInformation("[{AgentId}] Sending request '{Request}'.", agent.AgentId, request); var requestId = Guid.NewGuid().ToString(); @@ -242,7 +242,7 @@ public async ValueTask SendRequest(IAgentBase agent, RpcRequest request) request.RequestId = requestId; try { - await WriteChannelAsync(new Message { Request = request }).ConfigureAwait(false); + await WriteChannelAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false); } catch (Exception exception) { @@ -253,11 +253,11 @@ public async ValueTask SendRequest(IAgentBase agent, RpcRequest request) } } - public async ValueTask PublishEvent(CloudEvent @event) + public async ValueTask PublishEvent(CloudEvent @event, CancellationToken cancellationToken) { try { - await WriteChannelAsync(new Message { CloudEvent = @event }).ConfigureAwait(false); + await WriteChannelAsync(new Message { CloudEvent = @event }, cancellationToken).ConfigureAwait(false); } catch (Exception exception) { @@ -265,7 +265,7 @@ public async ValueTask PublishEvent(CloudEvent @event) } } - private async Task WriteChannelAsync(Message message, CancellationToken cancellationToken = default) + private async Task WriteChannelAsync(Message message, CancellationToken cancellationToken) { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); await _outboundMessagesChannel.Writer.WriteAsync((message, tcs), cancellationToken).ConfigureAwait(false); @@ -364,19 +364,19 @@ public async Task StopAsync(CancellationToken cancellationToken) _channel?.Dispose(); } } - public ValueTask Store(AgentState value) + public ValueTask Store(AgentState value, CancellationToken cancellationToken) { var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState."); - var response = _client.SaveState(value); + var response = _client.SaveState(value, cancellationToken: cancellationToken); if (!response.Success) { throw new InvalidOperationException($"Error saving AgentState for AgentId {agentId}."); } return ValueTask.CompletedTask; } - public async ValueTask Read(AgentId agentId) + public async ValueTask Read(AgentId agentId, CancellationToken cancellationToken) { - var response = await _client.GetStateAsync(agentId); + var response = await _client.GetStateAsync(agentId, cancellationToken: cancellationToken); // if (response.Success && response.AgentState.AgentId is not null) - why is success always false? if (response.AgentState.AgentId is not null) { From f13cb5dba167d3917cb706956267022e25bca1f1 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Fri, 1 Nov 2024 12:36:29 -0700 Subject: [PATCH 3/3] Clean up the Hello sample, support Aspire 9.0, & fix shutdown --- dotnet/samples/Hello/Backend/Backend.csproj | 16 +++++++------ dotnet/samples/Hello/Backend/Program.cs | 2 -- .../Hello/Hello.AppHost/Hello.AppHost.csproj | 7 +++--- .../Hello/HelloAIAgents/HelloAIAgent.cs | 1 - .../Hello/HelloAIAgents/HelloAIAgents.csproj | 24 +++++++++---------- dotnet/samples/Hello/HelloAIAgents/Program.cs | 3 --- .../Hello/HelloAgent/HelloAgent.csproj | 24 +++++++++---------- dotnet/samples/Hello/HelloAgent/Program.cs | 11 ++++----- .../HelloAgentState/HelloAgentState.csproj | 24 +++++++++---------- .../samples/Hello/HelloAgentState/Program.cs | 2 -- 10 files changed, 50 insertions(+), 64 deletions(-) diff --git a/dotnet/samples/Hello/Backend/Backend.csproj b/dotnet/samples/Hello/Backend/Backend.csproj index 60097b5d379..2f5a02ee511 100644 --- a/dotnet/samples/Hello/Backend/Backend.csproj +++ b/dotnet/samples/Hello/Backend/Backend.csproj @@ -1,14 +1,16 @@ - - - - - - - + Exe net8.0 enable enable + + + + + + + + diff --git a/dotnet/samples/Hello/Backend/Program.cs b/dotnet/samples/Hello/Backend/Program.cs index 9f55daf69fc..7abdb205a85 100644 --- a/dotnet/samples/Hello/Backend/Program.cs +++ b/dotnet/samples/Hello/Backend/Program.cs @@ -1,7 +1,5 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Program.cs -using Microsoft.Extensions.Hosting; - var app = await Microsoft.AutoGen.Runtime.Host.StartAsync(local: true); await app.WaitForShutdownAsync(); diff --git a/dotnet/samples/Hello/Hello.AppHost/Hello.AppHost.csproj b/dotnet/samples/Hello/Hello.AppHost/Hello.AppHost.csproj index 3ecd30dee13..88d23268c44 100644 --- a/dotnet/samples/Hello/Hello.AppHost/Hello.AppHost.csproj +++ b/dotnet/samples/Hello/Hello.AppHost/Hello.AppHost.csproj @@ -1,5 +1,4 @@ - Exe net8.0 @@ -10,12 +9,12 @@ - - + + + - diff --git a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs index ebde6d6d2f5..f7939da7d68 100644 --- a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs +++ b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs @@ -4,7 +4,6 @@ using Microsoft.AutoGen.Abstractions; using Microsoft.AutoGen.Agents; using Microsoft.Extensions.AI; -using Microsoft.Extensions.DependencyInjection; namespace Hello; [TopicSubscription("HelloAgents")] diff --git a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj index 73f1891b3f2..86bccb13b37 100644 --- a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj +++ b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj @@ -1,16 +1,4 @@ - - - - - - - - - - - - - + Exe net8.0 @@ -18,4 +6,14 @@ enable + + + + + + + + + + diff --git a/dotnet/samples/Hello/HelloAIAgents/Program.cs b/dotnet/samples/Hello/HelloAIAgents/Program.cs index 9d1964bfd1e..ebede82bb4f 100644 --- a/dotnet/samples/Hello/HelloAIAgents/Program.cs +++ b/dotnet/samples/Hello/HelloAIAgents/Program.cs @@ -2,11 +2,8 @@ // Program.cs using Hello; -using Microsoft.AspNetCore.Builder; using Microsoft.AutoGen.Abstractions; using Microsoft.AutoGen.Agents; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; // send a message to the agent var builder = WebApplication.CreateBuilder(); diff --git a/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj b/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj index eb2ba96d664..8799eb7275d 100644 --- a/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj +++ b/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj @@ -1,16 +1,4 @@ - - - - - - - - - - - - - + Exe net8.0 @@ -18,4 +6,14 @@ enable + + + + + + + + + + diff --git a/dotnet/samples/Hello/HelloAgent/Program.cs b/dotnet/samples/Hello/HelloAgent/Program.cs index fbe5d2f6dff..02ad838dea0 100644 --- a/dotnet/samples/Hello/HelloAgent/Program.cs +++ b/dotnet/samples/Hello/HelloAgent/Program.cs @@ -3,8 +3,6 @@ using Microsoft.AutoGen.Abstractions; using Microsoft.AutoGen.Agents; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; // step 1: create in-memory agent runtime @@ -27,7 +25,8 @@ namespace Hello [TopicSubscription("HelloAgents")] public class HelloAgent( IAgentContext context, - [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : AgentBase( + [FromKeyedServices("EventTypes")] EventTypes typeRegistry, + IHostApplicationLifetime hostApplicationLifetime) : AgentBase( context, typeRegistry), ISayHello, @@ -58,11 +57,11 @@ public async Task Handle(ConversationClosed item) Message = goodbye }.ToCloudEvent(this.AgentId.Key); await PublishEvent(evt).ConfigureAwait(false); - //sleep - await Task.Delay(10000).ConfigureAwait(false); - await AgentsApp.ShutdownAsync().ConfigureAwait(false); + // Signal shutdown. + hostApplicationLifetime.StopApplication(); } + public async Task SayHello(string ask) { var response = $"\n\n\n\n***************Hello {ask}**********************\n\n\n\n"; diff --git a/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj b/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj index eb2ba96d664..8799eb7275d 100644 --- a/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj +++ b/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj @@ -1,16 +1,4 @@ - - - - - - - - - - - - - + Exe net8.0 @@ -18,4 +6,14 @@ enable + + + + + + + + + + diff --git a/dotnet/samples/Hello/HelloAgentState/Program.cs b/dotnet/samples/Hello/HelloAgentState/Program.cs index 66b888d6c46..c1e00e4d632 100644 --- a/dotnet/samples/Hello/HelloAgentState/Program.cs +++ b/dotnet/samples/Hello/HelloAgentState/Program.cs @@ -3,8 +3,6 @@ using Microsoft.AutoGen.Abstractions; using Microsoft.AutoGen.Agents; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; // send a message to the agent var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived