Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

table selection for cdc #2690

Merged
merged 4 commits into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
import io.airbyte.integrations.source.jdbc.JdbcStateManager;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
Expand All @@ -60,7 +62,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.codehaus.plexus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -133,7 +137,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(JsonN
CloseableLinkedBlockingQueue queue = new CloseableLinkedBlockingQueue(executor::shutdown);

DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
.using(getDebeziumProperties(config))
.using(getDebeziumProperties(config, catalog))
.notifying(record -> {
try {
LOGGER.info("record = " + record);
Expand Down Expand Up @@ -222,7 +226,7 @@ protected JsonNode computeNext() {

// todo: make this use catalog as well
// todo: make this use the state for the files as well
protected static Properties getDebeziumProperties(JsonNode config) {
protected static Properties getDebeziumProperties(JsonNode config, ConfiguredAirbyteCatalog catalog) {
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("plugin.name", "pgoutput");
Expand All @@ -240,7 +244,9 @@ protected static Properties getDebeziumProperties(JsonNode config) {
props.setProperty("drop.tombstones", "false");
props.setProperty("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState");

// props.setProperty("table.include.list", "public.id_and_name"); // todo
final String tableWhitelist = getTableWhitelist(catalog);
System.out.println("tableWhitelist = " + tableWhitelist);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the logger instead of System.out.println here, or remove this line.

props.setProperty("table.include.list", tableWhitelist);
props.setProperty("database.include.list", config.get("database").asText());
props.setProperty("name", "orders-postgres-connector");
props.setProperty("include_schema_changes", "true");
Expand All @@ -265,6 +271,16 @@ protected static Properties getDebeziumProperties(JsonNode config) {
return props;
}

protected static String getTableWhitelist(ConfiguredAirbyteCatalog catalog) {
return catalog.getStreams().stream()
.map(ConfiguredAirbyteStream::getStream)
.map(AirbyteStream::getName)
// debezium needs commas escaped to split properly
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does Debezium handle quoted syntax for databases? Is there anything special we need to do to handle it, or does it just convert to a UTF-8 string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test case. I'll also add an issue for properly handling quoting here long term.

.map(x -> StringUtils.escape(x, new char[] {','}, "\\,"))
.collect(Collectors.joining(","));

}

private static boolean isCdc(JsonNode config) {
LOGGER.info("isCdc config: " + config);
return !(config.get("replication_slot") == null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

package io.airbyte.integrations.source.postgres;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -41,7 +43,6 @@
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -54,6 +55,8 @@ class PostgresSourceCdcTest {

private static final String SLOT_NAME = "debezium_slot";
private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.id_,something";
private static final String STREAM_NAME3 = "public.naMéS";
private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
STREAM_NAME,
Expand All @@ -63,13 +66,13 @@ class PostgresSourceCdcTest {
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))),
CatalogHelpers.createAirbyteStream(
STREAM_NAME + "2",
STREAM_NAME2,
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("power", JsonSchemaPrimitive.NUMBER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)),
CatalogHelpers.createAirbyteStream(
"public.names",
STREAM_NAME3,
Field.of("first_name", JsonSchemaPrimitive.STRING),
Field.of("last_name", JsonSchemaPrimitive.STRING),
Field.of("power", JsonSchemaPrimitive.NUMBER))
Expand Down Expand Up @@ -106,13 +109,13 @@ void setup() throws Exception {
ctx.fetch("CREATE TABLE id_and_name(id NUMERIC(20, 10), name VARCHAR(200), power double precision, PRIMARY KEY (id));");
ctx.fetch("CREATE INDEX i1 ON id_and_name (id);");
ctx.fetch("INSERT INTO id_and_name (id, name, power) VALUES (1,'goku', 'Infinity'), (2, 'vegeta', 9000.1);");
ctx.fetch("CREATE TABLE \"id_,something\"(id NUMERIC(20, 10), name VARCHAR(200), power double precision);");
ctx.fetch("INSERT INTO \"id_,something\" (id, name, power) VALUES (1,'goku', 'Infinity'), (2, 'vegeta', 9000.1);");

ctx.fetch("CREATE TABLE id_and_name2(id NUMERIC(20, 10), name VARCHAR(200), power double precision);");
ctx.fetch("INSERT INTO id_and_name2 (id, name, power) VALUES (1,'goku', 'Infinity'), (2, 'vegeta', 9000.1);");

ctx.fetch("CREATE TABLE names(first_name VARCHAR(200), last_name VARCHAR(200), power double precision, PRIMARY KEY (first_name, last_name));");
ctx.fetch(
"INSERT INTO names (first_name, last_name, power) VALUES ('san', 'goku', 'Infinity'), ('prince', 'vegeta', 9000.1);");
"CREATE TABLE \"naMéS\"(first_name VARCHAR(200), last_name VARCHAR(200), power double precision, PRIMARY KEY (first_name, last_name));");
ctx.fetch(
"INSERT INTO \"naMéS\" (first_name, last_name, power) VALUES ('san', 'goku', 'Infinity'), ('prince', 'vegeta', 9000.1);");
return null;
});
database.close();
Expand Down Expand Up @@ -153,11 +156,7 @@ private Database getDatabaseFromConfig(JsonNode config) {
@Test
public void testIt() throws Exception {
final PostgresSource source = new PostgresSource();
final ConfiguredAirbyteCatalog configuredCatalog =
CONFIGURED_CATALOG.withStreams(CONFIGURED_CATALOG.getStreams()
.stream()
.filter(s -> s.getStream().getName().equals(STREAM_NAME))
.collect(Collectors.toList()));
final ConfiguredAirbyteCatalog configuredCatalog = CONFIGURED_CATALOG;
// coerce to incremental so it uses CDC.
configuredCatalog.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL));

Expand All @@ -172,4 +171,12 @@ public void testIt() throws Exception {
}
}

@Test
public void testWhitelistCreation() {
  // The whitelist is the catalog's stream names joined with commas; a comma that is
  // part of a stream name must come out backslash-escaped, and non-ASCII / mixed-case
  // names must pass through untouched.
  assertEquals(
      "public.id_and_name,public.id_\\,something,public.naMéS",
      PostgresSource.getTableWhitelist(CONFIGURED_CATALOG));
}

}