Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

增加: #2101

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion core/src/main/bin/datax.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def isWindows():
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
DATAX_HOME, LOGBACK_FILE)
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (

ENGINE_COMMAND = "${javaHome}java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
DEFAULT_PROPERTY_CONF, CLASS_PATH)
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"

Expand Down Expand Up @@ -207,6 +208,11 @@ def buildStartCommand(options, args):
commandMap["jvm"] = tempJVMCommand
commandMap["params"] = jobParams
commandMap["job"] = jobResource
jh = os.getenv("JAVA_HOME", "")
if jh:
commandMap["javaHome"] = jh + "/bin/"
else:
commandMap["javaHome"] = ""

return Template(ENGINE_COMMAND).substitute(**commandMap)

Expand Down
86 changes: 86 additions & 0 deletions gaussdwswriter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>gaussdwswriter</artifactId>
<name>gaussdwswriter</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
</dependency>

<dependency>
<groupId>com.huaweicloud.dws</groupId>
<artifactId>huaweicloud-dws-jdbc</artifactId>
<version>8.1.0</version>
</dependency>

</dependencies>

<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
35 changes: 35 additions & 0 deletions gaussdwswriter/src/main/assembly/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/gaussdwswriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>gaussdwswriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/gaussdwswriter</outputDirectory>
</fileSet>
</fileSets>

<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/gaussdwswriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package com.alibaba.datax.plugin.reader.gaussdwswriter;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import org.apache.commons.lang3.StringUtils;

import java.util.List;

public class GaussDwsWriter extends Writer {

private static final DataBaseType DATABASE_TYPE = DataBaseType.GaussDWS;

public static class Job extends Writer.Job {
private Configuration originalConfig = null;
private CommonRdbmsWriter.Job commonRdbmsWriterMaster;

@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
// warn:GaussDWS only support insert and ignore mode
String writeMode = this.originalConfig.getString(Key.WRITE_MODE);
String finalWriteMode = StringUtils.defaultString(writeMode).trim().toLowerCase();
if (!finalWriteMode.isEmpty() && !finalWriteMode.startsWith("ignore")) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
String.format("写入模式(writeMode)配置有误. 因为GaussDB不支持配置参数项 writeMode: %s, GaussDB仅使用insert sql 插入数据. 请检查您的配置并作出修改.", writeMode));
}

this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonRdbmsWriterMaster.init(this.originalConfig);
}

@Override
public void prepare() {
this.commonRdbmsWriterMaster.prepare(this.originalConfig);
}

@Override
public List<Configuration> split(int mandatoryNumber) {
return this.commonRdbmsWriterMaster.split(this.originalConfig, mandatoryNumber);
}

@Override
public void post() {
this.commonRdbmsWriterMaster.post(this.originalConfig);
}

@Override
public void destroy() {
this.commonRdbmsWriterMaster.destroy(this.originalConfig);
}

}

public static class Task extends Writer.Task {
private Configuration writerSliceConfig;
private CommonRdbmsWriter.Task commonRdbmsWriterSlave;

@Override
public void init() {
this.writerSliceConfig = super.getPluginJobConf();
this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE) {
@Override
public String calcValueHolder(String columnType) {
if ("serial".equalsIgnoreCase(columnType)) {
return "?::int";
} else if ("bigserial".equalsIgnoreCase(columnType)) {
return "?::int8";
} else if ("bit".equalsIgnoreCase(columnType)) {
return "?::bit varying";
}
return "?::" + columnType;
}
};
this.commonRdbmsWriterSlave.init(this.writerSliceConfig);
}

@Override
public void prepare() {
this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);
}

public void startWrite(RecordReceiver recordReceiver) {
this.commonRdbmsWriterSlave.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector());
}

@Override
public void post() {
this.commonRdbmsWriterSlave.post(this.writerSliceConfig);
}

@Override
public void destroy() {
this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);
}

}

}
6 changes: 6 additions & 0 deletions gaussdwswriter/src/main/resources/plugin.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"name": "gaussdwswriter",
"class": "com.alibaba.datax.plugin.reader.gaussdwswriter.GaussDwsWriter",
"description": "useScene: prod. mechanism: Jdbc connection using the database, execute insert sql. warn: The more you know about the database, the less problems you encounter.",
"developer": "alibaba"
}
16 changes: 16 additions & 0 deletions gaussdwswriter/src/main/resources/plugin_job_template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "gaussdwswriter",
"parameter": {
"username": "",
"password": "",
"column": [],
"connection": [
{
"jdbcUrl": "",
"table": []
}
],
"preSql": [],
"postSql": []
}
}
7 changes: 7 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -532,5 +532,12 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>gaussdwswriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
</fileSets>
</assembly>
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public enum DataBaseType {
StarRocks("starrocks", "com.mysql.jdbc.Driver"),
Sybase("sybase", "com.sybase.jdbc4.jdbc.SybDriver"),
GaussDB("gaussdb", "org.opengauss.Driver"),
GaussDWS("gaussdws", "org.postgresql.Driver"),
Databend("databend", "com.databend.jdbc.DatabendDriver"),
Doris("doris","com.mysql.jdbc.Driver");

Expand Down Expand Up @@ -143,6 +144,8 @@ public String appendJDBCSuffixForWriter(String jdbc) {
break;
case GaussDB:
break;
case GaussDWS:
break;
default:
throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
}
Expand Down Expand Up @@ -171,6 +174,7 @@ public String formatPk(String splitPk) {
case Oscar:
break;
case GaussDB:
case GaussDWS:
break;
default:
throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
Expand Down Expand Up @@ -198,6 +202,7 @@ public String quoteColumnName(String columnName) {
case Oscar:
break;
case GaussDB:
case GaussDWS:
break;
default:
throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type");
Expand Down Expand Up @@ -226,6 +231,7 @@ public String quoteTableName(String tableName) {
case Oscar:
break;
case GaussDB:
case GaussDWS:
break;
default:
throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type");
Expand Down
Loading