Skip to content

Commit

Permalink
source-tidb: adopt CDK 0.20.4 (#35218)
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar authored and xiaohansong committed Feb 27, 2024
1 parent aad95d6 commit dc7400a
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 145 deletions.
25 changes: 4 additions & 21 deletions airbyte-integrations/connectors/source-tidb/build.gradle
Original file line number Diff line number Diff line change
@@ -1,39 +1,22 @@
plugins {
id 'application'
id 'airbyte-java-connector'
}

airbyteJavaConnector {
cdkVersionRequired = '0.13.2'
cdkVersionRequired = '0.20.4'
features = ['db-sources']
useLocalCdk = false
}

//remove once upgrading the CDK version to 0.4.x or later
java {
compileTestJava {
options.compilerArgs.remove("-Werror")
}
compileJava {
options.compilerArgs.remove("-Werror")
}
}

airbyteJavaConnector.addCdkDependencies()

application {
mainClass = 'io.airbyte.integrations.source.tidb.TiDBSource'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}

dependencies {
implementation 'mysql:mysql-connector-java:8.0.33'

//TODO Add jdbc driver import here. Ex: implementation 'com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre14'
implementation 'mysql:mysql-connector-java:8.0.22'

testImplementation libs.testcontainers.tidb.source

testImplementation 'org.apache.commons:commons-lang3:3.11'
testFixturesApi 'org.testcontainers:tidb:1.19.4'

integrationTestJavaImplementation libs.testcontainers.tidb.source
testImplementation 'org.hamcrest:hamcrest-all:1.3'
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-tidb/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 0dad1a35-ccf8-4d03-b73e-6788c00b13ae
dockerImageTag: 0.3.1
dockerImageTag: 0.3.2
dockerRepository: airbyte/source-tidb
githubIssueLabel: source-tidb
icon: tidb.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,10 @@
package io.airbyte.integrations.source.tidb;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.cdk.db.Database;
import io.airbyte.cdk.db.factory.DSLContextFactory;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.ssh.SshHelpers;
import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.cdk.integrations.util.HostPortResolver;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
Expand All @@ -25,58 +19,31 @@
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.HashMap;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Disabled;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.tidb.TiDBContainer;

@Disabled
public class TiDBSourceAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "public.starships";

protected GenericContainer container;
protected JsonNode config;
protected TiDBContainer container = TiDBTestDatabase.container();
protected TiDBTestDatabase testdb;

@Override
protected void setupEnvironment(final TestDestinationEnv testEnv) throws Exception {
container = new GenericContainer(DockerImageName.parse("pingcap/tidb:nightly"))
.withExposedPorts(4000);
container.start();

config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(container))
.put(JdbcUtils.PORT_KEY, HostPortResolver.resolvePort(container))
.put(JdbcUtils.USERNAME_KEY, "root")
.put(JdbcUtils.DATABASE_KEY, "test")
.build());

try (final DSLContext dslContext = DSLContextFactory.create(
config.get(JdbcUtils.USERNAME_KEY).asText(),
"",
DatabaseDriver.MYSQL.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
container.getHost(),
container.getFirstMappedPort(),
config.get(JdbcUtils.DATABASE_KEY).asText()),
SQLDialect.MYSQL)) {
final Database database = new Database(dslContext);

database.query(ctx -> {
ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));");
ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');");
ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));");
ctx.fetch("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');");
return null;
});
}
testdb = new TiDBTestDatabase(container)
.with("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));")
.with("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');")
.with("CREATE TABLE starships(id INTEGER, name VARCHAR(200));")
.with("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');");
}

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
container.close();
testdb.with("DROP TABLE id_and_name;")
.with("DROP TABLE starships;");
}

