Updated Kafka + SQL storage variant, re-using existing kafka storage utils/data model #1026
Conversation
Commits:
* …roperties in one file
* …ifactId to ensure ordering
* fix selectArtifactMetaDataByGlobalId query bug and add reproducer test
* kafka + sql storage variant, reusing streams variant datamodel
* ksql - integration tests
* fix streams storage
* …istry into learning/kafka-sql
* … handle new async behavior
storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlCoordinator.java
throw e;
}

return submitter
I don't see how the artifact version is being handled in this storage. I know that -1 is what the streams storage sends at this point, but then the StreamsTopologyProvider updates that version appropriately. I don't see anything that replaces that functionality here.
The version is created when the SQL layer creates the row in the versions table. I believe the try-catch is purely a fail-fast check for when the artifact doesn't exist.

The way this will work is that a Kafka message with the artifactId and a "create version" action (and the content) will be published. Then all replicas will consume that message and perform the action, which is to create a new version for the given artifactId in the DB. At that point the version ID will be generated and communicated back to the original thread via the coordinator.

Does that explanation make sense?
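The hand-off described above can be sketched roughly as follows. This is an illustration only, not the PR's actual `KafkaSqlCoordinator`; the class and method names are hypothetical, but the pattern matches the description: register a future keyed by a request UUID before publishing, and complete it when the consumer applies the message and learns the generated ID.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the coordinator hand-off described above.
public class CoordinatorSketch {
    private final Map<UUID, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Producer side: register a future under a request UUID before publishing
    // the Kafka message that carries the same UUID in its payload.
    public CompletableFuture<Object> createFuture(UUID requestId) {
        CompletableFuture<Object> f = new CompletableFuture<>();
        pending.put(requestId, f);
        return f;
    }

    // Consumer side: after the SQL insert generates the version/globalId,
    // complete the future so the original thread can return it to the caller.
    public void notifyResponse(UUID requestId, Object value) {
        CompletableFuture<Object> f = pending.remove(requestId);
        if (f != null) {
            f.complete(value);
        }
    }

    public static void main(String[] args) throws Exception {
        CoordinatorSketch coordinator = new CoordinatorSketch();
        UUID requestId = UUID.randomUUID();
        CompletableFuture<Object> future = coordinator.createFuture(requestId);

        // Simulate the consumer thread applying the message and producing an ID.
        new Thread(() -> coordinator.notifyResponse(requestId, 42L)).start();

        Object generatedId = future.get(5, TimeUnit.SECONDS);
        System.out.println("generated id = " + generatedId);
    }
}
```

The key design point is that every replica applies the message to its database, but only the replica that originated the request has a pending future to complete.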
Yes, it does. Thanks, now I see the query and its usage in the CommonSqlStatements.
LGTM, but I have a few questions.
Looks good so far, and I agree with the points to address that you described. But regarding log compaction, something I've just realized: because globalId is generated by the SQL storage, we must ensure that the globalIds generated after processing a compacted topic are the same as the originals.

After log compaction it will be like:

We need to think about how we are going to make that work.
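To illustrate the concern: log compaction retains only the most recent record per key, so a replay of the compacted topic sees fewer events than the original run. If each replayed event triggers a fresh auto-generated globalId in SQL, the regenerated ids will not match the ids handed out originally. A minimal simulation of last-write-wins compaction (illustrative only, not project code):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simulates Kafka log compaction: only the most recent record per key survives.
public class CompactionSketch {
    record Record(String key, String value) {}

    static List<Record> compact(List<Record> log) {
        Map<String, Record> latest = new LinkedHashMap<>();
        for (Record r : log) {
            latest.remove(r.key());   // re-insert so iteration order is "last write wins"
            latest.put(r.key(), r);
        }
        return List.copyOf(latest.values());
    }

    public static void main(String[] args) {
        List<Record> log = List.of(
            new Record("artifact-A", "create version v1"),
            new Record("artifact-A", "create version v2"),
            new Record("artifact-B", "create version v1"));
        // The original run processed 3 events; a replica replaying the
        // compacted topic sees only 2, so any ids generated per-event diverge.
        System.out.println("events after compaction = " + compact(log).size());
    }
}
```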
I think I'm going to start a document to work through the details of log compaction as it applies to this use-case. I think it's tricky and if we want to use it, we'll need to get it right. :)
…utils/data model (#1026)

* Added a Kafka+SQL storage variant
* introduced overlays for application.properties to avoid putting all properties in one file
* Fixed the ksql tests - they all pass!
* added some logging to the merge properties mojo
* push the UUID into the payload and make the kafka message key the artifactId to ensure ordering
* fix selectArtifactMetaDataByGlobalId query bug and add reproducer test
* some tweaks based on perf testing
* kafka + sql storage variant, reusing streams variant datamodel (#1012)
* ksql - integration tests
* fix streams storage
* fixed some bugs in the ksql modified impl, and modified some tests to handle new async behavior
* minor TODO
* run integration tests and fix storage bug (#1028)
* fix ksql storage - error create/update artifact with metadata (#1029)
* update after some PR feedback
* remove some debug methods
* updated the perftest readme

Co-authored-by: Fabian Martinez <[email protected]>
OK here is an updated implementation. I think we have some interesting ideas here that will ultimately lead to a nice hybrid between the Kafka and SQL approaches, one that will be useful in a variety of deployment configurations.
This implementation currently has a few problems that I think we can address in future iterations.
Regarding (1), I think our goal must be to support log compaction so as to ensure the smallest possible log retention requirements for Kafka. To that end, I think we need to evolve this implementation to adhere to the following principles:
`shaHash(content)`
If my understanding of log compaction is correct, the above rules should be a reasonable starting point. As I understand it, the idea is to ensure that if Kafka throws away all messages except the most recent (for any given unique key), the integrity/correctness of the application still holds. I think it also has the side benefit of ensuring message ordering.
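One way to make replays deterministic, hinted at by the `shaHash(content)` note above, is to key content by a hash of the content itself rather than by a database-generated ID: the same content then always maps to the same key, regardless of how many times or in what order the log is replayed. A small sketch of that idea (hypothetical names, not the PR's code):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HexFormat;

// Sketch of content-addressed keying: identical content always produces the
// same key, so replaying a compacted topic re-creates the same rows.
public class ContentKeySketch {
    static String shaHash(String content) throws Exception {
        byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(content.getBytes(StandardCharsets.UTF_8));
        return HexFormat.of().formatHex(digest);
    }

    public static void main(String[] args) throws Exception {
        String content = "{\"type\":\"record\",\"name\":\"Example\"}";
        // Deterministic: the key depends only on the bytes of the content.
        System.out.println(shaHash(content).equals(shaHash(content)));
    }
}
```

Globally unique, content-derived keys also play well with compaction, since re-publishing the same content is an idempotent overwrite of the same key.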
Still more for me to understand/think about!
Regarding (2) - we have a few places where we are creating or updating meta-data using two Kafka messages. And I think we may not be handling content in the most efficient way possible.
Finally, part of the reason we're doing what we're doing in (2) is because the (3) SQL layer isn't designed to facilitate this hybrid approach. In the next iteration we should enhance the SQL layer to include functionality useful to the hybrid approach. For example (but not limited to):