Allow ChangeFeedProcessor to work with streams #865

Closed
fuocor opened this issue Oct 2, 2019 · 10 comments · Fixed by #2331
Labels
ChangeFeed, discussion-wanted (Need a discussion on an area)

Comments

@fuocor
Contributor

fuocor commented Oct 2, 2019

Is your feature request related to a problem?
I have a messaging application that uses streams exclusively for Cosmos document management. In many cases no (de)serialization, or only partial (de)serialization, is required. The ChangeFeedProcessor, however, requires a typed delegate, which sends every document through the serializer. I do not want that, because all I am doing with the document is a block blob copy to cold storage.
I am using Orleans with a dedicated service grain responsible for the change feed, providing a convenient means to offload the work from the principal process flow that saved the document.

I will only do partial deserialization in order to determine how the stream will be routed. The current approach of the CFP requires both deserialization and serialization to move a message forward.
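As an illustration of the partial deserialization described here, the following is a minimal sketch that reads one top-level property from a raw JSON payload and stops there, without binding the document to a type. It assumes System.Text.Json is available; the class name `RoutingPeek`, the method name, and the `type` property used for routing are hypothetical and not part of the Cosmos SDK.

```csharp
using System;
using System.Text;
using System.Text.Json;

public static class RoutingPeek
{
    // Scans UTF-8 JSON for a string property directly under the root object.
    // Returns false (and an empty key) when the property is missing or is not
    // a string. Scanning stops at the first match, so the rest of the payload
    // is never tokenized.
    public static bool TryReadRoutingKey(ReadOnlySpan<byte> utf8Json, string propertyName, out string key)
    {
        key = string.Empty;
        var reader = new Utf8JsonReader(utf8Json);
        while (reader.Read())
        {
            // Property names of the root object sit at depth 1; nested
            // properties with the same name are deeper and are ignored.
            if (reader.TokenType == JsonTokenType.PropertyName
                && reader.CurrentDepth == 1
                && reader.ValueTextEquals(propertyName))
            {
                reader.Read(); // advance to the property's value
                if (reader.TokenType != JsonTokenType.String)
                {
                    return false;
                }
                key = reader.GetString() ?? string.Empty;
                return true;
            }
        }
        return false;
    }
}
```

A caller could pass the document bytes, for example `RoutingPeek.TryReadRoutingKey(bytes, "type", out string key)`, and then forward the same untouched bytes to whichever destination the key selects.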

Describe the solution you'd like
A delegate providing the raw stream.

Describe alternatives you've considered
Adding an inline method to save to cold storage.

@j82w
Contributor

j82w commented Oct 2, 2019

@ealsur what do you think?

ealsur added the discussion-wanted (Need a discussion on an area) label Oct 2, 2019
@ealsur
Member

ealsur commented Oct 2, 2019

What about a delegate with IReadOnlyCollection<Stream>? Have you tried that? Or do you want the Stream for the entire batch?

Technically, we are sending the response to the serializer's FromStream. If you don't customize the serializer, it calls the base FromStream.

Optionally, you could have your own CosmosSerializer implementation that handles serialization the way you want.

@fuocor
Contributor Author

fuocor commented Oct 2, 2019

If you look at 871, you'll see that a number of internal Cosmos types are inaccessible to a custom serializer, making that unlikely in the near term.

Trying the Stream delegate resulted in:

DocDBTrace Warning: 0 : Exception Microsoft.Azure.Cosmos.ChangeFeed.Exceptions.ObserverException: RequestUri: , Exception has been thrown by the Observer.,    at Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing.FeedProcessorCore`1.DispatchChangesAsync(ResponseMessage response, CancellationToken cancellationToken)
   at Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing.FeedProcessorCore`1.RunAsync(CancellationToken cancellationToken)
   at Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement.PartitionSupervisorCore`1.RunAsync(CancellationToken shutdownToken)
   at Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement.PartitionSupervisorCore`1.RunAsync(CancellationToken shutdownToken)
   at Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement.PartitionControllerCore.ProcessPartitionAsync(PartitionSupervisor partitionSupervisor, DocumentServiceLease lease)
DocDBTrace Warning: 0 : Exception Newtonsoft.Json.JsonSerializationException: RequestUri: , Could not create an instance of type System.IO.Stream. Type is an interface or abstract class and cannot be instantiated. Path 'Documents[0]._aid', line 1, position 44.,    at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateNewObject(JsonReader reader, JsonObjectContract objectContract, JsonProperty containerMember, JsonProperty containerProperty, String id, Boolean& createdFromNonDefaultCreator)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObject(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.PopulateList(IList list, JsonReader reader, JsonArrayContract contract, JsonProperty containerProperty, String id)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateList(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, Object existingValue, String id)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.SetPropertyValue(JsonProperty property, JsonConverter propertyConverter, JsonContainerContract containerContract, JsonProperty containerProperty, JsonReader reader, Object target)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.PopulateObject(Object newObject, JsonReader reader, JsonObjectContract contract, JsonProperty member, String id)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObject(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.Deserialize(JsonReader reader, Type objectType, Boolean checkAdditionalContent)
   at Newtonsoft.Json.JsonSerializer.DeserializeInternal(JsonReader reader, Type objectType)
   at Newtonsoft.Json.JsonSerializer.Deserialize(JsonReader reader, Type objectType)
   at Newtonsoft.Json.JsonSerializer.Deserialize[T](JsonReader reader)
   at Microsoft.Azure.Cosmos.CosmosJsonDotNetSerializer.FromStream[T](Stream stream)
   at Microsoft.Azure.Cosmos.CosmosJsonSerializerWrapper.FromStream[T](Stream stream)
   at Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing.FeedProcessorCore`1.DispatchChangesAsync(ResponseMessage response, CancellationToken cancellationToken)

Looks like you're unpacking the batch.

@fuocor
Contributor Author

fuocor commented Oct 2, 2019

I would like the stream from the entire batch.

@ealsur
Member

ealsur commented Oct 2, 2019

Yeah, I guess it would try to unpack the batch and send it as individual items, which is not possible with Stream.
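For readers who want per-item streams rather than the whole batch, unpacking can be sketched outside the SDK. The only detail taken from this thread is the `Documents` array, which appears in the error path `Documents[0]._aid` above; everything else (the class name `BatchSplitter`, the use of System.Text.Json, the rest of the envelope shape) is an assumption, and this is not how the SDK itself unpacks a batch.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;

public static class BatchSplitter
{
    // Splits a change feed style envelope of the form {"Documents":[...]}
    // into one in-memory stream per document. Each document is copied as raw
    // JSON text and is never bound to a typed object.
    public static List<MemoryStream> SplitDocuments(Stream batch)
    {
        var items = new List<MemoryStream>();
        using (JsonDocument envelope = JsonDocument.Parse(batch))
        {
            JsonElement root = envelope.RootElement;
            if (root.ValueKind == JsonValueKind.Object
                && root.TryGetProperty("Documents", out JsonElement documents)
                && documents.ValueKind == JsonValueKind.Array)
            {
                foreach (JsonElement document in documents.EnumerateArray())
                {
                    // GetRawText returns the element's original JSON text.
                    byte[] raw = Encoding.UTF8.GetBytes(document.GetRawText());
                    items.Add(new MemoryStream(raw));
                }
            }
        }
        return items;
    }
}
```

This still parses the whole envelope once, so it trades some of the savings of a pure stream copy for the convenience of routing documents individually.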

We initially did not think about Stream support as this was trying to match the CFP V2 API, but we'll discuss it and see what comes up.

@fuocor
Contributor Author

fuocor commented Oct 8, 2019

I have a running version of Stream support for the ChangeFeedProcessor.
Do you have a policy for outside collaboration & PRs?

@ealsur
Member

ealsur commented Oct 8, 2019

@fuocor Sure, it's here: https://github.com/Azure/azure-cosmos-dotnet-v3#contributing. When you open a PR, you will be prompted for any requirements.

@MrMint

MrMint commented Mar 11, 2020

@ealsur Just a heads up, I gave the IReadOnlyCollection<Stream> delegate a try by copying the base FromStream shortcut for returning a stream when the generic type is assignable. Unfortunately that results in this exception being thrown from the wrapper:

throw new InvalidOperationException("Json Serializer left an open stream.");

We have a very similar use case for the change feed processor, except that we route into other actors for aggregation. For us, the lack of server-side filtering means we spend a lot of resources deserializing documents we don't end up needing for aggregation when a new processor gets kicked off. Our containers are a few TB in size, with documents of multiple types, and about 70% of them are used in aggregation. We are migrating to v3 now and are hoping to see some perf wins here.
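The shortcut mentioned in this comment (return the stream itself when the requested type is assignable from Stream) can be sketched on its own. The class name `PassThroughSerializer` is hypothetical, and System.Text.Json stands in for the Newtonsoft-based serializer seen in the stack trace earlier in the thread. Judging only by the exception message quoted above, the wrapper appears to expect the stream to be closed after FromStream returns, which a pass-through necessarily violates.

```csharp
using System;
using System.IO;
using System.Text.Json;

public static class PassThroughSerializer
{
    public static T FromStream<T>(Stream stream)
    {
        // Shortcut: the caller asked for a Stream, so hand the same instance
        // back untouched. It must stay open for the caller to read from it.
        if (typeof(Stream).IsAssignableFrom(typeof(T)))
        {
            return (T)(object)stream;
        }

        // Typed path: the payload is fully consumed here, so the stream is
        // disposed before returning.
        using (stream)
        {
            return JsonSerializer.Deserialize<T>(stream);
        }
    }
}
```

After `FromStream<Stream>(s)` the stream `s` is still readable, while after a typed call such as `FromStream<Dictionary<string, string>>(s)` it has been disposed. Any caller that checks for a closed stream would therefore see the two paths differently.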

@aelij
Member

aelij commented Apr 23, 2020

@ealsur
As @MrMint said, IReadOnlyCollection<Stream> doesn't work because of the exception "Json Serializer left an open stream." Would you accept a PR to remove it?

@MikeABentley

I'm a bit confused about what the outcome of this thread was. I have a situation where multiple types of records are stored in the same container. Is the solution to register my own custom serializer for dependency injection, or to create a delegate of type Stream and deserialize inside the delegate using my own serializer instance?

6 participants