Skip to content

Commit

Permalink
Fix deadlettering and exception propagation in Grpc service (#39412)
Browse files Browse the repository at this point in the history
* Fix deadlettering in Grpc service

* Propagate exception details

* remove project ref
  • Loading branch information
JoshLove-msft committed Oct 20, 2023
1 parent 7151809 commit 07d61fd
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
# Release History

## 5.14.0-beta.1 (Unreleased)

### Features Added

### Breaking Changes
## 5.13.3 (2023-10-20)

### Bugs Fixed

### Other Changes
- Fixed issue where deadlettering a message without specifying properties to modify could throw
an exception from out of proc extension.
- Include underlying exception details in RpcException when a failure occurs.

## 5.13.2 (2023-10-18)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.

#if NET6_0_OR_GREATER
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Core.Amqp.Shared;
Expand Down Expand Up @@ -31,54 +32,98 @@ public SettlementService()

public override async Task<Empty> Complete(CompleteRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
try
{
await tuple.Actions.CompleteMessageAsync(
tuple.Message,
context.CancellationToken).ConfigureAwait(false);
return new Empty();
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.CompleteMessageAsync(
tuple.Message,
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.Unknown, ex.ToString()));
}

throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

public override async Task<Empty> Abandon(AbandonRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
try
{
await tuple.Actions.AbandonMessageAsync(
tuple.Message,
DeserializeAmqpMap(request.PropertiesToModify),
context.CancellationToken).ConfigureAwait(false);
return new Empty();
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.AbandonMessageAsync(
tuple.Message,
DeserializeAmqpMap(request.PropertiesToModify),
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.Unknown, ex.ToString()));
}

throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

public override async Task<Empty> Defer(DeferRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
try
{
await tuple.Actions.DeferMessageAsync(
tuple.Message,
DeserializeAmqpMap(request.PropertiesToModify),
context.CancellationToken).ConfigureAwait(false);
return new Empty();
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.DeferMessageAsync(
tuple.Message,
DeserializeAmqpMap(request.PropertiesToModify),
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.Unknown, ex.ToString()));
}

throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

public override async Task<Empty> Deadletter(DeadletterRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
try
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
if (request.PropertiesToModify == null || request.PropertiesToModify == ByteString.Empty)
{
await tuple.Actions.DeadLetterMessageAsync(
tuple.Message,
request.DeadletterReason,
request.DeadletterErrorDescription,
context.CancellationToken).ConfigureAwait(false);
}
else
{
await tuple.Actions.DeadLetterMessageAsync(
tuple.Message,
DeserializeAmqpMap(request.PropertiesToModify),
request.DeadletterReason,
request.DeadletterErrorDescription,
context.CancellationToken).ConfigureAwait(false);
}

return new Empty();
}
}
catch (Exception ex)
{
await tuple.Actions.DeadLetterMessageAsync(
tuple.Message,
DeserializeAmqpMap(request.PropertiesToModify),
request.DeadletterReason,
request.DeadletterErrorDescription,
context.CancellationToken).ConfigureAwait(false);
return new Empty();
throw new RpcException(new Status(StatusCode.Unknown, ex.ToString()));
}

throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net6.0</TargetFrameworks>
<Description>Microsoft Azure WebJobs SDK ServiceBus Extension</Description>
<Version>5.14.0-beta.1</Version>
<Version>5.13.3</Version>
<!--The ApiCompatVersion is managed automatically and should not generally be modified manually.-->
<!--Since we are adding a new target for net6.0, we need to temporarily condition on netstandard-->
<ApiCompatVersion>5.13.2</ApiCompatVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,32 @@ public async Task BindToMessageAndDeadletter()
}

