From f7f1aa1d532c755d238071b76101c3886d0a105a Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Thu, 18 Nov 2021 10:46:47 -0500 Subject: [PATCH] [Event Hubs] Sample updates for the Buffered Producer (#25388) * [Event Hubs] Sample updates for the Buffered Producer The focus of these changes is to incorporate the `EventHubBufferedProducer` client into the Event Hubs samples for client types and publishing. Also included is a clarification for the logging sample, calling out the lifetime needs for the listener. --- .../MigrationGuide.md | 15 +- .../samples/Sample02_EventHubsClients.md | 4 +- .../samples/Sample04_PublishingEvents.md | 357 +++++++++++++++--- .../samples/Sample05_ReadingEvents.md | 2 +- .../Sample10_AzureEventSourceListener.md | 4 + .../Sample04_PublishingEventsLiveTests.cs | 325 ++++++++++++++-- 6 files changed, 607 insertions(+), 100 deletions(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md b/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md index ec9dd8aa7b2f..52cfb5b2f337 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md @@ -210,12 +210,11 @@ var producer = new EventHubProducerClient(connectionString, eventHubName); try { - using var eventBatch = await producer.CreateBatchAsync(); + using EventDataBatch eventBatch = await producer.CreateBatchAsync(); for (var index = 0; index < 5; ++index) { - var eventBody = new BinaryData($"Event #{ index }"); - var eventData = new EventData(eventBody); + var eventData = new EventData($"Event #{ index }"); if (!eventBatch.TryAdd(eventData)) { @@ -286,12 +285,11 @@ try PartitionKey = "Any Value Will Do..." }; - using var eventBatch = await producer.CreateBatchAsync(batchOptions); + using EventDataBatch eventBatch = await producer.CreateBatchAsync(batchOptions); for (var index = 0; index < 5; ++index) { - var eventBody = new BinaryData($"Event #{ index }"); - var eventData = new EventData(eventBody); + var eventData = new EventData($"Event #{ index }"); if (!eventBatch.TryAdd(eventData)) { @@ -364,12 +362,11 @@ try PartitionId = firstPartition }; - using var eventBatch = await producer.CreateBatchAsync(batchOptions); + using EventDataBatch eventBatch = await producer.CreateBatchAsync(batchOptions); for (var index = 0; index < 5; ++index) { - var eventBody = new BinaryData($"Event #{ index }"); - var eventData = new EventData(eventBody); + var eventData = new EventData($"Event #{ index }"); if (!eventBatch.TryAdd(eventData)) { diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample02_EventHubsClients.md b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample02_EventHubsClients.md index 2fec437f96f5..e4bb3eda331d 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample02_EventHubsClients.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample02_EventHubsClients.md @@ -12,7 +12,9 @@ The mainstream set of clients provides an approachable onboarding experience for **Mainstream** -- The [EventHubProducerClient](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.producer?view=azure-dotnet) is responsible for publishing events and supports multiple approaches for selecting the partition to which the event is associated, including automatic routing by the Event Hubs service and specifying an explicit partition. +- The [EventHubBufferedProducerClient](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.producer?view=azure-dotnet) publishes events using a deferred model where events are collected into a buffer and the producer has responsibility for implicitly batching and sending them. More on the design and philosophy behind this type can be found in its [design document](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventhub/Azure.Messaging.EventHubs/design/proposal-event-hub-buffered-producer.md). + +- The [EventHubProducerClient](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.producer.eventhubproducerclient?view=azure-dotnet) publishes events with explicit model where callers have responsibility for management of batches and controlling when events are sent. - The [EventHubConsumerClient](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.consumer.eventhubconsumerclient?view=azure-dotnet) supports reading events from a single partition and also offers an easy way to familiarize yourself with Event Hubs by reading from all partitions without the rigor and complexity that you would need in a production application. For reading events from all partitions in a production scenario, we strongly recommend using the [EventProcessorClient](https://github.com/Azure/azure-sdk-for-net/tree/main/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples) from the [Azure.Messaging.EventHubs.Processor](https://www.nuget.org/packages/Azure.Messaging.EventHubs.Processor) package over the `EventHubConsumerClient`. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample04_PublishingEvents.md b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample04_PublishingEvents.md index f575a577c087..ccf4e0fa32d1 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample04_PublishingEvents.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample04_PublishingEvents.md @@ -4,21 +4,30 @@ This sample demonstrates publishing events to an Event Hub. To begin, please en ## Client types -Event publishing is the responsibility of the `EventHubProducerClient`, which supports each of the publishing scenarios supported by the client library. The `EventHubProducerClient` is safe to cache and use for the lifetime of an application, which is best practice when the events are published regularly or semi-regularly. The `EventHubProducerClient` is responsible for efficient resource management, working to keep resource usage low during periods of inactivity and manage health during periods of higher use. Calling either the `CloseAsync` or `DisposeAsync` method as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up. +Event publishing is the responsibility of an event producer. The client library offers two producers, the `EventHubProducerClient` and `EventHubBufferedProducerClient`, each tailored to a unique pattern of use, but applicable to the same application scenarios. This sample will include code snippets for both types, unless the concept is not applicable to one. More information about the available event producers can be found in [Sample02_EventHubsClients](https://github.com/Azure/azure-sdk-for-net/tree/main/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample02_EventHubsClients.md). + + +Each of the event producer client types are safe to cache and use for the lifetime of an application, which is best practice when the events are published regularly or semi-regularly. The event producers are responsible for efficient resource management, adapting to periods of inactivity and higher use automatically. Calling either the `CloseAsync` or `DisposeAsync` method as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up. + +## Buffering versus explicit batching + +The main consideration for choosing a buffered or non-buffered producer is determinism. When publishing with the `EventHubProducerClient`, each request to publish is an independent operation; when the call completes, your application knows whether the events were successfully published or an exception was encountered. This requires your application to be responsible for managing the publishing flow, controlling how batches of events are built, and when they are published. For many applications, ensuring efficient publishing can introduce a non-trivial amount of complexity. + +The `EventHubBufferedProducerClient` aims to reduce complexity by owning the responsibility for efficiently managing batches and publishing. To do so, applications enqueue events into a buffer which the producer reads and publishes in batches when they are ready. Because publishing happens automatically at some point after an event was enqueued, your application is not aware of the outcome immediately; it must register event handlers that the producer calls to notify the application of publishing success or failure. ## Event lifetime When events are published, they will continue to exist in the Event Hub and be available for consuming until they reach an age where they are older than the [retention period](https://docs.microsoft.com//azure/event-hubs/event-hubs-faq#what-is-the-maximum-retention-period-for-events). After that point in time, the Event Hubs service may chose to remove them from the partition. Once removed, an event is no longer available to be read and cannot be recovered. Though the Event Hubs service is free to remove events older than the retention period, it does not do so deterministically; there is no guarantee of when events will be removed. -## Publishing and partitions +## Publishing size constraints -Every event that is published is sent to one of the [partitions](https://docs.microsoft.com/azure/architecture/reference-architectures/event-hubs/partitioning-in-event-hubs-and-kafka) of the Event Hub. The application may request publishing to a specific partition or allow the Event Hubs service to choose the partition automatically. The `EventHubProducerClient` is not associated with any specific partition and the same instance can be used for each publishing scenario. +There is a limit to the size (in bytes) that can be published in a single operation. To accurately determine the size of an event, it must be measured in the format used by the active protocol in order to properly account for overhead. The size limit is controlled by the Event Hubs service and differs for different types of Event Hub instances. -## Publishing size constraints +Applications using the `EventHubBufferedProducerClient` do not need to track size limitations; the producer will ensure that batches are correctly sized when publishing. -When publishing, there is a limit to the size (in bytes) that can be sent to the Event Hubs service in a single operation. To accurately determine the size of an event, it must be measured in the format used by the active protocol as well as account for overhead. The size limit is controlled by the Event Hubs service and differs for different types of Event Hub instances. Because of this and because there is no accurate way for an application to calculate the size of an event, the client library offers the `EventDataBatch` to help. +When using the `EventHubProducerClient`, the application holds responsibility for managing the size of events to be published. Because there is no accurate way for an application to calculate the size of an event, the client library offers the `EventDataBatch` to help. -The `EventDataBatch` exists to provide a deterministic and accurate means to measure the size of a message sent to the service, minimizing the chance that a publishing operation will fail. Because the batch works in cooperation with the service, it has an understanding of the maximum size and has the ability to measure the exact size of an event when serialized for publishing. For the majority of scenarios, we recommend using the `EventDataBatch` to ensure that your application does not attempt to publish a set of events larger than the Event Hubs service allows. The majority of examples in this sample will demonstrate a batched approach. +The `EventDataBatch` exists to provide a deterministic and accurate means to measure the size of a message sent to the service, minimizing the chance that a publishing operation will fail. Because the batch works in cooperation with the service, it has an understanding of the maximum size and has the ability to measure the exact size of an event when serialized for publishing. For the majority of `EventHubProducerClient` scenarios, we recommend using the `EventDataBatch` to ensure that your application does not attempt to publish a set of events larger than the limit. The majority of examples in this sample will demonstrate a batched approach. All of the events that belong to an `EventDataBatch` are considered part of a single unit of work. When a batch is published, the result is atomic; either publishing was successful for all events in the batch, or it has failed for all events. Partial success or failure when publishing a batch is not possible. @@ -32,10 +41,9 @@ var producer = new EventHubProducerClient(connectionString, eventHubName); try { - using var eventBatch = await producer.CreateBatchAsync(); + using EventDataBatch eventBatch = await producer.CreateBatchAsync(); - var eventBody = new BinaryData("This is an event body"); - var eventData = new EventData(eventBody); + var eventData = new EventData("This is an event body"); if (!eventBatch.TryAdd(eventData)) { @@ -48,13 +56,65 @@ finally } ``` -The `EventDataBatch` is scoped to a single publish operation. Once that operation is complete, a new batch should be created for any additional events to be published. The batch is responsible for unmanaged resources; it is recommended that you `Dispose` the batch after it has been published. +The `EventDataBatch` is scoped to a single publish operation. Once that operation is complete, a new batch should be created for any additional events to be published. Because the batch is responsible for unmanaged resources, it is recommended that you `Dispose` the batch after it has been published. + +## Publishing and partitions + +Every event that is published is sent to one of the [partitions](https://docs.microsoft.com/azure/architecture/reference-architectures/event-hubs/partitioning-in-event-hubs-and-kafka) of the Event Hub. The application may request publishing to a specific partition, grouped using a partition key, or allow the partition to be chosen automatically. + +When using the `EventHubProducerClient`, each batch must choose the partition assignment strategy at the time it is created, and that strategy is applied to all events in the batch. The `EventHubBufferedProducerClient` allows the partition assignment strategy to be chosen for each individual event that is enqueued, and the producer will ensure that batches are constructed with the proper strategy. ## Publishing events with automatic partition assignment Allowing automatic assignment to partitions is recommended when publishing needs to be highly available and shouldn't fail if a single partition is experiencing trouble. Automatic assignment also helps to ensure that event data is evenly distributed among all available partitions, which helps to ensure throughput when publishing and reading data. -When the batch is published, the `EventHubProducerClient` will receive an acknowledgment from the Event Hubs service; so long as no exception is thrown by this call, your application can consider publishing successful. The service assumes responsibility for delivery of the batch. All of your event data will be published to one of the Event Hub partitions, thought here may be a slight delay until it is available to be read. +### Event Hub Buffered Producer Client + +When using the `EventHubBufferedProducerClient`, events enqueued with no options specified will be automatically routed. Because the producer manages publishing, there is no explicit call. When the producer is closed, it will ensure that any remaining enqueued events have been published. All of your event data will be published to one of the Event Hub partitions, though there may be a slight delay until it is available to be read. + +```C# Snippet:EventHubs_Sample04_AutomaticRoutingBuffered +var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>"; +var eventHubName = "<< NAME OF THE EVENT HUB >>"; + +var producer = new EventHubBufferedProducerClient(connectionString, eventHubName); + +// The failure handler is required and invoked after all allowable +// retries were applied. + +producer.SendEventBatchFailedAsync += args => +{ + Debug.WriteLine($"Publishing failed for { args.EventBatch.Count } events. Error: '{ args.Exception.Message }'"); + return Task.CompletedTask; +}; + +// The success handler is optional. + +producer.SendEventBatchSucceededAsync += args => +{ + Debug.WriteLine($"{ args.EventBatch.Count } events were published to partition: '{ args.PartitionId }."); + return Task.CompletedTask; +}; + +try +{ + for (var index = 0; index < 5; ++index) + { + var eventData = new EventData($"Event #{ index }"); + await producer.EnqueueEventAsync(eventData); + } +} +finally +{ + // Closing the producer will flush any + // enqueued events that have not been published. + + await producer.CloseAsync(); +} +``` + +### Event Hub Producer Client + +When using the `EventHubProducerClient` a batch is first created and then published. The `SendAsync` call will receive an acknowledgment from the Event Hubs service; so long as no exception is thrown, your application can consider publishing successful. All of your event data will be published to one of the Event Hub partitions, though there may be a slight delay until it is available to be read. ```C# Snippet:EventHubs_Sample04_AutomaticRouting var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>"; @@ -64,12 +124,11 @@ var producer = new EventHubProducerClient(connectionString, eventHubName); try { - using var eventBatch = await producer.CreateBatchAsync(); + using EventDataBatch eventBatch = await producer.CreateBatchAsync(); for (var index = 0; index < 5; ++index) { - var eventBody = new BinaryData($"Event #{ index }"); - var eventData = new EventData(eventBody); + var eventData = new EventData($"Event #{ index }"); if (!eventBatch.TryAdd(eventData)) { @@ -91,7 +150,60 @@ When publishing events, it may be desirable to request that the Event Hubs servi There is no means of predicting which partition will be associated with a given partition key; we can only be assured that it will be a consistent choice of partition. If you have a need to understand which exact partition an event is published to, you will need to specify the partition directly rather than using a partition key. -When the batch is published, the `EventHubProducerClient` will receive an acknowledgment from the Event Hubs service; so long as no exception is thrown by this call, your application can consider publishing successful. The service assumes responsibility for delivery of the batch. All of your event data will be published to one of the Event Hub partitions, thought here may be a slight delay until it is available to be read. +### Event Hub Buffered Producer Client + +When using the `EventHubBufferedProducerClient`, events are enqueued with a partition key option. Because the producer manages publishing, there is no explicit call. When the producer is closed, it will ensure that any remaining enqueued events have been published. All of your event data will be published to one of the Event Hub partitions, though there may be a slight delay until it is available to be read. + +**Note:** It is important to be aware that if you are using a partition key, you may not also specify a partition identifier; they are mutually exclusive. + +```C# Snippet:EventHubs_Sample04_PartitionKeyBuffered +var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>"; +var eventHubName = "<< NAME OF THE EVENT HUB >>"; + +var producer = new EventHubBufferedProducerClient(connectionString, eventHubName); + +// The failure handler is required and invoked after all allowable +// retries were applied. + +producer.SendEventBatchFailedAsync += args => +{ + Debug.WriteLine($"Publishing failed for { args.EventBatch.Count } events. Error: '{ args.Exception.Message }'"); + return Task.CompletedTask; +}; + +// The success handler is optional. + +producer.SendEventBatchSucceededAsync += args => +{ + Debug.WriteLine($"{ args.EventBatch.Count } events were published to partition: '{ args.PartitionId }."); + return Task.CompletedTask; +}; + +try +{ + var enqueueOptions = new EnqueueEventOptions + { + PartitionKey = "Any Value Will Do..." + }; + + for (var index = 0; index < 5; ++index) + { + var eventData = new EventData($"Event #{ index }"); + await producer.EnqueueEventAsync(eventData, enqueueOptions); + } +} +finally +{ + // Closing the producer will flush any + // enqueued events that have not been published. + + await producer.CloseAsync(); +} +``` + +### Event Hub Producer Client + +When using the `EventHubProducerClient` a batch is first created with a partition key option and then published. The `SendAsync` call will receive an acknowledgment from the Event Hubs service; so long as no exception is thrown, your application can consider publishing successful. All of your event data will be published to one of the Event Hub partitions, though there may be a slight delay until it is available to be read. **Note:** It is important to be aware that if you are using a partition key, you may not also specify a partition identifier; they are mutually exclusive. @@ -108,12 +220,11 @@ try PartitionKey = "Any Value Will Do..." }; - using var eventBatch = await producer.CreateBatchAsync(batchOptions); + using EventDataBatch eventBatch = await producer.CreateBatchAsync(batchOptions); for (var index = 0; index < 5; ++index) { - var eventBody = new BinaryData($"Event #{ index }"); - var eventData = new EventData(eventBody); + var eventData = new EventData($"Event #{ index }"); if (!eventBatch.TryAdd(eventData)) { @@ -133,7 +244,62 @@ finally When publishing, it may be desirable to request that the Event Hubs service place a batch on a specific partition, for organization and processing. For example, you may have designated one partition of your Event Hub as being responsible for all of your telemetry-related events. This can be accomplished by setting the identifier of the desired partition when creating the batch. -When the batch is published, the `EventHubProducerClient` will receive an acknowledgment from the Event Hubs service; so long as no exception is thrown by this call, your application can consider publishing successful. The service assumes responsibility for delivery of the batch. All of your event data will be published to one of the Event Hub partitions, thought here may be a slight delay until it is available to be read. +### Event Hub Buffered Producer Client + +When using the `EventHubBufferedProducerClient`, events are enqueued with a partition identifier option. Because the producer manages publishing, there is no explicit call. When the producer is closed, it will ensure that any remaining enqueued events have been published. All of your event data will be published to one of the Event Hub partitions, though there may be a slight delay until it is available to be read. + +**Note:** It is important to be aware that if you are using a partition key, you may not also specify a partition identifier; they are mutually exclusive. + +```C# Snippet:EventHubs_Sample04_PartitionIdBuffered +var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>"; +var eventHubName = "<< NAME OF THE EVENT HUB >>"; + +var producer = new EventHubBufferedProducerClient(connectionString, eventHubName); + +// The failure handler is required and invoked after all allowable +// retries were applied. + +producer.SendEventBatchFailedAsync += args => +{ + Debug.WriteLine($"Publishing failed for { args.EventBatch.Count } events. Error: '{ args.Exception.Message }'"); + return Task.CompletedTask; +}; + +// The success handler is optional. + +producer.SendEventBatchSucceededAsync += args => +{ + Debug.WriteLine($"{ args.EventBatch.Count } events were published to partition: '{ args.PartitionId }."); + return Task.CompletedTask; +}; + +try +{ + string firstPartition = (await producer.GetPartitionIdsAsync()).First(); + + var enqueueOptions = new EnqueueEventOptions + { + PartitionId = firstPartition + }; + + for (var index = 0; index < 5; ++index) + { + var eventData = new EventData($"Event #{ index }"); + await producer.EnqueueEventAsync(eventData, enqueueOptions); + } +} +finally +{ + // Closing the producer will flush any + // enqueued events that have not been published. + + await producer.CloseAsync(); +} +``` + +### Event Hub Producer Client + +When using the `EventHubProducerClient` a batch is first created with a partition identifier option and then published. The `SendAsync` call will receive an acknowledgment from the Event Hubs service; so long as no exception is thrown, your application can consider publishing successful. All of your event data will be published to one of the Event Hub partitions, though there may be a slight delay until it is available to be read. **Note:** It is important to be aware that if you are using a partition identifier, you may not also specify a partition key; they are mutually exclusive. @@ -152,12 +318,11 @@ try PartitionId = firstPartition }; - using var eventBatch = await producer.CreateBatchAsync(batchOptions); + using EventDataBatch eventBatch = await producer.CreateBatchAsync(batchOptions); for (var index = 0; index < 5; ++index) { - var eventBody = new BinaryData($"Event #{ index }"); - var eventData = new EventData(eventBody); + var eventData = new EventData($"Event #{ index }"); if (!eventBatch.TryAdd(eventData)) { @@ -175,7 +340,7 @@ finally ## Publishing events with custom metadata -Because an event consists mainly of an opaque set of bytes, it may be difficult for consumers of those events to make informed decisions about how to process them. In order to allow event publishers to offer better context for consumers, events may also contain custom metadata, in the form of a set of key/value pairs. One common scenario for the inclusion of metadata is to provide a hint about the type of data contained by an event, so that consumers understand its format and can deserialize it appropriately. +Because an event consists mainly of an opaque set of bytes, it may be difficult for consumers of those events to make informed decisions about how to process them. In order to allow event publishers to offer better context for consumers, events may also contain custom metadata. One common scenario for the inclusion of metadata is to provide a hint about the type of data contained by an event, so that consumers understand its format and can deserialize it appropriately. This metadata is not used by, or in any way meaningful to, the Event Hubs service; it exists only for coordination between event publishers and consumers. @@ -183,72 +348,120 @@ This metadata is not used by, or in any way meaningful to, the Event Hubs servic var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>"; var eventHubName = "<< NAME OF THE EVENT HUB >>"; -var producer = new EventHubProducerClient(connectionString, eventHubName); +var producer = new EventHubBufferedProducerClient(connectionString, eventHubName); + +// The failure handler is required and invoked after all allowable +// retries were applied. + +producer.SendEventBatchFailedAsync += args => +{ + Debug.WriteLine($"Publishing failed for { args.EventBatch.Count } events. Error: '{ args.Exception.Message }'"); + return Task.CompletedTask; +}; + +// The success handler is optional. + +producer.SendEventBatchSucceededAsync += args => +{ + Debug.WriteLine($"{ args.EventBatch.Count } events were published to partition: '{ args.PartitionId }."); + return Task.CompletedTask; +}; try { - using var eventBatch = await producer.CreateBatchAsync(); + var eventData = new EventData("Hello, Event Hubs!") + { + MessageId = "H1", + ContentType = "application/json" + }; - var eventBody = new BinaryData("Hello, Event Hubs!"); - var eventData = new EventData(eventBody); eventData.Properties.Add("EventType", "com.microsoft.samples.hello-event"); eventData.Properties.Add("priority", 1); eventData.Properties.Add("score", 9.0); - if (!eventBatch.TryAdd(eventData)) + await producer.EnqueueEventAsync(eventData); + + eventData = new EventData("Goodbye, Event Hubs!") { - throw new Exception("The first event could not be added."); - } + MessageId = "G1", + ContentType = "application/json" + }; - eventBody = new BinaryData("Goodbye, Event Hubs!"); - eventData = new EventData(eventBody); eventData.Properties.Add("EventType", "com.microsoft.samples.goodbye-event"); eventData.Properties.Add("priority", "17"); eventData.Properties.Add("blob", true); - if (!eventBatch.TryAdd(eventData)) - { - throw new Exception("The second event could not be added."); - } - - await producer.SendAsync(eventBatch); + await producer.EnqueueEventAsync(eventData); } finally { + // Closing the producer will flush any + // enqueued events that have not been published. + await producer.CloseAsync(); } ``` -## Publishing events without an explicit batch +## Tuning throughput for buffered publishing -In scenarios where producers publish events more frequently and aren't concerned with exceeding the size limitation, it is reasonable to bypass the safety offered by using the `EventDataBatch` to offer minor throughput gains and fewer memory allocations. In support of this scenario, the `EventProducerClient` offers a `SendAsync` overload that accepts a set of events. This method delegates validation to the Event Hubs service to avoid the performance cost of a client-side measurement. If the set of events that was published exceeds the size limit, an [EventHubsException](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.eventhubsexception?view=azure-dotnet) will be surfaced with its `Reason` set to [MessageSizeExceeded](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.eventhubsexception.failurereason?view=azure-dotnet). +To ensure consistent performance and throughput, it is common for applications to make decisions around the pattern of publishing that they use - adjusting the frequency that batches are sent and how many operations take place concurrently. Because the `EventHubBufferedProducerClient` manages batches and publishing in the background, your application cannot directly control these aspects. -When events are passed in this form, the `EventProducerClient` will package them as a single publishing operation. When the set is published, the result is atomic; either publishing was successful for all events, or it has failed for all events. Partial success or failure when publishing a batch is not possible. +By default, the `EventHubBufferedProducerClient` uses a set of values that will perform well for general-case scenarios, balancing consistent performance with ensuring that the order of events is maintained. In the case where your application has different needs, it can provide a set of options when constructing the producer that will influence publishing behavior and help ensure that it is optimal for your specific scenarios. -When published, the `EventHubProducerClient` will receive an acknowledgment from the Event Hubs service; so long as no exception is thrown by this call, your application can consider publishing successful. The service assumes responsibility for delivery of the set. All of your event data will be published to one of the Event Hub partitions, thought here may be a slight delay until it is available to be read. +The performance-related settings are: -```C# Snippet:EventHubs_Sample04_NoBatch +- **MaximumWaitTime**: This is the longest that the producer will wait for a batch to be full before publishing. For applications that publish frequently, waiting longer for a full batch may improve efficiency. For applications that publish infrequently or sporadically, a lower value will ensure that events are not held in buffer waiting. The default wait time is 1 second. + +- **MaximumConcurrentSends**: The number of concurrent `SendAsync` calls that the producer will make. Each call is a network request that publishes a single batch. A higher degree of concurrency can improve throughput for applications that use an Event Hub with a large number of partitions. Because the producer is highly asynchronous and is running background tasks, we recommend being careful when selecting a value to avoid creating contention in the thread pool. Testing under normal load is essential. The default concurrency is equal to the number of cores in the host environment. + +- **MaximumConcurrentSendsPerPartition**: The number of concurrent `SendAsync` calls that can be active for a given partition. To maintain the order of events, there should be only a single active send per partition. For applications where preserving the ordering of events is not needed, increasing this value may improve throughput. It is important to note that the _MaximumConcurrentSends_ setting is the dominant constraint and will not be exceeded by this setting. The default concurrency is 1 send per partition. + +- **MaximumEventBufferLengthPerPartition**: The maximum number of events that can be buffered for each individual partition. This is intended to ensure that your application does not run out of memory if buffering happens more frequently than events can be published. When this limit is reached, your application can continue to call `EnqueueEventAsync` or `EnqueueEventsAsync` without an error; the call will block until space is available. For applications that publish a high number of smaller-sized events, increasing this limit may help to improve throughput. For scenarios where the application is buffering large events and needs to control memory use, lowering this limit may be helpful. The default buffer length is 1500 events per partition. + +```C# Snippet:EventHubs_Sample04_BufferedConfiguration var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>"; var eventHubName = "<< NAME OF THE EVENT HUB >>"; -var producer = new EventHubProducerClient(connectionString, eventHubName); +var options = new EventHubBufferedProducerClientOptions +{ + MaximumWaitTime = TimeSpan.FromSeconds(1), + MaximumConcurrentSends = 5, + MaximumConcurrentSendsPerPartition = 1, + MaximumEventBufferLengthPerPartition = 5000 +}; -try +var producer = new EventHubBufferedProducerClient(connectionString, eventHubName, options); + +// The failure handler is required and invoked after all allowable +// retries were applied. + +producer.SendEventBatchFailedAsync += args => { - var eventsToSend = new List(); + Debug.WriteLine($"Publishing failed for { args.EventBatch.Count } events. Error: '{ args.Exception.Message }'"); + return Task.CompletedTask; +}; - for (var index = 0; index < 10; ++index) - { - var eventBody = new BinaryData("Hello, Event Hubs!"); - var eventData = new EventData(eventBody); +// The success handler is optional. - eventsToSend.Add(eventData); - } +producer.SendEventBatchSucceededAsync += args => +{ + Debug.WriteLine($"{ args.EventBatch.Count } events were published to partition: '{ args.PartitionId }."); + return Task.CompletedTask; +}; - await producer.SendAsync(eventsToSend); +try +{ + for (var index = 0; index < 5; ++index) + { + var eventData = new EventData($"Event #{ index }"); + await producer.EnqueueEventAsync(eventData); + } } finally { + // Closing the producer will flush any + // enqueued events that have not been published. + await producer.CloseAsync(); } ``` @@ -277,6 +490,7 @@ try } batches = await BuildBatchesAsync(eventsToSend, producer); + foreach (var batch in batches) { await producer.SendAsync(batch); @@ -331,6 +545,38 @@ private static async Task> BuildBatchesAsync( } ``` +## Publishing events with an implicit batch + +In scenarios where an application using the `EventProducerClient` wishes to publish events more frequently and is not concerned with exceeding the size limitation, it is reasonable to bypass the safety offered by using the `EventDataBatch` to offer minor throughput gains and fewer memory allocations. In support of this scenario, the `EventProducerClient` offers a `SendAsync` overload that accepts a set of events. This method delegates validation to the Event Hubs service to avoid the performance cost of a client-side measurement. If the set of events that was published exceeds the size limit, an [EventHubsException](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.eventhubsexception?view=azure-dotnet) will be surfaced with its `Reason` set to [MessageSizeExceeded](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.eventhubsexception.failurereason?view=azure-dotnet). + +When events are passed in this form, the `EventProducerClient` will package them as a single publishing operation. When the set is published, the result is atomic; either publishing was successful for all events, or it has failed for all events. Partial success or failure when publishing a batch is not possible. + +When published, the `EventHubProducerClient` will receive an acknowledgment from the Event Hubs service; so long as no exception is thrown by this call, your application can consider publishing successful. The service assumes responsibility for delivery of the set. All of your event data will be published to one of the Event Hub partitions, though there may be a slight delay until it is available to be read. + +```C# Snippet:EventHubs_Sample04_NoBatch +var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>"; +var eventHubName = "<< NAME OF THE EVENT HUB >>"; + +var producer = new EventHubProducerClient(connectionString, eventHubName); + +try +{ + var eventsToSend = new List(); + + for (var index = 0; index < 10; ++index) + { + var eventData = new EventData("Hello, Event Hubs!"); + eventsToSend.Add(eventData); + } + + await producer.SendAsync(eventsToSend); +} +finally +{ + await producer.CloseAsync(); +} +``` + ## Restricting a batch to a custom size limit In some scenarios, such as when bandwidth is limited or publishers need to maintain control over how much data is transmitted at a time, a custom size limit (in bytes) may be specified when creating an `EventDataBatch`. This will override the default limit specified by the Event Hub and allows an application to use the `EventDataBatch` to ensure that the size of events can be measured accurately and deterministically. It is important to note that the custom limit may not exceed the limit specified by the Event Hub. @@ -348,12 +594,11 @@ try MaximumSizeInBytes = 350 }; - using var eventBatch = await producer.CreateBatchAsync(batchOptions); + using EventDataBatch eventBatch = await producer.CreateBatchAsync(batchOptions); for (var index = 0; index < 5; ++index) { - var eventBody = new BinaryData($"Event #{ index }"); - var eventData = new EventData(eventBody); + var eventData = new EventData($"Event #{ index }"); if (!eventBatch.TryAdd(eventData)) { diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample05_ReadingEvents.md b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample05_ReadingEvents.md index b461d3d41224..d29529568453 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample05_ReadingEvents.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample05_ReadingEvents.md @@ -6,7 +6,7 @@ This sample demonstrates reading events from an Event Hub. To begin, please ens Reading events is the responsibility of an event consumer. The client library offers several different consumers, each intended to support a specific set of scenarios. The `EventHubConsumerClient` will be the focal point of the samples, as it offers an approachable onboarding experience for exploring Event Hubs as well as supporting some production scenarios. More detail about the available event consumers, including those with more specialized uses, can be found in [Sample02_EventHubsClients](https://github.com/Azure/azure-sdk-for-net/tree/main/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample02_EventHubsClients.md). -Each of the event consumer client types are safe to cache and use for the lifetime of an application, which is best practice when the events are read regularly or semi-regularly. The event consumers are responsible for efficient resource management, working to keep resource usage low during periods of inactivity and manage health during periods of higher use. Calling either the `CloseAsync` or `DisposeAsync` method as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up. +Each of the event consumer client types are safe to cache and use for the lifetime of an application, which is best practice when the events are read regularly or semi-regularly. The event consumers are efficient resource management, adapting to periods of inactivity and higher use automatically. Calling either the `CloseAsync` or `DisposeAsync` method as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up. ## Event lifetime diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample10_AzureEventSourceListener.md b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample10_AzureEventSourceListener.md index 0f973ea16fe6..f71a1bda7447 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample10_AzureEventSourceListener.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample10_AzureEventSourceListener.md @@ -2,6 +2,10 @@ The Event Hubs client library is instrumented using the .NET [`EventSource`](https://docs.microsoft.com/dotnet/api/system.diagnostics.tracing.eventsource) mechanism for logging. When instrumenting or diagnosing issues with applications that consume the library, it is often helpful to have access to the Event Hubs logs. The following scenarios demonstrate how to use the [`AzureEventSourceListener`](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/core/Azure.Core/samples/Diagnostics.md#logging) from the `Azure.Core` package to capture logs emitted by the Event Hubs client library. +## Azure Event Source Listener lifetime + +In order for the `AzureEventSourceListener` to collect logs, it must be in scope and active while the client library is in use. If the listener is disposed or otherwise out of scope, logs cannot be collected. Generally, we recommend creating the listener as a top-level member of the class where the Event Hubs client being inspected is used. + ## Capture all events and write them in to the console The following snippet demonstrates an example of capturing all log information from every [Azure SDK for .NET](https://github.com/Azure/azure-sdk-for-net) library in use and displaying it directly in the Console. Calling the `AzureEventSourceListener.CreateConsoleLogger` factory method with other levels, such as `Critical`, `Error`, `Warning`, or `Informational` will help to filter out unwanted events. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Snippets/Sample04_PublishingEventsLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Snippets/Sample04_PublishingEventsLiveTests.cs index a52bc1782215..1570686bb9ff 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Snippets/Sample04_PublishingEventsLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Snippets/Sample04_PublishingEventsLiveTests.cs @@ -3,7 +3,7 @@ using System; using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; +using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using Azure.Messaging.EventHubs.Producer; @@ -19,7 +19,6 @@ namespace Azure.Messaging.EventHubs.Tests.Snippets [TestFixture] [Category(TestCategory.Live)] [Category(TestCategory.DisallowVisualStudioLiveUnitTesting)] - [SuppressMessage("Style", "IDE0059:Unnecessary assignment of a value", Justification = "Example assignments needed for snippet output content.")] public class Sample04_PublishingEventsLiveTests { /// @@ -45,10 +44,9 @@ public async Task EventBatch() try { - using var eventBatch = await producer.CreateBatchAsync(); + using EventDataBatch eventBatch = await producer.CreateBatchAsync(); - var eventBody = new BinaryData("This is an event body"); - var eventData = new EventData(eventBody); + var eventData = new EventData("This is an event body"); if (!eventBatch.TryAdd(eventData)) { @@ -86,12 +84,11 @@ public async Task AutomaticRouting() try { - using var eventBatch = await producer.CreateBatchAsync(); + using EventDataBatch eventBatch = await producer.CreateBatchAsync(); for (var index = 0; index < 5; ++index) { - var eventBody = new BinaryData($"Event #{ index }"); - var eventData = new EventData(eventBody); + var eventData = new EventData($"Event #{ index }"); if (!eventBatch.TryAdd(eventData)) { @@ -109,6 +106,63 @@ public async Task AutomaticRouting() #endregion } + /// + /// Performs basic smoke test validation of the contained snippet. + /// + /// + [Test] + public async Task AutomaticRoutingBuffered() + { + await using var scope = await EventHubScope.CreateAsync(1); + + #region Snippet:EventHubs_Sample04_AutomaticRoutingBuffered + +#if SNIPPET + var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>"; + var eventHubName = "<< NAME OF THE EVENT HUB >>"; +#else + var connectionString = EventHubsTestEnvironment.Instance.EventHubsConnectionString; + var eventHubName = scope.EventHubName; +#endif + + var producer = new EventHubBufferedProducerClient(connectionString, eventHubName); + + // The failure handler is required and invoked after all allowable + // retries were applied. + + producer.SendEventBatchFailedAsync += args => + { + Debug.WriteLine($"Publishing failed for { args.EventBatch.Count } events. Error: '{ args.Exception.Message }'"); + return Task.CompletedTask; + }; + + // The success handler is optional. + + producer.SendEventBatchSucceededAsync += args => + { + Debug.WriteLine($"{ args.EventBatch.Count } events were published to partition: '{ args.PartitionId }."); + return Task.CompletedTask; + }; + + try + { + for (var index = 0; index < 5; ++index) + { + var eventData = new EventData($"Event #{ index }"); + await producer.EnqueueEventAsync(eventData); + } + } + finally + { + // Closing the producer will flush any + // enqueued events that have not been published. + + await producer.CloseAsync(); + } + + #endregion + } + /// /// Performs basic smoke test validation of the contained snippet. /// @@ -137,12 +191,11 @@ public async Task PartitionKey() PartitionKey = "Any Value Will Do..." }; - using var eventBatch = await producer.CreateBatchAsync(batchOptions); + using EventDataBatch eventBatch = await producer.CreateBatchAsync(batchOptions); for (var index = 0; index < 5; ++index) { - var eventBody = new BinaryData($"Event #{ index }"); - var eventData = new EventData(eventBody); + var eventData = new EventData($"Event #{ index }"); if (!eventBatch.TryAdd(eventData)) { @@ -160,6 +213,68 @@ public async Task PartitionKey() #endregion } + /// + /// Performs basic smoke test validation of the contained snippet. + /// + /// + [Test] + public async Task PartitionKeyBuffered() + { + await using var scope = await EventHubScope.CreateAsync(1); + + #region Snippet:EventHubs_Sample04_PartitionKeyBuffered + +#if SNIPPET + var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>"; + var eventHubName = "<< NAME OF THE EVENT HUB >>"; +#else + var connectionString = EventHubsTestEnvironment.Instance.EventHubsConnectionString; + var eventHubName = scope.EventHubName; +#endif + + var producer = new EventHubBufferedProducerClient(connectionString, eventHubName); + + // The failure handler is required and invoked after all allowable + // retries were applied. + + producer.SendEventBatchFailedAsync += args => + { + Debug.WriteLine($"Publishing failed for { args.EventBatch.Count } events. Error: '{ args.Exception.Message }'"); + return Task.CompletedTask; + }; + + // The success handler is optional. + + producer.SendEventBatchSucceededAsync += args => + { + Debug.WriteLine($"{ args.EventBatch.Count } events were published to partition: '{ args.PartitionId }."); + return Task.CompletedTask; + }; + + try + { + var enqueueOptions = new EnqueueEventOptions + { + PartitionKey = "Any Value Will Do..." + }; + + for (var index = 0; index < 5; ++index) + { + var eventData = new EventData($"Event #{ index }"); + await producer.EnqueueEventAsync(eventData, enqueueOptions); + } + } + finally + { + // Closing the producer will flush any + // enqueued events that have not been published. + + await producer.CloseAsync(); + } + + #endregion + } + /// /// Performs basic smoke test validation of the contained snippet. /// @@ -190,12 +305,11 @@ public async Task PartitionId() PartitionId = firstPartition }; - using var eventBatch = await producer.CreateBatchAsync(batchOptions); + using EventDataBatch eventBatch = await producer.CreateBatchAsync(batchOptions); for (var index = 0; index < 5; ++index) { - var eventBody = new BinaryData($"Event #{ index }"); - var eventData = new EventData(eventBody); + var eventData = new EventData($"Event #{ index }"); if (!eventBatch.TryAdd(eventData)) { @@ -213,6 +327,70 @@ public async Task PartitionId() #endregion } + /// + /// Performs basic smoke test validation of the contained snippet. + /// + /// + [Test] + public async Task PartitionIdBuffered() + { + await using var scope = await EventHubScope.CreateAsync(1); + + #region Snippet:EventHubs_Sample04_PartitionIdBuffered + +#if SNIPPET + var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>"; + var eventHubName = "<< NAME OF THE EVENT HUB >>"; +#else + var connectionString = EventHubsTestEnvironment.Instance.EventHubsConnectionString; + var eventHubName = scope.EventHubName; +#endif + + var producer = new EventHubBufferedProducerClient(connectionString, eventHubName); + + // The failure handler is required and invoked after all allowable + // retries were applied. + + producer.SendEventBatchFailedAsync += args => + { + Debug.WriteLine($"Publishing failed for { args.EventBatch.Count } events. Error: '{ args.Exception.Message }'"); + return Task.CompletedTask; + }; + + // The success handler is optional. + + producer.SendEventBatchSucceededAsync += args => + { + Debug.WriteLine($"{ args.EventBatch.Count } events were published to partition: '{ args.PartitionId }."); + return Task.CompletedTask; + }; + + try + { + string firstPartition = (await producer.GetPartitionIdsAsync()).First(); + + var enqueueOptions = new EnqueueEventOptions + { + PartitionId = firstPartition + }; + + for (var index = 0; index < 5; ++index) + { + var eventData = new EventData($"Event #{ index }"); + await producer.EnqueueEventAsync(eventData, enqueueOptions); + } + } + finally + { + // Closing the producer will flush any + // enqueued events that have not been published. + + await producer.CloseAsync(); + } + + #endregion + } + /// /// Performs basic smoke test validation of the contained snippet. /// @@ -232,38 +410,121 @@ public async Task CustomMetadata() var eventHubName = scope.EventHubName; #endif - var producer = new EventHubProducerClient(connectionString, eventHubName); + var producer = new EventHubBufferedProducerClient(connectionString, eventHubName); + + // The failure handler is required and invoked after all allowable + // retries were applied. + + producer.SendEventBatchFailedAsync += args => + { + Debug.WriteLine($"Publishing failed for { args.EventBatch.Count } events. Error: '{ args.Exception.Message }'"); + return Task.CompletedTask; + }; + + // The success handler is optional. + + producer.SendEventBatchSucceededAsync += args => + { + Debug.WriteLine($"{ args.EventBatch.Count } events were published to partition: '{ args.PartitionId }."); + return Task.CompletedTask; + }; try { - using var eventBatch = await producer.CreateBatchAsync(); + var eventData = new EventData("Hello, Event Hubs!") + { + MessageId = "H1", + ContentType = "application/json" + }; - var eventBody = new BinaryData("Hello, Event Hubs!"); - var eventData = new EventData(eventBody); eventData.Properties.Add("EventType", "com.microsoft.samples.hello-event"); eventData.Properties.Add("priority", 1); eventData.Properties.Add("score", 9.0); - if (!eventBatch.TryAdd(eventData)) + await producer.EnqueueEventAsync(eventData); + + eventData = new EventData("Goodbye, Event Hubs!") { - throw new Exception("The first event could not be added."); - } + MessageId = "G1", + ContentType = "application/json" + }; - eventBody = new BinaryData("Goodbye, Event Hubs!"); - eventData = new EventData(eventBody); eventData.Properties.Add("EventType", "com.microsoft.samples.goodbye-event"); eventData.Properties.Add("priority", "17"); eventData.Properties.Add("blob", true); - if (!eventBatch.TryAdd(eventData)) + await producer.EnqueueEventAsync(eventData); + } + finally + { + // Closing the producer will flush any + // enqueued events that have not been published. + + await producer.CloseAsync(); + } + + #endregion + } + + /// + /// Performs basic smoke test validation of the contained snippet. + /// + /// + [Test] + public async Task BufferedConfiguration() + { + await using var scope = await EventHubScope.CreateAsync(1); + + #region Snippet:EventHubs_Sample04_BufferedConfiguration + +#if SNIPPET + var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>"; + var eventHubName = "<< NAME OF THE EVENT HUB >>"; +#else + var connectionString = EventHubsTestEnvironment.Instance.EventHubsConnectionString; + var eventHubName = scope.EventHubName; +#endif + + var options = new EventHubBufferedProducerClientOptions + { + MaximumWaitTime = TimeSpan.FromSeconds(1), + MaximumConcurrentSends = 5, + MaximumConcurrentSendsPerPartition = 1, + MaximumEventBufferLengthPerPartition = 5000 + }; + + var producer = new EventHubBufferedProducerClient(connectionString, eventHubName, options); + + // The failure handler is required and invoked after all allowable + // retries were applied. + + producer.SendEventBatchFailedAsync += args => + { + Debug.WriteLine($"Publishing failed for { args.EventBatch.Count } events. Error: '{ args.Exception.Message }'"); + return Task.CompletedTask; + }; + + // The success handler is optional. + + producer.SendEventBatchSucceededAsync += args => + { + Debug.WriteLine($"{ args.EventBatch.Count } events were published to partition: '{ args.PartitionId }."); + return Task.CompletedTask; + }; + + try + { + for (var index = 0; index < 5; ++index) { - throw new Exception("The second event could not be added."); + var eventData = new EventData($"Event #{ index }"); + await producer.EnqueueEventAsync(eventData); } - - await producer.SendAsync(eventBatch); } finally { + // Closing the producer will flush any + // enqueued events that have not been published. + await producer.CloseAsync(); } @@ -297,9 +558,7 @@ public async Task NoBatch() for (var index = 0; index < 10; ++index) { - var eventBody = new BinaryData("Hello, Event Hubs!"); - var eventData = new EventData(eventBody); - + var eventData = new EventData("Hello, Event Hubs!"); eventsToSend.Add(eventData); } @@ -346,6 +605,7 @@ public async Task MultipleBatches() } batches = await BuildBatchesAsync(eventsToSend, producer); + #if !SNIPPET batchEventCount = batches.Sum(batch => batch.Count); #endif @@ -400,12 +660,11 @@ public async Task CustomBatchSize() MaximumSizeInBytes = 350 }; - using var eventBatch = await producer.CreateBatchAsync(batchOptions); + using EventDataBatch eventBatch = await producer.CreateBatchAsync(batchOptions); for (var index = 0; index < 5; ++index) { - var eventBody = new BinaryData($"Event #{ index }"); - var eventData = new EventData(eventBody); + var eventData = new EventData($"Event #{ index }"); if (!eventBatch.TryAdd(eventData)) {