From 505a50729f505e7bca782164d0fd2e00612993e1 Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Wed, 24 Nov 2021 14:27:23 -0500 Subject: [PATCH] [Event Hubs Client] Idempotent Error Handling The focus of these changes is to ensure that the AMQP connection and link states are fully reset on any exception after retries are exhausted in order to ensure no data loss. In error scenarios with ambiguous outcomes, it is possible for the client and service state for idempotent sequencing to become out-of-sync with no way to detect or correct the problem. When this happens, it is possible that events being published are incorrectly identified by the service as duplicates and their receipt acknowledged. This leads the client to signal to callers that publishing was successful, though the events were ignored by the service and will never be available to be read. By resetting the connection and forcing the client to use a new publisher identifier, the service will reset state and perform the initial handshake with the client, ensuring that state is properly synchronized between them. --- .../Azure.Messaging.EventHubs.Processor.sln | 23 ++- ...Azure.Messaging.EventHubs.Processor.csproj | 8 +- .../Azure.Messaging.EventHubs.Shared.sln | 10 +- .../Azure.Messaging.EventHubs.sln | 25 ++- .../Azure.Messaging.EventHubs/CHANGELOG.md | 2 + .../Azure.Messaging.EventHubs.Perf.csproj | 2 +- .../src/Amqp/AmqpConsumer.cs | 3 +- .../src/Amqp/AmqpProducer.cs | 4 +- .../src/Azure.Messaging.EventHubs.csproj | 14 +- .../src/Core/TransportProducerPool.cs | 22 ++- .../src/EventData.cs | 30 ++-- .../src/Producer/EventHubProducerClient.cs | 98 ++++++++-- .../Producer/EventHubProducerClientTests.cs | 167 +++++++++++++++++- .../Producer/IdempotentPublishingLiveTests.cs | 94 ++++++++++ .../Producer/TransportProducerPoolTests.cs | 115 ++++++++++++ 15 files changed, 550 insertions(+), 67 deletions(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln index 3b926a0bc12af..86b203e110e04 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.29215.179 +# Visual Studio Version 17 +VisualStudioVersion = 17.0.31912.275 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs.Processor", "src\Azure.Messaging.EventHubs.Processor.csproj", "{B9C4A45A-BD10-4C4C-B7BF-EE4F601AE7FC}" EndProject @@ -13,7 +13,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "External", "External", "{79 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs", "..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj", "{87A3ED70-190D-4E6B-A568-40DF5B9F3939}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Core.Experimental", "..\..\core\Azure.Core.Experimental\src\Azure.Core.Experimental.csproj", "{0BFCCE8E-85FD-4DF7-8E5D-A6CE9ACDB175}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core", "..\..\core\Azure.Core\src\Azure.Core.csproj", "{00EBD345-7146-442E-8FC7-106FA5FC2A1A}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.Experimental", 
"..\..\core\Azure.Core.Experimental\src\Azure.Core.Experimental.csproj", "{B0B9A0F1-50DE-4059-9A2D-51059D9A477C}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -37,10 +39,14 @@ Global {87A3ED70-190D-4E6B-A568-40DF5B9F3939}.Debug|Any CPU.Build.0 = Debug|Any CPU {87A3ED70-190D-4E6B-A568-40DF5B9F3939}.Release|Any CPU.ActiveCfg = Release|Any CPU {87A3ED70-190D-4E6B-A568-40DF5B9F3939}.Release|Any CPU.Build.0 = Release|Any CPU - {0BFCCE8E-85FD-4DF7-8E5D-A6CE9ACDB175}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {0BFCCE8E-85FD-4DF7-8E5D-A6CE9ACDB175}.Debug|Any CPU.Build.0 = Debug|Any CPU - {0BFCCE8E-85FD-4DF7-8E5D-A6CE9ACDB175}.Release|Any CPU.ActiveCfg = Release|Any CPU - {0BFCCE8E-85FD-4DF7-8E5D-A6CE9ACDB175}.Release|Any CPU.Build.0 = Release|Any CPU + {00EBD345-7146-442E-8FC7-106FA5FC2A1A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {00EBD345-7146-442E-8FC7-106FA5FC2A1A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {00EBD345-7146-442E-8FC7-106FA5FC2A1A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {00EBD345-7146-442E-8FC7-106FA5FC2A1A}.Release|Any CPU.Build.0 = Release|Any CPU + {B0B9A0F1-50DE-4059-9A2D-51059D9A477C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B0B9A0F1-50DE-4059-9A2D-51059D9A477C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B0B9A0F1-50DE-4059-9A2D-51059D9A477C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B0B9A0F1-50DE-4059-9A2D-51059D9A477C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -48,7 +54,8 @@ Global GlobalSection(NestedProjects) = preSolution {7DFF0E65-DC9A-410D-9A11-AD6A06860FE1} = {797FF941-76FD-45FD-AC17-A73DFE2BA621} {87A3ED70-190D-4E6B-A568-40DF5B9F3939} = {797FF941-76FD-45FD-AC17-A73DFE2BA621} - {0BFCCE8E-85FD-4DF7-8E5D-A6CE9ACDB175} = {797FF941-76FD-45FD-AC17-A73DFE2BA621} + {00EBD345-7146-442E-8FC7-106FA5FC2A1A} = {797FF941-76FD-45FD-AC17-A73DFE2BA621} + {B0B9A0F1-50DE-4059-9A2D-51059D9A477C} = {797FF941-76FD-45FD-AC17-A73DFE2BA621} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {44BD3BD5-61DF-464D-8627-E00B0BC4B3A3} diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj index bc6b4fd2c1189..45e596944d89d 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj @@ -13,11 +13,13 @@ - + + + + + @@ -63,7 +66,4 @@ Azure.Messaging.EventHubs - - - diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs index 4ed05fd185ff1..7f241b4f0cad5 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs @@ -145,6 +145,25 @@ public virtual PooledProducer GetPooledProducer(string partitionId, }); } + /// + /// Expires the pooled producer for the requested partition immediately. + /// + /// + /// The unique identifier of a partition associated with the Event Hub. + /// true to close the pooled producer even if it is in use; otherwise, false will defer closing until it is no longer in use. 
+ /// + public virtual async Task ExpirePooledProducerAsync(string partitionId, + bool forceClose = false) + { + if (Pool.TryRemove(partitionId, out var poolItem)) + { + if ((poolItem.ActiveInstances.IsEmpty) || (forceClose)) + { + await poolItem.PartitionProducer.CloseAsync(CancellationToken.None).ConfigureAwait(false); + } + } + } + /// /// Closes the producers in the pool and performs any cleanup necessary /// for resources used by the . @@ -188,6 +207,7 @@ private TimerCallback CreateExpirationTimerCallback() return _ => { // Capture the time stamp to use a consistent value. + var now = DateTimeOffset.UtcNow; foreach (var key in Pool.Keys.ToList()) @@ -196,7 +216,7 @@ private TimerCallback CreateExpirationTimerCallback() { if (poolItem.RemoveAfter <= now) { - if (Pool.TryRemove(key, out var _) && !poolItem.ActiveInstances.Any()) + if (Pool.TryRemove(key, out var _) && poolItem.ActiveInstances.IsEmpty) { // At this point the pool item may have been closed already // if there was a context switch between the if conditions diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs index 78d0cf1d6ad19..2cdfde6e1d23b 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs @@ -7,8 +7,8 @@ using System.IO; using System.Text; using Azure.Core; -using Azure.Core.Serialization; using Azure.Core.Amqp; +using Azure.Core.Serialization; using Azure.Messaging.EventHubs.Amqp; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Producer; @@ -48,17 +48,6 @@ public BinaryData EventBody set => _amqpMessage.Body = AmqpMessageBody.FromData(MessageBody.FromReadOnlyMemorySegment(value.ToMemory())); } - /// - /// Hidden property that shadows the property. This is added - /// in order to inherit from . - /// - [EditorBrowsable(EditorBrowsableState.Never)] - public override BinaryData Data - { - get => EventBody; - set => EventBody = value; - } - /// /// A MIME type describing the data contained in the , /// intended to allow consumers to make informed decisions for inspecting and @@ -108,9 +97,22 @@ public override string ContentType } /// - /// Hidden property that indicates that the is not read-only. This is part of - /// the abstraction. + /// Hidden property that shadows the property. This is added + /// in order to inherit from . /// + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public override BinaryData Data + { + get => EventBody; + set => EventBody = value; + } + + /// + /// Hidden property that indicates that the is not read-only. This is part of + /// the abstraction. + /// + /// [EditorBrowsable(EditorBrowsableState.Never)] public override bool IsReadOnly => false; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs index d5d45cfcbacc2..42d402ef5cb06 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs @@ -352,6 +352,7 @@ public EventHubProducerClient(EventHubConnection connection, /// The connection to use as the basis for delegation of client-type operations. /// The transport producer instance to use as the basis for service communication. /// A used to manage a set of partition specific . + /// A set of options to apply when configuring the producer. 
/// /// /// This constructor is intended to be used internally for functional @@ -360,7 +361,8 @@ public EventHubProducerClient(EventHubConnection connection, /// internal EventHubProducerClient(EventHubConnection connection, TransportProducer transportProducer, - TransportProducerPool partitionProducerPool = default) + TransportProducerPool partitionProducerPool = default, + EventHubProducerClientOptions clientOptions = default) { Argument.AssertNotNull(connection, nameof(connection)); Argument.AssertNotNull(transportProducer, nameof(transportProducer)); @@ -368,7 +370,7 @@ internal EventHubProducerClient(EventHubConnection connection, OwnsConnection = false; Connection = connection; RetryPolicy = new EventHubsRetryOptions().ToRetryPolicy(); - Options = new EventHubProducerClientOptions(); + Options = clientOptions?.Clone() ?? new EventHubProducerClientOptions(); Identifier = Guid.NewGuid().ToString(); PartitionProducerPool = partitionProducerPool ?? new TransportProducerPool(partitionId => transportProducer); @@ -498,7 +500,7 @@ public virtual async Task GetPartitionPropertiesAsync(strin /// /// internal virtual async Task GetPartitionPublishingPropertiesAsync(string partitionId, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default) { Argument.AssertNotClosed(IsClosed, nameof(EventHubProducerClient)); Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId)); @@ -957,20 +959,24 @@ private async Task SendIdempotentAsync(IReadOnlyList eventSet, cancellationToken.ThrowIfCancellationRequested(); EventHubsEventSource.Log.IdempotentPublishStart(EventHubName, options.PartitionId); + var resetStateOnError = false; + var releaseGuard = false; var partitionState = PartitionState.GetOrAdd(options.PartitionId, new PartitionPublishingState(options.PartitionId)); try { - cancellationToken.ThrowIfCancellationRequested(); + if (!partitionState.PublishingGuard.Wait(100, cancellationToken)) + { + await partitionState.PublishingGuard.WaitAsync(cancellationToken).ConfigureAwait(false); + } - await partitionState.PublishingGuard.WaitAsync(cancellationToken).ConfigureAwait(false); + releaseGuard = true; EventHubsEventSource.Log.IdempotentSynchronizationAcquire(EventHubName, options.PartitionId); // Ensure that the partition state has been initialized. if (!partitionState.IsInitialized) { - cancellationToken.ThrowIfCancellationRequested(); await InitializePartitionStateAsync(partitionState, cancellationToken).ConfigureAwait(false); } @@ -989,7 +995,7 @@ private async Task SendIdempotentAsync(IReadOnlyList eventSet, // Publish the events. - cancellationToken.ThrowIfCancellationRequested(); + resetStateOnError = true; EventHubsEventSource.Log.IdempotentSequencePublish(EventHubName, options.PartitionId, firstSequence, lastSequence); await SendInternalAsync(eventSet, options, cancellationToken).ConfigureAwait(false); @@ -1013,12 +1019,40 @@ private async Task SendIdempotentAsync(IReadOnlyList eventSet, eventData.ClearPublishingState(); } + if (resetStateOnError) + { + // Reset the partition state and options to ensure that future attempts + // are safe and do not risk data loss by reusing the same producer group identifier. 
+ + if (!Options.PartitionOptions.TryGetValue(options.PartitionId, out var partitionOptions)) + { + partitionOptions = new PartitionPublishingOptionsInternal(); + Options.PartitionOptions[options.PartitionId] = partitionOptions; + } + + partitionOptions.ProducerGroupId = null; + partitionOptions.OwnerLevel = null; + partitionOptions.StartingSequenceNumber = null; + + partitionState.ProducerGroupId = null; + partitionState.OwnerLevel = null; + partitionState.LastPublishedSequenceNumber = null; + + // Expire the transport producer associated with the partition, to ensure + // that the new idempotent state is used for the next publishing operation. + + await PartitionProducerPool.ExpirePooledProducerAsync(options.PartitionId, forceClose: true).ConfigureAwait(false); + } + throw; } finally { - partitionState.PublishingGuard.Release(); - EventHubsEventSource.Log.IdempotentSynchronizationRelease(EventHubName, options.PartitionId); + if (releaseGuard) + { + partitionState.PublishingGuard.Release(); + EventHubsEventSource.Log.IdempotentSynchronizationRelease(EventHubName, options.PartitionId); + } } } catch (Exception ex) @@ -1052,6 +1086,8 @@ private async Task SendIdempotentAsync(EventDataBatch eventBatch, cancellationToken.ThrowIfCancellationRequested(); EventHubsEventSource.Log.IdempotentPublishStart(EventHubName, options.PartitionId); + var resetStateOnError = false; + var releaseGuard = false; var partitionState = PartitionState.GetOrAdd(options.PartitionId, new PartitionPublishingState(options.PartitionId)); var eventSet = eventBatch.AsEnumerable() switch @@ -1062,16 +1098,18 @@ private async Task SendIdempotentAsync(EventDataBatch eventBatch, try { - cancellationToken.ThrowIfCancellationRequested(); + if (!partitionState.PublishingGuard.Wait(100, cancellationToken)) + { + await partitionState.PublishingGuard.WaitAsync(cancellationToken).ConfigureAwait(false); + } - await partitionState.PublishingGuard.WaitAsync(cancellationToken).ConfigureAwait(false); + releaseGuard = true; EventHubsEventSource.Log.IdempotentSynchronizationAcquire(EventHubName, options.PartitionId); // Ensure that the partition state has been initialized. if (!partitionState.IsInitialized) { - cancellationToken.ThrowIfCancellationRequested(); await InitializePartitionStateAsync(partitionState, cancellationToken).ConfigureAwait(false); } @@ -1090,7 +1128,7 @@ private async Task SendIdempotentAsync(EventDataBatch eventBatch, // Publish the events. - cancellationToken.ThrowIfCancellationRequested(); + resetStateOnError = true; EventHubsEventSource.Log.IdempotentSequencePublish(EventHubName, options.PartitionId, firstSequence, lastSequence); await SendInternalAsync(eventBatch, cancellationToken).ConfigureAwait(false); @@ -1111,12 +1149,40 @@ private async Task SendIdempotentAsync(EventDataBatch eventBatch, eventData.ClearPublishingState(); } + if (resetStateOnError) + { + // Reset the partition state and options to ensure that future attempts + // are safe and do not risk data loss by reusing the same producer group identifier. 
+ + if (!Options.PartitionOptions.TryGetValue(options.PartitionId, out var partitionOptions)) + { + partitionOptions = new PartitionPublishingOptionsInternal(); + Options.PartitionOptions[options.PartitionId] = partitionOptions; + } + + partitionOptions.ProducerGroupId = null; + partitionOptions.OwnerLevel = null; + partitionOptions.StartingSequenceNumber = null; + + partitionState.ProducerGroupId = null; + partitionState.OwnerLevel = null; + partitionState.LastPublishedSequenceNumber = null; + + // Expire the transport producer associated with the partition, to ensure + // that the new idempotent state is used for the next publishing operation. + + await PartitionProducerPool.ExpirePooledProducerAsync(options.PartitionId, forceClose: true).ConfigureAwait(false); + } + throw; } finally { - partitionState.PublishingGuard.Release(); - EventHubsEventSource.Log.IdempotentSynchronizationRelease(EventHubName, options.PartitionId); + if (releaseGuard) + { + partitionState.PublishingGuard.Release(); + EventHubsEventSource.Log.IdempotentSynchronizationRelease(EventHubName, options.PartitionId); + } } } catch (Exception ex) @@ -1361,7 +1427,7 @@ private static int NextSequence(int currentSequence) /// [MethodImpl(MethodImplOptions.AggressiveInlining)] private static PartitionPublishingPropertiesInternal CreatePublishingPropertiesFromPartitionState(EventHubProducerClientOptions options, - PartitionPublishingState state) => + PartitionPublishingState state) => new PartitionPublishingPropertiesInternal(options.EnableIdempotentPartitions, state.ProducerGroupId, state.OwnerLevel, diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs index 1414e7bbde61c..d181dc4b2ad86 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs @@ -1141,7 +1141,7 @@ public async Task SendIdempotentRollsOverSequenceNumbersToZero() using var cancellationSource = new CancellationTokenSource(); cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); - await producer.SendAsync(events, sendOptions); + await producer.SendAsync(events, sendOptions, cancellationSource.Token); for (var index = 0; index < events.Length; ++index) { @@ -1190,7 +1190,7 @@ public void SendIdempotentRollsBackSequenceNumbersOnFailure() using var cancellationSource = new CancellationTokenSource(); cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); - Assert.That(async () => await producer.SendAsync(events, sendOptions), Throws.Exception, "The send operation should have failed."); + Assert.That(async () => await producer.SendAsync(events, sendOptions, cancellationSource.Token), Throws.Exception, "The send operation should have failed."); for (var index = 0; index < events.Length; ++index) { @@ -1201,7 +1201,82 @@ public void SendIdempotentRollsBackSequenceNumbersOnFailure() var partitionStateCollection = GetPartitionState(producer); Assert.That(partitionStateCollection, Is.Not.Null, "The collection for partition state should have been initialized with the client."); Assert.That(partitionStateCollection.TryGetValue(expectedPartition, out var partitionState), Is.True, "The state collection should have an entry for the partition."); - Assert.That(partitionState.LastPublishedSequenceNumber, Is.EqualTo(startingSequence), "The sequence 
number for partition state should have been rolled back."); + Assert.That(partitionState.LastPublishedSequenceNumber, Is.Null, "The sequence number for partition state should have been reset."); + } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public void SendIdempotentUpdatesProducerGroupIdOnFailure() + { + var expectedPartition = "5"; + var eventCount = 5; + var startingSequence = 435; + var expectedProperties = new PartitionPublishingPropertiesInternal(true, 123, 456, startingSequence); + var events = EventGenerator.CreateEvents(eventCount).ToArray(); + var sendOptions = new SendEventOptions { PartitionId = expectedPartition }; + var mockTransport = new Mock(); + var connection = new MockConnection(() => mockTransport.Object); + + var producer = new EventHubProducerClient(connection, new EventHubProducerClientOptions + { + EnableIdempotentPartitions = true + }); + + mockTransport + .Setup(transportProducer => transportProducer.ReadInitializationPublishingPropertiesAsync(It.IsAny())) + .ReturnsAsync(expectedProperties); + + mockTransport + .Setup(transportProducer => transportProducer.SendAsync(It.IsAny>(), It.IsAny(), It.IsAny())) + .Returns(Task.FromException(new OverflowException())); + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + Assert.That(async () => await producer.SendAsync(events, sendOptions, cancellationSource.Token), Throws.Exception, "The send operation should have failed."); + + var partitionStateCollection = GetPartitionState(producer); + Assert.That(partitionStateCollection, Is.Not.Null, "The collection for partition state should have been initialized with the client."); + Assert.That(partitionStateCollection.TryGetValue(expectedPartition, out var partitionState), Is.True, "The state collection should have an entry for the partition."); + Assert.That(partitionState.ProducerGroupId, Is.Not.EqualTo(expectedProperties.ProducerGroupId), "The producer group identifier in partition state should have been changed."); + } + + /// + /// Verifies functionality of the + /// method. 
+ /// + /// + [Test] + public void SendIdempotentExpiresTheTransportProducerOnFailure() + { + var expectedPartition = "5"; + var eventCount = 5; + var startingSequence = 435; + var expectedProperties = new PartitionPublishingPropertiesInternal(true, 123, 456, startingSequence); + var events = EventGenerator.CreateEvents(eventCount).ToArray(); + var sendOptions = new SendEventOptions { PartitionId = expectedPartition }; + var mockTransport = new Mock(); + var connection = new MockConnection(() => mockTransport.Object); + var mockTransportProducerPool = new MockTransportProducerPool(mockTransport.Object, connection, new BasicRetryPolicy(new EventHubsRetryOptions())); + var producer = new EventHubProducerClient(connection, mockTransport.Object, mockTransportProducerPool, new EventHubProducerClientOptions { EnableIdempotentPartitions = true }); + + mockTransport + .Setup(transportProducer => transportProducer.ReadInitializationPublishingPropertiesAsync(It.IsAny())) + .ReturnsAsync(expectedProperties); + + mockTransport + .Setup(transportProducer => transportProducer.SendAsync(It.IsAny>(), It.IsAny(), It.IsAny())) + .Returns(Task.FromException(new OverflowException())); + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + Assert.That(async () => await producer.SendAsync(events, sendOptions, cancellationSource.Token), Throws.Exception, "The send operation should have failed."); + Assert.That(mockTransportProducerPool.ExpirePooledProducerAsyncWasCalled, Is.True, "The transport producer should have been expired."); } /// @@ -1644,7 +1719,7 @@ public async Task SendIdempotentAppliesSequenceNumbersWithABatch() using var cancellationSource = new CancellationTokenSource(); cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); - await producer.SendAsync(batch); + await producer.SendAsync(batch, cancellationSource.Token); Assert.That(batch.StartingPublishedSequenceNumber, Is.EqualTo(startingSequence + 1), "The batch did not have the correct starting sequence number."); var partitionStateCollection = GetPartitionState(producer); @@ -1687,13 +1762,86 @@ public void SendIdempotentRollsBackSequenceNumbersOnFailureWithABatch() using var cancellationSource = new CancellationTokenSource(); cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); - Assert.That(async () => await producer.SendAsync(batch), Throws.Exception, "The send operation should have failed."); + Assert.That(async () => await producer.SendAsync(batch, cancellationSource.Token), Throws.Exception, "The send operation should have failed."); Assert.That(batch.StartingPublishedSequenceNumber, Is.Null, "The batch should not have a starting sequence number."); var partitionStateCollection = GetPartitionState(producer); Assert.That(partitionStateCollection, Is.Not.Null, "The collection for partition state should have been initialized with the client."); Assert.That(partitionStateCollection.TryGetValue(expectedPartition, out var partitionState), Is.True, "The state collection should have an entry for the partition."); - Assert.That(partitionState.LastPublishedSequenceNumber, Is.EqualTo(startingSequence), "The sequence number for partition state should have been rolled back."); + Assert.That(partitionState.LastPublishedSequenceNumber, Is.Null, "The sequence number for partition state should have been reset."); + } + + /// + /// Verifies functionality of the + /// method. 
+ /// + /// + [Test] + public void SendIdempotentUpdatesProducerGroupIdOnFailureWithABatch() + { + var expectedPartition = "5"; + var eventCount = 5; + var startingSequence = 435; + var expectedProperties = new PartitionPublishingPropertiesInternal(true, 123, 456, startingSequence); + var batch = new EventDataBatch(new MockTransportBatch(eventCount), "ns", "eh", new CreateBatchOptions { PartitionId = expectedPartition }); + var mockTransport = new Mock(); + var connection = new MockConnection(() => mockTransport.Object); + + var producer = new EventHubProducerClient(connection, new EventHubProducerClientOptions + { + EnableIdempotentPartitions = true + }); + + mockTransport + .Setup(transportProducer => transportProducer.ReadInitializationPublishingPropertiesAsync(It.IsAny())) + .ReturnsAsync(expectedProperties); + + mockTransport + .Setup(transportProducer => transportProducer.SendAsync(It.IsAny(), It.IsAny())) + .Returns(Task.FromException(new OverflowException())); + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + Assert.That(async () => await producer.SendAsync(batch, cancellationSource.Token), Throws.Exception, "The send operation should have failed."); + + var partitionStateCollection = GetPartitionState(producer); + Assert.That(partitionStateCollection, Is.Not.Null, "The collection for partition state should have been initialized with the client."); + Assert.That(partitionStateCollection.TryGetValue(expectedPartition, out var partitionState), Is.True, "The state collection should have an entry for the partition."); + Assert.That(partitionState.ProducerGroupId, Is.Not.EqualTo(expectedProperties.ProducerGroupId), "The producer group identifier in partition state should have been changed."); + } + + /// + /// Verifies functionality of the + /// method. 
+ /// + /// + [Test] + public void SendIdempotentExpiresTheTransportProducerOnFailureWithABatch() + { + var expectedPartition = "5"; + var eventCount = 5; + var startingSequence = 435; + var expectedProperties = new PartitionPublishingPropertiesInternal(true, 123, 456, startingSequence); + var batch = new EventDataBatch(new MockTransportBatch(eventCount), "ns", "eh", new CreateBatchOptions { PartitionId = expectedPartition }); + var mockTransport = new Mock(); + var connection = new MockConnection(() => mockTransport.Object); + var mockTransportProducerPool = new MockTransportProducerPool(mockTransport.Object, connection, new BasicRetryPolicy(new EventHubsRetryOptions())); + var producer = new EventHubProducerClient(connection, mockTransport.Object, mockTransportProducerPool, new EventHubProducerClientOptions { EnableIdempotentPartitions = true }); + + mockTransport + .Setup(transportProducer => transportProducer.ReadInitializationPublishingPropertiesAsync(It.IsAny())) + .ReturnsAsync(expectedProperties); + + mockTransport + .Setup(transportProducer => transportProducer.SendAsync(It.IsAny(), It.IsAny())) + .Returns(Task.FromException(new OverflowException())); + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + Assert.That(async () => await producer.SendAsync(batch, cancellationSource.Token), Throws.Exception, "The send operation should have failed."); + Assert.That(mockTransportProducerPool.ExpirePooledProducerAsyncWasCalled, Is.True, "The transport producer should have been expired."); } /// @@ -2829,6 +2977,7 @@ public override ValueTask DisposeAsync() private class MockTransportProducerPool : TransportProducerPool { public bool GetPooledProducerWasCalled { get; set; } + public bool ExpirePooledProducerAsyncWasCalled { get; set; } public MockPooledProducer MockPooledProducer { get; } @@ -2847,6 +2996,12 @@ public override PooledProducer GetPooledProducer(string partitionId, GetPooledProducerWasCalled = true; return MockPooledProducer; } + + public override Task ExpirePooledProducerAsync(string partitionId, bool forceClose = false) + { + ExpirePooledProducerAsyncWasCalled = true; + return Task.CompletedTask; + } } } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs index cd10143090f28..af660d26b452e 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs @@ -110,6 +110,100 @@ public async Task ProducerCanPublishBatches(EventHubsTransportType transportType } } + /// + /// Verifies that the is able to + /// perform operations when the idempotent publishing feature is enabled. 
+ /// + /// + [Test] + public async Task ProducerCanPublishEventsAfterAnException() + { + await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) + { + var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName); + var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true }; + + await using var producer = new EventHubProducerClient(connectionString, options); + + var partition = (await producer.GetPartitionIdsAsync()).First(); + var sendOptions = new SendEventOptions { PartitionId = partition }; + + // Publish some events to validate that the initial publish works. + + Assert.That(async () => await producer.SendAsync(EventGenerator.CreateSmallEvents(2), sendOptions, cancellationSource.Token), Throws.Nothing, "The first publishing operation was not successful."); + + // Publish an event too large to succeed; this will force the producer to deal with an exception, which should + // update idempotent state. + + using var batch = await producer.CreateBatchAsync(cancellationSource.Token); + + var producerId = (await producer.GetPartitionPublishingPropertiesAsync(partition, cancellationSource.Token)).ProducerGroupId; + var badEvent = new EventData(EventGenerator.CreateRandomBody(batch.MaximumSizeInBytes + 1000)); + Assert.That(async () => await producer.SendAsync(new[] { badEvent }, sendOptions, cancellationSource.Token), Throws.InstanceOf(), "The attempt to publish a too-large event should fail."); + + // Publish a second set of events; this will prove that the producer recovered from the exception. + + Assert.That(async () => await producer.SendAsync(EventGenerator.CreateSmallEvents(3), sendOptions, cancellationSource.Token), Throws.Nothing, "The second publishing operation was not successful."); + + var newProducerId = (await producer.GetPartitionPublishingPropertiesAsync(partition, cancellationSource.Token)).ProducerGroupId; + Assert.That(newProducerId, Is.Not.Null, "The producer group identifier should have a value."); + Assert.That(newProducerId, Is.Not.EqualTo(producerId), "The producer group identifier should have been updated after the exception."); + } + } + + /// + /// Verifies that the is able to + /// perform operations when the idempotent publishing feature is enabled. + /// + /// + [Test] + public async Task ProducerCanPublishBatchesAfterAnException() + { + await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) + { + var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName); + var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true }; + + await using var producer = new EventHubProducerClient(connectionString, options); + + var partition = (await producer.GetPartitionIdsAsync()).First(); + var batchOptions = new CreateBatchOptions { PartitionId = partition }; + + // Publish a batch to validate that the initial publish works. 
+ + using var firstBatch = await producer.CreateBatchAsync(batchOptions, cancellationSource.Token); + + firstBatch.TryAdd(EventGenerator.CreateEvents(1).First()); + Assert.That(async () => await producer.SendAsync(firstBatch, cancellationSource.Token), Throws.Nothing, "The first publishing operation was not successful."); + + // Publish an event too large to succeed; this will force the producer to deal with an exception, which should + // update idempotent state. + + var producerId = (await producer.GetPartitionPublishingPropertiesAsync(partition, cancellationSource.Token)).ProducerGroupId; + + using var badBatch = EventHubsModelFactory.EventDataBatch(firstBatch.MaximumSizeInBytes + 1000, new List(new[] { new EventData(EventGenerator.CreateRandomBody(firstBatch.MaximumSizeInBytes + 1000)) }), new CreateBatchOptions { PartitionId = partition }); + Assert.That(async () => await producer.SendAsync(badBatch, cancellationSource.Token), Throws.InstanceOf(), "The attempt to publish a too-large event should fail."); + + // Publish a second batch of events; this will prove that the producer recovered from the exception. + + using var secondBatch = await producer.CreateBatchAsync(batchOptions, cancellationSource.Token); + + secondBatch.TryAdd(EventGenerator.CreateEvents(1).First()); + secondBatch.TryAdd(EventGenerator.CreateEvents(1).First()); + Assert.That(async () => await producer.SendAsync(secondBatch, cancellationSource.Token), Throws.Nothing, "The second publishing operation was not successful."); + + var newProducerId = (await producer.GetPartitionPublishingPropertiesAsync(partition, cancellationSource.Token)).ProducerGroupId; + Assert.That(newProducerId, Is.Not.Null, "The producer group identifier should have a value."); + Assert.That(newProducerId, Is.Not.EqualTo(producerId), "The producer group identifier should have been updated after the exception."); + } + } + /// /// Verifies that the is able to /// perform operations when the idempotent publishing feature is enabled. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs index 7e3041fff2ced..14a1cf6ff6f6b 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs @@ -51,6 +51,121 @@ public void TransportProducerPoolRemovesExpiredItems() Assert.That(startingPool.TryGetValue("2", out _), Is.True, "PerformExpiration should not remove valid producers."); } + /// + /// The pool periodically removes and closes expired items. + /// + /// + [Test] + public async Task ExpireRemovesTheRequestedItem() + { + var wasFactoryCalled = false; + var transportProducer = new ObservableTransportProducerMock(); + + var startingPool = new ConcurrentDictionary + { + ["0"] = new TransportProducerPool.PoolItem("0", transportProducer), + ["1"] = new TransportProducerPool.PoolItem("1", transportProducer), + ["2"] = new TransportProducerPool.PoolItem("2", transportProducer), + }; + + Func producerFactory = partition => + { + wasFactoryCalled = true; + return transportProducer; + }; + + var transportProducerPool = new TransportProducerPool(producerFactory, pool: startingPool, eventHubProducer: transportProducer); + + // Validate the initial state. 
+ + Assert.That(startingPool.TryGetValue("0", out _), Is.True, "The requested partition should appear in the pool."); + Assert.That(wasFactoryCalled, Is.False, "No producer should have been created."); + Assert.That(transportProducer.CloseCallCount, Is.EqualTo(0), "The producer should not have been closed."); + + // Expire the producer and validate the removal state. + + await transportProducerPool.ExpirePooledProducerAsync("0"); + + Assert.That(startingPool.TryGetValue("0", out _), Is.False, "The requested partition should have been removed."); + Assert.That(transportProducer.CloseCallCount, Is.EqualTo(1), "The producer should have been closed."); + Assert.That(wasFactoryCalled, Is.False, "A new producer should not have been created."); + + // Request the producer again and validate a new producer is created. + + Assert.That(transportProducerPool.GetPooledProducer("0"), Is.Not.Null, "The requested partition should be available."); + Assert.That(wasFactoryCalled, Is.True, "A new producer for the requested partition should have been created."); + } + + /// + /// The pool periodically removes and closes expired items. + /// + /// + [Test] + public async Task ExpireDoesNotCloseTheRemovedItemWhenInUse() + { + var transportProducer = new ObservableTransportProducerMock(); + + var startingPool = new ConcurrentDictionary + { + ["0"] = new TransportProducerPool.PoolItem("0", transportProducer), + ["1"] = new TransportProducerPool.PoolItem("1", transportProducer), + ["2"] = new TransportProducerPool.PoolItem("2", transportProducer), + }; + + var transportProducerPool = new TransportProducerPool(partition => transportProducer, pool: startingPool, eventHubProducer: transportProducer); + + // Validate the initial state. + + Assert.That(startingPool.TryGetValue("0", out _), Is.True, "The requested partition should appear in the pool."); + Assert.That(transportProducer.CloseCallCount, Is.EqualTo(0), "The producer should not have been closed."); + + // Request the producer and hold the reference to ensure that it is flagged as being in use. + + await using var poolItem = transportProducerPool.GetPooledProducer("0"); + + // Expire the producer and validate the removal state. + + await transportProducerPool.ExpirePooledProducerAsync("0", forceClose: false); + + Assert.That(startingPool.TryGetValue("0", out _), Is.False, "The requested partition should have been removed."); + Assert.That(transportProducer.CloseCallCount, Is.EqualTo(0), "The producer should not have been closed."); + } + + /// + /// The pool periodically removes and closes expired items. + /// + /// + [Test] + public async Task ExpireClosesTheRemovedItemWhenForced() + { + var transportProducer = new ObservableTransportProducerMock(); + + var startingPool = new ConcurrentDictionary + { + ["0"] = new TransportProducerPool.PoolItem("0", transportProducer), + ["1"] = new TransportProducerPool.PoolItem("1", transportProducer), + ["2"] = new TransportProducerPool.PoolItem("2", transportProducer), + }; + + var transportProducerPool = new TransportProducerPool(partition => transportProducer, pool: startingPool, eventHubProducer: transportProducer); + + // Validate the initial state. + + Assert.That(startingPool.TryGetValue("0", out _), Is.True, "The requested partition should appear in the pool."); + Assert.That(transportProducer.CloseCallCount, Is.EqualTo(0), "The producer should not have been closed."); + + // Request the producer and hold the reference to ensure that it is flagged as being in use.
+ + await using var poolItem = transportProducerPool.GetPooledProducer("0"); + + // Expire the producer and validate the removal state. + + await transportProducerPool.ExpirePooledProducerAsync("0", forceClose: true); + + Assert.That(startingPool.TryGetValue("0", out _), Is.False, "The requested partition should have been removed."); + Assert.That(transportProducer.CloseCallCount, Is.EqualTo(1), "The producer should have been closed."); + } + /// /// When a is requested /// its will be increased.