-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
cdc add lsn extraction and comparison #2613
Conversation
32826eb
to
412f382
Compare
pgoutput is only supported 10+ anyways |
import java.sql.SQLException; | ||
import java.util.List; | ||
|
||
public class PostgresUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comments for each of these functions should be enough to understand what's going on without following the refs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
@@ -127,6 +138,19 @@ public JsonNode toJdbcConfig(JsonNode config) { | |||
JdbcStateManager stateManager, | |||
Instant emittedAt) { | |||
if (isCdc(config)) { | |||
final String targetLsn = getLsn(database); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should log this on retrieval here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I started reviewing but I realized this may not be ready for review so I just left whatever comments I had. ping again if another review is needed
String logicalXLogStr = lsn.substring(0, slashIndex); | ||
// parses as a long but then cast to int. this allows us to retain the full 32 bits of the integer | ||
// as opposed to the reduced value of Integer.MAX_VALUE. | ||
int logicalXlog = (int) Long.parseLong(logicalXLogStr, 16); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be Integer.parseUnsignedInt
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no. Integer.parseUnsignedInt can't parse a value that is greater than max int in java. In this case it is possible we will be parsing something greater than java's max int.
@@ -127,6 +141,30 @@ public JsonNode toJdbcConfig(JsonNode config) { | |||
JdbcStateManager stateManager, | |||
Instant emittedAt) { | |||
if (isCdc(config)) { | |||
final String targetLsn = getLsn(database); | |||
LOGGER.info("identified target lsn: " + targetLsn); | |||
final Predicate<JsonNode> hasReachedLsnPredicate = (record) -> Optional.ofNullable(record.get("value")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the schema of the object being deserialized? is that mentioned anywhere?
lsn extraction wip wip it completes it terminates
8ff54dd
to
04d4cc5
Compare
* spike * more * debezium wip * use oneof for configuration * iterator wrapping structure * push current * working loop * move capability into source * hack it into a sharable state * debezium test runner (#2617) * CDC Wait for Values (#2618) * output actual AirbyteMessages for cdc (#2631) * message conversion * fmt * add lsn extraction and comparison (#2613) * postgres cdc catalog (#2673) * update cdc catalog * A * table selection for cdc (#2690) * table selection for cdc * fix broken merge * also test double quote in name * Add state management to CDC (#2718) * CDC: Fix Producer/Consumer State Machine (#2721) * CDC Postgres Tests (#2777) * fix postgres cdc image name and run check before reading data (#2785) * minor postgres cdc fixes * add test and fix check behavior * fix * improve comment * remove unused props, remove todos, add some more sanity tests (#2791) * cdc: add offset store tests (#2793) * clean (#2798) * postgres cdc docs (#2784) * cdc docs * Update docs/integrations/sources/postgres.md Co-authored-by: Charles <[email protected]> * address gcp * learn too english * add link * add more disk space warnings * add additional cdc use case * add information on how to find postgresql.conf * add how to find the file Co-authored-by: Charles <[email protected]> * various merge conflict fixes (#2799) * cdc standard tests (#2813) * require cdc users to create publications & update docs (#2818) * postgres cdc race condition * working? but different process * add additional logging to help debug in the future * everything done except working config * remove unintended change * Use oneOf in PG CDC spec (#2827) * add oneOf configuration for postgres cdc (#2831) * add oneof configuration for cdc postgres * fmt Co-authored-by: Charles <[email protected]> * fix test (#2834) * fix test * bump version * add docs on creating replica identities (#2838) * add docs on creating replica identities * emphasize danger * grammar * bump pg version in source catalog * generate seed files Co-authored-by: cgardens <[email protected]>
What
Future work
@jrhizor we can merge this into master or directly into feature branch. Will do the right thing one approved.