Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CancellationToken parameters to API surface #4036

Merged
merged 1 commit into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions dotnet/src/Microsoft.AutoGen/Abstractions/IAgentContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ public interface IAgentContext
IAgentBase? AgentInstance { get; set; }
DistributedContextPropagator DistributedContextPropagator { get; } // TODO: Remove this. An abstraction should not have a dependency on DistributedContextPropagator.
ILogger Logger { get; } // TODO: Remove this. An abstraction should not have a dependency on ILogger.
ValueTask Store(AgentState value);
ValueTask<AgentState> Read(AgentId agentId);
ValueTask SendResponseAsync(RpcRequest request, RpcResponse response);
ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request);
ValueTask PublishEventAsync(CloudEvent @event);
ValueTask Store(AgentState value, CancellationToken cancellationToken = default);
ValueTask<AgentState> Read(AgentId agentId, CancellationToken cancellationToken = default);
ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken = default);
ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default);
ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default);
}
10 changes: 5 additions & 5 deletions dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorkerRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ namespace Microsoft.AutoGen.Abstractions;

public interface IAgentWorkerRuntime
{
ValueTask PublishEvent(CloudEvent evt);
ValueTask SendRequest(IAgentBase agent, RpcRequest request);
ValueTask SendResponse(RpcResponse response);
ValueTask Store(AgentState value);
ValueTask<AgentState> Read(AgentId agentId);
ValueTask PublishEvent(CloudEvent evt, CancellationToken cancellationToken);
ValueTask SendRequest(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken);
ValueTask SendResponse(RpcResponse response, CancellationToken cancellationToken);
ValueTask Store(AgentState value, CancellationToken cancellationToken);
ValueTask<AgentState> Read(AgentId agentId, CancellationToken cancellationToken);
}
5 changes: 2 additions & 3 deletions dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,14 @@ public async ValueTask PublishEvent(CloudEvent item)
var activity = s_source.StartActivity($"PublishEvent '{item.Type}'", ActivityKind.Client, Activity.Current?.Context ?? default);
activity?.SetTag("peer.service", $"{item.Type}/{item.Source}");

var completion = new TaskCompletionSource<CloudEvent>(TaskCreationOptions.RunContinuationsAsynchronously);
// TODO: fix activity
Context.DistributedContextPropagator.Inject(activity, item.Metadata, static (carrier, key, value) => ((IDictionary<string, string>)carrier!)[key] = value);
await this.InvokeWithActivityAsync(
static async ((AgentBase Agent, CloudEvent Event, TaskCompletionSource<CloudEvent>) state) =>
static async ((AgentBase Agent, CloudEvent Event) state) =>
{
await state.Agent._context.PublishEventAsync(state.Event).ConfigureAwait(false);
},
(this, item, completion),
(this, item),
activity,
item.Type).ConfigureAwait(false);
}
Expand Down
20 changes: 10 additions & 10 deletions dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@ internal sealed class AgentContext(AgentId agentId, IAgentWorkerRuntime runtime,
public ILogger Logger { get; } = logger;
public IAgentBase? AgentInstance { get; set; }
public DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator;
public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response)
public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken)
{
response.RequestId = request.RequestId;
await _runtime.SendResponse(response);
await _runtime.SendResponse(response, cancellationToken).ConfigureAwait(false);
}
public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request)
public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken)
{
await _runtime.SendRequest(agent, request).ConfigureAwait(false);
await _runtime.SendRequest(agent, request, cancellationToken).ConfigureAwait(false);
}
public async ValueTask PublishEventAsync(CloudEvent @event)
public async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken)
{
await _runtime.PublishEvent(@event).ConfigureAwait(false);
await _runtime.PublishEvent(@event, cancellationToken).ConfigureAwait(false);
}
public async ValueTask Store(AgentState value)
public async ValueTask Store(AgentState value, CancellationToken cancellationToken)
{
await _runtime.Store(value).ConfigureAwait(false);
await _runtime.Store(value, cancellationToken).ConfigureAwait(false);
}
public ValueTask<AgentState> Read(AgentId agentId)
public ValueTask<AgentState> Read(AgentId agentId, CancellationToken cancellationToken)
{
return _runtime.Read(agentId);
return _runtime.Read(agentId, cancellationToken);
}
}
22 changes: 11 additions & 11 deletions dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,21 +228,21 @@ await WriteChannelAsync(new Message
}
}

public async ValueTask SendResponse(RpcResponse response)
public async ValueTask SendResponse(RpcResponse response, CancellationToken cancellationToken)
{
_logger.LogInformation("Sending response '{Response}'.", response);
await WriteChannelAsync(new Message { Response = response }).ConfigureAwait(false);
await WriteChannelAsync(new Message { Response = response }, cancellationToken).ConfigureAwait(false);
}

public async ValueTask SendRequest(IAgentBase agent, RpcRequest request)
public async ValueTask SendRequest(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken)
{
_logger.LogInformation("[{AgentId}] Sending request '{Request}'.", agent.AgentId, request);
var requestId = Guid.NewGuid().ToString();
_pendingRequests[requestId] = (agent, request.RequestId);
request.RequestId = requestId;
try
{
await WriteChannelAsync(new Message { Request = request }).ConfigureAwait(false);
await WriteChannelAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
Expand All @@ -253,19 +253,19 @@ public async ValueTask SendRequest(IAgentBase agent, RpcRequest request)
}
}

public async ValueTask PublishEvent(CloudEvent @event)
public async ValueTask PublishEvent(CloudEvent @event, CancellationToken cancellationToken)
{
try
{
await WriteChannelAsync(new Message { CloudEvent = @event }).ConfigureAwait(false);
await WriteChannelAsync(new Message { CloudEvent = @event }, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.LogWarning(exception, "Failed to publish event '{Event}'.", @event);
}
}

private async Task WriteChannelAsync(Message message, CancellationToken cancellationToken = default)
private async Task WriteChannelAsync(Message message, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
await _outboundMessagesChannel.Writer.WriteAsync((message, tcs), cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -364,19 +364,19 @@ public async Task StopAsync(CancellationToken cancellationToken)
_channel?.Dispose();
}
}
public ValueTask Store(AgentState value)
public ValueTask Store(AgentState value, CancellationToken cancellationToken)
{
var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState.");
var response = _client.SaveState(value);
var response = _client.SaveState(value, cancellationToken: cancellationToken);
if (!response.Success)
{
throw new InvalidOperationException($"Error saving AgentState for AgentId {agentId}.");
}
return ValueTask.CompletedTask;
}
public async ValueTask<AgentState> Read(AgentId agentId)
public async ValueTask<AgentState> Read(AgentId agentId, CancellationToken cancellationToken)
{
var response = await _client.GetStateAsync(agentId);
var response = await _client.GetStateAsync(agentId, cancellationToken: cancellationToken);
// if (response.Success && response.AgentState.AgentId is not null) - why is success always false?
if (response.AgentState.AgentId is not null)
{
Expand Down
Loading