Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mysql source : implement support for snapshot of new tables in cdc mode #16954

Merged
merged 14 commits into from
Sep 26, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,29 @@ public AirbyteDebeziumHandler(final JsonNode config,
this.firstRecordWaitTime = firstRecordWaitTime;
}

public AutoCloseableIterator<AirbyteMessage> getSnapshotIterators(final ConfiguredAirbyteCatalog catalog,
public AutoCloseableIterator<AirbyteMessage> getSnapshotIterators(
final ConfiguredAirbyteCatalog catalogContainingStreamsToSnapshot,
final CdcMetadataInjector cdcMetadataInjector,
final Properties connectorProperties,
final Properties snapshotProperties,
final CdcStateHandler cdcStateHandler,
final Instant emittedAt) {
LOGGER.info("Running snapshot for " + catalog.getStreams().size() + " new tables");

LOGGER.info("Running snapshot for " + catalogContainingStreamsToSnapshot.getStreams().size() + " new tables");
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeDummyStateForSnapshotPurpose();
/*
* TODO(Subodh) : Since Postgres doesn't require schema history this is fine but we need to fix this
* for MySQL and MSSQL
*/
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager = Optional.empty();
final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(connectorProperties,
final DebeziumRecordPublisher tableSnapshotPublisher = new DebeziumRecordPublisher(snapshotProperties,
config,
catalog,
catalogContainingStreamsToSnapshot,
offsetManager,
schemaHistoryManager);
publisher.start(queue);
schemaHistoryManager(new EmptySavedInfo()));
tableSnapshotPublisher.start(queue);

final AutoCloseableIterator<ChangeEvent<String, String>> eventIterator = new DebeziumRecordIterator(
queue,
targetPosition,
publisher::hasClosed,
publisher::close,
tableSnapshotPublisher::hasClosed,
tableSnapshotPublisher::close,
firstRecordWaitTime);

return AutoCloseableIterators.concatWithEagerClose(AutoCloseableIterators
Expand Down Expand Up @@ -155,4 +152,18 @@ public static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) {
.anyMatch(syncMode -> syncMode == SyncMode.INCREMENTAL);
}

/**
 * A {@link CdcSavedInfoFetcher} that reports no previously saved offset and no saved schema
 * history. It is passed to the schema-history manager when snapshotting newly added tables, so
 * that the snapshot starts from a clean slate instead of resuming from saved CDC state.
 *
 * <p>Marked {@code final}: this is a fixed stand-in value object, not designed for extension.
 */
private static final class EmptySavedInfo implements CdcSavedInfoFetcher {

  @Override
  public JsonNode getSavedOffset() {
    // null signals "no prior offset to resume from".
    return null;
  }

  @Override
  public Optional<JsonNode> getSavedSchemaHistory() {
    // Empty signals "no prior schema history to restore".
    return Optional.empty();
  }

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.debezium;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -27,11 +28,13 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.util.ArrayList;
Expand Down Expand Up @@ -177,12 +180,14 @@ private void createAndPopulateActualTable() {
* databases not being synced by Airbyte are not causing issues with our debezium logic
*/
private void createAndPopulateRandomTable() {
createSchema(MODELS_SCHEMA + "_random");
createTable(MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random",
if (!randomTableSchema().equals(MODELS_SCHEMA)) {
createSchema(randomTableSchema());
}
createTable(randomTableSchema(), MODELS_STREAM_NAME + "_random",
columnClause(ImmutableMap.of(COL_ID + "_random", "INTEGER", COL_MAKE_ID + "_random", "INTEGER", COL_MODEL + "_random", "VARCHAR(200)"),
Optional.of(COL_ID + "_random")));
for (final JsonNode recordJson : MODEL_RECORDS_RANDOM) {
writeRecords(recordJson, MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random",
writeRecords(recordJson, randomTableSchema(), MODELS_STREAM_NAME + "_random",
COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random");
}
}
Expand Down Expand Up @@ -585,6 +590,181 @@ void testDiscover() throws Exception {
.collect(Collectors.toList()));
}

/**
 * Verifies that when a new table is added to the catalog after CDC has already been running, the
 * next sync snapshots ONLY the newly added table while the original table keeps syncing
 * incrementally. Exercises three consecutive syncs and checks both the emitted records and the
 * GLOBAL state messages after each.
 */
@Test
public void newTableSnapshotTest() throws Exception {
  // --- First sync: initial snapshot of the originally configured stream. ---
  final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = getSource()
      .read(getConfig(), CONFIGURED_CATALOG, null);
  final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
      .toListAndClose(firstBatchIterator);
  final Set<AirbyteRecordMessage> recordsFromFirstBatch = extractRecordMessages(
      dataFromFirstBatch);
  final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch);
  assertEquals(1, stateAfterFirstBatch.size());

  final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion = stateAfterFirstBatch.get(0);
  assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterFirstSyncCompletion.getType());
  assertNotNull(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState());
  final Set<StreamDescriptor> streamsInStateAfterFirstSyncCompletion = stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getStreamStates()
      .stream()
      .map(AirbyteStreamState::getStreamDescriptor)
      .collect(Collectors.toSet());
  // Only the originally configured stream should appear in the per-stream state.
  assertEquals(1, streamsInStateAfterFirstSyncCompletion.size());
  assertTrue(streamsInStateAfterFirstSyncCompletion.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA)));
  assertNotNull(stateMessageEmittedAfterFirstSyncCompletion.getData());

  assertEquals(MODEL_RECORDS.size(), recordsFromFirstBatch.size());
  assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordsFromFirstBatch);

  final JsonNode state = stateAfterFirstBatch.get(0).getData();

  // --- Add a brand-new stream (the "random" table) to the catalog, set to INCREMENTAL. ---
  final ConfiguredAirbyteCatalog newTables = CatalogHelpers
      .toDefaultConfiguredCatalog(new AirbyteCatalog().withStreams(List.of(
          CatalogHelpers.createAirbyteStream(
              MODELS_STREAM_NAME + "_random",
              randomTableSchema(),
              Field.of(COL_ID + "_random", JsonSchemaType.NUMBER),
              Field.of(COL_MAKE_ID + "_random", JsonSchemaType.NUMBER),
              Field.of(COL_MODEL + "_random", JsonSchemaType.STRING))
              .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
              .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID + "_random"))))));

  newTables.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL));
  final List<ConfiguredAirbyteStream> combinedStreams = new ArrayList<>();
  combinedStreams.addAll(CONFIGURED_CATALOG.getStreams());
  combinedStreams.addAll(newTables.getStreams());

  final ConfiguredAirbyteCatalog updatedCatalog = new ConfiguredAirbyteCatalog().withStreams(combinedStreams);

  /*
   * Write 20 records to the existing table
   */
  final Set<JsonNode> recordsWritten = new HashSet<>();
  for (int recordsCreated = 0; recordsCreated < 20; recordsCreated++) {
    final JsonNode record =
        Jsons.jsonNode(ImmutableMap
            .of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
                "F-" + recordsCreated));
    recordsWritten.add(record);
    writeModelRecord(record);
  }

  // --- Second sync: resumes from saved state with the updated catalog. Two state messages are
  // expected: one after the snapshot of the new table completes, one at sync completion. ---
  final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource()
      .read(getConfig(), updatedCatalog, state);
  final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
      .toListAndClose(secondBatchIterator);

  final List<AirbyteStateMessage> stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch);
  assertEquals(2, stateAfterSecondBatch.size());

  final AirbyteStateMessage stateMessageEmittedAfterSnapshotCompletionInSecondSync = stateAfterSecondBatch.get(0);
  assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterSnapshotCompletionInSecondSync.getType());
  // Snapshotting the new table must NOT advance the shared CDC state.
  assertEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
      stateMessageEmittedAfterSnapshotCompletionInSecondSync.getGlobal().getSharedState());
  final Set<StreamDescriptor> streamsInSnapshotState = stateMessageEmittedAfterSnapshotCompletionInSecondSync.getGlobal().getStreamStates()
      .stream()
      .map(AirbyteStreamState::getStreamDescriptor)
      .collect(Collectors.toSet());
  assertEquals(2, streamsInSnapshotState.size());
  assertTrue(
      streamsInSnapshotState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomTableSchema())));
  assertTrue(streamsInSnapshotState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA)));
  assertNotNull(stateMessageEmittedAfterSnapshotCompletionInSecondSync.getData());

  final AirbyteStateMessage stateMessageEmittedAfterSecondSyncCompletion = stateAfterSecondBatch.get(1);
  assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterSecondSyncCompletion.getType());
  // The incremental portion of the sync SHOULD advance the shared CDC state.
  assertNotEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
      stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState());
  final Set<StreamDescriptor> streamsInSyncCompletionState = stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getStreamStates()
      .stream()
      .map(AirbyteStreamState::getStreamDescriptor)
      .collect(Collectors.toSet());
  // Fix: assert the size of the completion-state set itself (the previous version re-checked
  // streamsInSnapshotState here, leaving streamsInSyncCompletionState's size unverified).
  assertEquals(2, streamsInSyncCompletionState.size());
  assertTrue(
      streamsInSyncCompletionState.contains(
          new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomTableSchema())));
  assertTrue(streamsInSyncCompletionState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA)));
  assertNotNull(stateMessageEmittedAfterSecondSyncCompletion.getData());

  final Map<String, Set<AirbyteRecordMessage>> recordsStreamWise = extractRecordMessagesStreamWise(dataFromSecondBatch);
  assertTrue(recordsStreamWise.containsKey(MODELS_STREAM_NAME));
  assertTrue(recordsStreamWise.containsKey(MODELS_STREAM_NAME + "_random"));

  final Set<AirbyteRecordMessage> recordsForModelsStreamFromSecondBatch = recordsStreamWise.get(MODELS_STREAM_NAME);
  final Set<AirbyteRecordMessage> recordsForModelsRandomStreamFromSecondBatch = recordsStreamWise.get(MODELS_STREAM_NAME + "_random");

  // The new table yields its full snapshot; the existing table yields only the 20 CDC inserts.
  assertEquals(MODEL_RECORDS_RANDOM.size(), recordsForModelsRandomStreamFromSecondBatch.size());
  assertEquals(20, recordsForModelsStreamFromSecondBatch.size());
  assertExpectedRecords(new HashSet<>(MODEL_RECORDS_RANDOM), recordsForModelsRandomStreamFromSecondBatch,
      recordsForModelsRandomStreamFromSecondBatch.stream().map(AirbyteRecordMessage::getStream).collect(
          Collectors.toSet()),
      Sets
          .newHashSet(MODELS_STREAM_NAME + "_random"),
      randomTableSchema());
  assertExpectedRecords(recordsWritten, recordsForModelsStreamFromSecondBatch);

  /*
   * Write 20 records to both the tables
   */
  final Set<JsonNode> recordsWrittenInRandomTable = new HashSet<>();
  recordsWritten.clear(); // reuse the set to track only the records written for the third sync
  for (int recordsCreated = 30; recordsCreated < 50; recordsCreated++) {
    final JsonNode record =
        Jsons.jsonNode(ImmutableMap
            .of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
                "F-" + recordsCreated));
    writeModelRecord(record);
    recordsWritten.add(record);

    final JsonNode record2 = Jsons
        .jsonNode(ImmutableMap
            .of(COL_ID + "_random", 11000 + recordsCreated, COL_MAKE_ID + "_random", 1 + recordsCreated, COL_MODEL + "_random",
                "Fiesta-random" + recordsCreated));
    writeRecords(record2, randomTableSchema(), MODELS_STREAM_NAME + "_random",
        COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random");
    recordsWrittenInRandomTable.add(record2);
  }

  // --- Third sync: both tables are now established streams; pure incremental CDC. ---
  final JsonNode state2 = stateAfterSecondBatch.get(1).getData();
  final AutoCloseableIterator<AirbyteMessage> thirdBatchIterator = getSource()
      .read(getConfig(), updatedCatalog, state2);
  final List<AirbyteMessage> dataFromThirdBatch = AutoCloseableIterators
      .toListAndClose(thirdBatchIterator);

  final List<AirbyteStateMessage> stateAfterThirdBatch = extractStateMessages(dataFromThirdBatch);
  assertEquals(1, stateAfterThirdBatch.size());

  final AirbyteStateMessage stateMessageEmittedAfterThirdSyncCompletion = stateAfterThirdBatch.get(0);
  assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterThirdSyncCompletion.getType());
  assertNotEquals(stateMessageEmittedAfterThirdSyncCompletion.getGlobal().getSharedState(),
      stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState());
  final Set<StreamDescriptor> streamsInSyncCompletionStateAfterThirdSync = stateMessageEmittedAfterThirdSyncCompletion.getGlobal().getStreamStates()
      .stream()
      .map(AirbyteStreamState::getStreamDescriptor)
      .collect(Collectors.toSet());
  // Both streams must still be tracked in the per-stream state (size check added for parity
  // with the second-sync assertions).
  assertEquals(2, streamsInSyncCompletionStateAfterThirdSync.size());
  assertTrue(
      streamsInSyncCompletionStateAfterThirdSync.contains(
          new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomTableSchema())));
  assertTrue(streamsInSyncCompletionStateAfterThirdSync.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA)));
  assertNotNull(stateMessageEmittedAfterThirdSyncCompletion.getData());

  final Map<String, Set<AirbyteRecordMessage>> recordsStreamWiseFromThirdBatch = extractRecordMessagesStreamWise(dataFromThirdBatch);
  assertTrue(recordsStreamWiseFromThirdBatch.containsKey(MODELS_STREAM_NAME));
  assertTrue(recordsStreamWiseFromThirdBatch.containsKey(MODELS_STREAM_NAME + "_random"));

  final Set<AirbyteRecordMessage> recordsForModelsStreamFromThirdBatch = recordsStreamWiseFromThirdBatch.get(MODELS_STREAM_NAME);
  final Set<AirbyteRecordMessage> recordsForModelsRandomStreamFromThirdBatch = recordsStreamWiseFromThirdBatch.get(MODELS_STREAM_NAME + "_random");

  assertEquals(20, recordsForModelsStreamFromThirdBatch.size());
  assertEquals(20, recordsForModelsRandomStreamFromThirdBatch.size());
  assertExpectedRecords(recordsWritten, recordsForModelsStreamFromThirdBatch);
  assertExpectedRecords(recordsWrittenInRandomTable, recordsForModelsRandomStreamFromThirdBatch,
      recordsForModelsRandomStreamFromThirdBatch.stream().map(AirbyteRecordMessage::getStream).collect(
          Collectors.toSet()),
      Sets
          .newHashSet(MODELS_STREAM_NAME + "_random"),
      randomTableSchema());
}

