Skip to content

Commit

Permalink
Source Snowflake: Handle date-time data types (airbytehq#17144)
Browse files Browse the repository at this point in the history
* Source Snowflake: Handle date-time data types

* updated changelog

* replaced star import

* bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
2 people authored and robbinhan committed Sep 29, 2022
1 parent 97eb13d commit f10a304
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@
- name: Snowflake
sourceDefinitionId: e2d65910-8c8b-40a1-ae7d-ee2416b2bfa2
dockerRepository: airbyte/source-snowflake
dockerImageTag: 0.1.23
dockerImageTag: 0.1.24
documentationUrl: https://docs.airbyte.io/integrations/sources/snowflake
icon: snowflake.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10241,7 +10241,7 @@
- - "client_secret"
oauthFlowOutputParameters:
- - "refresh_token"
- dockerImage: "airbyte/source-snowflake:0.1.23"
- dockerImage: "airbyte/source-snowflake:0.1.24"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/snowflake"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-snowflake

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.23
LABEL io.airbyte.version=0.1.24
LABEL io.airbyte.name=airbyte/source-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,33 @@

package io.airbyte.integrations.source.snowflake;

import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_SCHEMA_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_TABLE_NAME;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.db.jdbc.DateTimeConverter;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaType;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeSourceOperations extends JdbcSourceOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSourceOperations.class);

@Override
protected void putDouble(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) {
try {
Expand All @@ -25,6 +41,23 @@ protected void putDouble(final ObjectNode node, final String columnName, final R
}
}

@Override
public JDBCType getFieldType(final JsonNode field) {
try {
final String typeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText().toLowerCase();
return "TIMESTAMPLTZ".equalsIgnoreCase(typeName)
? JDBCType.TIMESTAMP_WITH_TIMEZONE
: JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt());
} catch (final IllegalArgumentException ex) {
LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.",
field.get(INTERNAL_COLUMN_NAME),
field.get(INTERNAL_SCHEMA_NAME),
field.get(INTERNAL_TABLE_NAME),
field.get(INTERNAL_COLUMN_TYPE)));
return JDBCType.VARCHAR;
}
}

@Override
protected void putBigInt(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) {
try {
Expand Down Expand Up @@ -61,19 +94,25 @@ public JsonSchemaType getJsonType(final JDBCType jdbcType) {
}

/**
* The only difference between this method and the one in {@link JdbcSourceOperations} is that
* the TIMESTAMP_WITH_TIMEZONE columns are also converted using the putTimestamp method.
* This is necessary after the JDBC upgrade from 3.13.9 to 3.13.22. This change may need to be
* added to {@link JdbcSourceOperations#setJsonField} in the future.
* The only difference between this method and the one in {@link JdbcSourceOperations} is that the
* TIMESTAMP_WITH_TIMEZONE columns are also converted using the putTimestamp method. This is
* necessary after the JDBC upgrade from 3.13.9 to 3.13.22. This change may need to be added to
* {@link JdbcSourceOperations#setJsonField} in the future.
* <p/>
* See issue: https://github.com/airbytehq/airbyte/issues/16838.
*/
@Override
public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
final int columnTypeInt = resultSet.getMetaData().getColumnType(colIndex);
final String columnName = resultSet.getMetaData().getColumnName(colIndex);
final JDBCType columnType = safeGetJdbcType(columnTypeInt);
final String columnTypeName = resultSet.getMetaData().getColumnTypeName(colIndex).toLowerCase();

final JDBCType columnType = safeGetJdbcType(columnTypeInt);
// TIMESTAMPLTZ data type detected as JDBCType.TIMESTAMP which is not correct
if ("TIMESTAMPLTZ".equalsIgnoreCase(columnTypeName)) {
putTimestampWithTimezone(json, columnName, resultSet, colIndex);
return;
}
// https://www.cis.upenn.edu/~bcpierce/courses/629/jdkdocs/guide/jdbc/getstart/mapping.doc.html
switch (columnType) {
case BIT, BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
Expand All @@ -86,11 +125,43 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
case CHAR, VARCHAR, LONGVARCHAR -> putString(json, columnName, resultSet, colIndex);
case DATE -> putDate(json, columnName, resultSet, colIndex);
case TIME -> putTime(json, columnName, resultSet, colIndex);
case TIMESTAMP, TIMESTAMP_WITH_TIMEZONE -> putTimestamp(json, columnName, resultSet, colIndex);
case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex);
case TIMESTAMP_WITH_TIMEZONE -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex);
case ARRAY -> putArray(json, columnName, resultSet, colIndex);
default -> putDefault(json, columnName, resultSet, colIndex);
}
}

@Override
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
final LocalDate date = LocalDate.parse(value);
preparedStatement.setDate(parameterIndex, Date.valueOf(date));
}

@Override
protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
final Timestamp timestamp = resultSet.getTimestamp(index);
node.put(columnName, DateTimeConverter.convertToTimestampWithTimezone(timestamp));
}

@Override
protected void putTimestamp(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
final Timestamp timestamp = resultSet.getTimestamp(index);
node.put(columnName, DateTimeConverter.convertToTimestamp(timestamp));
}

