Skip to content

Commit

Permalink
[Event Hubs] Processor Event Dispatch Cancellation (#22695)
Browse files Browse the repository at this point in the history
The focus of these changes is to fix the loop responsible for dispatching
events to the handler for processing so that it respects cancellation when
the processor signals the associated token.  Currently, the loop will dispatch
all events in the batch to the handler, potentially causing duplicate processing
and checkpoint interleaving.
  • Loading branch information
jsquire authored Jul 16, 2021
1 parent fb173c5 commit 51212bb
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 0 deletions.
2 changes: 2 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Fixed an issue where partition processing would ignore cancellation when the processor was shutting down or partition ownership changed and continue dispatching events to the handler until the entire batch was complete. Cancellation will now be properly respected.

### Other Changes

## 5.5.0 (2021-07-07)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,16 @@ protected override async Task OnProcessingEventBatchAsync(IEnumerable<EventData>

foreach (var eventData in events)
{
// If cancellation was requested, then either partition ownership was lost or the processor is
// shutting down. In either case, dispatching of events to be handled should cease. Since this
// flow is entirely internal, there's no benefit to throwing a cancellation exception; instead,
// just exit the loop.

if (cancellationToken.IsCancellationRequested)
{
break;
}

emptyBatch = false;

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,45 @@ public async Task EventProcessingLogsExceptions()
cancellationSource.Cancel();
}

/// <summary>
/// Verifies functionality of the <see cref="EventProcessorClient" /> events.
/// </summary>
///
[Test]
public async Task EventProcessingRespectsCancellation()
{
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

var eventBatch = new[]
{
new MockEventData(new byte[] { 0x11 }, offset: 123, sequenceNumber: 123),
new MockEventData(new byte[] { 0x22 }, offset: 456, sequenceNumber: 456),
new MockEventData(new byte[] { 0x33 }, offset: 789, sequenceNumber: 789),
new MockEventData(new byte[] { 0x44 }, offset: 000, sequenceNumber: 000)
};

var processedEventsCount = 0;
var partitionId = "0";
var mockStorageManager = new Mock<StorageManager>();
var processorClient = new TestEventProcessorClient(mockStorageManager.Object, "consumerGroup", "namespace", "eventHub", Mock.Of<TokenCredential>(), Mock.Of<EventHubConnection>(), default);

processorClient.ProcessEventAsync += eventArgs =>
{
++processedEventsCount;
cancellationSource.Cancel();
return Task.CompletedTask;
};

await processorClient.InvokeOnProcessingEventBatchAsync(eventBatch, new TestEventProcessorPartition(partitionId), cancellationSource.Token);

Assert.That(cancellationSource.IsCancellationRequested, Is.True, "The cancellation token should have been signaled.");
Assert.That(processedEventsCount, Is.EqualTo(1), "The event handler should not have been triggered after cancellation.");

cancellationSource.Cancel();
}

/// <summary>
/// Verifies functionality of the <see cref="EventProcessorClient.GetCheckpointAsync" />
/// method.
Expand Down

0 comments on commit 51212bb

Please sign in to comment.