[Test]
public async Task BindToBatchAndDeadletter()
public async Task BindToMessageAndDeadletterWithNoPropertiesToModify()
{
var host = BuildHost<ServiceBusBindToBatchAndDeadletter>();
var host = BuildHost<ServiceBusBindToMessageAndDeadletterWithNoPropertiesToModify>();
var settlementImpl = host.Services.GetRequiredService<SettlementService>();
var provider = host.Services.GetRequiredService<MessagingProvider>();
ServiceBusBindToMessageAndDeadletterWithNoPropertiesToModify.SettlementService = settlementImpl;

using (host)
{
var message = new ServiceBusMessage("foobar");
await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
var sender = client.CreateSender(FirstQueueScope.QueueName);
await sender.SendMessageAsync(message);

bool result = _waitHandle1.WaitOne(SBTimeoutMills);
Assert.True(result);
await host.StopAsync();
}
Assert.IsEmpty(provider.ActionsCache);
}

[Test]
public async Task BindToBatchAndDeadletterExceptionValidation()
{
// this test expects errors so set skipValidation=true
var host = BuildHost<ServiceBusBindToBatchAndDeadletter>(skipValidation: true);
var settlementImpl = host.Services.GetRequiredService<SettlementService>();
var provider = host.Services.GetRequiredService<MessagingProvider>();
ServiceBusBindToBatchAndDeadletter.SettlementService = settlementImpl;
Expand Down Expand Up @@ -261,6 +284,31 @@ await SettlementService.Deadletter(
}
}

public class ServiceBusBindToMessageAndDeadletterWithNoPropertiesToModify
{
internal static SettlementService SettlementService { get; set; }
public static async Task BindToMessage(
[ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage message, ServiceBusClient client)
{
Assert.AreEqual("foobar", message.Body.ToString());
await SettlementService.Deadletter(
new DeadletterRequest()
{
Locktoken = message.LockToken,
DeadletterErrorDescription = "description",
DeadletterReason = "reason"
},
new MockServerCallContext());

var receiver = client.CreateReceiver(FirstQueueScope.QueueName, new ServiceBusReceiverOptions {SubQueue = SubQueue.DeadLetter});
var deadletterMessage = await receiver.ReceiveMessageAsync();
Assert.AreEqual("foobar", deadletterMessage.Body.ToString());
Assert.AreEqual("description", deadletterMessage.DeadLetterErrorDescription);
Assert.AreEqual("reason", deadletterMessage.DeadLetterReason);
_waitHandle1.Set();
}
}

public class ServiceBusBindToBatchAndDeadletter
{
internal static SettlementService SettlementService { get; set; }
Expand Down Expand Up @@ -292,6 +340,37 @@ await SettlementService.Deadletter(
Assert.AreEqual("description", deadletterMessage.DeadLetterErrorDescription);
Assert.AreEqual("reason", deadletterMessage.DeadLetterReason);
Assert.AreEqual(42, deadletterMessage.ApplicationProperties["key"]);

var exception = Assert.ThrowsAsync<RpcException>(
async () =>
await SettlementService.Complete(
new CompleteRequest { Locktoken = message.LockToken },
new MockServerCallContext()));
StringAssert.Contains(
"Azure.Messaging.ServiceBus.ServiceBusException: The lock supplied is invalid.",
exception.ToString());

exception = Assert.ThrowsAsync<RpcException>(
async () =>
await SettlementService.Defer(
new DeferRequest { Locktoken = message.LockToken },
new MockServerCallContext()));
StringAssert.Contains(
"Azure.Messaging.ServiceBus.ServiceBusException: The lock supplied is invalid.",
exception.ToString());

exception = Assert.ThrowsAsync<RpcException>(
async () =>
await SettlementService.Deadletter(
new DeadletterRequest() { Locktoken = message.LockToken },
new MockServerCallContext()));
StringAssert.Contains(
"Azure.Messaging.ServiceBus.ServiceBusException: The lock supplied is invalid.",
exception.ToString());

// The service doesn't throw when an already settled message gets abandoned over the mgmt link, so we won't
// test for that here.

_waitHandle1.Set();
}
}
Expand Down

0 comments on commit 07d61fd

Please sign in to comment.