-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🎉 New source: TiDB #11283
🎉 New source: TiDB #11283
Changes from 11 commits
430df4e
cceec7f
b05d5d5
e138f8b
23ccb9c
3bb36ef
4b65720
aa4109e
b7f78fb
605158d
c22969a
16b3746
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
# Stage 1 (build): unpack the Gradle-built distribution tarball into /airbyte.
FROM airbyte/integration-base-java:dev AS build

WORKDIR /airbyte

# Use the key=value form; the whitespace-separated `ENV key value` form is legacy syntax.
ENV APPLICATION=source-tidb

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

# Extract in place and drop the archive so it is not carried into the next stage.
RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar

# Stage 2 (runtime): start from a fresh base image and copy only the unpacked application.
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

ENV APPLICATION=source-tidb

COPY --from=build /airbyte /airbyte

# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile.
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-tidb
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
# Source TiDB | ||
|
||
This is the repository for the TiDB source connector in Java. | ||
For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/sources/tidb). | ||
|
||
## Local development | ||
|
||
#### Building via Gradle | ||
From the Airbyte repository root, run: | ||
``` | ||
./gradlew :airbyte-integrations:connectors:source-tidb:build | ||
``` | ||
|
||
#### Create credentials | ||
**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`. | ||
Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information. | ||
|
||
**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials. | ||
|
||
### Locally running the connector docker image | ||
|
||
#### Build | ||
Build the connector image via Gradle: | ||
``` | ||
./gradlew :airbyte-integrations:connectors:source-tidb:airbyteDocker | ||
``` | ||
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in | ||
the Dockerfile. | ||
|
||
#### Run | ||
Then run any of the connector commands as follows: | ||
``` | ||
docker run --rm airbyte/source-tidb:dev spec | ||
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-tidb:dev check --config /secrets/config.json | ||
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-tidb:dev discover --config /secrets/config.json | ||
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-tidb:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json | ||
``` | ||
|
||
## Testing | ||
We use `JUnit` for Java tests. | ||
|
||
### Unit and Integration Tests | ||
Place unit tests under `src/test/...` | ||
Place integration tests in `src/test-integration/...` | ||
|
||
#### Acceptance Tests | ||
Airbyte has a standard test suite that all source connectors must pass. Implement the `TODO`s in | ||
`src/test-integration/java/io/airbyte/integrations/source/tidb/TiDBSourceAcceptanceTest.java`. | ||
|
||
### Using gradle to run tests | ||
All commands should be run from airbyte project root. | ||
To run unit tests: | ||
``` | ||
./gradlew :airbyte-integrations:connectors:source-tidb:unitTest | ||
``` | ||
To run acceptance and custom integration tests: | ||
``` | ||
./gradlew :airbyte-integrations:connectors:source-tidb:integrationTest | ||
``` | ||
|
||
## Dependency Management | ||
|
||
### Publishing a new version of the connector | ||
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? | ||
1. Make sure your changes are passing unit and integration tests. | ||
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). | ||
1. Create a Pull Request. | ||
1. Pat yourself on the back for being an awesome contributor. | ||
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
plugins {
    id 'application'
    id 'airbyte-docker'
    id 'airbyte-integration-test-java'
}

application {
    mainClass = 'io.airbyte.integrations.source.tidb.TiDBSource'
    // Terminate the JVM on OutOfMemoryError and cap the heap at 75% of available RAM.
    applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}

dependencies {
    implementation project(':airbyte-db:lib')
    implementation project(':airbyte-integrations:bases:base-java')
    implementation project(':airbyte-protocol:models')
    implementation project(':airbyte-integrations:connectors:source-jdbc')
    implementation project(':airbyte-integrations:connectors:source-relational-db')

    // JDBC driver: the connector talks to TiDB through MySQL Connector/J (com.mysql.cj.jdbc.Driver).
    implementation 'mysql:mysql-connector-java:8.0.22'

    implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

    testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
    testImplementation 'org.apache.commons:commons-lang3:3.11'

    // Testcontainers (GenericContainer) starts a TiDB instance for tests. It is scoped to the test
    // configurations so it is not shipped in the connector distribution.
    // NOTE(review): assumes no class under src/main imports org.testcontainers -- confirm before merging.
    testImplementation "org.testcontainers:testcontainers:1.16.3"

    integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-tidb')
    integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
    integrationTestJavaImplementation "org.testcontainers:testcontainers:1.16.3"
    integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.source.tidb; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.google.common.collect.ImmutableMap; | ||
import com.mysql.cj.MysqlType; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.db.jdbc.NoOpJdbcStreamingQueryConfiguration; | ||
import io.airbyte.integrations.base.IntegrationRunner; | ||
import io.airbyte.integrations.base.Source; | ||
import io.airbyte.integrations.base.ssh.SshWrappedSource; | ||
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; | ||
import java.util.List; | ||
import java.util.Set; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class TiDBSource extends AbstractJdbcSource<MysqlType> implements Source { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(TiDBSource.class); | ||
|
||
static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; | ||
public static final List<String> SSL_PARAMETERS = List.of( | ||
"useSSL=true", | ||
"requireSSL=true", | ||
"verifyServerCertificate=false"); | ||
|
||
public static Source sshWrappedSource() { | ||
return new SshWrappedSource(new TiDBSource(), List.of("host"), List.of("port")); | ||
} | ||
|
||
/**
 * Creates the source with the MySQL driver class, a no-op streaming query configuration and
 * TiDB-specific source operations.
 */
public TiDBSource() {
  super(
      DRIVER_CLASS,
      new NoOpJdbcStreamingQueryConfiguration(),
      new TiDBSourceOperations());
}
|
||
@Override | ||
public JsonNode toDatabaseConfig(final JsonNode config) { | ||
final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s", | ||
config.get("host").asText(), | ||
config.get("port").asText(), | ||
config.get("database").asText())); | ||
|
||
if (config.get("jdbc_url_params") != null | ||
&& !config.get("jdbc_url_params").asText().isEmpty()) { | ||
jdbcUrl.append("&").append(config.get("jdbc_url_params").asText()); | ||
} | ||
|
||
// only if config ssl and ssl == true, use ssl to connect db | ||
if (config.has("ssl") && config.get("ssl").asBoolean()) { | ||
jdbcUrl.append("&").append(String.join("&", SSL_PARAMETERS)); | ||
} | ||
|
||
final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder() | ||
.put("username", config.get("username").asText()) | ||
.put("jdbc_url", jdbcUrl.toString()); | ||
|
||
if (config.has("password")) { | ||
configBuilder.put("password", config.get("password").asText()); | ||
} | ||
|
||
return Jsons.jsonNode(configBuilder.build()); | ||
} | ||
|
||
@Override | ||
public Set<String> getExcludedInternalNameSpaces() { | ||
return Set.of( | ||
"information_schema", | ||
"metrics_schema", | ||
"performance_schema", | ||
"mysql"); | ||
} | ||
Comment on lines
+69
to
+74
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The internal of TiDB is exactly as MySQL ones? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. TiDB is MySQL compatible but it's implemented from scratch, in golang and rust. So at least for these internal databases, it's definitely not 100% compatible. It doesn't support |
||
|
||
public static void main(final String[] args) throws Exception { | ||
final Source source = TiDBSource.sshWrappedSource(); | ||
LOGGER.info("starting source: {}", TiDBSource.class); | ||
new IntegrationRunner(source).run(args); | ||
LOGGER.info("completed source: {}", TiDBSource.class); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.source.tidb; | ||
|
||
import static io.airbyte.db.jdbc.JdbcConstants.*; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.node.NullNode; | ||
import com.fasterxml.jackson.databind.node.ObjectNode; | ||
import com.mysql.cj.MysqlType; | ||
import com.mysql.cj.jdbc.result.ResultSetMetaData; | ||
import com.mysql.cj.result.Field; | ||
import io.airbyte.db.DataTypeUtils; | ||
import io.airbyte.db.SourceOperations; | ||
import io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations; | ||
import io.airbyte.protocol.models.JsonSchemaType; | ||
import java.sql.PreparedStatement; | ||
import java.sql.ResultSet; | ||
import java.sql.SQLException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class TiDBSourceOperations extends AbstractJdbcCompatibleSourceOperations<MysqlType> implements SourceOperations<ResultSet, MysqlType> { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(TiDBSourceOperations.class); | ||
|
||
@Override | ||
public void setJsonField(ResultSet resultSet, int colIndex, ObjectNode json) throws SQLException { | ||
final ResultSetMetaData metaData = (ResultSetMetaData) resultSet.getMetaData(); | ||
final Field field = metaData.getFields()[colIndex - 1]; | ||
final String columnName = field.getName(); | ||
final MysqlType columnType = field.getMysqlType(); | ||
|
||
switch (columnType) { | ||
case BIT -> { | ||
if (field.getLength() == 1L) { | ||
// BIT(1) is boolean | ||
putBoolean(json, columnName, resultSet, colIndex); | ||
} else { | ||
putBinary(json, columnName, resultSet, colIndex); | ||
} | ||
} | ||
case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); | ||
case TINYINT, TINYINT_UNSIGNED -> { | ||
if (field.getLength() == 1L) { | ||
// TINYINT(1) is boolean | ||
putBoolean(json, columnName, resultSet, colIndex); | ||
} else { | ||
putShortInt(json, columnName, resultSet, colIndex); | ||
} | ||
} | ||
case SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED -> putInteger(json, columnName, resultSet, colIndex); | ||
case INT, INT_UNSIGNED -> { | ||
if (field.isUnsigned()) { | ||
putBigInt(json, columnName, resultSet, colIndex); | ||
} else { | ||
putInteger(json, columnName, resultSet, colIndex); | ||
} | ||
} | ||
case BIGINT, BIGINT_UNSIGNED -> putBigInt(json, columnName, resultSet, colIndex); | ||
case FLOAT, FLOAT_UNSIGNED -> putFloat(json, columnName, resultSet, colIndex); | ||
case DOUBLE, DOUBLE_UNSIGNED -> putDouble(json, columnName, resultSet, colIndex); | ||
case DECIMAL, DECIMAL_UNSIGNED -> putBigDecimal(json, columnName, resultSet, colIndex); | ||
case DATE -> putDate(json, columnName, resultSet, colIndex); | ||
case DATETIME, TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex); | ||
case TIME -> putTime(json, columnName, resultSet, colIndex); | ||
case YEAR -> { | ||
final String year = resultSet.getDate(colIndex).toString().split("-")[0]; | ||
json.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> year)); | ||
} | ||
case CHAR, VARCHAR -> { | ||
if (field.isBinary()) { | ||
// when character set is binary, the returned value is binary | ||
putBinary(json, columnName, resultSet, colIndex); | ||
} else { | ||
putString(json, columnName, resultSet, colIndex); | ||
} | ||
} | ||
case TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, GEOMETRY -> putBinary(json, columnName, resultSet, colIndex); | ||
case TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, JSON, ENUM, SET -> putString(json, columnName, resultSet, colIndex); | ||
case NULL -> json.set(columnName, NullNode.instance); | ||
default -> putDefault(json, columnName, resultSet, colIndex); | ||
} | ||
} | ||
|
||
@Override | ||
public void setStatementField(PreparedStatement preparedStatement, int parameterIndex, MysqlType cursorFieldType, String value) | ||
throws SQLException { | ||
switch (cursorFieldType) { | ||
case BIT -> setBit(preparedStatement, parameterIndex, value); | ||
case BOOLEAN -> setBoolean(preparedStatement, parameterIndex, value); | ||
case TINYINT, TINYINT_UNSIGNED, SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED -> setInteger(preparedStatement, parameterIndex, | ||
value); | ||
case INT, INT_UNSIGNED, BIGINT, BIGINT_UNSIGNED -> setBigInteger(preparedStatement, parameterIndex, value); | ||
case FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED -> setDouble(preparedStatement, parameterIndex, value); | ||
case DECIMAL, DECIMAL_UNSIGNED -> setDecimal(preparedStatement, parameterIndex, value); | ||
case DATE -> setDate(preparedStatement, parameterIndex, value); | ||
case DATETIME, TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value); | ||
case TIME -> setTime(preparedStatement, parameterIndex, value); | ||
case YEAR, CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET -> setString(preparedStatement, parameterIndex, value); | ||
case TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY -> setBinary(preparedStatement, parameterIndex, value); | ||
// since cursor are expected to be comparable, handle cursor typing strictly and error on | ||
// unrecognized types | ||
default -> throw new IllegalArgumentException(String.format("%s is not supported.", cursorFieldType)); | ||
} | ||
} | ||
|
||
@Override | ||
public MysqlType getFieldType(JsonNode field) { | ||
try { | ||
final MysqlType literalType = MysqlType.getByName(field.get(INTERNAL_COLUMN_TYPE_NAME).asText()); | ||
final int columnSize = field.get(INTERNAL_COLUMN_SIZE).asInt(); | ||
|
||
switch (literalType) { | ||
// BIT(1) and TINYINT(1) are interpreted as boolean | ||
case BIT, TINYINT, TINYINT_UNSIGNED -> { | ||
if (columnSize == 1) { | ||
return MysqlType.BOOLEAN; | ||
} | ||
} | ||
// When CHAR[N] and VARCHAR[N] columns have binary character set, the returned | ||
// types are BINARY[N] and VARBINARY[N], respectively. So we don't need to | ||
// convert them here. This is verified in MySqlSourceDatatypeTest. | ||
} | ||
|
||
return literalType; | ||
} catch (final IllegalArgumentException ex) { | ||
LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s (type name: %s). Casting to VARCHAR.", | ||
field.get(INTERNAL_COLUMN_NAME), | ||
field.get(INTERNAL_SCHEMA_NAME), | ||
field.get(INTERNAL_TABLE_NAME), | ||
field.get(INTERNAL_COLUMN_TYPE), | ||
field.get(INTERNAL_COLUMN_TYPE_NAME))); | ||
return MysqlType.VARCHAR; | ||
} | ||
} | ||
|
||
@Override | ||
public JsonSchemaType getJsonType(MysqlType mysqlType) { | ||
return switch (mysqlType) { | ||
case | ||
// TINYINT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link | ||
// getFieldType} | ||
TINYINT, TINYINT_UNSIGNED, SMALLINT, SMALLINT_UNSIGNED, INT, INT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED, BIGINT, BIGINT_UNSIGNED, FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED, DECIMAL, DECIMAL_UNSIGNED -> JsonSchemaType.NUMBER; | ||
case BOOLEAN -> JsonSchemaType.BOOLEAN; | ||
case NULL -> JsonSchemaType.NULL; | ||
// BIT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link getFieldType} | ||
case BIT, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, GEOMETRY -> JsonSchemaType.STRING_BASE_64; | ||
default -> JsonSchemaType.STRING; | ||
}; | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please check the implementation of
mysql-strict-encrypt
connector. So far I didn't find any change from Mysql that is specifically applied to TiDB.