Skip to content

Commit

Permalink
MySql Source: Fix data processing (airbytehq#6093)
Browse files Browse the repository at this point in the history
* move logic from static JdbcUtils to JdbcSourceOperations

* format

* Add methods for all types so that they can be overridden while still sharing the common type mapping.

* Bring the BigQuery implementation in line with the JDBC changes

* fix binary type

* add database creation methods that accept a sourceOperations argument

* add MySqlSourceOperations

* fix processing

* Fix CDC processing

* format

* add some tests for double and float

* incr version

* airbytehq#3931 airbytehq#3932 added the "zeroDateTimeBehavior=convertToNull" JDBC parameter so that zero dates are converted to null

* remove old tests covered by data type tests + incr ver

* Update airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java

Co-authored-by: Sherif A. Nada <[email protected]>

* Update docs/integrations/sources/mysql.md

Co-authored-by: Sherif A. Nada <[email protected]>

* add back comments to the data type mapping

* incr config version

Co-authored-by: Oleksandr Sheheda <[email protected]>
Co-authored-by: Sherif A. Nada <[email protected]>
  • Loading branch information
3 people authored and schlattk committed Jan 4, 2022
1 parent 678e020 commit c4d44bc
Show file tree
Hide file tree
Showing 18 changed files with 209 additions and 612 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.4.6",
"dockerImageTag": "0.4.8",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql",
"icon": "mysql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.4.6
dockerImageTag: 0.4.8
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
19 changes: 18 additions & 1 deletion airbyte-db/lib/src/main/java/io/airbyte/db/DataTypeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@
import java.sql.SQLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;

public class DataTypeUtils {

public static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); // Quoted "Z" to indicate UTC, no timezone offset
public static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss'Z'";
public static final DateFormat DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT_PATTERN); // Quoted "Z" to indicate UTC, no timezone offset

