Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Flaky Test Fix #25346

Merged
merged 1 commit into from
Nov 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ public async Task BackgroundProcessingToleratesPartitionIdQueryFailure()
var mockProcessor = new Mock<EventProcessor<EventProcessorPartition>>(65, "consumerGroup", "namespace", "eventHub", Mock.Of<TokenCredential>(), default(EventProcessorOptions)) { CallBase = true };

mockConnection
.Setup(conn => conn.GetPartitionIdsAsync(It.IsAny<EventHubsRetryPolicy>(), It.IsAny<CancellationToken>()))
.Throws(expectedException);
.SetupSequence(conn => conn.GetPartitionIdsAsync(It.IsAny<EventHubsRetryPolicy>(), It.IsAny<CancellationToken>()))
.Throws(expectedException)
.ReturnsAsync(new[] { "0", "1" });

mockProcessor
.Setup(processor => processor.CreateConnection())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1813,16 +1813,27 @@ public async Task EnqueueEventsAsyncEnqueuesForAutomaticRouting()

try
{
await mockBufferedProducer.Object.EnqueueEventsAsync(events, cancellationSource.Token);
var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventsAsync(events, cancellationSource.Token);
Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition.");

Assert.That(enqueuedCount, Is.EqualTo(events.Length), "The return value should indicate that the correct number of events were enqueued.");
Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(events.Length), "The total event count should indicate that the correct number of events were enqueued.");
Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(events.Length), "The partition event count should indicate that the correct number of events were enqueued.");

var readEventCount = 0;

while (partitionPublisher.TryReadEvent(out var readEvent))
while (readEventCount < events.Length)
{
++readEventCount;
Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
if (partitionPublisher.TryReadEvent(out var readEvent))
{
++readEventCount;

Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
}

Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length.");
Expand Down Expand Up @@ -1881,17 +1892,27 @@ public async Task EnqueueEventsAsyncEnqueuesWithAPartitionKey()
try
{
var options = new EnqueueEventOptions { PartitionKey = partitionKey };
await mockBufferedProducer.Object.EnqueueEventsAsync(events, options, cancellationSource.Token);
var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventsAsync(events, options, cancellationSource.Token);

Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition.");
Assert.That(enqueuedCount, Is.EqualTo(events.Length), "The return value should indicate that the correct number of events were enqueued.");
Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(events.Length), "The total event count should indicate that the correct number of events were enqueued.");
Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(events.Length), "The partition event count should indicate that the correct number of events were enqueued.");

var readEventCount = 0;

while (partitionPublisher.TryReadEvent(out var readEvent))
while (readEventCount < events.Length)
{
++readEventCount;
Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }].");
if (partitionPublisher.TryReadEvent(out var readEvent))
{
++readEventCount;

Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
}

Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length.");
Expand Down Expand Up @@ -1941,17 +1962,27 @@ public async Task EnqueueEventsAsyncEnqueuesWithAPartitionAssignment()
try
{
var options = new EnqueueEventOptions { PartitionId = partitionId };
await mockBufferedProducer.Object.EnqueueEventsAsync(events, options, cancellationSource.Token);
var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventsAsync(events, options, cancellationSource.Token);

Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition.");
Assert.That(enqueuedCount, Is.EqualTo(events.Length), "The return value should indicate that the correct number of events were enqueued.");
Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(events.Length), "The total event count should indicate that the correct number of events were enqueued.");
Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(events.Length), "The partition event count should indicate that the correct number of events were enqueued.");

var readEventCount = 0;

while (partitionPublisher.TryReadEvent(out var readEvent))
while (readEventCount < events.Length)
{
++readEventCount;
Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
if (partitionPublisher.TryReadEvent(out var readEvent))
{
++readEventCount;

Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
}

Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length.");
Expand Down Expand Up @@ -2555,16 +2586,27 @@ public async Task EnqueueEventAsyncEnqueuesForAutomaticRouting()

try
{
await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, cancellationSource.Token);
var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, cancellationSource.Token);
Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition.");

Assert.That(enqueuedCount, Is.EqualTo(1), "The return value should indicate that a single event was enqueued.");
Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(1), "The total event count should indicate that a single event was enqueued.");
Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(1), "The partition event count should indicate that a single event was enqueued.");

var readEventCount = 0;

while (partitionPublisher.TryReadEvent(out var readEvent))
while (readEventCount < 1)
{
++readEventCount;
Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
if (partitionPublisher.TryReadEvent(out var readEvent))
{
++readEventCount;

Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
}

Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued.");
Expand Down Expand Up @@ -2623,17 +2665,27 @@ public async Task EnqueueEventAsyncEnqueuesWithAPartitionKey()
try
{
var options = new EnqueueEventOptions { PartitionKey = partitionKey };
await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, options, cancellationSource.Token);
var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, options, cancellationSource.Token);

Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition.");
Assert.That(enqueuedCount, Is.EqualTo(1), "The return value should indicate that a single event was enqueued.");
Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(1), "The total event count should indicate that a single event was enqueued.");
Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(1), "The partition event count should indicate that a single event was enqueued.");

var readEventCount = 0;

while (partitionPublisher.TryReadEvent(out var readEvent))
while (readEventCount < 1)
{
++readEventCount;
Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }].");
if (partitionPublisher.TryReadEvent(out var readEvent))
{
++readEventCount;

Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
jsquire marked this conversation as resolved.
Show resolved Hide resolved
}

Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued.");
Expand Down Expand Up @@ -2683,17 +2735,27 @@ public async Task EnqueueEventAsyncEnqueuesWithAPartitionAssignment()
try
{
var options = new EnqueueEventOptions { PartitionId = partitionId };
await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, options, cancellationSource.Token);
var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, options, cancellationSource.Token);

Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition.");
Assert.That(enqueuedCount, Is.EqualTo(1), "The return value should indicate that a single event was enqueued.");
Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(1), "The total event count should indicate that a single event was enqueued.");
Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(1), "The partition event count should indicate that a single event was enqueued.");

var readEventCount = 0;

while (partitionPublisher.TryReadEvent(out var readEvent))
while (readEventCount < 1)
{
++readEventCount;
Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
if (partitionPublisher.TryReadEvent(out var readEvent))
{
++readEventCount;

Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
}

Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued.");
Expand Down Expand Up @@ -2763,9 +2825,23 @@ public async Task EnqueueEventAsyncWaitsWhenFull()
Assert.That(partitionPublisher.TryReadEvent(out var readBlockerEvent), Is.True, "The blocking event should be available to read immediately.");
Assert.That(blockerEvent.EventBody.ToString(), Is.EqualTo(readBlockerEvent.EventBody.ToString()), $"The event with body: [{ readBlockerEvent.EventBody }] was not enqueued.");

Assert.That(partitionPublisher.TryReadEvent(out var readEvent), Is.True, "An event is expected to be available to read.");
Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
var readEventCount = 0;

while (readEventCount < 1)
{
if (partitionPublisher.TryReadEvent(out var readEvent))
{
++readEventCount;

Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
}

Assert.That(readEventCount, Is.EqualTo(1), "An event should have been available to read.");

await enqueueTask;
Assert.That(partitionPublisher.TryReadEvent(out _), Is.False, "Other than the blocker, a single event should have been enqueued.");
Expand Down