diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln
index 3b926a0bc12a..86b203e110e0 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln
@@ -1,7 +1,7 @@
Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio Version 16
-VisualStudioVersion = 16.0.29215.179
+# Visual Studio Version 17
+VisualStudioVersion = 17.0.31912.275
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs.Processor", "src\Azure.Messaging.EventHubs.Processor.csproj", "{B9C4A45A-BD10-4C4C-B7BF-EE4F601AE7FC}"
EndProject
@@ -13,7 +13,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "External", "External", "{79
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs", "..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj", "{87A3ED70-190D-4E6B-A568-40DF5B9F3939}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Core.Experimental", "..\..\core\Azure.Core.Experimental\src\Azure.Core.Experimental.csproj", "{0BFCCE8E-85FD-4DF7-8E5D-A6CE9ACDB175}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core", "..\..\core\Azure.Core\src\Azure.Core.csproj", "{00EBD345-7146-442E-8FC7-106FA5FC2A1A}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.Experimental", "..\..\core\Azure.Core.Experimental\src\Azure.Core.Experimental.csproj", "{B0B9A0F1-50DE-4059-9A2D-51059D9A477C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -37,10 +39,14 @@ Global
{87A3ED70-190D-4E6B-A568-40DF5B9F3939}.Debug|Any CPU.Build.0 = Debug|Any CPU
{87A3ED70-190D-4E6B-A568-40DF5B9F3939}.Release|Any CPU.ActiveCfg = Release|Any CPU
{87A3ED70-190D-4E6B-A568-40DF5B9F3939}.Release|Any CPU.Build.0 = Release|Any CPU
- {0BFCCE8E-85FD-4DF7-8E5D-A6CE9ACDB175}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {0BFCCE8E-85FD-4DF7-8E5D-A6CE9ACDB175}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {0BFCCE8E-85FD-4DF7-8E5D-A6CE9ACDB175}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {0BFCCE8E-85FD-4DF7-8E5D-A6CE9ACDB175}.Release|Any CPU.Build.0 = Release|Any CPU
+ {00EBD345-7146-442E-8FC7-106FA5FC2A1A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {00EBD345-7146-442E-8FC7-106FA5FC2A1A}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {00EBD345-7146-442E-8FC7-106FA5FC2A1A}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {00EBD345-7146-442E-8FC7-106FA5FC2A1A}.Release|Any CPU.Build.0 = Release|Any CPU
+ {B0B9A0F1-50DE-4059-9A2D-51059D9A477C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {B0B9A0F1-50DE-4059-9A2D-51059D9A477C}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {B0B9A0F1-50DE-4059-9A2D-51059D9A477C}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {B0B9A0F1-50DE-4059-9A2D-51059D9A477C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -48,7 +54,8 @@ Global
GlobalSection(NestedProjects) = preSolution
{7DFF0E65-DC9A-410D-9A11-AD6A06860FE1} = {797FF941-76FD-45FD-AC17-A73DFE2BA621}
{87A3ED70-190D-4E6B-A568-40DF5B9F3939} = {797FF941-76FD-45FD-AC17-A73DFE2BA621}
- {0BFCCE8E-85FD-4DF7-8E5D-A6CE9ACDB175} = {797FF941-76FD-45FD-AC17-A73DFE2BA621}
+ {00EBD345-7146-442E-8FC7-106FA5FC2A1A} = {797FF941-76FD-45FD-AC17-A73DFE2BA621}
+ {B0B9A0F1-50DE-4059-9A2D-51059D9A477C} = {797FF941-76FD-45FD-AC17-A73DFE2BA621}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {44BD3BD5-61DF-464D-8627-E00B0BC4B3A3}
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj
index bc6b4fd2c118..45e596944d89 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj
@@ -13,11 +13,13 @@
-
+
+
+
+
+
@@ -63,7 +66,4 @@
Azure.Messaging.EventHubs
-
-
-
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs
index 4ed05fd185ff..7f241b4f0cad 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs
@@ -145,6 +145,25 @@ public virtual PooledProducer GetPooledProducer(string partitionId,
});
}
+ ///
+ /// Expires the pooled producer for the requested partition immediately.
+ ///
+ ///
+ /// The unique identifier of a partition associated with the Event Hub.
+ /// true to close the pooled producer even if it is in use; otherwise, false will defer closing until it is no longer in use.
+ ///
+ public virtual async Task ExpirePooledProducerAsync(string partitionId,
+ bool forceClose = false)
+ {
+ if (Pool.TryRemove(partitionId, out var poolItem))
+ {
+ if ((poolItem.ActiveInstances.IsEmpty) || (forceClose))
+ {
+ await poolItem.PartitionProducer.CloseAsync(CancellationToken.None).ConfigureAwait(false);
+ }
+ }
+ }
+
///
/// Closes the producers in the pool and performs any cleanup necessary
/// for resources used by the .
@@ -188,6 +207,7 @@ private TimerCallback CreateExpirationTimerCallback()
return _ =>
{
// Capture the time stamp to use a consistent value.
+
var now = DateTimeOffset.UtcNow;
foreach (var key in Pool.Keys.ToList())
@@ -196,7 +216,7 @@ private TimerCallback CreateExpirationTimerCallback()
{
if (poolItem.RemoveAfter <= now)
{
- if (Pool.TryRemove(key, out var _) && !poolItem.ActiveInstances.Any())
+ if (Pool.TryRemove(key, out var _) && poolItem.ActiveInstances.IsEmpty)
{
// At this point the pool item may have been closed already
// if there was a context switch between the if conditions
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs
index 78d0cf1d6ad1..2cdfde6e1d23 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs
@@ -7,8 +7,8 @@
using System.IO;
using System.Text;
using Azure.Core;
-using Azure.Core.Serialization;
using Azure.Core.Amqp;
+using Azure.Core.Serialization;
using Azure.Messaging.EventHubs.Amqp;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Producer;
@@ -48,17 +48,6 @@ public BinaryData EventBody
set => _amqpMessage.Body = AmqpMessageBody.FromData(MessageBody.FromReadOnlyMemorySegment(value.ToMemory()));
}
- ///
- /// Hidden property that shadows the property. This is added
- /// in order to inherit from .
- ///
- [EditorBrowsable(EditorBrowsableState.Never)]
- public override BinaryData Data
- {
- get => EventBody;
- set => EventBody = value;
- }
-
///
/// A MIME type describing the data contained in the ,
/// intended to allow consumers to make informed decisions for inspecting and
@@ -108,9 +97,22 @@ public override string ContentType
}
///
- /// Hidden property that indicates that the is not read-only. This is part of
- /// the abstraction.
+ /// Hidden property that shadows the property. This is added
+ /// in order to inherit from .
///
+ ///
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public override BinaryData Data
+ {
+ get => EventBody;
+ set => EventBody = value;
+ }
+
+ ///
+ /// Hidden property that indicates that the is not read-only. This is part of
+ /// the abstraction.
+ ///
+ ///
[EditorBrowsable(EditorBrowsableState.Never)]
public override bool IsReadOnly => false;
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs
index d5d45cfcbacc..42d402ef5cb0 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs
@@ -352,6 +352,7 @@ public EventHubProducerClient(EventHubConnection connection,
/// The connection to use as the basis for delegation of client-type operations.
/// The transport producer instance to use as the basis for service communication.
/// A used to manage a set of partition specific .
+ /// A set of options to apply when configuring the producer.
///
///
/// This constructor is intended to be used internally for functional
@@ -360,7 +361,8 @@ public EventHubProducerClient(EventHubConnection connection,
///
internal EventHubProducerClient(EventHubConnection connection,
TransportProducer transportProducer,
- TransportProducerPool partitionProducerPool = default)
+ TransportProducerPool partitionProducerPool = default,
+ EventHubProducerClientOptions clientOptions = default)
{
Argument.AssertNotNull(connection, nameof(connection));
Argument.AssertNotNull(transportProducer, nameof(transportProducer));
@@ -368,7 +370,7 @@ internal EventHubProducerClient(EventHubConnection connection,
OwnsConnection = false;
Connection = connection;
RetryPolicy = new EventHubsRetryOptions().ToRetryPolicy();
- Options = new EventHubProducerClientOptions();
+ Options = clientOptions?.Clone() ?? new EventHubProducerClientOptions();
Identifier = Guid.NewGuid().ToString();
PartitionProducerPool = partitionProducerPool ?? new TransportProducerPool(partitionId => transportProducer);
@@ -498,7 +500,7 @@ public virtual async Task GetPartitionPropertiesAsync(strin
///
///
internal virtual async Task GetPartitionPublishingPropertiesAsync(string partitionId,
- CancellationToken cancellationToken = default)
+ CancellationToken cancellationToken = default)
{
Argument.AssertNotClosed(IsClosed, nameof(EventHubProducerClient));
Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId));
@@ -957,20 +959,24 @@ private async Task SendIdempotentAsync(IReadOnlyList eventSet,
cancellationToken.ThrowIfCancellationRequested();
EventHubsEventSource.Log.IdempotentPublishStart(EventHubName, options.PartitionId);
+ var resetStateOnError = false;
+ var releaseGuard = false;
var partitionState = PartitionState.GetOrAdd(options.PartitionId, new PartitionPublishingState(options.PartitionId));
try
{
- cancellationToken.ThrowIfCancellationRequested();
+ if (!partitionState.PublishingGuard.Wait(100, cancellationToken))
+ {
+ await partitionState.PublishingGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
+ }
- await partitionState.PublishingGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
+ releaseGuard = true;
EventHubsEventSource.Log.IdempotentSynchronizationAcquire(EventHubName, options.PartitionId);
// Ensure that the partition state has been initialized.
if (!partitionState.IsInitialized)
{
- cancellationToken.ThrowIfCancellationRequested();
await InitializePartitionStateAsync(partitionState, cancellationToken).ConfigureAwait(false);
}
@@ -989,7 +995,7 @@ private async Task SendIdempotentAsync(IReadOnlyList eventSet,
// Publish the events.
- cancellationToken.ThrowIfCancellationRequested();
+ resetStateOnError = true;
EventHubsEventSource.Log.IdempotentSequencePublish(EventHubName, options.PartitionId, firstSequence, lastSequence);
await SendInternalAsync(eventSet, options, cancellationToken).ConfigureAwait(false);
@@ -1013,12 +1019,40 @@ private async Task SendIdempotentAsync(IReadOnlyList eventSet,
eventData.ClearPublishingState();
}
+ if (resetStateOnError)
+ {
+ // Reset the partition state and options to ensure that future attempts
+ // are safe and do not risk data loss by reusing the same producer group identifier.
+
+ if (!Options.PartitionOptions.TryGetValue(options.PartitionId, out var partitionOptions))
+ {
+ partitionOptions = new PartitionPublishingOptionsInternal();
+ Options.PartitionOptions[options.PartitionId] = partitionOptions;
+ }
+
+ partitionOptions.ProducerGroupId = null;
+ partitionOptions.OwnerLevel = null;
+ partitionOptions.StartingSequenceNumber = null;
+
+ partitionState.ProducerGroupId = null;
+ partitionState.OwnerLevel = null;
+ partitionState.LastPublishedSequenceNumber = null;
+
+ // Expire the transport producer associated with the partition, to ensure
+ // that the new idempotent state is used for the next publishing operation.
+
+ await PartitionProducerPool.ExpirePooledProducerAsync(options.PartitionId, forceClose: true).ConfigureAwait(false);
+ }
+
throw;
}
finally
{
- partitionState.PublishingGuard.Release();
- EventHubsEventSource.Log.IdempotentSynchronizationRelease(EventHubName, options.PartitionId);
+ if (releaseGuard)
+ {
+ partitionState.PublishingGuard.Release();
+ EventHubsEventSource.Log.IdempotentSynchronizationRelease(EventHubName, options.PartitionId);
+ }
}
}
catch (Exception ex)
@@ -1052,6 +1086,8 @@ private async Task SendIdempotentAsync(EventDataBatch eventBatch,
cancellationToken.ThrowIfCancellationRequested();
EventHubsEventSource.Log.IdempotentPublishStart(EventHubName, options.PartitionId);
+ var resetStateOnError = false;
+ var releaseGuard = false;
var partitionState = PartitionState.GetOrAdd(options.PartitionId, new PartitionPublishingState(options.PartitionId));
var eventSet = eventBatch.AsEnumerable() switch
@@ -1062,16 +1098,18 @@ private async Task SendIdempotentAsync(EventDataBatch eventBatch,
try
{
- cancellationToken.ThrowIfCancellationRequested();
+ if (!partitionState.PublishingGuard.Wait(100, cancellationToken))
+ {
+ await partitionState.PublishingGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
+ }
- await partitionState.PublishingGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
+ releaseGuard = true;
EventHubsEventSource.Log.IdempotentSynchronizationAcquire(EventHubName, options.PartitionId);
// Ensure that the partition state has been initialized.
if (!partitionState.IsInitialized)
{
- cancellationToken.ThrowIfCancellationRequested();
await InitializePartitionStateAsync(partitionState, cancellationToken).ConfigureAwait(false);
}
@@ -1090,7 +1128,7 @@ private async Task SendIdempotentAsync(EventDataBatch eventBatch,
// Publish the events.
- cancellationToken.ThrowIfCancellationRequested();
+ resetStateOnError = true;
EventHubsEventSource.Log.IdempotentSequencePublish(EventHubName, options.PartitionId, firstSequence, lastSequence);
await SendInternalAsync(eventBatch, cancellationToken).ConfigureAwait(false);
@@ -1111,12 +1149,40 @@ private async Task SendIdempotentAsync(EventDataBatch eventBatch,
eventData.ClearPublishingState();
}
+ if (resetStateOnError)
+ {
+ // Reset the partition state and options to ensure that future attempts
+ // are safe and do not risk data loss by reusing the same producer group identifier.
+
+ if (!Options.PartitionOptions.TryGetValue(options.PartitionId, out var partitionOptions))
+ {
+ partitionOptions = new PartitionPublishingOptionsInternal();
+ Options.PartitionOptions[options.PartitionId] = partitionOptions;
+ }
+
+ partitionOptions.ProducerGroupId = null;
+ partitionOptions.OwnerLevel = null;
+ partitionOptions.StartingSequenceNumber = null;
+
+ partitionState.ProducerGroupId = null;
+ partitionState.OwnerLevel = null;
+ partitionState.LastPublishedSequenceNumber = null;
+
+ // Expire the transport producer associated with the partition, to ensure
+ // that the new idempotent state is used for the next publishing operation.
+
+ await PartitionProducerPool.ExpirePooledProducerAsync(options.PartitionId, forceClose: true).ConfigureAwait(false);
+ }
+
throw;
}
finally
{
- partitionState.PublishingGuard.Release();
- EventHubsEventSource.Log.IdempotentSynchronizationRelease(EventHubName, options.PartitionId);
+ if (releaseGuard)
+ {
+ partitionState.PublishingGuard.Release();
+ EventHubsEventSource.Log.IdempotentSynchronizationRelease(EventHubName, options.PartitionId);
+ }
}
}
catch (Exception ex)
@@ -1361,7 +1427,7 @@ private static int NextSequence(int currentSequence)
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static PartitionPublishingPropertiesInternal CreatePublishingPropertiesFromPartitionState(EventHubProducerClientOptions options,
- PartitionPublishingState state) =>
+ PartitionPublishingState state) =>
new PartitionPublishingPropertiesInternal(options.EnableIdempotentPartitions,
state.ProducerGroupId,
state.OwnerLevel,
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs
index 1414e7bbde61..d181dc4b2ad8 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs
@@ -1141,7 +1141,7 @@ public async Task SendIdempotentRollsOverSequenceNumbersToZero()
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
- await producer.SendAsync(events, sendOptions);
+ await producer.SendAsync(events, sendOptions, cancellationSource.Token);
for (var index = 0; index < events.Length; ++index)
{
@@ -1190,7 +1190,7 @@ public void SendIdempotentRollsBackSequenceNumbersOnFailure()
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
- Assert.That(async () => await producer.SendAsync(events, sendOptions), Throws.Exception, "The send operation should have failed.");
+ Assert.That(async () => await producer.SendAsync(events, sendOptions, cancellationSource.Token), Throws.Exception, "The send operation should have failed.");
for (var index = 0; index < events.Length; ++index)
{
@@ -1201,7 +1201,82 @@ public void SendIdempotentRollsBackSequenceNumbersOnFailure()
var partitionStateCollection = GetPartitionState(producer);
Assert.That(partitionStateCollection, Is.Not.Null, "The collection for partition state should have been initialized with the client.");
Assert.That(partitionStateCollection.TryGetValue(expectedPartition, out var partitionState), Is.True, "The state collection should have an entry for the partition.");
- Assert.That(partitionState.LastPublishedSequenceNumber, Is.EqualTo(startingSequence), "The sequence number for partition state should have been rolled back.");
+ Assert.That(partitionState.LastPublishedSequenceNumber, Is.Null, "The sequence number for partition state should have been reset.");
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public void SendIdempotentUpdatesProducerGroupIdOnFailure()
+ {
+ var expectedPartition = "5";
+ var eventCount = 5;
+ var startingSequence = 435;
+ var expectedProperties = new PartitionPublishingPropertiesInternal(true, 123, 456, startingSequence);
+ var events = EventGenerator.CreateEvents(eventCount).ToArray();
+ var sendOptions = new SendEventOptions { PartitionId = expectedPartition };
+ var mockTransport = new Mock();
+ var connection = new MockConnection(() => mockTransport.Object);
+
+ var producer = new EventHubProducerClient(connection, new EventHubProducerClientOptions
+ {
+ EnableIdempotentPartitions = true
+ });
+
+ mockTransport
+ .Setup(transportProducer => transportProducer.ReadInitializationPublishingPropertiesAsync(It.IsAny()))
+ .ReturnsAsync(expectedProperties);
+
+ mockTransport
+ .Setup(transportProducer => transportProducer.SendAsync(It.IsAny>(), It.IsAny(), It.IsAny()))
+ .Returns(Task.FromException(new OverflowException()));
+
+ using var cancellationSource = new CancellationTokenSource();
+ cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
+
+ Assert.That(async () => await producer.SendAsync(events, sendOptions, cancellationSource.Token), Throws.Exception, "The send operation should have failed.");
+
+ var partitionStateCollection = GetPartitionState(producer);
+ Assert.That(partitionStateCollection, Is.Not.Null, "The collection for partition state should have been initialized with the client.");
+ Assert.That(partitionStateCollection.TryGetValue(expectedPartition, out var partitionState), Is.True, "The state collection should have an entry for the partition.");
+ Assert.That(partitionState.ProducerGroupId, Is.Not.EqualTo(expectedProperties.ProducerGroupId), "The producer group identifier in partition state should have been changed.");
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public void SendIdempotentExpiresTheTransportProducerOnFailure()
+ {
+ var expectedPartition = "5";
+ var eventCount = 5;
+ var startingSequence = 435;
+ var expectedProperties = new PartitionPublishingPropertiesInternal(true, 123, 456, startingSequence);
+ var events = EventGenerator.CreateEvents(eventCount).ToArray();
+ var sendOptions = new SendEventOptions { PartitionId = expectedPartition };
+ var mockTransport = new Mock();
+ var connection = new MockConnection(() => mockTransport.Object);
+ var mockTransportProducerPool = new MockTransportProducerPool(mockTransport.Object, connection, new BasicRetryPolicy(new EventHubsRetryOptions()));
+ var producer = new EventHubProducerClient(connection, mockTransport.Object, mockTransportProducerPool, new EventHubProducerClientOptions { EnableIdempotentPartitions = true });
+
+ mockTransport
+ .Setup(transportProducer => transportProducer.ReadInitializationPublishingPropertiesAsync(It.IsAny()))
+ .ReturnsAsync(expectedProperties);
+
+ mockTransport
+ .Setup(transportProducer => transportProducer.SendAsync(It.IsAny>(), It.IsAny(), It.IsAny()))
+ .Returns(Task.FromException(new OverflowException()));
+
+ using var cancellationSource = new CancellationTokenSource();
+ cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
+
+ Assert.That(async () => await producer.SendAsync(events, sendOptions, cancellationSource.Token), Throws.Exception, "The send operation should have failed.");
+ Assert.That(mockTransportProducerPool.ExpirePooledProducerAsyncWasCalled, Is.True, "The transport producer should have been expired.");
}
///
@@ -1644,7 +1719,7 @@ public async Task SendIdempotentAppliesSequenceNumbersWithABatch()
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
- await producer.SendAsync(batch);
+ await producer.SendAsync(batch, cancellationSource.Token);
Assert.That(batch.StartingPublishedSequenceNumber, Is.EqualTo(startingSequence + 1), "The batch did not have the correct starting sequence number.");
var partitionStateCollection = GetPartitionState(producer);
@@ -1687,13 +1762,86 @@ public void SendIdempotentRollsBackSequenceNumbersOnFailureWithABatch()
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
- Assert.That(async () => await producer.SendAsync(batch), Throws.Exception, "The send operation should have failed.");
+ Assert.That(async () => await producer.SendAsync(batch, cancellationSource.Token), Throws.Exception, "The send operation should have failed.");
Assert.That(batch.StartingPublishedSequenceNumber, Is.Null, "The batch should not have a starting sequence number.");
var partitionStateCollection = GetPartitionState(producer);
Assert.That(partitionStateCollection, Is.Not.Null, "The collection for partition state should have been initialized with the client.");
Assert.That(partitionStateCollection.TryGetValue(expectedPartition, out var partitionState), Is.True, "The state collection should have an entry for the partition.");
- Assert.That(partitionState.LastPublishedSequenceNumber, Is.EqualTo(startingSequence), "The sequence number for partition state should have been rolled back.");
+ Assert.That(partitionState.LastPublishedSequenceNumber, Is.Null, "The sequence number for partition state should have been reset.");
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public void SendIdempotentUpdatesProducerGroupIdOnFailureWithABatch()
+ {
+ var expectedPartition = "5";
+ var eventCount = 5;
+ var startingSequence = 435;
+ var expectedProperties = new PartitionPublishingPropertiesInternal(true, 123, 456, startingSequence);
+ var batch = new EventDataBatch(new MockTransportBatch(eventCount), "ns", "eh", new CreateBatchOptions { PartitionId = expectedPartition });
+ var mockTransport = new Mock();
+ var connection = new MockConnection(() => mockTransport.Object);
+
+ var producer = new EventHubProducerClient(connection, new EventHubProducerClientOptions
+ {
+ EnableIdempotentPartitions = true
+ });
+
+ mockTransport
+ .Setup(transportProducer => transportProducer.ReadInitializationPublishingPropertiesAsync(It.IsAny()))
+ .ReturnsAsync(expectedProperties);
+
+ mockTransport
+ .Setup(transportProducer => transportProducer.SendAsync(It.IsAny(), It.IsAny()))
+ .Returns(Task.FromException(new OverflowException()));
+
+ using var cancellationSource = new CancellationTokenSource();
+ cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
+
+ Assert.That(async () => await producer.SendAsync(batch, cancellationSource.Token), Throws.Exception, "The send operation should have failed.");
+
+ var partitionStateCollection = GetPartitionState(producer);
+ Assert.That(partitionStateCollection, Is.Not.Null, "The collection for partition state should have been initialized with the client.");
+ Assert.That(partitionStateCollection.TryGetValue(expectedPartition, out var partitionState), Is.True, "The state collection should have an entry for the partition.");
+ Assert.That(partitionState.ProducerGroupId, Is.Not.EqualTo(expectedProperties.ProducerGroupId), "The producer group identifier in partition state should have been changed.");
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public void SendIdempotentExpiresTheTransportProducerOnFailureWithABatch()
+ {
+ var expectedPartition = "5";
+ var eventCount = 5;
+ var startingSequence = 435;
+ var expectedProperties = new PartitionPublishingPropertiesInternal(true, 123, 456, startingSequence);
+ var batch = new EventDataBatch(new MockTransportBatch(eventCount), "ns", "eh", new CreateBatchOptions { PartitionId = expectedPartition });
+ var mockTransport = new Mock();
+ var connection = new MockConnection(() => mockTransport.Object);
+ var mockTransportProducerPool = new MockTransportProducerPool(mockTransport.Object, connection, new BasicRetryPolicy(new EventHubsRetryOptions()));
+ var producer = new EventHubProducerClient(connection, mockTransport.Object, mockTransportProducerPool, new EventHubProducerClientOptions { EnableIdempotentPartitions = true });
+
+ mockTransport
+ .Setup(transportProducer => transportProducer.ReadInitializationPublishingPropertiesAsync(It.IsAny()))
+ .ReturnsAsync(expectedProperties);
+
+ mockTransport
+ .Setup(transportProducer => transportProducer.SendAsync(It.IsAny(), It.IsAny()))
+ .Returns(Task.FromException(new OverflowException()));
+
+ using var cancellationSource = new CancellationTokenSource();
+ cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
+
+ Assert.That(async () => await producer.SendAsync(batch, cancellationSource.Token), Throws.Exception, "The send operation should have failed.");
+ Assert.That(mockTransportProducerPool.ExpirePooledProducerAsyncWasCalled, Is.True, "The transport producer should have been expired.");
}
///
@@ -2829,6 +2977,7 @@ public override ValueTask DisposeAsync()
private class MockTransportProducerPool : TransportProducerPool
{
public bool GetPooledProducerWasCalled { get; set; }
+ public bool ExpirePooledProducerAsyncWasCalled { get; set; }
public MockPooledProducer MockPooledProducer { get; }
@@ -2847,6 +2996,12 @@ public override PooledProducer GetPooledProducer(string partitionId,
GetPooledProducerWasCalled = true;
return MockPooledProducer;
}
+
+ public override Task ExpirePooledProducerAsync(string partitionId, bool forceClose = false)
+ {
+ ExpirePooledProducerAsyncWasCalled = true;
+ return Task.CompletedTask;
+ }
}
}
}
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs
index cd10143090f2..af660d26b452 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs
@@ -110,6 +110,100 @@ public async Task ProducerCanPublishBatches(EventHubsTransportType transportType
}
}
+ ///
+ /// Verifies that the is able to
+ /// perform operations when the idempotent publishing feature is enabled.
+ ///
+ ///
+ [Test]
+ public async Task ProducerCanPublishEventsAfterAnException()
+ {
+ await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
+ {
+ var cancellationSource = new CancellationTokenSource();
+ cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
+
+ var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);
+ var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true };
+
+ await using var producer = new EventHubProducerClient(connectionString, options);
+
+ var partition = (await producer.GetPartitionIdsAsync()).First();
+ var sendOptions = new SendEventOptions { PartitionId = partition };
+
+ // Publish some events to validate that the initial publish works.
+
+ Assert.That(async () => await producer.SendAsync(EventGenerator.CreateSmallEvents(2), sendOptions, cancellationSource.Token), Throws.Nothing, "The first publishing operation was not successful.");
+
+ // Publish an event too large to succeed; this will force the producer to deal with an exception, which should
+ // update idempotent state.
+
+ using var batch = await producer.CreateBatchAsync(cancellationSource.Token);
+
+ var producerId = (await producer.GetPartitionPublishingPropertiesAsync(partition, cancellationSource.Token)).ProducerGroupId;
+ var badEvent = new EventData(EventGenerator.CreateRandomBody(batch.MaximumSizeInBytes + 1000));
+ Assert.That(async () => await producer.SendAsync(new[] { badEvent }, sendOptions, cancellationSource.Token), Throws.InstanceOf(), "The attempt to publish a too-large event should fail.");
+
+ // Publish a second set of events; this will prove that the producer recovered from the exception.
+
+ Assert.That(async () => await producer.SendAsync(EventGenerator.CreateSmallEvents(3), sendOptions, cancellationSource.Token), Throws.Nothing, "The second publishing operation was not successful.");
+
+ var newProducerId = (await producer.GetPartitionPublishingPropertiesAsync(partition, cancellationSource.Token)).ProducerGroupId;
+ Assert.That(newProducerId, Is.Not.Null, "The producer group identifier should have a value.");
+ Assert.That(newProducerId, Is.Not.EqualTo(producerId), "The producer group identifier should have been updated after the exception.");
+ }
+ }
+
+ ///
+ /// Verifies that the is able to
+ /// perform operations when the idempotent publishing feature is enabled.
+ ///
+ ///
+ [Test]
+ public async Task ProducerCanPublishBatchesAfterAnException()
+ {
+ await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
+ {
+ var cancellationSource = new CancellationTokenSource();
+ cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
+
+ var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);
+ var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true };
+
+ await using var producer = new EventHubProducerClient(connectionString, options);
+
+ var partition = (await producer.GetPartitionIdsAsync()).First();
+ var batchOptions = new CreateBatchOptions { PartitionId = partition };
+
+ // Publish a batch to validate that the initial publish works.
+
+ using var firstBatch = await producer.CreateBatchAsync(batchOptions, cancellationSource.Token);
+
+ firstBatch.TryAdd(EventGenerator.CreateEvents(1).First());
+ Assert.That(async () => await producer.SendAsync(firstBatch, cancellationSource.Token), Throws.Nothing, "The first publishing operation was not successful.");
+
+ // Publish an event too large to succeed; this will force the producer to deal with an exception, which should
+ // update idempotent state.
+
+ var producerId = (await producer.GetPartitionPublishingPropertiesAsync(partition, cancellationSource.Token)).ProducerGroupId;
+
+ using var badBatch = EventHubsModelFactory.EventDataBatch(firstBatch.MaximumSizeInBytes + 1000, new List(new[] { new EventData(EventGenerator.CreateRandomBody(firstBatch.MaximumSizeInBytes + 1000)) }), new CreateBatchOptions { PartitionId = partition });
+ Assert.That(async () => await producer.SendAsync(badBatch, cancellationSource.Token), Throws.InstanceOf(), "The attempt to publish a too-large event should fail.");
+
+ // Publish a second batch of events; this will prove that the producer recovered from the exception.
+
+ using var secondBatch = await producer.CreateBatchAsync(batchOptions, cancellationSource.Token);
+
+ secondBatch.TryAdd(EventGenerator.CreateEvents(1).First());
+ secondBatch.TryAdd(EventGenerator.CreateEvents(1).First());
+ Assert.That(async () => await producer.SendAsync(secondBatch, cancellationSource.Token), Throws.Nothing, "The second publishing operation was not successful.");
+
+ var newProducerId = (await producer.GetPartitionPublishingPropertiesAsync(partition, cancellationSource.Token)).ProducerGroupId;
+ Assert.That(newProducerId, Is.Not.Null, "The producer group identifier should have a value.");
+ Assert.That(newProducerId, Is.Not.EqualTo(producerId), "The producer group identifier should have been updated after the exception.");
+ }
+ }
+
///
/// Verifies that the is able to
/// perform operations when the idempotent publishing feature is enabled.
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs
index 7e3041fff2ce..14a1cf6ff6f6 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs
@@ -51,6 +51,121 @@ public void TransportProducerPoolRemovesExpiredItems()
Assert.That(startingPool.TryGetValue("2", out _), Is.True, "PerformExpiration should not remove valid producers.");
}
+ ///
+ /// The pool periodically removes and closes expired items.
+ ///
+ ///
+ [Test]
+ public async Task ExpireRemovesTheRequestedItem()
+ {
+ var wasFactoryCalled = false;
+ var transportProducer = new ObservableTransportProducerMock();
+
+ var startingPool = new ConcurrentDictionary
+ {
+ ["0"] = new TransportProducerPool.PoolItem("0", transportProducer),
+ ["1"] = new TransportProducerPool.PoolItem("1", transportProducer),
+ ["2"] = new TransportProducerPool.PoolItem("2", transportProducer),
+ };
+
+ Func producerFactory = partition =>
+ {
+ wasFactoryCalled = true;
+ return transportProducer;
+ };
+
+ var transportProducerPool = new TransportProducerPool(producerFactory, pool: startingPool, eventHubProducer: transportProducer);
+
+ // Validate the initial state.
+
+ Assert.That(startingPool.TryGetValue("0", out _), Is.True, "The requested partition should appear in the pool.");
+ Assert.That(wasFactoryCalled, Is.False, "No producer should not have been created.");
+ Assert.That(transportProducer.CloseCallCount, Is.EqualTo(0), "The producer should not have been closed.");
+
+ // Expire the producer and validate the removal state.
+
+ await transportProducerPool.ExpirePooledProducerAsync("0");
+
+ Assert.That(startingPool.TryGetValue("0", out _), Is.False, "The requested partition should have been removed.");
+ Assert.That(transportProducer.CloseCallCount, Is.EqualTo(1), "The producer should have been closed.");
+ Assert.That(wasFactoryCalled, Is.False, "The requested partition should not have been created.");
+
+ // Request the producer again and validate a new producer is created.
+
+ Assert.That(transportProducerPool.GetPooledProducer("0"), Is.Not.Null, "The requested partition should be available.");
+ Assert.That(wasFactoryCalled, Is.True, "A new producer for the requested partition should have been created.");
+ }
+
+ ///
+ /// The pool periodically removes and closes expired items.
+ ///
+ ///
+ [Test]
+ public async Task ExpireCoseNotCloseTheRemovedItemWhenInUse()
+ {
+ var transportProducer = new ObservableTransportProducerMock();
+
+ var startingPool = new ConcurrentDictionary
+ {
+ ["0"] = new TransportProducerPool.PoolItem("0", transportProducer),
+ ["1"] = new TransportProducerPool.PoolItem("1", transportProducer),
+ ["2"] = new TransportProducerPool.PoolItem("2", transportProducer),
+ };
+
+ var transportProducerPool = new TransportProducerPool(partition => transportProducer, pool: startingPool, eventHubProducer: transportProducer);
+
+ // Validate the initial state.
+
+ Assert.That(startingPool.TryGetValue("0", out _), Is.True, "The requested partition should appear in the pool.");
+ Assert.That(transportProducer.CloseCallCount, Is.EqualTo(0), "The producer should not have been closed.");
+
+ // Request the producer and hold the reference to ensure that it is flagged as being in use.
+
+ await using var poolItem = transportProducerPool.GetPooledProducer("0");
+
+ // Expire the producer and validate the removal state.
+
+ await transportProducerPool.ExpirePooledProducerAsync("0", forceClose: false);
+
+ Assert.That(startingPool.TryGetValue("0", out _), Is.False, "The requested partition should have been removed.");
+ Assert.That(transportProducer.CloseCallCount, Is.EqualTo(0), "The producer should not have been closed.");
+ }
+
+ ///
+ /// The pool periodically removes and closes expired items.
+ ///
+ ///
+ [Test]
+ public async Task ExpireClosesTheRemovedItemWhenForced()
+ {
+ var transportProducer = new ObservableTransportProducerMock();
+
+ var startingPool = new ConcurrentDictionary
+ {
+ ["0"] = new TransportProducerPool.PoolItem("0", transportProducer),
+ ["1"] = new TransportProducerPool.PoolItem("1", transportProducer),
+ ["2"] = new TransportProducerPool.PoolItem("2", transportProducer),
+ };
+
+ var transportProducerPool = new TransportProducerPool(partition => transportProducer, pool: startingPool, eventHubProducer: transportProducer);
+
+ // Validate the initial state.
+
+ Assert.That(startingPool.TryGetValue("0", out _), Is.True, "The requested partition should appear in the pool.");
+ Assert.That(transportProducer.CloseCallCount, Is.EqualTo(0), "The producer should not have been closed.");
+
+ // Request the producer and hold the reference to ensure that it is flagged as being in use.
+
+ await using var poolItem = transportProducerPool.GetPooledProducer("0");
+
+ // Expire the producer and validate the removal state.
+
+ await transportProducerPool.ExpirePooledProducerAsync("0", forceClose: true);
+
+ Assert.That(startingPool.TryGetValue("0", out _), Is.False, "The requested partition should have been removed.");
+ Assert.That(transportProducer.CloseCallCount, Is.EqualTo(1), "The producer should have been closed.");
+ }
+
///
/// When a is requested
/// its will be increased.