From 62e292ecb265c770c29ddb005220c9e4c1954ee8 Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Wed, 28 Apr 2021 17:51:07 -0700 Subject: [PATCH 1/3] Add DLQ sample --- .../DeadLetterQueue/DeadLetterQueue.csproj | 18 ++ .../samples/DeadLetterQueue/Program.cs | 301 ++++++++++++++++++ .../samples/DeadLetterQueue/README.md | 76 +++++ .../samples/Directory.Build.props | 15 + 4 files changed, 410 insertions(+) create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/DeadLetterQueue.csproj create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/Program.cs create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/README.md create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/samples/Directory.Build.props diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/DeadLetterQueue.csproj b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/DeadLetterQueue.csproj new file mode 100644 index 0000000000000..bb9d42b9141b2 --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/DeadLetterQueue.csproj @@ -0,0 +1,18 @@ + + + + Exe + net5.0 + + + + + + + + + + + + + diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/Program.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/Program.cs new file mode 100644 index 0000000000000..dafe101bf1b73 --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/Program.cs @@ -0,0 +1,301 @@ +using System; +using System.Collections.Generic; +using System.CommandLine; +using System.CommandLine.Invocation; +using System.Threading; +using System.Threading.Tasks; +using Azure.Identity; +using Azure.Messaging.ServiceBus; + +namespace DeadLetterQueue +{ + // This sample shows how to move messages to the Dead-letter queue, how to retrieve + // messages from it, and resubmit corrected message back into the main queue. + public class Program + { + private static ServiceBusClient _client; + + public static async Task Main(string[] args) + { + var command = new RootCommand("Demonstrates the DeadLetter feature of Azure Service Bus.") + { + new Option( + alias: "--namespace", + description: "Fully qualified Service Bus Queue namespace to use") { Name = "FullyQualifiedNamespace" }, + new Option( + alias: "--queue", + description: "Service Bus Queue Name to use") { IsRequired = true, Name = "QueueName"}, + new Option( + alias: "--connection-variable", + description: "The name of an environment variable containing the connection string to use.") { Name = "Connection"}, + }; + command.Handler = CommandHandler.Create(RunAsync); + await command.InvokeAsync(args); + } + + private static async Task RunAsync(string fullyQualifiedNamespace, string queueName, string connection) + { + if (!string.IsNullOrEmpty(connection)) + { + _client = new ServiceBusClient(Environment.GetEnvironmentVariable(connection)); + } + else if (!string.IsNullOrEmpty(fullyQualifiedNamespace)) + { + _client = new ServiceBusClient(fullyQualifiedNamespace, new DefaultAzureCredential()); + } + else + { + throw new ArgumentException( + "Either a fully qualified namespace or a connection string environment variable must be specified."); + } + + var cts = new CancellationTokenSource(); + var sender = _client.CreateSender(queueName); + + // For the delivery count scenario, we first send a single message, + // and then pick it up and abandon it until is "disappears" from the queue. + // Then we fetch the message from the dead-letter queue (DLQ) and inspect it. + await SendMessages(sender, 1); + await ExceedMaxDeliveryAsync(queueName); + + // For the fix-up scenario, we send a series of messages to a queue, and + // run a receive loop that explicitly pushes messages into the DLQ when + // they don't satisfy a processing condition. The fix-up receive loop inspects + // the DLQ, fixes the "faulty" messages, and resubmits them into processing. + var sendTask = SendMessages(sender, int.MaxValue); + var receiveTask = ReceiveMessages(queueName, cts.Token); + var fixupTask = PickUpAndFixDeadletters(queueName, sender, cts.Token); + + // wait for a key press or 10 seconds + await Task.WhenAny( + Task.Run(() => Console.ReadKey()), + Task.Delay(TimeSpan.FromSeconds(10)) + ); + + // end the processing + cts.Cancel(); + // await shutdown and exit + await Task.WhenAll(sendTask, receiveTask, fixupTask); + } + + private static Task SendMessages(ServiceBusSender sender, int maxMessages) + { + dynamic data = new[] + { + new {name = "Einstein", firstName = "Albert"}, new {name = "Heisenberg", firstName = "Werner"}, + new {name = "Curie", firstName = "Marie"}, new {name = "Hawking", firstName = "Steven"}, + new {name = "Newton", firstName = "Isaac"}, new {name = "Bohr", firstName = "Niels"}, + new {name = "Faraday", firstName = "Michael"}, new {name = "Galilei", firstName = "Galileo"}, + new {name = "Kepler", firstName = "Johannes"}, new {name = "Kopernikus", firstName = "Nikolaus"} + }; + + // send a message for each data entry, but at most maxMessages + // we're sending in a loop, but don't block on each send, but + // rather collect all sends in a list and then wait for all of + // them to complete asynchronously, which is much faster + var tasks = new List(); + for (int i = 0; i < Math.Min(data.Length, maxMessages); i++) + { + // each message has a JSON body with one of the data rows + var message = new ServiceBusMessage(new BinaryData(jsonSerializable: data[i])) + { + ContentType = "application/json", // JSON data + Subject = i % 2 == 0 ? "Scientist" : "Physicist", // random picked header + MessageId = i.ToString(), // message-id + TimeToLive = TimeSpan.FromMinutes(2) // message expires in 2 minutes + }; + + // start sending this message + lock (Console.Out) + { + Console.ForegroundColor = ConsoleColor.DarkYellow; + Console.WriteLine($"Message sent: Id = {message.MessageId}"); + Console.ResetColor(); + } + + ; + tasks.Add(sender.SendMessageAsync(message).ContinueWith((t) => + { + lock (Console.Out) + { + Console.ForegroundColor = ConsoleColor.Yellow; + Console.WriteLine($"\tMessage acknowledged: Id = {message.MessageId}"); + Console.ResetColor(); + } + })); + } + + return Task.WhenAll(tasks); + } + + + private static async Task ExceedMaxDeliveryAsync(string queueName) + { + ServiceBusReceiver receiver = _client.CreateReceiver(queueName); + + while (true) + { + // Ask the broker to return any message readily available or return with no + // result after 2 seconds (allowing for clients with great network latency) + ServiceBusReceivedMessage msg = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(2)); + if (msg != null) + { + // Now we immediately abandon the message, which increments the DeliveryCount + Console.WriteLine($"Picked up message: Id = {msg.MessageId}; DeliveryCount {msg.DeliveryCount}"); + await receiver.AbandonMessageAsync(msg); + } + else + { + // Once the system moves the message to the DLQ, the main queue is empty + // and the loop exits as ReceiveAsync returns null. + break; + } + } + + // For picking up the message from a DLQ, we make a receiver just like for a + // regular queue. We could also use QueueClient and a registered handler here. + // The required path is constructed with the EntityNameHelper.FormatDeadLetterPath() + // helper method, and always follows the pattern "{entity}/$DeadLetterQueue", + // meaning that for a queue "Q1", the path is "Q1/$DeadLetterQueue" and for a + // topic "T1" and subscription "S1", the path is "T1/Subscriptions/S1/$DeadLetterQueue" + ServiceBusReceiver deadletterReceiver = _client.CreateReceiver(queueName, + new ServiceBusReceiverOptions {SubQueue = SubQueue.DeadLetter}); + while (true) + { + // receive a message + ServiceBusReceivedMessage message = + await deadletterReceiver.ReceiveMessageAsync(TimeSpan.FromSeconds(10)); + if (message != null) + { + // write out the message deadletter information + Console.WriteLine("Deadletter message:"); + Console.WriteLine($"DeadLetterReason = {message.DeadLetterReason}"); + Console.WriteLine($"DeadLetterErrorDescription = {message.DeadLetterErrorDescription}"); + + // complete and therefore remove the message from the DLQ + await deadletterReceiver.CompleteMessageAsync(message); + } + else + { + // DLQ was empty on last receive attempt + break; + } + } + } + + private static Task ReceiveMessages(string queueName, CancellationToken cancellationToken) + { + var doneReceiving = new TaskCompletionSource(); + ServiceBusProcessor processor = _client.CreateProcessor(queueName); + + // close the receiver and factory when the CancellationToken fires + cancellationToken.Register( + async () => + { + await processor.CloseAsync(); + doneReceiving.SetResult(true); + }); + + processor.ProcessMessageAsync += async args => + { + ServiceBusReceivedMessage message = args.Message; + // If the message holds JSON data and the label is set to "Scientist", + // we accept the message and print it. + if (message.Subject != null && + message.ContentType != null && + message.Subject.Equals("Scientist", StringComparison.InvariantCultureIgnoreCase) && + message.ContentType.Equals("application/json", StringComparison.InvariantCultureIgnoreCase)) + { + var body = message.Body; + + // System.Text.JSON does not currently support deserializing to dynamic + Dictionary scientist = body.ToObjectFromJson>(); + + lock (Console.Out) + { + Console.ForegroundColor = ConsoleColor.Cyan; + Console.WriteLine( + "\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = {0}, \n\t\t\t\t\t\tSequenceNumber = {1}, \n\t\t\t\t\t\tEnqueuedTime = {2}," + + "\n\t\t\t\t\t\tExpiresAt = {4}, \n\t\t\t\t\t\tContentType = \"{3}\", \n\t\t\t\t\t\tContent: [ firstName = {5}, name = {6} ]", + message.MessageId, + message.SequenceNumber, + message.EnqueuedTime, + message.ContentType, + message.ExpiresAt, + scientist["firstName"], + scientist["name"]); + Console.ResetColor(); + } + + await args.CompleteMessageAsync(message, args.CancellationToken); + } + else + { + // if the messages doesn't fit the criteria above, we deadletter it + await args.DeadLetterMessageAsync(message, cancellationToken: args.CancellationToken); + } + }; + + processor.ProcessErrorAsync += LogMessageHandlerException; + _ = processor.StartProcessingAsync(cancellationToken); + return doneReceiving.Task; + } + + private static Task PickUpAndFixDeadletters(string queueName, ServiceBusSender resubmitSender, CancellationToken cancellationToken) + { + var doneReceiving = new TaskCompletionSource(); + + // here, we create a receiver on the Deadletter queue + ServiceBusProcessor dlqProcessor = + _client.CreateProcessor(queueName, new ServiceBusProcessorOptions {SubQueue = SubQueue.DeadLetter}); + + // close the receiver and factory when the CancellationToken fires + cancellationToken.Register( + async () => + { + await dlqProcessor.CloseAsync(); + doneReceiving.SetResult(true); + }); + + // register the RegisterMessageHandler callback + dlqProcessor.ProcessMessageAsync += async args => + { + // first, we create a new sendable message of the picked up message + // that we can resubmit. + var resubmitMessage = new ServiceBusMessage(args.Message); + // if the message has an "error" we know the main loop + // can't handle, let's fix the message + if (resubmitMessage.Subject != null && resubmitMessage.Subject.Equals("Physicist")) + { + lock (Console.Out) + { + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine( + "\t\tFixing: \n\t\t\tMessageId = {0}, \n\t\t\tSequenceNumber = {1}, \n\t\t\tLabel = {2}", + args.Message.MessageId, + args.Message.SequenceNumber, + args.Message.Subject); + Console.ResetColor(); + } + + // set the label to "Scientist" + resubmitMessage.Subject = "Scientist"; + // and re-enqueue the cloned message + await resubmitSender.SendMessageAsync(resubmitMessage); + } + + // finally complete the original message and remove it from the DLQ + await args.CompleteMessageAsync(args.Message); + }; + dlqProcessor.ProcessErrorAsync += LogMessageHandlerException; + _ = dlqProcessor.StartProcessingAsync(); + return doneReceiving.Task; + } + + private static Task LogMessageHandlerException(ProcessErrorEventArgs e) + { + Console.WriteLine($"Exception: \"{e.Exception.Message}\" {e.EntityPath}"); + return Task.CompletedTask; + } + } +} diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/README.md b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/README.md new file mode 100644 index 0000000000000..336a4bc6e6557 --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/README.md @@ -0,0 +1,76 @@ +--- +page_type: sample +languages: +- csharp + products: +- azure +- azure-service-bus + name: Explore deadlettering in Azure Service Bus + description: This sample shows how to move messages to the Dead-letter queue, how to retrieve messages from it, and resubmit corrected message back into the main queue. +--- + +# Dead-Letter Queues + +This sample shows how to move messages to the Dead-letter queue, how to retrieve +messages from it, and resubmit corrected message back into the main queue. + +## What is a Dead-Letter Queue? + +All Service Bus Queues and Subscriptions have a secondary sub-queue, called the +*dead-letter queue* (DLQ). + +This sub-queue does not need to be explicitly created and cannot be deleted or +otherwise managed independent of the main entity. The purpose of the Dead-Letter +Queue (DLQ) is accept and hold messages that cannot be delivered to any receiver +or messages that could not be processed. Read more about Dead-Letter Queues [in +the product documentation.][1] + +## Sample Code + +The sample implements two scenarios: + +* Send a message and then retrierve and abandon the message until the maximum + delivery count is exhausted and the message is automatically dead-lettered. + +* Send a set of messages, and explicitly dead-letter messages that do not match + a certain criterion and would therefore not be processed correctly. The messages + are then picked up from the dead-letter queue, are automatically corrected, and + resubmitted. + +The sample code is further documented inline in the [Program.cs](Program.cs) C# file. + +## Prerequisites +In order to run the sample, you will need a Service Bus Namespace. For more information on getting setup see the [Getting Started]((https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/servicebus/Azure.Messaging.ServiceBus#getting-started)) section of the Service Bus library Readme. Once you have a Service Bus Namespace, you will need to create a queue that can be used for the sample. + +## Building the Sample + +To build the sample: + +1. Install [.NET Core 3.1](https://dot.net) or newer. + +2. Run in the project directory: + + ```bash + dotnet build + ``` + +## Running the Sample + +You can either run the executable you just build, or build and run the project at the same time. There are two ways to authenticate in the sample. + +### Use Azure Activity Directory +You can use any of the [authentication mechanisms](https://docs.microsoft.com/en-us/dotnet/api/overview/azure/identity-readme?view=azure-dotnet) that the `DefaultAzureCredential` from the Azure.Identity supports. + +To run the sample using Managed Identity: + +```bash +dotnet run -- --namespace --queue +``` +### Use Service Bus connection string +The other way to run the sample is by specifying an environment variable that contains the connection string for the namespace you wish to use: + +```bash +dotnet run -- --connection-variable --queue +``` + + diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Directory.Build.props b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Directory.Build.props new file mode 100644 index 0000000000000..9f435bb1bfc6c --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Directory.Build.props @@ -0,0 +1,15 @@ + + + + true + false + true + false + + + + + + false + + From 8af52728bd7af38b8fd6e85df470a6e4dfbdf950 Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Wed, 28 Apr 2021 21:13:43 -0700 Subject: [PATCH 2/3] Fix broken links --- .../samples/DeadLetterQueue/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/README.md b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/README.md index 336a4bc6e6557..56d6a3022fbda 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/README.md +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/README.md @@ -37,10 +37,10 @@ The sample implements two scenarios: are then picked up from the dead-letter queue, are automatically corrected, and resubmitted. -The sample code is further documented inline in the [Program.cs](Program.cs) C# file. +The sample code is further documented inline in the `Program.cs` C# file. ## Prerequisites -In order to run the sample, you will need a Service Bus Namespace. For more information on getting setup see the [Getting Started]((https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/servicebus/Azure.Messaging.ServiceBus#getting-started)) section of the Service Bus library Readme. Once you have a Service Bus Namespace, you will need to create a queue that can be used for the sample. +In order to run the sample, you will need a Service Bus Namespace. For more information on getting setup see the [Getting Started](https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/servicebus/Azure.Messaging.ServiceBus#getting-started) section of the Service Bus library Readme. Once you have a Service Bus Namespace, you will need to create a queue that can be used for the sample. ## Building the Sample @@ -59,7 +59,7 @@ To build the sample: You can either run the executable you just build, or build and run the project at the same time. There are two ways to authenticate in the sample. ### Use Azure Activity Directory -You can use any of the [authentication mechanisms](https://docs.microsoft.com/en-us/dotnet/api/overview/azure/identity-readme?view=azure-dotnet) that the `DefaultAzureCredential` from the Azure.Identity supports. +You can use any of the [authentication mechanisms](https://docs.microsoft.com/dotnet/api/overview/azure/identity-readme?view=azure-dotnet) that the `DefaultAzureCredential` from the Azure.Identity supports. To run the sample using Managed Identity: From 31a12c6221cd95ec467ade2ff0b9278abf6e6830 Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Thu, 29 Apr 2021 13:10:39 -0700 Subject: [PATCH 3/3] PR FB --- .../DeadLetterQueue/DeadLetterQueue.csproj | 4 ++-- .../samples/DeadLetterQueue/Program.cs | 24 ++++++++++++------- .../samples/DeadLetterQueue/README.md | 4 ++-- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/DeadLetterQueue.csproj b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/DeadLetterQueue.csproj index bb9d42b9141b2..33b411ecd7598 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/DeadLetterQueue.csproj +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/DeadLetterQueue.csproj @@ -1,5 +1,4 @@ - Exe net5.0 @@ -7,7 +6,8 @@ - + + diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/Program.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/Program.cs index dafe101bf1b73..c2f6129cc4114 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/Program.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/Program.cs @@ -82,11 +82,16 @@ private static Task SendMessages(ServiceBusSender sender, int maxMessages) { dynamic data = new[] { - new {name = "Einstein", firstName = "Albert"}, new {name = "Heisenberg", firstName = "Werner"}, - new {name = "Curie", firstName = "Marie"}, new {name = "Hawking", firstName = "Steven"}, - new {name = "Newton", firstName = "Isaac"}, new {name = "Bohr", firstName = "Niels"}, - new {name = "Faraday", firstName = "Michael"}, new {name = "Galilei", firstName = "Galileo"}, - new {name = "Kepler", firstName = "Johannes"}, new {name = "Kopernikus", firstName = "Nikolaus"} + new {name = "Einstein", firstName = "Albert"}, + new {name = "Heisenberg", firstName = "Werner"}, + new {name = "Curie", firstName = "Marie"}, + new {name = "Hawking", firstName = "Steven"}, + new {name = "Newton", firstName = "Isaac"}, + new {name = "Bohr", firstName = "Niels"}, + new {name = "Faraday", firstName = "Michael"}, + new {name = "Galilei", firstName = "Galileo"}, + new {name = "Kepler", firstName = "Johannes"}, + new {name = "Kopernikus", firstName = "Nikolaus"} }; // send a message for each data entry, but at most maxMessages @@ -112,8 +117,8 @@ private static Task SendMessages(ServiceBusSender sender, int maxMessages) Console.WriteLine($"Message sent: Id = {message.MessageId}"); Console.ResetColor(); } - - ; + + // After the send task is complete, output this to the console. tasks.Add(sender.SendMessageAsync(message).ContinueWith((t) => { lock (Console.Out) @@ -215,8 +220,9 @@ private static Task ReceiveMessages(string queueName, CancellationToken cancella { Console.ForegroundColor = ConsoleColor.Cyan; Console.WriteLine( - "\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = {0}, \n\t\t\t\t\t\tSequenceNumber = {1}, \n\t\t\t\t\t\tEnqueuedTime = {2}," + - "\n\t\t\t\t\t\tExpiresAt = {4}, \n\t\t\t\t\t\tContentType = \"{3}\", \n\t\t\t\t\t\tContent: [ firstName = {5}, name = {6} ]", + "\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = {0}, \n\t\t\t\t\t\tSequenceNumber = {1}," + + " \n\t\t\t\t\t\tEnqueuedTime = {2}," + "\n\t\t\t\t\t\tExpiresAt = {4}, \n\t\t\t\t\t\tContentType = \"{3}\"," + + " \n\t\t\t\t\t\tContent: [ firstName = {5}, name = {6} ]", message.MessageId, message.SequenceNumber, message.EnqueuedTime, diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/README.md b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/README.md index 56d6a3022fbda..45f1393c932a6 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/README.md +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/DeadLetterQueue/README.md @@ -61,12 +61,12 @@ You can either run the executable you just build, or build and run the project a ### Use Azure Activity Directory You can use any of the [authentication mechanisms](https://docs.microsoft.com/dotnet/api/overview/azure/identity-readme?view=azure-dotnet) that the `DefaultAzureCredential` from the Azure.Identity supports. -To run the sample using Managed Identity: +To run the sample using Azure Identity: ```bash dotnet run -- --namespace --queue ``` -### Use Service Bus connection string +### Use a Service Bus connection string The other way to run the sample is by specifying an environment variable that contains the connection string for the namespace you wish to use: ```bash