Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DLQ sample #20739

Merged
merged 3 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider picking up the central versions from the repo if we're not building externally?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can do that once we move to all package refs.

<PackageReference Include="Azure.Identity" Version="1.2.1" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.2.0-beta.2" />
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
<PackageReference Include="System.CommandLine" Version="2.0.0-beta1.21216.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Azure.Messaging.ServiceBus.csproj" />
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using project reference for now to include processor SubQueue option that was just added.

</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
using System;
using System.Collections.Generic;
using System.CommandLine;
using System.CommandLine.Invocation;
using System.Threading;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Messaging.ServiceBus;

namespace DeadLetterQueue
{
// This sample shows how to move messages to the Dead-letter queue, how to retrieve
// messages from it, and resubmit corrected message back into the main queue.
public class Program
{
private static ServiceBusClient _client;

public static async Task Main(string[] args)
{
var command = new RootCommand("Demonstrates the DeadLetter feature of Azure Service Bus.")
{
new Option<string>(
alias: "--namespace",
description: "Fully qualified Service Bus Queue namespace to use") { Name = "FullyQualifiedNamespace" },
new Option<string>(
alias: "--queue",
description: "Service Bus Queue Name to use") { IsRequired = true, Name = "QueueName"},
new Option<string>(
alias: "--connection-variable",
description: "The name of an environment variable containing the connection string to use.") { Name = "Connection"},
};
command.Handler = CommandHandler.Create<string, string, string>(RunAsync);
await command.InvokeAsync(args);
}

private static async Task RunAsync(string fullyQualifiedNamespace, string queueName, string connection)
{
if (!string.IsNullOrEmpty(connection))
{
_client = new ServiceBusClient(Environment.GetEnvironmentVariable(connection));
}
else if (!string.IsNullOrEmpty(fullyQualifiedNamespace))
{
_client = new ServiceBusClient(fullyQualifiedNamespace, new DefaultAzureCredential());
}
else
{
throw new ArgumentException(
"Either a fully qualified namespace or a connection string environment variable must be specified.");
}

var cts = new CancellationTokenSource();
var sender = _client.CreateSender(queueName);

// For the delivery count scenario, we first send a single message,
// and then pick it up and abandon it until is "disappears" from the queue.
// Then we fetch the message from the dead-letter queue (DLQ) and inspect it.
await SendMessages(sender, 1);
await ExceedMaxDeliveryAsync(queueName);

// For the fix-up scenario, we send a series of messages to a queue, and
// run a receive loop that explicitly pushes messages into the DLQ when
// they don't satisfy a processing condition. The fix-up receive loop inspects
// the DLQ, fixes the "faulty" messages, and resubmits them into processing.
var sendTask = SendMessages(sender, int.MaxValue);
var receiveTask = ReceiveMessages(queueName, cts.Token);
var fixupTask = PickUpAndFixDeadletters(queueName, sender, cts.Token);

// wait for a key press or 10 seconds
await Task.WhenAny(
Task.Run(() => Console.ReadKey()),
Task.Delay(TimeSpan.FromSeconds(10))
);

// end the processing
cts.Cancel();
// await shutdown and exit
await Task.WhenAll(sendTask, receiveTask, fixupTask);
}

private static Task SendMessages(ServiceBusSender sender, int maxMessages)
{
dynamic data = new[]
{
new {name = "Einstein", firstName = "Albert"}, new {name = "Heisenberg", firstName = "Werner"},
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
new {name = "Curie", firstName = "Marie"}, new {name = "Hawking", firstName = "Steven"},
new {name = "Newton", firstName = "Isaac"}, new {name = "Bohr", firstName = "Niels"},
new {name = "Faraday", firstName = "Michael"}, new {name = "Galilei", firstName = "Galileo"},
new {name = "Kepler", firstName = "Johannes"}, new {name = "Kopernikus", firstName = "Nikolaus"}
};

// send a message for each data entry, but at most maxMessages
// we're sending in a loop, but don't block on each send, but
// rather collect all sends in a list and then wait for all of
// them to complete asynchronously, which is much faster
var tasks = new List<Task>();
for (int i = 0; i < Math.Min(data.Length, maxMessages); i++)
{
// each message has a JSON body with one of the data rows
var message = new ServiceBusMessage(new BinaryData(jsonSerializable: data[i]))
{
ContentType = "application/json", // JSON data
Subject = i % 2 == 0 ? "Scientist" : "Physicist", // random picked header
MessageId = i.ToString(), // message-id
TimeToLive = TimeSpan.FromMinutes(2) // message expires in 2 minutes
};

// start sending this message
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.DarkYellow;
Console.WriteLine($"Message sent: Id = {message.MessageId}");
Console.ResetColor();
}

;
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
tasks.Add(sender.SendMessageAsync(message).ContinueWith((t) =>
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine($"\tMessage acknowledged: Id = {message.MessageId}");
Console.ResetColor();
}
}));
}

