Skip to content

Commit

Permalink
Azure Service Bus transport improvements for FIFO queues et al. Closes
Browse files Browse the repository at this point in the history
…GH-686. Closes GH-680.
  • Loading branch information
jeremydmiller committed Jan 16, 2024
1 parent 4c91056 commit ffd304b
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/Http/OpenApiDemonstrator/OpenApiDemonstrator.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<ItemGroup>
<PackageReference Include="FluentValidation" Version="11.3.0" />
<PackageReference Include="Lamar.Diagnostics" Version="11.1.2" />
<PackageReference Include="Marten.AspNetCore" Version="6.1.0" />
<PackageReference Include="Marten.AspNetCore" Version="6.4.0" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.2" />
<PackageReference Include="Microsoft.AspNetCore.SignalR" Version="1.1.0" />

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ namespace Wolverine.AzureServiceBus.Internal;

public class AzureServiceBusEnvelope : Envelope
{
public AzureServiceBusEnvelope(ServiceBusReceivedMessage message)
public AzureServiceBusEnvelope(ServiceBusReceivedMessage message, ServiceBusSessionReceiver sessionReceiver)
{
AzureMessage = message;
SessionReceiver = sessionReceiver;
}

public AzureServiceBusEnvelope(ProcessMessageEventArgs args)
Expand All @@ -15,9 +16,36 @@ public AzureServiceBusEnvelope(ProcessMessageEventArgs args)
AzureMessage = args.Message;
}

public ProcessMessageEventArgs Args { get; set; }
public AzureServiceBusEnvelope(ServiceBusReceivedMessage message, ServiceBusReceiver sessionReceiver)
{
AzureMessage = message;
ServiceBusReceiver = sessionReceiver;
}

public Task CompleteAsync(CancellationToken token)
{
return Args?.CompleteMessageAsync(AzureMessage, token) ?? ServiceBusReceiver?.CompleteMessageAsync(AzureMessage, token) ??
SessionReceiver?.CompleteMessageAsync(AzureMessage, token) ?? Task.CompletedTask;
}

public Task DeferAsync(CancellationToken token)
{
return Args?.DeferMessageAsync(AzureMessage, cancellationToken: token) ?? ServiceBusReceiver?.DeferMessageAsync(AzureMessage, cancellationToken: token) ??
SessionReceiver?.DeferMessageAsync(AzureMessage, cancellationToken: token) ?? Task.CompletedTask;
}

public Task DeadLetterAsync(CancellationToken token, string? deadLetterReason = null, string? deadLetterErrorDescription = null)
{
return Args?.DeadLetterMessageAsync(AzureMessage, cancellationToken: token, deadLetterReason: deadLetterReason, deadLetterErrorDescription:deadLetterErrorDescription)
?? ServiceBusReceiver?.DeadLetterMessageAsync(AzureMessage, cancellationToken: token, deadLetterReason: deadLetterReason, deadLetterErrorDescription:deadLetterErrorDescription)
?? SessionReceiver?.DeadLetterMessageAsync(AzureMessage, cancellationToken: token, deadLetterReason: deadLetterReason, deadLetterErrorDescription:deadLetterErrorDescription) ?? Task.CompletedTask;
}

private ProcessMessageEventArgs? Args { get; set; }

