diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index d5027423a695..1c4aae9e1fc3 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -45,9 +45,12 @@ import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.JdbcStateManager; +import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.SyncMode; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.format.Json; @@ -68,6 +71,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -105,6 +109,22 @@ public Set getExcludedInternalSchemas() { return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history"); } + @Override + public AirbyteCatalog discover(JsonNode config) throws Exception { + AirbyteCatalog catalog = super.discover(config); + + if (isCdc(config)) { + final List streams = catalog.getStreams().stream() + .map(PostgresSource::removeIncrementalWithoutPk) + .map(PostgresSource::addCdcMetadataColumns) + 
.collect(Collectors.toList()); + + catalog.setStreams(streams); + } + + return catalog; + } + @Override public List> getCheckOperations(JsonNode config) throws Exception { final List> checkOperations = new ArrayList<>(super.getCheckOperations(config)); @@ -340,6 +360,32 @@ private static boolean isCdc(JsonNode config) { return !(config.get("replication_slot") == null); } + /* + * Drops INCREMENTAL from the supported sync modes of a stream that has no source-defined primary key. + * It isn't possible to recreate the state of the original database unless we include extra information + * (like an oid) when using logical replication, so limiting such streams to Full Refresh dodges the problem + * for now. As a workaround, separate CDC and non-CDC sources could be configured to replicate a large non-PK table. + */ + private static AirbyteStream removeIncrementalWithoutPk(AirbyteStream stream) { + if (stream.getSourceDefinedPrimaryKey().isEmpty()) { + stream.getSupportedSyncModes().remove(SyncMode.INCREMENTAL); + } + + return stream; + } + + private static AirbyteStream addCdcMetadataColumns(AirbyteStream stream) { + ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema(); + ObjectNode properties = (ObjectNode) jsonSchema.get("properties"); + + final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number")); /* NOTE(review): this one node instance is set for all three properties below - confirm sharing is intended */ + properties.set("_ab_cdc_lsn", numberType); + properties.set("_ab_cdc_updated_at", numberType); + properties.set("_ab_cdc_deleted_at", numberType); + + return stream; + } + public static AirbyteMessage convertChangeEvent(ChangeEvent event, Instant emittedAt) { final JsonNode debeziumRecord = Jsons.deserialize(event.value()); final JsonNode before = debeziumRecord.get("before"); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceCdcTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceCdcTest.java index 2542b89a55de..3b13fed31e31 100644 --- 
a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceCdcTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceCdcTest.java @@ -58,11 +58,15 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.MountableFile; class PostgresSourceCdcTest { + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceCdcTest.class); + private static final String SLOT_NAME = "debezium_slot"; private static final String STREAM_NAME = "public.id_and_name"; private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of( @@ -135,7 +139,6 @@ void setup() throws Exception { } private JsonNode getConfig(PostgreSQLContainer psqlDb, String dbName) { - System.out.println("psqlDb.getFirstMappedPort() = " + psqlDb.getFirstMappedPort()); return Jsons.jsonNode(ImmutableMap.builder() .put("host", psqlDb.getHost()) .put("port", psqlDb.getFirstMappedPort()) @@ -144,14 +147,6 @@ private JsonNode getConfig(PostgreSQLContainer psqlDb, String dbName) { .put("password", psqlDb.getPassword()) .put("replication_slot", SLOT_NAME) .build()); - // return Jsons.jsonNode(ImmutableMap.builder() - // .put("host", "localhost") - // .put("port", 5432) - // .put("database", "debezium_test") - // .put("username", "postgres") - // .put("password", "") - // .put("replication_slot", SLOT_NAME) - // .build()); } private Database getDatabaseFromConfig(JsonNode config) { @@ -177,6 +172,10 @@ public void testIt() throws Exception { // coerce to incremental so it uses CDC. 
configuredCatalog.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL)); + AirbyteCatalog catalog = source.discover(getConfig(PSQL_DB, dbName)); + + LOGGER.info("catalog = " + catalog); + final AutoCloseableIterator read = source.read(getConfig(PSQL_DB, dbName), configuredCatalog, null); Thread.sleep(5000);