Skip to content

Commit

Permalink
postgres cdc catalog (#2673)
Browse files Browse the repository at this point in the history
* update cdc catalog

* A
  • Loading branch information
jrhizor authored Apr 1, 2021
1 parent 825e324 commit 79094f1
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.JdbcStateManager;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.SyncMode;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
Expand All @@ -68,6 +71,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -105,6 +109,22 @@ public Set<String> getExcludedInternalSchemas() {
return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history");
}

@Override
public AirbyteCatalog discover(JsonNode config) throws Exception {
AirbyteCatalog catalog = super.discover(config);

if (isCdc(config)) {
final List<AirbyteStream> streams = catalog.getStreams().stream()
.map(PostgresSource::removeIncrementalWithoutPk)
.map(PostgresSource::addCdcMetadataColumns)
.collect(Collectors.toList());

catalog.setStreams(streams);
}

return catalog;
}

@Override
public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(JsonNode config) throws Exception {
final List<CheckedConsumer<JdbcDatabase, Exception>> checkOperations = new ArrayList<>(super.getCheckOperations(config));
Expand Down Expand Up @@ -340,6 +360,32 @@ private static boolean isCdc(JsonNode config) {
return !(config.get("replication_slot") == null);
}

/*
* It isn't possible to recreate the state of the original database unless we include extra
* information (like an oid) when using logical replication. By limiting to Full Refresh when we
* don't have a primary key we dodge the problem for now. As a work around a CDC and non-CDC source
* could be configured if there's a need to replicate a large non-PK table.
*/
private static AirbyteStream removeIncrementalWithoutPk(AirbyteStream stream) {
if (stream.getSourceDefinedPrimaryKey().isEmpty()) {
stream.getSupportedSyncModes().remove(SyncMode.INCREMENTAL);
}

return stream;
}

private static AirbyteStream addCdcMetadataColumns(AirbyteStream stream) {
ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema();
ObjectNode properties = (ObjectNode) jsonSchema.get("properties");

final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number"));
properties.set("_ab_cdc_lsn", numberType);
properties.set("_ab_cdc_updated_at", numberType);
properties.set("_ab_cdc_deleted_at", numberType);

return stream;
}

public static AirbyteMessage convertChangeEvent(ChangeEvent<String, String> event, Instant emittedAt) {
final JsonNode debeziumRecord = Jsons.deserialize(event.value());
final JsonNode before = debeziumRecord.get("before");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.MountableFile;

class PostgresSourceCdcTest {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceCdcTest.class);

private static final String SLOT_NAME = "debezium_slot";
private static final String STREAM_NAME = "public.id_and_name";
private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of(
Expand Down Expand Up @@ -135,7 +139,6 @@ void setup() throws Exception {
}

private JsonNode getConfig(PostgreSQLContainer<?> psqlDb, String dbName) {
System.out.println("psqlDb.getFirstMappedPort() = " + psqlDb.getFirstMappedPort());
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", psqlDb.getHost())
.put("port", psqlDb.getFirstMappedPort())
Expand All @@ -144,14 +147,6 @@ private JsonNode getConfig(PostgreSQLContainer<?> psqlDb, String dbName) {
.put("password", psqlDb.getPassword())
.put("replication_slot", SLOT_NAME)
.build());
// return Jsons.jsonNode(ImmutableMap.builder()
// .put("host", "localhost")
// .put("port", 5432)
// .put("database", "debezium_test")
// .put("username", "postgres")
// .put("password", "")
// .put("replication_slot", SLOT_NAME)
// .build());
}

private Database getDatabaseFromConfig(JsonNode config) {
Expand All @@ -177,6 +172,10 @@ public void testIt() throws Exception {
// coerce to incremental so it uses CDC.
configuredCatalog.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL));

AirbyteCatalog catalog = source.discover(getConfig(PSQL_DB, dbName));

LOGGER.info("catalog = " + catalog);

final AutoCloseableIterator<AirbyteMessage> read = source.read(getConfig(PSQL_DB, dbName), configuredCatalog, null);

Thread.sleep(5000);
Expand Down

0 comments on commit 79094f1

Please sign in to comment.