Destination Snowflake: Write extracted_at in UTC #35308

Merged Mar 6, 2024 (33 commits)

Commits:
2aa4aa5  snowflake stuff  (edgao, Feb 14, 2024)
000e461  fixups for snowflake timezone migration stuff  (edgao, Feb 22, 2024)
37bfa77  fix build  (edgao, Feb 23, 2024)
5a380b3  format  (edgao, Feb 23, 2024)
e4e467b  also skip for overwrite sync mode  (edgao, Feb 26, 2024)
4ac67b9  fix compilation errors post rebase  (gisripa, Mar 1, 2024)
927b3d8  fix compilation in test  (gisripa, Mar 1, 2024)
d811907  pr comments  (gisripa, Mar 1, 2024)
f5c9e0e  version bump logistics  (gisripa, Mar 1, 2024)
c109d59  uppercase rawNamespace for test  (gisripa, Mar 1, 2024)
0d9d66d  skip minstamp for overwrite  (gisripa, Mar 4, 2024)
6a4ec88  fmt  (gisripa, Mar 4, 2024)
226dde4  add explicit date format  (gisripa, Mar 4, 2024)
c7b8bff  another nit tz func  (gisripa, Mar 4, 2024)
0dad9c6  add tests??  (edgao, Mar 5, 2024)
f4657b1  2024 >.>  (edgao, Mar 5, 2024)
08b77ee  add test for existing local tz final table records  (edgao, Mar 5, 2024)
8bb1803  delete migration  (edgao, Mar 5, 2024)
1fece83  tsadd dance + fmt  (gisripa, Mar 5, 2024)
dba28d3  test fixes  (gisripa, Mar 5, 2024)
93531b2  try fixing test  (edgao, Mar 5, 2024)
c5898b4  fmt  (gisripa, Mar 5, 2024)
575d340  fix tests for real  (gisripa, Mar 5, 2024)
839df58  fix append mode  (gisripa, Mar 5, 2024)
e3eeac7  add append mode tests  (edgao, Mar 5, 2024)
5ced57c  move timestamp crap into min/max call  (edgao, Mar 5, 2024)
a067fc5  move timestamp fiddling back out of cte for max timestamp  (edgao, Mar 5, 2024)
840116d  format  (edgao, Mar 5, 2024)
a46a655  stop filtering on extractedAt in commitRawTable  (edgao, Mar 5, 2024)
075cf8a  migration fixtures with mixed TZs  (gisripa, Mar 6, 2024)
61b2d99  minor fixes  (gisripa, Mar 6, 2024)
f3c9b3b  remove commitDestinationState temporarily  (gisripa, Mar 6, 2024)
6902830  Revert "remove commitDestinationState temporarily"  (edgao, Mar 6, 2024)
Changes from all commits
@@ -1,9 +1,10 @@
plugins {
id 'airbyte-java-connector'
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
}

airbyteJavaConnector {
cdkVersionRequired = '0.23.2'
cdkVersionRequired = '0.23.11'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.5.14
dockerImageTag: 3.6.0
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
@@ -9,6 +9,7 @@
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
@@ -23,15 +24,18 @@
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV1V2Migrator;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV2TableMigrator;
import io.airbyte.integrations.destination.snowflake.typing_deduping.migrations.SnowflakeState;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
@@ -131,7 +135,7 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
protected JdbcDestinationHandler<SnowflakeState> getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) {
throw new UnsupportedOperationException("Snowflake does not yet use the native JDBC DV2 interface");
}

@@ -151,22 +155,33 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final TyperDeduper typerDeduper;
final JdbcDatabase database = getDatabase(getDataSource(config));
final String databaseName = config.get(JdbcUtils.DATABASE_KEY).asText();
final SnowflakeDestinationHandler snowflakeDestinationHandler = new SnowflakeDestinationHandler(databaseName, database);
final String rawTableSchemaName;
final CatalogParser catalogParser;
if (TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent()) {
catalogParser = new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get());
rawTableSchemaName = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get();
catalogParser = new CatalogParser(sqlGenerator, rawTableSchemaName);
} else {
rawTableSchemaName = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
catalogParser = new CatalogParser(sqlGenerator);
}
final SnowflakeDestinationHandler snowflakeDestinationHandler = new SnowflakeDestinationHandler(databaseName, database, rawTableSchemaName);
parsedCatalog = catalogParser.parseCatalog(catalog);
final SnowflakeV1V2Migrator migrator = new SnowflakeV1V2Migrator(getNamingResolver(), database, databaseName);
final SnowflakeV2TableMigrator v2TableMigrator = new SnowflakeV2TableMigrator(database, databaseName, sqlGenerator, snowflakeDestinationHandler);
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final List<Migration<SnowflakeState>> migrations = List.of();
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
typerDeduper =
new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator, migrations);
} else {
typerDeduper =
new DefaultTyperDeduper(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
new DefaultTyperDeduper<>(
sqlGenerator,
snowflakeDestinationHandler,
parsedCatalog,
migrator,
v2TableMigrator,
migrations);
}

return StagingConsumerFactory.builder(
@@ -14,19 +14,22 @@
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStateImpl;
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.Union;
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
import io.airbyte.integrations.destination.snowflake.typing_deduping.migrations.SnowflakeState;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
@@ -40,20 +43,23 @@
import java.util.stream.Collectors;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.apache.commons.text.StringSubstitutor;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeDestinationHandler extends JdbcDestinationHandler {
public class SnowflakeDestinationHandler extends JdbcDestinationHandler<SnowflakeState> {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDestinationHandler.class);
public static final String EXCEPTION_COMMON_PREFIX = "JavaScript execution error: Uncaught Execution of multiple statements failed on statement";

private final String databaseName;
private final JdbcDatabase database;

public SnowflakeDestinationHandler(final String databaseName, final JdbcDatabase database) {
super(databaseName, database);
this.databaseName = databaseName;
public SnowflakeDestinationHandler(final String databaseName, final JdbcDatabase database, final String rawTableSchema) {
// Postgres is close enough to Snowflake SQL for our purposes.
super(databaseName, database, rawTableSchema, SQLDialect.POSTGRES);
// We don't quote the database name in any queries, so just upcase it.
this.databaseName = databaseName.toUpperCase();
this.database = database;
}

@@ -107,7 +113,7 @@ AND table_schema IN (%s)
AND table_name IN (%s)
""".formatted(paramHolder, paramHolder);
final String[] bindValues = new String[streamIds.size() * 2 + 1];
bindValues[0] = databaseName.toUpperCase();
bindValues[0] = databaseName;
System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length);
System.arraycopy(names, 0, bindValues, namespaces.length + 1, names.length);
final List<JsonNode> results = database.queryJsons(query, bindValues);
@@ -120,14 +126,18 @@ AND table_name IN (%s)
return tableRowCounts;
}

public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
private InitialRawTableStatus getInitialRawTableState(final StreamId id, final DestinationSyncMode destinationSyncMode) throws Exception {
// Short-circuit for overwrite, table will be truncated anyway
if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
return new InitialRawTableStatus(false, false, Optional.empty());
}
final ResultSet tables = database.getMetaData().getTables(
databaseName,
id.rawNamespace(),
id.rawName(),
null);
if (!tables.next()) {
return new InitialRawTableState(false, Optional.empty());
return new InitialRawTableStatus(false, false, Optional.empty());
}
// Snowflake timestamps have nanosecond precision, so decrement by 1ns
// And use two explicit queries because COALESCE doesn't short-circuit.
@@ -136,33 +146,55 @@ public InitialRawTableState getInitialRawTableState(final StreamId id) throws Ex
conn -> conn.createStatement().executeQuery(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace(
"""
SELECT to_varchar(
TIMESTAMPADD(NANOSECOND, -1, MIN("_airbyte_extracted_at")),
'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM'
) AS MIN_TIMESTAMP
FROM ${raw_table}
WHERE "_airbyte_loaded_at" IS NULL
WITH MIN_TS AS (
SELECT TIMESTAMPADD(NANOSECOND, -1,
MIN(TIMESTAMPADD(
HOUR,
EXTRACT(timezone_hour from "_airbyte_extracted_at"),
TIMESTAMPADD(
MINUTE,
EXTRACT(timezone_minute from "_airbyte_extracted_at"),
CONVERT_TIMEZONE('UTC', "_airbyte_extracted_at")
)
))) AS MIN_TIMESTAMP
FROM ${raw_table}
WHERE "_airbyte_loaded_at" IS NULL
) SELECT TO_VARCHAR(MIN_TIMESTAMP,'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM') as MIN_TIMESTAMP_UTC from MIN_TS;
""")),
// The query will always return exactly one record, so use .get(0)
record -> record.getString("MIN_TIMESTAMP")).get(0));
record -> record.getString("MIN_TIMESTAMP_UTC")).get(0));
if (minUnloadedTimestamp.isPresent()) {
return new InitialRawTableState(true, minUnloadedTimestamp.map(Instant::parse));
return new InitialRawTableStatus(true, true, minUnloadedTimestamp.map(Instant::parse));
}

// If there are no unloaded raw records, then we can safely skip all existing raw records.
// This second query just finds the newest raw record.

// This is _technically_ wrong, because during the DST transition we might select
// the wrong max timestamp. We _should_ do the UTC conversion inside the CTE, but that's a lot
// of work for a very small edge case.
// We released the fix to write extracted_at in UTC before DST changed, so this is fine.
final Optional<String> maxTimestamp = Optional.ofNullable(database.queryStrings(
conn -> conn.createStatement().executeQuery(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace(
"""
SELECT to_varchar(
MAX("_airbyte_extracted_at"),
'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM'
) AS MIN_TIMESTAMP
FROM ${raw_table}
WITH MAX_TS AS (
SELECT MAX("_airbyte_extracted_at")
AS MAX_TIMESTAMP
FROM ${raw_table}
) SELECT TO_VARCHAR(
TIMESTAMPADD(
HOUR,
EXTRACT(timezone_hour from MAX_TIMESTAMP),
TIMESTAMPADD(
MINUTE,
EXTRACT(timezone_minute from MAX_TIMESTAMP),
CONVERT_TIMEZONE('UTC', MAX_TIMESTAMP)
)
),'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM') as MAX_TIMESTAMP_UTC from MAX_TS;
""")),
record -> record.getString("MIN_TIMESTAMP")).get(0));
return new InitialRawTableState(false, maxTimestamp.map(Instant::parse));
record -> record.getString("MAX_TIMESTAMP_UTC")).get(0));
return new InitialRawTableStatus(true, false, maxTimestamp.map(Instant::parse));
}
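
For context on the queries above: the nested TIMESTAMPADD/EXTRACT/CONVERT_TIMEZONE expression folds a row's own UTC offset back into the converted value, so a timestamp stored with a local offset keeps its wall-clock reading but comes out labelled as UTC. A minimal sketch of the pattern against a literal value (illustrative only, not part of this diff, and assuming standard Snowflake semantics for CONVERT_TIMEZONE, EXTRACT(timezone_hour ...), and TIMESTAMPADD):

    -- Illustrative only: how the offset-folding expression in getInitialRawTableState behaves.
    WITH sample AS (
      SELECT '2024-03-01 12:00:00 -0800'::TIMESTAMP_TZ AS extracted_at
    )
    SELECT
      CONVERT_TIMEZONE('UTC', extracted_at) AS same_instant_in_utc,  -- 2024-03-01 20:00:00 +0000
      TIMESTAMPADD(
        HOUR,
        EXTRACT(timezone_hour FROM extracted_at),                    -- -8
        TIMESTAMPADD(
          MINUTE,
          EXTRACT(timezone_minute FROM extracted_at),                -- 0
          CONVERT_TIMEZONE('UTC', extracted_at)
        )
      ) AS wall_clock_relabelled_as_utc                              -- 2024-03-01 12:00:00 +0000
    FROM sample;

Under that reading, records written with the warehouse's local offset end up keyed by their original wall-clock time tagged as UTC, which appears to be what keeps the min/max extracted_at cursors comparable with records written by the new UTC path; the TIMESTAMPADD(NANOSECOND, -1, ...) wrapper in the real query then nudges the minimum back by one nanosecond, presumably so a later extracted_at filter does not skip rows at exactly the minimum.
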

@Override
@@ -171,7 +203,7 @@ public void execute(final Sql sql) throws Exception {
final UUID queryId = UUID.randomUUID();
for (final String transaction : transactions) {
final UUID transactionId = UUID.randomUUID();
LOGGER.debug("Executing sql {}-{}: {}", queryId, transactionId, transaction);
LOGGER.info("Executing sql {}-{}: {}", queryId, transactionId, transaction);
final long startTime = System.currentTimeMillis();

try {
Expand All @@ -190,7 +222,7 @@ public void execute(final Sql sql) throws Exception {
throw new RuntimeException(trimmedMessage, e);
}

LOGGER.debug("Sql {}-{} completed in {} ms", queryId, transactionId, System.currentTimeMillis() - startTime);
LOGGER.info("Sql {}-{} completed in {} ms", queryId, transactionId, System.currentTimeMillis() - startTime);
}
}

@@ -250,7 +282,9 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f
}

@Override
public List<DestinationInitialState> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
public List<DestinationInitialStatus<SnowflakeState>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
final Map<AirbyteStreamNameNamespacePair, SnowflakeState> destinationStates = super.getAllDestinationStates();

List<StreamId> streamIds = streamConfigs.stream().map(StreamConfig::id).toList();
final LinkedHashMap<String, LinkedHashMap<String, TableDefinition>> existingTables = findExistingTables(database, databaseName, streamIds);
final LinkedHashMap<String, LinkedHashMap<String, Integer>> tableRowCounts = getFinalTableRowCount(streamIds);
@@ -267,8 +301,15 @@ public List<DestinationInitialState> gatherInitialState(List<StreamConfig> strea
isSchemaMismatch = !existingSchemaMatchesStreamConfig(streamConfig, existingTable);
isFinalTableEmpty = hasRowCount && tableRowCounts.get(namespace).get(name) == 0;
}
final InitialRawTableState initialRawTableState = getInitialRawTableState(streamConfig.id());
return new DestinationInitialStateImpl(streamConfig, isFinalTablePresent, initialRawTableState, isSchemaMismatch, isFinalTableEmpty);
final InitialRawTableStatus initialRawTableState = getInitialRawTableState(streamConfig.id(), streamConfig.destinationSyncMode());
final SnowflakeState destinationState = destinationStates.getOrDefault(streamConfig.id().asPair(), toDestinationState(Jsons.emptyObject()));
return new DestinationInitialStatus<>(
streamConfig,
isFinalTablePresent,
initialRawTableState,
isSchemaMismatch,
isFinalTableEmpty,
destinationState);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -290,6 +331,12 @@ protected String toJdbcTypeName(AirbyteType airbyteType) {
};
}

@Override
protected SnowflakeState toDestinationState(JsonNode json) {
return new SnowflakeState(
json.hasNonNull("needsSoftReset") && json.get("needsSoftReset").asBoolean());
}

private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
return switch (airbyteProtocolType) {
case STRING -> "TEXT";