diff --git a/chunjun-connectors/chunjun-connector-opengauss/pom.xml b/chunjun-connectors/chunjun-connector-opengauss/pom.xml
new file mode 100644
index 0000000000..d110e479bd
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/pom.xml
@@ -0,0 +1,62 @@
+
+
+
+
+
+ chunjun-connectors
+ com.dtstack.chunjun
+ ${revision}
+
+ 4.0.0
+
+ chunjun-connector-opengauss
+ ChunJun : Connector : openGauss
+
+
+ opengauss
+
+
+
+
+ com.dtstack.chunjun
+ chunjun-connector-jdbc-base
+ ${project.version}
+
+
+
+
+
+ org.opengauss
+ opengauss-jdbc
+ 5.0.1
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+
+
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/OpengaussRawTypeMapper.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/OpengaussRawTypeMapper.java
new file mode 100644
index 0000000000..87ad6664e1
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/OpengaussRawTypeMapper.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter;
+
+import com.dtstack.chunjun.config.TypeConfig;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.BitType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.BoolType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.BpcharType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.ByteaType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.CharType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.DateType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.Float4Type;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.Float8Type;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.Int2Type;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.Int4Type;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.Int8Type;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.JsonType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.JsonbType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.MoneyType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.NameType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.NumericType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.OidType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.PointType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.TextType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.TimeType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.TimestampType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.TimestampTzType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.TimetzType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.UuidType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.VarbitType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.VarcharType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.XmlType;
+import com.dtstack.chunjun.throwable.UnsupportedTypeException;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+public class OpengaussRawTypeMapper {
+
+ /**
+ * https://docs-opengauss.osinfra.cn/zh/docs/5.0.0/docs/BriefTutorial/%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B.html
+ *
+ * @param type
+ */
+ public static DataType apply(TypeConfig type) {
+ switch (type.getType()) {
+ case "BIT":
+ return new AtomicDataType(new BitType(true, LogicalTypeRoot.BOOLEAN, false));
+ case "BOOLEAN":
+ case "BOOL":
+ return DataTypes.BOOLEAN();
+ case "SMALLINT":
+ case "SMALLSERIAL":
+ case "INT2":
+ case "INT":
+ case "INTEGER":
+ case "SERIAL":
+ case "INT4":
+ return DataTypes.INT();
+ case "BIGINT":
+ case "BIGSERIAL":
+ case "OID":
+ case "INT8":
+ return DataTypes.BIGINT();
+ case "REAL":
+ case "FLOAT4":
+ return DataTypes.FLOAT();
+ case "FLOAT":
+ case "DOUBLE PRECISION":
+ case "FLOAT8":
+ return DataTypes.DOUBLE();
+ case "MONEY":
+ return new AtomicDataType(new MoneyType(true, LogicalTypeRoot.DOUBLE, false));
+ case "DECIMAL":
+ case "NUMERIC":
+ return type.toDecimalDataType();
+ case "CHARACTER VARYING":
+ case "VARCHAR":
+ case "CHARACTER":
+ case "CHAR":
+ case "TEXT":
+ case "NAME":
+ case "BPCHAR":
+ return DataTypes.STRING();
+ // Binary Data Types
+ case "BYTEA":
+ return DataTypes.BYTES();
+ case "VARBIT":
+ return new AtomicDataType(new VarbitType(true, LogicalTypeRoot.VARCHAR, false));
+ case "XML":
+ return new AtomicDataType(new XmlType(true, LogicalTypeRoot.VARCHAR, false));
+ case "UUID":
+ return new AtomicDataType(new UuidType(true, LogicalTypeRoot.VARCHAR, false));
+ case "POINT":
+ return new AtomicDataType(new PointType(true, LogicalTypeRoot.VARCHAR, false));
+ // Date/Time Types
+ case "ABSTIME":
+ case "TIMESTAMP":
+ case "TIMESTAMPTZ":
+ return type.toTimestampDataType(6);
+ case "DATE":
+ return DataTypes.DATE();
+ case "TIME":
+ case "TIMETZ":
+ // todo check sync
+ return type.toTimeDataType(6);
+ case "JSON":
+ return new AtomicDataType(new JsonType(true, LogicalTypeRoot.VARCHAR, false));
+ case "JSONB":
+ return new AtomicDataType(new JsonbType(true, LogicalTypeRoot.VARCHAR, false));
+ case "_BIT":
+ return new AtomicDataType(new BitType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_BOOL":
+ return new AtomicDataType(new BoolType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_INT2":
+ return new AtomicDataType(new Int2Type(true, LogicalTypeRoot.VARCHAR, true));
+ case "_INT4":
+ return new AtomicDataType(new Int4Type(true, LogicalTypeRoot.VARCHAR, true));
+ case "_INT8":
+ return new AtomicDataType(new Int8Type(true, LogicalTypeRoot.VARCHAR, true));
+ case "_FLOAT4":
+ return new AtomicDataType(new Float4Type(true, LogicalTypeRoot.VARCHAR, true));
+ case "_FLOAT8":
+ return new AtomicDataType(new Float8Type(true, LogicalTypeRoot.VARCHAR, true));
+ case "_NUMERIC":
+ return new AtomicDataType(new NumericType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_TIME":
+ return new AtomicDataType(new TimeType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_TIMETZ":
+ return new AtomicDataType(new TimetzType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_TIMESTAMP":
+ return new AtomicDataType(new TimestampType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_TIMESTAMPTZ":
+ return new AtomicDataType(new TimestampTzType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_DATE":
+ return new AtomicDataType(new DateType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_BYTEA":
+ return new AtomicDataType(new ByteaType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_VARCHAR":
+ return new AtomicDataType(new VarcharType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_OID":
+ return new AtomicDataType(new OidType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_BPCHAR":
+ return new AtomicDataType(new BpcharType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_TEXT":
+ return new AtomicDataType(new TextType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_MONEY":
+ return new AtomicDataType(new MoneyType(true, LogicalTypeRoot.VARCHAR, true));
+ // case "_INTERVAL":
+ case "_CHAR":
+ return new AtomicDataType(new CharType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_VARBIT":
+ return new AtomicDataType(new VarbitType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_NAME":
+ return new AtomicDataType(new NameType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_UUID":
+ return new AtomicDataType(new UuidType(true, LogicalTypeRoot.VARCHAR, true));
+ case "_XML":
+ return new AtomicDataType(new XmlType(true, LogicalTypeRoot.VARCHAR, true));
+ // case "_POINT":
+ // case "_JSONB":
+ // case "_JSON":
+ // case "_REF_CURSOR":
+
+ // 以下类型无法支持
+ // Enumerated Types
+
+ // Geometric Types
+ // case "LINE":
+ // case "LSEG":
+ // case "BOX":
+ // case "PATH":
+ // case "POLYGON":
+ // case "CIRCLE":
+
+ // Network Address Types
+
+ //
+ // // JSON Types
+ // case "JSONB":
+ // case "JSONPATH":
+ // return DataTypes.STRING();
+
+ default:
+ throw new UnsupportedTypeException(type);
+ }
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/OpengaussSyncConverter.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/OpengaussSyncConverter.java
new file mode 100644
index 0000000000..fad14a7c46
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/OpengaussSyncConverter.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter;
+
+import com.dtstack.chunjun.config.CommonConfig;
+import com.dtstack.chunjun.connector.jdbc.converter.JdbcSyncConverter;
+import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.BitType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.JsonType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.JsonbType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.MoneyType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.PgCustomType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.PointType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.UuidType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.VarbitType;
+import com.dtstack.chunjun.connector.opengauss.converter.logical.XmlType;
+import com.dtstack.chunjun.converter.IDeserializationConverter;
+import com.dtstack.chunjun.converter.ISerializationConverter;
+import com.dtstack.chunjun.element.AbstractBaseColumn;
+import com.dtstack.chunjun.element.ColumnRowData;
+import com.dtstack.chunjun.element.column.BigDecimalColumn;
+import com.dtstack.chunjun.element.column.BooleanColumn;
+import com.dtstack.chunjun.element.column.BytesColumn;
+import com.dtstack.chunjun.element.column.DoubleColumn;
+import com.dtstack.chunjun.element.column.FloatColumn;
+import com.dtstack.chunjun.element.column.IntColumn;
+import com.dtstack.chunjun.element.column.LongColumn;
+import com.dtstack.chunjun.element.column.SqlDateColumn;
+import com.dtstack.chunjun.element.column.StringColumn;
+import com.dtstack.chunjun.element.column.TimeColumn;
+import com.dtstack.chunjun.element.column.TimestampColumn;
+import com.dtstack.chunjun.element.column.YearMonthColumn;
+import com.dtstack.chunjun.util.DateUtil;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+
+import org.opengauss.core.BaseConnection;
+import org.opengauss.jdbc.PgArray;
+import org.opengauss.jdbc.PgSQLXML;
+import org.opengauss.util.PGobject;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.UUID;
+
+public class OpengaussSyncConverter extends JdbcSyncConverter {
+
+ private static final long serialVersionUID = 7381732546960782333L;
+
+ private transient BaseConnection connection;
+
+ public OpengaussSyncConverter(RowType rowType, CommonConfig commonConfig) {
+ super(rowType, commonConfig);
+ }
+
+ @Override
+ protected IDeserializationConverter createInternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ return val -> new BooleanColumn((Boolean) val);
+ case INTEGER:
+ return val -> new IntColumn((Integer) val);
+ case INTERVAL_YEAR_MONTH:
+ return (IDeserializationConverter)
+ val -> {
+ YearMonthIntervalType yearMonthIntervalType =
+ (YearMonthIntervalType) type;
+ switch (yearMonthIntervalType.getResolution()) {
+ case YEAR:
+ return new YearMonthColumn(
+ Integer.parseInt(String.valueOf(val).substring(0, 4)));
+ case MONTH:
+ case YEAR_TO_MONTH:
+ default:
+ throw new UnsupportedOperationException(
+ "jdbc converter only support YEAR");
+ }
+ };
+ case FLOAT:
+ return val -> new FloatColumn((Float) val);
+ case DOUBLE:
+ return val -> new DoubleColumn((Double) val);
+ case BIGINT:
+ return val -> new LongColumn((Long) val);
+ case DECIMAL:
+ return val -> new BigDecimalColumn((BigDecimal) val);
+ case CHAR:
+ case VARCHAR:
+ if (type instanceof PgCustomType && ((PgCustomType) type).isArray()) {
+ return val -> new StringColumn(val.toString());
+ } else if (type instanceof JsonType
+ || type instanceof JsonbType
+ || type instanceof VarbitType
+ || type instanceof PointType) {
+ return val -> new StringColumn(((PGobject) val).getValue());
+ } else if (type instanceof UuidType) {
+ return val -> new StringColumn(((UUID) val).toString());
+ } else if (type instanceof XmlType) {
+ return val -> new StringColumn(((PgSQLXML) val).getString());
+ }
+ return val -> new StringColumn(val.toString());
+ case DATE:
+ return val -> new SqlDateColumn((Date) val);
+ case TIME_WITHOUT_TIME_ZONE:
+ return val -> new TimeColumn((Time) val);
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return (IDeserializationConverter)
+ val -> {
+ if (val instanceof PGobject) {
+ return new TimestampColumn(
+ DateUtil.convertToTimestampWithZone(val.toString()), 0);
+ }
+
+ return new TimestampColumn(
+ (Timestamp) val, ((TimestampType) (type)).getPrecision());
+ };
+ case BINARY:
+ case VARBINARY:
+ return val -> new BytesColumn((byte[]) val);
+ default:
+ throw new UnsupportedOperationException("Unsupported type:" + type);
+ }
+ }
+
+ @Override
+ protected ISerializationConverter createExternalConverter(
+ LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ PGobject pgObject = new PGobject();
+ pgObject.setType("bit");
+ if (type instanceof BitType) {
+ return (val, index, statement) -> {
+ pgObject.setValue(
+ ((ColumnRowData) val).getField(index).asBoolean() ? "1" : "0");
+ statement.setObject(index, pgObject);
+ };
+ }
+ return (val, index, statement) ->
+ statement.setBoolean(
+ index, ((ColumnRowData) val).getField(index).asBoolean());
+ case INTEGER:
+ return (val, index, statement) ->
+ statement.setInt(index, ((ColumnRowData) val).getField(index).asInt());
+ case FLOAT:
+ return (val, index, statement) ->
+ statement.setFloat(index, ((ColumnRowData) val).getField(index).asFloat());
+ case DOUBLE:
+ if (type instanceof MoneyType) {
+ PGobject pGobject = new PGobject();
+ pGobject.setType("money");
+ return (val, index, statement) -> {
+ pGobject.setValue(((ColumnRowData) val).getField(index).asString());
+ statement.setObject(index, pGobject);
+ };
+ }
+ return (val, index, statement) ->
+ statement.setDouble(
+ index, ((ColumnRowData) val).getField(index).asDouble());
+ case BIGINT:
+ return (val, index, statement) ->
+ statement.setLong(index, ((ColumnRowData) val).getField(index).asLong());
+ case DECIMAL:
+ return (val, index, statement) ->
+ statement.setBigDecimal(
+ index, ((ColumnRowData) val).getField(index).asBigDecimal());
+ case CHAR:
+ case VARCHAR:
+ if (type instanceof PgCustomType && ((PgCustomType) type).isArray()) {
+ final int oid = ((PgCustomType) type).getArrayOid();
+ return (val, index, statement) ->
+ statement.setArray(
+ index,
+ new PgArray(
+ connection,
+ oid,
+ (String)
+ ((ColumnRowData) val)
+ .getField(index)
+ .getData()));
+ } else if (type instanceof JsonType) {
+ PGobject jsonObject = new PGobject();
+ jsonObject.setType("json");
+ return (val, index, statement) -> {
+ jsonObject.setValue(((ColumnRowData) val).getField(index).asString());
+ statement.setObject(index, jsonObject);
+ };
+ } else if (type instanceof JsonbType) {
+ PGobject jsonObject = new PGobject();
+ jsonObject.setType("jsonb");
+ return (val, index, statement) -> {
+ jsonObject.setValue(((ColumnRowData) val).getField(index).asString());
+ statement.setObject(index, jsonObject);
+ };
+ } else if (type instanceof VarbitType) {
+ PGobject jsonObject = new PGobject();
+ jsonObject.setType("varbit");
+ return (val, index, statement) -> {
+ jsonObject.setValue(((ColumnRowData) val).getField(index).asString());
+ statement.setObject(index, jsonObject);
+ };
+ } else if (type instanceof XmlType) {
+ PGobject jsonObject = new PGobject();
+ jsonObject.setType("xml");
+ return (val, index, statement) -> {
+ jsonObject.setValue(((ColumnRowData) val).getField(index).asString());
+ statement.setObject(index, jsonObject);
+ };
+ } else if (type instanceof UuidType) {
+ PGobject jsonObject = new PGobject();
+ jsonObject.setType("uuid");
+ return (val, index, statement) -> {
+ jsonObject.setValue(((ColumnRowData) val).getField(index).asString());
+ statement.setObject(index, jsonObject);
+ };
+ } else if (type instanceof PointType) {
+ PGobject jsonObject = new PGobject();
+ jsonObject.setType("point");
+ return (val, index, statement) -> {
+ jsonObject.setValue(((ColumnRowData) val).getField(index).asString());
+ statement.setObject(index, jsonObject);
+ };
+ }
+ return (val, index, statement) ->
+ statement.setString(
+ index, ((ColumnRowData) val).getField(index).asString());
+ case DATE:
+ return (val, index, statement) ->
+ statement.setDate(index, ((ColumnRowData) val).getField(index).asSqlDate());
+ case TIME_WITHOUT_TIME_ZONE:
+ return (val, index, statement) ->
+ statement.setTime(index, ((ColumnRowData) val).getField(index).asTime());
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return (val, index, statement) ->
+ statement.setTimestamp(
+ index, ((ColumnRowData) val).getField(index).asTimestamp());
+
+ case BINARY:
+ case VARBINARY:
+ return (val, index, statement) ->
+ statement.setBytes(index, ((ColumnRowData) val).getField(index).asBytes());
+ default:
+ throw new UnsupportedOperationException("Unsupported type:" + type);
+ }
+ }
+
+ public void setConnection(BaseConnection connection) {
+ this.connection = connection;
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BitType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BitType.java
new file mode 100644
index 0000000000..2c2b1e915c
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BitType.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class BitType extends PgCustomType {
+ private static final Set INPUT_CONVERSION =
+ conversionSet(Boolean.class.getName(), Boolean.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = Boolean.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public BitType(boolean isNullable, LogicalTypeRoot typeRoot, Boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.BIT_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-BIT";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new BitType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BoolType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BoolType.java
new file mode 100644
index 0000000000..bc1219aa09
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BoolType.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class BoolType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(Boolean.class.getName(), Boolean.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = Boolean.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public BoolType(boolean isNullable, LogicalTypeRoot typeRoot, Boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.BOOL_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-BOOL";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new BoolType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BpcharType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BpcharType.java
new file mode 100644
index 0000000000..73d0df3a61
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BpcharType.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class BpcharType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(String.class.getName(), StringData.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = String.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public BpcharType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.BPCHAR_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-BPCHAR";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new BpcharType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/ByteaType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/ByteaType.java
new file mode 100644
index 0000000000..744839ad64
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/ByteaType.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class ByteaType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(byte[].class.getName(), byte[].class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = byte[].class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public ByteaType(boolean isNullable, LogicalTypeRoot typeRoot, Boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.BYTEA_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-BYTEA";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new ByteaType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/CharType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/CharType.java
new file mode 100644
index 0000000000..b4e9684a6e
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/CharType.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class CharType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(char.class.getName(), char.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = char.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public CharType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.CHAR_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-CHAR";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new CharType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/DateType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/DateType.java
new file mode 100644
index 0000000000..96e7f849ec
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/DateType.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.sql.Date;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class DateType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(Date.class.getName(), Date.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = Date.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public DateType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.DATE_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-DATE";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new DateType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Float4Type.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Float4Type.java
new file mode 100644
index 0000000000..0ee7752b2c
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Float4Type.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class Float4Type extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(Float.class.getName(), Float.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = Float.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public Float4Type(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.FLOAT4_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-FLOAT4";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new Float4Type(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Float8Type.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Float8Type.java
new file mode 100644
index 0000000000..9b0ea89d48
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Float8Type.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class Float8Type extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(Double.class.getName(), Double.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = Double.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public Float8Type(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.FLOAT8_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-FLOAT8";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new Float8Type(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int2Type.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int2Type.java
new file mode 100644
index 0000000000..d5bc4a27fc
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int2Type.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class Int2Type extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(Integer.class.getName(), Integer.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = Integer.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public Int2Type(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.INT2_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-INT2";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new Int2Type(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int4Type.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int4Type.java
new file mode 100644
index 0000000000..150d79bc46
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int4Type.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class Int4Type extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(Integer.class.getName(), Integer.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = Integer.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public Int4Type(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.INT4_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-INT4";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new Int4Type(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int8Type.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int8Type.java
new file mode 100644
index 0000000000..83d5696485
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int8Type.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class Int8Type extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(Long.class.getName(), Long.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = Long.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public Int8Type(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.INT8_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-INT8";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new Int8Type(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/JsonType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/JsonType.java
new file mode 100644
index 0000000000..3c90ee5ec4
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/JsonType.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class JsonType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(String.class.getName(), StringData.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = String.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public JsonType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.JSON_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-JSON";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new JsonType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/JsonbType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/JsonbType.java
new file mode 100644
index 0000000000..256342797f
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/JsonbType.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class JsonbType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(String.class.getName(), StringData.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = String.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public JsonbType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.JSONB_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-JSONB";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new JsonbType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/MoneyType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/MoneyType.java
new file mode 100644
index 0000000000..a131d413f9
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/MoneyType.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class MoneyType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(Double.class.getName(), Double.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = Double.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public MoneyType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.MONEY_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-MONEY";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new MoneyType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/NameType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/NameType.java
new file mode 100644
index 0000000000..cec1ae00cb
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/NameType.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class NameType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(String.class.getName(), StringData.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = String.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public NameType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.NAME_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-NAME";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new NameType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/NumericType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/NumericType.java
new file mode 100644
index 0000000000..ac6c67d877
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/NumericType.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class NumericType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(BigDecimal.class.getName(), DecimalData.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = BigDecimal.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public NumericType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.NUMERIC_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-NUMERIC";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new NumericType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/OidType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/OidType.java
new file mode 100644
index 0000000000..504e4ae7cc
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/OidType.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class OidType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(Long.class.getName(), Long.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = Long.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public OidType(boolean isNullable, LogicalTypeRoot typeRoot, Boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.INT8_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-OID";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new OidType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/PgCustomType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/PgCustomType.java
new file mode 100644
index 0000000000..01e8e8dde0
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/PgCustomType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+public abstract class PgCustomType extends LogicalType {
+
+ private final boolean isArray;
+ private final int arrayOid;
+
+ public PgCustomType(
+ boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray, Integer oid) {
+ super(isNullable, typeRoot);
+ this.isArray = isArray;
+ this.arrayOid = oid;
+ }
+
+ public boolean isArray() {
+ return isArray;
+ }
+
+ public int getArrayOid() {
+ return arrayOid;
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/PointType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/PointType.java
new file mode 100644
index 0000000000..e82f53d18c
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/PointType.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class PointType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(String.class.getName(), StringData.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = String.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public PointType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.POINT_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-POINT";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new PointType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TextType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TextType.java
new file mode 100644
index 0000000000..5a85b39727
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TextType.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class TextType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(String.class.getName(), StringData.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = String.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public TextType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.TEXT_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-TEXT";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new TextType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimeType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimeType.java
new file mode 100644
index 0000000000..268d401204
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimeType.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.sql.Time;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class TimeType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(Time.class.getName(), Time.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = Time.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public TimeType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.TIME_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-TIME";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new TimeType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimestampType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimestampType.java
new file mode 100644
index 0000000000..0d7acd73f1
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimestampType.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class TimestampType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(Timestamp.class.getName(), TimestampData.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = Timestamp.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public TimestampType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.TIMESTAMP_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-TIMESTAMP";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new TimestampType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimestampTzType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimestampTzType.java
new file mode 100644
index 0000000000..1fd4462307
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimestampTzType.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class TimestampTzType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(Timestamp.class.getName(), TimestampData.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = Timestamp.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public TimestampTzType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.TIMESTAMPTZ_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-TIMESTAMPTZ";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new TimestampTzType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimetzType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimetzType.java
new file mode 100644
index 0000000000..8d0fa78d37
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimetzType.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.sql.Time;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class TimetzType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(Time.class.getName(), Time.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = Time.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public TimetzType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.TIMETZ_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-TIMETZ";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new TimetzType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/UuidType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/UuidType.java
new file mode 100644
index 0000000000..1bead707f7
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/UuidType.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class UuidType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(String.class.getName(), String.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = String.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public UuidType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.UUID_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-UUID";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new UuidType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/VarbitType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/VarbitType.java
new file mode 100644
index 0000000000..effc7aea7a
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/VarbitType.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class VarbitType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(String.class.getName(), String.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = String.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public VarbitType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.VARBIT_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-VARBIT";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new VarbitType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/VarcharType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/VarcharType.java
new file mode 100644
index 0000000000..c2451b9156
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/VarcharType.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class VarcharType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(String.class.getName(), StringData.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = String.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public VarcharType(boolean isNullable, LogicalTypeRoot typeRoot, Boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.VARCHAR_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-VARCHAR";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new VarcharType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/XmlType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/XmlType.java
new file mode 100644
index 0000000000..0c096d2dfd
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/XmlType.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.converter.logical;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+
+import org.opengauss.core.Oid;
+
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class XmlType extends PgCustomType {
+
+ private static final Set INPUT_CONVERSION =
+ conversionSet(String.class.getName(), String.class.getName());
+
+ private static final Class> DEFAULT_CONVERSION = String.class;
+
+ private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName());
+
+ public XmlType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) {
+ super(isNullable, typeRoot, isArray, Oid.XML_ARRAY);
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "PG-XML";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class> clazz) {
+ return INPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class> clazz) {
+ return OUTPUT_CONVERSION.contains(clazz.getName());
+ }
+
+ @Override
+ public Class> getDefaultConversion() {
+ return DEFAULT_CONVERSION;
+ }
+
+ @Override
+ public List getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public R accept(LogicalTypeVisitor visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public LogicalType copy(boolean isNullable) {
+ return new XmlType(isNullable, getTypeRoot(), isArray());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/dialect/OpengaussDialect.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/dialect/OpengaussDialect.java
new file mode 100644
index 0000000000..20d3d0552b
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/dialect/OpengaussDialect.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.dialect;
+
+import com.dtstack.chunjun.config.CommonConfig;
+import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
+import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
+import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil;
+import com.dtstack.chunjun.connector.opengauss.converter.OpengaussRawTypeMapper;
+import com.dtstack.chunjun.connector.opengauss.converter.OpengaussSyncConverter;
+import com.dtstack.chunjun.converter.AbstractRowConverter;
+import com.dtstack.chunjun.converter.RawTypeMapper;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import io.vertx.core.json.JsonArray;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class OpengaussDialect implements JdbcDialect {
+
+ private static final long serialVersionUID = -8973956767518190621L;
+
+ private static final String DIALECT_NAME = "openGauss";
+ private static final String DRIVER = "org.opengauss.Driver";
+ public static final String URL_START = "jdbc:opengauss:";
+
+ protected static final String COPY_SQL_TEMPL =
+ "copy %s(%s) from stdin DELIMITER '%s' NULL as '%s'";
+
+ @Override
+ public String dialectName() {
+ return DIALECT_NAME;
+ }
+
+ @Override
+ public boolean canHandle(String url) {
+ return url.startsWith(URL_START);
+ }
+
+ @Override
+ public RawTypeMapper getRawTypeConverter() {
+ return OpengaussRawTypeMapper::apply;
+ }
+
+ @Override
+ public AbstractRowConverter
+ getColumnConverter(RowType rowType, CommonConfig commonConfig) {
+ return new OpengaussSyncConverter(rowType, commonConfig);
+ }
+
+ @Override
+ public Optional defaultDriverName() {
+ return Optional.of(DRIVER);
+ }
+
+ @Override
+ public boolean supportUpsert() {
+ return true;
+ }
+
+ /** openGauss upsert query. It use ON CONFLICT ... DO UPDATE SET.. to replace into openGauss. */
+ @Override
+ public Optional getUpsertStatement(
+ String schema,
+ String tableName,
+ String[] fieldNames,
+ String[] uniqueKeyFields,
+ boolean allReplace) {
+ String updateClause;
+ String uniqueColumns =
+ Arrays.stream(uniqueKeyFields)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ updateClause =
+ Arrays.stream(fieldNames)
+ .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f))
+ .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
+ .collect(Collectors.joining(", "));
+
+ return Optional.of(
+ getInsertIntoStatement(schema, tableName, fieldNames)
+ + " ON CONFLICT ("
+ + uniqueColumns
+ + ")"
+ + " DO UPDATE SET "
+ + updateClause);
+ }
+
+ @Override
+ public String getSelectFromStatement(
+ String schemaName,
+ String tableName,
+ String customSql,
+ String[] selectFields,
+ String where) {
+ String selectExpressions =
+ Arrays.stream(selectFields)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ StringBuilder sql = new StringBuilder(128);
+ sql.append("SELECT ");
+ if (StringUtils.isNotBlank(customSql)) {
+ sql.append("* FROM (")
+ .append(customSql)
+ .append(") ")
+ .append(JdbcUtil.TEMPORARY_TABLE_NAME);
+ } else {
+ sql.append(selectExpressions).append(" FROM ");
+ if (StringUtils.isNotBlank(schemaName)) {
+ sql.append(quoteIdentifier(schemaName)).append(" .");
+ }
+ sql.append(quoteIdentifier(tableName));
+ }
+
+ if (StringUtils.isNotBlank(where)) {
+ sql.append(" WHERE ").append(where);
+ }
+
+ return sql.toString();
+ }
+
+ public String getCopyStatement(
+ String schemaName,
+ String tableName,
+ String[] fields,
+ String fieldDelimiter,
+ String nullVal) {
+ String fieldsExpression =
+ Arrays.stream(fields).map(this::quoteIdentifier).collect(Collectors.joining(", "));
+
+ String tableLocation;
+ if (schemaName != null && !"".equals(schemaName.trim())) {
+ tableLocation = quoteIdentifier(schemaName) + "." + quoteIdentifier(tableName);
+ } else {
+ tableLocation = quoteIdentifier(tableName);
+ }
+
+ return String.format(
+ COPY_SQL_TEMPL, tableLocation, fieldsExpression, fieldDelimiter, nullVal);
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/sink/OpengaussOutputFormat.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/sink/OpengaussOutputFormat.java
new file mode 100644
index 0000000000..6bbf7fee85
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/sink/OpengaussOutputFormat.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.sink;
+
+import com.dtstack.chunjun.connector.jdbc.converter.JdbcSyncConverter;
+import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat;
+import com.dtstack.chunjun.connector.opengauss.converter.OpengaussSyncConverter;
+import com.dtstack.chunjun.connector.opengauss.dialect.OpengaussDialect;
+import com.dtstack.chunjun.element.ColumnRowData;
+import com.dtstack.chunjun.enums.EWriteMode;
+import com.dtstack.chunjun.throwable.NoRestartException;
+import com.dtstack.chunjun.throwable.WriteRecordException;
+
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.StringUtils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.opengauss.copy.CopyManager;
+import org.opengauss.core.BaseConnection;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+
+@Slf4j
+public class OpengaussOutputFormat extends JdbcOutputFormat {
+
+ private static final long serialVersionUID = 6244886558080900510L;
+
+ // pg 字符串里含有\u0000 会报错 ERROR: invalid byte sequence for encoding "UTF8": 0x00
+ public static final String SPACE = "\u0000";
+
+ private static final String LINE_DELIMITER = "\n";
+ private CopyManager copyManager;
+ private boolean enableCopyMode = false;
+ private String copySql = "";
+ private static final String INSERT_SQL_MODE_TYPE = "copy";
+ private static final String DEFAULT_FIELD_DELIMITER = "\001";
+
+ private static final String DEFAULT_NULL_VALUE = "\002";
+
+ @Override
+ protected void openInternal(int taskNumber, int numTasks) {
+ super.openInternal(taskNumber, numTasks);
+ try {
+ // check is use copy mode for insert
+ enableCopyMode = INSERT_SQL_MODE_TYPE.equalsIgnoreCase(jdbcConfig.getInsertSqlMode());
+ if (EWriteMode.INSERT.name().equalsIgnoreCase(jdbcConfig.getMode()) && enableCopyMode) {
+ copyManager = new CopyManager((BaseConnection) dbConn);
+
+ OpengaussDialect pgDialect = (OpengaussDialect) jdbcDialect;
+ copySql =
+ pgDialect.getCopyStatement(
+ jdbcConfig.getSchema(),
+ jdbcConfig.getTable(),
+ columnNameList.toArray(new String[0]),
+ StringUtils.isNullOrWhitespaceOnly(
+ jdbcConfig.getFieldDelim().trim())
+ ? DEFAULT_FIELD_DELIMITER
+ : jdbcConfig.getFieldDelim(),
+ StringUtils.isNullOrWhitespaceOnly(jdbcConfig.getNullDelim().trim())
+ ? DEFAULT_NULL_VALUE
+ : jdbcConfig.getNullDelim());
+
+ log.info("write sql:{}", copySql);
+ }
+ if (rowConverter instanceof JdbcSyncConverter) {
+ if (jdbcDialect.dialectName().equals("openGauss")) {
+ ((OpengaussSyncConverter) rowConverter).setConnection((BaseConnection) dbConn);
+ }
+ }
+ } catch (SQLException sqe) {
+ throw new IllegalArgumentException("checkUpsert() failed.", sqe);
+ }
+ }
+
+ @Override
+ protected void writeSingleRecordInternal(RowData row) throws WriteRecordException {
+ if (!enableCopyMode) {
+ super.writeSingleRecordInternal(row);
+ } else {
+ if (rowConverter instanceof JdbcSyncConverter) {
+ ColumnRowData colRowData = (ColumnRowData) row;
+ // write with copy
+ int index = 0;
+ try {
+ StringBuilder rowStr = new StringBuilder();
+ int lastIndex = row.getArity() - 1;
+ for (; index < row.getArity(); index++) {
+ appendColumn(colRowData, index, rowStr, index == lastIndex);
+ }
+ String rowVal = copyModeReplace(rowStr.toString());
+ ByteArrayInputStream bi =
+ new ByteArrayInputStream(rowVal.getBytes(StandardCharsets.UTF_8));
+ copyManager.copyIn(copySql, bi);
+ } catch (Exception e) {
+ processWriteException(e, index, row);
+ }
+ } else {
+ throw new NoRestartException("copy mode only support data sync with out table");
+ }
+ }
+ }
+
+ @Override
+ protected void writeMultipleRecordsInternal() throws Exception {
+ if (!enableCopyMode) {
+ super.writeMultipleRecordsInternal();
+ } else {
+ if (rowConverter instanceof JdbcSyncConverter) {
+ StringBuilder rowsStrBuilder = new StringBuilder(128);
+ for (RowData row : rows) {
+ ColumnRowData colRowData = (ColumnRowData) row;
+ int lastIndex = row.getArity() - 1;
+ StringBuilder rowStr = new StringBuilder(128);
+ for (int index = 0; index < row.getArity(); index++) {
+ appendColumn(colRowData, index, rowStr, index == lastIndex);
+ }
+ String tempData = rowStr.toString();
+ rowsStrBuilder.append(copyModeReplace(tempData)).append(LINE_DELIMITER);
+ }
+ String rowVal = rowsStrBuilder.toString();
+ ByteArrayInputStream bi =
+ new ByteArrayInputStream(rowVal.getBytes(StandardCharsets.UTF_8));
+ copyManager.copyIn(copySql, bi);
+
+ if (checkpointEnabled && CheckpointingMode.EXACTLY_ONCE == checkpointMode) {
+ rowsOfCurrentTransaction += rows.size();
+ }
+ } else {
+ throw new NoRestartException("copy mode only support data sync with out table");
+ }
+ }
+ }
+
+ private void appendColumn(
+ ColumnRowData colRowData, int pos, StringBuilder rowStr, boolean isLast) {
+ Object col = colRowData.getField(pos);
+ if (col == null) {
+ rowStr.append(
+ StringUtils.isNullOrWhitespaceOnly(jdbcConfig.getNullDelim().trim())
+ ? DEFAULT_NULL_VALUE
+ : jdbcConfig.getNullDelim());
+
+ } else {
+ rowStr.append(col);
+ }
+ if (!isLast) {
+ rowStr.append(
+ StringUtils.isNullOrWhitespaceOnly(jdbcConfig.getFieldDelim().trim())
+ ? DEFAULT_FIELD_DELIMITER
+ : jdbcConfig.getFieldDelim());
+ }
+ }
+
+ /**
+ * \r \n \ 等特殊字符串需要转义
+ *
+ * @return
+ */
+ private String copyModeReplace(String rowStr) {
+ if (rowStr.contains("\\")) {
+ rowStr = rowStr.replaceAll("\\\\", "\\\\\\\\");
+ }
+ if (rowStr.contains("\r")) {
+ rowStr = rowStr.replaceAll("\r", "\\\\r");
+ }
+
+ if (rowStr.contains("\n")) {
+ rowStr = rowStr.replaceAll("\n", "\\\\n");
+ }
+
+ // pg 字符串里含有\u0000 会报错 ERROR: invalid byte sequence for encoding "UTF8": 0x00
+ if (rowStr.contains(SPACE)) {
+ rowStr = rowStr.replaceAll(SPACE, "");
+ }
+ return rowStr;
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/sink/OpengaussSinkFactory.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/sink/OpengaussSinkFactory.java
new file mode 100644
index 0000000000..64c4d64155
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/sink/OpengaussSinkFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.sink;
+
+import com.dtstack.chunjun.config.SyncConfig;
+import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
+import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormatBuilder;
+import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory;
+import com.dtstack.chunjun.connector.opengauss.dialect.OpengaussDialect;
+
+public class OpengaussSinkFactory extends JdbcSinkFactory {
+
+ public OpengaussSinkFactory(SyncConfig syncConfig) {
+ super(syncConfig, new OpengaussDialect());
+ }
+
+ public OpengaussSinkFactory(SyncConfig syncConfig, JdbcDialect dialect) {
+ super(syncConfig, dialect);
+ }
+
+ @Override
+ protected JdbcOutputFormatBuilder getBuilder() {
+ return new JdbcOutputFormatBuilder(new OpengaussOutputFormat());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/source/OpengaussInputFormat.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/source/OpengaussInputFormat.java
new file mode 100644
index 0000000000..6450f01932
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/source/OpengaussInputFormat.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.source;
+
+import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat;
+import com.dtstack.chunjun.connector.jdbc.util.SqlUtil;
+import com.dtstack.chunjun.util.ExceptionUtil;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class OpengaussInputFormat extends JdbcInputFormat {
+
+ private static final long serialVersionUID = -4046186579949376769L;
+
+ @Override
+ protected void queryPollingWithOutStartLocation() throws SQLException {
+ // In opengauss, if resultCursorType is FORWARD_ONLY
+ // , the query will report an error after the method
+ // #setFetchDirection(ResultSet.FETCH_REVERSE) is called.
+ String querySql =
+ SqlUtil.buildOrderSql(jdbcConfig.getQuerySql(), jdbcConfig, jdbcDialect, "ASC");
+ ps =
+ dbConn.prepareStatement(
+ querySql, ResultSet.TYPE_SCROLL_INSENSITIVE, resultSetConcurrency);
+ ps.setFetchSize(jdbcConfig.getFetchSize());
+ ps.setQueryTimeout(jdbcConfig.getQueryTimeOut());
+ resultSet = ps.executeQuery();
+ hasNext = resultSet.next();
+
+ try {
+ // 间隔轮询一直循环,直到查询到数据库中的数据为止
+ while (!hasNext) {
+ TimeUnit.MILLISECONDS.sleep(jdbcConfig.getPollingInterval());
+ resultSet.close();
+ // 如果事务不提交 就会导致数据库即使插入数据 也无法读到数据
+ dbConn.commit();
+ resultSet = ps.executeQuery();
+ hasNext = resultSet.next();
+ // 每隔五分钟打印一次,(当前时间 - 任务开始时间) % 300秒 <= 一个间隔轮询周期
+ if ((System.currentTimeMillis() - startTime) % 300000
+ <= jdbcConfig.getPollingInterval()) {
+ log.info(
+ "no record matched condition in database, execute query sql = {}, startLocation = {}",
+ jdbcConfig.getQuerySql(),
+ endLocationAccumulator.getLocalValue());
+ }
+ }
+ } catch (InterruptedException e) {
+ log.warn(
+ "interrupted while waiting for polling, e = {}",
+ ExceptionUtil.getErrorMessage(e));
+ }
+
+ // 查询到数据,更新querySql
+ StringBuilder builder = new StringBuilder(128);
+ builder.append(jdbcConfig.getQuerySql());
+ if (jdbcConfig.getQuerySql().contains("WHERE")) {
+ builder.append(" AND ");
+ } else {
+ builder.append(" WHERE ");
+ }
+ builder.append(jdbcDialect.quoteIdentifier(jdbcConfig.getIncreColumn()))
+ .append(" > ? ORDER BY ")
+ .append(jdbcDialect.quoteIdentifier(jdbcConfig.getIncreColumn()))
+ .append(" ASC");
+ jdbcConfig.setQuerySql(builder.toString());
+ ps =
+ dbConn.prepareStatement(
+ jdbcConfig.getQuerySql(),
+ ResultSet.TYPE_SCROLL_INSENSITIVE,
+ resultSetConcurrency);
+ ps.setFetchSize(jdbcConfig.getFetchSize());
+ ps.setQueryTimeout(jdbcConfig.getQueryTimeOut());
+ log.info("update querySql, sql = {}", jdbcConfig.getQuerySql());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/source/OpengaussSourceFactory.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/source/OpengaussSourceFactory.java
new file mode 100644
index 0000000000..7876a073b7
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/source/OpengaussSourceFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.source;
+
+import com.dtstack.chunjun.config.SyncConfig;
+import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory;
+import com.dtstack.chunjun.connector.opengauss.dialect.OpengaussDialect;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class OpengaussSourceFactory extends JdbcSourceFactory {
+
+ public OpengaussSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env) {
+ super(syncConfig, env, new OpengaussDialect());
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/table/OpengaussDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/table/OpengaussDynamicTableFactory.java
new file mode 100644
index 0000000000..dc64cdaaef
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/table/OpengaussDynamicTableFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.opengauss.table;
+
+import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
+import com.dtstack.chunjun.connector.jdbc.table.JdbcDynamicTableFactory;
+import com.dtstack.chunjun.connector.opengauss.dialect.OpengaussDialect;
+
+public class OpengaussDynamicTableFactory extends JdbcDynamicTableFactory {
+
+ private static final String IDENTIFIER = "opengauss-x";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ protected JdbcDialect getDialect() {
+ return new OpengaussDialect();
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/chunjun-connectors/chunjun-connector-opengauss/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..bb5441b8b9
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+com.dtstack.chunjun.connector.opengauss.table.OpengaussDynamicTableFactory
diff --git a/chunjun-connectors/pom.xml b/chunjun-connectors/pom.xml
index a91f7cfe07..4e14d2d0bc 100755
--- a/chunjun-connectors/pom.xml
+++ b/chunjun-connectors/pom.xml
@@ -54,6 +54,7 @@
chunjun-connector-sqlserver
chunjun-connector-db2
chunjun-connector-postgresql
+ chunjun-connector-opengauss
chunjun-connector-greenplum
chunjun-connector-dm
chunjun-connector-gbase