From c0a421dc5769d5c5fbc9505cfb1a7e5f1bc7db70 Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Fri, 22 Jan 2021 18:04:29 -0800 Subject: [PATCH] Add message batch factory (#18146) * Add message batch factory * Export API --- ...ure.Messaging.ServiceBus.netstandard2.0.cs | 7 +- .../src/ApiCompatBaseline.txt | 7 + .../src/Primitives/ServiceBusModelFactory.cs | 127 +++++++++++++++++- .../tests/Message/MessageBatchTests.cs | 53 ++++++++ 4 files changed, 182 insertions(+), 12 deletions(-) create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/src/ApiCompatBaseline.txt create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageBatchTests.cs diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs index 152ea50e8cc1..77b9d745f208 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs @@ -180,18 +180,13 @@ internal ServiceBusMessageBatch() { } public void Dispose() { } public bool TryAddMessage(Azure.Messaging.ServiceBus.ServiceBusMessage message) { throw null; } } - [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public static partial class ServiceBusModelFactory { - [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public static Azure.Messaging.ServiceBus.Administration.QueueProperties QueueProperties(string name, System.TimeSpan lockDuration = default(System.TimeSpan), long maxSizeInMegabytes = (long)0, bool requiresDuplicateDetection = false, bool requiresSession = false, System.TimeSpan defaultMessageTimeToLive = default(System.TimeSpan), System.TimeSpan autoDeleteOnIdle = default(System.TimeSpan), bool deadLetteringOnMessageExpiration = false, System.TimeSpan duplicateDetectionHistoryTimeWindow = default(System.TimeSpan), int maxDeliveryCount = 0, bool enableBatchedOperations = false, Azure.Messaging.ServiceBus.Administration.EntityStatus status = default(Azure.Messaging.ServiceBus.Administration.EntityStatus), string forwardTo = null, string forwardDeadLetteredMessagesTo = null, string userMetadata = null, bool enablePartitioning = false) { throw null; } - [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public static Azure.Messaging.ServiceBus.Administration.RuleProperties RuleProperties(string name, Azure.Messaging.ServiceBus.Administration.RuleFilter filter = null, Azure.Messaging.ServiceBus.Administration.RuleAction action = null) { throw null; } - [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] + public static Azure.Messaging.ServiceBus.ServiceBusMessageBatch ServiceBusMessageBatch(long batchSizeBytes, System.Collections.Generic.IList batchMessageStore, Azure.Messaging.ServiceBus.CreateMessageBatchOptions batchOptions = null, System.Func tryAddCallback = null) { throw null; } public static Azure.Messaging.ServiceBus.ServiceBusReceivedMessage ServiceBusReceivedMessage(System.BinaryData body = null, string messageId = null, string partitionKey = null, string viaPartitionKey = null, string sessionId = null, string replyToSessionId = null, System.TimeSpan timeToLive = default(System.TimeSpan), string correlationId = null, string subject = null, string to = null, string contentType = null, string replyTo = null, System.DateTimeOffset scheduledEnqueueTime = default(System.DateTimeOffset), System.Collections.Generic.IDictionary properties = null, System.Guid lockTokenGuid = default(System.Guid), int deliveryCount = 0, System.DateTimeOffset lockedUntil = default(System.DateTimeOffset), long sequenceNumber = (long)-1, string deadLetterSource = null, long enqueuedSequenceNumber = (long)0, System.DateTimeOffset enqueuedTime = default(System.DateTimeOffset)) { throw null; } - [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public static Azure.Messaging.ServiceBus.Administration.SubscriptionProperties SubscriptionProperties(string topicName, string subscriptionName, System.TimeSpan lockDuration = default(System.TimeSpan), bool requiresSession = false, System.TimeSpan defaultMessageTimeToLive = default(System.TimeSpan), System.TimeSpan autoDeleteOnIdle = default(System.TimeSpan), bool deadLetteringOnMessageExpiration = false, int maxDeliveryCount = 0, bool enableBatchedOperations = false, Azure.Messaging.ServiceBus.Administration.EntityStatus status = default(Azure.Messaging.ServiceBus.Administration.EntityStatus), string forwardTo = null, string forwardDeadLetteredMessagesTo = null, string userMetadata = null) { throw null; } - [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public static Azure.Messaging.ServiceBus.Administration.TopicProperties TopicProperties(string name, long maxSizeInMegabytes = (long)0, bool requiresDuplicateDetection = false, System.TimeSpan defaultMessageTimeToLive = default(System.TimeSpan), System.TimeSpan autoDeleteOnIdle = default(System.TimeSpan), System.TimeSpan duplicateDetectionHistoryTimeWindow = default(System.TimeSpan), bool enableBatchedOperations = false, Azure.Messaging.ServiceBus.Administration.EntityStatus status = default(Azure.Messaging.ServiceBus.Administration.EntityStatus), bool enablePartitioning = false) { throw null; } } public partial class ServiceBusProcessor : System.IAsyncDisposable diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/ApiCompatBaseline.txt b/sdk/servicebus/Azure.Messaging.ServiceBus/src/ApiCompatBaseline.txt new file mode 100644 index 000000000000..4e3c876902d2 --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/ApiCompatBaseline.txt @@ -0,0 +1,7 @@ +# Baselining these until the next GA as they are not actual compat issues. +CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.TopicProperties(System.String, System.Int64, System.Boolean, System.TimeSpan, System.TimeSpan, System.TimeSpan, System.Boolean, Azure.Messaging.ServiceBus.Administration.EntityStatus, System.Boolean)' in the contract but not the implementation. +CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.ServiceBusReceivedMessage(System.BinaryData, System.String, System.String, System.String, System.String, System.String, System.TimeSpan, System.String, System.String, System.String, System.String, System.String, System.DateTimeOffset, System.Collections.Generic.IDictionary, System.Guid, System.Int32, System.DateTimeOffset, System.Int64, System.String, System.Int64, System.DateTimeOffset)' in the contract but not the implementation. +CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.RuleProperties(System.String, Azure.Messaging.ServiceBus.Administration.RuleFilter, Azure.Messaging.ServiceBus.Administration.RuleAction)' in the contract but not the implementation. +CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.QueueProperties(System.String, System.TimeSpan, System.Int64, System.Boolean, System.Boolean, System.TimeSpan, System.TimeSpan, System.Boolean, System.TimeSpan, System.Int32, System.Boolean, Azure.Messaging.ServiceBus.Administration.EntityStatus, System.String, System.String, System.String, System.Boolean)' in the contract but not the implementation. +CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory' in the contract but not the implementation. +CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.SubscriptionProperties(System.String, System.String, System.TimeSpan, System.Boolean, System.TimeSpan, System.TimeSpan, System.Boolean, System.Int32, System.Boolean, Azure.Messaging.ServiceBus.Administration.EntityStatus, System.String, System.String, System.String)' in the contract but not the implementation. diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs index f78b649c6391..3c1376623925 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs @@ -7,19 +7,18 @@ using Azure.Core.Amqp; using Azure.Messaging.ServiceBus.Amqp; using Azure.Messaging.ServiceBus.Administration; +using Azure.Messaging.ServiceBus.Core; namespace Azure.Messaging.ServiceBus { /// /// This class contains methods to create certain ServiceBus models. /// - [EditorBrowsable(EditorBrowsableState.Never)] public static class ServiceBusModelFactory { /// /// Creates a new ServiceBusReceivedMessage instance for mocking. /// - [EditorBrowsable(EditorBrowsableState.Never)] public static ServiceBusReceivedMessage ServiceBusReceivedMessage( BinaryData body = default, string messageId = default, @@ -108,7 +107,6 @@ public static ServiceBusReceivedMessage ServiceBusReceivedMessage( /// /// Creates a new instance for mocking. /// - [EditorBrowsable(EditorBrowsableState.Never)] public static QueueProperties QueueProperties( string name, TimeSpan lockDuration = default, @@ -149,7 +147,6 @@ public static QueueProperties QueueProperties( /// /// Creates a new instance for mocking. /// - [EditorBrowsable(EditorBrowsableState.Never)] public static TopicProperties TopicProperties( string name, long maxSizeInMegabytes = default, @@ -176,7 +173,6 @@ public static TopicProperties TopicProperties( /// /// Creates a new instance for mocking. /// - [EditorBrowsable(EditorBrowsableState.Never)] public static SubscriptionProperties SubscriptionProperties( string topicName, string subscriptionName, @@ -209,7 +205,6 @@ public static SubscriptionProperties SubscriptionProperties( /// /// Creates a new instance for mocking. /// - [EditorBrowsable(EditorBrowsableState.Never)] public static RuleProperties RuleProperties( string name, RuleFilter filter = default, @@ -218,5 +213,125 @@ public static RuleProperties RuleProperties( { Action = action }; + + /// + /// Initializes a new instance of the class. + /// + /// + /// The size, in bytes, that the batch should report; this is a static value and will not mutate as messages are added. + /// A list to which messages will be added when calls are successful. + /// The set of options to consider when creating this batch. + /// A function that will be invoked when is called; + /// the return of this callback represents the result of . + /// If not provided, all events will be accepted into the batch. + /// + /// The instance that was created. + /// + public static ServiceBusMessageBatch ServiceBusMessageBatch(long batchSizeBytes, + IList batchMessageStore, + CreateMessageBatchOptions batchOptions = default, + Func tryAddCallback = default) + { + tryAddCallback ??= _ => true; + batchOptions ??= new CreateMessageBatchOptions(); + batchOptions.MaxSizeInBytes ??= long.MaxValue; + + var transportBatch = new ListTransportBatch(batchOptions.MaxSizeInBytes.Value, batchSizeBytes, batchMessageStore, tryAddCallback); + return new ServiceBusMessageBatch(transportBatch); + } + + /// + /// Allows for the transport event batch created by the factory to be injected for testing purposes. + /// + /// + private sealed class ListTransportBatch : TransportMessageBatch + { + /// The backing store for storing events in the batch. + private readonly IList _backingStore; + + /// A callback to be invoked when an adding an event via + private readonly Func _tryAddCallback; + + /// + /// The maximum size allowed for the batch, in bytes. This includes the events in the batch as + /// well as any overhead for the batch itself when sent to the Event Hubs service. + /// + /// + public override long MaxSizeInBytes { get; } + + /// + /// The size of the batch, in bytes, as it will be sent to the Event Hubs + /// service. + /// + /// + public override long SizeInBytes { get; } + + /// + /// The count of events contained in the batch. + /// + /// + public override int Count => _backingStore.Count; + + /// + /// Initializes a new instance of the class. + /// + /// + /// The maximum size allowed for the batch, in bytes. + /// The size of the batch, in bytes; this will be treated as a static value for the property. + /// The backing store for holding events in the batch. + /// A callback for deciding if a TryAdd attempt is successful. + /// + internal ListTransportBatch(long maxSizeInBytes, + long sizeInBytes, + IList backingStore, + Func tryAddCallback) => + (MaxSizeInBytes, SizeInBytes, _backingStore, _tryAddCallback) = (maxSizeInBytes, sizeInBytes, backingStore, tryAddCallback); + + /// + /// Attempts to add an event to the batch, ensuring that the size + /// of the batch does not exceed its maximum. + /// + /// + /// The event to attempt to add to the batch. + /// + /// true if the event was added; otherwise, false. + /// + public override bool TryAddMessage(ServiceBusMessage message) + { + if (_tryAddCallback(message)) + { + _backingStore.Add(message); + return true; + } + + return false; + } + + /// + /// Clears the batch, removing all events and resetting the + /// available size. + /// + /// + public override void Clear() => _backingStore.Clear(); + + /// + /// Represents the batch as an enumerable set of transport-specific + /// representations of an event. + /// + /// + /// The transport-specific event representation being requested. + /// + /// The set of events as an enumerable of the requested type. + /// + public override IEnumerable AsEnumerable() => (IEnumerable)_backingStore; + + /// + /// Performs the task needed to clean up resources used by the . + /// + /// + public override void Dispose() + { + } + } } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageBatchTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageBatchTests.cs new file mode 100644 index 000000000000..44e6f4d50d21 --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageBatchTests.cs @@ -0,0 +1,53 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using NUnit.Framework; + +namespace Azure.Messaging.ServiceBus.Tests.Message +{ + public class MessageBatchTests + { + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public void EventDataBatchRespectsTheTryAddCallback() + { + var eventLimit = 3; + var store = new List(); + var batch = ServiceBusModelFactory.ServiceBusMessageBatch(5, store, tryAddCallback: _ => store.Count < eventLimit); + + while (store.Count < eventLimit) + { + Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(new BinaryData("Test"))), Is.True, $"The batch contains { store.Count } events; adding another should be permitted."); + } + + Assert.That(store.Count, Is.EqualTo(eventLimit), "The batch should be at its limit."); + Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(new BinaryData("Too many"))), Is.False, "The batch is full; it should not be possible to add a new event."); + Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(new BinaryData("Too many"))), Is.False, "The batch is full; a second attempt to add a new event should not succeed."); + + Assert.That(store.Count, Is.EqualTo(eventLimit), "The batch should be at its limit after the failed TryAdd attempts."); + Assert.That(batch.AsEnumerable(), Is.EquivalentTo(store), "The batch enumerable should reflect the events in the backing store."); + } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public void EventDataBatchIsSafeToDispose() + { + var size = 1024; + var store = new List { new ServiceBusMessage(Array.Empty()), new ServiceBusMessage(Array.Empty()) }; + var options = new CreateMessageBatchOptions { MaxSizeInBytes = 2048 }; + var batch = ServiceBusModelFactory.ServiceBusMessageBatch(size, store, options, _ => false); + + Assert.That(() => batch.Dispose(), Throws.Nothing); + } + } +}