Skip to content

Commit

Permalink
Sb cross receiver samples (#39514)
Browse files Browse the repository at this point in the history
* Add samples demonstrating cross receiver settlement

* Fix

* Fix

* fix line break

* Fix sln

* regenerate

* Apply suggestions from code review

Co-authored-by: Jesse Squire <[email protected]>

---------

Co-authored-by: Jesse Squire <[email protected]>
  • Loading branch information
JoshLove-msft and jsquire authored Oct 26, 2023
1 parent f37809b commit ad2201a
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{8B8C
samples\Sample12_ManagingRules.md = samples\Sample12_ManagingRules.md
samples\Sample13_AdvancedConfiguration.md = samples\Sample13_AdvancedConfiguration.md
samples\Sample14_AMQPMessage.md = samples\Sample14_AMQPMessage.md
samples\Sample15_MockingClientTypes.md = samples\Sample15_MockingClientTypes.md
samples\Sample16_CrossReceiverMessageSettlement.md = samples\Sample16_CrossReceiverMessageSettlement.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.Amqp", "..\..\core\Azure.Core.Amqp\src\Azure.Core.Amqp.csproj", "{2ADA26CA-77E5-4793-927A-A6185FD8AA29}"
Expand Down
2 changes: 2 additions & 0 deletions sdk/servicebus/Azure.Messaging.ServiceBus/samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ description: Samples for the Azure.Messaging.ServiceBus client library
- [Managing rules](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample12_ManagingRules.md)
- [Advanced configuration](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample13_AdvancedConfiguration.md)
- [Interact with the AMQP message](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample14_AMQPMessage.md)
- [Mocking Client Types](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample15_MockingClientTypes.md)
- [Cross-receiver message settlement]
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Cross Receiver Message Settlement

The message settlement APIs on the `ServiceBusReceiver` require passing in the
`ServiceBusReceivedMessage`. This can be limiting for scenarios in which the message needs to be persisted and then
settled across process boundaries. There are two strategies that can be used for settling a message with a different
receiver. The recommended strategy depends on the specific application scenario.

## Storing the entire `ServiceBusReceivedMessage`

If it is necessary to store the entire message and then rehydrate it in another process, use the following strategy.
First we can get the raw AMQP message bytes and lock token as shown below:
*Note: The lock token is not
actually part of the AMQP message so it needs to stored separately from the AMQP bytes.*

```C# Snippet:ServiceBusWriteReceivedMessage
var client1 = new ServiceBusClient(connectionString);
ServiceBusSender sender = client1.CreateSender(queueName);

var message = new ServiceBusMessage("some message");
await sender.SendMessageAsync(message);

ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync();
ReadOnlyMemory<byte> amqpMessageBytes = receivedMessage.GetRawAmqpMessage().ToBytes().ToMemory();
ReadOnlyMemory<byte> lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray();
```

In order to rehydrate the message in another process, we would do the following:

```C# Snippet:ServiceBusReadReceivedMessage
AmqpAnnotatedMessage amqpMessage = AmqpAnnotatedMessage.FromBytes(new BinaryData(amqpMessageBytes));
ServiceBusReceivedMessage rehydratedMessage = ServiceBusReceivedMessage.FromAmqpMessage(amqpMessage, new BinaryData(lockTokenBytes));

var client2 = new ServiceBusClient(connectionString);
ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName);
await receiver2.CompleteMessageAsync(rehydratedMessage);
```

## Storing only the lock token

If the entire message is not needed when settling the message in a different process, you can simply preserve the
lock token. In the example below, we store off the lock token using its GUID bytes. You can also simply store a
string if that is easier for your scenario.

```C# Snippet:ServiceBusWriteReceivedMessageLockToken
var client1 = new ServiceBusClient(connectionString);
ServiceBusSender sender = client1.CreateSender(queueName);

var message = new ServiceBusMessage("some message");
await sender.SendMessageAsync(message);

ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync();
ReadOnlyMemory<byte> lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray();
```

In order to rehydrate the message in another process using the lock token, we would do the following:
*Note: Because we only stored the lock token, when we rehydrate the message all of the properties of the
`ServiceBusReceivedMessage` other than `LockToken` will have default values.*

```C# Snippet:ServiceBusReadReceivedMessageLockToken
ServiceBusReceivedMessage rehydratedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: new Guid(lockTokenBytes.ToArray()));

var client2 = new ServiceBusClient(connectionString);
ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName);
await receiver2.CompleteMessageAsync(rehydratedMessage);
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Core.Amqp;
using NUnit.Framework;

namespace Azure.Messaging.ServiceBus.Tests.Samples
{
public class Sample16_CrossReceiverMessageSettlement : ServiceBusLiveTestBase
{
[Test]
public async Task RehydrateReceivedMessageUsingRawBytes()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
#if SNIPPET
string connectionString = "<connection_string>";
string queueName = "<queue_name>";
#else
string connectionString = TestEnvironment.ServiceBusConnectionString;
string queueName = scope.QueueName;
#endif

#region Snippet:ServiceBusWriteReceivedMessage

var client1 = new ServiceBusClient(connectionString);
ServiceBusSender sender = client1.CreateSender(queueName);

var message = new ServiceBusMessage("some message");
await sender.SendMessageAsync(message);

ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync();
ReadOnlyMemory<byte> amqpMessageBytes = receivedMessage.GetRawAmqpMessage().ToBytes().ToMemory();
ReadOnlyMemory<byte> lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray();
#endregion

#region Snippet:ServiceBusReadReceivedMessage
AmqpAnnotatedMessage amqpMessage = AmqpAnnotatedMessage.FromBytes(new BinaryData(amqpMessageBytes));
ServiceBusReceivedMessage rehydratedMessage = ServiceBusReceivedMessage.FromAmqpMessage(amqpMessage, new BinaryData(lockTokenBytes));

var client2 = new ServiceBusClient(connectionString);
ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName);
await receiver2.CompleteMessageAsync(rehydratedMessage);
#endregion

Assert.AreEqual("some message", rehydratedMessage.Body.ToString());
}
}

[Test]
public async Task RehydrateReceivedMessageUsingLockToken()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
#if SNIPPET
string connectionString = "<connection_string>";
string queueName = "<queue_name>";
#else
string connectionString = TestEnvironment.ServiceBusConnectionString;
string queueName = scope.QueueName;
#endif

#region Snippet:ServiceBusWriteReceivedMessageLockToken

var client1 = new ServiceBusClient(connectionString);
ServiceBusSender sender = client1.CreateSender(queueName);

var message = new ServiceBusMessage("some message");
await sender.SendMessageAsync(message);

ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync();
ReadOnlyMemory<byte> lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray();
#endregion

#region Snippet:ServiceBusReadReceivedMessageLockToken

ServiceBusReceivedMessage rehydratedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: new Guid(lockTokenBytes.ToArray()));

var client2 = new ServiceBusClient(connectionString);
ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName);
await receiver2.CompleteMessageAsync(rehydratedMessage);
#endregion
}
}
}
}

0 comments on commit ad2201a

Please sign in to comment.