public static <T> T returnNullIfInvalid(DataTypeSupplier<T> valueProducer) {
return returnNullIfInvalid(valueProducer, ignored -> true);
Expand All @@ -40,4 +45,16 @@ public static String toISO8601String(java.util.Date date) {
return DATE_FORMAT.format(date);
}

/**
 * Formats a calendar date as an ISO 8601 string by taking the start of that day and delegating
 * to the {@link LocalDateTime} overload.
 *
 * @param date the date to format
 * @return the formatted string for midnight at the start of {@code date}
 */
public static String toISO8601String(LocalDate date) {
  final LocalDateTime startOfDay = date.atStartOfDay();
  return toISO8601String(startOfDay);
}

/**
 * Formats a local date-time using {@code DATE_FORMAT_PATTERN}.
 *
 * @param date the date-time to format
 * @return {@code date} rendered with a formatter built from {@code DATE_FORMAT_PATTERN}
 */
public static String toISO8601String(LocalDateTime date) {
  final DateTimeFormatter patternFormatter = DateTimeFormatter.ofPattern(DATE_FORMAT_PATTERN);
  return date.format(patternFormatter);
}

/**
 * Formats the magnitude of a duration as an ISO 8601 timestamp counted from
 * 1970-01-01T00:00:00 in UTC, e.g. a duration of one hour becomes {@code 1970-01-01T01:00:00Z}.
 *
 * <p>The sign of the duration is discarded: a negative duration is formatted like the positive
 * duration of the same length.
 *
 * @param duration the duration to format
 * @return the formatted string produced by the {@link LocalDateTime} overload
 */
public static String toISO8601String(Duration duration) {
  // Negate the duration as a whole. A negative Duration stores a negative seconds part together
  // with a non-negative nanosecond adjustment, so taking the absolute value of each field
  // separately would turn -0.5s (seconds=-1, nanos=500_000_000) into 1.5s.
  final Duration magnitude = duration.abs();

  // Build the date-time at a fixed UTC offset so the result does not depend on the JVM default
  // time zone, which is what a SimpleDateFormat applies when formatting a java.util.Date.
  final LocalDateTime sinceEpoch =
      LocalDateTime.ofEpochSecond(magnitude.getSeconds(), magnitude.getNano(), java.time.ZoneOffset.UTC);
  return toISO8601String(sinceEpoch);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class StreamingJdbcDatabase extends JdbcDatabase {
private final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration;

public StreamingJdbcDatabase(DataSource dataSource, JdbcDatabase database, JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration) {
this(dataSource, database, jdbcStreamingQueryConfiguration, JdbcUtils.getDefaultSourceOperations());
this(dataSource, database, jdbcStreamingQueryConfiguration, database.sourceOperations);
}

public StreamingJdbcDatabase(DataSource dataSource,
Expand Down
1 change: 1 addition & 0 deletions airbyte-integrations/bases/debezium/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ project.configurations {
}
dependencies {
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-db:lib')

implementation 'io.debezium:debezium-api:1.4.2.Final'
implementation 'io.debezium:debezium-embedded:1.4.2.Final'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@

package io.airbyte.integrations.debezium.internals;

import io.airbyte.db.DataTypeUtils;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
Expand All @@ -21,22 +25,48 @@
* https://debezium.io/documentation/reference/1.4/development/converters.html This is built from
* reference with {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter} If you
* rename this class then remember to rename the datetime.type property value in
* {@link io.airbyte.integrations.source.mysql.MySqlCdcProperties#getDebeziumProperties()} (If you
* {@link io.airbyte.integrations.source.mysql.MySqlCdcProperties#getDebeziumProperties()} (If you
* don't rename, a test would still fail but it might be tricky to figure out where to change the
* property name)
*/
public class MySQLDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
public class MySQLConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDateTimeConverter.class);
private static final Logger LOGGER = LoggerFactory.getLogger(MySQLConverter.class);

private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME"};
private final String[] TEXT_TYPES = {"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT"};

@Override
public void configure(Properties props) {}

@Override
public void converterFor(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
if (!"DATETIME".equalsIgnoreCase(field.typeName())) {
return;
if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerDate(field, registration);
} else if (Arrays.stream(TEXT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerText(field, registration);
}
}

/**
 * Registers a converter that emits the column's values with a string schema.
 *
 * <p>A {@code null} value is replaced by the column default only when the column is not optional
 * and declares a default; otherwise {@code null} is passed through. Byte arrays are decoded to
 * text and every other value is rendered with {@code toString()}.
 *
 * @param field the column the converter is registered for
 * @param registration the registration callback supplied to {@code converterFor}
 */
private void registerText(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
  registration.register(SchemaBuilder.string(), x -> {
    if (x == null) {
      // Only a non-optional column with a declared default gets a substitute value.
      if (!field.isOptional() && field.hasDefaultValue()) {
        return field.defaultValue();
      }
      return null;
    }

    if (x instanceof byte[]) {
      // Decode with an explicit charset so the result does not depend on the platform default.
      // NOTE(review): assumes the bytes are UTF-8 encoded text - confirm for columns that use
      // another character set or hold arbitrary binary data.
      return new String((byte[]) x, java.nio.charset.StandardCharsets.UTF_8);
    }
    return x.toString();
  });
}

private void registerDate(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), x -> {
if (x == null) {
if (field.isOptional()) {
Expand All @@ -55,11 +85,15 @@ public void converterFor(RelationalColumn field, ConverterRegistration<SchemaBui
* Secondly, we use LocalDateTime to handle this because it represents the DATETIME datatype in Java
*/
if (x instanceof LocalDateTime) {
return x.toString();
return DataTypeUtils.toISO8601String((LocalDateTime) x);
} else if (x instanceof LocalDate) {
return DataTypeUtils.toISO8601String((LocalDate) x);
} else if (x instanceof Duration) {
return DataTypeUtils.toISO8601String((Duration) x);
} else if (x instanceof Timestamp) {
return ((Timestamp) x).toLocalDateTime().toString();
return DataTypeUtils.toISO8601String(((Timestamp) x).toLocalDateTime());
} else if (x instanceof Number) {
return new Timestamp(((Number) x).longValue()).toLocalDateTime().toString();
return DataTypeUtils.toISO8601String(new Timestamp(((Number) x).longValue()).toLocalDateTime());
} else if (x instanceof String) {
try {
return LocalDateTime.parse((String) x).toString();
Expand All @@ -68,7 +102,7 @@ public void converterFor(RelationalColumn field, ConverterRegistration<SchemaBui
return x.toString();
}
}
LOGGER.warn("Cannot convert value '{}' to LocalDateTime", x);
LOGGER.warn("Uncovered date class type '{}'. Use default converter", x.getClass().getName());
return x.toString();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -99,28 +101,30 @@ public void testDataTypes() throws Exception {
expectedValues.put(testDataHolder.getNameWithTestPrefix(), testDataHolder.getExpectedValues());
});

recordMessages.forEach(msg -> {
for (AirbyteMessage msg : recordMessages) {
String streamName = msg.getRecord().getStream();
List<String> expectedValuesForStream = expectedValues.get(streamName);
if (expectedValuesForStream != null) {
var a = msg.getRecord().getData().get(getTestColumnName());
String value = getValueFromJsonNode(msg.getRecord().getData().get(getTestColumnName()));
assertTrue(expectedValuesForStream.contains(value),
"Returned value '" + value + "' by streamer " + streamName + " should be in the expected list: " + expectedValuesForStream);
"Returned value '" + value + "' by streamer " + streamName
+ " should be in the expected list: " + expectedValuesForStream);
expectedValuesForStream.remove(value);
}
});
}

expectedValues.forEach((streamName, values) -> assertTrue(values.isEmpty(),
"The streamer " + streamName + " should return all expected values. Missing values: " + values));
}

protected String getValueFromJsonNode(JsonNode jsonNode) {
protected String getValueFromJsonNode(JsonNode jsonNode) throws IOException {
if (jsonNode != null) {
if (jsonNode.isArray()) {
return jsonNode.toString();
}

String value = jsonNode.asText();
String value = (jsonNode.isBinary() ? Arrays.toString(jsonNode.binaryValue()) : jsonNode.asText());
value = (value != null && value.equals("null") ? null : value);
return value;
}
Expand Down Expand Up @@ -156,8 +160,6 @@ private void setupDatabaseInternal() throws Exception {
* @return configured catalog
*/
private ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
final JsonNode config = getConfig();

return new ConfiguredAirbyteCatalog().withStreams(
testDataHolders
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,13 @@ public JsonNode toJdbcConfig(JsonNode config) {
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()));

// zero dates by default cannot be parsed into java date objects (they will throw an error)
// in addition, users don't always have agency in fixing them e.g: maybe they don't own the database and can't
// remove zero date values.
// since zero dates are placeholders, we convert them to null by default
jdbcUrl.append("?zeroDateTimeBehavior=convertToNull");
if (!additionalParameters.isEmpty()) {
jdbcUrl.append("?");
jdbcUrl.append("&");
additionalParameters.forEach(x -> jdbcUrl.append(x).append("&"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ public JdbcDatabase createDatabase(JsonNode config) throws SQLException {
jdbcConfig.get("jdbc_url").asText(),
driverClass,
jdbcStreamingQueryConfiguration,
jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null);
jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null,
getSourceOperations());

quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString);

Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.4.6
LABEL io.airbyte.version=0.4.8

LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ static Properties getDebeziumProperties() {
// https://debezium.io/documentation/reference/1.4/development/converters.html
/**
* {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter}
* {@link io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter}
* {@link MySQLConverter}
*/
props.setProperty("converters", "boolean, datetime");
props.setProperty("boolean.type", "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter");
props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter");
props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLConverter");

// snapshot config
// https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-snapshot-mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.ssh.SshWrappedSource;
Expand Down Expand Up @@ -58,7 +59,7 @@ public static Source sshWrappedSource() {
}

public MySqlSource() {
super(DRIVER_CLASS, new MySqlJdbcStreamingQueryConfiguration());
super(DRIVER_CLASS, new MySqlJdbcStreamingQueryConfiguration(), new MySqlSourceOperations());
}

private static AirbyteStream removeIncrementalWithoutPk(AirbyteStream stream) {
Expand Down Expand Up @@ -178,6 +179,7 @@ public JsonNode toDatabaseConfig(JsonNode config) {

// see MySqlJdbcStreamingQueryConfiguration for more context on why useCursorFetch=true is needed.
jdbcUrl.append("?useCursorFetch=true");
jdbcUrl.append("&zeroDateTimeBehavior=convertToNull");
if (config.get("jdbc_url_params") != null && !config.get("jdbc_url_params").asText().isEmpty()) {
jdbcUrl.append("&").append(config.get("jdbc_url_params").asText());
}
Expand Down Expand Up @@ -252,4 +254,9 @@ public enum ReplicationMethod {
CDC
}

/**
 * Supplies the MySQL-specific source operations.
 *
 * @return a new {@link MySqlSourceOperations} instance on every call
 */
@Override
protected JdbcSourceOperations getSourceOperations() {
  final JdbcSourceOperations mySqlOperations = new MySqlSourceOperations();
  return mySqlOperations;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * MySQL flavour of {@link JdbcSourceOperations} that overrides how boolean columns are read.
 */
public class MySqlSourceOperations extends JdbcSourceOperations {

  /**
   * Reads the column as an integer and stores {@code true} only when that integer equals 1.
   *
   * @param node the JSON object receiving the value
   * @param columnName the field name to write under
   * @param resultSet the result set positioned on the current row
   * @param index the index of the column to read
   * @throws SQLException if the column cannot be read as an integer
   */
  @Override
  protected void putBoolean(ObjectNode node, String columnName, ResultSet resultSet, int index)
      throws SQLException {
    final int rawValue = resultSet.getInt(index);
    final boolean isTrue = rawValue == 1;
    node.put(columnName, isTrue);
  }

}
Loading

0 comments on commit c4d44bc

Please sign in to comment.