From 1aafe45de4b751c9ace375900463993ee8614f9e Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Sat, 5 Jun 2021 22:13:58 -0700 Subject: [PATCH] Batch receive fixes (#21566) * Batch receive fixes * Use AwaitWithCancellation * volatile * Remove unnecessary catch * Fix tests * Fix flaky tests * Fix test --- .../src/Listeners/ServiceBusListener.cs | 299 +++++++++--------- ...Azure.WebJobs.Extensions.ServiceBus.csproj | 8 +- .../Primitives/ServiceBusMessageActions.cs | 12 + ...rviceBusTriggerAttributeBindingProvider.cs | 2 +- .../tests/ServiceBusEndToEndTests.cs | 150 ++++++--- .../tests/ServiceBusSessionsEndToEndTests.cs | 23 +- .../tests/WebJobsServiceBusTestBase.cs | 67 ++-- 7 files changed, 330 insertions(+), 231 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs index 8b602992fa61..903fe41ad0d4 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs @@ -7,46 +7,39 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using Azure.Core; using Azure.Core.Pipeline; using Azure.Messaging.ServiceBus; -using Microsoft.Azure.WebJobs.Extensions.Clients.Shared; using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; -using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Azure.WebJobs.Host.Scale; -using Microsoft.Extensions.Azure; -using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; namespace Microsoft.Azure.WebJobs.ServiceBus.Listeners { internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider { - private readonly MessagingProvider _messagingProvider; private readonly ITriggeredFunctionExecutor _triggerExecutor; - private readonly string _functionId; - private readonly ServiceBusEntityType _entityType; private readonly string _entityPath; private readonly bool _isSessionsEnabled; private readonly bool _autoCompleteMessagesOptionEvaluatedValue; private readonly CancellationTokenSource _cancellationTokenSource; private readonly ServiceBusOptions _serviceBusOptions; - private readonly ILoggerFactory _loggerFactory; private readonly bool _singleDispatch; private readonly ILogger _logger; private readonly Lazy _messageProcessor; - private Lazy _batchReceiver; - private Lazy _client; - private Lazy _sessionMessageProcessor; - private Lazy _scaleMonitor; + private readonly Lazy _batchReceiver; + private readonly Lazy _client; + private readonly Lazy _sessionMessageProcessor; + private readonly Lazy _scaleMonitor; - private bool _disposed; - private bool _started; + private volatile bool _disposed; + private volatile bool _started; // Serialize execution of StopAsync to avoid calling Unregister* concurrently private readonly SemaphoreSlim _stopAsyncSemaphore = new SemaphoreSlim(1, 1); + private CancellationTokenRegistration _batchReceiveRegistration; + private Task _batchLoop; public ServiceBusListener( string functionId, @@ -62,15 +55,11 @@ public ServiceBusListener( bool singleDispatch, ServiceBusClientFactory clientFactory) { - _functionId = functionId; - _entityType = entityType; _entityPath = entityPath; _isSessionsEnabled = isSessionsEnabled; _autoCompleteMessagesOptionEvaluatedValue = autoCompleteMessagesOptionEvaluatedValue; _triggerExecutor = triggerExecutor; _cancellationTokenSource = new CancellationTokenSource(); - _messagingProvider = messagingProvider; - _loggerFactory = loggerFactory; _logger = loggerFactory.CreateLogger(); _client = new Lazy( @@ -78,31 +67,31 @@ public ServiceBusListener( clientFactory.CreateClientFromSetting(connection)); _batchReceiver = new Lazy( - () => _messagingProvider.CreateBatchMessageReceiver( + () => messagingProvider.CreateBatchMessageReceiver( _client.Value, _entityPath, options.ToReceiverOptions())); _messageProcessor = new Lazy( - () => _messagingProvider.CreateMessageProcessor( + () => messagingProvider.CreateMessageProcessor( _client.Value, _entityPath, options.ToProcessorOptions(_autoCompleteMessagesOptionEvaluatedValue))); _sessionMessageProcessor = new Lazy( - () => _messagingProvider.CreateSessionMessageProcessor( + () => messagingProvider.CreateSessionMessageProcessor( _client.Value, _entityPath, options.ToSessionProcessorOptions(_autoCompleteMessagesOptionEvaluatedValue))); _scaleMonitor = new Lazy( () => new ServiceBusScaleMonitor( - _functionId, - _entityType, + functionId, + entityType, _entityPath, connection, _batchReceiver, - _loggerFactory, + loggerFactory, clientFactory)); _singleDispatch = singleDispatch; @@ -133,7 +122,7 @@ public async Task StartAsync(CancellationToken cancellationToken) } else { - StartMessageBatchReceiver(_cancellationTokenSource.Token); + _batchLoop = RunBatchReceiveLoopAsync(_cancellationTokenSource.Token); } _started = true; } @@ -150,23 +139,26 @@ public async Task StopAsync(CancellationToken cancellationToken) throw new InvalidOperationException("The listener has not yet been started or has already been stopped."); } - // StopProcessingAsync method stop new messages from being processed while allowing in-flight messages to complete. - // As the amount of time functions are allowed to complete processing varies by SKU, we specify max timespan - // as the amount of time Service Bus SDK should wait for in-flight messages to complete procesing after - // unregistering the message handler so that functions have as long as the host continues to run time to complete. + _cancellationTokenSource.Cancel(); + + // CloseAsync method stop new messages from being processed while allowing in-flight messages to be processed. if (_singleDispatch) { if (_isSessionsEnabled) { - await _sessionMessageProcessor.Value.Processor.StopProcessingAsync(cancellationToken).ConfigureAwait(false); + await _sessionMessageProcessor.Value.Processor.CloseAsync(cancellationToken).ConfigureAwait(false); } else { - await _messageProcessor.Value.Processor.StopProcessingAsync(cancellationToken).ConfigureAwait(false); + await _messageProcessor.Value.Processor.CloseAsync(cancellationToken).ConfigureAwait(false); } } - // Batch processing will be stopped via the _cancellationTokenSource on its next iteration - _cancellationTokenSource.Cancel(); + else + { + await _batchLoop.ConfigureAwait(false); + await _batchReceiver.Value.CloseAsync(cancellationToken).ConfigureAwait(false); + } + _started = false; } finally @@ -185,33 +177,43 @@ public void Cancel() [SuppressMessage("Microsoft.Usage", "CA2213:DisposableFieldsShouldBeDisposed", MessageId = "_cancellationTokenSource")] public void Dispose() { - if (!_disposed) + if (_disposed) { - // Running callers might still be using the cancellation token. - // Mark it canceled but don't dispose of the source while the callers are running. - // Otherwise, callers would receive ObjectDisposedException when calling token.Register. - // For now, rely on finalization to clean up _cancellationTokenSource's wait handle (if allocated). - _cancellationTokenSource.Cancel(); + return; + } - if (_batchReceiver != null && _batchReceiver.IsValueCreated) - { - _batchReceiver.Value.CloseAsync().Wait(); - _batchReceiver = null; - } + // Running callers might still be using the cancellation token. + // Mark it canceled but don't dispose of the source while the callers are running. + // Otherwise, callers would receive ObjectDisposedException when calling token.Register. + // For now, rely on finalization to clean up _cancellationTokenSource's wait handle (if allocated). + _cancellationTokenSource.Cancel(); - if (_client != null && _client.IsValueCreated) - { -#pragma warning disable AZC0107 // DO NOT call public asynchronous method in synchronous scope. - _client.Value.DisposeAsync().EnsureCompleted(); -#pragma warning restore AZC0107 // DO NOT call public asynchronous method in synchronous scope. - _client = null; - } + if (_batchReceiver.IsValueCreated) + { +#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). + _batchReceiver.Value.CloseAsync(CancellationToken.None).GetAwaiter().GetResult(); +#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). + } - _stopAsyncSemaphore.Dispose(); - _cancellationTokenSource.Dispose(); + if (_messageProcessor.IsValueCreated) + { +#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). + _messageProcessor.Value.Processor.CloseAsync(CancellationToken.None).GetAwaiter().GetResult(); +#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). + } - _disposed = true; + if (_client.IsValueCreated) + { +#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). + _client.Value.DisposeAsync().AsTask().GetAwaiter().GetResult(); +#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). } + + _stopAsyncSemaphore.Dispose(); + _cancellationTokenSource.Dispose(); + _batchReceiveRegistration.Dispose(); + + _disposed = true; } internal async Task ProcessMessageAsync(ProcessMessageEventArgs args) @@ -252,7 +254,7 @@ internal async Task ProcessSessionMessageAsync(ProcessSessionMessageEventArgs ar } } - internal void StartMessageBatchReceiver(CancellationToken cancellationToken) + private async Task RunBatchReceiveLoopAsync(CancellationToken cancellationToken) { ServiceBusClient sessionClient = null; ServiceBusReceiver receiver = null; @@ -265,122 +267,135 @@ internal void StartMessageBatchReceiver(CancellationToken cancellationToken) receiver = _batchReceiver.Value; } - Task.Run(async () => + while (true) { - while (true) + try { - try + if (cancellationToken.IsCancellationRequested) { - if (cancellationToken.IsCancellationRequested) - { - _logger.LogInformation("Message processing has been stopped or cancelled"); - return; - } + _logger.LogInformation("Message processing has been stopped or cancelled"); + return; + } - if (_isSessionsEnabled && (receiver == null || receiver.IsClosed)) + if (_isSessionsEnabled && (receiver == null || receiver.IsClosed)) + { + try { - try - { - receiver = await sessionClient.AcceptNextSessionAsync( - _entityPath, - new ServiceBusSessionReceiverOptions - { - PrefetchCount = _serviceBusOptions.PrefetchCount - }, - cancellationToken).ConfigureAwait(false); - } - catch (ServiceBusException ex) + receiver = await sessionClient.AcceptNextSessionAsync( + _entityPath, + new ServiceBusSessionReceiverOptions + { + PrefetchCount = _serviceBusOptions.PrefetchCount + }, + cancellationToken).ConfigureAwait(false); + } + catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.ServiceTimeout) - { - // it's expected if the entity is empty, try next time - continue; - } + { + // it's expected if the entity is empty, try next time + continue; } + } - IReadOnlyList messages = - await receiver.ReceiveMessagesAsync( - _serviceBusOptions.MaxBatchSize, - cancellationToken: cancellationToken) - .ConfigureAwait(false); + IReadOnlyList messages = + await receiver.ReceiveMessagesAsync( + _serviceBusOptions.MaxBatchSize, + cancellationToken: cancellationToken).AwaitWithCancellation(cancellationToken); - if (messages.Count > 0) + if (messages.Count > 0) + { + ServiceBusReceivedMessage[] messagesArray = messages.ToArray(); + ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateBatch(messagesArray); + if (_isSessionsEnabled) { - ServiceBusReceivedMessage[] messagesArray = messages.ToArray(); - ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateBatch(messagesArray); - if (_isSessionsEnabled) - { - input.MessageActions = new ServiceBusSessionMessageActions((ServiceBusSessionReceiver)receiver); - } - else - { - input.MessageActions = new ServiceBusMessageActions(receiver); - } - FunctionResult result = await _triggerExecutor.TryExecuteAsync(input.GetTriggerFunctionData(), cancellationToken).ConfigureAwait(false); - - if (cancellationToken.IsCancellationRequested) - { - return; - } + input.MessageActions = new ServiceBusSessionMessageActions((ServiceBusSessionReceiver)receiver); + } + else + { + input.MessageActions = new ServiceBusMessageActions(receiver); + } + FunctionResult result = await _triggerExecutor.TryExecuteAsync(input.GetTriggerFunctionData(), cancellationToken).ConfigureAwait(false); - // Complete batch of messages only if the execution was successful - if (_autoCompleteMessagesOptionEvaluatedValue && _started) + // Complete batch of messages only if the execution was successful + if (_autoCompleteMessagesOptionEvaluatedValue) + { + if (result.Succeeded) { - if (result.Succeeded) + List completeTasks = new List(); + foreach (ServiceBusReceivedMessage message in messagesArray) { - List completeTasks = new List(); - foreach (ServiceBusReceivedMessage message in messagesArray) + // skip messages that were settled in the user's function + if (input.MessageActions.SettledMessages.Contains(message)) { - completeTasks.Add(receiver.CompleteMessageAsync(message, cancellationToken)); + continue; } - await Task.WhenAll(completeTasks).ConfigureAwait(false); + + // Pass CancellationToken.None to allow autocompletion to finish even when shutting down + completeTasks.Add(receiver.CompleteMessageAsync(message, CancellationToken.None)); } - else + + await Task.WhenAll(completeTasks).ConfigureAwait(false); + } + else + { + List abandonTasks = new List(); + foreach (ServiceBusReceivedMessage message in messagesArray) { - List abandonTasks = new List(); - foreach (ServiceBusReceivedMessage message in messagesArray) + // skip messages that were settled in the user's function + if (input.MessageActions.SettledMessages.Contains(message)) { - abandonTasks.Add(receiver.AbandonMessageAsync(message, cancellationToken: cancellationToken)); + continue; } - await Task.WhenAll(abandonTasks).ConfigureAwait(false); + + // Pass CancellationToken.None to allow abandon to finish even when shutting down + abandonTasks.Add(receiver.AbandonMessageAsync(message, cancellationToken: CancellationToken.None)); } - } - } - else - { - // Close the session and release the session lock after draining all messages for the accepted session. - if (_isSessionsEnabled) - { - // Use CancellationToken.None to attempt to close the receiver even when shutting down - await receiver.CloseAsync(CancellationToken.None).ConfigureAwait(false); + + await Task.WhenAll(abandonTasks).ConfigureAwait(false); } } } - catch (ObjectDisposedException) + else { - // Ignore as we are stopping the host + // Close the session and release the session lock after draining all messages for the accepted session. + if (_isSessionsEnabled) + { + // Use CancellationToken.None to attempt to close the receiver even when shutting down + await receiver.CloseAsync(CancellationToken.None).ConfigureAwait(false); + } } - catch (Exception ex) - { - // Log another exception - _logger.LogError(ex, $"An unhandled exception occurred in the message batch receive loop"); + } + catch (ObjectDisposedException) + { + // Ignore as we are stopping the host + } + catch (OperationCanceledException) + when(cancellationToken.IsCancellationRequested) + { + // Ignore as we are stopping the host + _logger.LogInformation("Message processing has been stopped or cancelled"); + } + catch (Exception ex) + { + // Log another exception + _logger.LogError(ex, $"An unhandled exception occurred in the message batch receive loop"); - if (_isSessionsEnabled && receiver != null) + if (_isSessionsEnabled && receiver != null) + { + // Attempt to close the session and release session lock to accept a new session on the next loop iteration + try { - // Attempt to close the session and release session lock to accept a new session on the next loop iteration - try - { - // Use CancellationToken.None to attempt to close the receiver even when shutting down - await receiver.CloseAsync(CancellationToken.None).ConfigureAwait(false); - } - catch - { - // Best effort - receiver = null; - } + // Use CancellationToken.None to attempt to close the receiver even when shutting down + await receiver.CloseAsync(CancellationToken.None).ConfigureAwait(false); + } + catch + { + // Best effort + receiver = null; } } } - }, cancellationToken); + } } private void ThrowIfDisposed() diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj index 2f3bf0a2870f..0b8116ba1f05 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj @@ -14,7 +14,8 @@ - + + @@ -28,4 +29,9 @@ + + + + + diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/ServiceBusMessageActions.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/ServiceBusMessageActions.cs index edece7427ef7..e3eb09e3aadc 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/ServiceBusMessageActions.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/ServiceBusMessageActions.cs @@ -17,6 +17,8 @@ public class ServiceBusMessageActions private readonly ProcessMessageEventArgs _eventArgs; private readonly ProcessSessionMessageEventArgs _sessionEventArgs; + internal HashSet SettledMessages { get; } = new(); + internal ServiceBusMessageActions(ProcessSessionMessageEventArgs sessionEventArgs) { _sessionEventArgs = sessionEventArgs; @@ -50,6 +52,8 @@ public virtual async Task AbandonMessageAsync( { await _sessionEventArgs.AbandonMessageAsync(message, propertiesToModify, cancellationToken).ConfigureAwait(false); } + + SettledMessages.Add(message); } /// @@ -69,6 +73,8 @@ public virtual async Task CompleteMessageAsync( { await _sessionEventArgs.CompleteMessageAsync(message, cancellationToken).ConfigureAwait(false); } + + SettledMessages.Add(message); } /// @@ -105,6 +111,8 @@ await _sessionEventArgs.DeadLetterMessageAsync( cancellationToken) .ConfigureAwait(false); } + + SettledMessages.Add(message); } /// @@ -137,6 +145,8 @@ await _sessionEventArgs.DeadLetterMessageAsync( cancellationToken) .ConfigureAwait(false); } + + SettledMessages.Add(message); } /// @@ -169,6 +179,8 @@ await _sessionEventArgs.DeferMessageAsync( cancellationToken) .ConfigureAwait(false); } + + SettledMessages.Add(message); } } } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Triggers/ServiceBusTriggerAttributeBindingProvider.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Triggers/ServiceBusTriggerAttributeBindingProvider.cs index 2b6a7ecca690..ca6301793425 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Triggers/ServiceBusTriggerAttributeBindingProvider.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Triggers/ServiceBusTriggerAttributeBindingProvider.cs @@ -101,7 +101,7 @@ private bool GetAutoCompleteMessagesOptionToUse(ServiceBusTriggerAttribute attri { if (attribute.IsAutoCompleteMessagesOptionSet) { - _logger.LogInformation($"The 'AutoCompleteMessages' option has been overrriden to '{attribute.AutoCompleteMessages}' value for '{functionName}' function."); + _logger.LogInformation($"The 'AutoCompleteMessages' option has been overriden to '{attribute.AutoCompleteMessages}' value for '{functionName}' function."); return attribute.AutoCompleteMessages; } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs index f3fa7fbf2606..9de225af788b 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs @@ -17,6 +17,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using Moq; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using NUnit.Framework; @@ -29,15 +30,12 @@ public class ServiceBusEndToEndTests : WebJobsServiceBusTestBase private const string DrainingQueueMessageBody = "queue-message-draining-no-sessions-1"; private const string DrainingTopicMessageBody = "topic-message-draining-no-sessions-1"; - private static EventWaitHandle _eventWait; - // These two variables will be checked at the end of the test private static string _resultMessage1; private static string _resultMessage2; public ServiceBusEndToEndTests() : base(isSession: false) { - _eventWait = new ManualResetEvent(initialState: false); } [Test] @@ -192,18 +190,32 @@ public async Task TestBatch_Messages() [Test] public async Task TestBatch_AutoCompleteMessagesDisabledOnTrigger() { - await TestMultiple(); + await TestMultiple(); } [Test] public async Task TestBatch_AutoCompleteEnabledOnTrigger() { - var host = BuildHost(BuildHostWithAutoCompleteDisabled()); + await TestMultiple( + configurationDelegate: BuildHostWithAutoCompleteDisabled()); + } + + [Test] + public async Task TestBatch_AutoCompleteEnabledOnTrigger_CompleteInFunction() + { + await TestMultiple( + configurationDelegate: BuildHostWithAutoCompleteDisabled()); + } + + [Test] + public async Task TestSingle_AutoCompleteEnabledOnTrigger_CompleteInFunction() + { + await WriteQueueMessage("{'Name': 'Test1', 'Value': 'Value'}"); + var host = BuildHost( + BuildHostWithAutoCompleteDisabled()); using (host) { - await WriteQueueMessage("{'Name': 'Test1', 'Value': 'Value'}"); - await WriteQueueMessage("{'Name': 'Test2', 'Value': 'Value'}"); - bool result = _topicSubscriptionCalled1.WaitOne(SBTimeoutMills); + bool result = _waitHandle1.WaitOne(SBTimeoutMills); Assert.True(result); await host.StopAsync(); } @@ -222,7 +234,7 @@ public async Task TestSingle_JObject() using (host) { await WriteQueueMessage(JsonConvert.SerializeObject(new {Date = DateTimeOffset.Now})); - bool result = _eventWait.WaitOne(SBTimeoutMills); + bool result = _waitHandle1.WaitOne(SBTimeoutMills); Assert.True(result); await host.StopAsync(); } @@ -239,13 +251,19 @@ public async Task TestBatch_NoMessages() // This test uses a TimerTrigger and StorageCoreServices are needed to get the AddTimers to work c.AddAzureStorageCoreServices(); c.AddTimers(); + // Use a large try timeout to validate that stopping the host finishes quickly + c.AddServiceBus(o => o.ClientRetryOptions.TryTimeout = TimeSpan.FromSeconds(60)); }); }); using (host) { - bool result = _eventWait.WaitOne(SBTimeoutMills); + bool result = _waitHandle1.WaitOne(SBTimeoutMills); Assert.True(result); + var start = DateTimeOffset.Now; await host.StopAsync(); + var stop = DateTimeOffset.Now; + + Assert.IsTrue(stop.Subtract(start) < TimeSpan.FromSeconds(10)); } } @@ -267,7 +285,7 @@ public async Task TestSingle_JObject_CustomSettings() using (host) { await WriteQueueMessage(JsonConvert.SerializeObject(new {Date = DateTimeOffset.Now})); - bool result = _eventWait.WaitOne(SBTimeoutMills); + bool result = _waitHandle1.WaitOne(SBTimeoutMills); Assert.True(result); await host.StopAsync(); } @@ -281,7 +299,7 @@ public async Task TestSingle_OutputPoco() { var jobHost = host.GetJobHost(); await jobHost.CallAsync(nameof(ServiceBusOutputPocoTest.OutputPoco)); - bool result = _eventWait.WaitOne(SBTimeoutMills); + bool result = _waitHandle1.WaitOne(SBTimeoutMills); Assert.True(result); await host.StopAsync(); } @@ -301,7 +319,7 @@ public async Task BindToPoco() { await WriteQueueMessage("{ Name: 'foo', Value: 'bar' }"); - bool result = _eventWait.WaitOne(SBTimeoutMills); + bool result = _waitHandle1.WaitOne(SBTimeoutMills); Assert.True(result); var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage).ToList(); @@ -320,7 +338,7 @@ public async Task BindToString() var jobHost = host.GetJobHost(); await jobHost.CallAsync(method, new { input = "foobar" }); - bool result = _eventWait.WaitOne(SBTimeoutMills); + bool result = _waitHandle1.WaitOne(SBTimeoutMills); Assert.True(result); var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage).ToList(); @@ -378,7 +396,7 @@ private async Task TestSingleDrainMode(bool sendToQueue) await WriteTopicMessage(DrainingTopicMessageBody); } - // Wait to ensure function invocatoin has started before draining messages + // Wait to ensure function invocation has started before draining messages Assert.True(_drainValidationPreDelay.WaitOne(SBTimeoutMills)); // Start draining in-flight messages @@ -414,22 +432,23 @@ private static Action BuildDrainHost() })); } - private async Task TestMultiple(bool isXml = false) + private async Task TestMultiple(bool isXml = false, Action configurationDelegate = default) { - var host = BuildHost(); - using (host) + // pre-populate queue before starting listener to allow batch receive to get multiple messages + if (isXml) { - if (isXml) - { - await WriteQueueMessage(new TestPoco() { Name = "Test1", Value = "Value" }); - await WriteQueueMessage(new TestPoco() { Name = "Test2", Value = "Value" }); - } - else - { - await WriteQueueMessage("{'Name': 'Test1', 'Value': 'Value'}"); - await WriteQueueMessage("{'Name': 'Test2', 'Value': 'Value'}"); - } + await WriteQueueMessage(new TestPoco() { Name = "Test1", Value = "Value" }); + await WriteQueueMessage(new TestPoco() { Name = "Test2", Value = "Value" }); + } + else + { + await WriteQueueMessage("{'Name': 'Test1', 'Value': 'Value'}"); + await WriteQueueMessage("{'Name': 'Test2', 'Value': 'Value'}"); + } + var host = BuildHost(configurationDelegate); + using (host) + { bool result = _topicSubscriptionCalled1.WaitOne(SBTimeoutMills); Assert.True(result); await host.StopAsync(); @@ -450,7 +469,7 @@ private async Task TestMultipleDrainMode(bool sendToQueue) await WriteTopicMessage(DrainingTopicMessageBody); } - // Wait to ensure function invocatoin has started before draining messages + // Wait to ensure function invocation has started before draining messages Assert.True(_drainValidationPreDelay.WaitOne(SBTimeoutMills)); // Start draining in-flight messages @@ -694,7 +713,7 @@ public static void TriggerPoco( { Assert.AreEqual("value", received.Value); Assert.AreEqual("name", received.Name); - _eventWait.Set(); + _waitHandle1.Set(); } } @@ -789,7 +808,7 @@ public static async Task SBQueue2SBQueue( public class ServiceBusMultipleMessagesTestJob_BindToMessageArray { - public static void SBQueue2SBQueue( + public static void Run( [ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage[] array, ServiceBusMessageActions messageActions) @@ -801,7 +820,7 @@ public static void SBQueue2SBQueue( public class ServiceBusMultipleMessagesTestJob_BindToPocoArray { - public static void SBQueue2SBQueue( + public static void Run( [ServiceBusTrigger(FirstQueueNameKey)] TestPoco[] array, ServiceBusMessageActions messageActions) { @@ -812,10 +831,10 @@ public static void SBQueue2SBQueue( public class ServiceBusMultipleMessagesTestJob_BindToJObject { - public static void BindToJObject([ServiceBusTrigger(FirstQueueNameKey)] JObject input) + public static void Run([ServiceBusTrigger(FirstQueueNameKey)] JObject input) { Assert.AreEqual(JTokenType.Date, input["Date"].Type); - _eventWait.Set(); + _waitHandle1.Set(); } } @@ -824,7 +843,7 @@ public class ServiceBusMultipleMessagesTestJob_BindToJObject_RespectsCustomJsonS public static void BindToJObject([ServiceBusTrigger(FirstQueueNameKey)] JObject input) { Assert.AreEqual(JTokenType.String, input["Date"].Type); - _eventWait.Set(); + _waitHandle1.Set(); } } @@ -839,7 +858,7 @@ public static void ShouldNotRun([ServiceBusTrigger(FirstQueueNameKey)] ServiceBu // 20 seconds should give enough time for the receive call to complete as the TryTimeout being used is 10 seconds. public static void Run([TimerTrigger("*/20 * * * * *")] TimerInfo timer) { - _eventWait.Set(); + _waitHandle1.Set(); } } @@ -853,7 +872,7 @@ public static void BindToPoco( Assert.AreEqual(input.Name, name); Assert.AreEqual(input.Value, value); logger.LogInformation($"PocoValues({name},{value})"); - _eventWait.Set(); + _waitHandle1.Set(); } [NoAutomaticTrigger] @@ -863,13 +882,13 @@ public static void BindToString( ILogger logger) { logger.LogInformation($"Input({input})"); - _eventWait.Set(); + _waitHandle1.Set(); } } - public class SBQueue2SBQueueAutoCompleteMessagesDisabled + public class TestBatchAutoCompleteMessagesDisabledOnTrigger { - public static async void SBQueue2SBQueue_AutoCompleteMessagesDisabled( + public static async Task RunAsync( [ServiceBusTrigger(FirstQueueNameKey, AutoCompleteMessages = false)] ServiceBusReceivedMessage[] array, ServiceBusMessageActions messageActions) @@ -883,9 +902,25 @@ public static async void SBQueue2SBQueue_AutoCompleteMessagesDisabled( } } - public class TestBatch_AutoCompleteMessagesEnabledOnTrigger + public class TestBatchAutoCompleteMessagesEnabledOnTrigger_CompleteInFunction { - public static void QueueBatchAutoCompleteEnabledOnTrigger( + public static async Task RunAsync( + [ServiceBusTrigger(FirstQueueNameKey, AutoCompleteMessages = true)] + ServiceBusReceivedMessage[] array, + ServiceBusMessageActions messageActions) + { + string[] messages = array.Select(x => x.Body.ToString()).ToArray(); + foreach (var msg in array) + { + await messageActions.CompleteMessageAsync(msg); + } + ServiceBusMultipleTestJobsBase.ProcessMessages(messages); + } + } + + public class TestBatchAutoCompleteMessagesEnabledOnTrigger + { + public static void Run( [ServiceBusTrigger(FirstQueueNameKey, AutoCompleteMessages = true)] ServiceBusReceivedMessage[] array) { @@ -895,9 +930,22 @@ public static void QueueBatchAutoCompleteEnabledOnTrigger( } } + public class TestSingleAutoCompleteMessagesEnabledOnTrigger_CompleteInFunction + { + public static async Task RunAsync( + [ServiceBusTrigger(FirstQueueNameKey, AutoCompleteMessages = true)] + ServiceBusReceivedMessage message, + ServiceBusMessageActions messageActions) + { + // we want to validate that this doesn't trigger an exception in the SDK since AutoComplete = true + await messageActions.CompleteMessageAsync(message); + _waitHandle1.Set(); + } + } + public class DrainModeTestJobQueue { - public static async Task QueueNoSessions( + public static async Task RunAsync( [ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage msg, ServiceBusMessageActions messageActions, CancellationToken cancellationToken, @@ -905,7 +953,7 @@ public static async Task QueueNoSessions( { logger.LogInformation($"DrainModeValidationFunctions.QueueNoSessions: message data {msg.Body}"); _drainValidationPreDelay.Set(); - await DrainModeHelper.WaitForCancellation(cancellationToken); + await DrainModeHelper.WaitForCancellationAsync(cancellationToken); Assert.True(cancellationToken.IsCancellationRequested); await messageActions.CompleteMessageAsync(msg); _drainValidationPostDelay.Set(); @@ -914,7 +962,7 @@ public static async Task QueueNoSessions( public class DrainModeTestJobTopic { - public static async Task TopicNoSessions( + public static async Task RunAsync( [ServiceBusTrigger(TopicNameKey, FirstSubscriptionNameKey)] ServiceBusReceivedMessage msg, ServiceBusMessageActions messageActions, @@ -923,7 +971,7 @@ public static async Task TopicNoSessions( { logger.LogInformation($"DrainModeValidationFunctions.NoSessions: message data {msg.Body}"); _drainValidationPreDelay.Set(); - await DrainModeHelper.WaitForCancellation(cancellationToken); + await DrainModeHelper.WaitForCancellationAsync(cancellationToken); Assert.True(cancellationToken.IsCancellationRequested); await messageActions.CompleteMessageAsync(msg); _drainValidationPostDelay.Set(); @@ -932,7 +980,7 @@ public static async Task TopicNoSessions( public class DrainModeTestJobQueueBatch { - public static async Task QueueNoSessionsBatch( + public static async Task RunAsync( [ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage[] array, ServiceBusMessageActions messageActions, @@ -942,7 +990,7 @@ public static async Task QueueNoSessionsBatch( Assert.True(array.Length > 0); logger.LogInformation($"DrainModeTestJobBatch.QueueNoSessionsBatch: received {array.Length} messages"); _drainValidationPreDelay.Set(); - await DrainModeHelper.WaitForCancellation(cancellationToken); + await DrainModeHelper.WaitForCancellationAsync(cancellationToken); Assert.True(cancellationToken.IsCancellationRequested); foreach (ServiceBusReceivedMessage msg in array) { @@ -954,7 +1002,7 @@ public static async Task QueueNoSessionsBatch( public class DrainModeTestJobTopicBatch { - public static async Task TopicNoSessionsBatch( + public static async Task RunAsync( [ServiceBusTrigger(TopicNameKey, FirstSubscriptionNameKey)] ServiceBusReceivedMessage[] array, ServiceBusMessageActions messageActions, CancellationToken cancellationToken, @@ -963,7 +1011,7 @@ public static async Task TopicNoSessionsBatch( Assert.True(array.Length > 0); logger.LogInformation($"DrainModeTestJobBatch.TopicNoSessionsBatch: received {array.Length} messages"); _drainValidationPreDelay.Set(); - await DrainModeHelper.WaitForCancellation(cancellationToken); + await DrainModeHelper.WaitForCancellationAsync(cancellationToken); Assert.True(cancellationToken.IsCancellationRequested); foreach (ServiceBusReceivedMessage msg in array) { @@ -990,7 +1038,7 @@ public static void SBQueueFunction2( public class DrainModeHelper { - public static async Task WaitForCancellation(CancellationToken cancellationToken) + public static async Task WaitForCancellationAsync(CancellationToken cancellationToken) { // Wait until the drain operation begins, signalled by the cancellation token int elapsedTimeMills = 0; diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusSessionsEndToEndTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusSessionsEndToEndTests.cs index 04874fb64327..ce48fcbf0463 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusSessionsEndToEndTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusSessionsEndToEndTests.cs @@ -372,7 +372,7 @@ private IHost BuildSessionHost(bool addCustomProvider = false, bool autoCompl builder.ConfigureWebJobs(b => b.AddServiceBus(sbOptions => { - // Will be disabled for drain mode validation as messages are completed by functoin code to validate draining allows completion + // Will be disabled for drain mode validation as messages are completed by function code to validate draining allows completion sbOptions.AutoCompleteMessages = autoComplete; sbOptions.MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(MaxAutoRenewDurationMin); sbOptions.MaxConcurrentSessions = 1; @@ -464,7 +464,8 @@ public class ServiceBusSessionsTestJobs1 { public static void SBQueue1Trigger( [ServiceBusTrigger(FirstQueueNameKey, IsSessionsEnabled = true)] - ServiceBusReceivedMessage message, int deliveryCount, + ServiceBusReceivedMessage message, + int deliveryCount, ServiceBusSessionMessageActions messageSession, ILogger log, string lockToken) @@ -477,7 +478,8 @@ public static void SBQueue1Trigger( public static void SBSub1Trigger( [ServiceBusTrigger(TopicNameKey, FirstSubscriptionNameKey, IsSessionsEnabled = true)] - ServiceBusReceivedMessage message, int deliveryCount, + ServiceBusReceivedMessage message, + int deliveryCount, ServiceBusSessionMessageActions messageSession, ILogger log, string lockToken) @@ -773,7 +775,10 @@ public override async Task CompleteProcessingMessageAsync( internal class ServiceBusSessionsTestHelper #pragma warning restore SA1402 // File may only contain a single type { - public static void ProcessMessage(ServiceBusReceivedMessage message, ILogger log, EventWaitHandle waitHandle1, + public static void ProcessMessage( + ServiceBusReceivedMessage message, + ILogger log, + EventWaitHandle waitHandle1, EventWaitHandle waitHandle2) { string messageString = message.Body.ToString(); @@ -792,17 +797,13 @@ public static void ProcessMessage(ServiceBusReceivedMessage message, ILogger log public static string GetLogsAsString(List messages) { - if (messages.Count() != 5 && messages.Count() != 10) - { - } - - string reuslt = string.Empty; + string result = string.Empty; foreach (LogMessage message in messages) { - reuslt += message.FormattedMessage + System.Environment.NewLine; + result += message.FormattedMessage + System.Environment.NewLine; } - return reuslt; + return result; } } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs index 4dc19e15dce0..683d9c88f468 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs @@ -11,6 +11,7 @@ using System.Xml; using Azure.Core.TestFramework; using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; using Azure.Messaging.ServiceBus.Tests; using Microsoft.Azure.WebJobs.Host.TestCommon; using Microsoft.Extensions.Configuration; @@ -58,7 +59,7 @@ public class WebJobsServiceBusTestBase internal const int MaxAutoRenewDurationMin = 5; internal static TimeSpan HostShutdownTimeout = TimeSpan.FromSeconds(120); - protected static QueueScope _firstQueueScope; + internal static QueueScope _firstQueueScope; protected static QueueScope _secondaryNamespaceQueueScope; private QueueScope _secondQueueScope; private QueueScope _thirdQueueScope; @@ -78,16 +79,16 @@ protected WebJobsServiceBusTestBase(bool isSession) } /// - /// Performs the tasks needed to initialize the test fixture. This - /// method runs once for the entire fixture, prior to running any tests. + /// Performs the tasks needed to initialize the test. This + /// method runs once for for each test. /// /// [SetUp] public async Task FixtureSetUp() { - _firstQueueScope = await CreateWithQueue(enablePartitioning: false, enableSession: _isSession); - _secondQueueScope = await CreateWithQueue(enablePartitioning: false, enableSession: _isSession); - _thirdQueueScope = await CreateWithQueue(enablePartitioning: false, enableSession: _isSession); + _firstQueueScope = await CreateWithQueue(enablePartitioning: false, enableSession: _isSession, lockDuration: TimeSpan.FromSeconds(15)); + _secondQueueScope = await CreateWithQueue(enablePartitioning: false, enableSession: _isSession, lockDuration: TimeSpan.FromSeconds(15)); + _thirdQueueScope = await CreateWithQueue(enablePartitioning: false, enableSession: _isSession, lockDuration: TimeSpan.FromSeconds(15)); _topicScope = await CreateWithTopic( enablePartitioning: false, enableSession: _isSession, @@ -95,7 +96,8 @@ public async Task FixtureSetUp() _secondaryNamespaceQueueScope = await CreateWithQueue( enablePartitioning: false, enableSession: _isSession, - overrideNamespace: ServiceBusTestEnvironment.Instance.ServiceBusSecondaryNamespace); + overrideNamespace: ServiceBusTestEnvironment.Instance.ServiceBusSecondaryNamespace, + lockDuration: TimeSpan.FromSeconds(15)); _topicSubscriptionCalled1 = new ManualResetEvent(initialState: false); _topicSubscriptionCalled2 = new ManualResetEvent(initialState: false); _waitHandle1 = new ManualResetEvent(initialState: false); @@ -105,8 +107,8 @@ public async Task FixtureSetUp() } /// - /// Performs the tasks needed to cleanup the test fixture after all - /// tests have run. This method runs once for the entire fixture. + /// Performs the tasks needed to cleanup the test after each + /// test has run. /// /// [TearDown] @@ -146,20 +148,25 @@ protected IHost BuildHost( { settings.Add("AzureWebJobsServiceBus", ServiceBusTestEnvironment.Instance.ServiceBusConnectionString); } + var hostBuilder = new HostBuilder() - .ConfigureAppConfiguration(builder => - { - builder.AddInMemoryCollection(settings); - }) - .ConfigureDefaultTestHost(b => - { - b.AddServiceBus(options => options.ClientRetryOptions.TryTimeout = TimeSpan.FromSeconds(10)); - }) - .ConfigureServices(s => - { - s.Configure(opts => opts.ShutdownTimeout = HostShutdownTimeout); - s.AddHostedService(); - }); + .ConfigureServices(s => + { + s.Configure(opts => opts.ShutdownTimeout = HostShutdownTimeout); + // Configure ServiceBusEndToEndTestService before WebJobs stuff so that the ServiceBusEndToEndTestService.StopAsync will be called after + // the WebJobsHost.StopAsync (service that is started first will be stopped last by the IHost). + // This will allow the logs captured in StopAsync to include everything from WebJobs. + s.AddHostedService(); + }) + .ConfigureAppConfiguration(builder => + { + builder.AddInMemoryCollection(settings); + }) + .ConfigureDefaultTestHost(b => + { + b.AddServiceBus(options => options.ClientRetryOptions.TryTimeout = TimeSpan.FromSeconds(10)); + }); + // do this after the defaults so test-specific values will override the defaults configurationDelegate?.Invoke(hostBuilder); IHost host = hostBuilder.Build(); if (startHost) @@ -235,12 +242,22 @@ public Task StartAsync(CancellationToken cancellationToken) return Task.CompletedTask; } - public Task StopAsync(CancellationToken cancellationToken) + public async Task StopAsync(CancellationToken cancellationToken) { var logs = _host.GetTestLoggerProvider().GetAllLogMessages(); - var errors = logs.Where(p => p.Level == LogLevel.Error); + var errors = logs.Where( + p => p.Level == LogLevel.Error && + // Ignore this error that the SDK logs when cancelling batch receive + !p.FormattedMessage.Contains("ReceiveBatchAsync Exception: System.Threading.Tasks.TaskCanceledException")); Assert.IsEmpty(errors, string.Join(",", errors.Select(e => e.FormattedMessage))); - return Task.CompletedTask; + + var client = new ServiceBusAdministrationClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString); + + // wait for a few seconds to allow updated counts to propagate + await Task.Delay(TimeSpan.FromSeconds(2)); + + QueueRuntimeProperties properties = await client.GetQueueRuntimePropertiesAsync(WebJobsServiceBusTestBase._firstQueueScope.QueueName, CancellationToken.None); + Assert.AreEqual(0, properties.TotalMessageCount); } } } \ No newline at end of file