Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove receiver from MessageProcessor constructor #21054

Merged
merged 1 commit into from
May 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ public enum EntityType
}
public partial class MessageProcessor
{
public MessageProcessor(Azure.Messaging.ServiceBus.ServiceBusProcessor processor, Azure.Messaging.ServiceBus.ServiceBusReceiver receiver) { }
protected internal Azure.Messaging.ServiceBus.ServiceBusProcessor Processor { get { throw null; } set { } }
protected internal Azure.Messaging.ServiceBus.ServiceBusReceiver Receiver { get { throw null; } set { } }
public MessageProcessor(Azure.Messaging.ServiceBus.ServiceBusProcessor processor) { }
protected internal Azure.Messaging.ServiceBus.ServiceBusProcessor Processor { get { throw null; } }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also remove setter for now as it can be set in the ctor.

public virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
}
Expand Down Expand Up @@ -108,9 +107,8 @@ public void Configure(Microsoft.Azure.WebJobs.IWebJobsBuilder builder) { }
}
public partial class SessionMessageProcessor
{
public SessionMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, Azure.Messaging.ServiceBus.ServiceBusSessionProcessor processor) { }
protected internal Azure.Messaging.ServiceBus.ServiceBusClient Client { get { throw null; } set { } }
protected internal Azure.Messaging.ServiceBus.ServiceBusSessionProcessor Processor { get { throw null; } set { } }
public SessionMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusSessionProcessor processor) { }
protected internal Azure.Messaging.ServiceBus.ServiceBusSessionProcessor Processor { get { throw null; } }
public virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusSessionMessageActions sessionActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusSessionMessageActions sessionActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,16 @@ public class MessageProcessor
/// <summary>
/// Initializes a new instance of <see cref="MessageProcessor"/>.
/// </summary>
/// <param name="processor">The <see cref="ServiceBusProcessor"/> to use for single dispatch functions.</param>
/// <param name="receiver">The <see cref="ServiceBusReceiver"/> to use for multiple dispatch functions.</param>
public MessageProcessor(ServiceBusProcessor processor, ServiceBusReceiver receiver)
/// <param name="processor">The <see cref="ServiceBusProcessor"/> to use for processing messages from Service Bus.</param>
public MessageProcessor(ServiceBusProcessor processor)
{
Processor = processor ?? throw new ArgumentNullException(nameof(processor));
Receiver = receiver ?? throw new ArgumentNullException(nameof(processor));
}

/// <summary>
/// Gets or sets the <see cref="ServiceBusProcessor"/> that will be used by the <see cref="Processor"/>.
/// </summary>
protected internal ServiceBusProcessor Processor { get; set; }

/// <summary>
/// Gets or sets the <see cref="ServiceBusReceiver"/> that will be used by the <see cref="Processor"/>.
/// </summary>
protected internal ServiceBusReceiver Receiver { get; set; }
protected internal ServiceBusProcessor Processor { get; }

/// <summary>
/// This method is called when there is a new message to process, before the job function is invoked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ public virtual MessageProcessor CreateMessageProcessor(ServiceBusClient client,
Argument.AssertNotNull(client, nameof(client));
Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath));

return new MessageProcessor(CreateProcessor(client, entityPath), GetOrAddMessageReceiver(client, entityPath));
return new MessageProcessor(CreateProcessor(client, entityPath));
}

public virtual ServiceBusProcessor CreateProcessor(ServiceBusClient client, string entityPath)
{
Argument.AssertNotNull(client, nameof(client));
Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath));

// processors cannot be shared across listeners since there is a limit of 1 event handler in the Service Bus SDK.

ServiceBusProcessor processor;
Expand All @@ -81,13 +84,15 @@ public virtual ServiceBusProcessor CreateProcessor(ServiceBusClient client, stri

public virtual ServiceBusSender CreateMessageSender(ServiceBusClient client, string entityPath)
{
Argument.AssertNotNull(client, nameof(client));
Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath));

return _messageSenderCache.GetOrAdd(entityPath, client.CreateSender(entityPath));
}

public virtual ServiceBusReceiver CreateBatchMessageReceiver(ServiceBusClient client, string entityPath)
{
Argument.AssertNotNull(client, nameof(client));
Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath));

return _messageReceiverCache.GetOrAdd(entityPath, (_) => client.CreateReceiver(
Expand All @@ -100,13 +105,17 @@ public virtual ServiceBusReceiver CreateBatchMessageReceiver(ServiceBusClient cl

public virtual SessionMessageProcessor CreateSessionMessageProcessor(ServiceBusClient client, string entityPath)
{
Argument.AssertNotNull(client, nameof(client));
Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath));

return new SessionMessageProcessor(client, CreateSessionProcessor(client, entityPath));
return new SessionMessageProcessor(CreateSessionProcessor(client, entityPath));
}

public virtual ServiceBusSessionProcessor CreateSessionProcessor(ServiceBusClient client, string entityPath)
{
Argument.AssertNotNull(client, nameof(client));
Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath));

