Skip to content

Commit

Permalink
mysql source : implement support for snapshot of new tables in cdc mo…
Browse files Browse the repository at this point in the history
…de (#16954)

* mysql source : implement support for snapshot of new tables in cdc mode

* undo unwanted changes

* add more assertions

* format

* fix build

* fix build

* revert acceptance test changes

* bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
subodh1810 and octavia-squidington-iii authored Sep 26, 2022
1 parent cf0c082 commit 9974608
Show file tree
Hide file tree
Showing 32 changed files with 367 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.6.13
dockerImageTag: 0.6.14
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6783,7 +6783,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:0.6.13"
- dockerImage: "airbyte/source-mysql:0.6.14"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,29 @@ public AirbyteDebeziumHandler(final JsonNode config,
this.firstRecordWaitTime = firstRecordWaitTime;
}

public AutoCloseableIterator<AirbyteMessage> getSnapshotIterators(final ConfiguredAirbyteCatalog catalog,
public AutoCloseableIterator<AirbyteMessage> getSnapshotIterators(
final ConfiguredAirbyteCatalog catalogContainingStreamsToSnapshot,
final CdcMetadataInjector cdcMetadataInjector,
final Properties connectorProperties,
final Properties snapshotProperties,
final CdcStateHandler cdcStateHandler,
final Instant emittedAt) {
LOGGER.info("Running snapshot for " + catalog.getStreams().size() + " new tables");

LOGGER.info("Running snapshot for " + catalogContainingStreamsToSnapshot.getStreams().size() + " new tables");
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeDummyStateForSnapshotPurpose();
/*
* TODO(Subodh) : Since Postgres doesn't require schema history this is fine but we need to fix this
* for MySQL and MSSQL
*/
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager = Optional.empty();
final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(connectorProperties,
final DebeziumRecordPublisher tableSnapshotPublisher = new DebeziumRecordPublisher(snapshotProperties,
config,
catalog,
catalogContainingStreamsToSnapshot,
offsetManager,
schemaHistoryManager);
publisher.start(queue);
schemaHistoryManager(new EmptySavedInfo()));
tableSnapshotPublisher.start(queue);

final AutoCloseableIterator<ChangeEvent<String, String>> eventIterator = new DebeziumRecordIterator(
queue,
targetPosition,
publisher::hasClosed,
publisher::close,
tableSnapshotPublisher::hasClosed,
tableSnapshotPublisher::close,
firstRecordWaitTime);

return AutoCloseableIterators.concatWithEagerClose(AutoCloseableIterators
Expand Down Expand Up @@ -155,4 +152,18 @@ public static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) {
.anyMatch(syncMode -> syncMode == SyncMode.INCREMENTAL);
}

private static class EmptySavedInfo implements CdcSavedInfoFetcher {

@Override
public JsonNode getSavedOffset() {
return null;
}

@Override
public Optional<JsonNode> getSavedSchemaHistory() {
return Optional.empty();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.debezium;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -27,11 +28,13 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.util.ArrayList;
Expand Down Expand Up @@ -177,12 +180,14 @@ private void createAndPopulateActualTable() {
* databases not being synced by Airbyte are not causing issues with our debezium logic
*/
private void createAndPopulateRandomTable() {
createSchema(MODELS_SCHEMA + "_random");
createTable(MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random",
if (!randomTableSchema().equals(MODELS_SCHEMA)) {
createSchema(randomTableSchema());
}
createTable(randomTableSchema(), MODELS_STREAM_NAME + "_random",
columnClause(ImmutableMap.of(COL_ID + "_random", "INTEGER", COL_MAKE_ID + "_random", "INTEGER", COL_MODEL + "_random", "VARCHAR(200)"),
Optional.of(COL_ID + "_random")));
for (final JsonNode recordJson : MODEL_RECORDS_RANDOM) {
writeRecords(recordJson, MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random",
writeRecords(recordJson, randomTableSchema(), MODELS_STREAM_NAME + "_random",
COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random");
}
}
Expand Down Expand Up @@ -585,6 +590,181 @@ void testDiscover() throws Exception {
.collect(Collectors.toList()));
}

@Test
public void newTableSnapshotTest() throws Exception {
final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = getSource()
.read(getConfig(), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
.toListAndClose(firstBatchIterator);
final Set<AirbyteRecordMessage> recordsFromFirstBatch = extractRecordMessages(
dataFromFirstBatch);
final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch);
assertEquals(1, stateAfterFirstBatch.size());

final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion = stateAfterFirstBatch.get(0);
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterFirstSyncCompletion.getType());
assertNotNull(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState());
final Set<StreamDescriptor> streamsInStateAfterFirstSyncCompletion = stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getStreamStates()
.stream()
.map(AirbyteStreamState::getStreamDescriptor)
.collect(Collectors.toSet());
assertEquals(1, streamsInStateAfterFirstSyncCompletion.size());
assertTrue(streamsInStateAfterFirstSyncCompletion.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA)));
assertNotNull(stateMessageEmittedAfterFirstSyncCompletion.getData());

assertEquals((MODEL_RECORDS.size()), recordsFromFirstBatch.size());
assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordsFromFirstBatch);

final JsonNode state = stateAfterFirstBatch.get(0).getData();

final ConfiguredAirbyteCatalog newTables = CatalogHelpers
.toDefaultConfiguredCatalog(new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_random",
randomTableSchema(),
Field.of(COL_ID + "_random", JsonSchemaType.NUMBER),
Field.of(COL_MAKE_ID + "_random", JsonSchemaType.NUMBER),
Field.of(COL_MODEL + "_random", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID + "_random"))))));

newTables.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL));
final List<ConfiguredAirbyteStream> combinedStreams = new ArrayList<>();
combinedStreams.addAll(CONFIGURED_CATALOG.getStreams());
combinedStreams.addAll(newTables.getStreams());

