🎉 New source: TiDB #11283

Merged · 12 commits · Apr 20, 2022
21 changes: 21 additions & 0 deletions airbyte-integrations/connectors/source-tidb/Dockerfile
@@ -0,0 +1,21 @@
FROM airbyte/integration-base-java:dev AS build

WORKDIR /airbyte

ENV APPLICATION source-tidb

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar

FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

ENV APPLICATION source-tidb

COPY --from=build /airbyte /airbyte

# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile.
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-tidb
69 changes: 69 additions & 0 deletions airbyte-integrations/connectors/source-tidb/README.md
@@ -0,0 +1,69 @@
# Source TiDB

This is the repository for the TiDB source connector in Java.
For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/sources/tidb).

## Local development

#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:source-tidb:build
```

#### Create credentials
**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`.
Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information.

**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials.
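
For illustration, a hypothetical `secrets/config.json` might look like the following. Every value here is a made-up sample (4000 is TiDB's default port), and the field names simply mirror the keys read in `TiDBSource.toDatabaseConfig`; `src/main/resources/spec.json` is the authoritative schema:
```
{
  "host": "localhost",
  "port": 4000,
  "database": "test",
  "username": "root",
  "password": "",
  "ssl": false
}
```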

### Locally running the connector docker image

#### Build
Build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:source-tidb:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.

#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/source-tidb:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-tidb:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-tidb:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-tidb:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```

## Testing
We use `JUnit` for Java tests.

### Unit and Integration Tests
Place unit tests under `src/test/...`.
Place integration tests in `src/test-integration/...`.

#### Acceptance Tests
Airbyte has a standard test suite that all source connectors must pass. Implement the `TODO`s in
`src/test-integration/java/io/airbyte/integrations/sources/TiDBSourceAcceptanceTest.java`.

### Using gradle to run tests
All commands should be run from the Airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:source-tidb:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:source-tidb:integrationTest
```

## Dependency Management

### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
36 changes: 36 additions & 0 deletions airbyte-integrations/connectors/source-tidb/build.gradle
@@ -0,0 +1,36 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.source.tidb.TiDBSource'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}

dependencies {
implementation project(':airbyte-db:lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:connectors:source-jdbc')
implementation project(':airbyte-integrations:connectors:source-relational-db')

// TiDB is MySQL-compatible, so it reuses the MySQL JDBC driver.
implementation 'mysql:mysql-connector-java:8.0.22'

// Add testcontainers and use GenericContainer for TiDB
implementation "org.testcontainers:testcontainers:1.16.3"

testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))

testImplementation 'org.apache.commons:commons-lang3:3.11'

integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-tidb')
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')

integrationTestJavaImplementation "org.testcontainers:testcontainers:1.16.3"

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}
airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSource.java
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.tidb;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.mysql.cj.MysqlType;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.NoOpJdbcStreamingQueryConfiguration;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.ssh.SshWrappedSource;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class TiDBSource extends AbstractJdbcSource<MysqlType> implements Source {
[Review comment from a Member]: Please check the implementation of the mysql-strict-encrypt connector. So far I didn't find any change from MySQL that is specifically applied to TiDB.


private static final Logger LOGGER = LoggerFactory.getLogger(TiDBSource.class);

static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
public static final List<String> SSL_PARAMETERS = List.of(
"useSSL=true",
"requireSSL=true",
"verifyServerCertificate=false");

public static Source sshWrappedSource() {
return new SshWrappedSource(new TiDBSource(), List.of("host"), List.of("port"));
}

public TiDBSource() {
super(DRIVER_CLASS, new NoOpJdbcStreamingQueryConfiguration(), new TiDBSourceOperations());
}

@Override
public JsonNode toDatabaseConfig(final JsonNode config) {
final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()));

if (config.get("jdbc_url_params") != null
&& !config.get("jdbc_url_params").asText().isEmpty()) {
jdbcUrl.append("?").append(config.get("jdbc_url_params").asText());
}

// Append the SSL parameters only when the config explicitly sets ssl == true.
if (config.has("ssl") && config.get("ssl").asBoolean()) {
jdbcUrl.append(jdbcUrl.indexOf("?") >= 0 ? "&" : "?").append(String.join("&", SSL_PARAMETERS));
}

final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put("username", config.get("username").asText())
.put("jdbc_url", jdbcUrl.toString());

if (config.has("password")) {
configBuilder.put("password", config.get("password").asText());
}

return Jsons.jsonNode(configBuilder.build());
}

@Override
public Set<String> getExcludedInternalNameSpaces() {
return Set.of(
"information_schema",
"metrics_schema",
"performance_schema",
"mysql");
}
Comment on lines +69 to +74

[Review comment from a Member]: Are the internal schemas of TiDB exactly the same as MySQL's?

[Reply from @zhangyangyu, Contributor, Mar 25, 2022]: No. TiDB is MySQL-compatible, but it is implemented from scratch in Go and Rust, so at least for these internal databases it is definitely not 100% compatible. It doesn't support the SYS database (https://docs.pingcap.com/tidb/stable/mysql-compatibility#unsupported-features) and has its own METRICS_SCHEMA database (https://docs.pingcap.com/tidb/stable/metrics-schema#metrics-schema).


public static void main(final String[] args) throws Exception {
final Source source = TiDBSource.sshWrappedSource();
LOGGER.info("starting source: {}", TiDBSource.class);
new IntegrationRunner(source).run(args);
LOGGER.info("completed source: {}", TiDBSource.class);
}

}
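
The URL construction in `toDatabaseConfig` above can be condensed into a standalone sketch, with plain strings standing in for `JsonNode`. The host, port, and database values below are made-up samples, and note one deliberate difference: a valid JDBC URL must introduce the first query parameter with `?` rather than `&`:

```java
import java.util.List;

// Standalone sketch of the JDBC URL construction in toDatabaseConfig, using
// plain strings instead of JsonNode. Sample values are assumptions; the first
// query parameter is introduced with "?", as a well-formed URL requires.
class TiDBUrlSketch {

    static final List<String> SSL_PARAMETERS = List.of(
        "useSSL=true",
        "requireSSL=true",
        "verifyServerCertificate=false");

    static String buildJdbcUrl(String host, int port, String database,
                               String jdbcUrlParams, boolean ssl) {
        final StringBuilder jdbcUrl = new StringBuilder(
            String.format("jdbc:mysql://%s:%d/%s", host, port, database));
        if (jdbcUrlParams != null && !jdbcUrlParams.isEmpty()) {
            jdbcUrl.append("?").append(jdbcUrlParams);
        }
        if (ssl) {
            // Use "&" only when jdbc_url_params already opened the query string.
            jdbcUrl.append(jdbcUrl.indexOf("?") >= 0 ? "&" : "?")
                   .append(String.join("&", SSL_PARAMETERS));
        }
        return jdbcUrl.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildJdbcUrl("localhost", 4000, "test", null, true));
        // → jdbc:mysql://localhost:4000/test?useSSL=true&requireSSL=true&verifyServerCertificate=false
    }
}
```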
airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java
@@ -0,0 +1,151 @@
package io.airbyte.integrations.source.tidb;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.mysql.cj.MysqlType;
import com.mysql.cj.jdbc.result.ResultSetMetaData;
import com.mysql.cj.result.Field;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.SourceOperations;
import io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations;
import io.airbyte.protocol.models.JsonSchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import static io.airbyte.db.jdbc.JdbcConstants.*;

public class TiDBSourceOperations extends AbstractJdbcCompatibleSourceOperations<MysqlType> implements SourceOperations<ResultSet, MysqlType> {

private static final Logger LOGGER = LoggerFactory.getLogger(TiDBSourceOperations.class);

@Override
public void setJsonField(ResultSet resultSet, int colIndex, ObjectNode json) throws SQLException {
final ResultSetMetaData metaData = (ResultSetMetaData) resultSet.getMetaData();
final Field field = metaData.getFields()[colIndex - 1];
final String columnName = field.getName();
final MysqlType columnType = field.getMysqlType();

switch (columnType) {
case BIT -> {
if (field.getLength() == 1L) {
// BIT(1) is boolean
putBoolean(json, columnName, resultSet, colIndex);
} else {
putBinary(json, columnName, resultSet, colIndex);
}
}
case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
case TINYINT, TINYINT_UNSIGNED -> {
if (field.getLength() == 1L) {
// TINYINT(1) is boolean
putBoolean(json, columnName, resultSet, colIndex);
} else {
putShortInt(json, columnName, resultSet, colIndex);
}
}
case SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED -> putInteger(json, columnName, resultSet, colIndex);
case INT, INT_UNSIGNED -> {
if (field.isUnsigned()) {
putBigInt(json, columnName, resultSet, colIndex);
} else {
putInteger(json, columnName, resultSet, colIndex);
}
}
case BIGINT, BIGINT_UNSIGNED -> putBigInt(json, columnName, resultSet, colIndex);
case FLOAT, FLOAT_UNSIGNED -> putFloat(json, columnName, resultSet, colIndex);
case DOUBLE, DOUBLE_UNSIGNED -> putDouble(json, columnName, resultSet, colIndex);
case DECIMAL, DECIMAL_UNSIGNED -> putBigDecimal(json, columnName, resultSet, colIndex);
case DATE -> putDate(json, columnName, resultSet, colIndex);
case DATETIME, TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex);
case TIME -> putTime(json, columnName, resultSet, colIndex);
case YEAR -> {
final String year = resultSet.getDate(colIndex).toString().split("-")[0];
json.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> year));
}
case CHAR, VARCHAR -> {
if (field.isBinary()) {
// when character set is binary, the returned value is binary
putBinary(json, columnName, resultSet, colIndex);
} else {
putString(json, columnName, resultSet, colIndex);
}
}
case TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, GEOMETRY -> putBinary(json, columnName, resultSet, colIndex);
case TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, JSON, ENUM, SET -> putString(json, columnName, resultSet, colIndex);
case NULL -> json.set(columnName, NullNode.instance);
default -> putDefault(json, columnName, resultSet, colIndex);
}
}

@Override
public void setStatementField(PreparedStatement preparedStatement, int parameterIndex, MysqlType cursorFieldType, String value)
throws SQLException {
switch (cursorFieldType) {
case BIT -> setBit(preparedStatement, parameterIndex, value);
case BOOLEAN -> setBoolean(preparedStatement, parameterIndex, value);
case TINYINT, TINYINT_UNSIGNED, SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED -> setInteger(preparedStatement, parameterIndex,
value);
case INT, INT_UNSIGNED, BIGINT, BIGINT_UNSIGNED -> setBigInteger(preparedStatement, parameterIndex, value);
case FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED -> setDouble(preparedStatement, parameterIndex, value);
case DECIMAL, DECIMAL_UNSIGNED -> setDecimal(preparedStatement, parameterIndex, value);
case DATE -> setDate(preparedStatement, parameterIndex, value);
case DATETIME, TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value);
case TIME -> setTime(preparedStatement, parameterIndex, value);
case YEAR, CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET -> setString(preparedStatement, parameterIndex, value);
case TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY -> setBinary(preparedStatement, parameterIndex, value);
// since cursors are expected to be comparable, handle cursor typing strictly and error on
// unrecognized types
default -> throw new IllegalArgumentException(String.format("%s is not supported.", cursorFieldType));
}
}

@Override
public MysqlType getFieldType(JsonNode field) {
try {
final MysqlType literalType = MysqlType.getByName(field.get(INTERNAL_COLUMN_TYPE_NAME).asText());
final int columnSize = field.get(INTERNAL_COLUMN_SIZE).asInt();

switch (literalType) {
// BIT(1) and TINYINT(1) are interpreted as boolean
case BIT, TINYINT, TINYINT_UNSIGNED -> {
if (columnSize == 1) {
return MysqlType.BOOLEAN;
}
}
// When CHAR[N] and VARCHAR[N] columns have binary character set, the returned
// types are BINARY[N] and VARBINARY[N], respectively. So we don't need to
// convert them here. This is verified in MySqlSourceDatatypeTest.
}

return literalType;
} catch (final IllegalArgumentException ex) {
LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s (type name: %s). Casting to VARCHAR.",
field.get(INTERNAL_COLUMN_NAME),
field.get(INTERNAL_SCHEMA_NAME),
field.get(INTERNAL_TABLE_NAME),
field.get(INTERNAL_COLUMN_TYPE),
field.get(INTERNAL_COLUMN_TYPE_NAME)));
return MysqlType.VARCHAR;
}
}

@Override
public JsonSchemaType getJsonType(MysqlType mysqlType) {
return switch (mysqlType) {
case
// TINYINT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link
// getFieldType}
TINYINT, TINYINT_UNSIGNED, SMALLINT, SMALLINT_UNSIGNED, INT, INT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED, BIGINT, BIGINT_UNSIGNED, FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED, DECIMAL, DECIMAL_UNSIGNED -> JsonSchemaType.NUMBER;
case BOOLEAN -> JsonSchemaType.BOOLEAN;
case NULL -> JsonSchemaType.NULL;
// BIT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link getFieldType}
case BIT, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, GEOMETRY -> JsonSchemaType.STRING_BASE_64;
default -> JsonSchemaType.STRING;
};
}
}
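
The size-one convention that `getFieldType` and `getJsonType` implement (BIT(1) and TINYINT(1) are treated as booleans, other sizes keep their usual mapping) can be condensed into a small sketch. `JsonKind` and `jsonKindFor` are local inventions for illustration, not Airbyte's `JsonSchemaType` API:

```java
// Condensed sketch of the boolean-detection rule above: BIT and TINYINT
// columns of size 1 map to a boolean JSON type; other BIT sizes to base64
// binary; other TINYINT sizes to number. Names here are local stand-ins.
class TiDBTypeSketch {

    enum JsonKind { BOOLEAN, NUMBER, STRING_BASE_64 }

    static JsonKind jsonKindFor(String typeName, int columnSize) {
        return switch (typeName) {
            case "BIT" ->
                columnSize == 1 ? JsonKind.BOOLEAN : JsonKind.STRING_BASE_64;
            case "TINYINT", "TINYINT UNSIGNED" ->
                columnSize == 1 ? JsonKind.BOOLEAN : JsonKind.NUMBER;
            default -> throw new IllegalArgumentException(
                "not covered by this sketch: " + typeName);
        };
    }

    public static void main(String[] args) {
        System.out.println(jsonKindFor("TINYINT", 1)); // BOOLEAN
        System.out.println(jsonKindFor("BIT", 8));     // STRING_BASE_64
    }
}
```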