ServiceBusSessionProcessor processor;
if (ServiceBusEntityPathHelper.ParseEntityType(entityPath) == EntityType.Topic)
{
Expand All @@ -122,15 +131,5 @@ public virtual ServiceBusSessionProcessor CreateSessionProcessor(ServiceBusClien
processor.ProcessErrorAsync += _options.ExceptionReceivedHandler;
return processor;
}

private ServiceBusReceiver GetOrAddMessageReceiver(ServiceBusClient client, string entityPath)
{
return _messageReceiverCache.GetOrAdd(entityPath, (_) => client.CreateReceiver(
entityPath,
new ServiceBusReceiverOptions
{
PrefetchCount = _options.PrefetchCount
}));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,19 @@ namespace Microsoft.Azure.WebJobs.ServiceBus
{
public class SessionMessageProcessor
{
public SessionMessageProcessor(ServiceBusClient client, ServiceBusSessionProcessor processor)
/// <summary>
/// Initializes a new instance of <see cref="SessionMessageProcessor"/>.
/// </summary>
/// <param name="processor">The <see cref="ServiceBusSessionProcessor"/> to use for processing messages from Service Bus.</param>
public SessionMessageProcessor(ServiceBusSessionProcessor processor)
{
Processor = processor ?? throw new ArgumentNullException(nameof(processor));
Client = client ?? throw new ArgumentNullException(nameof(client));
}

/// <summary>
/// Gets or sets the <see cref="ServiceBusSessionProcessor"/> that will be used by the <see cref="SessionMessageProcessor"/>.
/// </summary>
protected internal ServiceBusSessionProcessor Processor { get; set; }

/// <summary>
/// Gets or sets the <see cref="ServiceBusClient"/> that will be used by the <see cref="SessionMessageProcessor"/> to
/// accept new sessions for multiple dispatch functions..
/// Gets the <see cref="ServiceBusSessionProcessor"/> that will be used by the <see cref="SessionMessageProcessor"/>.
/// </summary>
protected internal ServiceBusClient Client { get; set; }
protected internal ServiceBusSessionProcessor Processor { get; }

/// <summary>
/// This method is called when there is a new message to process, before the job function is invoked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public ServiceBusListenerTests()
{
ExceptionHandler = ExceptionReceivedHandler
};
_mockMessageProcessor = new Mock<MessageProcessor>(MockBehavior.Strict, processor, receiver);
_mockMessageProcessor = new Mock<MessageProcessor>(MockBehavior.Strict, processor);

_mockMessagingProvider = new Mock<MessagingProvider>(new OptionsWrapper<ServiceBusOptions>(config));
_mockClientFactory = new Mock<ServiceBusClientFactory>(configuration, Mock.Of<AzureComponentFactory>(), _mockMessagingProvider.Object, new AzureEventSourceLogForwarder(new NullLoggerFactory()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void Setup()
ServiceBusProcessorOptions processorOptions = new ServiceBusProcessorOptions();
ServiceBusProcessor messageProcessor = _client.CreateProcessor(_entityPath);
ServiceBusReceiver receiver = _client.CreateReceiver(_entityPath);
_mockMessageProcessor = new Mock<MessageProcessor>(MockBehavior.Strict, messageProcessor, receiver);
_mockMessageProcessor = new Mock<MessageProcessor>(MockBehavior.Strict, messageProcessor);
var configuration = ConfigurationUtilities.CreateConfiguration(new KeyValuePair<string, string>(_connection, _testConnection));

_serviceBusOptions = new ServiceBusOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public MessageProcessorTests()
var client = new ServiceBusClient("Endpoint = sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=");
var processor = client.CreateProcessor("test-entity");
processor.ProcessErrorAsync += ExceptionReceivedHandler;
_processor = new MessageProcessor(processor, client.CreateReceiver("test-entity"));
_processor = new MessageProcessor(processor);
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -806,15 +806,15 @@ public override MessageProcessor CreateMessageProcessor(ServiceBusClient client,
// TODO decide whether it makes sense to still default error handler when there is a custom provider
// currently user needs to set it.
processor.ProcessErrorAsync += args => Task.CompletedTask;
return new CustomMessageProcessor(processor, receiver, _logger);
return new CustomMessageProcessor(processor, _logger);
}

private class CustomMessageProcessor : MessageProcessor
{
private readonly ILogger _logger;

public CustomMessageProcessor(ServiceBusProcessor processor, ServiceBusReceiver receiver, ILogger logger)
: base(processor, receiver)
public CustomMessageProcessor(ServiceBusProcessor processor, ILogger logger)
: base(processor)
{
_logger = logger;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,8 @@ public CustomMessagingProvider(
_logger = loggerFactory?.CreateLogger(CustomMessagingCategory);
}

public override SessionMessageProcessor CreateSessionMessageProcessor(ServiceBusClient client,
public override SessionMessageProcessor CreateSessionMessageProcessor(
ServiceBusClient client,
string entityPath)
{
ServiceBusSessionProcessor processor;
Expand All @@ -732,17 +733,17 @@ public override SessionMessageProcessor CreateSessionMessageProcessor(ServiceBus
}

processor.ProcessErrorAsync += args => Task.CompletedTask;
return new CustomSessionMessageProcessor(client, processor, _logger);
return new CustomSessionMessageProcessor(processor, _logger);
}

private class CustomSessionMessageProcessor : SessionMessageProcessor
{
private readonly ILogger _logger;

public CustomSessionMessageProcessor(ServiceBusClient client,
public CustomSessionMessageProcessor(
ServiceBusSessionProcessor sessionProcessor,
ILogger logger)
: base(client, sessionProcessor)
: base(sessionProcessor)
{
_logger = logger;
}
Expand All @@ -757,7 +758,8 @@ public override async Task<bool> BeginProcessingMessageAsync(

public override async Task CompleteProcessingMessageAsync(
ServiceBusSessionMessageActions sessionActions,
ServiceBusReceivedMessage message, Executors.FunctionResult result,
ServiceBusReceivedMessage message,
Executors.FunctionResult result,
CancellationToken cancellationToken)
{
_logger?.LogInformation("Custom processor End called!" + message.Body.ToString());
Expand Down