diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/api/Microsoft.Azure.WebJobs.Extensions.ServiceBus.netstandard2.0.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/api/Microsoft.Azure.WebJobs.Extensions.ServiceBus.netstandard2.0.cs index 41df15111a39..62ba01da5a8b 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/api/Microsoft.Azure.WebJobs.Extensions.ServiceBus.netstandard2.0.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/api/Microsoft.Azure.WebJobs.Extensions.ServiceBus.netstandard2.0.cs @@ -51,9 +51,8 @@ public enum EntityType } public partial class MessageProcessor { - public MessageProcessor(Azure.Messaging.ServiceBus.ServiceBusProcessor processor, Azure.Messaging.ServiceBus.ServiceBusReceiver receiver) { } - protected internal Azure.Messaging.ServiceBus.ServiceBusProcessor Processor { get { throw null; } set { } } - protected internal Azure.Messaging.ServiceBus.ServiceBusReceiver Receiver { get { throw null; } set { } } + public MessageProcessor(Azure.Messaging.ServiceBus.ServiceBusProcessor processor) { } + protected internal Azure.Messaging.ServiceBus.ServiceBusProcessor Processor { get { throw null; } } public virtual System.Threading.Tasks.Task BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; } public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; } } @@ -108,9 +107,8 @@ public void Configure(Microsoft.Azure.WebJobs.IWebJobsBuilder builder) { } } public partial class SessionMessageProcessor { - public SessionMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, Azure.Messaging.ServiceBus.ServiceBusSessionProcessor processor) { } - protected internal Azure.Messaging.ServiceBus.ServiceBusClient Client { get { throw null; } set { } } - protected internal Azure.Messaging.ServiceBus.ServiceBusSessionProcessor Processor { get { throw null; } set { } } + public SessionMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusSessionProcessor processor) { } + protected internal Azure.Messaging.ServiceBus.ServiceBusSessionProcessor Processor { get { throw null; } } public virtual System.Threading.Tasks.Task BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusSessionMessageActions sessionActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; } public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusSessionMessageActions sessionActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; } } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessageProcessor.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessageProcessor.cs index 11eb60004332..e31fbc226fe4 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessageProcessor.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessageProcessor.cs @@ -17,23 +17,16 @@ public class MessageProcessor /// /// Initializes a new instance of . /// - /// The to use for single dispatch functions. - /// The to use for multiple dispatch functions. - public MessageProcessor(ServiceBusProcessor processor, ServiceBusReceiver receiver) + /// The to use for processing messages from Service Bus. + public MessageProcessor(ServiceBusProcessor processor) { Processor = processor ?? throw new ArgumentNullException(nameof(processor)); - Receiver = receiver ?? throw new ArgumentNullException(nameof(processor)); } /// /// Gets or sets the that will be used by the . /// - protected internal ServiceBusProcessor Processor { get; set; } - - /// - /// Gets or sets the that will be used by the . - /// - protected internal ServiceBusReceiver Receiver { get; set; } + protected internal ServiceBusProcessor Processor { get; } /// /// This method is called when there is a new message to process, before the job function is invoked. diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs index 1988614e3f0f..0df9247aba36 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs @@ -56,11 +56,14 @@ public virtual MessageProcessor CreateMessageProcessor(ServiceBusClient client, Argument.AssertNotNull(client, nameof(client)); Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath)); - return new MessageProcessor(CreateProcessor(client, entityPath), GetOrAddMessageReceiver(client, entityPath)); + return new MessageProcessor(CreateProcessor(client, entityPath)); } public virtual ServiceBusProcessor CreateProcessor(ServiceBusClient client, string entityPath) { + Argument.AssertNotNull(client, nameof(client)); + Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath)); + // processors cannot be shared across listeners since there is a limit of 1 event handler in the Service Bus SDK. ServiceBusProcessor processor; @@ -81,6 +84,7 @@ public virtual ServiceBusProcessor CreateProcessor(ServiceBusClient client, stri public virtual ServiceBusSender CreateMessageSender(ServiceBusClient client, string entityPath) { + Argument.AssertNotNull(client, nameof(client)); Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath)); return _messageSenderCache.GetOrAdd(entityPath, client.CreateSender(entityPath)); @@ -88,6 +92,7 @@ public virtual ServiceBusSender CreateMessageSender(ServiceBusClient client, str public virtual ServiceBusReceiver CreateBatchMessageReceiver(ServiceBusClient client, string entityPath) { + Argument.AssertNotNull(client, nameof(client)); Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath)); return _messageReceiverCache.GetOrAdd(entityPath, (_) => client.CreateReceiver( @@ -100,13 +105,17 @@ public virtual ServiceBusReceiver CreateBatchMessageReceiver(ServiceBusClient cl public virtual SessionMessageProcessor CreateSessionMessageProcessor(ServiceBusClient client, string entityPath) { + Argument.AssertNotNull(client, nameof(client)); Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath)); - return new SessionMessageProcessor(client, CreateSessionProcessor(client, entityPath)); + return new SessionMessageProcessor(CreateSessionProcessor(client, entityPath)); } public virtual ServiceBusSessionProcessor CreateSessionProcessor(ServiceBusClient client, string entityPath) { + Argument.AssertNotNull(client, nameof(client)); + Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath)); + ServiceBusSessionProcessor processor; if (ServiceBusEntityPathHelper.ParseEntityType(entityPath) == EntityType.Topic) { @@ -122,15 +131,5 @@ public virtual ServiceBusSessionProcessor CreateSessionProcessor(ServiceBusClien processor.ProcessErrorAsync += _options.ExceptionReceivedHandler; return processor; } - - private ServiceBusReceiver GetOrAddMessageReceiver(ServiceBusClient client, string entityPath) - { - return _messageReceiverCache.GetOrAdd(entityPath, (_) => client.CreateReceiver( - entityPath, - new ServiceBusReceiverOptions - { - PrefetchCount = _options.PrefetchCount - })); - } } } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/SessionMessageProcessor.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/SessionMessageProcessor.cs index af0a61e0cc3a..3d54172a1b4e 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/SessionMessageProcessor.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/SessionMessageProcessor.cs @@ -11,22 +11,19 @@ namespace Microsoft.Azure.WebJobs.ServiceBus { public class SessionMessageProcessor { - public SessionMessageProcessor(ServiceBusClient client, ServiceBusSessionProcessor processor) + /// + /// Initializes a new instance of . + /// + /// The to use for processing messages from Service Bus. + public SessionMessageProcessor(ServiceBusSessionProcessor processor) { Processor = processor ?? throw new ArgumentNullException(nameof(processor)); - Client = client ?? throw new ArgumentNullException(nameof(client)); } /// - /// Gets or sets the that will be used by the . - /// - protected internal ServiceBusSessionProcessor Processor { get; set; } - - /// - /// Gets or sets the that will be used by the to - /// accept new sessions for multiple dispatch functions.. + /// Gets the that will be used by the . /// - protected internal ServiceBusClient Client { get; set; } + protected internal ServiceBusSessionProcessor Processor { get; } /// /// This method is called when there is a new message to process, before the job function is invoked. diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusListenerTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusListenerTests.cs index 7bb7b7a0c446..0f73c8a78cc4 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusListenerTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusListenerTests.cs @@ -47,7 +47,7 @@ public ServiceBusListenerTests() { ExceptionHandler = ExceptionReceivedHandler }; - _mockMessageProcessor = new Mock(MockBehavior.Strict, processor, receiver); + _mockMessageProcessor = new Mock(MockBehavior.Strict, processor); _mockMessagingProvider = new Mock(new OptionsWrapper(config)); _mockClientFactory = new Mock(configuration, Mock.Of(), _mockMessagingProvider.Object, new AzureEventSourceLogForwarder(new NullLoggerFactory())); diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs index 83702fb4225d..e1f25c2f7836 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs @@ -47,7 +47,7 @@ public void Setup() ServiceBusProcessorOptions processorOptions = new ServiceBusProcessorOptions(); ServiceBusProcessor messageProcessor = _client.CreateProcessor(_entityPath); ServiceBusReceiver receiver = _client.CreateReceiver(_entityPath); - _mockMessageProcessor = new Mock(MockBehavior.Strict, messageProcessor, receiver); + _mockMessageProcessor = new Mock(MockBehavior.Strict, messageProcessor); var configuration = ConfigurationUtilities.CreateConfiguration(new KeyValuePair(_connection, _testConnection)); _serviceBusOptions = new ServiceBusOptions(); diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessageProcessorTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessageProcessorTests.cs index 4d7a0b549a74..e54ecc9b543e 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessageProcessorTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessageProcessorTests.cs @@ -19,7 +19,7 @@ public MessageProcessorTests() var client = new ServiceBusClient("Endpoint = sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123="); var processor = client.CreateProcessor("test-entity"); processor.ProcessErrorAsync += ExceptionReceivedHandler; - _processor = new MessageProcessor(processor, client.CreateReceiver("test-entity")); + _processor = new MessageProcessor(processor); } [Test] diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs index d463359df029..9bc82247a0b6 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs @@ -806,15 +806,15 @@ public override MessageProcessor CreateMessageProcessor(ServiceBusClient client, // TODO decide whether it makes sense to still default error handler when there is a custom provider // currently user needs to set it. processor.ProcessErrorAsync += args => Task.CompletedTask; - return new CustomMessageProcessor(processor, receiver, _logger); + return new CustomMessageProcessor(processor, _logger); } private class CustomMessageProcessor : MessageProcessor { private readonly ILogger _logger; - public CustomMessageProcessor(ServiceBusProcessor processor, ServiceBusReceiver receiver, ILogger logger) - : base(processor, receiver) + public CustomMessageProcessor(ServiceBusProcessor processor, ILogger logger) + : base(processor) { _logger = logger; } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusSessionsEndToEndTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusSessionsEndToEndTests.cs index a75a1e875d6d..9dbc6533b1bd 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusSessionsEndToEndTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusSessionsEndToEndTests.cs @@ -717,7 +717,8 @@ public CustomMessagingProvider( _logger = loggerFactory?.CreateLogger(CustomMessagingCategory); } - public override SessionMessageProcessor CreateSessionMessageProcessor(ServiceBusClient client, + public override SessionMessageProcessor CreateSessionMessageProcessor( + ServiceBusClient client, string entityPath) { ServiceBusSessionProcessor processor; @@ -732,17 +733,17 @@ public override SessionMessageProcessor CreateSessionMessageProcessor(ServiceBus } processor.ProcessErrorAsync += args => Task.CompletedTask; - return new CustomSessionMessageProcessor(client, processor, _logger); + return new CustomSessionMessageProcessor(processor, _logger); } private class CustomSessionMessageProcessor : SessionMessageProcessor { private readonly ILogger _logger; - public CustomSessionMessageProcessor(ServiceBusClient client, + public CustomSessionMessageProcessor( ServiceBusSessionProcessor sessionProcessor, ILogger logger) - : base(client, sessionProcessor) + : base(sessionProcessor) { _logger = logger; } @@ -757,7 +758,8 @@ public override async Task BeginProcessingMessageAsync( public override async Task CompleteProcessingMessageAsync( ServiceBusSessionMessageActions sessionActions, - ServiceBusReceivedMessage message, Executors.FunctionResult result, + ServiceBusReceivedMessage message, + Executors.FunctionResult result, CancellationToken cancellationToken) { _logger?.LogInformation("Custom processor End called!" + message.Body.ToString());