final ConfiguredAirbyteCatalog updatedCatalog = new ConfiguredAirbyteCatalog().withStreams(combinedStreams);

/*
* Write 20 records to the existing table
*/
final Set<JsonNode> recordsWritten = new HashSet<>();
for (int recordsCreated = 0; recordsCreated < 20; recordsCreated++) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
"F-" + recordsCreated));
recordsWritten.add(record);
writeModelRecord(record);
}

final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource()
.read(getConfig(), updatedCatalog, state);
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
.toListAndClose(secondBatchIterator);

final List<AirbyteStateMessage> stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch);
assertEquals(2, stateAfterSecondBatch.size());

final AirbyteStateMessage stateMessageEmittedAfterSnapshotCompletionInSecondSync = stateAfterSecondBatch.get(0);
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterSnapshotCompletionInSecondSync.getType());
assertEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
stateMessageEmittedAfterSnapshotCompletionInSecondSync.getGlobal().getSharedState());
final Set<StreamDescriptor> streamsInSnapshotState = stateMessageEmittedAfterSnapshotCompletionInSecondSync.getGlobal().getStreamStates()
.stream()
.map(AirbyteStreamState::getStreamDescriptor)
.collect(Collectors.toSet());
assertEquals(2, streamsInSnapshotState.size());
assertTrue(
streamsInSnapshotState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomTableSchema())));
assertTrue(streamsInSnapshotState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA)));
assertNotNull(stateMessageEmittedAfterSnapshotCompletionInSecondSync.getData());

final AirbyteStateMessage stateMessageEmittedAfterSecondSyncCompletion = stateAfterSecondBatch.get(1);
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterSecondSyncCompletion.getType());
assertNotEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState());
final Set<StreamDescriptor> streamsInSyncCompletionState = stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getStreamStates()
.stream()
.map(AirbyteStreamState::getStreamDescriptor)
.collect(Collectors.toSet());
assertEquals(2, streamsInSnapshotState.size());
assertTrue(
streamsInSyncCompletionState.contains(
new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomTableSchema())));
assertTrue(streamsInSyncCompletionState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA)));
assertNotNull(stateMessageEmittedAfterSecondSyncCompletion.getData());

final Map<String, Set<AirbyteRecordMessage>> recordsStreamWise = extractRecordMessagesStreamWise(dataFromSecondBatch);
assertTrue(recordsStreamWise.containsKey(MODELS_STREAM_NAME));
assertTrue(recordsStreamWise.containsKey(MODELS_STREAM_NAME + "_random"));

final Set<AirbyteRecordMessage> recordsForModelsStreamFromSecondBatch = recordsStreamWise.get(MODELS_STREAM_NAME);
final Set<AirbyteRecordMessage> recordsForModelsRandomStreamFromSecondBatch = recordsStreamWise.get(MODELS_STREAM_NAME + "_random");

assertEquals((MODEL_RECORDS_RANDOM.size()), recordsForModelsRandomStreamFromSecondBatch.size());
assertEquals(20, recordsForModelsStreamFromSecondBatch.size());
assertExpectedRecords(new HashSet<>(MODEL_RECORDS_RANDOM), recordsForModelsRandomStreamFromSecondBatch,
recordsForModelsRandomStreamFromSecondBatch.stream().map(AirbyteRecordMessage::getStream).collect(
Collectors.toSet()),
Sets
.newHashSet(MODELS_STREAM_NAME + "_random"),
randomTableSchema());
assertExpectedRecords(recordsWritten, recordsForModelsStreamFromSecondBatch);

/*
* Write 20 records to both the tables
*/
final Set<JsonNode> recordsWrittenInRandomTable = new HashSet<>();
recordsWritten.clear();
for (int recordsCreated = 30; recordsCreated < 50; recordsCreated++) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
"F-" + recordsCreated));
writeModelRecord(record);
recordsWritten.add(record);

