sync data from beginning if lsn is no longer valid in postgres cdc #15077

Merged
merged 13 commits into from
Aug 3, 2022
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.codehaus.plexus.util.StringUtils;

public class DebeziumPropertiesManager {

private final JsonNode config;
private final AirbyteFileOffsetBackingStore offsetManager;
private final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager;

private final Properties properties;
private final ConfiguredAirbyteCatalog catalog;

public DebeziumPropertiesManager(final Properties properties,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final AirbyteFileOffsetBackingStore offsetManager,
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager) {
this.properties = properties;
this.config = config;
this.catalog = catalog;
this.offsetManager = offsetManager;
this.schemaHistoryManager = schemaHistoryManager;
}

protected Properties getDebeziumProperties() {
final Properties props = new Properties();
props.putAll(properties);

// debezium engine configuration
props.setProperty("name", "engine");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString());
props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer
Contributor:

should this be configurable? / what's preventing it from being longer right now?

Contributor Author:
I wouldn't want users to configure this. It's basically how often we want Debezium to flush the state to the file. Debezium may or may not flush it, though; it will try to, but it's based on other factors as well. The lower the value, the lower the WAL processing performance. We never got around to finalizing the right value. Also, it would be out of scope for this PR.
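For reference, if this ever does become tunable, a minimal hedged sketch of reading the interval from the connector config (the "flush_interval_ms" key is hypothetical, not an existing Airbyte option) could look like:

// Hedged sketch only: "flush_interval_ms" is a hypothetical config key, shown to
// illustrate how an override could be wired in; it falls back to the current
// hardcoded 1000 ms default. config is the same JsonNode the manager already holds.
private static String resolveFlushIntervalMs(final JsonNode config) {
  return config.has("flush_interval_ms")
      ? config.get("flush_interval_ms").asText()
      : "1000";
}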

// default values from debezium CommonConnectorConfig
props.setProperty("max.batch.size", "2048");
props.setProperty("max.queue.size", "8192");

if (schemaHistoryManager.isPresent()) {
// https://debezium.io/documentation/reference/1.9/operations/debezium-server.html#debezium-source-database-history-class
// https://debezium.io/documentation/reference/development/engine.html#_in_the_code
// As mentioned in the documents above, debezium connector for MySQL needs to track the schema
// changes. If we don't do this, we can't fetch records for the table.
props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
props.setProperty("database.history.file.filename", schemaHistoryManager.get().getPath().toString());
}

// https://debezium.io/documentation/reference/configuration/avro.html
props.setProperty("key.converter.schemas.enable", "false");
props.setProperty("value.converter.schemas.enable", "false");

// debezium names
props.setProperty("name", config.get(JdbcUtils.DATABASE_KEY).asText());
props.setProperty("database.server.name", config.get(JdbcUtils.DATABASE_KEY).asText());

// db connection configuration
props.setProperty("database.hostname", config.get(JdbcUtils.HOST_KEY).asText());
props.setProperty("database.port", config.get(JdbcUtils.PORT_KEY).asText());
props.setProperty("database.user", config.get(JdbcUtils.USERNAME_KEY).asText());
props.setProperty("database.dbname", config.get(JdbcUtils.DATABASE_KEY).asText());

if (config.has(JdbcUtils.PASSWORD_KEY)) {
props.setProperty("database.password", config.get(JdbcUtils.PASSWORD_KEY).asText());
}

// By default "decimal.handling.mode=precise", which causes decimal values to be returned in a
// binary representation. The "double" type may cause a loss of precision, so explicitly set
// Debezium's config to emit the value as a String in its Kafka messages. For more details see:
// https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-decimal-types
// https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation
props.setProperty("decimal.handling.mode", "string");

// table selection
final String tableWhitelist = getTableWhitelist(catalog);
props.setProperty("table.include.list", tableWhitelist);

return props;
}

public static String getTableWhitelist(final ConfiguredAirbyteCatalog catalog) {
return catalog.getStreams().stream()
.filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL)
.map(ConfiguredAirbyteStream::getStream)
.map(stream -> stream.getNamespace() + "." + stream.getName())
// debezium needs commas escaped to split properly
.map(x -> StringUtils.escape(x, new char[] {','}, "\\,"))
.collect(Collectors.joining(","));
}

}
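As a quick illustration of the whitelist behavior (a hedged sketch, not part of this change; the stream names are made up):

import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import java.util.List;

public class TableWhitelistExample {

  public static void main(final String[] args) {
    // Two streams: only the INCREMENTAL one should end up in "table.include.list".
    final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
        new ConfiguredAirbyteStream()
            .withSyncMode(SyncMode.INCREMENTAL)
            .withStream(new AirbyteStream().withNamespace("public").withName("users")),
        new ConfiguredAirbyteStream()
            .withSyncMode(SyncMode.FULL_REFRESH)
            .withStream(new AirbyteStream().withNamespace("public").withName("audit_log"))));

    // Prints "public.users"; FULL_REFRESH streams are excluded from CDC.
    System.out.println(DebeziumPropertiesManager.getTableWhitelist(catalog));
  }
}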
@@ -5,11 +5,7 @@
package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
@@ -23,42 +19,31 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.codehaus.plexus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The purpose of this class is to intiliaze and spawn the debezium engine with the right properties
* to fetch records
* The purpose of this class is to initialize and spawn the debezium engine with the right
* properties to fetch records
*/
public class DebeziumRecordPublisher implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordPublisher.class);
private final ExecutorService executor;
private DebeziumEngine<ChangeEvent<String, String>> engine;

private final JsonNode config;
private final AirbyteFileOffsetBackingStore offsetManager;
private final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager;

private final AtomicBoolean hasClosed;
private final AtomicBoolean isClosing;
private final AtomicReference<Throwable> thrownError;
private final CountDownLatch engineLatch;
private final Properties properties;
private final ConfiguredAirbyteCatalog catalog;
private final DebeziumPropertiesManager debeziumPropertiesManager;

public DebeziumRecordPublisher(final Properties properties,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final AirbyteFileOffsetBackingStore offsetManager,
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager) {
this.properties = properties;
this.config = config;
this.catalog = catalog;
this.offsetManager = offsetManager;
this.schemaHistoryManager = schemaHistoryManager;
this.debeziumPropertiesManager = new DebeziumPropertiesManager(properties, config, catalog, offsetManager,
schemaHistoryManager);
this.hasClosed = new AtomicBoolean(false);
this.isClosing = new AtomicBoolean(false);
this.thrownError = new AtomicReference<>();
@@ -68,7 +53,7 @@ public DebeziumRecordPublisher(final Properties properties,

public void start(final Queue<ChangeEvent<String, String>> queue) {
engine = DebeziumEngine.create(Json.class)
.using(getDebeziumProperties())
.using(debeziumPropertiesManager.getDebeziumProperties())
.using(new OffsetCommitPolicy.AlwaysCommitOffsetPolicy())
.notifying(e -> {
// debezium outputs a tombstone event that has a value of null. this is an artifact of how it
@@ -120,69 +105,4 @@ public void close() throws Exception {
}
}

protected Properties getDebeziumProperties() {
final Properties props = new Properties();
props.putAll(properties);

// debezium engine configuration
props.setProperty("name", "engine");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString());
props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer
// default values from debezium CommonConnectorConfig
props.setProperty("max.batch.size", "2048");
props.setProperty("max.queue.size", "8192");

if (schemaHistoryManager.isPresent()) {
// https://debezium.io/documentation/reference/1.9/operations/debezium-server.html#debezium-source-database-history-class
// https://debezium.io/documentation/reference/development/engine.html#_in_the_code
// As mentioned in the documents above, debezium connector for MySQL needs to track the schema
// changes. If we don't do this, we can't fetch records for the table.
props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
props.setProperty("database.history.file.filename", schemaHistoryManager.get().getPath().toString());
}

// https://debezium.io/documentation/reference/configuration/avro.html
props.setProperty("key.converter.schemas.enable", "false");
props.setProperty("value.converter.schemas.enable", "false");

// debezium names
props.setProperty("name", config.get(JdbcUtils.DATABASE_KEY).asText());
props.setProperty("database.server.name", config.get(JdbcUtils.DATABASE_KEY).asText());

// db connection configuration
props.setProperty("database.hostname", config.get(JdbcUtils.HOST_KEY).asText());
props.setProperty("database.port", config.get(JdbcUtils.PORT_KEY).asText());
props.setProperty("database.user", config.get(JdbcUtils.USERNAME_KEY).asText());
props.setProperty("database.dbname", config.get(JdbcUtils.DATABASE_KEY).asText());

if (config.has(JdbcUtils.PASSWORD_KEY)) {
props.setProperty("database.password", config.get(JdbcUtils.PASSWORD_KEY).asText());
}

// By default "decimal.handling.mode=precise", which causes decimal values to be returned in a
// binary representation. The "double" type may cause a loss of precision, so explicitly set
// Debezium's config to emit the value as a String in its Kafka messages. For more details see:
// https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-decimal-types
// https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation
props.setProperty("decimal.handling.mode", "string");

// table selection
final String tableWhitelist = getTableWhitelist(catalog);
props.setProperty("table.include.list", tableWhitelist);

return props;
}

@VisibleForTesting
public static String getTableWhitelist(final ConfiguredAirbyteCatalog catalog) {
return catalog.getStreams().stream()
.filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL)
.map(ConfiguredAirbyteStream::getStream)
.map(stream -> stream.getNamespace() + "." + stream.getName())
// debezium needs commas escaped to split properly
.map(x -> StringUtils.escape(x, new char[] {','}, "\\,"))
.collect(Collectors.joining(","));
}

}
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import io.airbyte.commons.json.Jsons;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresOffsetContext.Loader;
import java.util.Collections;
import java.util.Map;

public class PostgresCustomLoader extends Loader {

private Map<String, ?> offset;

public PostgresCustomLoader(PostgresConnectorConfig connectorConfig) {
super(connectorConfig);
}

@Override
public PostgresOffsetContext load(Map<String, ?> offset) {
this.offset = Jsons.clone(offset);
return super.load(offset);
}

public Map<String, ?> getRawOffset() {
return Collections.unmodifiableMap(offset);
}

}
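To show how the loader is meant to be exercised, a minimal hedged sketch (the connector config and saved offset map are caller-supplied stand-ins; "lsn" is the key Debezium's Postgres connector uses in its offsets):

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import java.util.Map;

public class OffsetInspectionSketch {

  // Loads a previously saved offset and returns the raw LSN value, so callers can
  // check whether that LSN is still valid on the server before resuming CDC.
  static Object loadAndReadLsn(final PostgresConnectorConfig connectorConfig,
                               final Map<String, ?> savedOffset) {
    final PostgresCustomLoader loader = new PostgresCustomLoader(connectorConfig);
    loader.load(savedOffset); // delegates to Debezium while capturing a copy of the raw map
    return loader.getRawOffset().get("lsn");
  }
}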