return Task.WhenAll(tasks);
}


private static async Task ExceedMaxDeliveryAsync(string queueName)
{
ServiceBusReceiver receiver = _client.CreateReceiver(queueName);

while (true)
{
// Ask the broker to return any message readily available or return with no
// result after 2 seconds (allowing for clients with great network latency)
ServiceBusReceivedMessage msg = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(2));
if (msg != null)
{
// Now we immediately abandon the message, which increments the DeliveryCount
Console.WriteLine($"Picked up message: Id = {msg.MessageId}; DeliveryCount {msg.DeliveryCount}");
await receiver.AbandonMessageAsync(msg);
}
else
{
// Once the system moves the message to the DLQ, the main queue is empty
// and the loop exits as ReceiveAsync returns null.
break;
}
}

// For picking up the message from a DLQ, we make a receiver just like for a
// regular queue. We could also use QueueClient and a registered handler here.
// The required path is constructed with the EntityNameHelper.FormatDeadLetterPath()
// helper method, and always follows the pattern "{entity}/$DeadLetterQueue",
// meaning that for a queue "Q1", the path is "Q1/$DeadLetterQueue" and for a
// topic "T1" and subscription "S1", the path is "T1/Subscriptions/S1/$DeadLetterQueue"
ServiceBusReceiver deadletterReceiver = _client.CreateReceiver(queueName,
new ServiceBusReceiverOptions {SubQueue = SubQueue.DeadLetter});
while (true)
{
// receive a message
ServiceBusReceivedMessage message =
await deadletterReceiver.ReceiveMessageAsync(TimeSpan.FromSeconds(10));
if (message != null)
{
// write out the message deadletter information
Console.WriteLine("Deadletter message:");
Console.WriteLine($"DeadLetterReason = {message.DeadLetterReason}");
Console.WriteLine($"DeadLetterErrorDescription = {message.DeadLetterErrorDescription}");

// complete and therefore remove the message from the DLQ
await deadletterReceiver.CompleteMessageAsync(message);
}
else
{
// DLQ was empty on last receive attempt
break;
}
}
}

private static Task ReceiveMessages(string queueName, CancellationToken cancellationToken)
{
var doneReceiving = new TaskCompletionSource<bool>();
ServiceBusProcessor processor = _client.CreateProcessor(queueName);

// close the receiver and factory when the CancellationToken fires
cancellationToken.Register(
async () =>
{
await processor.CloseAsync();
doneReceiving.SetResult(true);
});

processor.ProcessMessageAsync += async args =>
{
ServiceBusReceivedMessage message = args.Message;
// If the message holds JSON data and the label is set to "Scientist",
// we accept the message and print it.
if (message.Subject != null &&
message.ContentType != null &&
message.Subject.Equals("Scientist", StringComparison.InvariantCultureIgnoreCase) &&
message.ContentType.Equals("application/json", StringComparison.InvariantCultureIgnoreCase))
{
var body = message.Body;

// System.Text.JSON does not currently support deserializing to dynamic
Dictionary<string, string> scientist = body.ToObjectFromJson<Dictionary<string, string>>();

lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine(
"\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = {0}, \n\t\t\t\t\t\tSequenceNumber = {1}, \n\t\t\t\t\t\tEnqueuedTime = {2}," +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this would wrap when viewed through GitHub's normal restricted width....

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears to, not sure I follow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it shorter

"\n\t\t\t\t\t\tExpiresAt = {4}, \n\t\t\t\t\t\tContentType = \"{3}\", \n\t\t\t\t\t\tContent: [ firstName = {5}, name = {6} ]",
message.MessageId,
message.SequenceNumber,
message.EnqueuedTime,
message.ContentType,
message.ExpiresAt,
scientist["firstName"],
scientist["name"]);
Console.ResetColor();
}

await args.CompleteMessageAsync(message, args.CancellationToken);
}
else
{
// if the messages doesn't fit the criteria above, we deadletter it
await args.DeadLetterMessageAsync(message, cancellationToken: args.CancellationToken);
}
};

processor.ProcessErrorAsync += LogMessageHandlerException;
_ = processor.StartProcessingAsync(cancellationToken);
return doneReceiving.Task;
}

