CDC: Fix Producer/Consumer State Machine #2721
public class DebeziumEventUtils {

  public static AirbyteMessage convertChangeEvent(ChangeEvent<String, String> event, Instant emittedAt) {
NIT: rename this to toAirbyteMessage.
  }

  private static JsonNode formatDebeziumData(JsonNode before, JsonNode after, JsonNode source) {
    final ObjectNode base = (ObjectNode) (after.isNull() ? before : after);
Are we OK to modify the input object?
Yeah, it is safe in this case. I will add a comment to make that clear.
 * alive as long as the publisher is not closed or if there are any new records for it to process
 * (even if the publisher is closed).
 */
public class DebeziumRecordConsumer extends AbstractIterator<ChangeEvent<String, String>>
The name is very confusing. It is not really a consumer; it is an Iterator.
DebeziumRecordIterator?
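For readers following the naming discussion, here is a minimal sketch of the contract described in the Javadoc under review: the iterator stays alive while the publisher is open, and keeps draining records that are still queued after the publisher closes. The class name `QueueBackedIterator`, the `BlockingQueue` hand-off, the `BooleanSupplier` flag, and the 10 ms poll timeout are all assumptions made for illustration. This is not the PR's implementation, which extends Guava's `AbstractIterator`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the class under review; names are illustrative only.
final class QueueBackedIterator<T> implements Iterator<T> {

  private final BlockingQueue<T> queue;          // records handed over by the publisher
  private final BooleanSupplier publisherClosed; // true once the publisher has shut down
  private T buffered;                            // next record, if one was already fetched

  QueueBackedIterator(BlockingQueue<T> queue, BooleanSupplier publisherClosed) {
    this.queue = queue;
    this.publisherClosed = publisherClosed;
  }

  @Override
  public boolean hasNext() {
    while (buffered == null) {
      // Read the flag BEFORE polling. If the publisher was already closed and the
      // poll still finds nothing, no further record can arrive (assuming the
      // publisher enqueues everything before it reports itself closed).
      final boolean closedBeforePoll = publisherClosed.getAsBoolean();
      try {
        buffered = queue.poll(10, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
      if (buffered == null && closedBeforePoll) {
        return false;
      }
    }
    return true;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    final T result = buffered;
    buffered = null;
    return result;
  }
}
```

Reading the closed flag before polling, rather than after, avoids declaring the stream finished when a record lands in the queue just before the publisher closes.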
    return hasClosed.get();
  }

  public synchronized void close() throws Exception {
Why don't you make close idempotent instead of synchronized?
public void close() throws Exception {
  if (hasClosed.compareAndSet(false, true)) {
    // close stuff & shutdown
  }
}
That doesn't quite give us the guarantee that we want. There are two things we care about here:
- we want the internals of close to be called no more than once
- we only want to set isClosed to true after the internals of close have run.
I think what you're describing only gives us the first. If we introduced a second boolean, isClosing, we might be able to use this technique, but it leads to a weird behavior where, if close is called twice, the second call can return from close while the object still isn't actually closed (unless we add a busy wait). It gets kind of complicated. Let me know if you think I'm missing something. Definitely happy to try a different approach.
Nope, you're correct!
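The two guarantees discussed in this thread can be sketched side by side. This is a minimal illustration, not the PR's code: the class names, the `shutdownRuns` counter, and the `flagDuringShutdown` probe are hypothetical, the real shutdown work is replaced by a counter increment, and `throws Exception` is dropped to keep the sketch short.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; all names below are illustrative assumptions.
final class CloseSketch {

  /** compareAndSet variant: the flag flips first, then the internals run. */
  static final class CasClose {
    final AtomicBoolean hasClosed = new AtomicBoolean(false);
    final AtomicInteger shutdownRuns = new AtomicInteger();
    volatile boolean flagDuringShutdown;

    void close() {
      if (hasClosed.compareAndSet(false, true)) {
        // Guarantee 1 holds: only one caller ever gets here.
        // Guarantee 2 does not: the flag already reads true while shutdown is in progress.
        flagDuringShutdown = hasClosed.get();
        shutdownRuns.incrementAndGet(); // stands in for "close stuff & shutdown"
      }
    }
  }

  /** synchronized variant: the flag is set only after the internals have run. */
  static final class SynchronizedClose {
    final AtomicBoolean hasClosed = new AtomicBoolean(false);
    final AtomicInteger shutdownRuns = new AtomicInteger();
    volatile boolean flagDuringShutdown;

    synchronized void close() {
      if (hasClosed.get()) {
        return; // a later caller only returns once the first close has fully finished
      }
      flagDuringShutdown = hasClosed.get(); // still false while shutdown is in progress
      shutdownRuns.incrementAndGet();       // stands in for "close stuff & shutdown"
      hasClosed.set(true);
    }
  }
}
```

In both variants a second call to `close()` is a no-op, but only the synchronized one ensures that a reader who observes `hasClosed` as true can rely on the shutdown work having completed.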
Even if it didn't fix the race condition this would be a nice refactor.
b73b178 to 5a9805f
I'm late; commenting to say this makes sense to me and I appreciated the comments.
* spike * more * debezium wip * use oneof for configuration * iterator wrapping structure * push current * working loop * move capability into source * hack it into a sharable state * debezium test runner (#2617) * CDC Wait for Values (#2618) * output actual AirbyteMessages for cdc (#2631) * message conversion * fmt * add lsn extraction and comparison (#2613) * postgres cdc catalog (#2673) * update cdc catalog * A * table selection for cdc (#2690) * table selection for cdc * fix broken merge * also test double quote in name * Add state management to CDC (#2718) * CDC: Fix Producer/Consumer State Machine (#2721) * CDC Postgres Tests (#2777) * fix postgres cdc image name and run check before reading data (#2785) * minor postgres cdc fixes * add test and fix check behavior * fix * improve comment * remove unused props, remove todos, add some more sanity tests (#2791) * cdc: add offset store tests (#2793) * clean (#2798) * postgres cdc docs (#2784) * cdc docs * Update docs/integrations/sources/postgres.md Co-authored-by: Charles <[email protected]> * address gcp * learn too english * add link * add more disk space warnings * add additional cdc use case * add information on how to find postgresql.conf * add how to find the file Co-authored-by: Charles <[email protected]> * various merge conflict fixes (#2799) * cdc standard tests (#2813) * require cdc users to create publications & update docs (#2818) * postgres cdc race condition * working? 
but different process * add additional logging to help debug in the future * everything done except working config * remove unintended change * Use oneOf in PG CDC spec (#2827) * add oneOf configuration for postgres cdc (#2831) * add oneof configuration for cdc postgres * fmt Co-authored-by: Charles <[email protected]> * fix test (#2834) * fix test * bump version * add docs on creating replica identities (#2838) * add docs on creating replica identities * emphasize danger * grammar * bump pg version in source catalog * generate seed files Co-authored-by: cgardens <[email protected]>
What
How
Recommended reading order
PostgresSource.java
DebeziumRecordPublisher.java
DebeziumRecordConsumer.java