@Override
protected void putDate(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
final Date date = resultSet.getDate(index);
node.put(columnName, DateTimeConverter.convertToDate(date));
}

@Override
protected void putTime(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
// resultSet.getTime() will lose nanoseconds precision
final LocalTime localTime = resultSet.getTimestamp(index).toLocalDateTime().toLocalTime();
node.put(columnName, DateTimeConverter.convertToTime(localTime));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,33 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.integrations.source.snowflake.SnowflakeSource;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.math.BigDecimal;
import java.nio.file.Path;
import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -159,4 +166,60 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
}

protected List<AirbyteMessage> getTestMessages() {
return List.of(
new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_1,
COL_NAME, "picard",
COL_UPDATED_AT, "2004-10-19")))),
new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_2,
COL_NAME, "crusher",
COL_UPDATED_AT,
"2005-10-19")))),
new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_3,
COL_NAME, "vash",
COL_UPDATED_AT, "2006-10-19")))));
}

@Override
protected void incrementalDateCheck() throws Exception {
super.incrementalCursorCheck(COL_UPDATED_AT,
"2005-10-18",
"2006-10-19",
Lists.newArrayList(getTestMessages().get(1),
getTestMessages().get(2)));
}

@Override
protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(final String namespace) {
final List<AirbyteMessage> expectedMessages = new ArrayList<>();
expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace)
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_4,
COL_NAME, "riker",
COL_UPDATED_AT, "2006-10-19")))));
expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace)
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_5,
COL_NAME, "data",
COL_UPDATED_AT, "2006-10-19")))));
final DbStreamState state = new DbStreamState()
.withStreamName(streamName)
.withStreamNamespace(namespace)
.withCursorField(List.of(COL_ID))
.withCursor("5");
expectedMessages.addAll(createExpectedTestMessages(List.of(state)));
return expectedMessages;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -254,29 +254,29 @@ protected void initTests() {
.sourceType("DATE")
.airbyteType(JsonSchemaType.STRING_DATE)
.addInsertValues("null", "'0001-01-01'", "'9999-12-31'")
.addExpectedValues(null, "0001-01-01T00:00:00Z", "9999-12-31T00:00:00Z")
.addExpectedValues(null, "0001-01-01", "9999-12-31")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("DATETIME")
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("null", "'0001-01-01 00:00:00'", "'9999-12-31 23:59:59'", "'9999-12-31 23:59:59.123456'")
.addExpectedValues(null, "0001-01-01T00:00:00.000000Z", "9999-12-31T23:59:59.000000Z", "9999-12-31T23:59:59.123456Z")
.addExpectedValues(null, "0001-01-01T00:00:00.000000", "9999-12-31T23:59:59.000000", "9999-12-31T23:59:59.123456")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("TIME")
.airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE)
.addInsertValues("null", "'00:00:00'", "'1:59 PM'", "'23:59:59'")
.addExpectedValues(null, "1970-01-01T00:00:00Z", "1970-01-01T13:59:00Z",
"1970-01-01T23:59:59Z")
.addInsertValues("null", "'00:00:00'", "'1:59 PM'", "'23:59:59.123456'")
.addExpectedValues(null, "00:00:00.000000", "13:59:00.000000",
"23:59:59.123456")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("TIMESTAMP")
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("null", "'2018-03-22 12:00:00.123'", "'2018-03-22 12:00:00.123456'")
.addExpectedValues(null, "2018-03-22T12:00:00.123000Z", "2018-03-22T12:00:00.123456Z")
.addExpectedValues(null, "2018-03-22T12:00:00.123000", "2018-03-22T12:00:00.123456")
.build());
addDataTypeTestData(
TestDataHolder.builder()
Expand All @@ -290,7 +290,7 @@ protected void initTests() {
.sourceType("TIMESTAMP_NTZ")
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("null", "'2018-03-22 12:00:00.123 +05:00'", "'2018-03-22 12:00:00.123456 +05:00'")
.addExpectedValues(null, "2018-03-22T12:00:00.123000Z", "2018-03-22T12:00:00.123456Z")
.addExpectedValues(null, "2018-03-22T12:00:00.123000", "2018-03-22T12:00:00.123456")
.build());
addDataTypeTestData(
TestDataHolder.builder()
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ To read more please check official [Snowflake documentation](https://docs.snowfl

| Version | Date | Pull Request | Subject |
|:----------| :--- | :--- | :--- |
| 0.1.24 | 2022-09-26 | [17144](https://github.com/airbytehq/airbyte/pull/17144) | Fixed bug with incorrect date-time datatypes handling |
| 0.1.23 | 2022-09-26 | [17116](https://github.com/airbytehq/airbyte/pull/17116) | added connection string identifier |
| 0.1.22 | 2022-09-21 | [16766](https://github.com/airbytehq/airbyte/pull/16766) | Update JDBC Driver version to 3.13.22 |
| 0.1.21 | 2022-09-14 | [15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
Expand Down

0 comments on commit f10a304

Please sign in to comment.