@Override
Expand All @@ -91,7 +58,7 @@ protected ConnectorSpecification getSpec() throws Exception {

@Override
protected JsonNode getConfig() {
return config;
return testdb.integrationTestConfigBuilder().build();
}

@Override
Expand All @@ -102,7 +69,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s.%s", config.get(JdbcUtils.DATABASE_KEY).asText(), STREAM_NAME),
String.format("%s.%s", testdb.getDatabaseName(), STREAM_NAME),
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
Expand All @@ -111,7 +78,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s.%s", config.get(JdbcUtils.DATABASE_KEY).asText(), STREAM_NAME2),
String.format("%s.%s", testdb.getDatabaseName(), STREAM_NAME2),
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.commons.json.Jsons;
import org.junit.jupiter.api.Disabled;
import org.testcontainers.tidb.TiDBContainer;
import org.testcontainers.utility.DockerImageName;

@Disabled
class TiDBJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest<TiDBSource, TiDBTestDatabase> {

private final TiDBContainer container = TiDBTestDatabase.container();

@Override
public boolean supportsSchemas() {
return false;
}

@Override
public JsonNode config() {
return Jsons.clone(testdb.configBuilder().build());
return Jsons.clone(testdb.testConfigBuilder().build());
}

@Override
Expand All @@ -31,9 +30,6 @@ protected TiDBSource source() {

@Override
protected TiDBTestDatabase createTestDatabase() {
final TiDBContainer container = new TiDBContainer(DockerImageName.parse("pingcap/tidb:nightly"))
.withExposedPorts(4000);
container.start();
return new TiDBTestDatabase(container).initialized();
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,38 @@
package io.airbyte.integrations.source.tidb;

import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.testutils.ContainerFactory;
import io.airbyte.cdk.testutils.TestDatabase;
import java.util.stream.Stream;
import org.jooq.SQLDialect;
import org.testcontainers.tidb.TiDBContainer;
import org.testcontainers.utility.DockerImageName;

public class TiDBTestDatabase extends
TestDatabase<TiDBContainer, TiDBTestDatabase, TiDBTestDatabase.TiDBDbConfigBuilder> {

protected static final String USER = "root";
protected static final String DATABASE = "test";
private final TiDBContainer container;
TestDatabase<TiDBContainer, TiDBTestDatabase, TiDBTestDatabase.TiDBConfigBuilder> {

protected TiDBTestDatabase(final TiDBContainer container) {
super(container);
this.container = container;
}

@Override
public String getJdbcUrl() {
return container.getJdbcUrl();
public String withNamespace(String name) {
return name;
}

@Override
public String getDatabaseName() {
return DATABASE;
return getContainer().getDatabaseName();
}

@Override
public String getUserName() {
return container.getUsername();
return getContainer().getUsername();
}

@Override
public String getPassword() {
return container.getPassword();
return getContainer().getPassword();
}

@Override
Expand All @@ -64,25 +60,28 @@ public SQLDialect getSqlDialect() {
}

@Override
public void close() {
container.close();
}

@Override
public TiDBDbConfigBuilder configBuilder() {
return new TiDBDbConfigBuilder(this)
.with(JdbcUtils.HOST_KEY, "127.0.0.1")
.with(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
.with(JdbcUtils.USERNAME_KEY, USER)
.with(JdbcUtils.DATABASE_KEY, DATABASE);
public TiDBConfigBuilder configBuilder() {
return new TiDBConfigBuilder(this);
}

static public class TiDBDbConfigBuilder extends TestDatabase.ConfigBuilder<TiDBTestDatabase, TiDBDbConfigBuilder> {
static public class TiDBConfigBuilder extends ConfigBuilder<TiDBTestDatabase, TiDBConfigBuilder> {

protected TiDBDbConfigBuilder(final TiDBTestDatabase testdb) {
protected TiDBConfigBuilder(final TiDBTestDatabase testdb) {
super(testdb);
}

}

static public TiDBContainer container() {
var factory = new ContainerFactory<TiDBContainer>() {

@Override
protected TiDBContainer createNewContainer(DockerImageName dockerImageName) {
return new TiDBContainer(dockerImageName).withExposedPorts(4000);
}

};
return factory.exclusive("pingcap/tidb:nightly");
}

}
1 change: 1 addition & 0 deletions docs/integrations/sources/tidb.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ Now that you have set up the TiDB source connector, check out the following TiDB

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :----------- |-------------------------------------------------------------------------------------------------------------------------------------------|
| 0.3.2 | 2024-02-13 | [35218](https://github.com/airbytehq/airbyte/pull/35218) | Adopt CDK 0.20.4 |
| 0.3.1 | 2024-01-24 | [34453](https://github.com/airbytehq/airbyte/pull/34453) | bump CDK version |
| 0.3.0 | 2023-12-18 | [33485](https://github.com/airbytehq/airbyte/pull/33485) | Remove LEGACY state |
| 0.2.5 | 2023-06-20 | [27212](https://github.com/airbytehq/airbyte/pull/27212) | Fix silent exception swallowing in StreamingJdbcDatabase |
Expand Down

0 comments on commit dc7400a

Please sign in to comment.