Skip to content

Commit

Permalink
Add message batch factory (#18146)
Browse files Browse the repository at this point in the history
* Add message batch factory

* Export API
  • Loading branch information
JoshLove-msft authored Jan 23, 2021
1 parent 0f409d2 commit c0a421d
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,13 @@ internal ServiceBusMessageBatch() { }
public void Dispose() { }
public bool TryAddMessage(Azure.Messaging.ServiceBus.ServiceBusMessage message) { throw null; }
}
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public static partial class ServiceBusModelFactory
{
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public static Azure.Messaging.ServiceBus.Administration.QueueProperties QueueProperties(string name, System.TimeSpan lockDuration = default(System.TimeSpan), long maxSizeInMegabytes = (long)0, bool requiresDuplicateDetection = false, bool requiresSession = false, System.TimeSpan defaultMessageTimeToLive = default(System.TimeSpan), System.TimeSpan autoDeleteOnIdle = default(System.TimeSpan), bool deadLetteringOnMessageExpiration = false, System.TimeSpan duplicateDetectionHistoryTimeWindow = default(System.TimeSpan), int maxDeliveryCount = 0, bool enableBatchedOperations = false, Azure.Messaging.ServiceBus.Administration.EntityStatus status = default(Azure.Messaging.ServiceBus.Administration.EntityStatus), string forwardTo = null, string forwardDeadLetteredMessagesTo = null, string userMetadata = null, bool enablePartitioning = false) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public static Azure.Messaging.ServiceBus.Administration.RuleProperties RuleProperties(string name, Azure.Messaging.ServiceBus.Administration.RuleFilter filter = null, Azure.Messaging.ServiceBus.Administration.RuleAction action = null) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public static Azure.Messaging.ServiceBus.ServiceBusMessageBatch ServiceBusMessageBatch(long batchSizeBytes, System.Collections.Generic.IList<Azure.Messaging.ServiceBus.ServiceBusMessage> batchMessageStore, Azure.Messaging.ServiceBus.CreateMessageBatchOptions batchOptions = null, System.Func<Azure.Messaging.ServiceBus.ServiceBusMessage, bool> tryAddCallback = null) { throw null; }
public static Azure.Messaging.ServiceBus.ServiceBusReceivedMessage ServiceBusReceivedMessage(System.BinaryData body = null, string messageId = null, string partitionKey = null, string viaPartitionKey = null, string sessionId = null, string replyToSessionId = null, System.TimeSpan timeToLive = default(System.TimeSpan), string correlationId = null, string subject = null, string to = null, string contentType = null, string replyTo = null, System.DateTimeOffset scheduledEnqueueTime = default(System.DateTimeOffset), System.Collections.Generic.IDictionary<string, object> properties = null, System.Guid lockTokenGuid = default(System.Guid), int deliveryCount = 0, System.DateTimeOffset lockedUntil = default(System.DateTimeOffset), long sequenceNumber = (long)-1, string deadLetterSource = null, long enqueuedSequenceNumber = (long)0, System.DateTimeOffset enqueuedTime = default(System.DateTimeOffset)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public static Azure.Messaging.ServiceBus.Administration.SubscriptionProperties SubscriptionProperties(string topicName, string subscriptionName, System.TimeSpan lockDuration = default(System.TimeSpan), bool requiresSession = false, System.TimeSpan defaultMessageTimeToLive = default(System.TimeSpan), System.TimeSpan autoDeleteOnIdle = default(System.TimeSpan), bool deadLetteringOnMessageExpiration = false, int maxDeliveryCount = 0, bool enableBatchedOperations = false, Azure.Messaging.ServiceBus.Administration.EntityStatus status = default(Azure.Messaging.ServiceBus.Administration.EntityStatus), string forwardTo = null, string forwardDeadLetteredMessagesTo = null, string userMetadata = null) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public static Azure.Messaging.ServiceBus.Administration.TopicProperties TopicProperties(string name, long maxSizeInMegabytes = (long)0, bool requiresDuplicateDetection = false, System.TimeSpan defaultMessageTimeToLive = default(System.TimeSpan), System.TimeSpan autoDeleteOnIdle = default(System.TimeSpan), System.TimeSpan duplicateDetectionHistoryTimeWindow = default(System.TimeSpan), bool enableBatchedOperations = false, Azure.Messaging.ServiceBus.Administration.EntityStatus status = default(Azure.Messaging.ServiceBus.Administration.EntityStatus), bool enablePartitioning = false) { throw null; }
}
public partial class ServiceBusProcessor : System.IAsyncDisposable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Baselining these until the next GA as they are not actual compat issues.
CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.TopicProperties(System.String, System.Int64, System.Boolean, System.TimeSpan, System.TimeSpan, System.TimeSpan, System.Boolean, Azure.Messaging.ServiceBus.Administration.EntityStatus, System.Boolean)' in the contract but not the implementation.
CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.ServiceBusReceivedMessage(System.BinaryData, System.String, System.String, System.String, System.String, System.String, System.TimeSpan, System.String, System.String, System.String, System.String, System.String, System.DateTimeOffset, System.Collections.Generic.IDictionary<System.String, System.Object>, System.Guid, System.Int32, System.DateTimeOffset, System.Int64, System.String, System.Int64, System.DateTimeOffset)' in the contract but not the implementation.
CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.RuleProperties(System.String, Azure.Messaging.ServiceBus.Administration.RuleFilter, Azure.Messaging.ServiceBus.Administration.RuleAction)' in the contract but not the implementation.
CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.QueueProperties(System.String, System.TimeSpan, System.Int64, System.Boolean, System.Boolean, System.TimeSpan, System.TimeSpan, System.Boolean, System.TimeSpan, System.Int32, System.Boolean, Azure.Messaging.ServiceBus.Administration.EntityStatus, System.String, System.String, System.String, System.Boolean)' in the contract but not the implementation.
CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory' in the contract but not the implementation.
CannotRemoveAttribute : Attribute 'System.ComponentModel.EditorBrowsableAttribute' exists on 'Azure.Messaging.ServiceBus.ServiceBusModelFactory.SubscriptionProperties(System.String, System.String, System.TimeSpan, System.Boolean, System.TimeSpan, System.TimeSpan, System.Boolean, System.Int32, System.Boolean, Azure.Messaging.ServiceBus.Administration.EntityStatus, System.String, System.String, System.String)' in the contract but not the implementation.
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@
using Azure.Core.Amqp;
using Azure.Messaging.ServiceBus.Amqp;
using Azure.Messaging.ServiceBus.Administration;
using Azure.Messaging.ServiceBus.Core;

namespace Azure.Messaging.ServiceBus
{
/// <summary>
/// This class contains methods to create certain ServiceBus models.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public static class ServiceBusModelFactory
{
/// <summary>
/// Creates a new ServiceBusReceivedMessage instance for mocking.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public static ServiceBusReceivedMessage ServiceBusReceivedMessage(
BinaryData body = default,
string messageId = default,
Expand Down Expand Up @@ -108,7 +107,6 @@ public static ServiceBusReceivedMessage ServiceBusReceivedMessage(
/// <summary>
/// Creates a new <see cref="QueueProperties"/> instance for mocking.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public static QueueProperties QueueProperties(
string name,
TimeSpan lockDuration = default,
Expand Down Expand Up @@ -149,7 +147,6 @@ public static QueueProperties QueueProperties(
/// <summary>
/// Creates a new <see cref="TopicProperties"/> instance for mocking.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public static TopicProperties TopicProperties(
string name,
long maxSizeInMegabytes = default,
Expand All @@ -176,7 +173,6 @@ public static TopicProperties TopicProperties(
/// <summary>
/// Creates a new <see cref="SubscriptionProperties"/> instance for mocking.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public static SubscriptionProperties SubscriptionProperties(
string topicName,
string subscriptionName,
Expand Down Expand Up @@ -209,7 +205,6 @@ public static SubscriptionProperties SubscriptionProperties(
/// <summary>
/// Creates a new <see cref="RuleProperties"/> instance for mocking.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public static RuleProperties RuleProperties(
string name,
RuleFilter filter = default,
Expand All @@ -218,5 +213,125 @@ public static RuleProperties RuleProperties(
{
Action = action
};

/// <summary>
/// Initializes a new instance of the <see cref="ServiceBusMessageBatch" /> class.
/// </summary>
///
/// <param name="batchSizeBytes">The size, in bytes, that the batch should report; this is a static value and will not mutate as messages are added.</param>
/// <param name="batchMessageStore">A list to which messages will be added when <see cref="ServiceBusMessageBatch.TryAddMessage" /> calls are successful.</param>
/// <param name="batchOptions">The set of options to consider when creating this batch.</param>
/// <param name="tryAddCallback"> A function that will be invoked when <see cref="ServiceBusMessageBatch.TryAddMessage" /> is called;
/// the return of this callback represents the result of <see cref="ServiceBusMessageBatch.TryAddMessage" />.
/// If not provided, all events will be accepted into the batch.</param>
///
/// <returns>The <see cref="ServiceBusMessageBatch" /> instance that was created.</returns>
///
public static ServiceBusMessageBatch ServiceBusMessageBatch(long batchSizeBytes,
IList<ServiceBusMessage> batchMessageStore,
CreateMessageBatchOptions batchOptions = default,
Func<ServiceBusMessage, bool> tryAddCallback = default)
{
tryAddCallback ??= _ => true;
batchOptions ??= new CreateMessageBatchOptions();
batchOptions.MaxSizeInBytes ??= long.MaxValue;

var transportBatch = new ListTransportBatch(batchOptions.MaxSizeInBytes.Value, batchSizeBytes, batchMessageStore, tryAddCallback);
return new ServiceBusMessageBatch(transportBatch);
}

/// <summary>
/// Allows for the transport event batch created by the factory to be injected for testing purposes.
/// </summary>
///
private sealed class ListTransportBatch : TransportMessageBatch
{
/// <summary>The backing store for storing events in the batch.</summary>
private readonly IList<ServiceBusMessage> _backingStore;

/// <summary>A callback to be invoked when an adding an event via <see cref="TryAddMessage"/></summary>
private readonly Func<ServiceBusMessage, bool> _tryAddCallback;

/// <summary>
/// The maximum size allowed for the batch, in bytes. This includes the events in the batch as
/// well as any overhead for the batch itself when sent to the Event Hubs service.
/// </summary>
///
public override long MaxSizeInBytes { get; }

/// <summary>
/// The size of the batch, in bytes, as it will be sent to the Event Hubs
/// service.
/// </summary>
///
public override long SizeInBytes { get; }

/// <summary>
/// The count of events contained in the batch.
/// </summary>
///
public override int Count => _backingStore.Count;

/// <summary>
/// Initializes a new instance of the <see cref="ListTransportBatch"/> class.
/// </summary>
///
/// <param name="maxSizeInBytes"> The maximum size allowed for the batch, in bytes.</param>
/// <param name="sizeInBytes">The size of the batch, in bytes; this will be treated as a static value for the property.</param>
/// <param name="backingStore">The backing store for holding events in the batch.</param>
/// <param name="tryAddCallback">A callback for deciding if a TryAdd attempt is successful.</param>
///
internal ListTransportBatch(long maxSizeInBytes,
long sizeInBytes,
IList<ServiceBusMessage> backingStore,
Func<ServiceBusMessage, bool> tryAddCallback) =>
(MaxSizeInBytes, SizeInBytes, _backingStore, _tryAddCallback) = (maxSizeInBytes, sizeInBytes, backingStore, tryAddCallback);

/// <summary>
/// Attempts to add an event to the batch, ensuring that the size
/// of the batch does not exceed its maximum.
/// </summary>
///
/// <param name="message">The event to attempt to add to the batch.</param>
///
/// <returns><c>true</c> if the event was added; otherwise, <c>false</c>.</returns>
///
public override bool TryAddMessage(ServiceBusMessage message)
{
if (_tryAddCallback(message))
{
_backingStore.Add(message);
return true;
}

return false;
}

/// <summary>
/// Clears the batch, removing all events and resetting the
/// available size.
/// </summary>
///
public override void Clear() => _backingStore.Clear();

/// <summary>
/// Represents the batch as an enumerable set of transport-specific
/// representations of an event.
/// </summary>
///
/// <typeparam name="T">The transport-specific event representation being requested.</typeparam>
///
/// <returns>The set of events as an enumerable of the requested type.</returns>
///
public override IEnumerable<T> AsEnumerable<T>() => (IEnumerable<T>)_backingStore;

/// <summary>
/// Performs the task needed to clean up resources used by the <see cref="TransportMessageBatch" />.
/// </summary>
///
public override void Dispose()
{
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using NUnit.Framework;

namespace Azure.Messaging.ServiceBus.Tests.Message
{
public class MessageBatchTests
{
/// <summary>
/// Verifies functionality of the <see cref="ServiceBusModelFactory.ServiceBusMessageBatch" />
/// method.
/// </summary>
///
[Test]
public void EventDataBatchRespectsTheTryAddCallback()
{
var eventLimit = 3;
var store = new List<ServiceBusMessage>();
var batch = ServiceBusModelFactory.ServiceBusMessageBatch(5, store, tryAddCallback: _ => store.Count < eventLimit);

while (store.Count < eventLimit)
{
Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(new BinaryData("Test"))), Is.True, $"The batch contains { store.Count } events; adding another should be permitted.");
}

Assert.That(store.Count, Is.EqualTo(eventLimit), "The batch should be at its limit.");
Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(new BinaryData("Too many"))), Is.False, "The batch is full; it should not be possible to add a new event.");
Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(new BinaryData("Too many"))), Is.False, "The batch is full; a second attempt to add a new event should not succeed.");

Assert.That(store.Count, Is.EqualTo(eventLimit), "The batch should be at its limit after the failed TryAdd attempts.");
Assert.That(batch.AsEnumerable<ServiceBusMessage>(), Is.EquivalentTo(store), "The batch enumerable should reflect the events in the backing store.");
}

/// <summary>
/// Verifies functionality of the <see cref="EventHubsModelFactory.EventDataBatch" />
/// method.
/// </summary>
///
[Test]
public void EventDataBatchIsSafeToDispose()
{
var size = 1024;
var store = new List<ServiceBusMessage> { new ServiceBusMessage(Array.Empty<byte>()), new ServiceBusMessage(Array.Empty<byte>()) };
var options = new CreateMessageBatchOptions { MaxSizeInBytes = 2048 };
var batch = ServiceBusModelFactory.ServiceBusMessageBatch(size, store, options, _ => false);

Assert.That(() => batch.Dispose(), Throws.Nothing);
}
}
}

0 comments on commit c0a421d

Please sign in to comment.