From 457a2234d4bcbe2fb378a9a707df142e563aafcf Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Fri, 28 Aug 2020 12:41:51 -0700 Subject: [PATCH 1/6] Include more AMQP details in message --- .../src/Amqp/AmqpAnnotatedMessage.cs | 75 +++++++ .../src/Amqp/AmqpDataBody.cs | 30 +++ .../src/Amqp/AmqpMessageBody.cs | 13 ++ .../src/Amqp/AmqpMessageHeader.cs | 54 +++++ .../src/Amqp/AmqpMessageProperties.cs | 105 +++++++++ .../src/Amqp/AmqpMessageConstants.cs | 23 ++ .../src/Amqp/AmqpMessageConverter.cs | 122 +++++------ .../src/Amqp/AmqpMessageExtensions.cs | 72 ++++-- .../src/Diagnostics/DiagnosticExtensions.cs | 4 +- .../Management/Rules/CorrelationRuleFilter.cs | 10 +- .../Rules/CorrelationRuleFilterExtensions.cs | 6 +- .../src/Plugins/ServiceBusPlugin.cs | 34 +-- .../src/Primitives/ServiceBusMessage.cs | 193 +++++++++++------ .../src/Primitives/ServiceBusModelFactory.cs | 51 +++-- .../Primitives/ServiceBusReceivedMessage.cs | 205 ++++++++++++++---- .../src/Sender/ServiceBusSender.cs | 4 +- .../tests/Amqp/AmqpConverterTests.cs | 8 +- .../Diagnostics/DiagnosticScopeLiveTests.cs | 4 +- .../Infrastructure/ServiceBusTestBase.cs | 2 +- .../ServiceBusManagementClientLiveTests.cs | 2 +- .../tests/Message/MessageLiveTests.cs | 84 +++---- .../tests/Message/MessageTests.cs | 22 +- .../tests/Plugins/PluginLiveTests.cs | 10 +- .../tests/Plugins/PluginTests.cs | 4 +- .../Receiver/SessionReceiverLiveTests.cs | 4 +- .../tests/RuleManager/RuleManagerLiveTests.cs | 20 +- 26 files changed, 838 insertions(+), 323 deletions(-) create mode 100644 sdk/core/Azure.Core.Experimental/src/Amqp/AmqpAnnotatedMessage.cs create mode 100644 sdk/core/Azure.Core.Experimental/src/Amqp/AmqpDataBody.cs create mode 100644 sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageBody.cs create mode 100644 sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageHeader.cs create mode 100644 sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageProperties.cs create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConstants.cs diff --git a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpAnnotatedMessage.cs b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpAnnotatedMessage.cs new file mode 100644 index 000000000000..bec2df590f2c --- /dev/null +++ b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpAnnotatedMessage.cs @@ -0,0 +1,75 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Collections.Generic; + +namespace Azure.Core.Amqp +{ + /// + /// Represents an AMQP message. + /// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format + /// + public class AmqpAnnotatedMessage + { + /// + /// Initializes a new instance by copying the passed in message. + /// + /// The message to copy. + public AmqpAnnotatedMessage(AmqpAnnotatedMessage message) + { + Body = message.Body; + ApplicationProperties = new Dictionary(message.ApplicationProperties); + Properties = new AmqpMessageProperties(message.Properties); + MessageAnnotations = new Dictionary(message.MessageAnnotations); + DeliveryAnnotations = new Dictionary(message.DeliveryAnnotations); + Footer = new Dictionary(message.Footer); + Header = new AmqpMessageHeader(message.Header); + } + + /// + /// Creates a new Data body . + /// + /// The data body sections. + public AmqpAnnotatedMessage(IEnumerable dataBody) + { + Body = new AmqpDataBody(dataBody); + } + + /// + /// The header of the AMQP message. + /// + public AmqpMessageHeader Header { get; set; } = new AmqpMessageHeader(); + + /// + /// The footer of the AMQP message. + /// + public IDictionary Footer { get; set; } = new Dictionary(); + + /// + /// The delivery annotations of the AMQP message. + /// + public IDictionary DeliveryAnnotations { get; set; } = new Dictionary(); + + /// + /// The message annotations of the AMQP message. + /// + public IDictionary MessageAnnotations { get; set; } = new Dictionary(); + + /// + /// The properties of the AMQP message. + /// + public AmqpMessageProperties Properties { get; set; } = new AmqpMessageProperties(); + + /// + /// The application properties of the AMQP message. + /// + public IDictionary ApplicationProperties { get; set; } = new Dictionary(); + + /// + /// The body of the AMQP message. + /// + public AmqpMessageBody Body { get; set; } + + + } +} diff --git a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpDataBody.cs b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpDataBody.cs new file mode 100644 index 000000000000..604524160d0f --- /dev/null +++ b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpDataBody.cs @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Collections.Generic; + +namespace Azure.Core.Amqp +{ + /// + /// Represents the data body of an AMQP message. + /// This consists of one or more data sections. + /// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-data + /// + public class AmqpDataBody : AmqpMessageBody + { + /// + /// Initializes a new instance with the + /// passed in data sections. + /// + /// The data sections. + public AmqpDataBody(IEnumerable data) + { + Data = data; + } + + /// + /// The data sections for the AMQP message body. + /// + public IEnumerable Data { get; internal set; } + } +} diff --git a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageBody.cs b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageBody.cs new file mode 100644 index 000000000000..42aec5a7c61a --- /dev/null +++ b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageBody.cs @@ -0,0 +1,13 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +namespace Azure.Core.Amqp +{ + /// + /// Represents an AMQP message body. + /// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format + /// + public abstract class AmqpMessageBody + { + } +} diff --git a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageHeader.cs b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageHeader.cs new file mode 100644 index 000000000000..3db2ca2a2fb3 --- /dev/null +++ b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageHeader.cs @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; + +namespace Azure.Core.Amqp +{ + /// + /// Represents an AMQP message header. + /// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header + /// + public class AmqpMessageHeader + { + /// + /// Initializes a new instance. + /// + public AmqpMessageHeader() { } + + internal AmqpMessageHeader(AmqpMessageHeader header) + { + Durable = header.Durable; + Priority = header.Priority; + TimeToLive = header.TimeToLive; + FirstAcquirer = header.FirstAcquirer; + DeliveryCount = header.DeliveryCount; + } + + /// + /// The durable value from the AMQP message header. + /// + public bool? Durable { get; set; } + + /// + /// The priority value from the AMQP message header. + /// + public byte? Priority { get; set; } + + /// + /// The ttl value from the AMQP message header. + /// + public TimeSpan? TimeToLive { get; set; } + + /// + /// The first-acquirer value from the AMQP message header. + /// + public bool? FirstAcquirer { get; set; } + + /// + /// The delivery-count value from the AMQP message header. + /// + public uint? DeliveryCount { get; set; } + + } +} diff --git a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageProperties.cs b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageProperties.cs new file mode 100644 index 000000000000..32b2cae62637 --- /dev/null +++ b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageProperties.cs @@ -0,0 +1,105 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; + +namespace Azure.Core.Amqp +{ + /// + /// Represents the AMQP message properties. + /// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties + /// + public class AmqpMessageProperties + { + /// + /// Initializes a new instance. + /// + public AmqpMessageProperties() { } + + /// + /// Initializes a new instance by copying the passed in properties. + /// + /// + public AmqpMessageProperties(AmqpMessageProperties properties) + { + MessageId = properties.MessageId; + UserId = properties.UserId; + To = properties.To; + Subject = properties.Subject; + ReplyTo = properties.ReplyTo; + CorrelationId = properties.CorrelationId; + ContentType = properties.ContentType; + ContentEncoding = properties.ContentEncoding; + AbsoluteExpiryTime = properties.AbsoluteExpiryTime; + CreationTime = properties.CreationTime; + GroupId = properties.GroupId; + GroupSequence = properties.GroupSequence; + ReplyToGroupId = properties.ReplyToGroupId; + } + + /// + /// The message-id value from the AMQP properties. + /// + public string? MessageId { get; set; } + + /// + /// The user-id value from the AMQP properties. + /// + public BinaryData? UserId { get; set; } + + /// + /// The to value from the AMQP properties. + /// + public string? To { get; set; } + + /// + /// The subject value from the AMQP properties. + /// + public string? Subject { get; set; } + + /// + /// The reply-to value from the AMQP properties. + /// + public string? ReplyTo { get; set; } + + /// + /// The correlation-id value from the AMQP properties. + /// + public string? CorrelationId { get; set; } + + /// + /// The content-type value from the AMQP properties. + /// + public string? ContentType { get; set; } + + /// + /// The content-encoding value from the AMQP properties. + /// + public string? ContentEncoding { get; set; } + + /// + /// The absolute-expiry-time value from the AMQP properties. + /// + public DateTime? AbsoluteExpiryTime { get; set; } + + /// + /// The creation-time value from the AMQP properties. + /// + public DateTime? CreationTime { get; set; } + + /// + /// The group-id value from the AMQP properties. + /// + public string? GroupId { get; set; } + + /// + /// The group-sequence value from the AMQP properties. + /// + public uint? GroupSequence { get; set; } + + /// + /// The reply-to-group-id value from the AMQP properties. + /// + public string? ReplyToGroupId { get; set; } + } +} diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConstants.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConstants.cs new file mode 100644 index 000000000000..b76bffc1a27d --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConstants.cs @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using Microsoft.Azure.Amqp; + +namespace Azure.Messaging.ServiceBus.Amqp +{ + internal class AmqpMessageConstants + { + internal const string EnqueuedTimeUtcName = "x-opt-enqueued-time"; + internal const string ScheduledEnqueueTimeUtcName = "x-opt-scheduled-enqueue-time"; + internal const string SequenceNumberName = "x-opt-sequence-number"; + internal const string EnqueueSequenceNumberName = "x-opt-enqueue-sequence-number"; + internal const string LockedUntilName = "x-opt-locked-until"; + internal const string PartitionKeyName = "x-opt-partition-key"; + internal const string PartitionIdName = "x-opt-partition-id"; + internal const string ViaPartitionKeyName = "x-opt-via-partition-key"; + internal const string DeadLetterSourceName = "x-opt-deadletter-source"; + internal const string TimeSpanName = AmqpConstants.Vendor + ":timespan"; + internal const string UriName = AmqpConstants.Vendor + ":uri"; + internal const string DateTimeOffsetName = AmqpConstants.Vendor + ":datetime-offset"; + } +} diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs index 80bcaa12b729..81b2e29eacf2 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs @@ -8,6 +8,7 @@ using System.Linq; using System.Runtime.Serialization; using Azure.Core; +using Azure.Core.Amqp; using Azure.Messaging.ServiceBus.Amqp.Framing; using Azure.Messaging.ServiceBus.Management; using Azure.Messaging.ServiceBus.Primitives; @@ -20,19 +21,7 @@ namespace Azure.Messaging.ServiceBus.Amqp { internal static class AmqpMessageConverter { - private const string EnqueuedTimeUtcName = "x-opt-enqueued-time"; - private const string ScheduledEnqueueTimeUtcName = "x-opt-scheduled-enqueue-time"; - private const string SequenceNumberName = "x-opt-sequence-number"; - private const string EnqueueSequenceNumberName = "x-opt-enqueue-sequence-number"; - private const string LockedUntilName = "x-opt-locked-until"; - private const string PartitionKeyName = "x-opt-partition-key"; - private const string PartitionIdName = "x-opt-partition-id"; - private const string ViaPartitionKeyName = "x-opt-via-partition-key"; - private const string DeadLetterSourceName = "x-opt-deadletter-source"; - private const string TimeSpanName = AmqpConstants.Vendor + ":timespan"; - private const string UriName = AmqpConstants.Vendor + ":uri"; - private const string DateTimeOffsetName = AmqpConstants.Vendor + ":datetime-offset"; - private const int GuidSize = 16; + internal const int GuidSize = 16; /// The size, in bytes, to use as a buffer for stream operations. private const int StreamBufferSizeInBytes = 512; @@ -115,13 +104,13 @@ private static AmqpMessage BuildAmqpBatchFromMessages( if (firstMessage?.PartitionKey != null) { - batchEnvelope.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] = + batchEnvelope.MessageAnnotations.Map[AmqpMessageConstants.PartitionKeyName] = firstMessage.PartitionKey; } if (firstMessage?.ViaPartitionKey != null) { - batchEnvelope.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] = + batchEnvelope.MessageAnnotations.Map[AmqpMessageConstants.ViaPartitionKeyName] = firstMessage.ViaPartitionKey; } @@ -156,7 +145,7 @@ public static AmqpMessage SBMessageToAmqpMessage(SBMessage sbMessage) amqpMessage.Properties.MessageId = sbMessage.MessageId; amqpMessage.Properties.CorrelationId = sbMessage.CorrelationId; amqpMessage.Properties.ContentType = sbMessage.ContentType; - amqpMessage.Properties.Subject = sbMessage.Label; + amqpMessage.Properties.Subject = sbMessage.Subject; amqpMessage.Properties.To = sbMessage.To; amqpMessage.Properties.ReplyTo = sbMessage.ReplyTo; amqpMessage.Properties.GroupId = sbMessage.SessionId; @@ -179,27 +168,27 @@ public static AmqpMessage SBMessageToAmqpMessage(SBMessage sbMessage) if ((sbMessage.ScheduledEnqueueTime != null) && sbMessage.ScheduledEnqueueTime > DateTimeOffset.MinValue) { - amqpMessage.MessageAnnotations.Map.Add(ScheduledEnqueueTimeUtcName, sbMessage.ScheduledEnqueueTime.UtcDateTime); + amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.ScheduledEnqueueTimeUtcName, sbMessage.ScheduledEnqueueTime.UtcDateTime); } if (sbMessage.PartitionKey != null) { - amqpMessage.MessageAnnotations.Map.Add(PartitionKeyName, sbMessage.PartitionKey); + amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.PartitionKeyName, sbMessage.PartitionKey); } if (sbMessage.ViaPartitionKey != null) { - amqpMessage.MessageAnnotations.Map.Add(ViaPartitionKeyName, sbMessage.ViaPartitionKey); + amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.ViaPartitionKeyName, sbMessage.ViaPartitionKey); } - if (sbMessage.Properties != null && sbMessage.Properties.Count > 0) + if (sbMessage.ApplicationProperties != null && sbMessage.ApplicationProperties.Count > 0) { if (amqpMessage.ApplicationProperties == null) { amqpMessage.ApplicationProperties = new ApplicationProperties(); } - foreach (var pair in sbMessage.Properties) + foreach (var pair in sbMessage.ApplicationProperties) { if (TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProperty, out var amqpObject)) { @@ -218,19 +207,29 @@ public static AmqpMessage SBMessageToAmqpMessage(SBMessage sbMessage) public static ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqpMessage, bool isPeeked = false) { Argument.AssertNotNull(amqpMessage, nameof(amqpMessage)); + AmqpAnnotatedMessage annotatedMessage; - ServiceBusReceivedMessage sbMessage = amqpMessage.ToServiceBusReceivedMessage(); - var sections = amqpMessage.Sections; + if ((amqpMessage.BodyType & SectionFlag.Data) != 0 && amqpMessage.DataBody != null) + { + annotatedMessage = new AmqpAnnotatedMessage(amqpMessage.GetDataViaDataBody()); + } + else + { + annotatedMessage = new AmqpAnnotatedMessage(new BinaryData[] { new BinaryData(Array.Empty())}); + } + ServiceBusReceivedMessage sbMessage = new ServiceBusReceivedMessage(annotatedMessage); + + SectionFlag sections = amqpMessage.Sections; if ((sections & SectionFlag.Header) != 0) { if (amqpMessage.Header.Ttl != null) { - sbMessage.SentMessage.TimeToLive = TimeSpan.FromMilliseconds(amqpMessage.Header.Ttl.Value); + annotatedMessage.Header.TimeToLive = TimeSpan.FromMilliseconds(amqpMessage.Header.Ttl.Value); } if (amqpMessage.Header.DeliveryCount != null) { - sbMessage.DeliveryCount = isPeeked ? (int)(amqpMessage.Header.DeliveryCount.Value) : (int)(amqpMessage.Header.DeliveryCount.Value + 1); + annotatedMessage.Header.DeliveryCount = isPeeked ? (amqpMessage.Header.DeliveryCount.Value) : (amqpMessage.Header.DeliveryCount.Value + 1); } } @@ -238,42 +237,42 @@ public static ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqpM { if (amqpMessage.Properties.MessageId != null) { - sbMessage.SentMessage.MessageId = amqpMessage.Properties.MessageId.ToString(); + annotatedMessage.Properties.MessageId = amqpMessage.Properties.MessageId.ToString(); } if (amqpMessage.Properties.CorrelationId != null) { - sbMessage.SentMessage.CorrelationId = amqpMessage.Properties.CorrelationId.ToString(); + annotatedMessage.Properties.CorrelationId = amqpMessage.Properties.CorrelationId.ToString(); } if (amqpMessage.Properties.ContentType.Value != null) { - sbMessage.SentMessage.ContentType = amqpMessage.Properties.ContentType.Value; + annotatedMessage.Properties.ContentType = amqpMessage.Properties.ContentType.Value; } if (amqpMessage.Properties.Subject != null) { - sbMessage.SentMessage.Label = amqpMessage.Properties.Subject; + annotatedMessage.Properties.Subject = amqpMessage.Properties.Subject; } if (amqpMessage.Properties.To != null) { - sbMessage.SentMessage.To = amqpMessage.Properties.To.ToString(); + annotatedMessage.Properties.To = amqpMessage.Properties.To.ToString(); } if (amqpMessage.Properties.ReplyTo != null) { - sbMessage.SentMessage.ReplyTo = amqpMessage.Properties.ReplyTo.ToString(); + annotatedMessage.Properties.ReplyTo = amqpMessage.Properties.ReplyTo.ToString(); } if (amqpMessage.Properties.GroupId != null) { - sbMessage.SentMessage.SessionId = amqpMessage.Properties.GroupId; + annotatedMessage.Properties.GroupId = amqpMessage.Properties.GroupId; } if (amqpMessage.Properties.ReplyToGroupId != null) { - sbMessage.SentMessage.ReplyToSessionId = amqpMessage.Properties.ReplyToGroupId; + annotatedMessage.Properties.ReplyToGroupId = amqpMessage.Properties.ReplyToGroupId; } } @@ -285,7 +284,7 @@ public static ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqpM { if (TryGetNetObjectFromAmqpObject(pair.Value, MappingType.ApplicationProperty, out var netObject)) { - sbMessage.SentMessage.Properties[pair.Key.ToString()] = netObject; + annotatedMessage.ApplicationProperties[pair.Key.ToString()] = netObject; } } } @@ -297,38 +296,39 @@ public static ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqpM var key = pair.Key.ToString(); switch (key) { - case EnqueuedTimeUtcName: - sbMessage.EnqueuedTime = (DateTime)pair.Value; + case AmqpMessageConstants.EnqueuedTimeUtcName: + annotatedMessage.MessageAnnotations[AmqpMessageConstants.EnqueuedTimeUtcName] = (DateTime)pair.Value; break; - case ScheduledEnqueueTimeUtcName: - sbMessage.SentMessage.ScheduledEnqueueTime = (DateTime)pair.Value; + case AmqpMessageConstants.ScheduledEnqueueTimeUtcName: + annotatedMessage.MessageAnnotations[AmqpMessageConstants.ScheduledEnqueueTimeUtcName] = (DateTime)pair.Value; break; - case SequenceNumberName: - sbMessage.SequenceNumber = (long)pair.Value; + case AmqpMessageConstants.SequenceNumberName: + annotatedMessage.MessageAnnotations[AmqpMessageConstants.SequenceNumberName] = (long)pair.Value; break; - case EnqueueSequenceNumberName: - sbMessage.EnqueuedSequenceNumber = (long)pair.Value; + case AmqpMessageConstants.EnqueueSequenceNumberName: + annotatedMessage.MessageAnnotations[AmqpMessageConstants.EnqueueSequenceNumberName] = (long)pair.Value; break; - case LockedUntilName: - sbMessage.LockedUntil = (DateTime)pair.Value >= DateTimeOffset.MaxValue.UtcDateTime ? + case AmqpMessageConstants.LockedUntilName: + DateTimeOffset lockedUntil = (DateTime)pair.Value >= DateTimeOffset.MaxValue.UtcDateTime ? DateTimeOffset.MaxValue : (DateTime)pair.Value; + annotatedMessage.MessageAnnotations[AmqpMessageConstants.LockedUntilName] = lockedUntil.UtcDateTime; break; - case PartitionKeyName: - sbMessage.SentMessage.PartitionKey = (string)pair.Value; + case AmqpMessageConstants.PartitionKeyName: + annotatedMessage.MessageAnnotations[AmqpMessageConstants.PartitionKeyName] = (string)pair.Value; break; - case PartitionIdName: - sbMessage.PartitionId = (short)pair.Value; + case AmqpMessageConstants.PartitionIdName: + annotatedMessage.MessageAnnotations[AmqpMessageConstants.PartitionIdName] = (short)pair.Value; break; - case ViaPartitionKeyName: - sbMessage.SentMessage.ViaPartitionKey = (string)pair.Value; + case AmqpMessageConstants.ViaPartitionKeyName: + annotatedMessage.MessageAnnotations[AmqpMessageConstants.ViaPartitionKeyName] = (string)pair.Value; break; - case DeadLetterSourceName: - sbMessage.DeadLetterSource = (string)pair.Value; + case AmqpMessageConstants.DeadLetterSourceName: + annotatedMessage.MessageAnnotations[AmqpMessageConstants.DeadLetterSourceName] = (string)pair.Value; break; default: if (TryGetNetObjectFromAmqpObject(pair.Value, MappingType.ApplicationProperty, out var netObject)) { - sbMessage.SentMessage.Properties[key] = netObject; + annotatedMessage.MessageAnnotations[key] = netObject; } break; } @@ -416,7 +416,7 @@ public static RuleFilter GetFilter(AmqpRuleFilterCodec amqpFilter) MessageId = amqpCorrelationFilter.MessageId, To = amqpCorrelationFilter.To, ReplyTo = amqpCorrelationFilter.ReplyTo, - Label = amqpCorrelationFilter.Label, + Subject = amqpCorrelationFilter.Label, SessionId = amqpCorrelationFilter.SessionId, ReplyToSessionId = amqpCorrelationFilter.ReplyToSessionId, ContentType = amqpCorrelationFilter.ContentType @@ -495,13 +495,13 @@ internal static bool TryGetAmqpObjectFromNetObject(object netObject, MappingType } break; case PropertyValueType.Uri: - amqpObject = new DescribedType((AmqpSymbol)UriName, ((Uri)netObject).AbsoluteUri); + amqpObject = new DescribedType((AmqpSymbol)AmqpMessageConstants.UriName, ((Uri)netObject).AbsoluteUri); break; case PropertyValueType.DateTimeOffset: - amqpObject = new DescribedType((AmqpSymbol)DateTimeOffsetName, ((DateTimeOffset)netObject).UtcTicks); + amqpObject = new DescribedType((AmqpSymbol)AmqpMessageConstants.DateTimeOffsetName, ((DateTimeOffset)netObject).UtcTicks); break; case PropertyValueType.TimeSpan: - amqpObject = new DescribedType((AmqpSymbol)TimeSpanName, ((TimeSpan)netObject).Ticks); + amqpObject = new DescribedType((AmqpSymbol)AmqpMessageConstants.TimeSpanName, ((TimeSpan)netObject).Ticks); break; case PropertyValueType.Unknown: if (netObject is Stream netObjectAsStream) @@ -586,15 +586,15 @@ private static bool TryGetNetObjectFromAmqpObject(object amqpObject, MappingType if (amqpObjectAsDescribedType.Descriptor is AmqpSymbol) { var amqpSymbol = (AmqpSymbol)amqpObjectAsDescribedType.Descriptor; - if (amqpSymbol.Equals((AmqpSymbol)UriName)) + if (amqpSymbol.Equals((AmqpSymbol)AmqpMessageConstants.UriName)) { netObject = new Uri((string)amqpObjectAsDescribedType.Value); } - else if (amqpSymbol.Equals((AmqpSymbol)TimeSpanName)) + else if (amqpSymbol.Equals((AmqpSymbol)AmqpMessageConstants.TimeSpanName)) { netObject = new TimeSpan((long)amqpObjectAsDescribedType.Value); } - else if (amqpSymbol.Equals((AmqpSymbol)DateTimeOffsetName)) + else if (amqpSymbol.Equals((AmqpSymbol)AmqpMessageConstants.DateTimeOffsetName)) { netObject = new DateTimeOffset(new DateTime((long)amqpObjectAsDescribedType.Value, DateTimeKind.Utc)); } @@ -668,7 +668,7 @@ internal static AmqpMap GetCorrelationRuleFilterMap(CorrelationRuleFilter correl [ManagementConstants.Properties.MessageId] = correlationRuleFilter.MessageId, [ManagementConstants.Properties.To] = correlationRuleFilter.To, [ManagementConstants.Properties.ReplyTo] = correlationRuleFilter.ReplyTo, - [ManagementConstants.Properties.Label] = correlationRuleFilter.Label, + [ManagementConstants.Properties.Label] = correlationRuleFilter.Subject, [ManagementConstants.Properties.SessionId] = correlationRuleFilter.SessionId, [ManagementConstants.Properties.ReplyToSessionId] = correlationRuleFilter.ReplyToSessionId, [ManagementConstants.Properties.ContentType] = correlationRuleFilter.ContentType diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs index 1545aba24b02..29eee89f56fb 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Linq; using Azure.Core; +using Azure.Core.Amqp; using Microsoft.Azure.Amqp; using Microsoft.Azure.Amqp.Framing; @@ -12,8 +13,15 @@ namespace Azure.Messaging.ServiceBus.Amqp { internal static class AmqpMessageExtensions { - public static AmqpMessage ToAmqpMessage(this ServiceBusMessage message) => - AmqpMessage.Create(new Data { Value = new ArraySegment(message.Body.ToBytes().IsEmpty ? Array.Empty() : message.Body.ToBytes().ToArray()) }); + public static AmqpMessage ToAmqpMessage(this ServiceBusMessage message) + { + BinaryData body = ((AmqpDataBody)message.AmqpMessage.Body).Data.ConvertAndFlattenData(); + return AmqpMessage.Create( + new Data + { + Value = new ArraySegment(body.ToBytes().IsEmpty ? Array.Empty() : body.ToBytes().ToArray()) + }); + } private static byte[] GetByteArray(this Data data) { @@ -37,14 +45,14 @@ private static byte[] GetByteArray(this Data data) } } - private static IEnumerable GetDataViaDataBody(AmqpMessage message) + public static IEnumerable GetDataViaDataBody(this AmqpMessage message) { foreach (Data data in (message.DataBody ?? Enumerable.Empty())) { byte[] bytes = data.GetByteArray(); if (bytes != null) { - yield return bytes; + yield return BinaryData.FromBytes(bytes); } } } @@ -52,24 +60,24 @@ private static IEnumerable GetDataViaDataBody(AmqpMessage message) // Returns via the out parameter the flattened collection of bytes. // A majority of the time, data will only contain 1 element. // The method is optimized for this situation to return the pre-existing array. - private static byte[] ConvertAndFlattenData(IEnumerable data) + public static BinaryData ConvertAndFlattenData(this IEnumerable dataList) { - byte[] flattened = null; + ReadOnlyMemory flattened = null; List flattenedList = null; var dataCount = 0; - foreach (byte[] byteArray in data) + foreach (BinaryData data in dataList) { // Only the first array is needed if it is the only valid array. // This should be the case 99% of the time. if (dataCount == 0) { - flattened = byteArray; + flattened = data; } else { // We defer creating this list since this case will rarely happen. - flattenedList ??= new List(flattened!); - flattenedList.AddRange(byteArray); + flattenedList ??= new List(flattened.ToArray()!); + flattenedList.AddRange(data.ToBytes().ToArray()); } dataCount++; @@ -80,20 +88,50 @@ private static byte[] ConvertAndFlattenData(IEnumerable data) flattened = flattenedList!.ToArray(); } - return flattened; + return BinaryData.FromBytes(flattened); + } + + public static string GetPartitionKey(this AmqpAnnotatedMessage message) + { + if (message.MessageAnnotations.TryGetValue( + AmqpMessageConstants.PartitionKeyName, + out object val)) + { + return (string)val; + } + return default; } - private static ServiceBusMessage CreateAmqpDataMessage(IEnumerable data) => - new ServiceBusMessage(BinaryData.FromBytes(ConvertAndFlattenData(data) ?? ReadOnlyMemory.Empty)); + public static string GetViaPartitionKey(this AmqpAnnotatedMessage message) + { + if (message.MessageAnnotations.TryGetValue( + AmqpMessageConstants.ViaPartitionKeyName, + out object val)) + { + return (string)val; + } + return default; + } - public static ServiceBusReceivedMessage ToServiceBusReceivedMessage(this AmqpMessage message) + public static TimeSpan GetTimeToLive(this AmqpAnnotatedMessage message) { - if ((message.BodyType & SectionFlag.Data) != 0 && message.DataBody != null) + TimeSpan? ttl = message.Header.TimeToLive; + if (ttl == default) { - return new ServiceBusReceivedMessage { SentMessage = CreateAmqpDataMessage(GetDataViaDataBody(message)) }; + return TimeSpan.MaxValue; } + return ttl.Value; + } - return new ServiceBusReceivedMessage(); + public static DateTimeOffset GetScheduledEnqueueTime(this AmqpAnnotatedMessage message) + { + if (message.MessageAnnotations.TryGetValue( + AmqpMessageConstants.ScheduledEnqueueTimeUtcName, + out object val)) + { + return (DateTime)val; + } + return default; } } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/DiagnosticExtensions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/DiagnosticExtensions.cs index 6aaf6a564b5e..b8d057445ebf 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/DiagnosticExtensions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/DiagnosticExtensions.cs @@ -64,7 +64,7 @@ private static void AddLinkedDiagnostics(this DiagnosticScope scope, IEnumerable { foreach (ServiceBusReceivedMessage message in messages) { - AddLinkedDiagnostics(scope, message.SentMessage.Properties); + AddLinkedDiagnostics(scope, message.AmqpMessage.ApplicationProperties); } } } @@ -75,7 +75,7 @@ private static void AddLinkedDiagnostics(this DiagnosticScope scope, IEnumerable { foreach (ServiceBusMessage message in messages) { - AddLinkedDiagnostics(scope, message.Properties); + AddLinkedDiagnostics(scope, message.ApplicationProperties); } } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Management/Rules/CorrelationRuleFilter.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Management/Rules/CorrelationRuleFilter.cs index 5432366b0d46..90c1f0681edc 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Management/Rules/CorrelationRuleFilter.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Management/Rules/CorrelationRuleFilter.cs @@ -17,7 +17,7 @@ namespace Azure.Messaging.ServiceBus.Management /// /// A CorrelationRuleFilter holds a set of conditions that are matched against one of more of an arriving message's user and system properties. /// A common use is a match against the property, but the application can also choose to match against - /// , , , , + /// , , , , /// , , , and any user-defined properties. /// A match exists when an arriving message's value for a property is equal to the value specified in the correlation filter. For string expressions, /// the comparison is case-sensitive. When specifying multiple match properties, the filter combines them as a logical AND condition, @@ -57,7 +57,7 @@ internal override RuleFilter Clone() => MessageId = MessageId, To = To, ReplyTo = ReplyTo, - Label = Label, + Subject = Subject, SessionId = SessionId, ReplyToSessionId = ReplyToSessionId, ContentType = ContentType, @@ -109,7 +109,7 @@ public string ReplyTo /// Application specific label. /// /// The application specific label. - public string Label + public string Subject { get; set; @@ -175,7 +175,7 @@ public override string ToString() AppendPropertyExpression(ref isFirstExpression, stringBuilder, "sys.MessageId", MessageId); AppendPropertyExpression(ref isFirstExpression, stringBuilder, "sys.To", To); AppendPropertyExpression(ref isFirstExpression, stringBuilder, "sys.ReplyTo", ReplyTo); - AppendPropertyExpression(ref isFirstExpression, stringBuilder, "sys.Label", Label); + AppendPropertyExpression(ref isFirstExpression, stringBuilder, "sys.Label", Subject); AppendPropertyExpression(ref isFirstExpression, stringBuilder, "sys.SessionId", SessionId); AppendPropertyExpression(ref isFirstExpression, stringBuilder, "sys.ReplyToSessionId", ReplyToSessionId); AppendPropertyExpression(ref isFirstExpression, stringBuilder, "sys.ContentType", ContentType); @@ -238,7 +238,7 @@ public override bool Equals(RuleFilter other) && string.Equals(MessageId, correlationRuleFilter.MessageId, StringComparison.OrdinalIgnoreCase) && string.Equals(To, correlationRuleFilter.To, StringComparison.OrdinalIgnoreCase) && string.Equals(ReplyTo, correlationRuleFilter.ReplyTo, StringComparison.OrdinalIgnoreCase) - && string.Equals(Label, correlationRuleFilter.Label, StringComparison.OrdinalIgnoreCase) + && string.Equals(Subject, correlationRuleFilter.Subject, StringComparison.OrdinalIgnoreCase) && string.Equals(SessionId, correlationRuleFilter.SessionId, StringComparison.OrdinalIgnoreCase) && string.Equals(ReplyToSessionId, correlationRuleFilter.ReplyToSessionId, StringComparison.OrdinalIgnoreCase) && string.Equals(ContentType, correlationRuleFilter.ContentType, StringComparison.OrdinalIgnoreCase)) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Management/Rules/CorrelationRuleFilterExtensions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Management/Rules/CorrelationRuleFilterExtensions.cs index b9d741077cda..35d3fe86e559 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Management/Rules/CorrelationRuleFilterExtensions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Management/Rules/CorrelationRuleFilterExtensions.cs @@ -29,7 +29,7 @@ public static RuleFilter ParseFromXElement(XElement xElement) correlationRuleFilter.ReplyTo = element.Value; break; case "Label": - correlationRuleFilter.Label = element.Value; + correlationRuleFilter.Subject = element.Value; break; case "SessionId": correlationRuleFilter.SessionId = element.Value; @@ -85,8 +85,8 @@ public static XElement Serialize(this CorrelationRuleFilter filter, string filte new XElement(XName.Get("To", ManagementClientConstants.ServiceBusNamespace), filter.To), string.IsNullOrWhiteSpace(filter.ReplyTo) ? null : new XElement(XName.Get("ReplyTo", ManagementClientConstants.ServiceBusNamespace), filter.ReplyTo), - string.IsNullOrWhiteSpace(filter.Label) ? null : - new XElement(XName.Get("Label", ManagementClientConstants.ServiceBusNamespace), filter.Label), + string.IsNullOrWhiteSpace(filter.Subject) ? null : + new XElement(XName.Get("Label", ManagementClientConstants.ServiceBusNamespace), filter.Subject), string.IsNullOrWhiteSpace(filter.SessionId) ? null : new XElement(XName.Get("SessionId", ManagementClientConstants.ServiceBusNamespace), filter.SessionId), string.IsNullOrWhiteSpace(filter.ReplyToSessionId) ? null : diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Plugins/ServiceBusPlugin.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Plugins/ServiceBusPlugin.cs index af2ef8302191..82fd570c8d82 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Plugins/ServiceBusPlugin.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Plugins/ServiceBusPlugin.cs @@ -4,6 +4,8 @@ using System; using System.Threading.Tasks; using Azure.Core; +using Azure.Core.Amqp; +using Azure.Messaging.ServiceBus.Amqp; namespace Azure.Messaging.ServiceBus.Plugins { @@ -37,18 +39,18 @@ public virtual ValueTask AfterMessageReceiveAsync(ServiceBusReceivedMessage mess #pragma warning disable CA1822 // Mark members as static protected void SetBody(ServiceBusReceivedMessage message, BinaryData body) { - message.SentMessage.Body = body; + message.AmqpMessage.Body = new AmqpDataBody(new BinaryData[] { body }); } /// - /// Set a key/value pair on the . + /// Set a key/value pair on the . /// /// The message to modify. /// The key to add or update the value of. /// The value to set for the associated key. protected void SetUserProperty(ServiceBusReceivedMessage message, string key, object value) { - message.SentMessage.Properties[key] = value; + message.AmqpMessage.ApplicationProperties[key] = value; } /// @@ -61,7 +63,7 @@ protected void SetUserProperty(ServiceBusReceivedMessage message, string key, ob protected void SetContentType(ServiceBusReceivedMessage message, string contentType) { - message.SentMessage.ContentType = contentType; + message.AmqpMessage.Properties.ContentType = contentType; } /// @@ -73,18 +75,18 @@ protected void SetContentType(ServiceBusReceivedMessage message, string contentT protected void SetCorrelationId(ServiceBusReceivedMessage message, string correlationId) { - message.SentMessage.CorrelationId = correlationId; + message.AmqpMessage.Properties.CorrelationId = correlationId; } /// - /// Sets the . + /// Sets the . /// /// The message to modify. /// The label to set on the message. protected void SetLabel(ServiceBusReceivedMessage message, string label) { - message.SentMessage.Label = label; + message.AmqpMessage.Properties.Subject = label; } /// @@ -95,7 +97,7 @@ protected void SetLabel(ServiceBusReceivedMessage message, string label) protected void SetMessageId(ServiceBusReceivedMessage message, string messageId) { - message.SentMessage.MessageId = messageId; + message.AmqpMessage.Properties.MessageId = messageId; } /// @@ -106,7 +108,7 @@ protected void SetMessageId(ServiceBusReceivedMessage message, string messageId) protected void SetPartitionKey(ServiceBusReceivedMessage message, string partitionKey) { - message.SentMessage.PartitionKey = partitionKey; + message.AmqpMessage.MessageAnnotations[AmqpMessageConstants.PartitionKeyName] = partitionKey; } /// @@ -117,7 +119,7 @@ protected void SetPartitionKey(ServiceBusReceivedMessage message, string partiti protected void SetReplyTo(ServiceBusReceivedMessage message, string replyTo) { - message.SentMessage.ReplyTo = replyTo; + message.AmqpMessage.Properties.ReplyTo = replyTo; } /// @@ -128,7 +130,7 @@ protected void SetReplyTo(ServiceBusReceivedMessage message, string replyTo) protected void SetReplyToSessionId(ServiceBusReceivedMessage message, string replyToSessionId) { - message.SentMessage.ReplyToSessionId = replyToSessionId; + message.AmqpMessage.Properties.ReplyToGroupId = replyToSessionId; } /// @@ -138,7 +140,7 @@ protected void SetReplyToSessionId(ServiceBusReceivedMessage message, string rep /// The session ID to set on the message. protected void SetSessionId(ServiceBusReceivedMessage message, string sessionId) { - message.SentMessage.SessionId = sessionId; + message.AmqpMessage.Properties.GroupId= sessionId; } /// @@ -148,7 +150,7 @@ protected void SetSessionId(ServiceBusReceivedMessage message, string sessionId) /// The scheduled enqueue time to set on the message. protected void SetScheduledEnqueueTime(ServiceBusReceivedMessage message, DateTimeOffset scheduledEnqueueTime) { - message.SentMessage.ScheduledEnqueueTime = scheduledEnqueueTime; + message.AmqpMessage.MessageAnnotations[AmqpMessageConstants.ScheduledEnqueueTimeUtcName] = scheduledEnqueueTime.UtcDateTime; } /// @@ -159,7 +161,7 @@ protected void SetScheduledEnqueueTime(ServiceBusReceivedMessage message, DateTi protected void SetTimeToLive(ServiceBusReceivedMessage message, TimeSpan timeToLive) { - message.SentMessage.TimeToLive = timeToLive; + message.AmqpMessage.Header.TimeToLive = timeToLive; } /// @@ -169,7 +171,7 @@ protected void SetTimeToLive(ServiceBusReceivedMessage message, TimeSpan timeToL /// The to value to set on the message. protected void SetTo(ServiceBusReceivedMessage message, string to) { - message.SentMessage.To = to; + message.AmqpMessage.Properties.To = to; } /// @@ -179,7 +181,7 @@ protected void SetTo(ServiceBusReceivedMessage message, string to) /// The via partition key to set on the message. protected void SetViaPartitionKey(ServiceBusReceivedMessage message, string viaPartitionKey) { - message.SentMessage.ViaPartitionKey = viaPartitionKey; + message.AmqpMessage.MessageAnnotations[AmqpMessageConstants.ViaPartitionKeyName] = viaPartitionKey; } #pragma warning restore CA1822 // Mark members as static } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusMessage.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusMessage.cs index 3796fd0e70d5..e509495e4065 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusMessage.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusMessage.cs @@ -5,24 +5,20 @@ using System.Collections.Generic; using System.Globalization; using Azure.Core; +using Azure.Core.Amqp; +using Azure.Messaging.ServiceBus.Amqp; namespace Azure.Messaging.ServiceBus { /// - /// The message object used to communicate and transfer data with Service Bus. + /// The is used to send data to Service Bus Queues and Topics. + /// When receiving messages, the is used. /// /// /// The message structure is discussed in detail in the product documentation. /// public class ServiceBusMessage { - private string _messageId; - private string _sessionId; - private string _replyToSessionId; - private string _partitionKey; - private string _viaPartitionKey; - private TimeSpan _timeToLive; - /// /// Creates a new message. /// @@ -35,10 +31,9 @@ public ServiceBusMessage() : /// Creates a new message from the specified string, using UTF-8 encoding. /// /// The payload of the message as a string. - public ServiceBusMessage(string body) + public ServiceBusMessage(string body) : + this(new BinaryData(body)) { - Body = new BinaryData(body); - Properties = new Dictionary(); } /// @@ -56,8 +51,7 @@ public ServiceBusMessage(ReadOnlyMemory body) : /// The payload of the message. public ServiceBusMessage(BinaryData body) { - Body = body; - Properties = new Dictionary(); + AmqpMessage = new AmqpAnnotatedMessage(new BinaryData[] { body }); } /// @@ -67,27 +61,30 @@ public ServiceBusMessage(BinaryData body) public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) { Argument.AssertNotNull(receivedMessage, nameof(receivedMessage)); - - Body = receivedMessage.SentMessage.Body; - ContentType = receivedMessage.ContentType; - CorrelationId = receivedMessage.CorrelationId; - Label = receivedMessage.Label; - MessageId = receivedMessage.MessageId; - PartitionKey = receivedMessage.PartitionKey; - Properties = new Dictionary(receivedMessage.SentMessage.Properties); - ReplyTo = receivedMessage.ReplyTo; - ReplyToSessionId = receivedMessage.ReplyToSessionId; - SessionId = receivedMessage.SessionId; - ScheduledEnqueueTime = receivedMessage.ScheduledEnqueueTime; - TimeToLive = receivedMessage.TimeToLive; - To = receivedMessage.To; - ViaPartitionKey = receivedMessage.ViaPartitionKey; + AmqpMessage = new AmqpAnnotatedMessage(receivedMessage.AmqpMessage); } /// /// Gets or sets the body of the message. /// - public BinaryData Body { get; set; } + public BinaryData Body + { + get + { + if (AmqpMessage.Body is AmqpDataBody dataBody) + { + return dataBody.Data.ConvertAndFlattenData(); + } + else + { + return default; + } + } + set + { + AmqpMessage.Body = new AmqpDataBody(new BinaryData[] { value }); + } + } /// /// Gets or sets the MessageId to identify the message. @@ -102,12 +99,12 @@ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) /// public string MessageId { - get => _messageId; + get => AmqpMessage.Properties.MessageId; set { ValidateMessageId(value); - _messageId = value; + AmqpMessage.Properties.MessageId = value; } } @@ -121,12 +118,14 @@ public string MessageId /// public string PartitionKey { - get => _partitionKey; - + get + { + return AmqpMessage.GetPartitionKey(); + } set { ValidatePartitionKey(value); - _partitionKey = value; + AmqpMessage.MessageAnnotations[AmqpMessageConstants.PartitionKeyName] = value; } } @@ -140,12 +139,14 @@ public string PartitionKey /// public string ViaPartitionKey { - get => _viaPartitionKey; - + get + { + return AmqpMessage.GetViaPartitionKey(); + } set { ValidatePartitionKey(value); - _viaPartitionKey = value; + AmqpMessage.MessageAnnotations[AmqpMessageConstants.ViaPartitionKeyName] = value; } } @@ -160,12 +161,12 @@ public string ViaPartitionKey /// public string SessionId { - get => _sessionId; + get => AmqpMessage.Properties.GroupId; set { ValidateSessionId(value); - _sessionId = value; + AmqpMessage.Properties.GroupId = value; } } @@ -177,12 +178,12 @@ public string SessionId /// public string ReplyToSessionId { - get => _replyToSessionId; + get => AmqpMessage.Properties.ReplyToGroupId; set { ValidateSessionId(value); - _replyToSessionId = value; + AmqpMessage.Properties.ReplyToGroupId = value; } } @@ -191,28 +192,22 @@ public string ReplyToSessionId /// /// The message’s time to live value. /// - /// This value is the relative duration after which the message expires, starting from the instant - /// the message has been accepted and stored by the broker, as captured in "SystemPropertiesCollection.EnqueuedTimeUtc"/>. + /// This value is the relative duration after which the message expires. /// When not set explicitly, the assumed value is the DefaultTimeToLive for the respective queue or topic. /// A message-level value cannot be longer than the entity's DefaultTimeToLive /// setting and it is silently adjusted if it does. - /// See Expiration + /// See. Expiration /// public TimeSpan TimeToLive { get { - if (_timeToLive == TimeSpan.Zero) - { - return TimeSpan.MaxValue; - } - return _timeToLive; + return AmqpMessage.GetTimeToLive(); } - set { Argument.AssertPositive(value, nameof(TimeToLive)); - _timeToLive = value; + AmqpMessage.Header.TimeToLive = value; } } @@ -223,7 +218,17 @@ public TimeSpan TimeToLive /// for example reflecting the MessageId of a message that is being replied to. /// See Message Routing and Correlation. /// - public string CorrelationId { get; set; } + public string CorrelationId + { + get + { + return AmqpMessage.Properties.CorrelationId; + } + set + { + AmqpMessage.Properties.CorrelationId = value; + } + } /// Gets or sets an application specific label. /// The application specific label @@ -231,7 +236,17 @@ public TimeSpan TimeToLive /// This property enables the application to indicate the purpose of the message to the receiver in a standardized /// fashion, similar to an email subject line. The mapped AMQP property is "subject". /// - public string Label { get; set; } + public string Subject + { + get + { + return AmqpMessage.Properties.Subject; + } + set + { + AmqpMessage.Properties.Subject = value; + } + } /// Gets or sets the "to" address. /// The "to" address. @@ -241,7 +256,17 @@ public TimeSpan TimeToLive /// auto-forward chaining scenarios to indicate the /// intended logical destination of the message. /// - public string To { get; set; } + public string To + { + get + { + return AmqpMessage.Properties.To; + } + set + { + AmqpMessage.Properties.To = value; + } + } /// Gets or sets the content type descriptor. /// RFC2045 Content-Type descriptor. @@ -249,7 +274,17 @@ public TimeSpan TimeToLive /// Optionally describes the payload of the message, with a descriptor following the format of /// RFC2045, Section 5, for example "application/json". /// - public string ContentType { get; set; } + public string ContentType + { + get + { + return AmqpMessage.Properties.ContentType; + } + set + { + AmqpMessage.Properties.ContentType = value; + } + } /// Gets or sets the address of an entity to send replies to. /// The reply entity address. @@ -259,7 +294,17 @@ public TimeSpan TimeToLive /// absolute or relative path of the queue or topic it expects the reply to be sent to. /// See Message Routing and Correlation. /// - public string ReplyTo { get; set; } + public string ReplyTo + { + get + { + return AmqpMessage.Properties.ReplyTo; + } + set + { + AmqpMessage.Properties.ReplyTo = value; + } + } /// Gets or sets the date and time in UTC at which the message will be enqueued. This /// property returns the time in UTC; when setting the property, the supplied DateTime value must also be in UTC. @@ -267,22 +312,42 @@ public TimeSpan TimeToLive /// It is utilized to delay messages sending to a specific time in the future. /// Message enqueuing time does not mean that the message will be sent at the same time. It will get enqueued, but the actual sending time /// depends on the queue's workload and its state. - public DateTimeOffset ScheduledEnqueueTime { get; set; } + public DateTimeOffset ScheduledEnqueueTime + { + get + { + return AmqpMessage.GetScheduledEnqueueTime(); + } + set + { + AmqpMessage.MessageAnnotations[AmqpMessageConstants.ScheduledEnqueueTimeUtcName] = value.UtcDateTime; + } + } /// - /// Gets the "user properties" bag, which can be used for custom message metadata. + /// + /// + public AmqpAnnotatedMessage AmqpMessage { get; set; } + + /// + /// Gets the application properties bag, which can be used for custom message metadata. /// /// /// Only following value types are supported: /// byte, sbyte, char, short, ushort, int, uint, long, ulong, float, double, decimal, /// bool, Guid, string, Uri, DateTime, DateTimeOffset, TimeSpan /// - public IDictionary Properties { get; internal set; } - - /////// - ///// Gets the , which is used to store properties that are set by the system. - ///// - //public SystemPropertiesCollection SystemProperties { get; internal set; } + public IDictionary ApplicationProperties + { + get + { + return AmqpMessage.ApplicationProperties; + } + internal set + { + AmqpMessage.ApplicationProperties = value; + } + } /// Returns a string that represents the current message. /// The string representation of the current message. diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs index 9912277323cd..0feb134b4d0d 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs @@ -5,6 +5,8 @@ using System.Collections.Generic; using System.ComponentModel; using Azure.Core; +using Azure.Core.Amqp; +using Azure.Messaging.ServiceBus.Amqp; namespace Azure.Messaging.ServiceBus { @@ -27,7 +29,7 @@ public static ServiceBusReceivedMessage ServiceBusReceivedMessage( string replyToSessionId = default, TimeSpan timeToLive = default, string correlationId = default, - string label = default, + string subject = default, string to = default, string contentType = default, string replyTo = default, @@ -41,55 +43,52 @@ public static ServiceBusReceivedMessage ServiceBusReceivedMessage( long enqueuedSequenceNumber = default, DateTimeOffset enqueuedTime = default) { - var sentMessage = new ServiceBusMessage - { - Body = body, - CorrelationId = correlationId, - Label = label, - To = to, - ContentType = contentType, - ReplyTo = replyTo, - ScheduledEnqueueTime = scheduledEnqueueTime - }; + var amqpMessage = new AmqpAnnotatedMessage(new BinaryData[] { body }); + amqpMessage.Properties.CorrelationId = correlationId; + amqpMessage.Properties.Subject = subject; + amqpMessage.Properties.To = to; + amqpMessage.Properties.ContentType = contentType; + amqpMessage.Properties.ReplyTo = replyTo; + amqpMessage.MessageAnnotations[AmqpMessageConstants.ScheduledEnqueueTimeUtcName] = scheduledEnqueueTime.UtcDateTime; + if (messageId != default) { - sentMessage.MessageId = messageId; + amqpMessage.Properties.MessageId = messageId; } if (partitionKey != default) { - sentMessage.PartitionKey = partitionKey; + amqpMessage.MessageAnnotations[AmqpMessageConstants.PartitionKeyName] = partitionKey; } if (viaPartitionKey != default) { - sentMessage.ViaPartitionKey = viaPartitionKey; + amqpMessage.MessageAnnotations[AmqpMessageConstants.ViaPartitionKeyName] = viaPartitionKey; } if (sessionId != default) { - sentMessage.SessionId = sessionId; + amqpMessage.Properties.GroupId = sessionId; } if (replyToSessionId != default) { - sentMessage.ReplyToSessionId = replyToSessionId; + amqpMessage.Properties.ReplyToGroupId = replyToSessionId; } if (timeToLive != default) { - sentMessage.TimeToLive = timeToLive; + amqpMessage.Header.TimeToLive = timeToLive; } if (properties != default) { - sentMessage.Properties = properties; + amqpMessage.ApplicationProperties = properties; } + amqpMessage.Header.DeliveryCount = (uint)deliveryCount; + amqpMessage.MessageAnnotations[AmqpMessageConstants.LockedUntilName] = lockedUntil.UtcDateTime; + amqpMessage.MessageAnnotations[AmqpMessageConstants.SequenceNumberName] = sequenceNumber; + amqpMessage.MessageAnnotations[AmqpMessageConstants.DeadLetterSourceName] = deadLetterSource; + amqpMessage.MessageAnnotations[AmqpMessageConstants.EnqueueSequenceNumberName] = enqueuedSequenceNumber; + amqpMessage.MessageAnnotations[AmqpMessageConstants.EnqueuedTimeUtcName] = enqueuedTime.UtcDateTime; - return new ServiceBusReceivedMessage + return new ServiceBusReceivedMessage(amqpMessage) { - SentMessage = sentMessage, LockTokenGuid = lockTokenGuid, - DeliveryCount = deliveryCount, - LockedUntil = lockedUntil, - SequenceNumber = sequenceNumber, - DeadLetterSource = deadLetterSource, - EnqueuedSequenceNumber = enqueuedSequenceNumber, - EnqueuedTime = enqueuedTime }; } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusReceivedMessage.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusReceivedMessage.cs index 3b7236cda1b1..2b6a25b62662 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusReceivedMessage.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusReceivedMessage.cs @@ -6,6 +6,8 @@ using System.Collections.ObjectModel; using System.Globalization; using Azure.Core; +using Azure.Core.Amqp; +using Azure.Messaging.ServiceBus.Amqp; namespace Azure.Messaging.ServiceBus { @@ -14,21 +16,58 @@ namespace Azure.Messaging.ServiceBus /// public class ServiceBusReceivedMessage { + + /// + /// Creates a new message from the specified payload. + /// + /// The payload of the message in bytes + internal ServiceBusReceivedMessage(ReadOnlyMemory body) + { + AmqpMessage = new AmqpAnnotatedMessage(new BinaryData[] { BinaryData.FromBytes(body) }); + } + + /// + /// Creates a new message from the specified Amqp message. + /// + internal ServiceBusReceivedMessage(AmqpAnnotatedMessage message) + { + AmqpMessage = message; + } + + internal ServiceBusReceivedMessage(): this(body: default) + { } + /// /// Indicates whether the user has settled the message as part of their callback. /// If they have done so, we will not autocomplete. /// internal bool IsSettled { get; set; } - internal ServiceBusMessage SentMessage { get; set; } = new ServiceBusMessage(); + /// + /// + /// + public AmqpAnnotatedMessage AmqpMessage { get; internal set; } /// /// Gets the body of the message. /// - public BinaryData Body => SentMessage.Body; + public BinaryData Body + { + get + { + if (AmqpMessage.Body is AmqpDataBody dataBody) + { + return dataBody.Data.ConvertAndFlattenData(); + } + else + { + return default; + } + } + } /// - /// Gets or sets the MessageId to identify the message. + /// Gets the MessageId to identify the message. /// /// /// The message identifier is an application-defined value that uniquely identifies the @@ -38,7 +77,7 @@ public class ServiceBusReceivedMessage /// feature identifies and removes second and further submissions of messages with the /// same MessageId. /// - public string MessageId => SentMessage.MessageId; + public string MessageId => AmqpMessage.Properties.MessageId; /// Gets a partition key for sending a message to a partitioned entity. /// The partition key. Maximum length is 128 characters. @@ -48,7 +87,7 @@ public class ServiceBusReceivedMessage /// order is correctly recorded. The partition is chosen by a hash function over this value and cannot be chosen /// directly. For session-aware entities, the property overrides this value. /// - public string PartitionKey => SentMessage.PartitionKey; + public string PartitionKey => AmqpMessage.GetPartitionKey(); /// Gets a partition key for sending a message into an entity via a partitioned transfer queue. /// The partition key. Maximum length is 128 characters. @@ -58,7 +97,7 @@ public class ServiceBusReceivedMessage /// messages are kept together and in order as they are transferred. /// See Transfers and Send Via. /// - public string ViaPartitionKey => SentMessage.ViaPartitionKey; + public string ViaPartitionKey => AmqpMessage.GetViaPartitionKey(); /// Gets the session identifier for a session-aware entity. /// The session identifier. Maximum length is 128 characters. @@ -69,7 +108,7 @@ public class ServiceBusReceivedMessage /// For session-unaware entities, this value is ignored. /// See Message Sessions. /// - public string SessionId => SentMessage.SessionId; + public string SessionId => AmqpMessage.Properties.GroupId; /// Gets a session identifier augmenting the address. /// Session identifier. Maximum length is 128 characters. @@ -77,7 +116,7 @@ public class ServiceBusReceivedMessage /// This value augments the ReplyTo information and specifies which SessionId should be set /// for the reply when sent to the reply entity. See Message Routing and Correlation /// - public string ReplyToSessionId => SentMessage.ReplyToSessionId; + public string ReplyToSessionId => AmqpMessage.Properties.ReplyToGroupId; /// /// Gets the message’s "time to live" value. @@ -85,13 +124,13 @@ public class ServiceBusReceivedMessage /// The message’s time to live value. /// /// This value is the relative duration after which the message expires, starting from the instant - /// the message has been accepted and stored by the broker, as captured in "SystemPropertiesCollection.EnqueuedTimeUtc"/>. + /// the message has been accepted and stored by the broker, as captured in . /// When not set explicitly, the assumed value is the DefaultTimeToLive for the respective queue or topic. /// A message-level value cannot be longer than the entity's DefaultTimeToLive /// setting and it is silently adjusted if it does. /// See Expiration /// - public TimeSpan TimeToLive => SentMessage.TimeToLive; + public TimeSpan TimeToLive => AmqpMessage.GetTimeToLive(); /// Gets the a correlation identifier. /// Correlation identifier. @@ -100,7 +139,7 @@ public class ServiceBusReceivedMessage /// for example reflecting the MessageId of a message that is being replied to. /// See Message Routing and Correlation. /// - public string CorrelationId => SentMessage.CorrelationId; + public string CorrelationId => AmqpMessage.Properties.CorrelationId; /// Gets an application specific label. /// The application specific label @@ -108,7 +147,7 @@ public class ServiceBusReceivedMessage /// This property enables the application to indicate the purpose of the message to the receiver in a standardized /// fashion, similar to an email subject line. The mapped AMQP property is "subject". /// - public string Label => SentMessage.Label; + public string Subject => AmqpMessage.Properties.Subject; /// Gets the "to" address. /// The "to" address. @@ -118,7 +157,7 @@ public class ServiceBusReceivedMessage /// auto-forward chaining scenarios to indicate the /// intended logical destination of the message. /// - public string To => SentMessage.To; + public string To => AmqpMessage.Properties.To; /// Gets the content type descriptor. /// RFC2045 Content-Type descriptor. @@ -126,7 +165,7 @@ public class ServiceBusReceivedMessage /// Optionally describes the payload of the message, with a descriptor following the format of /// RFC2045, Section 5, for example "application/json". /// - public string ContentType => SentMessage.ContentType; + public string ContentType => AmqpMessage.Properties.ContentType; /// Gets the address of an entity to send replies to. /// The reply entity address. @@ -136,7 +175,7 @@ public class ServiceBusReceivedMessage /// absolute or relative path of the queue or topic it expects the reply to be sent to. /// See Message Routing and Correlation. /// - public string ReplyTo => SentMessage.ReplyTo; + public string ReplyTo => AmqpMessage.Properties.ReplyTo; /// Gets the date and time in UTC at which the message will be enqueued. This /// property returns the time in UTC; when setting the property, the supplied DateTime value must also be in UTC. @@ -144,7 +183,7 @@ public class ServiceBusReceivedMessage /// It is utilized to delay messages sending to a specific time in the future. /// Message enqueuing time does not mean that the message will be sent at the same time. It will get enqueued, but the actual sending time /// depends on the queue's workload and its state. - public DateTimeOffset ScheduledEnqueueTime => SentMessage.ScheduledEnqueueTime; + public DateTimeOffset ScheduledEnqueueTime => AmqpMessage.GetScheduledEnqueueTime(); /// /// Gets the "user properties" bag, which can be used for custom message metadata. @@ -154,7 +193,7 @@ public class ServiceBusReceivedMessage /// byte, sbyte, char, short, ushort, int, uint, long, ulong, float, double, decimal, /// bool, Guid, string, Uri, DateTime, DateTimeOffset, TimeSpan /// - public IReadOnlyDictionary Properties => new ReadOnlyDictionary (SentMessage.Properties); + public IReadOnlyDictionary ApplicationProperties => new ReadOnlyDictionary(AmqpMessage.ApplicationProperties); /// /// User property key representing deadletter reason, when a message is received from a deadletter subqueue of an entity. @@ -185,7 +224,17 @@ public class ServiceBusReceivedMessage /// Number of deliveries that have been attempted for this message. The count is incremented when a message lock expires, /// or the message is explicitly abandoned by the receiver. This property is read-only. /// - public int DeliveryCount { get; internal set; } + public int DeliveryCount + { + get + { + return (int)AmqpMessage.Header.DeliveryCount; + } + internal set + { + AmqpMessage.Header.DeliveryCount = (uint)value; + } + } /// Gets the date and time in UTC until which the message will be locked in the queue/subscription. /// The date and time until which the message will be locked in the queue/subscription. @@ -194,7 +243,23 @@ public class ServiceBusReceivedMessage /// instant until which the message is held locked in the queue/subscription. When the lock expires, the /// is incremented and the message is again available for retrieval. This property is read-only. /// - public DateTimeOffset LockedUntil { get; internal set; } + public DateTimeOffset LockedUntil + { + get + { + if (AmqpMessage.MessageAnnotations.TryGetValue( + AmqpMessageConstants.LockedUntilName, + out object val)) + { + return (DateTime)val; + } + return default; + } + internal set + { + AmqpMessage.MessageAnnotations[AmqpMessageConstants.LockedUntilName] = value.UtcDateTime; + } + } /// Gets the unique number assigned to a message by Service Bus. /// @@ -203,7 +268,23 @@ public class ServiceBusReceivedMessage /// the topmost 16 bits reflect the partition identifier. Sequence numbers monotonically increase. /// They roll over to 0 when the 48-64 bit range is exhausted. This property is read-only. /// - public long SequenceNumber { get; internal set; } = -1; + public long SequenceNumber + { + get + { + if (AmqpMessage.MessageAnnotations.TryGetValue( + AmqpMessageConstants.SequenceNumberName, + out object val)) + { + return (long)val; + } + return default; + } + internal set + { + AmqpMessage.MessageAnnotations[AmqpMessageConstants.SequenceNumberName] = value; + } + } /// /// Gets the name of the queue or subscription that this message was enqueued on, before it was deadlettered. @@ -212,7 +293,23 @@ public class ServiceBusReceivedMessage /// Only set in messages that have been dead-lettered and subsequently auto-forwarded from the dead-letter queue /// to another entity. Indicates the entity in which the message was dead-lettered. This property is read-only. /// - public string DeadLetterSource { get; internal set; } + public string DeadLetterSource + { + get + { + if (AmqpMessage.MessageAnnotations.TryGetValue( + AmqpMessageConstants.DeadLetterSourceName, + out object val)) + { + return (string)val; + } + return default; + } + internal set + { + AmqpMessage.MessageAnnotations[AmqpMessageConstants.DeadLetterSourceName] = value; + } + } internal short PartitionId { get; set; } @@ -222,7 +319,23 @@ public class ServiceBusReceivedMessage /// For messages that have been auto-forwarded, this property reflects the sequence number /// that had first been assigned to the message at its original point of submission. This property is read-only. /// - public long EnqueuedSequenceNumber { get; internal set; } + public long EnqueuedSequenceNumber + { + get + { + if (AmqpMessage.MessageAnnotations.TryGetValue( + AmqpMessageConstants.EnqueueSequenceNumberName, + out object val)) + { + return (long)val; + } + return default; + } + internal set + { + AmqpMessage.MessageAnnotations[AmqpMessageConstants.EnqueueSequenceNumberName] = value; + } + } /// Gets or sets the date and time of the sent time in UTC. /// The enqueue time in UTC. @@ -231,7 +344,23 @@ public class ServiceBusReceivedMessage /// This value can be used as an authoritative and neutral arrival time indicator when /// the receiver does not want to trust the sender's clock. This property is read-only. /// - public DateTimeOffset EnqueuedTime { get; internal set; } + public DateTimeOffset EnqueuedTime + { + get + { + if (AmqpMessage.MessageAnnotations.TryGetValue( + AmqpMessageConstants.EnqueuedTimeUtcName, + out object val)) + { + return (DateTime)val; + } + return default; + } + internal set + { + AmqpMessage.MessageAnnotations[AmqpMessageConstants.EnqueuedTimeUtcName] = value.UtcDateTime; + } + } internal Guid LockTokenGuid { get; set; } @@ -246,11 +375,14 @@ public DateTimeOffset ExpiresAt { get { + if (AmqpMessage.Properties.AbsoluteExpiryTime != default) + { + return (DateTimeOffset)AmqpMessage.Properties.AbsoluteExpiryTime; + } if (TimeToLive >= DateTimeOffset.MaxValue.Subtract(EnqueuedTime)) { return DateTimeOffset.MaxValue; } - return EnqueuedTime.Add(TimeToLive); } } @@ -262,7 +394,7 @@ public string DeadLetterReason { get { - if (Properties.TryGetValue(DeadLetterReasonHeader, out object reason)) + if (ApplicationProperties.TryGetValue(DeadLetterReasonHeader, out object reason)) { return reason as string; } @@ -277,7 +409,7 @@ public string DeadLetterErrorDescription { get { - if (Properties.TryGetValue(DeadLetterErrorDescriptionHeader, out object description)) + if (ApplicationProperties.TryGetValue(DeadLetterErrorDescriptionHeader, out object description)) { return description as string; } @@ -285,27 +417,6 @@ public string DeadLetterErrorDescription } } - /// - /// Creates a new message from the specified payload. - /// - /// The payload of the message in bytes - internal ServiceBusReceivedMessage(ReadOnlyMemory body) - { - SentMessage = new ServiceBusMessage(body); - } - - /// - /// Creates a new message from the specified payload. - /// - internal ServiceBusReceivedMessage() - { - } - - /////// - ///// Gets the , which is used to store properties that are set by the system. - ///// - //public SystemPropertiesCollection SystemProperties { get; internal set; } - /// Returns a string that represents the current message. /// The string representation of the current message. public override string ToString() diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs index b8e113cc8483..0864a522e000 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs @@ -254,7 +254,7 @@ private void InstrumentMessages(IEnumerable messages) { foreach (ServiceBusMessage message in messages) { - if (!message.Properties.ContainsKey(DiagnosticProperty.DiagnosticIdAttribute)) + if (!message.ApplicationProperties.ContainsKey(DiagnosticProperty.DiagnosticIdAttribute)) { using DiagnosticScope messageScope = _scopeFactory.CreateScope( DiagnosticProperty.MessageActivityName, @@ -264,7 +264,7 @@ private void InstrumentMessages(IEnumerable messages) Activity activity = Activity.Current; if (activity != null) { - message.Properties[DiagnosticProperty.DiagnosticIdAttribute] = activity.Id; + message.ApplicationProperties[DiagnosticProperty.DiagnosticIdAttribute] = activity.Id; } } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpConverterTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpConverterTests.cs index 462b7f32e1d1..6b3f284621b1 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpConverterTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpConverterTests.cs @@ -50,26 +50,26 @@ public void ConvertSBMessageToAmqpMessageAndBack() ViaPartitionKey = viaPartitionKey, SessionId = sessionId, CorrelationId = correlationId, - Label = label, + Subject = label, To = to, ContentType = contentType, ReplyTo = replyTo, ReplyToSessionId = replyToSessionId, TimeToLive = timeToLive, }; - sbMessage.Properties.Add("UserProperty", "SomeUserProperty"); + sbMessage.ApplicationProperties.Add("UserProperty", "SomeUserProperty"); var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(sbMessage); var convertedSbMessage = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage); - Assert.AreEqual("SomeUserProperty", convertedSbMessage.Properties["UserProperty"]); + Assert.AreEqual("SomeUserProperty", convertedSbMessage.ApplicationProperties["UserProperty"]); Assert.AreEqual(messageBody, convertedSbMessage.Body.ToBytes().ToArray()); Assert.AreEqual(messageId, convertedSbMessage.MessageId); Assert.AreEqual(partitionKey, convertedSbMessage.PartitionKey); Assert.AreEqual(viaPartitionKey, convertedSbMessage.ViaPartitionKey); Assert.AreEqual(sessionId, convertedSbMessage.SessionId); Assert.AreEqual(correlationId, convertedSbMessage.CorrelationId); - Assert.AreEqual(label, convertedSbMessage.Label); + Assert.AreEqual(label, convertedSbMessage.Subject); Assert.AreEqual(to, convertedSbMessage.To); Assert.AreEqual(contentType, convertedSbMessage.ContentType); Assert.AreEqual(replyTo, convertedSbMessage.ReplyTo); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/DiagnosticScopeLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/DiagnosticScopeLiveTests.cs index 1da73ec0caa8..c5a5d9c2ff1f 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/DiagnosticScopeLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/DiagnosticScopeLiveTests.cs @@ -235,7 +235,7 @@ public async Task SenderReceiverActivities(bool useSessions) foreach (var msg in msgs) { var seq = await sender.ScheduleMessageAsync(msg, DateTimeOffset.UtcNow.AddMinutes(1)); - Assert.IsNotNull(msg.Properties[DiagnosticProperty.DiagnosticIdAttribute]); + Assert.IsNotNull(msg.ApplicationProperties[DiagnosticProperty.DiagnosticIdAttribute]); (string Key, object Value, DiagnosticListener) startMessage = _listener.Events.Dequeue(); Activity messageActivity = (Activity)startMessage.Value; @@ -391,7 +391,7 @@ private Activity[] AssertSendActivities(bool useSessions, ServiceBusSender sende IList messageActivities = new List(); foreach (var msg in msgs) { - Assert.IsNotNull(msg.Properties[DiagnosticProperty.DiagnosticIdAttribute]); + Assert.IsNotNull(msg.ApplicationProperties[DiagnosticProperty.DiagnosticIdAttribute]); (string Key, object Value, DiagnosticListener) startMessage = _listener.Events.Dequeue(); messageActivities.Add((Activity)startMessage.Value); AssertCommonTags((Activity)startMessage.Value, sender.EntityPath, sender.FullyQualifiedNamespace); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusTestBase.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusTestBase.cs index 5ee9eab7438c..f3196126d92d 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusTestBase.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusTestBase.cs @@ -42,7 +42,7 @@ protected ServiceBusMessage GetMessage(string sessionId = null, string partition { var msg = new ServiceBusMessage(GetRandomBuffer(100)) { - Label = $"test-{Guid.NewGuid()}", + Subject = $"test-{Guid.NewGuid()}", MessageId = Guid.NewGuid().ToString() }; if (sessionId != null) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Management/ServiceBusManagementClientLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Management/ServiceBusManagementClientLiveTests.cs index 30ad063aff57..11686e5129d6 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Management/ServiceBusManagementClientLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Management/ServiceBusManagementClientLiveTests.cs @@ -306,7 +306,7 @@ await client.CreateSubscriptionAsync( { ContentType = "contentType", CorrelationId = "correlationId", - Label = "label", + Subject = "label", MessageId = "messageId", ReplyTo = "replyTo", ReplyToSessionId = "replyToSessionId", diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs index 8b9373b6511c..a785c0d061a9 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs @@ -25,48 +25,48 @@ public async Task MessagePropertiesShouldSupportValidPropertyTypes() /// byte, sbyte, char, short, ushort, int, uint, long, ulong, float, double, decimal, /// bool, Guid, string, Uri, DateTime, DateTimeOffset, TimeSpan var msg = new ServiceBusMessage(); - msg.Properties.Add("byte", (byte)2); - msg.Properties.Add("sbyte", (sbyte)3); - msg.Properties.Add("char", 'c'); - msg.Properties.Add("short", (short)4); - msg.Properties.Add("ushort", (ushort)5); - msg.Properties.Add("int", (int)6); - msg.Properties.Add("uint", (uint)7); - msg.Properties.Add("long", (long)8); - msg.Properties.Add("ulong", (ulong)9); - msg.Properties.Add("float", (float)10.0); - msg.Properties.Add("double", (double)11.0); - msg.Properties.Add("decimal", (decimal)12.0); - msg.Properties.Add("bool", true); - msg.Properties.Add("Guid", Guid.NewGuid()); - msg.Properties.Add("string", "value"); - msg.Properties.Add("Uri", new Uri("http://nonExistingServiceBusWebsite.com")); - msg.Properties.Add("DateTime", DateTime.UtcNow); - msg.Properties.Add("DateTimeOffset", DateTimeOffset.UtcNow); - msg.Properties.Add("TimeSpan", TimeSpan.FromMinutes(5)); + msg.ApplicationProperties.Add("byte", (byte)2); + msg.ApplicationProperties.Add("sbyte", (sbyte)3); + msg.ApplicationProperties.Add("char", 'c'); + msg.ApplicationProperties.Add("short", (short)4); + msg.ApplicationProperties.Add("ushort", (ushort)5); + msg.ApplicationProperties.Add("int", (int)6); + msg.ApplicationProperties.Add("uint", (uint)7); + msg.ApplicationProperties.Add("long", (long)8); + msg.ApplicationProperties.Add("ulong", (ulong)9); + msg.ApplicationProperties.Add("float", (float)10.0); + msg.ApplicationProperties.Add("double", (double)11.0); + msg.ApplicationProperties.Add("decimal", (decimal)12.0); + msg.ApplicationProperties.Add("bool", true); + msg.ApplicationProperties.Add("Guid", Guid.NewGuid()); + msg.ApplicationProperties.Add("string", "value"); + msg.ApplicationProperties.Add("Uri", new Uri("http://nonExistingServiceBusWebsite.com")); + msg.ApplicationProperties.Add("DateTime", DateTime.UtcNow); + msg.ApplicationProperties.Add("DateTimeOffset", DateTimeOffset.UtcNow); + msg.ApplicationProperties.Add("TimeSpan", TimeSpan.FromMinutes(5)); await sender.SendMessageAsync(msg); var receivedMsg = await receiver.ReceiveMessageAsync(); - Assert.IsInstanceOf(typeof(byte), receivedMsg.Properties["byte"]); - Assert.IsInstanceOf(typeof(sbyte), receivedMsg.Properties["sbyte"]); - Assert.IsInstanceOf(typeof(char), receivedMsg.Properties["char"]); - Assert.IsInstanceOf(typeof(short), receivedMsg.Properties["short"]); - Assert.IsInstanceOf(typeof(ushort), receivedMsg.Properties["ushort"]); - Assert.IsInstanceOf(typeof(int), receivedMsg.Properties["int"]); - Assert.IsInstanceOf(typeof(uint), receivedMsg.Properties["uint"]); - Assert.IsInstanceOf(typeof(long), receivedMsg.Properties["long"]); - Assert.IsInstanceOf(typeof(ulong), receivedMsg.Properties["ulong"]); - Assert.IsInstanceOf(typeof(float), receivedMsg.Properties["float"]); - Assert.IsInstanceOf(typeof(double), receivedMsg.Properties["double"]); - Assert.IsInstanceOf(typeof(decimal), receivedMsg.Properties["decimal"]); - Assert.IsInstanceOf(typeof(bool), receivedMsg.Properties["bool"]); - Assert.IsInstanceOf(typeof(Guid), receivedMsg.Properties["Guid"]); - Assert.IsInstanceOf(typeof(string), receivedMsg.Properties["string"]); - Assert.IsInstanceOf(typeof(Uri), receivedMsg.Properties["Uri"]); - Assert.IsInstanceOf(typeof(DateTime), receivedMsg.Properties["DateTime"]); - Assert.IsInstanceOf(typeof(DateTimeOffset), receivedMsg.Properties["DateTimeOffset"]); - Assert.IsInstanceOf(typeof(TimeSpan), receivedMsg.Properties["TimeSpan"]); + Assert.IsInstanceOf(typeof(byte), receivedMsg.ApplicationProperties["byte"]); + Assert.IsInstanceOf(typeof(sbyte), receivedMsg.ApplicationProperties["sbyte"]); + Assert.IsInstanceOf(typeof(char), receivedMsg.ApplicationProperties["char"]); + Assert.IsInstanceOf(typeof(short), receivedMsg.ApplicationProperties["short"]); + Assert.IsInstanceOf(typeof(ushort), receivedMsg.ApplicationProperties["ushort"]); + Assert.IsInstanceOf(typeof(int), receivedMsg.ApplicationProperties["int"]); + Assert.IsInstanceOf(typeof(uint), receivedMsg.ApplicationProperties["uint"]); + Assert.IsInstanceOf(typeof(long), receivedMsg.ApplicationProperties["long"]); + Assert.IsInstanceOf(typeof(ulong), receivedMsg.ApplicationProperties["ulong"]); + Assert.IsInstanceOf(typeof(float), receivedMsg.ApplicationProperties["float"]); + Assert.IsInstanceOf(typeof(double), receivedMsg.ApplicationProperties["double"]); + Assert.IsInstanceOf(typeof(decimal), receivedMsg.ApplicationProperties["decimal"]); + Assert.IsInstanceOf(typeof(bool), receivedMsg.ApplicationProperties["bool"]); + Assert.IsInstanceOf(typeof(Guid), receivedMsg.ApplicationProperties["Guid"]); + Assert.IsInstanceOf(typeof(string), receivedMsg.ApplicationProperties["string"]); + Assert.IsInstanceOf(typeof(Uri), receivedMsg.ApplicationProperties["Uri"]); + Assert.IsInstanceOf(typeof(DateTime), receivedMsg.ApplicationProperties["DateTime"]); + Assert.IsInstanceOf(typeof(DateTimeOffset), receivedMsg.ApplicationProperties["DateTimeOffset"]); + Assert.IsInstanceOf(typeof(TimeSpan), receivedMsg.ApplicationProperties["TimeSpan"]); } } @@ -100,10 +100,10 @@ public async Task CreateFromReceivedMessageCopiesProperties() msg.Body = new BinaryData(GetRandomBuffer(100)); msg.ContentType = "contenttype"; msg.CorrelationId = "correlationid"; - msg.Label = "label"; + msg.Subject = "label"; msg.MessageId = "messageId"; msg.PartitionKey = "key"; - msg.Properties.Add("testProp", "my prop"); + msg.ApplicationProperties.Add("testProp", "my prop"); msg.ReplyTo = "replyto"; msg.ReplyToSessionId = "replytosession"; msg.ScheduledEnqueueTime = DateTimeOffset.Now; @@ -128,12 +128,12 @@ void AssertMessagesEqual(ServiceBusMessage sentMessage, ServiceBusReceivedMessag Assert.IsTrue(received.Body.ToBytes().ToArray().SequenceEqual(sentMessage.Body.ToBytes().ToArray())); Assert.AreEqual(received.ContentType, sentMessage.ContentType); Assert.AreEqual(received.CorrelationId, sentMessage.CorrelationId); - Assert.AreEqual(received.Label, sentMessage.Label); + Assert.AreEqual(received.Subject, sentMessage.Subject); Assert.AreEqual(received.ContentType, sentMessage.ContentType); Assert.AreEqual(received.CorrelationId, sentMessage.CorrelationId); Assert.AreEqual(received.MessageId, sentMessage.MessageId); Assert.AreEqual(received.PartitionKey, sentMessage.PartitionKey); - Assert.AreEqual((string)received.Properties["testProp"], (string)sentMessage.Properties["testProp"]); + Assert.AreEqual((string)received.ApplicationProperties["testProp"], (string)sentMessage.ApplicationProperties["testProp"]); Assert.AreEqual(received.ReplyTo, sentMessage.ReplyTo); Assert.AreEqual(received.ReplyToSessionId, sentMessage.ReplyToSessionId); Assert.AreEqual(received.ScheduledEnqueueTime.UtcDateTime.Second, sentMessage.ScheduledEnqueueTime.UtcDateTime.Second); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageTests.cs index 2a6761bc98c8..9ba20dbfce9e 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageTests.cs @@ -60,13 +60,13 @@ public void CreateReceivedMessageViaFactory() Assert.AreEqual(default(string), receivedMessage.ReplyToSessionId); Assert.AreEqual(TimeSpan.MaxValue, receivedMessage.TimeToLive); Assert.AreEqual(default(string), receivedMessage.CorrelationId); - Assert.AreEqual(default(string), receivedMessage.Label); + Assert.AreEqual(default(string), receivedMessage.Subject); Assert.AreEqual(default(string), receivedMessage.To); Assert.AreEqual(default(string), receivedMessage.ContentType); Assert.AreEqual(default(string), receivedMessage.ReplyTo); Assert.AreEqual(default(DateTimeOffset), receivedMessage.ScheduledEnqueueTime); - Assert.IsNotNull(receivedMessage.Properties); - Assert.IsEmpty(receivedMessage.Properties); + Assert.IsNotNull(receivedMessage.ApplicationProperties); + Assert.IsEmpty(receivedMessage.ApplicationProperties); Assert.AreEqual(default(Guid), receivedMessage.LockTokenGuid); Assert.AreEqual(default(int), receivedMessage.DeliveryCount); Assert.AreEqual(default(DateTimeOffset), receivedMessage.LockedUntil); @@ -110,22 +110,22 @@ public void CreateReceivedMessageViaFactory() Assert.AreEqual("replyToSessionId4556", receivedMessage.ReplyToSessionId); Assert.AreEqual(TimeSpan.FromMinutes(5).ToString(), receivedMessage.TimeToLive.ToString()); Assert.AreEqual("correlationId8877", receivedMessage.CorrelationId); - Assert.AreEqual("label4523", receivedMessage.Label); + Assert.AreEqual("label4523", receivedMessage.Subject); Assert.AreEqual("to9887", receivedMessage.To); Assert.AreEqual("contentType0538", receivedMessage.ContentType); Assert.AreEqual("replyTo2598", receivedMessage.ReplyTo); - Assert.AreEqual(new DateTimeOffset(fixedDate, TimeSpan.FromHours(2)).ToString(), receivedMessage.ScheduledEnqueueTime.ToString()); - Assert.IsNotNull(receivedMessage.Properties); - Assert.IsNotEmpty(receivedMessage.Properties); - Assert.AreEqual(new[]{ "42", "properties0864"}, receivedMessage.Properties.Keys); - Assert.AreEqual(new object[]{ 6420, "testValue"}, receivedMessage.Properties.Values); + Assert.AreEqual(new DateTimeOffset(fixedDate, TimeSpan.FromHours(2)).UtcDateTime, receivedMessage.ScheduledEnqueueTime.UtcDateTime); + Assert.IsNotNull(receivedMessage.ApplicationProperties); + Assert.IsNotEmpty(receivedMessage.ApplicationProperties); + Assert.AreEqual(new[]{ "42", "properties0864"}, receivedMessage.ApplicationProperties.Keys); + Assert.AreEqual(new object[]{ 6420, "testValue"}, receivedMessage.ApplicationProperties.Values); Assert.AreEqual("f5ae57c7-963b-4864-ae19-32b12451e5d8", receivedMessage.LockTokenGuid.ToString()); Assert.AreEqual(4321, receivedMessage.DeliveryCount); - Assert.AreEqual(new DateTimeOffset(fixedDate, TimeSpan.FromMinutes(18)).ToString(), receivedMessage.LockedUntil.ToString()); + Assert.AreEqual(new DateTimeOffset(fixedDate, TimeSpan.FromMinutes(18)).UtcDateTime, receivedMessage.LockedUntil.UtcDateTime); Assert.AreEqual(3456, receivedMessage.SequenceNumber); Assert.AreEqual("deadLetterSource5773", receivedMessage.DeadLetterSource); Assert.AreEqual(7632, receivedMessage.EnqueuedSequenceNumber); - Assert.AreEqual(new DateTimeOffset(fixedDate, TimeSpan.FromSeconds(120)).ToString(), receivedMessage.EnqueuedTime.ToString()); + Assert.AreEqual(new DateTimeOffset(fixedDate, TimeSpan.FromSeconds(120)).UtcDateTime, receivedMessage.EnqueuedTime.UtcDateTime); } } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Plugins/PluginLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Plugins/PluginLiveTests.cs index f0beaf173fe0..ffc2d8d1f20c 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Plugins/PluginLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Plugins/PluginLiveTests.cs @@ -30,8 +30,8 @@ public async Task OrderOfPluginsRespected() await sender.SendMessageAsync(sendMessage); var receivedMessage = await receiver.ReceiveMessageAsync(); - var firstSendPluginUserProperty = (bool)receivedMessage.Properties["FirstSendPlugin"]; - var secondSendPluginUserProperty = (bool)receivedMessage.Properties["SecondSendPlugin"]; + var firstSendPluginUserProperty = (bool)receivedMessage.ApplicationProperties["FirstSendPlugin"]; + var secondSendPluginUserProperty = (bool)receivedMessage.ApplicationProperties["SecondSendPlugin"]; Assert.True(firstSendPluginUserProperty); Assert.True(secondSendPluginUserProperty); @@ -252,7 +252,7 @@ private class FirstPlugin : ServiceBusPlugin { public override ValueTask BeforeMessageSendAsync(ServiceBusMessage message) { - message.Properties.Add("FirstSendPlugin", true); + message.ApplicationProperties.Add("FirstSendPlugin", true); return default; } } @@ -264,8 +264,8 @@ private class SecondPlugin : ServiceBusPlugin public override ValueTask BeforeMessageSendAsync(ServiceBusMessage message) { // Ensure that the first plugin actually ran first - Assert.True((bool)message.Properties["FirstSendPlugin"]); - message.Properties.Add("SecondSendPlugin", true); + Assert.True((bool)message.ApplicationProperties["FirstSendPlugin"]); + message.ApplicationProperties.Add("SecondSendPlugin", true); return default; } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Plugins/PluginTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Plugins/PluginTests.cs index 7124b2ef273e..dd63f46adbb3 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Plugins/PluginTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Plugins/PluginTests.cs @@ -21,10 +21,10 @@ public void PluginSetsReceivedMessageProperties() plugin.AfterMessageReceiveAsync(msg); Assert.AreEqual("body", msg.Body.ToString()); Assert.AreEqual("contentType", msg.ContentType); - Assert.AreEqual("propertyValue", msg.Properties["propertyKey"]); + Assert.AreEqual("propertyValue", msg.ApplicationProperties["propertyKey"]); Assert.AreEqual("deadLetterDescription", msg.DeadLetterErrorDescription); Assert.AreEqual("deadLetterReason", msg.DeadLetterReason); - Assert.AreEqual("label", msg.Label); + Assert.AreEqual("label", msg.Subject); Assert.AreEqual("messageId", msg.MessageId); Assert.AreEqual("partitionKey", msg.PartitionKey); Assert.AreEqual("replyTo", msg.ReplyTo); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs index 7a681d41271b..41ba93135e19 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs @@ -523,8 +523,8 @@ public async Task DeadLetterMessagesSubscription(bool useSpecificSession) Assert.AreEqual(messageEnum.Current.SessionId, msg.SessionId); Assert.IsNull(msg.DeadLetterErrorDescription); Assert.IsNull(msg.DeadLetterReason); - Assert.IsNotNull(msg.Properties[ServiceBusReceivedMessage.DeadLetterReasonHeader]); - Assert.IsNotNull(msg.Properties[ServiceBusReceivedMessage.DeadLetterErrorDescriptionHeader]); + Assert.IsNotNull(msg.ApplicationProperties[ServiceBusReceivedMessage.DeadLetterReasonHeader]); + Assert.IsNotNull(msg.ApplicationProperties[ServiceBusReceivedMessage.DeadLetterErrorDescriptionHeader]); await deadLetterReceiver.CompleteMessageAsync(msg.LockToken); } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/RuleManager/RuleManagerLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/RuleManager/RuleManagerLiveTests.cs index 9ffbd7b0e0ce..9f0a5e8db552 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/RuleManager/RuleManagerLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/RuleManager/RuleManagerLiveTests.cs @@ -38,7 +38,7 @@ public async Task AddGetAndRemoveRules() Filter = new CorrelationRuleFilter { CorrelationId = "correlationId", - Label = "label", + Subject = "label", MessageId = "messageId", Properties = { @@ -72,7 +72,7 @@ public async Task AddGetAndRemoveRules() var correlationRuleFilter = correlationRule.Filter as CorrelationRuleFilter; Assert.NotNull(correlationRuleFilter); Assert.AreEqual("correlationId", correlationRuleFilter.CorrelationId); - Assert.AreEqual("label", correlationRuleFilter.Label); + Assert.AreEqual("label", correlationRuleFilter.Subject); Assert.AreEqual("messageId", correlationRuleFilter.MessageId); Assert.AreEqual("replyTo", correlationRuleFilter.ReplyTo); Assert.AreEqual("replyToSessionId", correlationRuleFilter.ReplyToSessionId); @@ -100,13 +100,13 @@ public async Task ThrowIfAddSameRuleNameTwice() await ruleManager.AddRuleAsync(new RuleProperties { - Filter = new CorrelationRuleFilter { Label = "yellow" }, + Filter = new CorrelationRuleFilter { Subject = "yellow" }, Name = "CorrelationRuleFilter" }); Assert.That(async () => await ruleManager.AddRuleAsync(new RuleProperties { - Filter = new CorrelationRuleFilter { Label = "red" }, + Filter = new CorrelationRuleFilter { Subject = "red" }, Name = "CorrelationRuleFilter" }), Throws.InstanceOf()); } @@ -233,7 +233,7 @@ public async Task CorrelationRuleFilterOnTheMessageProperties() await ruleManager.AddRuleAsync(new RuleProperties { - Filter = new CorrelationRuleFilter { Label = "red" }, + Filter = new CorrelationRuleFilter { Subject = "red" }, Name = "CorrelationMsgPropertyRule" }); @@ -324,7 +324,7 @@ await ruleManager.AddRuleAsync(new RuleProperties expectedOrders); foreach (var message in receivedMessages) { - Assert.AreEqual("high", message.Properties["priority"], "Priority of the receivedMessage is different than expected"); + Assert.AreEqual("high", message.ApplicationProperties["priority"], "Priority of the receivedMessage is different than expected"); } } } @@ -438,7 +438,7 @@ await ruleManager.AddRuleAsync(new RuleProperties expectedOrders); foreach (var message in receivedMessages) { - Assert.AreEqual("high", message.Properties["priority"], "Priority of the receivedMessage is different than expected"); + Assert.AreEqual("high", message.ApplicationProperties["priority"], "Priority of the receivedMessage is different than expected"); } } } @@ -540,8 +540,8 @@ private async Task SendMessages(ServiceBusClient client, string topicName) var message = new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(Orders[i]))) { CorrelationId = Orders[i].Priority, - Label = Orders[i].Color, - Properties = + Subject = Orders[i].Color, + ApplicationProperties = { { "color", Orders[i].Color }, { "quantity", Orders[i].Quantity }, @@ -568,7 +568,7 @@ private async Task> ReceiveAndAssertMessages( { receivedMessages.Add(item); messageEnum.MoveNext(); - Assert.AreEqual(messageEnum.Current.Color, item.Label); + Assert.AreEqual(messageEnum.Current.Color, item.Subject); remainingMessages--; } } From a52952c1e60cbfb57364d1cacd90388147192423 Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Fri, 28 Aug 2020 12:54:35 -0700 Subject: [PATCH 2/6] Export api --- .../Azure.Core.Experimental.netstandard2.0.cs | 51 +++++++++++++++++++ ...ure.Messaging.ServiceBus.netstandard2.0.cs | 14 ++--- 2 files changed, 59 insertions(+), 6 deletions(-) diff --git a/sdk/core/Azure.Core.Experimental/api/Azure.Core.Experimental.netstandard2.0.cs b/sdk/core/Azure.Core.Experimental/api/Azure.Core.Experimental.netstandard2.0.cs index 07f7bc5d3501..0357c48485fe 100644 --- a/sdk/core/Azure.Core.Experimental/api/Azure.Core.Experimental.netstandard2.0.cs +++ b/sdk/core/Azure.Core.Experimental/api/Azure.Core.Experimental.netstandard2.0.cs @@ -105,6 +105,57 @@ public void AppendTest(string path, string rawJsonValue) { } public override string ToString() { throw null; } } } +namespace Azure.Core.Amqp +{ + public partial class AmqpAnnotatedMessage + { + public AmqpAnnotatedMessage(Azure.Core.Amqp.AmqpAnnotatedMessage message) { } + public AmqpAnnotatedMessage(System.Collections.Generic.IEnumerable dataBody) { } + public System.Collections.Generic.IDictionary ApplicationProperties { get { throw null; } set { } } + public Azure.Core.Amqp.AmqpMessageBody Body { get { throw null; } set { } } + public System.Collections.Generic.IDictionary DeliveryAnnotations { get { throw null; } set { } } + public System.Collections.Generic.IDictionary Footer { get { throw null; } set { } } + public Azure.Core.Amqp.AmqpMessageHeader Header { get { throw null; } set { } } + public System.Collections.Generic.IDictionary MessageAnnotations { get { throw null; } set { } } + public Azure.Core.Amqp.AmqpMessageProperties Properties { get { throw null; } set { } } + } + public partial class AmqpDataBody : Azure.Core.Amqp.AmqpMessageBody + { + public AmqpDataBody(System.Collections.Generic.IEnumerable data) { } + public System.Collections.Generic.IEnumerable Data { get { throw null; } } + } + public abstract partial class AmqpMessageBody + { + protected AmqpMessageBody() { } + } + public partial class AmqpMessageHeader + { + public AmqpMessageHeader() { } + public uint? DeliveryCount { get { throw null; } set { } } + public bool? Durable { get { throw null; } set { } } + public bool? FirstAcquirer { get { throw null; } set { } } + public byte? Priority { get { throw null; } set { } } + public System.TimeSpan? TimeToLive { get { throw null; } set { } } + } + public partial class AmqpMessageProperties + { + public AmqpMessageProperties() { } + public AmqpMessageProperties(Azure.Core.Amqp.AmqpMessageProperties properties) { } + public System.DateTime? AbsoluteExpiryTime { get { throw null; } set { } } + public string? ContentEncoding { get { throw null; } set { } } + public string? ContentType { get { throw null; } set { } } + public string? CorrelationId { get { throw null; } set { } } + public System.DateTime? CreationTime { get { throw null; } set { } } + public string? GroupId { get { throw null; } set { } } + public uint? GroupSequence { get { throw null; } set { } } + public string? MessageId { get { throw null; } set { } } + public string? ReplyTo { get { throw null; } set { } } + public string? ReplyToGroupId { get { throw null; } set { } } + public string? Subject { get { throw null; } set { } } + public string? To { get { throw null; } set { } } + public Azure.BinaryData? UserId { get { throw null; } set { } } + } +} namespace Azure.Core.GeoJson { public sealed partial class GeoBoundingBox : System.IEquatable diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs index 2a4048d4cdda..0a7f93d53e7d 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs @@ -152,17 +152,18 @@ public ServiceBusMessage(Azure.BinaryData body) { } public ServiceBusMessage(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage receivedMessage) { } public ServiceBusMessage(System.ReadOnlyMemory body) { } public ServiceBusMessage(string body) { } + public Azure.Core.Amqp.AmqpAnnotatedMessage AmqpMessage { get { throw null; } set { } } + public System.Collections.Generic.IDictionary ApplicationProperties { get { throw null; } } public Azure.BinaryData Body { get { throw null; } set { } } public string ContentType { get { throw null; } set { } } public string CorrelationId { get { throw null; } set { } } - public string Label { get { throw null; } set { } } public string MessageId { get { throw null; } set { } } public string PartitionKey { get { throw null; } set { } } - public System.Collections.Generic.IDictionary Properties { get { throw null; } } public string ReplyTo { get { throw null; } set { } } public string ReplyToSessionId { get { throw null; } set { } } public System.DateTimeOffset ScheduledEnqueueTime { get { throw null; } set { } } public string SessionId { get { throw null; } set { } } + public string Subject { get { throw null; } set { } } public System.TimeSpan TimeToLive { get { throw null; } set { } } public string To { get { throw null; } set { } } public string ViaPartitionKey { get { throw null; } set { } } @@ -181,7 +182,7 @@ public void Dispose() { } public static partial class ServiceBusModelFactory { [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] - public static Azure.Messaging.ServiceBus.ServiceBusReceivedMessage ServiceBusReceivedMessage(Azure.BinaryData body = default(Azure.BinaryData), string messageId = null, string partitionKey = null, string viaPartitionKey = null, string sessionId = null, string replyToSessionId = null, System.TimeSpan timeToLive = default(System.TimeSpan), string correlationId = null, string label = null, string to = null, string contentType = null, string replyTo = null, System.DateTimeOffset scheduledEnqueueTime = default(System.DateTimeOffset), System.Collections.Generic.IDictionary properties = null, System.Guid lockTokenGuid = default(System.Guid), int deliveryCount = 0, System.DateTimeOffset lockedUntil = default(System.DateTimeOffset), long sequenceNumber = (long)-1, string deadLetterSource = null, long enqueuedSequenceNumber = (long)0, System.DateTimeOffset enqueuedTime = default(System.DateTimeOffset)) { throw null; } + public static Azure.Messaging.ServiceBus.ServiceBusReceivedMessage ServiceBusReceivedMessage(Azure.BinaryData body = default(Azure.BinaryData), string messageId = null, string partitionKey = null, string viaPartitionKey = null, string sessionId = null, string replyToSessionId = null, System.TimeSpan timeToLive = default(System.TimeSpan), string correlationId = null, string subject = null, string to = null, string contentType = null, string replyTo = null, System.DateTimeOffset scheduledEnqueueTime = default(System.DateTimeOffset), System.Collections.Generic.IDictionary properties = null, System.Guid lockTokenGuid = default(System.Guid), int deliveryCount = 0, System.DateTimeOffset lockedUntil = default(System.DateTimeOffset), long sequenceNumber = (long)-1, string deadLetterSource = null, long enqueuedSequenceNumber = (long)0, System.DateTimeOffset enqueuedTime = default(System.DateTimeOffset)) { throw null; } } public partial class ServiceBusProcessor { @@ -225,6 +226,8 @@ public ServiceBusProcessorOptions() { } public partial class ServiceBusReceivedMessage { internal ServiceBusReceivedMessage() { } + public Azure.Core.Amqp.AmqpAnnotatedMessage AmqpMessage { get { throw null; } } + public System.Collections.Generic.IReadOnlyDictionary ApplicationProperties { get { throw null; } } public Azure.BinaryData Body { get { throw null; } } public string ContentType { get { throw null; } } public string CorrelationId { get { throw null; } } @@ -235,17 +238,16 @@ internal ServiceBusReceivedMessage() { } public long EnqueuedSequenceNumber { get { throw null; } } public System.DateTimeOffset EnqueuedTime { get { throw null; } } public System.DateTimeOffset ExpiresAt { get { throw null; } } - public string Label { get { throw null; } } public System.DateTimeOffset LockedUntil { get { throw null; } } public string LockToken { get { throw null; } } public string MessageId { get { throw null; } } public string PartitionKey { get { throw null; } } - public System.Collections.Generic.IReadOnlyDictionary Properties { get { throw null; } } public string ReplyTo { get { throw null; } } public string ReplyToSessionId { get { throw null; } } public System.DateTimeOffset ScheduledEnqueueTime { get { throw null; } } public long SequenceNumber { get { throw null; } } public string SessionId { get { throw null; } } + public string Subject { get { throw null; } } public System.TimeSpan TimeToLive { get { throw null; } } public string To { get { throw null; } } public string ViaPartitionKey { get { throw null; } } @@ -467,12 +469,12 @@ public CorrelationRuleFilter() { } public CorrelationRuleFilter(string correlationId) { } public string ContentType { get { throw null; } set { } } public string CorrelationId { get { throw null; } set { } } - public string Label { get { throw null; } set { } } public string MessageId { get { throw null; } set { } } public System.Collections.Generic.IDictionary Properties { get { throw null; } } public string ReplyTo { get { throw null; } set { } } public string ReplyToSessionId { get { throw null; } set { } } public string SessionId { get { throw null; } set { } } + public string Subject { get { throw null; } set { } } public string To { get { throw null; } set { } } public override bool Equals(Azure.Messaging.ServiceBus.Management.RuleFilter other) { throw null; } public override bool Equals(object obj) { throw null; } From 078d6c82393080db8ca217466e3956b7827f7fd9 Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Fri, 28 Aug 2020 14:49:01 -0700 Subject: [PATCH 3/6] PR feedback --- .../src/Amqp/AmqpAnnotatedMessage.cs | 8 ++--- .../src/Amqp/AmqpDataBody.cs | 2 +- .../src/Amqp/AmqpMessageBody.cs | 2 +- .../src/Amqp/AmqpMessageHeader.cs | 9 ++++-- .../src/Amqp/AmqpMessageProperties.cs | 4 +-- .../src/Amqp/AmqpMessageConstants.cs | 4 +++ .../src/Amqp/AmqpMessageConverter.cs | 13 ++++---- .../src/Amqp/AmqpMessageExtensions.cs | 2 +- .../Framing/AmqpCorrelationRuleFilterCodec.cs | 8 ++--- .../Management/Rules/CorrelationRuleFilter.cs | 4 +-- .../src/Plugins/ServiceBusPlugin.cs | 2 +- .../src/Primitives/ServiceBusMessage.cs | 30 +++++++++++-------- .../Primitives/ServiceBusReceivedMessage.cs | 10 ++++--- 13 files changed, 57 insertions(+), 41 deletions(-) diff --git a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpAnnotatedMessage.cs b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpAnnotatedMessage.cs index bec2df590f2c..a0f2d4e54e18 100644 --- a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpAnnotatedMessage.cs +++ b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpAnnotatedMessage.cs @@ -7,7 +7,7 @@ namespace Azure.Core.Amqp { /// /// Represents an AMQP message. - /// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format + /// /// public class AmqpAnnotatedMessage { @@ -29,7 +29,9 @@ public AmqpAnnotatedMessage(AmqpAnnotatedMessage message) /// /// Creates a new Data body . /// - /// The data body sections. + /// The data sections comprising the message body. + /// + /// public AmqpAnnotatedMessage(IEnumerable dataBody) { Body = new AmqpDataBody(dataBody); @@ -69,7 +71,5 @@ public AmqpAnnotatedMessage(IEnumerable dataBody) /// The body of the AMQP message. /// public AmqpMessageBody Body { get; set; } - - } } diff --git a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpDataBody.cs b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpDataBody.cs index 604524160d0f..4f2a77ff14bc 100644 --- a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpDataBody.cs +++ b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpDataBody.cs @@ -8,7 +8,7 @@ namespace Azure.Core.Amqp /// /// Represents the data body of an AMQP message. /// This consists of one or more data sections. - /// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-data + /// /// public class AmqpDataBody : AmqpMessageBody { diff --git a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageBody.cs b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageBody.cs index 42aec5a7c61a..016ca9ce90d0 100644 --- a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageBody.cs +++ b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageBody.cs @@ -5,7 +5,7 @@ namespace Azure.Core.Amqp { /// /// Represents an AMQP message body. - /// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format + /// /// public abstract class AmqpMessageBody { diff --git a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageHeader.cs b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageHeader.cs index 3db2ca2a2fb3..f8210a8a3d7e 100644 --- a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageHeader.cs +++ b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageHeader.cs @@ -7,7 +7,7 @@ namespace Azure.Core.Amqp { /// /// Represents an AMQP message header. - /// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header + /// /// public class AmqpMessageHeader { @@ -16,6 +16,10 @@ public class AmqpMessageHeader /// public AmqpMessageHeader() { } + /// + /// Initializes a new instance by copying the passed in header. + /// + /// The header to copy. internal AmqpMessageHeader(AmqpMessageHeader header) { Durable = header.Durable; @@ -26,7 +30,7 @@ internal AmqpMessageHeader(AmqpMessageHeader header) } /// - /// The durable value from the AMQP message header. + /// The durable value from the AMQP transport header. /// public bool? Durable { get; set; } @@ -49,6 +53,5 @@ internal AmqpMessageHeader(AmqpMessageHeader header) /// The delivery-count value from the AMQP message header. /// public uint? DeliveryCount { get; set; } - } } diff --git a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageProperties.cs b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageProperties.cs index 32b2cae62637..3426a4e3b93b 100644 --- a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageProperties.cs +++ b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageProperties.cs @@ -7,7 +7,7 @@ namespace Azure.Core.Amqp { /// /// Represents the AMQP message properties. - /// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties + /// /// public class AmqpMessageProperties { @@ -19,7 +19,7 @@ public AmqpMessageProperties() { } /// /// Initializes a new instance by copying the passed in properties. /// - /// + /// The properties to copy. public AmqpMessageProperties(AmqpMessageProperties properties) { MessageId = properties.MessageId; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConstants.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConstants.cs index b76bffc1a27d..f020cdde37dd 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConstants.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConstants.cs @@ -5,6 +5,10 @@ namespace Azure.Messaging.ServiceBus.Amqp { + /// + /// Service Bus specific constants that are used to identify keys for various + /// MessageAnnotations in an AMQP message. + /// internal class AmqpMessageConstants { internal const string EnqueuedTimeUtcName = "x-opt-enqueued-time"; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs index 81b2e29eacf2..24696df46467 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs @@ -21,7 +21,10 @@ namespace Azure.Messaging.ServiceBus.Amqp { internal static class AmqpMessageConverter { - internal const int GuidSize = 16; + /// + /// The size, in bytes, to use for extracting the delivery tag bytes into . + /// + private const int GuidSizeInBytes = 16; /// The size, in bytes, to use as a buffer for stream operations. private const int StreamBufferSizeInBytes = 512; @@ -335,10 +338,10 @@ public static ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqpM } } - if (amqpMessage.DeliveryTag.Count == GuidSize) + if (amqpMessage.DeliveryTag.Count == GuidSizeInBytes) { - var guidBuffer = new byte[GuidSize]; - Buffer.BlockCopy(amqpMessage.DeliveryTag.Array, amqpMessage.DeliveryTag.Offset, guidBuffer, 0, GuidSize); + var guidBuffer = new byte[GuidSizeInBytes]; + Buffer.BlockCopy(amqpMessage.DeliveryTag.Array, amqpMessage.DeliveryTag.Offset, guidBuffer, 0, GuidSizeInBytes); sbMessage.LockTokenGuid = new Guid(guidBuffer); } @@ -416,7 +419,7 @@ public static RuleFilter GetFilter(AmqpRuleFilterCodec amqpFilter) MessageId = amqpCorrelationFilter.MessageId, To = amqpCorrelationFilter.To, ReplyTo = amqpCorrelationFilter.ReplyTo, - Subject = amqpCorrelationFilter.Label, + Subject = amqpCorrelationFilter.Subject, SessionId = amqpCorrelationFilter.SessionId, ReplyToSessionId = amqpCorrelationFilter.ReplyToSessionId, ContentType = amqpCorrelationFilter.ContentType diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs index 29eee89f56fb..0c213b37c641 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs @@ -62,7 +62,7 @@ public static IEnumerable GetDataViaDataBody(this AmqpMessage messag // The method is optimized for this situation to return the pre-existing array. public static BinaryData ConvertAndFlattenData(this IEnumerable dataList) { - ReadOnlyMemory flattened = null; + ReadOnlyMemory flattened = default; List flattenedList = null; var dataCount = 0; foreach (BinaryData data in dataList) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/Framing/AmqpCorrelationRuleFilterCodec.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/Framing/AmqpCorrelationRuleFilterCodec.cs index c5bc291b597d..5c27cd000007 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/Framing/AmqpCorrelationRuleFilterCodec.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/Framing/AmqpCorrelationRuleFilterCodec.cs @@ -27,7 +27,7 @@ public AmqpCorrelationRuleFilterCodec() : base(Name, Code) public string ReplyTo { get; set; } - public string Label { get; set; } + public string Subject { get; set; } public string SessionId { get; set; } @@ -54,7 +54,7 @@ protected override void OnEncode(ByteBuffer buffer) AmqpCodec.EncodeString(MessageId, buffer); AmqpCodec.EncodeString(To, buffer); AmqpCodec.EncodeString(ReplyTo, buffer); - AmqpCodec.EncodeString(Label, buffer); + AmqpCodec.EncodeString(Subject, buffer); AmqpCodec.EncodeString(SessionId, buffer); AmqpCodec.EncodeString(ReplyToSessionId, buffer); AmqpCodec.EncodeString(ContentType, buffer); @@ -85,7 +85,7 @@ protected override void OnDecode(ByteBuffer buffer, int count) if (count-- > 0) { - Label = AmqpCodec.DecodeString(buffer); + Subject = AmqpCodec.DecodeString(buffer); } if (count-- > 0) @@ -115,7 +115,7 @@ protected override int OnValueSize() AmqpCodec.GetStringEncodeSize(MessageId) + AmqpCodec.GetStringEncodeSize(To) + AmqpCodec.GetStringEncodeSize(ReplyTo) + - AmqpCodec.GetStringEncodeSize(Label) + + AmqpCodec.GetStringEncodeSize(Subject) + AmqpCodec.GetStringEncodeSize(SessionId) + AmqpCodec.GetStringEncodeSize(ReplyToSessionId) + AmqpCodec.GetStringEncodeSize(ContentType) + diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Management/Rules/CorrelationRuleFilter.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Management/Rules/CorrelationRuleFilter.cs index 90c1f0681edc..ace6fc77d18a 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Management/Rules/CorrelationRuleFilter.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Management/Rules/CorrelationRuleFilter.cs @@ -106,9 +106,9 @@ public string ReplyTo } /// - /// Application specific label. + /// Application specific subject. /// - /// The application specific label. + /// The application specific subject. public string Subject { get; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Plugins/ServiceBusPlugin.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Plugins/ServiceBusPlugin.cs index 82fd570c8d82..cf39d29a6e15 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Plugins/ServiceBusPlugin.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Plugins/ServiceBusPlugin.cs @@ -140,7 +140,7 @@ protected void SetReplyToSessionId(ServiceBusReceivedMessage message, string rep /// The session ID to set on the message. protected void SetSessionId(ServiceBusReceivedMessage message, string sessionId) { - message.AmqpMessage.Properties.GroupId= sessionId; + message.AmqpMessage.Properties.GroupId = sessionId; } /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusMessage.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusMessage.cs index e509495e4065..05533b881071 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusMessage.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusMessage.cs @@ -15,7 +15,8 @@ namespace Azure.Messaging.ServiceBus /// When receiving messages, the is used. /// /// - /// The message structure is discussed in detail in the product documentation. + /// The message structure is discussed in detail in the + /// product documentation. /// public class ServiceBusMessage { @@ -93,7 +94,7 @@ public BinaryData Body /// The message identifier is an application-defined value that uniquely identifies the /// message and its payload. The identifier is a free-form string and can reflect a GUID /// or an identifier derived from the application context. If enabled, the - /// duplicate detection + /// duplicate detection /// feature identifies and removes second and further submissions of messages with the /// same MessageId. /// @@ -111,7 +112,7 @@ public string MessageId /// Gets or sets a partition key for sending a message to a partitioned entity. /// The partition key. Maximum length is 128 characters. /// - /// For partitioned entities, + /// For partitioned entities, /// setting this value enables assigning related messages to the same internal partition, so that submission sequence /// order is correctly recorded. The partition is chosen by a hash function over this value and cannot be chosen /// directly. For session-aware entities, the property overrides this value. @@ -135,7 +136,7 @@ public string PartitionKey /// If a message is sent via a transfer queue in the scope of a transaction, this value selects the /// transfer queue partition: This is functionally equivalent to and ensures that /// messages are kept together and in order as they are transferred. - /// See Transfers and Send Via. + /// See Transfers and Send Via. /// public string ViaPartitionKey { @@ -157,7 +158,7 @@ public string ViaPartitionKey /// affiliation of the message. Messages with the same session identifier are subject /// to summary locking and enable exact in-order processing and demultiplexing. /// For session-unaware entities, this value is ignored. - /// See Message Sessions. + /// See Message Sessions. /// public string SessionId { @@ -174,7 +175,8 @@ public string SessionId /// Session identifier. Maximum length is 128 characters. /// /// This value augments the ReplyTo information and specifies which SessionId should be set - /// for the reply when sent to the reply entity. See Message Routing and Correlation + /// for the reply when sent to the reply entity. + /// See Message Routing and Correlation /// public string ReplyToSessionId { @@ -196,7 +198,7 @@ public string ReplyToSessionId /// When not set explicitly, the assumed value is the DefaultTimeToLive for the respective queue or topic. /// A message-level value cannot be longer than the entity's DefaultTimeToLive /// setting and it is silently adjusted if it does. - /// See. Expiration + /// See Expiration. /// public TimeSpan TimeToLive { @@ -216,7 +218,7 @@ public TimeSpan TimeToLive /// /// Allows an application to specify a context for the message for the purposes of correlation, /// for example reflecting the MessageId of a message that is being replied to. - /// See Message Routing and Correlation. + /// See Message Routing and Correlation. /// public string CorrelationId { @@ -230,8 +232,8 @@ public string CorrelationId } } - /// Gets or sets an application specific label. - /// The application specific label + /// Gets or sets an application specific subject. + /// The application specific subject. /// /// This property enables the application to indicate the purpose of the message to the receiver in a standardized /// fashion, similar to an email subject line. The mapped AMQP property is "subject". @@ -253,7 +255,7 @@ public string Subject /// /// This property is reserved for future use in routing scenarios and presently ignored by the broker itself. /// Applications can use this value in rule-driven - /// auto-forward chaining scenarios to indicate the + /// auto-forward chaining scenarios to indicate the /// intended logical destination of the message. /// public string To @@ -292,7 +294,7 @@ public string ContentType /// This optional and application-defined value is a standard way to express a reply path /// to the receiver of the message. When a sender expects a reply, it sets the value to the /// absolute or relative path of the queue or topic it expects the reply to be sent to. - /// See Message Routing and Correlation. + /// See Message Routing and Correlation. /// public string ReplyTo { @@ -325,7 +327,9 @@ public DateTimeOffset ScheduledEnqueueTime } /// - /// + /// Gets or sets the raw Amqp message data that will be transmitted over the wire. + /// This can be used to enable scenarios that require setting AMQP header, footer, property, or annotation + /// data that is not exposed as top level properties in the ServiceBusMessage. /// public AmqpAnnotatedMessage AmqpMessage { get; set; } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusReceivedMessage.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusReceivedMessage.cs index 2b6a25b62662..6067a8d0cccb 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusReceivedMessage.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusReceivedMessage.cs @@ -20,10 +20,10 @@ public class ServiceBusReceivedMessage /// /// Creates a new message from the specified payload. /// - /// The payload of the message in bytes + /// The payload of the message represented as bytes. internal ServiceBusReceivedMessage(ReadOnlyMemory body) + : this(new AmqpAnnotatedMessage(new BinaryData[] { BinaryData.FromBytes(body) })) { - AmqpMessage = new AmqpAnnotatedMessage(new BinaryData[] { BinaryData.FromBytes(body) }); } /// @@ -44,7 +44,9 @@ internal ServiceBusReceivedMessage(): this(body: default) internal bool IsSettled { get; set; } /// - /// + /// Gets the raw Amqp message data that will be transmitted over the wire. + /// This can be used to enable scenarios that require reading AMQP header, footer, property, or annotation + /// data that is not exposed as top level properties in the ServiceBusMessage. /// public AmqpAnnotatedMessage AmqpMessage { get; internal set; } @@ -186,7 +188,7 @@ public BinaryData Body public DateTimeOffset ScheduledEnqueueTime => AmqpMessage.GetScheduledEnqueueTime(); /// - /// Gets the "user properties" bag, which can be used for custom message metadata. + /// Gets the application properties bag, which can be used for custom message metadata. /// /// /// Only following value types are supported: From 0380574c852058592b174b32fcd8aab2aac4ed18 Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Fri, 28 Aug 2020 14:50:38 -0700 Subject: [PATCH 4/6] Update doc --- .../src/Amqp/AmqpMessageHeader.cs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageHeader.cs b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageHeader.cs index f8210a8a3d7e..4401b2b39c6d 100644 --- a/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageHeader.cs +++ b/sdk/core/Azure.Core.Experimental/src/Amqp/AmqpMessageHeader.cs @@ -6,7 +6,7 @@ namespace Azure.Core.Amqp { /// - /// Represents an AMQP message header. + /// Represents an AMQP message transport header. /// /// public class AmqpMessageHeader @@ -17,9 +17,10 @@ public class AmqpMessageHeader public AmqpMessageHeader() { } /// - /// Initializes a new instance by copying the passed in header. + /// Initializes a new instance by copying the passed in + /// AMQP message transport header. /// - /// The header to copy. + /// The AMQP message transport header to copy. internal AmqpMessageHeader(AmqpMessageHeader header) { Durable = header.Durable; @@ -30,27 +31,27 @@ internal AmqpMessageHeader(AmqpMessageHeader header) } /// - /// The durable value from the AMQP transport header. + /// The durable value from the AMQP message transport header. /// public bool? Durable { get; set; } /// - /// The priority value from the AMQP message header. + /// The priority value from the AMQP message transport header. /// public byte? Priority { get; set; } /// - /// The ttl value from the AMQP message header. + /// The ttl value from the AMQP message transport header. /// public TimeSpan? TimeToLive { get; set; } /// - /// The first-acquirer value from the AMQP message header. + /// The first-acquirer value from the AMQP message transport header. /// public bool? FirstAcquirer { get; set; } /// - /// The delivery-count value from the AMQP message header. + /// The delivery-count value from the AMQP message transport header. /// public uint? DeliveryCount { get; set; } } From 16694a487af9ca7803b6a9f4a12dc9dd1b071deb Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Fri, 28 Aug 2020 17:46:26 -0700 Subject: [PATCH 5/6] Improve multi data section --- .../src/Amqp/AmqpMessageExtensions.cs | 47 ++++++++----------- .../tests/Message/MessageLiveTests.cs | 33 ++++++++++++- 2 files changed, 52 insertions(+), 28 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs index 0c213b37c641..d25221fc8f28 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs @@ -15,12 +15,18 @@ internal static class AmqpMessageExtensions { public static AmqpMessage ToAmqpMessage(this ServiceBusMessage message) { - BinaryData body = ((AmqpDataBody)message.AmqpMessage.Body).Data.ConvertAndFlattenData(); - return AmqpMessage.Create( - new Data + return AmqpMessage.Create(((AmqpDataBody)message.AmqpMessage.Body).Data.AsAmqpData()); + } + + private static IEnumerable AsAmqpData(this IEnumerable binaryData) + { + foreach (BinaryData data in binaryData) + { + yield return new Data { - Value = new ArraySegment(body.ToBytes().IsEmpty ? Array.Empty() : body.ToBytes().ToArray()) - }); + Value = new ArraySegment(data.ToBytes().IsEmpty ? Array.Empty() : data.ToBytes().ToArray()) + }; + } } private static byte[] GetByteArray(this Data data) @@ -62,33 +68,20 @@ public static IEnumerable GetDataViaDataBody(this AmqpMessage messag // The method is optimized for this situation to return the pre-existing array. public static BinaryData ConvertAndFlattenData(this IEnumerable dataList) { - ReadOnlyMemory flattened = default; - List flattenedList = null; - var dataCount = 0; + var writer = new ArrayBufferWriter(); + Memory memory; foreach (BinaryData data in dataList) { - // Only the first array is needed if it is the only valid array. - // This should be the case 99% of the time. - if (dataCount == 0) - { - flattened = data; - } - else - { - // We defer creating this list since this case will rarely happen. - flattenedList ??= new List(flattened.ToArray()!); - flattenedList.AddRange(data.ToBytes().ToArray()); - } - - dataCount++; + ReadOnlyMemory bytes = data.ToBytes(); + memory = writer.GetMemory(bytes.Length); + bytes.CopyTo(memory); + writer.Advance(bytes.Length); } - - if (dataCount > 1) + if (writer.WrittenCount == 0) { - flattened = flattenedList!.ToArray(); + return new BinaryData(); } - - return BinaryData.FromBytes(flattened); + return BinaryData.FromBytes(writer.WrittenMemory); } public static string GetPartitionKey(this AmqpAnnotatedMessage message) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs index a785c0d061a9..38c04da1a56a 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs @@ -4,7 +4,7 @@ using System; using System.Linq; using System.Threading.Tasks; -using Azure.Core; +using Azure.Core.Amqp; using Azure.Core.Serialization; using NUnit.Framework; @@ -173,6 +173,37 @@ public async Task SendJsonBodyMessage() } } + [Test] + public async Task CanSendMultipleDataSections() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + var sender = client.CreateSender(scope.QueueName); + var amqp = new AmqpAnnotatedMessage( + new BinaryData[] + { + new BinaryData(GetRandomBuffer(100)), + new BinaryData(GetRandomBuffer(100)) + }); + var msg = new ServiceBusMessage() + { + AmqpMessage = amqp + }; + + await sender.SendMessageAsync(msg); + + var receiver = client.CreateReceiver(scope.QueueName); + var received = await receiver.ReceiveMessageAsync(); + var bodyEnum = ((AmqpDataBody)received.AmqpMessage.Body).Data.GetEnumerator(); + foreach (BinaryData data in ((AmqpDataBody)msg.AmqpMessage.Body).Data) + { + bodyEnum.MoveNext(); + Assert.AreEqual(data.ToBytes().ToArray(), bodyEnum.Current.ToBytes().ToArray()); + } + } + } + private class TestBody { public string A { get; set; } From d9eff17f534952e7f80d296263a55dd9984a2587 Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Fri, 28 Aug 2020 19:29:01 -0700 Subject: [PATCH 6/6] Fix bug caused by lazy enumeration of message body --- .../src/Amqp/AmqpMessageExtensions.cs | 10 ++++------ .../tests/Message/MessageLiveTests.cs | 12 +++++++++++- .../tests/Receiver/ReceiverLiveTests.cs | 8 +++++--- .../tests/Receiver/SessionReceiverLiveTests.cs | 1 + 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs index d25221fc8f28..af47a8a051b2 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs @@ -51,16 +51,14 @@ private static byte[] GetByteArray(this Data data) } } - public static IEnumerable GetDataViaDataBody(this AmqpMessage message) + public static IList GetDataViaDataBody(this AmqpMessage message) { + IList dataList = new List(); foreach (Data data in (message.DataBody ?? Enumerable.Empty())) { - byte[] bytes = data.GetByteArray(); - if (bytes != null) - { - yield return BinaryData.FromBytes(bytes); - } + dataList.Add(BinaryData.FromBytes(data.GetByteArray())); } + return dataList; } // Returns via the out parameter the flattened collection of bytes. diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs index 38c04da1a56a..e3fc1fc47cba 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs @@ -196,10 +196,20 @@ public async Task CanSendMultipleDataSections() var receiver = client.CreateReceiver(scope.QueueName); var received = await receiver.ReceiveMessageAsync(); var bodyEnum = ((AmqpDataBody)received.AmqpMessage.Body).Data.GetEnumerator(); + int ct = 0; foreach (BinaryData data in ((AmqpDataBody)msg.AmqpMessage.Body).Data) { bodyEnum.MoveNext(); - Assert.AreEqual(data.ToBytes().ToArray(), bodyEnum.Current.ToBytes().ToArray()); + var bytes = data.ToBytes().ToArray(); + Assert.AreEqual(bytes, bodyEnum.Current.ToBytes().ToArray()); + if (ct++ == 0) + { + Assert.AreEqual(bytes, received.Body.ToBytes().Slice(0, 100).ToArray()); + } + else + { + Assert.AreEqual(bytes, received.Body.ToBytes().Slice(100, 100).ToArray()); + } } } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs index e91d25a34284..8e0cbf744dc2 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs @@ -245,6 +245,7 @@ public async Task DeadLetterMessages() remainingMessages--; messageEnum.MoveNext(); Assert.AreEqual(messageEnum.Current.MessageId, item.MessageId); + Assert.AreEqual(messageEnum.Current.Body.ToBytes().ToArray(), item.Body.ToBytes().ToArray()); await receiver.DeadLetterMessageAsync(item.LockToken); } } @@ -307,13 +308,14 @@ public async Task DeferMessages() } Assert.AreEqual(0, remainingMessages); - IReadOnlyList deferedMessages = await receiver.ReceiveDeferredMessagesAsync(sequenceNumbers); + IReadOnlyList deferredMessages = await receiver.ReceiveDeferredMessagesAsync(sequenceNumbers); var messageList = messages.ToList(); - Assert.AreEqual(messageList.Count, deferedMessages.Count); + Assert.AreEqual(messageList.Count, deferredMessages.Count); for (int i = 0; i < messageList.Count; i++) { - Assert.AreEqual(messageList[i].MessageId, deferedMessages[i].MessageId); + Assert.AreEqual(messageList[i].MessageId, deferredMessages[i].MessageId); + Assert.AreEqual(messageList[i].Body.ToBytes().ToArray(), deferredMessages[i].Body.ToBytes().ToArray()); } // verify that looking up a non-existent sequence number will throw diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs index 41ba93135e19..47708842d853 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs @@ -782,6 +782,7 @@ public async Task GetAndSetSessionStateTest(bool isSessionSpecified) ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync(); Assert.AreEqual(message.MessageId, receivedMessage.MessageId); Assert.AreEqual(message.SessionId, receivedMessage.SessionId); + Assert.AreEqual(message.Body.ToBytes().ToArray(), receivedMessage.Body.ToBytes().ToArray()); var sessionStateString = "Received Message From Session!"; var sessionState = Encoding.UTF8.GetBytes(sessionStateString);