Skip to content

Commit

Permalink
remove unused props, remove todos, add some more sanity tests (#2791)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Apr 7, 2021
1 parent 13f682d commit 201e421
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.codehaus.plexus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -115,8 +114,6 @@ public void close() throws Exception {
}
}

// todo: make this use catalog as well
// todo: make this use the state for the files as well
protected static Properties getDebeziumProperties(JsonNode config, ConfiguredAirbyteCatalog catalog, AirbyteFileOffsetBackingStore offsetManager) {
final Properties props = new Properties();

Expand All @@ -127,15 +124,15 @@ protected static Properties getDebeziumProperties(JsonNode config, ConfiguredAir
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString());
props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer
props.setProperty("snapshot.mode", "exported"); // can use never if we want to manage full refreshes ourselves
props.setProperty("snapshot.mode", "exported");

// https://debezium.io/documentation/reference/configuration/avro.html
props.setProperty("key.converter.schemas.enable", "false");
props.setProperty("value.converter.schemas.enable", "false");

// debezium names
props.setProperty("name", "orders-postgres-connector");
props.setProperty("database.server.name", "orders"); // todo
props.setProperty("name", config.get("database").asText());
props.setProperty("database.server.name", config.get("database").asText());

// db connection configuration
props.setProperty("database.hostname", config.get("host").asText());
Expand All @@ -154,11 +151,6 @@ protected static Properties getDebeziumProperties(JsonNode config, ConfiguredAir
props.setProperty("table.include.list", tableWhitelist);
props.setProperty("database.include.list", config.get("database").asText());

// todo (cgardens) do these properties do anything for us?
// reload from
props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory"); // todo: any reason not to use in memory version and
props.setProperty("database.history.file.filename", "/tmp/debezium/dbhistory-" + RandomStringUtils.randomAlphabetic(5) + ".dat");

return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -200,6 +201,7 @@ private Database getDatabaseFromConfig(JsonNode config) {
}

@Test
@DisplayName("On the first First sync, produces returns records that exist in the database.")
void testExistingData() throws Exception {
final AutoCloseableIterator<AirbyteMessage> read = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords = AutoCloseableIterators.toListAndClose(read);
Expand All @@ -212,6 +214,7 @@ void testExistingData() throws Exception {
}

@Test
@DisplayName("When a record is deleted, produces a deletion record.")
void testDelete() throws Exception {
final AutoCloseableIterator<AirbyteMessage> read1 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
Expand All @@ -238,8 +241,39 @@ void testDelete() throws Exception {
assertNotNull(recordMessages2.get(0).getData().get(PostgresSource.CDC_DELETED_AT));
}

@Test
@DisplayName("When a record is updated, produces an update record.")
void testUpdate() throws Exception {
final String updatedModel = "Explorer";
final AutoCloseableIterator<AirbyteMessage> read1 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);

assertExpectedStateMessages(stateMessages1);

database.query(ctx -> {
ctx.execute(String.format("UPDATE %s SET %s = '%s' WHERE %s = %s", MODELS_STREAM_NAME, COL_MODEL, updatedModel, COL_ID, 11));
return null;
});

final JsonNode state = stateMessages1.get(0).getData();
final AutoCloseableIterator<AirbyteMessage> read2 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
final List<AirbyteRecordMessage> recordMessages2 = new ArrayList<>(extractRecordMessages(actualRecords2));
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);

assertExpectedStateMessages(stateMessages2);
assertEquals(1, recordMessages2.size());
assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt());
assertEquals(updatedModel, recordMessages2.get(0).getData().get(COL_MODEL).asText());
assertNotNull(recordMessages2.get(0).getData().get(PostgresSource.CDC_LSN));
assertNotNull(recordMessages2.get(0).getData().get(PostgresSource.CDC_UPDATED_AT));
assertTrue(recordMessages2.get(0).getData().get(PostgresSource.CDC_DELETED_AT).isNull());
}

@SuppressWarnings({"BusyWait", "CodeBlock2Expr"})
@Test
@DisplayName("Verify that when data is inserted into the database while a sync is happening and after the first sync, it all gets replicated.")
void testRecordsProducedDuringAndAfterSync() throws Exception {
final int recordsToCreate = 20;
final AtomicInteger recordsCreated = new AtomicInteger();
Expand Down Expand Up @@ -283,6 +317,7 @@ void testRecordsProducedDuringAndAfterSync() throws Exception {
}

@Test
@DisplayName("When both incremental CDC and full refresh are configured for different streams in a sync, the data is replicated as expected.")
void testCdcAndFullRefreshInSameSync() throws Exception {
final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(CONFIGURED_CATALOG);
// set make stream to full refresh.
Expand Down Expand Up @@ -323,6 +358,45 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
Collections.singleton(MODELS_STREAM_NAME));
}

@Test
@DisplayName("When no records exist, no records are returned.")
void testNoData() throws Exception {
database.query(ctx -> {
ctx.execute(String.format("DELETE FROM %s", MAKES_STREAM_NAME));
return null;
});

database.query(ctx -> {
ctx.execute(String.format("DELETE FROM %s", MODELS_STREAM_NAME));
return null;
});

final AutoCloseableIterator<AirbyteMessage> read = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords = AutoCloseableIterators.toListAndClose(read);

final Set<AirbyteRecordMessage> recordMessages = extractRecordMessages(actualRecords);
final List<AirbyteStateMessage> stateMessages = extractStateMessages(actualRecords);

assertExpectedRecords(Collections.emptySet(), recordMessages);
assertExpectedStateMessages(stateMessages);
}

@Test
@DisplayName("When no changes have been made to the database since the previous sync, no records are returned.")
void testNoDataOnSecondSync() throws Exception {
final AutoCloseableIterator<AirbyteMessage> read1 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
final JsonNode state = extractStateMessages(actualRecords1).get(0).getData();

final AutoCloseableIterator<AirbyteMessage> read2 = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);

final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);

assertExpectedRecords(Collections.emptySet(), recordMessages2);
assertExpectedStateMessages(stateMessages2);

@Test
void testReadWithoutReplicationSlot() throws SQLException {
final String fullReplicationSlot = SLOT_NAME_BASE + "_" + dbName;
Expand Down

0 comments on commit 201e421

Please sign in to comment.