undefined:1 error upon consuming first avro message #41
Comments
@teod0007 But how was the first message sent to Kafka without the schema magic byte?
@ricardohbin In my scenario, the first message is sent to the topic by a backend Java microservice (which also has the schema registry and everything set up). What happens is that the schema registry only receives the first schema version of the payload after the first message arrives in the Kafka topic. The issue is that the node application loads all the schemas only once, and for this particular topic the result is empty, since at init time the topic was empty and no schemas had been created yet. Once the backend microservice sends an Avro message to the topic (the first message ever), the schema is also created in the schema registry; however, the node application never receives this new schema, since it was registered after init was called, which then causes the parsing error. The issue is not the payload itself, since it has everything (magic byte, etc.), but the lack of the schema in memory.
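The race described in the comment above can be sketched as follows. This is an illustrative toy, not kafka-avro's actual code; all names (`SchemaCache`, `init`, the plain-object registry) are hypothetical stand-ins.

```javascript
// Sketch of the init-time race: schemas are snapshotted once at init, so a
// schema registered afterwards (by the first produced message) is never seen.
class SchemaCache {
  constructor(registry) {
    this.registry = registry; // stands in for the HTTP schema registry
    this.schemas = {};
  }
  init() {
    // One-time snapshot: an empty topic has no schema to copy yet.
    this.schemas = { ...this.registry };
  }
  get(subject) {
    return this.schemas[subject]; // undefined for schemas registered after init()
  }
}

const registry = {}; // topic just created, no messages produced yet
const cache = new SchemaCache(registry);
cache.init();

// The Java producer now sends the first message, registering the schema...
registry['my-topic-value'] = '{"type": "record"}';

// ...but the node app still has nothing in memory, so parsing the message fails.
console.log(cache.get('my-topic-value')); // undefined
```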
@teod0007 Maybe polling to refresh the schema registry could do the trick, right?
@ricardohbin Maybe, but it could still leave room for errors (e.g. the message arrives before polling has updated the in-memory schemas). Another approach would be to always update the schemas from the registry upon message arrival (and before parsing). This "could" have performance issues, but I believe they would be negligible if only the requested topic's schemas are queried (not all topics at once).
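The fetch-on-miss idea from this comment could look something like the sketch below. The names (`resolveSchema`, `fetchSubject`) are hypothetical, not part of kafka-avro's API; the point is that a cache miss triggers one targeted registry request rather than a full reload.

```javascript
// On a cache miss, query the registry for the one subject involved,
// then cache and return the result (illustrative sketch only).
async function resolveSchema(cache, subject, fetchSubject) {
  if (subject in cache) return cache[subject];
  const schema = await fetchSubject(subject); // one targeted registry request
  if (schema !== undefined) cache[subject] = schema;
  return schema;
}

// Example with a stubbed registry lookup:
const schemaCache = {};
const fakeRegistry = { 'my-topic-value': '{"type": "record"}' };
const fetchSubject = async (subject) => fakeRegistry[subject];

resolveSchema(schemaCache, 'my-topic-value', fetchSubject)
  .then((schema) => console.log(schema, 'my-topic-value' in schemaCache));
```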
@ricardohbin Another solution, maybe a bit better performance-wise, would be a mix of polling and on-message requests. If no schema is found, the app would try requesting the schemas for the topic at least once. Meanwhile, it would keep the known schemas updated through polling.
@teod0007 I will explore some scenarios for this. Thanks for the issue. I will try to simulate this behavior too.
We've had these types of problems: the SR gets updated, the node service isn't aware of it, a message with the new schema gets transmitted, and node crashes. As a matter of policy we would restart all consuming node services whenever an SR change affected their topics.
I just ran into the same issue. At first glance, the Kafka Python lib from Confluent solves this properly: https://github.com/confluentinc/confluent-kafka-python/blob/master/confluent_kafka/avro/cached_schema_registry_client.py Maybe we can draw some inspiration from that? I haven't checked it out fully yet, but the Python incarnation of the same consumer (built around that lib) is not crashing for me.
I am working on this problem. I will add a more defensive JSON.parse to avoid crashes, plus polling of the schemas (every x seconds). The on-message request has problems due to Node.js's async behavior (but I'm still considering it). Stay tuned :)
The strategy that kafka-avro uses to deal with schemas is a full load into memory. This PR adds the possibility to set a refresh rate for the schema fetch - before, the schemas were only loaded at init. This is not the best strategy at all - cached requests dealing with schemas on demand would be a better option. But to add that option we would need to do a major refactoring of the way this lib works, and some users are having problems with it now. So, let's add this option for now - the full code refactoring and the new strategy will come in the next major release. In addition, a better try/catch handler to avoid crashes. Fixes #41
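A minimal sketch of what such a refresh-rate option could look like is below. The names (`scheduleSchemaRefresh`, `fetchAllSchemas`, `applySchemas`) are illustrative, not the PR's actual code; the idea is simply to re-run the full schema fetch on an interval instead of only once at init, and never let a failed poll crash the app.

```javascript
// Re-fetch all schemas every N seconds; a falsy rate keeps the old
// init-only behavior. Errors from a poll are logged, not thrown.
function scheduleSchemaRefresh(fetchAllSchemas, applySchemas, refreshRateSeconds) {
  if (!refreshRateSeconds) return null; // option unset: load at init only
  return setInterval(() => {
    fetchAllSchemas()
      .then(applySchemas)
      .catch((err) => console.error('schema refresh failed:', err.message));
  }, refreshRateSeconds * 1000);
}

// Usage: refresh every 5 seconds into an in-memory cache.
const globalCache = {};
const timer = scheduleSchemaRefresh(
  () => Promise.resolve({ 'my-topic-value': '{"type": "record"}' }),
  (schemas) => Object.assign(globalCache, schemas),
  5
);
timer.unref(); // let the process exit naturally in this demo
```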
That may be OK for now - it won't crash the application - but some messages will unfortunately be lost (the first message after a schema change). I have not had a lot of time to work on this yet, but I'm trying to implement an on-demand solution (mostly around error catching) that covers all possible scenarios without the need for polling. I've only managed to fix the first-message scenario so far (without losing messages).
I quickly ported the code from the Confluent Python library to TypeScript: https://gist.github.com/fubhy/d30944689ae6733ccbb37124e7f0d5ed I currently have no plans to release that as a separate library, so we might want to integrate it here, but I am not sure if the overall architecture goals of this (your) library are in line with that approach.
@fubhy Nice, my intention is to take a similar approach in the next major release. @teod0007 It only loses the message if you update the schema and send a message with the new version before the refresh, right? If you use a very short polling interval, the message will not be lost. How does your Java backend work? How does it change the version (or id) of the schema? I'm asking because I have a similar Java microservice running too, but when we update the schema, we evolve it, then change an env var with the new id, then restart the microservice. This should be sufficient for polling to work - with the new strategy.
@ricardohbin My Java backend works in a similar way, but just starting the application will not update the schema registry after an evolution (I'm using .avsc files for schema evolution). The schema registry is updated only when the first message using the new schema is sent to Kafka (a kind of on-demand schema update).
* Adding fetchRefreshRate option to SchemaRegistry - The strategy that kafka-avro uses to deal with schemas is a full load into memory. This PR adds the possibility to set a refresh rate for the schema fetch - before, the schemas were only loaded at init. This is not the best strategy at all - cached requests dealing with schemas on demand would be a better option. But to add that option we would need to do a major refactoring of the way this lib works, and some users are having problems with it now. So, let's add this option for now - the full code refactoring and the new strategy will come in the next major release. In addition, a better try/catch handler to avoid crashes. Fixes #41 * Removing try/catch for not-found schema messages to avoid silent errors * Update changelog date
If the topic being consumed was recently created, the first message will always cause the application to crash. This seems to happen because the schema registry will only have the first version of the schema after a message is sent to the topic (therefore, if the node server is up before there is anything on the topic, it will not be able to load anything, not even schema changes that come afterwards).
The error itself occurs when trying to JSON.parse an Avro message (kafka-consumer.js:109), since the updated schema was not found.
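For context on why JSON.parse fails here: a Confluent-framed Avro message is binary, not JSON. It starts with a magic byte (0x00) and a 4-byte big-endian schema id, followed by the Avro-encoded payload. The sketch below (not kafka-avro's code; `readSchemaId` is a hypothetical helper) extracts the schema id, which is exactly what an on-demand registry lookup would need.

```javascript
// Read the schema id from a Confluent-framed Avro message:
// byte 0 = magic byte 0x00, bytes 1-4 = big-endian schema id, rest = Avro binary.
function readSchemaId(buffer) {
  if (buffer.length < 5 || buffer[0] !== 0) {
    throw new Error('not a Confluent-framed Avro message');
  }
  return buffer.readInt32BE(1); // id under which the schema is registered
}

const framed = Buffer.concat([
  Buffer.from([0x00, 0x00, 0x00, 0x00, 0x2a]), // magic byte + schema id 42
  Buffer.from('avro-binary-payload-goes-here'),
]);
console.log(readSchemaId(framed)); // 42
```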
Is there any way to work around this issue? From what I understood, there is only one point where the schemas are loaded (init), and no way to update them after the initial load is done. Or am I missing something?