public ServiceBusReceivedMessage AzureMessage { get; }
private ServiceBusReceivedMessage AzureMessage { get; }
private ServiceBusSessionReceiver? SessionReceiver { get; }
private ServiceBusReceiver? ServiceBusReceiver { get; }
public Exception Exception { get; set; }
public bool IsCompleted { get; set; }
public ServiceBusReceiver? Receiver { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public BatchedAzureServiceBusListener(AzureServiceBusEndpoint endpoint, ILogger

_complete = new RetryBlock<AzureServiceBusEnvelope>((e, _) =>
{
return _receiver.CompleteMessageAsync(e.AzureMessage);
return e.CompleteAsync(_cancellation.Token);
}, _logger, _cancellation.Token);

_defer = new RetryBlock<Envelope>(async (envelope, _) =>
Expand All @@ -46,7 +46,7 @@ public BatchedAzureServiceBusListener(AzureServiceBusEndpoint endpoint, ILogger
}, logger, _cancellation.Token);

_deadLetter =
new RetryBlock<AzureServiceBusEnvelope>((e, c) => _receiver.DeadLetterMessageAsync(e.AzureMessage, deadLetterReason:e.Exception?.GetType().NameInCode(), deadLetterErrorDescription:e.Exception?.Message), logger,
new RetryBlock<AzureServiceBusEnvelope>((e, c) => e.DeadLetterAsync(_cancellation.Token, deadLetterReason:e.Exception?.GetType().NameInCode(), deadLetterErrorDescription:e.Exception?.Message), logger,
_cancellation.Token);
}

Expand Down Expand Up @@ -116,7 +116,7 @@ private async Task listenForMessages()
{
try
{
var envelope = new AzureServiceBusEnvelope(message);
var envelope = new AzureServiceBusEnvelope(message, _receiver);
_mapper.MapIncomingToEnvelope(envelope, message);

envelopes.Add(envelope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,22 @@ public InlineAzureServiceBusListener(AzureServiceBusEndpoint endpoint,

_complete = new RetryBlock<AzureServiceBusEnvelope>((e, _) =>
{
return e.Args.CompleteMessageAsync(e.AzureMessage);
return e.CompleteAsync(_cancellation.Token);
}, _logger, _cancellation.Token);

_defer = new RetryBlock<AzureServiceBusEnvelope>(async (envelope, _) =>
{
if (envelope is AzureServiceBusEnvelope e)
if (envelope is { } e)
{
await e.Args.CompleteMessageAsync(e.AzureMessage);
await e.CompleteAsync(_cancellation.Token);
e.IsCompleted = true;
}
await _requeue.SendAsync(envelope);
}, logger, _cancellation.Token);

_deadLetter =
new RetryBlock<AzureServiceBusEnvelope>((e, c) => e.Args.DeadLetterMessageAsync(e.AzureMessage, deadLetterReason:e.Exception?.GetType().NameInCode(), deadLetterErrorDescription:e.Exception?.Message), logger,
new RetryBlock<AzureServiceBusEnvelope>((e, c) => e.DeadLetterAsync(_cancellation.Token, deadLetterReason:e.Exception?.GetType().NameInCode(), deadLetterErrorDescription:e.Exception?.Message), logger,
_cancellation.Token);

_processor.ProcessMessageAsync += processMessageAsync;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,24 +146,21 @@ public SessionSpecificListener(ServiceBusSessionReceiver sessionReceiver, AzureS
_mapper = mapper;
_logger = logger;

_complete = new RetryBlock<AzureServiceBusEnvelope>((e, _) =>
{
return e.Args.CompleteMessageAsync(e.AzureMessage);
}, _logger, _cancellation.Token);
_complete = new RetryBlock<AzureServiceBusEnvelope>((e, _) => e.CompleteAsync(_cancellation.Token), _logger, _cancellation.Token);

_defer = new RetryBlock<AzureServiceBusEnvelope>(async (envelope, _) =>
{
if (envelope is AzureServiceBusEnvelope e)
if (envelope is { } e)
{
await e.Args.CompleteMessageAsync(e.AzureMessage);
await e.CompleteAsync(_cancellation.Token);
e.IsCompleted = true;
}
await requeue.SendAsync(envelope);
}, logger, _cancellation.Token);

_deadLetter =
new RetryBlock<AzureServiceBusEnvelope>((e, c) => _sessionReceiver.DeadLetterMessageAsync(e.AzureMessage, deadLetterReason:e.Exception?.GetType().NameInCode(), deadLetterErrorDescription:e.Exception?.Message), logger,
new RetryBlock<AzureServiceBusEnvelope>((e, c) => e.DeadLetterAsync(_cancellation.Token, deadLetterReason:e.Exception?.GetType().NameInCode(), deadLetterErrorDescription:e.Exception?.Message), logger,
_cancellation.Token);
}

Expand All @@ -176,7 +173,8 @@ public async Task<int> ExecuteAsync(CancellationToken cancellationToken)
{
try
{
var envelope = new AzureServiceBusEnvelope(message);
var envelope = new AzureServiceBusEnvelope(message, _sessionReceiver);

_mapper.MapIncomingToEnvelope(envelope, message);

try
Expand Down

0 comments on commit ffd304b

Please sign in to comment.