
Compatibility using Microsoft.Azure.CosmosDB.BulkExecutor with Microsoft.Azure.Documents.Client #605

Closed
kevinding0218 opened this issue Jul 26, 2019 · 66 comments

Comments

@kevinding0218

We're having some trouble and a lot of confusion choosing the correct version of BulkExecutor to use with the .NET Cosmos SDK. We're currently building two class library projects that use different versions of the Cosmos DB SDK:

The first class library project targets .NET Core with Microsoft.Azure.Cosmos 3.0.0 (the current package); we use it for reading items from a Cosmos DB collection.
The second class library project targets .NET Framework 4.6.1/4.7.1 with Microsoft.Azure.Documents.Client (the v2 package); we use it for bulk-inserting or bulk-updating item collections in Cosmos DB.

However, when our v3 project adds a reference to the v2 project, we're not able to initialize the Cosmos DB connection using Microsoft.Azure.Documents.Client. I wonder if this is an issue with different versions of the Cosmos DB SDK being used? Could we use a combination of v2 and v3 in the same solution?

@j82w
Contributor

j82w commented Jul 27, 2019

We are working on adding batch support into the SDK directly to avoid these issues in the future. Take a look at #584.

@j82w
Contributor

j82w commented Jul 29, 2019

This PR adds bulk stream support.

@kevinding0218
Author

@j82w Thank you, sir! Has it been merged yet? Is there any documentation that we can refer to?

@j82w
Contributor

j82w commented Jul 29, 2019

It has not been merged yet. Right now the plan is to do a preview release with batch and bulk. I don't have a time frame for when it will be available right now. This issue will get updated once it is available.

@kevinding0218
Author

@j82w Hi, has the batch operation functionality been merged into the v3 Cosmos client library yet?

@j82w
Contributor

j82w commented Aug 13, 2019

@kevinding0218 batch is available in the 3.2.0-preview nuget. It's in preview so we are open to feedback. Please try it out.

@kevinding0218
Author

@j82w , thank you so much for your reply! We're excited to start to play with it now!

@kevinding0218
Author

@j82w, sorry to bother you again: in the 3.2.0-preview branch we had some trouble finding where the bulk executor API is located. We looked through the PRs and found #585, but it seems the bulk executor code was removed from the branch... Is there any sample or API code that we can refer to?

@j82w
Contributor

j82w commented Aug 13, 2019

@kevinding0218 I think there is some confusion.
Refining it a little more, the SDK will enable the two capabilities below:

  1. Single partition-key scoped transactional batch execution: it's guaranteed to always be transactional. In case of failures, the application is expected to retry after fixing the related errors. It's in preview at https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.2.0-preview (a minimal usage sketch follows after this list).
  2. Cross partition-key bulk execution: non-transactional, unordered execution. It's primarily meant for high-throughput import/export scenarios (close to the bulk executor library). The granularity of execution is a single operation, and on error the application can retry the failed operation. It's still in development and expected to be in preview by next week.
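
For reference, a minimal sketch of capability 1 using the preview API shape shown later in this thread (the container and ToDoActivity instances are assumed to exist, and names may change before GA):

// Single partition-key transactional batch (3.2.0-preview, illustrative only).
BatchResponse response = await container.CreateBatch(new Cosmos.PartitionKey("activity-123"))
    .CreateItem<ToDoActivity>(newActivity)       // hypothetical items for illustration
    .UpsertItem<ToDoActivity>(updatedActivity)
    .ExecuteAsync();
// The batch is all-or-nothing: if it fails, nothing was applied, so fix the failing
// operation and retry the whole batch.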

@kevinding0218
Author

@j82w, thank you for your clarification! I see, we're looking for execution like bulk-insert or bulk-update (close to the bulk executor library in v2), so I guess it might be part of your No. 2 feature, right? If so, please feel free to let me know; we'll wait until next week and see if we can use it.

@j82w
Contributor

j82w commented Aug 13, 2019

@j82w
Contributor

j82w commented Aug 13, 2019

@kevinding0218 what is your scenario for needing bulk support? Is there anything preventing you from using the Batch API?

@ealsur
Member

ealsur commented Aug 13, 2019

@kevinding0218 If you are doing bulk insert or bulk update on a known Partition Key, then the Batch API should be very similar.

@kevinding0218
Author

@j82w, our scenario is to be able to insert/update a batch of objects into the collection in a single operation. Yes, we did find the Batch API in the master branch; however, the 3.2.0-preview branch seems to have removed it, so we're not sure if this feature will continue in the future or be rewritten for some reason...

@kevinding0218
Author

@j82w Thank you for sharing the sample. We've gone through the sample code and had a couple of points of confusion; we're not sure if the example shown is what we're looking for, i.e. performing bulk insert/update/upsert for a subset of items.
For the code snippet below:

using (BatchResponse batchResponse = await container.CreateBatch(new Cosmos.PartitionKey(activityType))
        .CreateItem<ToDoActivity>(test1)
        .ReplaceItem<ToDoActivity>(test2.id, test2)
        .UpsertItem<ToDoActivity>(test3)
        .DeleteItem("reading")
        .CreateItemStream(streamPayload1)
        .ReplaceItemStream("eating", streamPayload2)
        .UpsertItemStream(streamPayload3)
        .ExecuteAsync())
{
    // Inspect batchResponse (e.g. its status code) here; the whole batch succeeds or fails together.
}

The container.CreateBatch(...) method looks more like creating a transaction than performing a batch job that deals with a subset of items; rather, it looks like it deals with one item at a time but groups the processing into one transaction...

The CreateItem/ReplaceItem/UpsertItem/DeleteItem would be dealing with one single item at a time.

The CreateItemStream/ReplaceItemStream/UpsertItemStream methods also seem to deal with one single item at a time... Please feel free to correct me if I am wrong...

If the above three methods could deal with a subset of items, then how would we define streamPayload1/streamPayload2/streamPayload3 here (e.g., would streamPayload be equal to the subset of test1 + test2 + test3)? There is no declaration of streamPayload1/streamPayload2/streamPayload3, so we thought that code might have been removed somehow...

@kevinding0218
Author

@j82w, hello, not sure if you've had a chance to look at our question regarding the bulk API for inserting/updating a bunch of items with Cosmos DB in the v3 SDK. Would you happen to know if there is any code sample that we could refer to? Thank you very much!

@kevinding0218
Author

kevinding0218 commented Aug 19, 2019

@j82w, we've noticed that in ItemManagement there is a comment like "For insert operations where you are creating many items we recommend using a Stored Procedure and pass batches of new items to this sproc." and there is a "BulkImport.js" showing how to use a stored procedure. Is there any C# API to perform similar functionality? And, considering the performance impact, which one would be better for creating/updating many items, the stored procedure or the C# bulk API?

@j82w
Contributor

j82w commented Aug 19, 2019

@abhijitpai or @ealsur can you answer the question?

@ealsur
Member

ealsur commented Aug 19, 2019

Code samples for the Batch API are an item being tracked in #685.

Since the API is still in preview, that is the reason there is no documentation out yet. But the code has samples: https://github.com/Azure/azure-cosmos-dotnet-v3/blob/master/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs#L1106

Regarding BulkImport.js, I believe that was a sample scenario on how to deal with multiple item creations before the introduction of Batch. That sample would be replaced when the new samples for Batch come. It still is a valid scenario, but Batch API would be better.

@abhijitpai
Contributor

@kevinding0218 Is your goal to atomically insert/update a set of documents in a partition key? Or is it to get high throughput / low overall latency to ingest a large amount of data into Cosmos DB? The former use case is met by the Batch API which is in 3.2-preview, and the latter by the upcoming bulk functionality.

Assuming the former, the stream methods within the Batch API are just mechanisms to provide your JSON documents in a serialized manner as opposed to typed objects. Each of the Create/Replace/UpsertItemStream method calls takes a stream which, when read, provides the UTF-8 serialized version of one document. The methods are documented here. Upon executing the batch via ExecuteAsync, the batch of operations is executed in a performant manner.
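
For illustration, a minimal sketch (not part of the SDK; Newtonsoft.Json is assumed as the serializer) of how such a stream payload for one document could be built:

using System.IO;
using System.Text;
using Newtonsoft.Json;

// Build a stream whose content is the UTF-8 JSON of exactly one document.
static Stream ToStreamPayload<T>(T document)
{
    string json = JsonConvert.SerializeObject(document);
    return new MemoryStream(Encoding.UTF8.GetBytes(json));
}

// Usage (hypothetical): batch.CreateItemStream(ToStreamPayload(test1));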

@kevinding0218
Author

kevinding0218 commented Aug 19, 2019

Hello @ealsur and @abhijitpai, thank you both for your replies! Actually, we've gone over the code sample in #685 and the method descriptions of the Batch API in the Container example before, but it doesn't seem to match what we need.

Our goal is to atomically insert/update a set of documents (could be 1,000 items as the batch size) that might not share the same partition key, but we can split the group into sub-groups by partition key and then work at the sub-group level, which shouldn't be a problem. So let's assume the use case is the one covered by the Batch API in 3.2-preview. However, per my previous comment and @abhijitpai's comment, here is why we don't think it meets our goal:

  1. CreateItem/ReplaceItem/UpsertItem/DeleteItem deal with one single document at a time, as they are built around an id, which doesn't match our goal of dealing with a set (bulk) of documents in one go.

  2. CreateItemStream/ReplaceItemStream/UpsertItemStream also deal with one single document at a time, just in a UTF-8 serialized stream form, as they are built around a single Item<T>, which again doesn't match our goal of dealing with a set (bulk) of documents in one go.

That's where we got confused: we thought the Bulk API should deal with a subset (could be 1,000 items as the batch size) of documents at a time, not one single document at a time. In addition, we're unable to see an example of how to define the streamPayload as a subset of documents, and according to @abhijitpai's comment that it is "a stream which when read provides the UTF-8 serialized version of one document", we don't think the Bulk API is what we're looking for to meet our requirement. Please feel free to correct me if I am wrong...

Here is our pseudo-code for what we're expecting:

// itemList of 1,000 ToDoItem
List<ToDoItem> itemList = new List<ToDoItem>();
itemList.Add(item1);
itemList.Add(item2);
itemList.Add(item3);
itemList.Add(item4);
...
itemList.Add(item1000);

// Perform a bulk insert or update/replace of all 1,000 items at once, e.g.
itemList.BatchInsertExecution();
itemList.BatchUpdateExecution();

And this is NOT what we're expecting:

// itemList of 1,000 ToDoItem
List<ToDoItem> itemList = new List<ToDoItem>();
itemList.Add(item1);
itemList.Add(item2);
itemList.Add(item3);
itemList.Add(item4);
...
itemList.Add(item1000);

// Loop through the 1,000 items and perform an insert/update for each single document
itemList.ForEach(item => CreateItem(item));

@ealsur
Member

ealsur commented Aug 19, 2019

Let's say you have 1000 items, and each item has a PartitionKey property. Then you could use the Batch API like so:

foreach (var group in items.GroupBy(item => item.PartitionKey))
{
    Batch batch = container.CreateBatch(new Cosmos.PartitionKey(group.Key));
    foreach (ToDoItem item in group){
        batch.CreateItem(item);
    }

    BatchResponse batchResponse = await batch.ExecuteAsync();
}

You can group your operations per partition key value and do a batch on that.

@kevinding0218
Author

@ealsur Thank you a ton for the code sample, that makes a lot of sense now and we'll try it out!

@SteffenMangold

Please keep us informed when Cross partition-key bulk execution is available in a preview version.

@ealsur
Member

ealsur commented Aug 27, 2019

@SteffenMangold see #741 to follow up

@kevinding0218
Author

kevinding0218 commented Aug 30, 2019

Hello @ealsur / @abhijitpai, we've tried the Batch API and it works pretty well! As we dug into it more and tried to process a large data set, we encountered exceptions like "This batch request cannot be executed as it is larger than the allowed limit. Please reduce the number of operations in the batch and try again." I've attached a screenshot of our log output. A few things we've noticed here:

  • The exception did not come from CosmosException, where we thought it would; if it came from CosmosException with a specific SubStatusCode, we could detect it and retry with re-batching.

  • In order to avoid such an exception, is there a way we could measure our batch request data size before execution? Currently we didn't find any property on the Batch class that tells us the current request size, and per the previous point, since the exception is not a CosmosException, I don't think we can get the size information from there either.

  • What we're thinking is that if the batch grows beyond a certain size, we could detect it before calling await batch.ExecuteAsync(); and re-batch at a certain size, for example:

If the batch request size is 7 MB, we could re-batch into 1.5 MB per batch, for a total of 5 batches, and then retry.
  • When the above exception was thrown, no data in the current collection was affected; I guess that's because the batch operation is atomic? Was this by design? I've seen there is another PR, #741, and I am not sure if it behaves the same way, meaning:
  1. If the above data-size-limit exception happens, will the part of my current batch that fits within the max data size be upserted into Cosmos, while the rest stays out or is returned in the exception?
  2. If one of my batch items fails, will the rest of them be upserted into Cosmos, and can I get that one failed item back within the exception?

Thank you very much for your help!

@abhijitpai
Contributor

Kevin, what you want is not the Batch API but the bulk stream support being introduced in #741, which is the incorporation of the bulk executor library functionality into our Cosmos SDK. Using that will ensure you don't need to handle request splitting, and you also don't want the atomicity that the Batch API gives.

@ealsur
Member

ealsur commented Aug 30, 2019

@abhijitpai Should the exception @kevinding0218 reported be a CosmosException?

@kevinding0218
Author

@abhijitpai , sure thing, let me try that out as well!

@kevinding0218
Author

kevinding0218 commented Sep 6, 2019

@ealsur Also, we did some more testing on how to handle the current batch request limit, since the bulk stream support hasn't come out yet. We're somewhat confused: we thought the max request data size is 2 MB as discussed, so suppose we have 2,500 items and each item is 1 KB; we batched them into two subsets (1,500 and 1,000). However, when we tried to batch 1,500 items, the exception still showed up every time. We even tried batching them into 800 per subset, but it still failed. Please see our attached screenshot.
Here you'd see we're reading the 2,500 items, which takes 2,500 RU since each item is 1 KB; then we batch them 800 to a group, but the exception still occurred.

Finally, when we changed our batch size to 100, it worked... You can see our total RU spent upserting the 2,500 items here. It doesn't seem that the max limit is 2 MB; it could be much smaller.

@abhijitpai
Contributor

@kevinding0218 Right now, the expectation is that the full batch call, from CreateBatch onwards, needs to be inside the retry, because we empty the batch once ExecuteAsync has run on it so that you can use the Batch object to add and run more operations; we can look at changing this behavior if it is not intuitive.

With respect to the request limits, there are two limits on batch requests - one is the size (max 2 MB), and the other is the operation count (max 100).
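
For illustration, a minimal sketch (reusing the hypothetical items/container variables from the grouping example above) of staying under the 100-operation limit by chunking each partition-key group; the 2 MB size limit would still need its own check:

const int MaxOperationsPerBatch = 100; // service limit on operations per batch request

foreach (var group in items.GroupBy(item => item.PartitionKey))
{
    List<ToDoItem> groupItems = group.ToList();
    for (int i = 0; i < groupItems.Count; i += MaxOperationsPerBatch)
    {
        // Rebuild a fresh Batch for every chunk (and for every retry), since the
        // Batch object is emptied after ExecuteAsync.
        Batch batch = container.CreateBatch(new Cosmos.PartitionKey(group.Key));
        foreach (ToDoItem item in groupItems.Skip(i).Take(MaxOperationsPerBatch))
        {
            batch.UpsertItem(item);
        }

        BatchResponse batchResponse = await batch.ExecuteAsync();
    }
}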

@kevinding0218
Author

@abhijitpai Thank you for clarifying the request limits! I guess for now we'll have to keep the operation count at a max of 100; we're looking forward to the new bulk stream support! Please feel free to let us know once it's released, thank you!

@ealsur
Member

ealsur commented Sep 10, 2019

@kevinding0218 Please do not use this issue to discuss other topics. Yes, the SDK retries on throttles (429) automatically, and this behavior can be customized via that attribute in the configuration.
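
For reference, a minimal sketch of where that retry configuration lives on the v3 client (endpoint and key values are placeholders):

CosmosClient client = new CosmosClient(
    "https://myaccount.documents.azure.com:443/",   // placeholder endpoint
    "<account-key>",                                // placeholder key
    new CosmosClientOptions
    {
        MaxRetryAttemptsOnRateLimitedRequests = 9,                        // retries on 429s
        MaxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromSeconds(30)  // total wait budget for those retries
    });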

@kevinding0218
Author

Hello, just curious whether PR #741 has been fully tested and merged into any recent stable branch? Currently, with the Batch API, we're still restricted to a batch size of 100, but we're looking forward to any progress made now or in the future. Thank you very much for your help!

@ealsur
Member

ealsur commented Oct 3, 2019

Batch and Bulk are still in preview, they are both merged in master, but only available on the preview packages: https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.2.0-preview2

@kevinding0218
Author

@ealsur Thank you Matias and I will test it out!

@kevinding0218
Author

@ealsur, hello, it looks like the batch size limit is still 100? Well, 100 is really too narrow for processing our data; on the other side, the CosmosTrigger also fires as each 100 items are updated, so for the 5,000 items we're processing, the work is split into 50 batches and our Cosmos-triggered Function is also triggered 50 times... Is there any plan to increase the limit?

@ealsur
Member

ealsur commented Oct 11, 2019

@abhijitpai can probably answer Batch-related questions and that max. Bulk (non-transactional) does not have a limit: https://github.com/Azure/azure-cosmos-dotnet-v3/tree/master/Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport. But if you need transactional support, you will need to use Batch.

@kevinding0218
Author

kevinding0218 commented Oct 11, 2019

@ealsur Thank you for your reply! We did notice the "bulk" (non-transactional) mode in the sample code; however, that seems to run parallel tasks where each task performs one insert/update action. So, considering the performance aspect of bulk vs. batch, which one would be better for a large amount of data (~5,000 docs)?

@abhijitpai, is there any drawback to raising the batch size limit on batch operations? The previous BulkExecutor in v2 did not seem to have this limitation.

@ealsur
Member

ealsur commented Oct 11, 2019

@kevinding0218 my apologies, we don't have official documentation yet because we are still in preview, but Bulk works through the AllowBulkExecution flag in the CosmosClientOptions:

https://github.com/Azure/azure-cosmos-dotnet-v3/blob/master/Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport/Program.cs#L91

When that flag is true, concurrent point operations (such as a list of Tasks) won't each be one backend operation (see https://github.com/Azure/azure-cosmos-dotnet-v3/blob/master/Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs#L103); they will be grouped, optimized, and executed together. This yields between 50% and 100% more throughput (provided the provisioned RU/s are enough) than doing those operations as single operations (AllowBulkExecution set to false, which is the default). The key here is that this mode only benefits bulk concurrent operations, which is the scenario where you used Bulk Executor in V2.

Regarding your question about size, the Bulk Executor V2 did not have that limit because what it did was take your 5000 operations and internally split them into smaller groups, and those were the ones that got executed. Batch is more straightforward, since the goal is to be a single all-or-nothing transaction.
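
For illustration, a minimal sketch of the bulk mode described above (the endpoint, authKey, container names, itemList, and a ToDoItem type with a PartitionKey property are all assumed): enable AllowBulkExecution on the client, then issue many concurrent point operations and let the SDK group them into fewer backend requests.

CosmosClient bulkClient = new CosmosClient(endpoint, authKey,
    new CosmosClientOptions { AllowBulkExecution = true });
Container container = bulkClient.GetContainer("myDb", "myContainer"); // assumed names

List<Task> tasks = new List<Task>();
foreach (ToDoItem item in itemList)
{
    // Each point operation is a separate Task; with AllowBulkExecution the SDK batches
    // them per partition behind the scenes.
    tasks.Add(container.UpsertItemAsync(item, new PartitionKey(item.PartitionKey)));
}
await Task.WhenAll(tasks);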

@IvanMijailovic

Hi @ealsur, should I have one instance of DocumentClient shared between all BulkExecutor instances in a multi-threaded application, or one instance of DocumentClient per BulkExecutor? Thanks in advance!

@ealsur
Member

ealsur commented Oct 14, 2019

@IvanMijailovic Yes, you certainly can share one instance of DocumentClient across multiple BulkExecutors.

@kevinding0218
Author

@ealsur, thank you for your input! We tried the bulk operation by setting the "AllowBulkExecution" flag to true and inserting 200 items, and it looks like it groups the 200 items into tasks of 5 items each, because in our CosmosDBTrigger Functions we always receive 5 items per trigger, so for 200 items in total we were triggered exactly 40 times. Is this bulk group size of 5 some kind of default value? Could we increase it? Our initial idea is to process as many documents as possible in one operation (either bulk or batch); the batch maximum of 100, or bulk grouping by 5 as it stands now, might not fit our process, as we normally have to process thousands of items at a time, and splitting them into small chunks might add latency to the overall process.

@ealsur
Member

ealsur commented Oct 14, 2019

@kevinding0218 The grouping is based on partitions, just like the Bulk Executor. Are you creating all 200 Tasks in a list and each Task is a CreateItem call?

@kevinding0218
Author

@ealsur You're right. We're actually just following the sample code: we create 200 tasks in a list and each is an UpsertItemAsync call, and all of our 200 items have one single common partition key, just like the sample code. If the grouping is based on partitions, our 200 items should be considered as only one group, correct?

@ealsur
Member

ealsur commented Oct 15, 2019

Yes. How do you know that the Bulk is doing batches of 5? Based on what you see on the Change Feed Triggers? Bulk is not transactional, so it's hard to correlate your Change Feed triggers with the backend requests coming from Bulk. Are you setting the AllowBulkExecution flag in the CosmosClientOptions before creating the client or switching it after?

If you are capturing the SDK logs, it should show the Bulk traces.

@kevinding0218
Author

@ealsur We're using a CosmosDBTrigger Function here, so we set the AllowBulkExecution flag in the options before creating the CosmosClient in Startup.cs. You're right, we're monitoring the input count from the Change Feed trigger, and we saw that every incoming batch of documents has 5 items, so we assumed the bulk operation was also grouping into transactions of 5. Based on your explanation, this seems to be more related to the Change Feed on the Azure Functions side. Is there any way to increase the Change Feed batch size there (as far as I remember, the change feed for a CosmosDBTrigger Function is set automatically) so we can catch as many input documents as possible per bulk operation?

@IvanMijailovic

@ealsur Should we create a DocumentBulkExecutor instance using a builder for each delete/import/update request and close it when the request is done? Or create one instance of DocumentBulkExecutor when the application starts and close it on application shutdown? Thanks!

@ealsur
Member

ealsur commented Dec 6, 2019

Treat the BulkExecutor instance as a singleton, as per https://docs.microsoft.com/en-us/azure/cosmos-db/bulk-executor-dot-net#performance-tips

@IvanMijailovic

@ealsur Do you suggest having two instances of DocumentClient, using one for CRUD collection operations and another for the initialization of BulkExecutor and bulk operations on a collection? Or using one instance of DocumentClient for both?
How does BulkExecutor consume the DocumentClient; does it only use the DocumentClient's connections? Thanks in advance!

@ealsur
Member

ealsur commented Dec 9, 2019

You can certainly reuse the same instance. DocumentClient is used to execute the bulk operations; they are normal service requests, so the DocumentClient instance you use for normal CRUD will do.
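
For illustration, a minimal sketch of that pattern (v2 SDK plus the BulkExecutor library; database/collection names, credentials, and the documents collection are placeholders, and the BulkExecutor method shapes are taken from its public docs, so double-check against the version you use):

// One DocumentClient shared by regular CRUD and by the BulkExecutor.
DocumentClient documentClient = new DocumentClient(new Uri(endpoint), authKey);

ResourceResponse<DocumentCollection> collectionResponse = await documentClient.ReadDocumentCollectionAsync(
    UriFactory.CreateDocumentCollectionUri("myDb", "myCollection"));
DocumentCollection collection = collectionResponse.Resource;

// One BulkExecutor instance for the application's lifetime (treat as a singleton).
IBulkExecutor bulkExecutor = new BulkExecutor(documentClient, collection);
await bulkExecutor.InitializeAsync();

// Later, from any request, reuse the same instances:
BulkImportResponse importResponse = await bulkExecutor.BulkImportAsync(documents, enableUpsert: true);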

@AntonioJDios

Hi @ealsur, I have a problem. We had BulkExecutor in our implementations, and then I wanted to upgrade to the new SDK. However, I noticed that the time taken by BulkExecutor (1000 documents to one partition key) is less than with the new SDK and the worker approach you define in your blog.
Should we continue using BulkExecutor? Does the new SDK replace that bulk executor, or are they totally different things?

thanks.

@alexmartinezm

The new SDK replaces BulkExecutor as you can see here: https://docs.microsoft.com/en-gb/azure/cosmos-db/tutorial-sql-api-dotnet-bulk-import

Is your BulkExecutor implementation way faster than your implementation with the new SDK?

@abhijitpai
Contributor

@AntonioJDios - which one of these is your real use case:

  1. You will have 1000 item updates every few minutes that need to be applied to Cosmos DB.
  2. You have a million items to ingest.
  3. Something else - please describe.

@AntonioJDios

AntonioJDios commented Feb 12, 2020

@alexmartinezm yes, with BulkExecutor, upserting 1,300 documents takes around 40 seconds, while with the new SDK it takes around 2 minutes.

@abhijitpai The scenario is this: 1,300 documents that all go to just one partition key, because the front end then fetches the complete partition key directly (with pagination). So we tried to optimize the query for the front end, and we don't mind if the writes are slower than the reads. However, the new SDK is much slower than the previous one.

do you have any idea?

@kirankumarkolli
Member

@AntonioJDios what's the upsert document size?

@AntonioJDios

@AntonioJDios what's the upsert document size?
Each document is a simple JSON, something like this:

{
  "id": "634E1CEBD3F276D0EE7E689132B32D84",
  "_rid": "qGA7AJo+vbjspQAAAAAAAA==",
  "_self": "dbs/qGA7AA==/colls/qGA7AJo+vbg=/docs/qGA7AJo+vbjspQAAAAAAAA==/",
  "_etag": "\"67008e60-0000-0000-0000-5c8a64000000\"",
  "partitionKey": "46a50758-c02d-4969-9fe8-7c73fb725d15-2019-01-09-24",
  "deviceId": 94777,
  "addressAscIndex": 164,
  "address": "Kløvermarken 4",
  "zipCode": "7330",
  "city": "Brande",
  "energyDelivered": "NaN",
  "volumeDelivered": 3.93,
  "tempDeviation": 0.5,
  "inletTemp": 69.7,
  "outletTemp": 34.6,
  "sn": "6361584",
  "deviceType": "xxxx",
  "dataQuality": 0.1266,
  "_attachments": "attachments/",
  "_ts": 1552573440
}

@ealsur
Member

ealsur commented Feb 12, 2020

@AntonioJDios Could you open an issue with the code snippet of how you are inserting the documents when it's taking 2 minutes? This thread is about a different issue. Closing, because the original ask was resolved long ago.

ealsur closed this as completed Feb 12, 2020