final JsonNode record2 = Jsons
.jsonNode(ImmutableMap
.of(COL_ID + "_random", 11000 + recordsCreated, COL_MAKE_ID + "_random", 1 + recordsCreated, COL_MODEL + "_random",
"Fiesta-random" + recordsCreated));
writeRecords(record2, randomTableSchema(), MODELS_STREAM_NAME + "_random",
COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random");
recordsWrittenInRandomTable.add(record2);
}

final JsonNode state2 = stateAfterSecondBatch.get(1).getData();
final AutoCloseableIterator<AirbyteMessage> thirdBatchIterator = getSource()
.read(getConfig(), updatedCatalog, state2);
final List<AirbyteMessage> dataFromThirdBatch = AutoCloseableIterators
.toListAndClose(thirdBatchIterator);

final List<AirbyteStateMessage> stateAfterThirdBatch = extractStateMessages(dataFromThirdBatch);
assertEquals(1, stateAfterThirdBatch.size());

final AirbyteStateMessage stateMessageEmittedAfterThirdSyncCompletion = stateAfterThirdBatch.get(0);
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterThirdSyncCompletion.getType());
assertNotEquals(stateMessageEmittedAfterThirdSyncCompletion.getGlobal().getSharedState(),
stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState());
final Set<StreamDescriptor> streamsInSyncCompletionStateAfterThirdSync = stateMessageEmittedAfterThirdSyncCompletion.getGlobal().getStreamStates()
.stream()
.map(AirbyteStreamState::getStreamDescriptor)
.collect(Collectors.toSet());
assertTrue(
streamsInSyncCompletionStateAfterThirdSync.contains(
new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomTableSchema())));
assertTrue(streamsInSyncCompletionStateAfterThirdSync.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA)));
assertNotNull(stateMessageEmittedAfterThirdSyncCompletion.getData());

final Map<String, Set<AirbyteRecordMessage>> recordsStreamWiseFromThirdBatch = extractRecordMessagesStreamWise(dataFromThirdBatch);
assertTrue(recordsStreamWiseFromThirdBatch.containsKey(MODELS_STREAM_NAME));
assertTrue(recordsStreamWiseFromThirdBatch.containsKey(MODELS_STREAM_NAME + "_random"));

final Set<AirbyteRecordMessage> recordsForModelsStreamFromThirdBatch = recordsStreamWiseFromThirdBatch.get(MODELS_STREAM_NAME);
final Set<AirbyteRecordMessage> recordsForModelsRandomStreamFromThirdBatch = recordsStreamWiseFromThirdBatch.get(MODELS_STREAM_NAME + "_random");

assertEquals(20, recordsForModelsStreamFromThirdBatch.size());
assertEquals(20, recordsForModelsRandomStreamFromThirdBatch.size());
assertExpectedRecords(recordsWritten, recordsForModelsStreamFromThirdBatch);
assertExpectedRecords(recordsWrittenInRandomTable, recordsForModelsRandomStreamFromThirdBatch,
recordsForModelsRandomStreamFromThirdBatch.stream().map(AirbyteRecordMessage::getStream).collect(
Collectors.toSet()),
Sets
.newHashSet(MODELS_STREAM_NAME + "_random"),
randomTableSchema());
}

protected AirbyteCatalog expectedCatalogForDiscover() {
final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG);

Expand All @@ -608,7 +788,7 @@ protected AirbyteCatalog expectedCatalogForDiscover() {

final AirbyteStream randomStream = CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_random",
MODELS_SCHEMA + "_random",
randomTableSchema(),
Field.of(COL_ID + "_random", JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID + "_random", JsonSchemaType.INTEGER),
Field.of(COL_MODEL + "_random", JsonSchemaType.STRING))
Expand All @@ -623,24 +803,29 @@ protected AirbyteCatalog expectedCatalogForDiscover() {
return expectedCatalog;
}

/**
* The schema of a random table which is used as a new table in snapshot test
*/
protected abstract String randomTableSchema();

protected abstract CdcTargetPosition cdcLatestTargetPosition();

protected abstract CdcTargetPosition extractPosition(JsonNode record);
protected abstract CdcTargetPosition extractPosition(final JsonNode record);

protected abstract void assertNullCdcMetaData(JsonNode data);
protected abstract void assertNullCdcMetaData(final JsonNode data);

protected abstract void assertCdcMetaData(JsonNode data, boolean deletedAtNull);
protected abstract void assertCdcMetaData(final JsonNode data, final boolean deletedAtNull);

protected abstract void removeCDCColumns(ObjectNode data);
protected abstract void removeCDCColumns(final ObjectNode data);

protected abstract void addCdcMetadataColumns(AirbyteStream stream);
protected abstract void addCdcMetadataColumns(final AirbyteStream stream);

protected abstract Source getSource();

protected abstract JsonNode getConfig();

protected abstract Database getDatabase();

protected abstract void assertExpectedStateMessages(List<AirbyteStateMessage> stateMessages);
protected abstract void assertExpectedStateMessages(final List<AirbyteStateMessage> stateMessages);

}
Loading

0 comments on commit 9974608

Please sign in to comment.