Skip to content

Commit

Permalink
fix temporal type default value bug mysql (#15917)
Browse files Browse the repository at this point in the history
* fix handling for temporal data type columns with default values

* add tests

* minor NIT comment

* bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
subodh1810 and octavia-squidington-iii authored Aug 25, 2022
1 parent f952ace commit 264b72f
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.6.4
dockerImageTag: 0.6.5
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6046,7 +6046,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:0.6.4"
- dockerImage: "airbyte/source-mysql:0.6.5"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
import io.airbyte.db.jdbc.DateTimeConverter;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import io.debezium.time.Conversions;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -42,6 +46,12 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
}
}

private int getTimePrecision(final RelationalColumn field) {
return field.length().orElse(-1);
}

// Ref :
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
final var fieldType = field.typeName();

Expand All @@ -50,13 +60,33 @@ private void registerDate(final RelationalColumn field, final ConverterRegistrat
return DebeziumConverterUtils.convertDefaultValue(field);
}

return switch (fieldType.toUpperCase(Locale.ROOT)) {
case "DATETIME" -> DateTimeConverter.convertToTimestamp(x);
case "DATE" -> DateTimeConverter.convertToDate(x);
case "TIME" -> DateTimeConverter.convertToTime(x);
case "TIMESTAMP" -> DateTimeConverter.convertToTimestampWithTimezone(x);
default -> throw new IllegalArgumentException("Unknown field type " + fieldType.toUpperCase(Locale.ROOT));
};
switch (fieldType.toUpperCase(Locale.ROOT)) {
case "DATETIME":
if (x instanceof final Long l) {
if (getTimePrecision(field) <= 3) {
return DateTimeConverter.convertToTimestamp(Conversions.toInstantFromMillis(l));
}
if (getTimePrecision(field) <= 6) {
return DateTimeConverter.convertToTimestamp(Conversions.toInstantFromMicros(l));
}
}
return DateTimeConverter.convertToTimestamp(x);
case "DATE":
if (x instanceof final Integer i) {
return DateTimeConverter.convertToDate(LocalDate.ofEpochDay(i));
}
return DateTimeConverter.convertToDate(x);
case "TIME":
if (x instanceof Long) {
long l = Math.multiplyExact((Long) x, TimeUnit.MICROSECONDS.toNanos(1));
return DateTimeConverter.convertToTime(LocalTime.ofNanoOfDay(l));
}
return DateTimeConverter.convertToTime(x);
case "TIMESTAMP":
return DateTimeConverter.convertToTimestampWithTimezone(x);
default:
throw new IllegalArgumentException("Unknown field type " + fieldType.toUpperCase(Locale.ROOT));
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.4
LABEL io.airbyte.version=0.6.5
LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.4
LABEL io.airbyte.version=0.6.5
LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -232,31 +232,66 @@ protected void initTests() {
.addExpectedValues("1700000.01")
.build());

for (final String type : Set.of("date", "date not null default '0000-00-00'")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("date")
.fullSourceDataType(type)
.airbyteType(JsonSchemaType.STRING_DATE)
.addInsertValues("'1999-01-08'", "'2021-01-01'")
.addExpectedValues("1999-01-08", "2021-01-01")
.build());
}

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("date")
.airbyteType(JsonSchemaType.STRING_DATE)
.addInsertValues("null", "'2021-01-01'")
.addExpectedValues(null, "2021-01-01")
.addInsertValues("null")
.addExpectedValues((String) null)
.build());

for (final String fullSourceType : Set.of("datetime", "datetime not null default now()")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("datetime")
.fullSourceDataType(fullSourceType)
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("'2005-10-10 23:22:21'", "'2013-09-05T10:10:02'", "'2013-09-06T10:10:02'")
.addExpectedValues("2005-10-10T23:22:21.000000", "2013-09-05T10:10:02.000000", "2013-09-06T10:10:02.000000")
.build());
}

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("datetime")
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("null", "'2005-10-10 23:22:21'", "'2013-09-05T10:10:02'", "'2013-09-06T10:10:02'")
.addExpectedValues(null, "2005-10-10T23:22:21.000000", "2013-09-05T10:10:02.000000", "2013-09-06T10:10:02.000000")
.addInsertValues("null")
.addExpectedValues((String) null)
.build());

addTimestampDataTypeTest();

for (final String fullSourceType : Set.of("time", "time not null default '00:00:00'")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("time")
.fullSourceDataType(fullSourceType)
.airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE)
// JDBC driver can process only "clock"(00:00:00-23:59:59) values.
.addInsertValues("'-22:59:59'", "'23:59:59'", "'00:00:00'")
.addExpectedValues("22:59:59.000000", "23:59:59.000000", "00:00:00.000000")
.build());

}

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("time")
.airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE)
// JDBC driver can process only "clock"(00:00:00-23:59:59) values.
.addInsertValues("null", "'-22:59:59'", "'23:59:59'", "'00:00:00'")
.addExpectedValues(null, "22:59:59.000000", "23:59:59.000000", "00:00:00.000000")
.addInsertValues("null")
.addExpectedValues((String) null)
.build());

addDataTypeTestData(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ If you do not see a type in this list, assume that it is coerced into a string.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------|
| 0.6.5 | 2022-08-25 | [15917](https://github.com/airbytehq/airbyte/pull/15917) | Fix temporal data type default value bug |
| 0.6.4 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field |
| 0.6.3 | 2022-08-12 | [15044](https://github.com/airbytehq/airbyte/pull/15044) | Added the ability to connect using different SSL modes and SSL certificates |
| 0.6.2 | 2022-08-11 | [15538](https://github.com/airbytehq/airbyte/pull/15538) | Allow additional properties in db stream state |
Expand Down

0 comments on commit 264b72f

Please sign in to comment.