diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json index 2eba974e2213..9b4879bd783f 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad", "name": "MySQL", "dockerRepository": "airbyte/source-mysql", - "dockerImageTag": "0.4.6", + "dockerImageTag": "0.4.8", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql", "icon": "mysql.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index aedeea74775d..dab210dbb938 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -112,7 +112,7 @@ - sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad name: MySQL dockerRepository: airbyte/source-mysql - dockerImageTag: 0.4.6 + dockerImageTag: 0.4.8 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql icon: mysql.svg sourceType: database diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/DataTypeUtils.java b/airbyte-db/lib/src/main/java/io/airbyte/db/DataTypeUtils.java index 47448b418268..6745c730f988 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/DataTypeUtils.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/DataTypeUtils.java @@ -8,12 +8,17 @@ import java.sql.SQLException; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.time.Duration; import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import 
java.time.format.DateTimeFormatter; import java.util.function.Function; public class DataTypeUtils { - public static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); // Quoted "Z" to indicate UTC, no timezone offset + public static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss'Z'"; + public static final DateFormat DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT_PATTERN); // Quoted "Z" to indicate UTC, no timezone offset public static T returnNullIfInvalid(DataTypeSupplier valueProducer) { return returnNullIfInvalid(valueProducer, ignored -> true); @@ -40,4 +45,16 @@ public static String toISO8601String(java.util.Date date) { return DATE_FORMAT.format(date); } + public static String toISO8601String(LocalDate date) { + return toISO8601String(date.atStartOfDay()); + } + + public static String toISO8601String(LocalDateTime date) { + return date.format(DateTimeFormatter.ofPattern(DATE_FORMAT_PATTERN)); + } + + public static String toISO8601String(Duration duration) { + return DATE_FORMAT.format(Date.from(Instant.ofEpochSecond(Math.abs(duration.getSeconds()), Math.abs(duration.getNano())))); + } + } diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java index 228bd042c96b..09e1c54d38f2 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java @@ -27,7 +27,7 @@ public class StreamingJdbcDatabase extends JdbcDatabase { private final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration; public StreamingJdbcDatabase(DataSource dataSource, JdbcDatabase database, JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration) { - this(dataSource, database, jdbcStreamingQueryConfiguration, JdbcUtils.getDefaultSourceOperations()); + this(dataSource, database, jdbcStreamingQueryConfiguration, 
database.sourceOperations); } public StreamingJdbcDatabase(DataSource dataSource, diff --git a/airbyte-integrations/bases/debezium/build.gradle b/airbyte-integrations/bases/debezium/build.gradle index 80ffc481a15f..7263e6bb44b5 100644 --- a/airbyte-integrations/bases/debezium/build.gradle +++ b/airbyte-integrations/bases/debezium/build.gradle @@ -7,6 +7,7 @@ project.configurations { } dependencies { implementation project(':airbyte-protocol:models') + implementation project(':airbyte-db:lib') implementation 'io.debezium:debezium-api:1.4.2.Final' implementation 'io.debezium:debezium-embedded:1.4.2.Final' diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java similarity index 59% rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java index baf5813a722a..633ed0517dda 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java @@ -4,11 +4,15 @@ package io.airbyte.integrations.debezium.internals; +import io.airbyte.db.DataTypeUtils; import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import java.sql.Timestamp; +import java.time.Duration; +import java.time.LocalDate; import java.time.LocalDateTime; import java.time.format.DateTimeParseException; +import java.util.Arrays; import java.util.Properties; import org.apache.kafka.connect.data.SchemaBuilder; import org.slf4j.Logger; @@ -21,22 +25,48 @@ * 
https://debezium.io/documentation/reference/1.4/development/converters.html This is built from * reference with {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter} If you * rename this class then remember to rename the datetime.type property value in - * {@link io.airbyte.integrations.source.mysql.MySqlCdcProperties#getDebeziumProperties()} (If you + * {@link io.airbyte.integrations.source.mysql.MySqlCdcProperties#getDebeziumProperties()} (If you * don't rename, a test would still fail but it might be tricky to figure out where to change the * property name) */ -public class MySQLDateTimeConverter implements CustomConverter { +public class MySQLConverter implements CustomConverter { - private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDateTimeConverter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MySQLConverter.class); + + private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME"}; + private final String[] TEXT_TYPES = {"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT"}; @Override public void configure(Properties props) {} @Override public void converterFor(RelationalColumn field, ConverterRegistration registration) { - if (!"DATETIME".equalsIgnoreCase(field.typeName())) { - return; + if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { + registerDate(field, registration); + } else if (Arrays.stream(TEXT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { + registerText(field, registration); } + } + + private void registerText(RelationalColumn field, ConverterRegistration registration) { + registration.register(SchemaBuilder.string(), x -> { + if (x == null) { + if (field.isOptional()) { + return null; + } else if (field.hasDefaultValue()) { + return field.defaultValue(); + } + return null; + } + + if (x instanceof byte[]) { + return new String((byte[]) x); + } else + return x.toString(); + }); + } + + private void
registerDate(RelationalColumn field, ConverterRegistration registration) { registration.register(SchemaBuilder.string(), x -> { if (x == null) { if (field.isOptional()) { @@ -55,11 +85,15 @@ public void converterFor(RelationalColumn field, ConverterRegistration { + for (AirbyteMessage msg : recordMessages) { String streamName = msg.getRecord().getStream(); List expectedValuesForStream = expectedValues.get(streamName); if (expectedValuesForStream != null) { + var a = msg.getRecord().getData().get(getTestColumnName()); String value = getValueFromJsonNode(msg.getRecord().getData().get(getTestColumnName())); assertTrue(expectedValuesForStream.contains(value), - "Returned value '" + value + "' by streamer " + streamName + " should be in the expected list: " + expectedValuesForStream); + "Returned value '" + value + "' by streamer " + streamName + + " should be in the expected list: " + expectedValuesForStream); expectedValuesForStream.remove(value); } - }); + } expectedValues.forEach((streamName, values) -> assertTrue(values.isEmpty(), "The streamer " + streamName + " should return all expected values. Missing values: " + values)); } - protected String getValueFromJsonNode(JsonNode jsonNode) { + protected String getValueFromJsonNode(JsonNode jsonNode) throws IOException { if (jsonNode != null) { if (jsonNode.isArray()) { return jsonNode.toString(); } - String value = jsonNode.asText(); + String value = (jsonNode.isBinary() ? Arrays.toString(jsonNode.binaryValue()) : jsonNode.asText()); value = (value != null && value.equals("null") ? 
null : value); return value; } @@ -156,8 +160,6 @@ private void setupDatabaseInternal() throws Exception { * @return configured catalog */ private ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception { - final JsonNode config = getConfig(); - return new ConfiguredAirbyteCatalog().withStreams( testDataHolders .stream() diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index f71903745925..212f9171b2a2 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -86,9 +86,13 @@ public JsonNode toJdbcConfig(JsonNode config) { config.get("host").asText(), config.get("port").asText(), config.get("database").asText())); - +// zero dates by default cannot be parsed into java date objects (they will throw an error) +// in addition, users don't always have agency in fixing them e.g: maybe they don't own the database and can't +// remove zero date values. 
+// since zero dates are placeholders, we convert them to null by default + jdbcUrl.append("?zeroDateTimeBehavior=convertToNull"); if (!additionalParameters.isEmpty()) { - jdbcUrl.append("?"); + jdbcUrl.append("&"); additionalParameters.forEach(x -> jdbcUrl.append(x).append("&")); } diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 2786fb7e1db2..dcabc6513855 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -254,7 +254,8 @@ public JdbcDatabase createDatabase(JsonNode config) throws SQLException { jdbcConfig.get("jdbc_url").asText(), driverClass, jdbcStreamingQueryConfiguration, - jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null); + jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null, + getSourceOperations()); quoteString = (quoteString == null ? 
database.getMetaData().getIdentifierQuoteString() : quoteString); diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index 01c78e46dc95..b223be5a9f20 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.4.6 +LABEL io.airbyte.version=0.4.8 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java index e84fde5368dc..0b88b6a9baab 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java @@ -18,11 +18,11 @@ static Properties getDebeziumProperties() { // https://debezium.io/documentation/reference/1.4/development/converters.html /** * {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter} - * {@link io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter} + * {@link io.airbyte.integrations.debezium.internals.MySQLConverter} */ props.setProperty("converters", "boolean, datetime"); props.setProperty("boolean.type", "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter"); - props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter"); + props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLConverter"); // snapshot config // https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-snapshot-mode diff --git
a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 2685f99e9cf9..4bd8a846e15d 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -15,6 +15,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshWrappedSource; @@ -58,7 +59,7 @@ public static Source sshWrappedSource() { } public MySqlSource() { - super(DRIVER_CLASS, new MySqlJdbcStreamingQueryConfiguration()); + super(DRIVER_CLASS, new MySqlJdbcStreamingQueryConfiguration(), new MySqlSourceOperations()); } private static AirbyteStream removeIncrementalWithoutPk(AirbyteStream stream) { @@ -178,6 +179,7 @@ public JsonNode toDatabaseConfig(JsonNode config) { // see MySqlJdbcStreamingQueryConfiguration for more context on why useCursorFetch=true is needed. 
jdbcUrl.append("?useCursorFetch=true"); + jdbcUrl.append("&zeroDateTimeBehavior=convertToNull"); if (config.get("jdbc_url_params") != null && !config.get("jdbc_url_params").asText().isEmpty()) { jdbcUrl.append("&").append(config.get("jdbc_url_params").asText()); } @@ -252,4 +254,9 @@ public enum ReplicationMethod { CDC } + @Override + protected JdbcSourceOperations getSourceOperations() { + return new MySqlSourceOperations(); + } + } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java new file mode 100644 index 000000000000..06b6b304cb16 --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mysql; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.db.jdbc.JdbcSourceOperations; +import java.sql.ResultSet; +import java.sql.SQLException; + +public class MySqlSourceOperations extends JdbcSourceOperations { + + @Override + protected void putBoolean(ObjectNode node, String columnName, ResultSet resultSet, int index) + throws SQLException { + node.put(columnName, resultSet.getInt(index) == 1); + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceDatatypeTest.java index dce10656ecf7..f3129a98ef91 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceDatatypeTest.java +++ 
b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceDatatypeTest.java @@ -13,6 +13,7 @@ import io.airbyte.integrations.standardtest.source.TestDataHolder; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.protocol.models.JsonSchemaPrimitive; +import org.apache.commons.lang3.StringUtils; import org.jooq.SQLDialect; import org.testcontainers.containers.MySQLContainer; @@ -185,17 +186,16 @@ protected void initTests() { TestDataHolder.builder() .sourceType("float") .airbyteType(JsonSchemaPrimitive.NUMBER) - .addInsertValues("null") - .addNullExpectedValue() + .addInsertValues("null", "10.5") + .addExpectedValues(null, "10.5") .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("double") .airbyteType(JsonSchemaPrimitive.NUMBER) - .addInsertValues("null", "power(10, 308)", "1/power(10, 45)") - .addExpectedValues(null, String.valueOf(Math.pow(10, 308)), - String.valueOf(1 / Math.pow(10, 45))) + .addInsertValues("null", "power(10, 308)", "1/power(10, 45)", "10.5") + .addExpectedValues(null, String.valueOf(Math.pow(10, 308)), String.valueOf(1 / Math.pow(10, 45)), "10.5") .build()); addDataTypeTestData( @@ -203,41 +203,41 @@ protected void initTests() { .sourceType("decimal") .airbyteType(JsonSchemaPrimitive.NUMBER) .fullSourceDataType("decimal(10,4)") - .addInsertValues("0.1880", "null") + .addInsertValues("0.188", "null") .addExpectedValues("0.1880", null) .build()); addDataTypeTestData( TestDataHolder.builder() - .sourceType("bit") + .sourceType("decimal") .airbyteType(JsonSchemaPrimitive.NUMBER) - .addInsertValues("null", "1", "0") - // @TODO returns True/False instead of 1/0. 
- // .addExpectedValues(null, "1", "0") + .fullSourceDataType("decimal(19,2)") + .addInsertValues("1700000.00") + .addInsertValues("1700000.00") .build()); addDataTypeTestData( TestDataHolder.builder() - .sourceType("date") - .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null", "'2021-01-00'", "'2021-00-00'", "'0000-00-00'") - .addExpectedValues(null, null, null, null) + .sourceType("bit") + .airbyteType(JsonSchemaPrimitive.NUMBER) + .addInsertValues("null", "1", "0") + .addExpectedValues(null, "true", "false") .build()); addDataTypeTestData( TestDataHolder.builder() - .sourceType("datetime") + .sourceType("date") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null", "'0000-00-00 00:00:00'") - .addExpectedValues(null, null) + .addInsertValues("null", "'2021-01-01'") + .addExpectedValues(null, "2021-01-01T00:00:00Z") .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("datetime") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("'2013-09-05T10:10:02'") - .addExpectedValues("2013-09-05T10:10:02") + .addInsertValues("null", "'2005-10-10 23:22:21'") + .addExpectedValues(null, "2005-10-10T23:22:21Z") .build()); addDataTypeTestData( @@ -252,8 +252,10 @@ protected void initTests() { TestDataHolder.builder() .sourceType("time") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null", "'-838:59:59.000000'", "'00:00:01.000000'") - .addExpectedValues(null, "-3020399000000", "1000000") + // JDBC driver can process only "clock"(00:00:00-23:59:59) values. 
+ // https://debezium.io/documentation/reference/connectors/mysql.html#mysql-temporal-types + .addInsertValues("null", "'-23:59:59.123456'", "'00:00:00'") + .addExpectedValues(null, "1970-01-01T23:59:59Z", "1970-01-01T00:00:00Z") .build()); addDataTypeTestData( @@ -262,8 +264,7 @@ protected void initTests() { .airbyteType(JsonSchemaPrimitive.STRING) .fullSourceDataType("varchar(256) character set cp1251") .addInsertValues("null", "'тест'") - // @TODO stream returns invalid text "тест" - // .addExpectedValues(null, "тест") + .addExpectedValues(null, "тест") .build()); addDataTypeTestData( @@ -272,8 +273,7 @@ protected void initTests() { .airbyteType(JsonSchemaPrimitive.STRING) .fullSourceDataType("varchar(256) character set utf16") .addInsertValues("null", "0xfffd") - // @TODO streamer returns invalid text "�" - // .addExpectedValues(null, "�") + .addExpectedValues(null, "�") .build()); addDataTypeTestData( @@ -290,51 +290,48 @@ protected void initTests() { .sourceType("varbinary") .airbyteType(JsonSchemaPrimitive.STRING) .fullSourceDataType("varbinary(256)") - .addInsertValues("null", "'test'") - // @TODO Returns binary value instead of text - // .addExpectedValues(null, "test") + .addInsertValues("null", "'test'", "'тест'") + .addExpectedValues(null, "test", "тест") .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("blob") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null", "'test'") - // @TODO Returns binary value instead of text - // .addExpectedValues(null, "test") + .addInsertValues("null", "'test'", "'тест'") + .addExpectedValues(null, "test", "тест") .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("mediumtext") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null", "lpad('0', 16777214, '0')") - // @TODO returns null instead of long text - // .addExpectedValues(null, StringUtils.leftPad("0", 16777214, "0")) + .addInsertValues(getLogString(1048000), "'test'", "'тест'") + 
.addExpectedValues(StringUtils.leftPad("0", 1048000, "0"), "test", "тест") .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("tinytext") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null") - .addNullExpectedValue() + .addInsertValues("null", "'test'", "'тест'") + .addExpectedValues(null, "test", "тест") .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("longtext") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null") - .addNullExpectedValue() + .addInsertValues("null", "'test'", "'тест'") + .addExpectedValues(null, "test", "тест") .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("text") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null") - .addNullExpectedValue() + .addInsertValues("null", "'test'", "'тест'") + .addExpectedValues(null, "test", "тест") .build()); addDataTypeTestData( @@ -356,11 +353,21 @@ protected void initTests() { TestDataHolder.builder() .sourceType("bool") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null", "1", "127", "-128") - // @TODO returns number instead of boolean - // .addExpectedValues(null, "true", "false", "false") + .addInsertValues("null", "1", "0") + .addExpectedValues(null, "true", "false") .build()); } + private String getLogString(int length) { + int maxLpadLength = 262144; + StringBuilder stringBuilder = new StringBuilder("concat("); + int fullChunks = length / maxLpadLength; + for (int i = 1; i <= fullChunks; i++) { + stringBuilder.append("lpad('0', 262144, '0'),"); + } + stringBuilder.append("lpad('0', ").append(length % maxLpadLength).append(", '0'))"); + return stringBuilder.toString(); + } + } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeTest.java index 
9682942ed1f4..d4ee7f6dd174 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeTest.java @@ -14,6 +14,7 @@ import io.airbyte.integrations.standardtest.source.TestDataHolder; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.protocol.models.JsonSchemaPrimitive; +import org.apache.commons.lang3.StringUtils; import org.jooq.SQLDialect; import org.testcontainers.containers.MySQLContainer; @@ -59,7 +60,8 @@ protected Database setupDatabase() throws Exception { config.get("port").asText(), config.get("database").asText()), "com.mysql.cj.jdbc.Driver", - SQLDialect.MYSQL); + SQLDialect.MYSQL, + "zeroDateTimeBehavior=convertToNull"); // It disable strict mode in the DB and allows to insert specific values. // For example, it's possible to insert date with zero values "2021-00-00" @@ -155,16 +157,16 @@ protected void initTests() { TestDataHolder.builder() .sourceType("float") .airbyteType(JsonSchemaPrimitive.NUMBER) - .addInsertValues("null") - .addNullExpectedValue() + .addInsertValues("null", "10.5") + .addExpectedValues(null, "10.5") .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("double") .airbyteType(JsonSchemaPrimitive.NUMBER) - .addInsertValues("null", "power(10, 308)", "1/power(10, 45)") - .addExpectedValues(null, String.valueOf(Math.pow(10, 308)), String.valueOf(1 / Math.pow(10, 45))) + .addInsertValues("null", "power(10, 308)", "1/power(10, 45)", "10.5") + .addExpectedValues(null, String.valueOf(Math.pow(10, 308)), String.valueOf(1 / Math.pow(10, 45)), "10.5") .build()); addDataTypeTestData( @@ -190,54 +192,40 @@ protected void initTests() { .sourceType("bit") .airbyteType(JsonSchemaPrimitive.NUMBER) .addInsertValues("null", "1", "0") - // @TODO returns True/False instead of 
1/0. - // .addExpectedValues(null, "1", "0") + .addExpectedValues(null, "true", "false") .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("date") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null") - .addNullExpectedValue() - // @TODO stream fails when gets Zero date value - // .addInsertValues("'2021-01-00'", "'2021-00-00'", "'0000-00-00'") + .addInsertValues("null", "'2021-01-01'") + .addExpectedValues(null, "2021-01-01T00:00:00Z") .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("datetime") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null") - .addNullExpectedValue() - // @TODO stream fails when gets Zero date value - // .addInsertValues("'0000-00-00 00:00:00'") + .addInsertValues("null", "'2005-10-10 23:22:21'") + .addExpectedValues(null, "2005-10-10T23:22:21Z") .build()); - // addDataTypeTestData( - // TestDataHolder.builder() - // .sourceType("datetime") - // .airbyteType(JsonSchemaPrimitive.STRING) - // .addInsertValues("'2013-09-05T10:10:02'") - // .addExpectedValues("2013-09-05T10:10:02") - // .build()); - addDataTypeTestData( TestDataHolder.builder() .sourceType("timestamp") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null") - .addNullExpectedValue() + .addInsertValues("null", "'2021-01-00'", "'2021-00-00'", "'0000-00-00'") + .addExpectedValues(null, null, null, null) .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("time") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null") - .addNullExpectedValue() - // @TODO stream fails when gets Zero date value - // .addInsertValues("'-838:59:59.000000'") + // JDBC driver can process only "clock"(00:00:00-23:59:59) values. 
+ .addInsertValues("null", "'-23:59:59.123456'", "'00:00:00'") + .addExpectedValues(null, "1970-01-01T23:59:59Z", "1970-01-01T00:00:00Z") .build()); addDataTypeTestData( @@ -274,6 +262,7 @@ protected void initTests() { .fullSourceDataType("varbinary(256)") .addInsertValues("null", "'test'") // @TODO Returns binary value instead of text + // #5878 binary value issue // .addExpectedValues(null, "test") .build()); @@ -283,6 +272,7 @@ protected void initTests() { .airbyteType(JsonSchemaPrimitive.STRING) .addInsertValues("null", "'test'") // @TODO Returns binary value instead of text + // #5878 binary value issue // .addExpectedValues(null, "test") .build()); @@ -290,33 +280,32 @@ protected void initTests() { TestDataHolder.builder() .sourceType("mediumtext") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null", "lpad('0', 16777214, '0')") - // @TODO returns null instead of long text - // .addExpectedValues(null, StringUtils.leftPad("0", 16777214, "0")) + .addInsertValues(getLogString(1048000), "'test'") + .addExpectedValues(StringUtils.leftPad("0", 1048000, "0"), "test") .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("tinytext") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null") - .addNullExpectedValue() + .addInsertValues("null", "'test'") + .addExpectedValues(null, "test") .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("longtext") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null") - .addNullExpectedValue() + .addInsertValues("null", "'test'") + .addExpectedValues(null, "test") .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("text") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null") - .addNullExpectedValue() + .addInsertValues("null", "'test'") + .addExpectedValues(null, "test") .build()); addDataTypeTestData( @@ -338,11 +327,22 @@ protected void initTests() { TestDataHolder.builder() .sourceType("bool") 
.airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null", "1", "127", "-128") - // @TODO in MySQL boolean returns true only if value equals 1, all other not null values -> false - // .addExpectedValues(null, "true", "false", "false") + // MySql boolean logic: Only value "1" is true + .addInsertValues("null", "1", "0", "127", "-128") + .addExpectedValues(null, "true", "false", "false", "false") .build()); } + private String getLogString(int length) { + int maxLpadLength = 262144; + StringBuilder stringBuilder = new StringBuilder("concat("); + int fullChunks = length / maxLpadLength; + for (int i = 1; i <= fullChunks; i++) { + stringBuilder.append("lpad('0', 262144, '0'),"); + } + stringBuilder.append("lpad('0', ").append(length % maxLpadLength).append(", '0'))"); + return stringBuilder.toString(); + } + } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java deleted file mode 100644 index 9637e0c5b8fb..000000000000 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java +++ /dev/null @@ -1,372 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.source.mysql; - -import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT; -import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; -import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE; -import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS; -import static io.airbyte.integrations.source.mysql.MySqlSource.DRIVER_CLASS; -import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET; -import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.util.AutoCloseableIterators; -import io.airbyte.db.Database; -import io.airbyte.db.Databases; -import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.integrations.base.Source; -import io.airbyte.integrations.debezium.CdcSourceTest; -import io.airbyte.integrations.debezium.CdcTargetPosition; -import io.airbyte.protocol.models.AirbyteCatalog; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStateMessage; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaPrimitive; -import io.airbyte.protocol.models.SyncMode; -import 
java.sql.SQLException; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; -import org.jooq.SQLDialect; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.MySQLContainer; - -public class CdcMySqlSourceTest extends CdcSourceTest { - - private static final String DB_NAME = MODELS_SCHEMA; - private MySQLContainer container; - private Database database; - private MySqlSource source; - private JsonNode config; - - @BeforeEach - public void setup() throws SQLException { - init(); - revokeAllPermissions(); - grantCorrectPermissions(); - super.setup(); - } - - private void init() { - container = new MySQLContainer<>("mysql:8.0"); - container.start(); - source = new MySqlSource(); - database = Databases.createDatabase( - "root", - "test", - String.format("jdbc:mysql://%s:%s", - container.getHost(), - container.getFirstMappedPort()), - DRIVER_CLASS, - SQLDialect.MYSQL); - - config = Jsons.jsonNode(ImmutableMap.builder() - .put("host", container.getHost()) - .put("port", container.getFirstMappedPort()) - .put("database", DB_NAME) - .put("username", container.getUsername()) - .put("password", container.getPassword()) - .put("replication_method", "CDC") - .build()); - } - - private void revokeAllPermissions() { - executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + container.getUsername() + "@'%';"); - } - - private void grantCorrectPermissions() { - executeQuery("GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO " + container.getUsername() + "@'%';"); - } - - @AfterEach - public void tearDown() { - try { - database.close(); - container.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Test - public void fullRefreshAndCDCShouldReturnSameRecords() throws Exception { - JsonNode record1 = 
Jsons.jsonNode(ImmutableMap.of( - "id", 1, - "bool_col", true, - "tiny_int_one_col", true)); - ((ObjectNode) record1).put("tiny_int_two_col", (short) 80); - JsonNode record2 = Jsons.jsonNode(ImmutableMap.of( - "id", 2, - "bool_col", false, - "tiny_int_one_col", false)); - ((ObjectNode) record2).put("tiny_int_two_col", (short) 90); - ImmutableList records = ImmutableList.of(record1, record2); - Set originalData = new HashSet<>(records); - setupForComparisonBetweenFullRefreshAndCDCSnapshot(records); - - AirbyteCatalog discover = source.discover(config); - List streams = discover.getStreams(); - - assertEquals(streams.size(), 1); - JsonNode jsonSchema = streams.get(0).getJsonSchema().get("properties"); - assertEquals(jsonSchema.get("id").get("type").asText(), "number"); - assertEquals(jsonSchema.get("bool_col").get("type").asText(), "boolean"); - assertEquals(jsonSchema.get("tiny_int_one_col").get("type").asText(), "boolean"); - assertEquals(jsonSchema.get("tiny_int_two_col").get("type").asText(), "number"); - - AirbyteCatalog catalog = new AirbyteCatalog().withStreams(streams); - final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers - .toDefaultConfiguredCatalog(catalog); - configuredCatalog.getStreams().forEach(c -> c.setSyncMode(SyncMode.FULL_REFRESH)); - - Set dataFromFullRefresh = extractRecordMessages( - AutoCloseableIterators.toListAndClose(source.read(config, configuredCatalog, null))) - .stream() - .map(AirbyteRecordMessage::getData).collect(Collectors.toSet()); - - configuredCatalog.getStreams().forEach(c -> c.setSyncMode(SyncMode.INCREMENTAL)); - Set dataFromDebeziumSnapshot = - extractRecordMessages(AutoCloseableIterators.toListAndClose(source.read(config, configuredCatalog, null))) - .stream() - .map(airbyteRecordMessage -> { - JsonNode data = airbyteRecordMessage.getData(); - removeCDCColumns((ObjectNode) data); - /** - * Debezium reads TINYINT (expect for TINYINT(1)) as IntNode while FullRefresh reads it as Short Ref - * : {@link 
io.airbyte.db.jdbc.JdbcUtils#setJsonField(java.sql.ResultSet, int, ObjectNode)} -> case - * TINYINT, SMALLINT -> o.put(columnName, r.getShort(i)); - */ - ((ObjectNode) data) - .put("tiny_int_two_col", (short) data.get("tiny_int_two_col").asInt()); - return data; - }) - .collect(Collectors.toSet()); - - assertEquals(dataFromFullRefresh, originalData); - assertEquals(dataFromFullRefresh, dataFromDebeziumSnapshot); - } - - private void setupForComparisonBetweenFullRefreshAndCDCSnapshot(ImmutableList data) { - executeQuery("CREATE DATABASE " + "test_schema" + ";"); - executeQuery(String.format( - "CREATE TABLE %s.%s(%s INTEGER, %s Boolean, %s TINYINT(1), %s TINYINT(2), PRIMARY KEY (%s));", - "test_schema", "table_with_tiny_int", "id", "bool_col", "tiny_int_one_col", - "tiny_int_two_col", "id")); - - executeQuery(String - .format("INSERT INTO %s.%s (%s, %s, %s, %s) VALUES (%s, %s, %s, %s);", "test_schema", - "table_with_tiny_int", - "id", "bool_col", "tiny_int_one_col", "tiny_int_two_col", - data.get(0).get("id").asInt(), data.get(0).get("bool_col").asBoolean(), - data.get(0).get("tiny_int_one_col").asBoolean() ? 99 : -99, data.get(0).get("tiny_int_two_col").asInt())); - - executeQuery(String - .format("INSERT INTO %s.%s (%s, %s, %s, %s) VALUES (%s, %s, %s, %s);", "test_schema", - "table_with_tiny_int", - "id", "bool_col", "tiny_int_one_col", "tiny_int_two_col", - data.get(1).get("id").asInt(), data.get(1).get("bool_col").asBoolean(), - data.get(1).get("tiny_int_one_col").asBoolean() ? 
99 : -99, data.get(1).get("tiny_int_two_col").asInt())); - ((ObjectNode) config).put("database", "test_schema"); - } - - @Test - public void dateTimeDataTypeTest() throws Exception { - JsonNode record1 = Jsons.jsonNode(ImmutableMap.of( - "id", 1, - "datetime_col", "\'2013-09-05T10:10:02\'")); - JsonNode record2 = Jsons.jsonNode(ImmutableMap.of( - "id", 2, - "datetime_col", "\'2013-09-06T10:10:02\'")); - ImmutableList records = ImmutableList.of(record1, record2); - setupForDateTimeDataTypeTest(records); - Set originalData = records.stream().peek(c -> { - String dateTimeValue = c.get("datetime_col").asText(); - ((ObjectNode) c).put("datetime_col", dateTimeValue.substring(1, dateTimeValue.length() - 1)); - }).collect(Collectors.toSet()); - - AirbyteCatalog discover = source.discover(config); - List streams = discover.getStreams(); - - assertEquals(streams.size(), 1); - JsonNode jsonSchema = streams.get(0).getJsonSchema().get("properties"); - assertEquals(jsonSchema.get("id").get("type").asText(), "number"); - assertEquals(jsonSchema.get("datetime_col").get("type").asText(), "string"); - - AirbyteCatalog catalog = new AirbyteCatalog().withStreams(streams); - final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog); - - configuredCatalog.getStreams().forEach(c -> c.setSyncMode(SyncMode.INCREMENTAL)); - Set dataFromDebeziumSnapshot = - extractRecordMessages(AutoCloseableIterators.toListAndClose(source.read(config, configuredCatalog, null))) - .stream() - .map(airbyteRecordMessage -> { - JsonNode data = airbyteRecordMessage.getData(); - removeCDCColumns((ObjectNode) data); - return data; - }) - .collect(Collectors.toSet()); - - assertEquals(originalData, dataFromDebeziumSnapshot); - - // TODO: Fix full refresh (non-cdc) mode. The value of the datetime_col is adjusted by the TIMEZONE - // the code is running in, - // in my case it got adjusted to IST i.e. "2013-09-05T15:40:02Z" and "2013-09-06T15:40:02Z". 
- // configuredCatalog.getStreams().forEach(c -> c.setSyncMode(SyncMode.FULL_REFRESH)); - // Set dataFromFullRefresh = extractRecordMessages( - // AutoCloseableIterators.toListAndClose(source.read(config, configuredCatalog, null))) - // .stream() - // .map(AirbyteRecordMessage::getData).collect(Collectors.toSet()); - // assertEquals(dataFromFullRefresh, originalData); - } - - private void setupForDateTimeDataTypeTest(ImmutableList data) { - executeQuery("CREATE DATABASE " + "test_schema" + ";"); - executeQuery(String.format( - "CREATE TABLE %s.%s(%s INTEGER, %s DATETIME, PRIMARY KEY (%s));", - "test_schema", "table_with_date_time", "id", "datetime_col", "id")); - - executeQuery(String - .format("INSERT INTO %s.%s (%s, %s) VALUES (%s, %s);", "test_schema", - "table_with_date_time", - "id", "datetime_col", - data.get(0).get("id").asInt(), data.get(0).get("datetime_col").asText())); - - executeQuery(String - .format("INSERT INTO %s.%s (%s, %s) VALUES (%s, %s);", "test_schema", - "table_with_date_time", - "id", "datetime_col", - data.get(1).get("id").asInt(), data.get(1).get("datetime_col").asText())); - ((ObjectNode) config).put("database", "test_schema"); - } - - @Override - protected CdcTargetPosition cdcLatestTargetPosition() { - JdbcDatabase jdbcDatabase = Databases.createJdbcDatabase( - config.get("username").asText(), - config.get("password").asText(), - String.format("jdbc:mysql://%s:%s", - config.get("host").asText(), - config.get("port").asInt()), - DRIVER_CLASS); - - return MySqlCdcTargetPosition.targetPosition(jdbcDatabase); - } - - @Override - protected CdcTargetPosition extractPosition(JsonNode record) { - return new MySqlCdcTargetPosition(record.get(CDC_LOG_FILE).asText(), record.get(CDC_LOG_POS).asInt()); - } - - @Override - protected void assertNullCdcMetaData(JsonNode data) { - assertNull(data.get(CDC_LOG_FILE)); - assertNull(data.get(CDC_LOG_POS)); - assertNull(data.get(CDC_UPDATED_AT)); - assertNull(data.get(CDC_DELETED_AT)); - } - - @Override - 
protected void assertCdcMetaData(JsonNode data, boolean deletedAtNull) { - assertNotNull(data.get(CDC_LOG_FILE)); - assertNotNull(data.get(CDC_LOG_POS)); - assertNotNull(data.get(CDC_UPDATED_AT)); - if (deletedAtNull) { - assertTrue(data.get(CDC_DELETED_AT).isNull()); - } else { - assertFalse(data.get(CDC_DELETED_AT).isNull()); - } - } - - @Override - protected void removeCDCColumns(ObjectNode data) { - data.remove(CDC_LOG_FILE); - data.remove(CDC_LOG_POS); - data.remove(CDC_UPDATED_AT); - data.remove(CDC_DELETED_AT); - } - - @Override - protected void addCdcMetadataColumns(AirbyteStream stream) { - ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema(); - ObjectNode properties = (ObjectNode) jsonSchema.get("properties"); - - final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number")); - - final JsonNode stringType = Jsons.jsonNode(ImmutableMap.of("type", "string")); - properties.set(CDC_LOG_FILE, stringType); - properties.set(CDC_LOG_POS, numberType); - properties.set(CDC_UPDATED_AT, stringType); - properties.set(CDC_DELETED_AT, stringType); - } - - @Override - protected Source getSource() { - return source; - } - - @Override - protected JsonNode getConfig() { - return config; - } - - @Override - protected Database getDatabase() { - return database; - } - - @Override - public void assertExpectedStateMessages(List stateMessages) { - for (AirbyteStateMessage stateMessage : stateMessages) { - assertNotNull(stateMessage.getData().get("cdc_state").get("state").get(MYSQL_CDC_OFFSET)); - assertNotNull(stateMessage.getData().get("cdc_state").get("state").get(MYSQL_DB_HISTORY)); - } - } - - @Override - protected AirbyteCatalog expectedCatalogForDiscover() { - final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG); - - createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", - columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.empty())); - - List streams = expectedCatalog.getStreams(); - // 
stream with PK - streams.get(0).setSourceDefinedCursor(true); - addCdcMetadataColumns(streams.get(0)); - - AirbyteStream streamWithoutPK = CatalogHelpers.createAirbyteStream( - MODELS_STREAM_NAME + "_2", - MODELS_SCHEMA, - Field.of(COL_ID, JsonSchemaPrimitive.NUMBER), - Field.of(COL_MAKE_ID, JsonSchemaPrimitive.NUMBER), - Field.of(COL_MODEL, JsonSchemaPrimitive.STRING)); - streamWithoutPK.setSourceDefinedPrimaryKey(Collections.emptyList()); - streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH)); - addCdcMetadataColumns(streamWithoutPK); - - streams.add(streamWithoutPK); - expectedCatalog.withStreams(streams); - return expectedCatalog; - } - -} diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlTest.java deleted file mode 100644 index 714ecbf15bc4..000000000000 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.source.mysql; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.commons.json.Jsons; -import io.airbyte.db.Database; -import io.airbyte.db.Databases; -import io.airbyte.protocol.models.AirbyteCatalog; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import org.apache.commons.lang3.RandomStringUtils; -import org.jooq.SQLDialect; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; -import org.testcontainers.containers.MySQLContainer; - -class MySqlTest { - - private static final String TEST_USER = "test"; - private static final String TEST_PASSWORD = "test"; - private static MySQLContainer container; - - private static String TABLE_NAME = "id_and_name"; - - private JsonNode config; - private Database database; - - @BeforeAll - static void init() throws SQLException { - container = new MySQLContainer<>("mysql:8.0") - .withUsername(TEST_USER) - .withPassword(TEST_PASSWORD) - .withEnv("MYSQL_ROOT_HOST", "%") - .withEnv("MYSQL_ROOT_PASSWORD", TEST_PASSWORD); - container.start(); - Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", TEST_PASSWORD); - connection.createStatement().execute("GRANT ALL PRIVILEGES ON *.* TO '" + TEST_USER + "'@'%';\n"); - } - - @BeforeEach - public void setup() throws Exception { - config = Jsons.jsonNode(ImmutableMap.builder() - .put("host", container.getHost()) - .put("port", container.getFirstMappedPort()) - .put("database", "db_" + RandomStringUtils.randomAlphabetic(10)) - .put("username", TEST_USER) - .put("password", TEST_PASSWORD) - .build()); - - final Database masterDatabase = 
Databases.createDatabase( - config.get("username").asText(), - config.get("password").asText(), - String.format("jdbc:mysql://%s:%s", - config.get("host").asText(), - config.get("port").asText()), - MySqlSource.DRIVER_CLASS, - SQLDialect.MYSQL); - - masterDatabase.query(ctx -> { - ctx.fetch("CREATE DATABASE " + config.get("database").asText()); - return null; - }); - - masterDatabase.close(); - - database = Databases.createDatabase( - config.get("username").asText(), - config.get("password").asText(), - String.format("jdbc:mysql://%s:%s/%s", - config.get("host").asText(), - config.get("port").asText(), - config.get("database").asText()), - MySqlSource.DRIVER_CLASS, - SQLDialect.MYSQL); - } - - @AfterEach - void tearDownMySql() throws Exception { - database.close(); - } - - @AfterAll - static void cleanUp() { - container.close(); - } - - @ParameterizedTest - @ValueSource(strings = { - "TINYINT", - "SMALLINT", - "MEDIUMINT", - "INT", - "BIGINT", - "INT(1)", - "INT(2)", - "INT(3)", - "INT(4)", - "INT(5)", - "INT(6)", - "INT(7)", - "INT(8)" - }) - void testSmallIntTypes(String type) throws Exception { - database.query(ctx -> { - ctx.fetch(String.format("CREATE TABLE %s(id %s)", TABLE_NAME, type)); - ctx.fetch(String.format("INSERT INTO %s(id) VALUES (10)", TABLE_NAME)); - return null; - }); - - final AirbyteCatalog catalog = new MySqlSource().discover(config); - assertEquals("number", catalog.getStreams().get(0).getJsonSchema().get("properties").get("id").get("type").asText()); - } - -} diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index a11bfa70b544..3f34993f7954 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -145,8 +145,8 @@ MySQL data types are mapped to the following data types when synchronizing data. | `bigint` | number | | | `binary` | string | | | `blob` | string | | -| `date` | string | | -| `datetime` | string | | +| `date` | string | MySql JDBC driver has limitation. 
ZERO-DATE values are converted to NULL. | +| `datetime` | string | The MySQL JDBC driver has a limitation. ZERO-DATE values are converted to NULL. | | `decimal` | number | | | `decimal(19, 2)` | number | | | `double` | number | | @@ -166,8 +166,8 @@ MySQL data types are mapped to the following data types when synchronizing data. | `string` | string | | | `tinyint` | number | | | `text` | string | | -| `time` | string | | -| `timestamp` | string | | +| `time` | string | The MySQL JDBC driver has a limitation. Values must be in the range 00:00:00 to 23:59:59. | +| `timestamp` | string | The MySQL JDBC driver has a limitation. ZERO-DATE values are converted to NULL. | | `tinytext` | string | | | `varbinary(256)` | string | | | `varchar` | string | | @@ -180,6 +180,7 @@ If you do not see a type in this list, assume that it is coerced into a string. | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.4.8 | 2021-09-16 | [6093](https://github.com/airbytehq/airbyte/pull/6093) | Improve reliability of processing various data types like decimals, dates, datetime, binary, and text | | 0.4.7 | 2021-09-30 | [6585](https://github.com/airbytehq/airbyte/pull/6585) | Improved SSH Tunnel key generation steps | | 0.4.6 | 2021-09-29 | [6510](https://github.com/airbytehq/airbyte/pull/6510) | Support SSL connection | | 0.4.5 | 2021-09-17 | [6146](https://github.com/airbytehq/airbyte/pull/6146) | Added option to connect to DB via SSH |