diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordPublisher.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordPublisher.java index 37a0807f10e8..895aa735365e 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordPublisher.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/DebeziumRecordPublisher.java @@ -41,7 +41,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import org.apache.commons.lang3.RandomStringUtils; import org.codehaus.plexus.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,8 +114,6 @@ public void close() throws Exception { } } - // todo: make this use catalog as well - // todo: make this use the state for the files as well protected static Properties getDebeziumProperties(JsonNode config, ConfiguredAirbyteCatalog catalog, AirbyteFileOffsetBackingStore offsetManager) { final Properties props = new Properties(); @@ -127,15 +124,15 @@ protected static Properties getDebeziumProperties(JsonNode config, ConfiguredAir props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore"); props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString()); props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer - props.setProperty("snapshot.mode", "exported"); // can use never if we want to manage full refreshes ourselves + props.setProperty("snapshot.mode", "exported"); // https://debezium.io/documentation/reference/configuration/avro.html props.setProperty("key.converter.schemas.enable", "false"); props.setProperty("value.converter.schemas.enable", "false"); // debezium 
names - props.setProperty("name", "orders-postgres-connector"); - props.setProperty("database.server.name", "orders"); // todo + props.setProperty("name", config.get("database").asText()); + props.setProperty("database.server.name", config.get("database").asText()); // db connection configuration props.setProperty("database.hostname", config.get("host").asText()); @@ -154,11 +151,6 @@ protected static Properties getDebeziumProperties(JsonNode config, ConfiguredAir props.setProperty("table.include.list", tableWhitelist); props.setProperty("database.include.list", config.get("database").asText()); - // todo (cgardens) do these properties do anything for us? - // reload from - props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory"); // todo: any reason not to use in memory version and - props.setProperty("database.history.file.filename", "/tmp/debezium/dbhistory-" + RandomStringUtils.randomAlphabetic(5) + ".dat"); - return props; } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceCdcTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceCdcTest.java index e084dd1192ff..5d2e040921f6 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceCdcTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceCdcTest.java @@ -74,6 +74,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -200,6 +201,7 @@ private Database getDatabaseFromConfig(JsonNode config) { } @Test + @DisplayName("On the first sync, returns records that exist
in the database.") void testExistingData() throws Exception { final AutoCloseableIterator read = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null); final List actualRecords = AutoCloseableIterators.toListAndClose(read); @@ -212,6 +214,7 @@ void testExistingData() throws Exception { } @Test + @DisplayName("When a record is deleted, produces a deletion record.") void testDelete() throws Exception { final AutoCloseableIterator read1 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null); final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); @@ -238,8 +241,39 @@ void testDelete() throws Exception { assertNotNull(recordMessages2.get(0).getData().get(PostgresSource.CDC_DELETED_AT)); } + @Test + @DisplayName("When a record is updated, produces an update record.") + void testUpdate() throws Exception { + final String updatedModel = "Explorer"; + final AutoCloseableIterator read1 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null); + final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); + final List stateMessages1 = extractStateMessages(actualRecords1); + + assertExpectedStateMessages(stateMessages1); + + database.query(ctx -> { + ctx.execute(String.format("UPDATE %s SET %s = '%s' WHERE %s = %s", MODELS_STREAM_NAME, COL_MODEL, updatedModel, COL_ID, 11)); + return null; + }); + + final JsonNode state = stateMessages1.get(0).getData(); + final AutoCloseableIterator read2 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, state); + final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); + final List recordMessages2 = new ArrayList<>(extractRecordMessages(actualRecords2)); + final List stateMessages2 = extractStateMessages(actualRecords2); + + assertExpectedStateMessages(stateMessages2); + assertEquals(1, recordMessages2.size()); + assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt()); + assertEquals(updatedModel, 
recordMessages2.get(0).getData().get(COL_MODEL).asText()); + assertNotNull(recordMessages2.get(0).getData().get(PostgresSource.CDC_LSN)); + assertNotNull(recordMessages2.get(0).getData().get(PostgresSource.CDC_UPDATED_AT)); + assertTrue(recordMessages2.get(0).getData().get(PostgresSource.CDC_DELETED_AT).isNull()); + } + @SuppressWarnings({"BusyWait", "CodeBlock2Expr"}) @Test + @DisplayName("Verify that when data is inserted into the database while a sync is happening and after the first sync, it all gets replicated.") void testRecordsProducedDuringAndAfterSync() throws Exception { final int recordsToCreate = 20; final AtomicInteger recordsCreated = new AtomicInteger(); @@ -283,6 +317,7 @@ void testRecordsProducedDuringAndAfterSync() throws Exception { } @Test + @DisplayName("When both incremental CDC and full refresh are configured for different streams in a sync, the data is replicated as expected.") void testCdcAndFullRefreshInSameSync() throws Exception { final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(CONFIGURED_CATALOG); // set make stream to full refresh. 
@@ -323,6 +358,45 @@ void testCdcAndFullRefreshInSameSync() throws Exception { Collections.singleton(MODELS_STREAM_NAME)); } + @Test + @DisplayName("When no records exist, no records are returned.") + void testNoData() throws Exception { + database.query(ctx -> { + ctx.execute(String.format("DELETE FROM %s", MAKES_STREAM_NAME)); + return null; + }); + + database.query(ctx -> { + ctx.execute(String.format("DELETE FROM %s", MODELS_STREAM_NAME)); + return null; + }); + + final AutoCloseableIterator read = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null); + final List actualRecords = AutoCloseableIterators.toListAndClose(read); + + final Set recordMessages = extractRecordMessages(actualRecords); + final List stateMessages = extractStateMessages(actualRecords); + + assertExpectedRecords(Collections.emptySet(), recordMessages); + assertExpectedStateMessages(stateMessages); + } + + @Test + @DisplayName("When no changes have been made to the database since the previous sync, no records are returned.") + void testNoDataOnSecondSync() throws Exception { + final AutoCloseableIterator read1 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null); + final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); + final JsonNode state = extractStateMessages(actualRecords1).get(0).getData(); + + final AutoCloseableIterator read2 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, state); + final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); + + final Set recordMessages2 = extractRecordMessages(actualRecords2); + final List stateMessages2 = extractStateMessages(actualRecords2); + + assertExpectedRecords(Collections.emptySet(), recordMessages2); + assertExpectedStateMessages(stateMessages2); + } + @Test void testReadWithoutReplicationSlot() throws SQLException { final String fullReplicationSlot = SLOT_NAME_BASE + "_" + dbName;