diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md index 245cd5b2714..dac4a814c2f 100644 --- a/docs/en/connector-v2/sink/Hive.md +++ b/docs/en/connector-v2/sink/Hive.md @@ -47,7 +47,7 @@ By default, we use 2PC commit to ensure `exactly-once` ### table_name [string] -Target Hive table name eg: db1.table1 +Target Hive table name, e.g. `db1.table1`. If the source is in multiple-table mode, you can use `${database_name}.${table_name}` to generate the table name; the `${database_name}` and `${table_name}` placeholders will be replaced with the values of the CatalogTable generated by the source. ### metastore_uri [string] @@ -343,6 +343,56 @@ sink { } ``` +### Example 2 + +We have multiple source tables like this: + +```bash +create table test_1( +) +PARTITIONED BY (xx); + +create table test_2( +) +PARTITIONED BY (xx); +... +``` + +We need to read data from these source tables and write it to other tables. + +The job config file can look like this: + +``` +env { + # You can set flink configuration here + parallelism = 3 + job.name="test_hive_source_to_hive" +} + +source { + Hive { + tables_configs = [ + { + table_name = "test_hive.test_1" + metastore_uri = "thrift://ctyun6:9083" + }, + { + table_name = "test_hive.test_2" + metastore_uri = "thrift://ctyun7:9083" + } + ] + } +} + +sink { + # Write each source table to the corresponding Hive table + Hive { + table_name = "${database_name}.${table_name}" + metastore_uri = "thrift://ctyun7:9083" + } +} +``` + ## Changelog ### 2.2.0-beta 2022-09-26 diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md index 5273ca2ee50..bb7b8514097 100644 --- a/docs/en/connector-v2/source/Hive.md +++ b/docs/en/connector-v2/source/Hive.md @@ -59,10 +59,6 @@ Hive metastore uri The path of `hdfs-site.xml`, used to load ha configuration of namenodes -### hive_site_path [string] - -The path of `hive-site.xml`, used to authentication hive metastore - ### read_partitions [list] The target partitions that user want to read from hive table, if user does not set this parameter, it will read all the data from hive table. @@ -102,6 +98,8 @@ Source plugin common parameters, please refer to [Source Common Options](common- ## Example +### Example 1: Single table + ```bash Hive { @@ -111,6 +109,25 @@ Source plugin common parameters, please refer to [Source Common Options](common- ``` +### Example 2: Multiple tables + +```bash + + Hive { + tables_configs = [ + { + table_name = "default.seatunnel_orc_1" + metastore_uri = "thrift://namenode001:9083" + }, + { + table_name = "default.seatunnel_orc_2" + metastore_uri = "thrift://namenode001:9083" + } + ] + } + +``` + ## Changelog ### 2.2.0-beta 2022-09-26 diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java index b268fe612e8..eafaedf05d2 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java @@ -42,6 +42,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; /** Utils contains some common methods for construct CatalogTable.
*/ @Slf4j @@ -234,4 +236,41 @@ public static SeaTunnelRowType buildSimpleTextSchema() { public static CatalogTable buildSimpleTextTable() { return getCatalogTable("default", buildSimpleTextSchema()); } + + public static CatalogTable newCatalogTable( + CatalogTable catalogTable, SeaTunnelRowType seaTunnelRowType) { + TableSchema tableSchema = catalogTable.getTableSchema(); + + Map columnMap = + tableSchema.getColumns().stream() + .collect(Collectors.toMap(Column::getName, Function.identity())); + String[] fieldNames = seaTunnelRowType.getFieldNames(); + SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes(); + + List finalColumns = new ArrayList<>(); + for (int i = 0; i < fieldNames.length; i++) { + Column column = columnMap.get(fieldNames[i]); + if (column != null) { + finalColumns.add(column); + } else { + finalColumns.add( + PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0, false, null, null)); + } + } + + TableSchema finalSchema = + TableSchema.builder() + .columns(finalColumns) + .primaryKey(tableSchema.getPrimaryKey()) + .constraintKey(tableSchema.getConstraintKeys()) + .build(); + + return CatalogTable.of( + catalogTable.getTableId(), + finalSchema, + catalogTable.getOptions(), + catalogTable.getPartitionKeys(), + catalogTable.getComment(), + catalogTable.getCatalogName()); + } } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java index 2f2bc972a00..db4340b373d 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java @@ -17,11 +17,11 @@ package org.apache.seatunnel.connectors.seatunnel.hive.commit; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter; +import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions; import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy; import org.apache.thrift.TException; @@ -33,33 +33,31 @@ import java.util.Map; import java.util.stream.Collectors; -import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ABORT_DROP_PARTITION_METADATA; - @Slf4j public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter { - private final Config pluginConfig; private final String dbName; private final String tableName; private final boolean abortDropPartitionMetadata; + private final ReadonlyConfig readonlyConfig; + public HiveSinkAggregatedCommitter( - Config pluginConfig, String dbName, String tableName, HadoopConf hadoopConf) { + ReadonlyConfig readonlyConfig, String dbName, String tableName, HadoopConf hadoopConf) { super(hadoopConf); - this.pluginConfig = pluginConfig; + this.readonlyConfig = readonlyConfig; this.dbName = dbName; this.tableName = tableName; this.abortDropPartitionMetadata = - pluginConfig.hasPath(ABORT_DROP_PARTITION_METADATA.key()) - ? 
pluginConfig.getBoolean(ABORT_DROP_PARTITION_METADATA.key()) - : ABORT_DROP_PARTITION_METADATA.defaultValue(); + readonlyConfig.get(HiveSinkOptions.ABORT_DROP_PARTITION_METADATA); } @Override public List commit( List aggregatedCommitInfos) throws IOException { + List errorCommitInfos = super.commit(aggregatedCommitInfos); if (errorCommitInfos.isEmpty()) { - HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig); + HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(readonlyConfig); try { for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) { Map> partitionDirAndValuesMap = @@ -87,7 +85,7 @@ public List commit( public void abort(List aggregatedCommitInfos) throws Exception { super.abort(aggregatedCommitInfos); if (abortDropPartitionMetadata) { - HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig); + HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(readonlyConfig); for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) { Map> partitionDirAndValuesMap = aggregatedCommitInfo.getPartitionDirAndValuesMap(); diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java new file mode 100644 index 00000000000..efed4e91c58 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hive.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; + +public class BaseHiveOptions extends BaseSourceConfigOptions { + + public static final Option TABLE_NAME = + Options.key("table_name") + .stringType() + .noDefaultValue() + .withDescription("Hive table name"); + + public static final Option METASTORE_URI = + Options.key("metastore_uri") + .stringType() + .noDefaultValue() + .withDescription("Hive metastore uri"); + + public static final Option HIVE_SITE_PATH = + Options.key("hive_site_path") + .stringType() + .noDefaultValue() + .withDescription("The path of hive-site.xml"); +} diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java index 0afadc64d8f..714be586194 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java @@ -17,14 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.hive.config; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; -import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy; - -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.hive.metastore.api.Table; import java.util.HashMap; import java.util.Map; @@ -66,29 +60,4 @@ public class HiveConfig { .noDefaultValue() .withDescription( "The specified loading path for the 'core-site.xml', 'hdfs-site.xml' files"); - - public static final String TEXT_INPUT_FORMAT_CLASSNAME = - "org.apache.hadoop.mapred.TextInputFormat"; - public static final String TEXT_OUTPUT_FORMAT_CLASSNAME = - "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"; - public static final String PARQUET_INPUT_FORMAT_CLASSNAME = - "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; - public static final String PARQUET_OUTPUT_FORMAT_CLASSNAME = - "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"; - public static final String ORC_INPUT_FORMAT_CLASSNAME = - "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; - public static final String ORC_OUTPUT_FORMAT_CLASSNAME = - "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"; - - public static Pair getTableInfo(Config config) { - String table = config.getString(TABLE_NAME.key()); - String[] splits = table.split("\\."); - if (splits.length != 2) { - throw new RuntimeException("Please config " + TABLE_NAME + " as db.table format"); - } - HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(config); - Table tableInformation = hiveMetaStoreProxy.getTable(splits[0], splits[1]); - hiveMetaStoreProxy.close(); - return Pair.of(splits, tableInformation); - } } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConstants.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConstants.java new file mode 100644 index 00000000000..8539df68a8e --- 
/dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConstants.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hive.config; + +public class HiveConstants { + + public static final String CONNECTOR_NAME = "Hive"; + + public static final String TEXT_INPUT_FORMAT_CLASSNAME = + "org.apache.hadoop.mapred.TextInputFormat"; + public static final String TEXT_OUTPUT_FORMAT_CLASSNAME = + "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"; + public static final String PARQUET_INPUT_FORMAT_CLASSNAME = + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; + public static final String PARQUET_OUTPUT_FORMAT_CLASSNAME = + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"; + public static final String ORC_INPUT_FORMAT_CLASSNAME = + "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; + public static final String ORC_OUTPUT_FORMAT_CLASSNAME = + "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"; +} diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java index 205a9b5decc..a0923acc1bb 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java @@ -23,7 +23,9 @@ public enum HiveConnectorErrorCode implements SeaTunnelErrorCode { GET_HDFS_NAMENODE_HOST_FAILED("HIVE-01", "Get name node host from table location failed"), INITIALIZE_HIVE_METASTORE_CLIENT_FAILED("HIVE-02", "Initialize hive metastore client failed"), GET_HIVE_TABLE_INFORMATION_FAILED( - "HIVE-03", "Get hive table information from hive metastore service failed"); + "HIVE-03", "Get hive table information from hive metastore service failed"), + HIVE_TABLE_NAME_ERROR("HIVE-04", "Hive table name is invalid"), + ; private final String code; private final String description; diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java index 078ce83fd11..b91c65de9bd 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java +++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java @@ -20,110 +20,78 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.serialization.DefaultSerializer; +import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; -import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink; +import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; +import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy; +import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory; import org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter; -import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig; -import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants; import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException; +import org.apache.seatunnel.connectors.seatunnel.hive.sink.writter.HiveSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions; import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory; +import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; -import com.google.auto.service.AutoService; - -import java.net.URI; -import java.net.URISyntaxException; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER; import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE; import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_NAME_EXPRESSION; import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_PATH; -import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.HAVE_PARTITION; import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE; import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.PARTITION_BY; -import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.PARTITION_DIR_EXPRESSION; import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER; import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.SINK_COLUMNS; -import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.METASTORE_URI; -import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_OUTPUT_FORMAT_CLASSNAME; -import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_OUTPUT_FORMAT_CLASSNAME; -import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TABLE_NAME; -import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_OUTPUT_FORMAT_CLASSNAME; - -@AutoService(SeaTunnelSink.class) -public class HiveSink extends BaseHdfsFileSink { - private String dbName; - private String tableName; - private Table tableInformation; - @Override - public String getPluginName() { - return "Hive"; +public class HiveSink + implements SeaTunnelSink< + SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>, + SupportMultiTableSink { + + // Since Table might contain some unserializable fields, we need to make it transient + // And use getTableInformation to get the Table object + private transient Table tableInformation; + private final CatalogTable catalogTable; + private final ReadonlyConfig readonlyConfig; + private final HadoopConf hadoopConf; + private final FileSinkConfig fileSinkConfig; + private transient WriteStrategy writeStrategy; + private String jobId; + + public HiveSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { + this.readonlyConfig = readonlyConfig; + this.catalogTable = catalogTable; + this.tableInformation = getTableInformation(); + this.hadoopConf = createHadoopConf(readonlyConfig); + this.fileSinkConfig = generateFileSinkConfig(readonlyConfig, catalogTable); + this.writeStrategy = getWriteStrategy(); } - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - CheckResult result = - CheckConfigUtil.checkAllExists(pluginConfig, METASTORE_URI.key(), TABLE_NAME.key()); - if (!result.isSuccess()) { - throw new HiveConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SINK, result.getMsg())); - } - result = - CheckConfigUtil.checkAtLeastOneExists( - pluginConfig, - FILE_FORMAT_TYPE.key(), - FILE_PATH.key(), - FIELD_DELIMITER.key(), - ROW_DELIMITER.key(), - IS_PARTITION_FIELD_WRITE_IN_FILE.key(), - PARTITION_DIR_EXPRESSION.key(), - HAVE_PARTITION.key(), - SINK_COLUMNS.key(), - PARTITION_BY.key()); - if (result.isSuccess()) { - throw new HiveConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "Hive sink connector does not support these setting [%s]", - String.join( - ",", - FILE_FORMAT_TYPE.key(), - FILE_PATH.key(), - FIELD_DELIMITER.key(), - ROW_DELIMITER.key(), - IS_PARTITION_FIELD_WRITE_IN_FILE.key(), - PARTITION_DIR_EXPRESSION.key(), - 
HAVE_PARTITION.key(), - SINK_COLUMNS.key(), - PARTITION_BY.key()))); - } - Pair tableInfo = HiveConfig.getTableInfo(pluginConfig); - dbName = tableInfo.getLeft()[0]; - tableName = tableInfo.getLeft()[1]; - tableInformation = tableInfo.getRight(); + private FileSinkConfig generateFileSinkConfig( + ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { + Table tableInformation = getTableInformation(); + Config pluginConfig = readonlyConfig.toConfig(); List sinkFields = tableInformation.getSd().getCols().stream() .map(FieldSchema::getName) @@ -133,35 +101,42 @@ public void prepare(Config pluginConfig) throws PrepareFailException { .map(FieldSchema::getName) .collect(Collectors.toList()); sinkFields.addAll(partitionKeys); - String outputFormat = tableInformation.getSd().getOutputFormat(); - if (TEXT_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) { - Map parameters = - tableInformation.getSd().getSerdeInfo().getParameters(); - pluginConfig = - pluginConfig - .withValue( - FILE_FORMAT_TYPE.key(), - ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString())) - .withValue( - FIELD_DELIMITER.key(), - ConfigValueFactory.fromAnyRef(parameters.get("field.delim"))) - .withValue( - ROW_DELIMITER.key(), - ConfigValueFactory.fromAnyRef(parameters.get("line.delim"))); - } else if (PARQUET_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) { - pluginConfig = - pluginConfig.withValue( - FILE_FORMAT_TYPE.key(), - ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString())); - } else if (ORC_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) { - pluginConfig = - pluginConfig.withValue( - FILE_FORMAT_TYPE.key(), - ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString())); - } else { - throw new HiveConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - "Hive connector only support [text parquet orc] table now"); + + FileFormat fileFormat = HiveTableUtils.parseFileFormat(tableInformation); + switch (fileFormat) { + case TEXT: + Map parameters = + tableInformation.getSd().getSerdeInfo().getParameters(); + pluginConfig = + pluginConfig + .withValue( + FILE_FORMAT_TYPE.key(), + ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString())) + .withValue( + FIELD_DELIMITER.key(), + ConfigValueFactory.fromAnyRef( + parameters.get("field.delim"))) + .withValue( + ROW_DELIMITER.key(), + ConfigValueFactory.fromAnyRef( + parameters.get("line.delim"))); + break; + case PARQUET: + pluginConfig = + pluginConfig.withValue( + FILE_FORMAT_TYPE.key(), + ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString())); + break; + case ORC: + pluginConfig = + pluginConfig.withValue( + FILE_FORMAT_TYPE.key(), + ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString())); + break; + default: + throw new HiveConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + "Hive connector only support [text parquet orc] table now"); } pluginConfig = pluginConfig @@ -178,42 +153,97 @@ public void prepare(Config pluginConfig) throws PrepareFailException { .withValue(SINK_COLUMNS.key(), ConfigValueFactory.fromAnyRef(sinkFields)) .withValue( PARTITION_BY.key(), ConfigValueFactory.fromAnyRef(partitionKeys)); - String hiveSdLocation = tableInformation.getSd().getLocation(); - try { - /** - * Build hadoop conf(support s3、cos、oss、hdfs). 
The returned hadoop conf can be - * CosConf、OssConf、S3Conf、HadoopConf so that HadoopFileSystemProxy can obtain the - * correct Schema and FsHdfsImpl that can be filled into hadoop configuration in {@link - * org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()} - */ - hadoopConf = - StorageFactory.getStorageType(hiveSdLocation) - .buildHadoopConfWithReadOnlyConfig( - ReadonlyConfig.fromConfig(pluginConfig)); - String path = new URI(hiveSdLocation).getPath(); - pluginConfig = - pluginConfig - .withValue(FILE_PATH.key(), ConfigValueFactory.fromAnyRef(path)) - .withValue( - FS_DEFAULT_NAME_KEY, - ConfigValueFactory.fromAnyRef(hadoopConf.getHdfsNameKey())); - } catch (URISyntaxException e) { - String errorMsg = - String.format( - "Get hdfs namenode host from table location [%s] failed," - + "please check it", - hiveSdLocation); - throw new HiveConnectorException( - HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED, errorMsg, e); - } - this.pluginConfig = pluginConfig; - super.prepare(pluginConfig); + + return new FileSinkConfig(pluginConfig, catalogTable.getSeaTunnelRowType()); + } + + @Override + public String getPluginName() { + return HiveConstants.CONNECTOR_NAME; } @Override public Optional> createAggregatedCommitter() { return Optional.of( - new HiveSinkAggregatedCommitter(pluginConfig, dbName, tableName, hadoopConf)); + new HiveSinkAggregatedCommitter( + readonlyConfig, + getTableInformation().getDbName(), + getTableInformation().getTableName(), + hadoopConf)); + } + + @Override + public void setJobContext(JobContext jobContext) { + this.jobId = jobContext.getJobId(); + } + + @Override + public SinkWriter restoreWriter( + SinkWriter.Context context, List states) { + return new HiveSinkWriter(getWriteStrategy(), hadoopConf, context, jobId, states); + } + + @Override + public SinkWriter createWriter( + SinkWriter.Context context) { + return new HiveSinkWriter(getWriteStrategy(), hadoopConf, context, jobId); + } + + @Override + public Optional> getCommitInfoSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + @Override + public Optional> getAggregatedCommitInfoSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + @Override + public Optional> getWriterStateSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + private HadoopConf createHadoopConf(ReadonlyConfig readonlyConfig) { + String hdfsLocation = getTableInformation().getSd().getLocation(); + + /** + * Build hadoop conf(support s3、cos、oss、hdfs). 
The returned hadoop conf can be + * CosConf、OssConf、S3Conf、HadoopConf so that HadoopFileSystemProxy can obtain the correct + * Schema and FsHdfsImpl that can be filled into hadoop configuration in {@link + * org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()} + */ + HadoopConf hadoopConf = + StorageFactory.getStorageType(hdfsLocation) + .buildHadoopConfWithReadOnlyConfig(readonlyConfig); + readonlyConfig + .getOptional(HiveSourceOptions.HDFS_SITE_PATH) + .ifPresent(hadoopConf::setHdfsSitePath); + readonlyConfig + .getOptional(HiveSourceOptions.REMOTE_USER) + .ifPresent(hadoopConf::setRemoteUser); + readonlyConfig + .getOptional(HiveSourceOptions.KERBEROS_PRINCIPAL) + .ifPresent(hadoopConf::setKerberosPrincipal); + readonlyConfig + .getOptional(HiveSourceOptions.KERBEROS_KEYTAB_PATH) + .ifPresent(hadoopConf::setKerberosKeytabPath); + return hadoopConf; + } + + private Table getTableInformation() { + if (tableInformation == null) { + tableInformation = HiveTableUtils.getTableInfo(readonlyConfig); + } + return tableInformation; + } + + private WriteStrategy getWriteStrategy() { + if (writeStrategy == null) { + writeStrategy = WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig); + writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType()); + } + return writeStrategy; } } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java index e40864517f2..e53aed86fc6 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java @@ -17,19 +17,32 @@ package org.apache.seatunnel.connectors.seatunnel.hive.sink; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkReplaceNameConstant; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig; +import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants; import com.google.auto.service.AutoService; +import java.util.HashMap; +import java.util.Map; + @AutoService(Factory.class) -public class HiveSinkFactory implements TableSinkFactory { - @Override - public String factoryIdentifier() { - return "Hive"; - } +public class HiveSinkFactory + implements TableSinkFactory< + SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo> { @Override public OptionRule 
optionRule() { @@ -37,8 +50,70 @@ public OptionRule optionRule() { .required(HiveConfig.TABLE_NAME) .required(HiveConfig.METASTORE_URI) .optional(HiveConfig.ABORT_DROP_PARTITION_METADATA) + .optional(BaseSinkConfig.KERBEROS_PRINCIPAL) + .optional(BaseSinkConfig.KERBEROS_KEYTAB_PATH) + .optional(BaseSinkConfig.REMOTE_USER) .optional(HiveConfig.HADOOP_CONF) .optional(HiveConfig.HADOOP_CONF_PATH) .build(); } + + @Override + public TableSink + createSink(TableSinkFactoryContext context) { + ReadonlyConfig readonlyConfig = context.getOptions(); + CatalogTable catalogTable = context.getCatalogTable(); + + ReadonlyConfig finalReadonlyConfig = + generateCurrentReadonlyConfig(readonlyConfig, catalogTable); + return () -> new HiveSink(finalReadonlyConfig, catalogTable); + } + + @Override + public String factoryIdentifier() { + return HiveConstants.CONNECTOR_NAME; + } + + private ReadonlyConfig generateCurrentReadonlyConfig( + ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { + + Map configMap = readonlyConfig.toMap(); + + readonlyConfig + .getOptional(HiveSinkOptions.TABLE_NAME) + .ifPresent( + tableName -> { + String replacedPath = + replaceCatalogTableInPath(tableName, catalogTable); + configMap.put(HiveSinkOptions.TABLE_NAME.key(), replacedPath); + }); + + return ReadonlyConfig.fromMap(new HashMap<>(configMap)); + } + + private String replaceCatalogTableInPath(String originTableName, CatalogTable catalogTable) { + String tableName = originTableName; + TableIdentifier tableIdentifier = catalogTable.getTableId(); + if (tableIdentifier != null) { + if (tableIdentifier.getDatabaseName() != null) { + tableName = + tableName.replace( + SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY, + tableIdentifier.getDatabaseName()); + } + if (tableIdentifier.getSchemaName() != null) { + tableName = + tableName.replace( + SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY, + tableIdentifier.getSchemaName()); + } + if (tableIdentifier.getTableName() != null) { + tableName = + tableName.replace( + SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY, + tableIdentifier.getTableName()); + } + } + return tableName; + } } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java new file mode 100644 index 00000000000..a241717a448 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hive.sink; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.connectors.seatunnel.hive.config.BaseHiveOptions; + +public class HiveSinkOptions extends BaseHiveOptions { + + public static final Option ABORT_DROP_PARTITION_METADATA = + Options.key("abort_drop_partition_metadata") + .booleanType() + .defaultValue(false) + .withDescription( + "Flag to decide whether to drop partition metadata from Hive Metastore during an abort operation. Note: this only affects the metadata in the metastore, the data in the partition will always be deleted(data generated during the synchronization process)."); +} diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/writter/HiveSinkWriter.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/writter/HiveSinkWriter.java new file mode 100644 index 00000000000..a00f5590821 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/writter/HiveSinkWriter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hive.sink.writter; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; +import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; +import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy; + +import java.util.Collections; +import java.util.List; + +public class HiveSinkWriter extends BaseFileSinkWriter + implements SupportMultiTableSinkWriter { + + public HiveSinkWriter( + WriteStrategy writeStrategy, + HadoopConf hadoopConf, + Context context, + String jobId, + List fileSinkStates) { + // todo: do we need to set writeStrategy as share resource? then how to deal with the pre + // fileSinkStates? 
+ super(writeStrategy, hadoopConf, context, jobId, fileSinkStates); + } + + public HiveSinkWriter( + WriteStrategy writeStrategy, + HadoopConf hadoopConf, + SinkWriter.Context context, + String jobId) { + this(writeStrategy, hadoopConf, context, jobId, Collections.emptyList()); + } +} diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java index f7642e36115..00f2bd0a8a5 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java @@ -17,213 +17,67 @@ package org.apache.seatunnel.connectors.seatunnel.hive.source; -import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; - -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; -import org.apache.seatunnel.api.table.type.SqlType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.common.exception.CommonError; -import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; -import org.apache.seatunnel.common.utils.JsonUtils; -import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; -import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.BaseHdfsFileSource; -import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig; -import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode; -import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException; -import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory; - -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit; +import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState; +import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants; +import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.hive.source.config.MultipleTableHiveSourceConfig; +import 
org.apache.seatunnel.connectors.seatunnel.hive.source.reader.MultipleTableHiveSourceReader; +import org.apache.seatunnel.connectors.seatunnel.hive.source.split.MultipleTableHiveSourceSplitEnumerator; -import com.google.auto.service.AutoService; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; - -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; -import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions.FILE_FORMAT_TYPE; -import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions.FILE_PATH; -import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_INPUT_FORMAT_CLASSNAME; -import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_INPUT_FORMAT_CLASSNAME; -import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_INPUT_FORMAT_CLASSNAME; +import java.util.stream.Collectors; -@AutoService(SeaTunnelSource.class) public class HiveSource extends BaseHdfsFileSource { - private Table tableInformation; + + private final MultipleTableHiveSourceConfig multipleTableHiveSourceConfig; + + public HiveSource(ReadonlyConfig readonlyConfig) { + this.multipleTableHiveSourceConfig = new MultipleTableHiveSourceConfig(readonlyConfig); + } @Override public String getPluginName() { - return "Hive"; + return HiveConstants.CONNECTOR_NAME; } @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - CheckResult result = - CheckConfigUtil.checkAllExists( - pluginConfig, HiveConfig.METASTORE_URI.key(), HiveConfig.TABLE_NAME.key()); - if (!result.isSuccess()) { - throw new HiveConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SOURCE, result.getMsg())); - } - result = - CheckConfigUtil.checkAtLeastOneExists( - pluginConfig, - TableSchemaOptions.SCHEMA.key(), - FILE_FORMAT_TYPE.key(), - FILE_PATH.key(), - FS_DEFAULT_NAME_KEY); - if (result.isSuccess()) { - throw new HiveConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "Hive source connector does not support these setting [%s]", - String.join( - ",", - TableSchemaOptions.SCHEMA.key(), - FILE_FORMAT_TYPE.key(), - FILE_PATH.key(), - FS_DEFAULT_NAME_KEY))); - } - if (pluginConfig.hasPath(BaseSourceConfigOptions.READ_PARTITIONS.key())) { - // verify partition list - List partitionsList = - pluginConfig.getStringList(BaseSourceConfigOptions.READ_PARTITIONS.key()); - if (partitionsList.isEmpty()) { - throw new HiveConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - "Partitions list is empty, please check"); - } - int depth = partitionsList.get(0).replaceAll("\\\\", "/").split("/").length; - long count = - partitionsList.stream() - .map(partition -> partition.replaceAll("\\\\", "/").split("/").length) - .filter(length -> length != depth) - .count(); - if (count > 0) { - throw new HiveConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - "Every partition that in partition list should has the same directory depth"); - } - } - Pair tableInfo = HiveConfig.getTableInfo(pluginConfig); - tableInformation = tableInfo.getRight(); - String inputFormat = tableInformation.getSd().getInputFormat(); - if (TEXT_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) { - pluginConfig = - pluginConfig.withValue( - 
FILE_FORMAT_TYPE.key(), - ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString())); - // Build schema from hive table information - // Because the entrySet in typesafe config couldn't keep key-value order - // So use jackson to keep key-value order - Map schema = parseSchema(tableInformation); - ConfigRenderOptions options = ConfigRenderOptions.concise(); - String render = pluginConfig.root().render(options); - ObjectNode jsonNodes = JsonUtils.parseObject(render); - jsonNodes.putPOJO(TableSchemaOptions.SCHEMA.key(), schema); - pluginConfig = ConfigFactory.parseString(jsonNodes.toString()); - } else if (PARQUET_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) { - pluginConfig = - pluginConfig.withValue( - FILE_FORMAT_TYPE.key(), - ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString())); - } else if (ORC_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) { - pluginConfig = - pluginConfig.withValue( - FILE_FORMAT_TYPE.key(), - ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString())); - } else { - throw new HiveConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - "Hive connector only support [text parquet orc] table now"); - } - String hiveSdLocation = tableInformation.getSd().getLocation(); - try { - URI uri = new URI(hiveSdLocation); - String path = uri.getPath(); - String defaultFs = hiveSdLocation.replace(path, ""); - pluginConfig = - pluginConfig - .withValue( - BaseSourceConfigOptions.FILE_PATH.key(), - ConfigValueFactory.fromAnyRef(path)) - .withValue( - FS_DEFAULT_NAME_KEY, ConfigValueFactory.fromAnyRef(defaultFs)); - /** - * Build hadoop conf(support s3、cos、oss、hdfs). The returned hadoop conf can be - * CosConf、OssConf、S3Conf、HadoopConf so that HadoopFileSystemProxy can obtain the - * correct Schema and FsHdfsImpl that can be filled into hadoop configuration in {@link - * org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()} - */ - hadoopConf = - StorageFactory.getStorageType(hiveSdLocation) - .buildHadoopConfWithReadOnlyConfig( - ReadonlyConfig.fromConfig(pluginConfig)); - } catch (URISyntaxException e) { - String errorMsg = - String.format( - "Get hdfs namenode host from table location [%s] failed," - + "please check it", - hiveSdLocation); - throw new HiveConnectorException( - HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED, errorMsg, e); - } - super.prepare(pluginConfig); + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; } - private Map parseSchema(Table table) { - LinkedHashMap fields = new LinkedHashMap<>(); - LinkedHashMap schema = new LinkedHashMap<>(); - List cols = table.getSd().getCols(); - for (FieldSchema col : cols) { - String name = col.getName(); - String type = col.getType(); - fields.put(name, covertHiveTypeToSeaTunnelType(name, type)); - } - schema.put("fields", fields); - return schema; + @Override + public List getProducedCatalogTables() { + return multipleTableHiveSourceConfig.getHiveSourceConfigs().stream() + .map(HiveSourceConfig::getCatalogTable) + .collect(Collectors.toList()); } - private Object covertHiveTypeToSeaTunnelType(String name, String hiveType) { - if (hiveType.contains("varchar")) { - return SqlType.STRING; - } - if (hiveType.contains("char")) { - throw CommonError.convertToSeaTunnelTypeError( - getPluginName(), PluginType.SOURCE, hiveType, name); - } - if (hiveType.contains("binary")) { - return SqlType.BYTES.name(); - } - if (hiveType.contains("struct")) { - LinkedHashMap fields = new LinkedHashMap<>(); - int start = hiveType.indexOf("<"); - int end = 
hiveType.lastIndexOf(">"); - String[] columns = hiveType.substring(start + 1, end).split(","); - for (String column : columns) { - String[] splits = column.split(":"); - fields.put(splits[0], covertHiveTypeToSeaTunnelType(splits[0], splits[1])); - } - return fields; - } - return hiveType; + @Override + public SourceReader createReader( + SourceReader.Context readerContext) { + return new MultipleTableHiveSourceReader(readerContext, multipleTableHiveSourceConfig); + } + + @Override + public SourceSplitEnumerator createEnumerator( + SourceSplitEnumerator.Context enumeratorContext) { + return new MultipleTableHiveSourceSplitEnumerator( + enumeratorContext, multipleTableHiveSourceConfig); + } + + @Override + public SourceSplitEnumerator restoreEnumerator( + SourceSplitEnumerator.Context enumeratorContext, + FileSourceState checkpointState) { + return new MultipleTableHiveSourceSplitEnumerator( + enumeratorContext, multipleTableHiveSourceConfig, checkpointState); } } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java index afa1ae0e366..07adfef106f 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java @@ -19,27 +19,44 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig; +import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants; +import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions; import com.google.auto.service.AutoService; +import java.io.Serializable; + @AutoService(Factory.class) public class HiveSourceFactory implements TableSourceFactory { @Override public String factoryIdentifier() { - return "Hive"; + return HiveConstants.CONNECTOR_NAME; + } + + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> (SeaTunnelSource) new HiveSource(context.getOptions()); } @Override public OptionRule optionRule() { return OptionRule.builder() - .required(HiveConfig.TABLE_NAME) - .required(HiveConfig.METASTORE_URI) + .optional(HiveConfig.TABLE_NAME) + .optional(HiveConfig.METASTORE_URI) + .optional(HiveSourceOptions.TABLE_CONFIGS) .optional(BaseSourceConfigOptions.READ_PARTITIONS) .optional(BaseSourceConfigOptions.READ_COLUMNS) + .optional(BaseSourceConfigOptions.KERBEROS_PRINCIPAL) + .optional(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH) + .optional(BaseSourceConfigOptions.REMOTE_USER) .optional(HiveConfig.HADOOP_CONF) .optional(HiveConfig.HADOOP_CONF_PATH) .build(); diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java new file mode 100644 index 00000000000..203491dcf98 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hive.source.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; + +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; +import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions; +import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy; +import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory; +import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants; +import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException; +import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory; +import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableUtils; +import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTypeConvertor; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; + +import lombok.Getter; +import lombok.SneakyThrows; + +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import 
java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER; +import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE; +import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER; + +@Getter +public class HiveSourceConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private final Table table; + private final CatalogTable catalogTable; + private final FileFormat fileFormat; + private final ReadStrategy readStrategy; + private final List filePaths; + private final HadoopConf hadoopConf; + + @SneakyThrows + public HiveSourceConfig(ReadonlyConfig readonlyConfig) { + readonlyConfig + .getOptional(HdfsSourceConfigOptions.READ_PARTITIONS) + .ifPresent(this::validatePartitions); + this.table = HiveTableUtils.getTableInfo(readonlyConfig); + this.hadoopConf = parseHiveHadoopConfig(readonlyConfig, table); + this.fileFormat = HiveTableUtils.parseFileFormat(table); + this.readStrategy = parseReadStrategy(table, readonlyConfig, fileFormat, hadoopConf); + this.filePaths = parseFilePaths(table, readStrategy); + this.catalogTable = + parseCatalogTable( + readonlyConfig, readStrategy, fileFormat, hadoopConf, filePaths, table); + } + + private void validatePartitions(List partitionsList) { + if (CollectionUtils.isEmpty(partitionsList)) { + throw new HiveConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + "Partitions list is empty, please check"); + } + int depth = partitionsList.get(0).replaceAll("\\\\", "/").split("/").length; + long count = + partitionsList.stream() + .map(partition -> partition.replaceAll("\\\\", "/").split("/").length) + .filter(length -> length != depth) + .count(); + if (count > 0) { + throw new HiveConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + "Every partition that in partition list should has the same directory depth"); + } + } + + private ReadStrategy parseReadStrategy( + Table table, + ReadonlyConfig readonlyConfig, + FileFormat fileFormat, + HadoopConf hadoopConf) { + + ReadStrategy readStrategy = ReadStrategyFactory.of(fileFormat.name()); + Config config = readonlyConfig.toConfig(); + + switch (fileFormat) { + case TEXT: + // if the file format is text, we set the delim. + Map parameters = table.getSd().getSerdeInfo().getParameters(); + config = + config.withValue( + FIELD_DELIMITER.key(), + ConfigValueFactory.fromAnyRef( + parameters.get("field.delim"))) + .withValue( + ROW_DELIMITER.key(), + ConfigValueFactory.fromAnyRef(parameters.get("line.delim"))) + .withValue( + FILE_FORMAT_TYPE.key(), + ConfigValueFactory.fromAnyRef(FileFormat.TEXT.name())); + break; + case ORC: + config = + config.withValue( + FILE_FORMAT_TYPE.key(), + ConfigValueFactory.fromAnyRef(FileFormat.ORC.name())); + break; + case PARQUET: + config = + config.withValue( + FILE_FORMAT_TYPE.key(), + ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.name())); + break; + default: + } + readStrategy.setPluginConfig(config); + readStrategy.init(hadoopConf); + return readStrategy; + } + + private HadoopConf parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, Table table) { + String hiveSdLocation = table.getSd().getLocation(); + /** + * Build hadoop conf(support s3、cos、oss、hdfs). 
The returned hadoop conf can be + * CosConf、OssConf、S3Conf、HadoopConf so that HadoopFileSystemProxy can obtain the correct + * Schema and FsHdfsImpl that can be filled into hadoop configuration in {@link + * org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()} + */ + HadoopConf hadoopConf = + StorageFactory.getStorageType(hiveSdLocation) + .buildHadoopConfWithReadOnlyConfig(readonlyConfig); + readonlyConfig + .getOptional(HdfsSourceConfigOptions.HDFS_SITE_PATH) + .ifPresent(hadoopConf::setHdfsSitePath); + readonlyConfig + .getOptional(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL) + .ifPresent(hadoopConf::setKerberosPrincipal); + readonlyConfig + .getOptional(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH) + .ifPresent(hadoopConf::setKerberosKeytabPath); + readonlyConfig + .getOptional(HdfsSourceConfigOptions.REMOTE_USER) + .ifPresent(hadoopConf::setRemoteUser); + return hadoopConf; + } + + private List parseFilePaths(Table table, ReadStrategy readStrategy) { + String hdfsPath = parseHdfsPath(table); + try { + return readStrategy.getFileNamesByPath(hdfsPath); + } catch (Exception e) { + String errorMsg = String.format("Get file list from this path [%s] failed", hdfsPath); + throw new FileConnectorException( + FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e); + } + } + + private String parseFsDefaultName(Table table) { + String hdfsLocation = table.getSd().getLocation(); + try { + URI uri = new URI(hdfsLocation); + String path = uri.getPath(); + return hdfsLocation.replace(path, ""); + } catch (URISyntaxException e) { + String errorMsg = + String.format( + "Get hdfs namenode host from table location [%s] failed," + + "please check it", + hdfsLocation); + throw new HiveConnectorException( + HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED, errorMsg, e); + } + } + + private String parseHdfsPath(Table table) { + String hdfsLocation = table.getSd().getLocation(); + try { + URI uri = new URI(hdfsLocation); + return uri.getPath(); + } catch (URISyntaxException e) { + String errorMsg = + String.format( + "Get hdfs namenode host from table location [%s] failed," + + "please check it", + hdfsLocation); + throw new HiveConnectorException( + HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED, errorMsg, e); + } + } + + private CatalogTable parseCatalogTable( + ReadonlyConfig readonlyConfig, + ReadStrategy readStrategy, + FileFormat fileFormat, + HadoopConf hadoopConf, + List filePaths, + Table table) { + switch (fileFormat) { + case PARQUET: + case ORC: + return parseCatalogTableFromRemotePath( + readonlyConfig, hadoopConf, filePaths, table); + case TEXT: + return parseCatalogTableFromTable(readonlyConfig, readStrategy, table); + default: + throw new HiveConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + "Hive connector only support [text parquet orc] table now"); + } + } + + private CatalogTable parseCatalogTableFromRemotePath( + ReadonlyConfig readonlyConfig, + HadoopConf hadoopConf, + List filePaths, + Table table) { + if (CollectionUtils.isEmpty(filePaths)) { + // When the directory is empty, distribute default behavior schema + return buildEmptyCatalogTable(readonlyConfig, table); + } + CatalogTable catalogTable = buildEmptyCatalogTable(readonlyConfig, table); + try { + SeaTunnelRowType seaTunnelRowTypeInfo = + readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0)); + return CatalogTableUtil.newCatalogTable(catalogTable, seaTunnelRowTypeInfo); + } catch (FileConnectorException e) { + String errorMsg = + String.format("Get 
table schema from file [%s] failed", filePaths.get(0)); + throw new FileConnectorException( + CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, errorMsg, e); + } + } + + private CatalogTable parseCatalogTableFromTable( + ReadonlyConfig readonlyConfig, ReadStrategy readStrategy, Table table) { + List cols = table.getSd().getCols(); + String[] fieldNames = new String[cols.size()]; + SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[cols.size()]; + for (int i = 0; i < cols.size(); i++) { + FieldSchema col = cols.get(i); + fieldNames[i] = col.getName(); + fieldTypes[i] = + HiveTypeConvertor.covertHiveTypeToSeaTunnelType(col.getName(), col.getType()); + } + + SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, fieldTypes); + readStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType); + final SeaTunnelRowType finalSeatunnelRowType = readStrategy.getActualSeaTunnelRowTypeInfo(); + + CatalogTable catalogTable = buildEmptyCatalogTable(readonlyConfig, table); + return CatalogTableUtil.newCatalogTable(catalogTable, finalSeatunnelRowType); + } + + private CatalogTable buildEmptyCatalogTable(ReadonlyConfig readonlyConfig, Table table) { + TablePath tablePath = TablePath.of(table.getDbName(), table.getTableName()); + return CatalogTable.of( + TableIdentifier.of(HiveConstants.CONNECTOR_NAME, tablePath), + TableSchema.builder().build(), + new HashMap<>(), + new ArrayList<>(), + readonlyConfig.get(TableSchemaOptions.TableIdentifierOptions.COMMENT)); + } +} diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java new file mode 100644 index 00000000000..c30cb1783d8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.BaseHiveOptions;
+
+import java.util.List;
+import java.util.Map;
+
+public class HiveSourceOptions extends BaseHiveOptions {
+    public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
+            Options.key("tables_configs")
+                    .type(new TypeReference<List<Map<String, Object>>>() {})
+                    .noDefaultValue()
+                    .withDescription(
+                            "Hive table configs, used to create multiple Hive table sources.");
+}
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
new file mode 100644
index 00000000000..9db899ca8c7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hive.source.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import com.google.common.collect.Lists; +import lombok.Getter; + +import java.io.Serializable; +import java.util.List; +import java.util.stream.Collectors; + +public class MultipleTableHiveSourceConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + @Getter private List hiveSourceConfigs; + + public MultipleTableHiveSourceConfig(ReadonlyConfig readonlyConfig) { + if (readonlyConfig.getOptional(HiveSourceOptions.TABLE_CONFIGS).isPresent()) { + parseFromLocalFileSourceConfigs(readonlyConfig); + } else { + parseFromLocalFileSourceConfig(readonlyConfig); + } + } + + private void parseFromLocalFileSourceConfigs(ReadonlyConfig readonlyConfig) { + this.hiveSourceConfigs = + readonlyConfig.get(HiveSourceOptions.TABLE_CONFIGS).stream() + .map(ReadonlyConfig::fromMap) + .map(HiveSourceConfig::new) + .collect(Collectors.toList()); + } + + private void parseFromLocalFileSourceConfig(ReadonlyConfig localFileSourceRootConfig) { + HiveSourceConfig hiveSourceConfig = new HiveSourceConfig(localFileSourceRootConfig); + this.hiveSourceConfigs = Lists.newArrayList(hiveSourceConfig); + } +} diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java new file mode 100644 index 00000000000..9ea6f1e632e --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hive.source.reader; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy; +import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit; +import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.hive.source.config.MultipleTableHiveSourceConfig; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode.FILE_READ_FAILED; +import static org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode.FILE_READ_STRATEGY_NOT_SUPPORT; + +@Slf4j +public class MultipleTableHiveSourceReader implements SourceReader { + + private final SourceReader.Context context; + private volatile boolean noMoreSplit; + + private final Deque sourceSplits = new ConcurrentLinkedDeque<>(); + + private final Map readStrategyMap; + + public MultipleTableHiveSourceReader( + SourceReader.Context context, + MultipleTableHiveSourceConfig multipleTableHiveSourceConfig) { + this.context = context; + this.readStrategyMap = + multipleTableHiveSourceConfig.getHiveSourceConfigs().stream() + .collect( + Collectors.toMap( + localFileSourceConfig -> + localFileSourceConfig + .getCatalogTable() + .getTableId() + .toTablePath() + .toString(), + HiveSourceConfig::getReadStrategy)); + } + + @Override + public void pollNext(Collector output) { + synchronized (output.getCheckpointLock()) { + FileSourceSplit split = sourceSplits.poll(); + if (null != split) { + ReadStrategy readStrategy = readStrategyMap.get(split.getTableId()); + if (readStrategy == null) { + throw new FileConnectorException( + FILE_READ_STRATEGY_NOT_SUPPORT, + "Cannot found the read strategy for this table: [" + + split.getTableId() + + "]"); + } + try { + readStrategy.read(split.getFilePath(), split.getTableId(), output); + } catch (Exception e) { + String errorMsg = + String.format("Read data from this file [%s] failed", split.splitId()); + throw new FileConnectorException(FILE_READ_FAILED, errorMsg, e); + } + } else if (noMoreSplit && sourceSplits.isEmpty()) { + // signal to the source that we have reached the end of the data. 
+ log.info( + "There is no more element for the bounded MultipleTableLocalFileSourceReader"); + context.signalNoMoreElement(); + } + } + } + + @Override + public List snapshotState(long checkpointId) { + return new ArrayList<>(sourceSplits); + } + + @Override + public void addSplits(List splits) { + sourceSplits.addAll(splits); + } + + @Override + public void handleNoMoreSplits() { + noMoreSplit = true; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + // do nothing + } + + @Override + public void open() throws Exception { + // do nothing + log.info("Opened the MultipleTableHiveSourceReader"); + } + + @Override + public void close() throws IOException { + // do nothing + log.info("Closed the MultipleTableHiveSourceReader"); + } +} diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/HiveSourceSplit.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/HiveSourceSplit.java new file mode 100644 index 00000000000..58ebb808d50 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/HiveSourceSplit.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hive.source.split; + +import org.apache.seatunnel.api.source.SourceSplit; + +import lombok.Getter; + +public class HiveSourceSplit implements SourceSplit { + + private static final long serialVersionUID = 1L; + + @Getter private final String tableId; + @Getter private final String filePath; + + public HiveSourceSplit(String tableId, String filePath) { + this.tableId = tableId; + this.filePath = filePath; + } + + @Override + public String splitId() { + return tableId + "_" + filePath; + } +} diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java new file mode 100644 index 00000000000..aa4701e37b8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hive.source.split; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit; +import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState; +import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.hive.source.config.MultipleTableHiveSourceConfig; + +import org.apache.commons.collections4.CollectionUtils; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@Slf4j +public class MultipleTableHiveSourceSplitEnumerator + implements SourceSplitEnumerator { + + private final SourceSplitEnumerator.Context context; + private final Set pendingSplit; + private final Set assignedSplit; + private final Map> filePathMap; + + public MultipleTableHiveSourceSplitEnumerator( + SourceSplitEnumerator.Context context, + MultipleTableHiveSourceConfig multipleTableLocalFileSourceConfig) { + this.context = context; + this.filePathMap = + multipleTableLocalFileSourceConfig.getHiveSourceConfigs().stream() + .collect( + Collectors.toMap( + localFileSourceConfig -> + localFileSourceConfig + .getCatalogTable() + .getTableId() + .toTablePath() + .toString(), + HiveSourceConfig::getFilePaths)); + this.assignedSplit = new HashSet<>(); + this.pendingSplit = new HashSet<>(); + } + + public MultipleTableHiveSourceSplitEnumerator( + SourceSplitEnumerator.Context context, + MultipleTableHiveSourceConfig multipleTableLocalFileSourceConfig, + FileSourceState localFileSourceState) { + this(context, multipleTableLocalFileSourceConfig); + this.assignedSplit.addAll(localFileSourceState.getAssignedSplit()); + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + if (CollectionUtils.isEmpty(splits)) { + return; + } + pendingSplit.addAll(splits); + assignSplit(subtaskId); + } + + @Override + public int currentUnassignedSplitSize() { + return pendingSplit.size(); + } + + @Override + public void handleSplitRequest(int subtaskId) {} + + @Override + public void registerReader(int subtaskId) { + for (Map.Entry> filePathEntry : filePathMap.entrySet()) { + String tableId = filePathEntry.getKey(); + List filePaths = filePathEntry.getValue(); + for (String filePath : filePaths) { + pendingSplit.add(new FileSourceSplit(tableId, filePath)); + } + } + assignSplit(subtaskId); + } + + @Override + public FileSourceState snapshotState(long checkpointId) { + return new FileSourceState(assignedSplit); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + // do nothing. 
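+        // No per-checkpoint cleanup is needed here: the set of assigned splits is kept only in
+        // memory and is restored through the constructor that accepts a FileSourceState snapshot.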
+ } + + private void assignSplit(int taskId) { + List currentTaskSplits = new ArrayList<>(); + if (context.currentParallelism() == 1) { + // if parallelism == 1, we should assign all the splits to reader + currentTaskSplits.addAll(pendingSplit); + } else { + // if parallelism > 1, according to hashCode of split's id to determine whether to + // allocate the current task + for (FileSourceSplit fileSourceSplit : pendingSplit) { + int splitOwner = + getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism()); + if (splitOwner == taskId) { + currentTaskSplits.add(fileSourceSplit); + } + } + } + // assign splits + context.assignSplit(taskId, currentTaskSplits); + // save the state of assigned splits + assignedSplit.addAll(currentTaskSplits); + // remove the assigned splits from pending splits + currentTaskSplits.forEach(pendingSplit::remove); + log.info( + "SubTask {} is assigned to [{}]", + taskId, + currentTaskSplits.stream() + .map(FileSourceSplit::splitId) + .collect(Collectors.joining(","))); + context.signalNoMoreSplits(taskId); + } + + private static int getSplitOwner(String tp, int numReaders) { + return (tp.hashCode() & Integer.MAX_VALUE) % numReaders; + } + + @Override + public void open() { + // do nothing + } + + @Override + public void run() throws Exception { + // do nothing + } + + @Override + public void close() throws IOException { + // do nothing + } +} diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/state/HiveSourceState.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/state/HiveSourceState.java new file mode 100644 index 00000000000..cd9da5351e6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/state/HiveSourceState.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hive.source.state; + +import org.apache.seatunnel.connectors.seatunnel.hive.source.split.HiveSourceSplit; + +import java.io.Serializable; +import java.util.Set; + +public class HiveSourceState implements Serializable { + + private static final long serialVersionUID = 1L; + + private final Set assignedSplit; + + public HiveSourceState(Set assignedSplit) { + this.assignedSplit = assignedSplit; + } + + public Set getAssignedSplit() { + return assignedSplit; + } +} diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java index 6a1288b661e..b3c463d8042 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java @@ -17,15 +17,14 @@ package org.apache.seatunnel.connectors.seatunnel.hive.utils; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.common.config.TypesafeConfigUtils; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopLoginFactory; -import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig; import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException; +import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -47,36 +46,31 @@ public class HiveMetaStoreProxy { private HiveMetaStoreClient hiveMetaStoreClient; private static volatile HiveMetaStoreProxy INSTANCE = null; - private HiveMetaStoreProxy(Config config) { - String metastoreUri = config.getString(HiveConfig.METASTORE_URI.key()); - + private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) { + String metastoreUri = readonlyConfig.get(HiveSourceOptions.METASTORE_URI); + HiveConf hiveConf = new HiveConf(); + hiveConf.set("hive.metastore.uris", metastoreUri); try { - HiveConf hiveConf = new HiveConf(); - hiveConf.set("hive.metastore.uris", metastoreUri); - if (config.hasPath(HiveConfig.HIVE_SITE_PATH.key())) { - String hiveSitePath = config.getString(HiveConfig.HIVE_SITE_PATH.key()); + if (StringUtils.isNotEmpty(readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH))) { + String hiveSitePath = readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH); hiveConf.addResource(new File(hiveSitePath).toURI().toURL()); } - if (HiveMetaStoreProxyUtils.enableKerberos(config)) { + if (HiveMetaStoreProxyUtils.enableKerberos(readonlyConfig)) { this.hiveMetaStoreClient = HadoopLoginFactory.loginWithKerberos( new Configuration(), - TypesafeConfigUtils.getConfig( - config, - BaseSourceConfigOptions.KRB5_PATH.key(), - BaseSourceConfigOptions.KRB5_PATH.defaultValue()), - config.getString(BaseSourceConfigOptions.KERBEROS_PRINCIPAL.key()), - config.getString( - BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH.key()), + 
readonlyConfig.get(BaseSourceConfigOptions.KRB5_PATH), + readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_PRINCIPAL), + readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH), (configuration, userGroupInformation) -> new HiveMetaStoreClient(hiveConf)); return; } - if (HiveMetaStoreProxyUtils.enableRemoteUser(config)) { + if (HiveMetaStoreProxyUtils.enableRemoteUser(readonlyConfig)) { this.hiveMetaStoreClient = HadoopLoginFactory.loginWithRemoteUser( new Configuration(), - config.getString(BaseSourceConfigOptions.REMOTE_USER.key()), + readonlyConfig.get(BaseSourceConfigOptions.REMOTE_USER), (configuration, userGroupInformation) -> new HiveMetaStoreClient(hiveConf)); return; @@ -95,7 +89,7 @@ private HiveMetaStoreProxy(Config config) { String.format( "Using this hive uris [%s], hive conf [%s] to initialize " + "hive metastore client instance failed", - metastoreUri, config.getString(HiveConfig.HIVE_SITE_PATH.key())); + metastoreUri, readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH)); throw new HiveConnectorException( HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e); } catch (Exception e) { @@ -106,11 +100,11 @@ private HiveMetaStoreProxy(Config config) { } } - public static HiveMetaStoreProxy getInstance(Config config) { + public static HiveMetaStoreProxy getInstance(ReadonlyConfig readonlyConfig) { if (INSTANCE == null) { synchronized (HiveMetaStoreProxy.class) { if (INSTANCE == null) { - INSTANCE = new HiveMetaStoreProxy(config); + INSTANCE = new HiveMetaStoreProxy(readonlyConfig); } } } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java index fda221d886f..f1474f7694b 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java @@ -17,8 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.hive.utils; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; import lombok.experimental.UtilityClass; @@ -26,11 +25,11 @@ @UtilityClass public class HiveMetaStoreProxyUtils { - public boolean enableKerberos(Config config) { + public boolean enableKerberos(ReadonlyConfig config) { boolean kerberosPrincipalEmpty = - config.hasPath(BaseSourceConfigOptions.KERBEROS_PRINCIPAL.key()); + config.getOptional(BaseSourceConfigOptions.KERBEROS_PRINCIPAL).isPresent(); boolean kerberosKeytabPathEmpty = - config.hasPath(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH.key()); + config.getOptional(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH).isPresent(); if (kerberosKeytabPathEmpty && kerberosPrincipalEmpty) { return true; } @@ -43,7 +42,7 @@ public boolean enableKerberos(Config config) { throw new IllegalArgumentException("Please set kerberosKeytabPath"); } - public boolean enableRemoteUser(Config config) { - return config.hasPath(BaseSourceConfigOptions.REMOTE_USER.key()); + public boolean enableRemoteUser(ReadonlyConfig config) { + return config.getOptional(BaseSourceConfigOptions.REMOTE_USER).isPresent(); } } diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java new file mode 100644 index 00000000000..e4282db204b --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hive.utils; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; +import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants; +import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException; +import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions; + +import org.apache.hadoop.hive.metastore.api.Table; + +public class HiveTableUtils { + + public static Table getTableInfo(ReadonlyConfig readonlyConfig) { + String table = readonlyConfig.get(HiveSourceOptions.TABLE_NAME); + TablePath tablePath = TablePath.of(table); + if (tablePath.getDatabaseName() == null || tablePath.getTableName() == null) { + throw new SeaTunnelRuntimeException( + HiveConnectorErrorCode.HIVE_TABLE_NAME_ERROR, "Current table name is " + table); + } + HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(readonlyConfig); + Table tableInformation = + hiveMetaStoreProxy.getTable(tablePath.getDatabaseName(), tablePath.getTableName()); + hiveMetaStoreProxy.close(); + return tableInformation; + } + + public static FileFormat parseFileFormat(Table table) { + String inputFormat = table.getSd().getInputFormat(); + if (HiveConstants.TEXT_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) { + return FileFormat.TEXT; + } + if (HiveConstants.PARQUET_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) { + return FileFormat.PARQUET; + } + if (HiveConstants.ORC_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) { + return FileFormat.ORC; + } + throw new HiveConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + "Hive connector only support [text parquet orc] table now"); + } +} diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertor.java 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertor.java new file mode 100644 index 00000000000..7aac96c978e --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hive.utils; + +import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.common.exception.CommonError; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants; + +import java.util.LinkedHashMap; + +public class HiveTypeConvertor { + + public static SeaTunnelDataType covertHiveTypeToSeaTunnelType(String name, String hiveType) { + if (hiveType.contains("varchar")) { + return BasicType.STRING_TYPE; + } + if (hiveType.contains("char")) { + throw CommonError.convertToSeaTunnelTypeError( + HiveConstants.CONNECTOR_NAME, PluginType.SOURCE, hiveType, name); + } + if (hiveType.contains("binary")) { + return PrimitiveByteArrayType.INSTANCE; + } + if (hiveType.contains("struct")) { + LinkedHashMap fields = new LinkedHashMap<>(); + int start = hiveType.indexOf("<"); + int end = hiveType.lastIndexOf(">"); + String[] columns = hiveType.substring(start + 1, end).split(","); + for (String column : columns) { + String[] splits = column.split(":"); + fields.put( + splits[0], covertHiveTypeToSeaTunnelType(splits[0], splits[1]).toString()); + } + return SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType( + name, JsonUtils.toJsonString(fields)); + } + return SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(name, hiveType); + } +} diff --git a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/source/TypeConvertTest.java b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/source/TypeConvertTest.java deleted file mode 100644 index 3810085b36a..00000000000 --- a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/source/TypeConvertTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.hive.source; - -import org.apache.seatunnel.common.utils.ReflectionUtils; - -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Optional; - -public class TypeConvertTest { - - @Test - void testWithUnsupportedType() { - Optional parseSchema = - ReflectionUtils.getDeclaredMethod(HiveSource.class, "parseSchema", Table.class); - Assertions.assertTrue(parseSchema.isPresent()); - Table table = new Table(); - table.setSd(new StorageDescriptor()); - table.getSd().addToCols(new FieldSchema("test", "char", null)); - InvocationTargetException exception = - Assertions.assertThrows( - InvocationTargetException.class, - () -> parseSchema.get().invoke(new HiveSource(), table)); - Assertions.assertEquals( - "ErrorCode:[COMMON-16], ErrorDescription:['Hive' source unsupported convert type 'char' of 'test' to SeaTunnel data type.]", - exception.getCause().getMessage()); - } -} diff --git a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java index ad952112c44..eb0afe9d4ad 100644 --- a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java +++ b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + import org.junit.jupiter.api.Test; import lombok.SneakyThrows; @@ -35,7 +37,7 @@ class HiveMetaStoreProxyUtilsTest { @Test void enableKerberos() { - Config config = parseConfig("/hive_without_kerberos.conf"); + ReadonlyConfig config = parseConfig("/hive_without_kerberos.conf"); assertFalse(HiveMetaStoreProxyUtils.enableKerberos(config)); assertFalse(HiveMetaStoreProxyUtils.enableRemoteUser(config)); @@ -48,9 +50,10 @@ void enableKerberos() { } @SneakyThrows - private Config parseConfig(String configFile) { + private ReadonlyConfig parseConfig(String configFile) { URL resource = HiveMetaStoreProxyUtilsTest.class.getResource(configFile); String filePath = Paths.get(resource.toURI()).toString(); - return ConfigFactory.parseFile(new File(filePath)); + Config config = ConfigFactory.parseFile(new File(filePath)); + return ReadonlyConfig.fromConfig(config); } } diff --git 
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertorTest.java b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertorTest.java new file mode 100644 index 00000000000..ef87e083a46 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertorTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hive.utils; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class HiveTypeConvertorTest { + + @Test + void covertHiveTypeToSeaTunnelType() { + SeaTunnelRuntimeException exception = + Assertions.assertThrows( + SeaTunnelRuntimeException.class, + () -> HiveTypeConvertor.covertHiveTypeToSeaTunnelType("test", "char")); + assertEquals( + "ErrorCode:[COMMON-16], ErrorDescription:['Hive' source unsupported convert type 'char' of 'test' to SeaTunnel data type.]", + exception.getMessage()); + } + + @Test + void convertHiveStructType() { + SeaTunnelDataType structType = + HiveTypeConvertor.covertHiveTypeToSeaTunnelType( + "structType", "struct"); + assertEquals(SqlType.ROW, structType.getSqlType()); + SeaTunnelRowType seaTunnelRowType = (SeaTunnelRowType) structType; + assertEquals(BasicType.STRING_TYPE, seaTunnelRowType.getFieldType(0)); + assertEquals(BasicType.STRING_TYPE, seaTunnelRowType.getFieldType(0)); + } +} diff --git a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java index 5b46d812012..6d59ff27f56 100644 --- a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java +++ b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java @@ -64,6 +64,7 @@ public void testAllConnectorImplementFactoryWithUpToDateMethod() throws ClassNot // hive-exec.jar. We need to check manually. 
         List<String> blockList = new ArrayList<>();
         blockList.add("HiveSourceFactory");
+        blockList.add("HiveSinkFactory");
 
         for (TableSourceFactory factory : sourceFactories) {
             if (ReflectionUtils.getDeclaredMethod(
diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-examples/seatunnel-engine-examples/pom.xml
index 3256bdde885..1f22a56658c 100644
--- a/seatunnel-examples/seatunnel-engine-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml
@@ -67,5 +67,55 @@
             <artifactId>connector-assert</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-hive</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>2.3.9</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-hadoop-bundle</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.pentaho</groupId>
+                    <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 </project>
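A minimal, self-contained sketch of the split-assignment rule used by `MultipleTableHiveSourceSplitEnumerator` above: the owning reader is derived from the hash of a split's id (a `HiveSourceSplit` id, for example, is `tableId + "_" + filePath`), so a given file always maps to the same reader at a fixed parallelism. The class name, table names, file paths and parallelism below are made-up for illustration and are not part of the patch.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: mirrors the owner calculation used by the enumerator,
// i.e. (splitId.hashCode() & Integer.MAX_VALUE) % numReaders.
public class SplitOwnerSketch {

    static int getSplitOwner(String splitId, int numReaders) {
        // Mask the sign bit so negative hash codes still yield an index in [0, numReaders).
        return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    public static void main(String[] args) {
        int parallelism = 3; // hypothetical reader parallelism
        List<String> splitIds =
                Arrays.asList(
                        "db1.tbl_a_/warehouse/db1.db/tbl_a/dt=2024-01-01/part-00000",
                        "db1.tbl_a_/warehouse/db1.db/tbl_a/dt=2024-01-01/part-00001",
                        "db1.tbl_b_/warehouse/db1.db/tbl_b/dt=2024-01-01/part-00000");
        for (String splitId : splitIds) {
            // The same split id always resolves to the same reader, run after run.
            System.out.println(splitId + " -> reader " + getSplitOwner(splitId, parallelism));
        }
    }
}
```

Because the mapping depends only on the split id and the parallelism, splits returned through `addSplitsBack` are simply re-added to the pending set and, on the next `assignSplit` pass, land on the same owner as before, provided the parallelism has not changed.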