diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln b/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln index 6d0fcc454555..1fc9adf490c3 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln @@ -36,6 +36,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{8B8C samples\Sample12_ManagingRules.md = samples\Sample12_ManagingRules.md samples\Sample13_AdvancedConfiguration.md = samples\Sample13_AdvancedConfiguration.md samples\Sample14_AMQPMessage.md = samples\Sample14_AMQPMessage.md + samples\Sample15_MockingClientTypes.md = samples\Sample15_MockingClientTypes.md + samples\Sample16_CrossReceiverMessageSettlement.md = samples\Sample16_CrossReceiverMessageSettlement.md EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.Amqp", "..\..\core\Azure.Core.Amqp\src\Azure.Core.Amqp.csproj", "{2ADA26CA-77E5-4793-927A-A6185FD8AA29}" diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/README.md b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/README.md index 03f341ba293d..e3469c2c5c5f 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/README.md +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/README.md @@ -26,3 +26,5 @@ description: Samples for the Azure.Messaging.ServiceBus client library - [Managing rules](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample12_ManagingRules.md) - [Advanced configuration](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample13_AdvancedConfiguration.md) - [Interact with the AMQP message](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample14_AMQPMessage.md) +- [Mocking Client Types](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample15_MockingClientTypes.md) +- [Cross-receiver message settlement] diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md new file mode 100644 index 000000000000..c4fe143f8451 --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md @@ -0,0 +1,67 @@ +# Cross Receiver Message Settlement + +The message settlement APIs on the `ServiceBusReceiver` require passing in the +`ServiceBusReceivedMessage`. This can be limiting for scenarios in which the message needs to be persisted and then +settled across process boundaries. There are two strategies that can be used for settling a message with a different +receiver. The recommended strategy depends on the specific application scenario. + +## Storing the entire `ServiceBusReceivedMessage` + +If it is necessary to store the entire message and then rehydrate it in another process, use the following strategy. +First we can get the raw AMQP message bytes and lock token as shown below: +*Note: The lock token is not +actually part of the AMQP message so it needs to stored separately from the AMQP bytes.* + +```C# Snippet:ServiceBusWriteReceivedMessage +var client1 = new ServiceBusClient(connectionString); +ServiceBusSender sender = client1.CreateSender(queueName); + +var message = new ServiceBusMessage("some message"); +await sender.SendMessageAsync(message); + +ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName); +ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync(); +ReadOnlyMemory amqpMessageBytes = receivedMessage.GetRawAmqpMessage().ToBytes().ToMemory(); +ReadOnlyMemory lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray(); +``` + +In order to rehydrate the message in another process, we would do the following: + +```C# Snippet:ServiceBusReadReceivedMessage +AmqpAnnotatedMessage amqpMessage = AmqpAnnotatedMessage.FromBytes(new BinaryData(amqpMessageBytes)); +ServiceBusReceivedMessage rehydratedMessage = ServiceBusReceivedMessage.FromAmqpMessage(amqpMessage, new BinaryData(lockTokenBytes)); + +var client2 = new ServiceBusClient(connectionString); +ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName); +await receiver2.CompleteMessageAsync(rehydratedMessage); +``` + +## Storing only the lock token + +If the entire message is not needed when settling the message in a different process, you can simply preserve the +lock token. In the example below, we store off the lock token using its GUID bytes. You can also simply store a +string if that is easier for your scenario. + +```C# Snippet:ServiceBusWriteReceivedMessageLockToken +var client1 = new ServiceBusClient(connectionString); +ServiceBusSender sender = client1.CreateSender(queueName); + +var message = new ServiceBusMessage("some message"); +await sender.SendMessageAsync(message); + +ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName); +ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync(); +ReadOnlyMemory lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray(); +``` + +In order to rehydrate the message in another process using the lock token, we would do the following: +*Note: Because we only stored the lock token, when we rehydrate the message all of the properties of the +`ServiceBusReceivedMessage` other than `LockToken` will have default values.* + +```C# Snippet:ServiceBusReadReceivedMessageLockToken +ServiceBusReceivedMessage rehydratedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: new Guid(lockTokenBytes.ToArray())); + +var client2 = new ServiceBusClient(connectionString); +ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName); +await receiver2.CompleteMessageAsync(rehydratedMessage); +``` diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample16_CrossReceiverMessageSettlement.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample16_CrossReceiverMessageSettlement.cs new file mode 100644 index 000000000000..34cdbac0dbb6 --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample16_CrossReceiverMessageSettlement.cs @@ -0,0 +1,91 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Azure.Core.Amqp; +using NUnit.Framework; + +namespace Azure.Messaging.ServiceBus.Tests.Samples +{ + public class Sample16_CrossReceiverMessageSettlement : ServiceBusLiveTestBase + { + [Test] + public async Task RehydrateReceivedMessageUsingRawBytes() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { +#if SNIPPET + string connectionString = ""; + string queueName = ""; +#else + string connectionString = TestEnvironment.ServiceBusConnectionString; + string queueName = scope.QueueName; +#endif + + #region Snippet:ServiceBusWriteReceivedMessage + + var client1 = new ServiceBusClient(connectionString); + ServiceBusSender sender = client1.CreateSender(queueName); + + var message = new ServiceBusMessage("some message"); + await sender.SendMessageAsync(message); + + ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName); + ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync(); + ReadOnlyMemory amqpMessageBytes = receivedMessage.GetRawAmqpMessage().ToBytes().ToMemory(); + ReadOnlyMemory lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray(); + #endregion + + #region Snippet:ServiceBusReadReceivedMessage + AmqpAnnotatedMessage amqpMessage = AmqpAnnotatedMessage.FromBytes(new BinaryData(amqpMessageBytes)); + ServiceBusReceivedMessage rehydratedMessage = ServiceBusReceivedMessage.FromAmqpMessage(amqpMessage, new BinaryData(lockTokenBytes)); + + var client2 = new ServiceBusClient(connectionString); + ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName); + await receiver2.CompleteMessageAsync(rehydratedMessage); + #endregion + + Assert.AreEqual("some message", rehydratedMessage.Body.ToString()); + } + } + + [Test] + public async Task RehydrateReceivedMessageUsingLockToken() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { +#if SNIPPET + string connectionString = ""; + string queueName = ""; +#else + string connectionString = TestEnvironment.ServiceBusConnectionString; + string queueName = scope.QueueName; +#endif + + #region Snippet:ServiceBusWriteReceivedMessageLockToken + + var client1 = new ServiceBusClient(connectionString); + ServiceBusSender sender = client1.CreateSender(queueName); + + var message = new ServiceBusMessage("some message"); + await sender.SendMessageAsync(message); + + ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName); + ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync(); + ReadOnlyMemory lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray(); + #endregion + + #region Snippet:ServiceBusReadReceivedMessageLockToken + + ServiceBusReceivedMessage rehydratedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: new Guid(lockTokenBytes.ToArray())); + + var client2 = new ServiceBusClient(connectionString); + ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName); + await receiver2.CompleteMessageAsync(rehydratedMessage); + #endregion + } + } + } +}