protected AirbyteCatalog expectedCatalogForDiscover() {
final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG);

Expand All @@ -608,7 +788,7 @@ protected AirbyteCatalog expectedCatalogForDiscover() {

final AirbyteStream randomStream = CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_random",
MODELS_SCHEMA + "_random",
randomTableSchema(),
Field.of(COL_ID + "_random", JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID + "_random", JsonSchemaType.INTEGER),
Field.of(COL_MODEL + "_random", JsonSchemaType.STRING))
Expand All @@ -623,24 +803,29 @@ protected AirbyteCatalog expectedCatalogForDiscover() {
return expectedCatalog;
}

/**
* The schema of a random table which is used as a new table in snapshot test
*/
protected abstract String randomTableSchema();
edgao marked this conversation as resolved.
Show resolved Hide resolved

protected abstract CdcTargetPosition cdcLatestTargetPosition();

protected abstract CdcTargetPosition extractPosition(JsonNode record);
protected abstract CdcTargetPosition extractPosition(final JsonNode record);

protected abstract void assertNullCdcMetaData(JsonNode data);
protected abstract void assertNullCdcMetaData(final JsonNode data);

protected abstract void assertCdcMetaData(JsonNode data, boolean deletedAtNull);
protected abstract void assertCdcMetaData(final JsonNode data, final boolean deletedAtNull);