private static Task PickUpAndFixDeadletters(string queueName, ServiceBusSender resubmitSender, CancellationToken cancellationToken)
{
var doneReceiving = new TaskCompletionSource<bool>();

// here, we create a receiver on the Deadletter queue
ServiceBusProcessor dlqProcessor =
_client.CreateProcessor(queueName, new ServiceBusProcessorOptions {SubQueue = SubQueue.DeadLetter});

// close the receiver and factory when the CancellationToken fires
cancellationToken.Register(
async () =>
{
await dlqProcessor.CloseAsync();
doneReceiving.SetResult(true);
});

// register the RegisterMessageHandler callback
dlqProcessor.ProcessMessageAsync += async args =>
{
// first, we create a new sendable message of the picked up message
// that we can resubmit.
var resubmitMessage = new ServiceBusMessage(args.Message);
// if the message has an "error" we know the main loop
// can't handle, let's fix the message
if (resubmitMessage.Subject != null && resubmitMessage.Subject.Equals("Physicist"))
{
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine(
"\t\tFixing: \n\t\t\tMessageId = {0}, \n\t\t\tSequenceNumber = {1}, \n\t\t\tLabel = {2}",
args.Message.MessageId,
args.Message.SequenceNumber,
args.Message.Subject);
Console.ResetColor();
}

// set the label to "Scientist"
resubmitMessage.Subject = "Scientist";
// and re-enqueue the cloned message
await resubmitSender.SendMessageAsync(resubmitMessage);
}

// finally complete the original message and remove it from the DLQ
await args.CompleteMessageAsync(args.Message);
};
dlqProcessor.ProcessErrorAsync += LogMessageHandlerException;
_ = dlqProcessor.StartProcessingAsync();
return doneReceiving.Task;
}

private static Task LogMessageHandlerException(ProcessErrorEventArgs e)
{
Console.WriteLine($"Exception: \"{e.Exception.Message}\" {e.EntityPath}");
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
---
page_type: sample
languages:
- csharp
products:
- azure
- azure-service-bus
name: Explore deadlettering in Azure Service Bus
description: This sample shows how to move messages to the Dead-letter queue, how to retrieve messages from it, and resubmit corrected message back into the main queue.
---

# Dead-Letter Queues

This sample shows how to move messages to the Dead-letter queue, how to retrieve
messages from it, and resubmit corrected message back into the main queue.

## What is a Dead-Letter Queue?

All Service Bus Queues and Subscriptions have a secondary sub-queue, called the
*dead-letter queue* (DLQ).

This sub-queue does not need to be explicitly created and cannot be deleted or
otherwise managed independent of the main entity. The purpose of the Dead-Letter
Queue (DLQ) is accept and hold messages that cannot be delivered to any receiver
or messages that could not be processed. Read more about Dead-Letter Queues [in
the product documentation.][1]

## Sample Code

The sample implements two scenarios:

* Send a message and then retrierve and abandon the message until the maximum
delivery count is exhausted and the message is automatically dead-lettered.

* Send a set of messages, and explicitly dead-letter messages that do not match
a certain criterion and would therefore not be processed correctly. The messages
are then picked up from the dead-letter queue, are automatically corrected, and
resubmitted.

The sample code is further documented inline in the `Program.cs` C# file.

## Prerequisites
In order to run the sample, you will need a Service Bus Namespace. For more information on getting setup see the [Getting Started](https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/servicebus/Azure.Messaging.ServiceBus#getting-started) section of the Service Bus library Readme. Once you have a Service Bus Namespace, you will need to create a queue that can be used for the sample.

## Building the Sample

To build the sample:

1. Install [.NET Core 3.1](https://dot.net) or newer.

2. Run in the project directory:

```bash
dotnet build
```

## Running the Sample

You can either run the executable you just build, or build and run the project at the same time. There are two ways to authenticate in the sample.

### Use Azure Activity Directory
You can use any of the [authentication mechanisms](https://docs.microsoft.com/dotnet/api/overview/azure/identity-readme?view=azure-dotnet) that the `DefaultAzureCredential` from the Azure.Identity supports.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a clever way to set things up. I'm going to steal this at some point....


To run the sample using Managed Identity:
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved

```bash
dotnet run -- --namespace <fully qualified namespace> --queue <queue name>
```
### Use Service Bus connection string
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
The other way to run the sample is by specifying an environment variable that contains the connection string for the namespace you wish to use:

```bash
dotnet run -- --connection-variable <environment variable name> --queue <queue name>
```


Loading