protected abstract void removeCDCColumns(ObjectNode data);
protected abstract void removeCDCColumns(final ObjectNode data);

protected abstract void addCdcMetadataColumns(AirbyteStream stream);
protected abstract void addCdcMetadataColumns(final AirbyteStream stream);

/** The source connector instance exercised by these tests' read/discover calls. */
protected abstract Source getSource();

/** Connector configuration passed to {@link Source#read} in these tests. */
protected abstract JsonNode getConfig();

/** Handle to the backing test database (used by concrete subclasses for setup). */
protected abstract Database getDatabase();

protected abstract void assertExpectedStateMessages(List<AirbyteStateMessage> stateMessages);
protected abstract void assertExpectedStateMessages(final List<AirbyteStateMessage> stateMessages);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
Expand All @@ -37,11 +38,15 @@
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.jdbc.StreamingJdbcDatabase;
import io.airbyte.db.jdbc.streaming.JdbcStreamingQueryConfig;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.JsonSchemaType;
import java.net.MalformedURLException;
import java.net.URI;
Expand Down Expand Up @@ -522,4 +527,23 @@ protected String toSslJdbcParam(final SslMode sslMode) {
// Default implementation
return sslMode.name();
}



protected List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final ConfiguredAirbyteCatalog catalog, final StateManager stateManager) {
edgao marked this conversation as resolved.
Show resolved Hide resolved
final Set<AirbyteStreamNameNamespacePair> alreadySyncedStreams = stateManager.getCdcStateManager().getInitialStreamsSynced();
if (alreadySyncedStreams.isEmpty() && (stateManager.getCdcStateManager().getCdcState() == null
|| stateManager.getCdcStateManager().getCdcState().getState() == null)) {
return Collections.emptyList();
}

final Set<AirbyteStreamNameNamespacePair> allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog);

final Set<AirbyteStreamNameNamespacePair> newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySyncedStreams));

return catalog.getStreams().stream()
.filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream.getStream())))
.map(Jsons::clone)
.collect(Collectors.toList());
}
}
Loading