From d933aedad4e66287d72b0c2739ba861e54a2fa4a Mon Sep 17 00:00:00 2001 From: Nian Liu Date: Wed, 23 Oct 2024 11:49:48 +0800 Subject: [PATCH] update milvus connector to support dynamic schema, failed retry, etc. --- .../api/table/catalog/PhysicalColumn.java | 16 +- .../api/table/type/SeaTunnelRow.java | 5 + .../common/constants/CommonOptions.java | 15 + .../connector-milvus/pom.xml | 25 +- .../milvus/catalog/MilvusCatalog.java | 170 +++---- .../milvus/catalog/MilvusOptions.java | 4 + .../milvus/config/MilvusSinkConfig.java | 27 ++ .../milvus/config/MilvusSourceConfig.java | 12 + .../milvus/convert/MilvusConvertUtils.java | 417 ------------------ .../exception/MilvusConnectionErrorCode.java | 7 +- .../milvus/sink/MilvusBufferBatchWriter.java | 344 +++++++++++++++ .../seatunnel/milvus/sink/MilvusSink.java | 4 +- .../milvus/sink/MilvusSinkWriter.java | 59 +-- .../milvus/sink/batch/MilvusBatchWriter.java | 31 -- .../sink/batch/MilvusBufferBatchWriter.java | 148 ------- .../seatunnel/milvus/source/MilvusSource.java | 9 +- .../milvus/source/MilvusSourceReader.java | 255 ++++++----- .../milvus/source/MilvusSourceSplit.java | 1 + .../source/MilvusSourceSplitEnumertor.java | 72 ++- .../milvus/utils/MilvusConnectorUtils.java | 72 +++ .../milvus/utils/MilvusConvertUtils.java | 278 ++++++++++++ .../utils/sink/MilvusSinkConverter.java | 294 ++++++++++++ .../utils/source/MilvusSourceConverter.java | 364 +++++++++++++++ .../fieldmapper/FieldMapperTransform.java | 7 +- 24 files changed, 1771 insertions(+), 865 deletions(-) create mode 100644 seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CommonOptions.java delete mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java create mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java delete mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java delete mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java create mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConnectorUtils.java create mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java create mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/sink/MilvusSinkConverter.java create mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/source/MilvusSourceConverter.java diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java index db9da1b2b75..2a425000222 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java @@ -215,11 +215,25 @@ public static PhysicalColumn of( String comment, String sourceType, Map options) { + return new PhysicalColumn( + name, dataType, columnLength, nullable, defaultValue, comment, sourceType, options); + } + + public static PhysicalColumn of( + String name, + SeaTunnelDataType dataType, + Long columnLength, + Integer scale, + boolean nullable, + Object defaultValue, + String comment, + String sourceType, + Map options) { return new PhysicalColumn( name, dataType, columnLength, - null, + scale, nullable, defaultValue, comment, diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java index 10a5b33a935..4d5fd08b6dd 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.api.table.type; +import lombok.Data; + import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Arrays; @@ -24,6 +26,7 @@ import java.util.Objects; /** SeaTunnel row type. */ +@Data public final class SeaTunnelRow implements Serializable { private static final long serialVersionUID = -1L; /** Table identifier. */ @@ -35,6 +38,8 @@ public final class SeaTunnelRow implements Serializable { private volatile int size; + private String partitionName; + public SeaTunnelRow(int arity) { this.fields = new Object[arity]; } diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CommonOptions.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CommonOptions.java new file mode 100644 index 00000000000..2f222825325 --- /dev/null +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CommonOptions.java @@ -0,0 +1,15 @@ +package org.apache.seatunnel.common.constants; + +import lombok.Getter; + +@Getter +public enum CommonOptions { + JSON("Json"), + METADATA("Metadata"); + + private final String name; + + CommonOptions(String value) { + this.name = value; + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/pom.xml b/seatunnel-connectors-v2/connector-milvus/pom.xml index fc972ce1968..9a5fed37ab2 100644 --- a/seatunnel-connectors-v2/connector-milvus/pom.xml +++ b/seatunnel-connectors-v2/connector-milvus/pom.xml @@ -28,12 +28,20 @@ connector-milvus SeaTunnel : Connectors V2 : Milvus - + + + + com.google.code.gson + gson + 2.10.1 + + + io.milvus milvus-sdk-java - 2.4.3 + 2.4.5 org.slf4j @@ -42,19 +50,6 @@ - - org.mockito - mockito-core - 4.11.0 - test - - - org.mockito - mockito-inline - 4.11.0 - test - - diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java index c1e1ac292da..6c0b846b432 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java @@ -24,7 +24,6 @@ import org.apache.seatunnel.api.table.catalog.ConstraintKey; import org.apache.seatunnel.api.table.catalog.InfoPreviewResult; import org.apache.seatunnel.api.table.catalog.PreviewResult; -import org.apache.seatunnel.api.table.catalog.PrimaryKey; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.VectorIndex; @@ -33,20 +32,21 @@ import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; -import org.apache.seatunnel.api.table.type.ArrayType; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.common.constants.CommonOptions; import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig; -import org.apache.seatunnel.connectors.seatunnel.milvus.convert.MilvusConvertUtils; import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode; import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; +import org.apache.seatunnel.connectors.seatunnel.milvus.utils.sink.MilvusSinkConverter; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import com.google.protobuf.ProtocolStringList; import io.milvus.client.MilvusServiceClient; import io.milvus.common.clientenum.ConsistencyLevelEnum; -import io.milvus.grpc.DataType; import io.milvus.grpc.ListDatabasesResponse; import io.milvus.grpc.ShowCollectionsResponse; +import io.milvus.grpc.ShowPartitionsResponse; import io.milvus.grpc.ShowType; import io.milvus.param.ConnectParam; import io.milvus.param.IndexType; @@ -61,6 +61,8 @@ import io.milvus.param.collection.HasCollectionParam; import io.milvus.param.collection.ShowCollectionsParam; import io.milvus.param.index.CreateIndexParam; +import io.milvus.param.partition.CreatePartitionParam; +import io.milvus.param.partition.ShowPartitionsParam; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; @@ -70,6 +72,7 @@ import java.util.Optional; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.CREATE_INDEX; @Slf4j public class MilvusCatalog implements Catalog { @@ -196,7 +199,8 @@ public void createTable(TablePath tablePath, CatalogTable catalogTable, boolean checkNotNull(tableSchema, "tableSchema must not be null"); createTableInternal(tablePath, catalogTable); - if (CollectionUtils.isNotEmpty(tableSchema.getConstraintKeys())) { + if (CollectionUtils.isNotEmpty(tableSchema.getConstraintKeys()) + && config.get(CREATE_INDEX)) { for (ConstraintKey constraintKey : tableSchema.getConstraintKeys()) { if (constraintKey .getConstraintType() @@ -231,27 +235,61 @@ private void createIndexInternal( public void createTableInternal(TablePath tablePath, CatalogTable catalogTable) { try { + Map options = catalogTable.getOptions(); + + // partition key logic + boolean existPartitionKeyField = options.containsKey(MilvusOptions.PARTITION_KEY_FIELD); + String partitionKeyField = + existPartitionKeyField ? options.get(MilvusOptions.PARTITION_KEY_FIELD) : null; + // if options set, will overwrite aut read + if (StringUtils.isNotEmpty(config.get(MilvusSinkConfig.PARTITION_KEY))) { + existPartitionKeyField = true; + partitionKeyField = config.get(MilvusSinkConfig.PARTITION_KEY); + } + TableSchema tableSchema = catalogTable.getTableSchema(); List fieldTypes = new ArrayList<>(); for (Column column : tableSchema.getColumns()) { - fieldTypes.add(convertToFieldType(column, tableSchema.getPrimaryKey())); + if (column.getOptions() != null + && column.getOptions().containsKey(CommonOptions.METADATA.getName()) + && (Boolean) column.getOptions().get(CommonOptions.METADATA.getName())) { + // skip dynamic field + continue; + } + FieldType fieldType = + MilvusSinkConverter.convertToFieldType( + column, + tableSchema.getPrimaryKey(), + partitionKeyField, + config.get(MilvusSinkConfig.ENABLE_AUTO_ID)); + fieldTypes.add(fieldType); } - Map options = catalogTable.getOptions(); Boolean enableDynamicField = (options.containsKey(MilvusOptions.ENABLE_DYNAMIC_FIELD)) ? Boolean.valueOf(options.get(MilvusOptions.ENABLE_DYNAMIC_FIELD)) : config.get(MilvusSinkConfig.ENABLE_DYNAMIC_FIELD); - + String collectionDescription = ""; + if (config.get(MilvusSinkConfig.COLLECTION_DESCRIPTION) != null + && config.get(MilvusSinkConfig.COLLECTION_DESCRIPTION) + .containsKey(tablePath.getTableName())) { + // use description from config first + collectionDescription = + config.get(MilvusSinkConfig.COLLECTION_DESCRIPTION) + .get(tablePath.getTableName()); + } else if (null != catalogTable.getComment()) { + collectionDescription = catalogTable.getComment(); + } CreateCollectionParam.Builder builder = CreateCollectionParam.newBuilder() .withDatabaseName(tablePath.getDatabaseName()) .withCollectionName(tablePath.getTableName()) + .withDescription(collectionDescription) .withFieldTypes(fieldTypes) .withEnableDynamicField(enableDynamicField) .withConsistencyLevel(ConsistencyLevelEnum.BOUNDED); - if (null != catalogTable.getComment()) { - builder.withDescription(catalogTable.getComment()); + if (StringUtils.isNotEmpty(options.get(MilvusOptions.SHARDS_NUM))) { + builder.withShardsNum(Integer.parseInt(options.get(MilvusOptions.SHARDS_NUM))); } CreateCollectionParam createCollectionParam = builder.build(); @@ -260,89 +298,51 @@ public void createTableInternal(TablePath tablePath, CatalogTable catalogTable) throw new MilvusConnectorException( MilvusConnectionErrorCode.CREATE_COLLECTION_ERROR, response.getMessage()); } + + // not exist partition key field, will read show partitions to create + if (!existPartitionKeyField && options.containsKey(MilvusOptions.PARTITION_KEY_FIELD)) { + createPartitionInternal(options.get(MilvusOptions.PARTITION_KEY_FIELD), tablePath); + } + } catch (Exception e) { throw new MilvusConnectorException( MilvusConnectionErrorCode.CREATE_COLLECTION_ERROR, e); } } - private FieldType convertToFieldType(Column column, PrimaryKey primaryKey) { - SeaTunnelDataType seaTunnelDataType = column.getDataType(); - FieldType.Builder build = - FieldType.newBuilder() - .withName(column.getName()) - .withDataType( - MilvusConvertUtils.convertSqlTypeToDataType( - seaTunnelDataType.getSqlType())); - switch (seaTunnelDataType.getSqlType()) { - case ROW: - build.withMaxLength(65535); - break; - case DATE: - build.withMaxLength(20); - break; - case INT: - build.withDataType(DataType.Int32); - break; - case SMALLINT: - build.withDataType(DataType.Int16); - break; - case TINYINT: - build.withDataType(DataType.Int8); - break; - case FLOAT: - build.withDataType(DataType.Float); - break; - case DOUBLE: - build.withDataType(DataType.Double); - break; - case MAP: - build.withDataType(DataType.JSON); - break; - case BOOLEAN: - build.withDataType(DataType.Bool); - break; - case STRING: - if (column.getColumnLength() == 0) { - build.withMaxLength(512); - } else { - build.withMaxLength((int) (column.getColumnLength() / 4)); - } - break; - case ARRAY: - ArrayType arrayType = (ArrayType) column.getDataType(); - SeaTunnelDataType elementType = arrayType.getElementType(); - build.withElementType( - MilvusConvertUtils.convertSqlTypeToDataType(elementType.getSqlType())); - build.withMaxCapacity(4095); - switch (elementType.getSqlType()) { - case STRING: - if (column.getColumnLength() == 0) { - build.withMaxLength(512); - } else { - build.withMaxLength((int) (column.getColumnLength() / 4)); - } - break; - } - break; - case BINARY_VECTOR: - case FLOAT_VECTOR: - case FLOAT16_VECTOR: - case BFLOAT16_VECTOR: - build.withDimension(column.getScale()); - break; + private void createPartitionInternal(String partitionNames, TablePath tablePath) { + R showPartitionsResponseR = + this.client.showPartitions( + ShowPartitionsParam.newBuilder() + .withDatabaseName(tablePath.getDatabaseName()) + .withCollectionName(tablePath.getTableName()) + .build()); + if (!Objects.equals(showPartitionsResponseR.getStatus(), R.success().getStatus())) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.SHOW_PARTITION_ERROR, + showPartitionsResponseR.getMessage()); } - - if (null != primaryKey && primaryKey.getColumnNames().contains(column.getName())) { - build.withPrimaryKey(true); - if (null != primaryKey.getEnableAutoId()) { - build.withAutoID(primaryKey.getEnableAutoId()); - } else { - build.withAutoID(config.get(MilvusSinkConfig.ENABLE_AUTO_ID)); + ProtocolStringList existPartitionNames = + showPartitionsResponseR.getData().getPartitionNamesList(); + + // start to loop create partition + String[] partitionNameArray = partitionNames.split(","); + for (String partitionName : partitionNameArray) { + if (existPartitionNames.contains(partitionName)) { + continue; + } + R response = + this.client.createPartition( + CreatePartitionParam.newBuilder() + .withDatabaseName(tablePath.getDatabaseName()) + .withCollectionName(tablePath.getTableName()) + .withPartitionName(partitionName) + .build()); + if (!R.success().getStatus().equals(response.getStatus())) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.CREATE_PARTITION_ERROR, response.getMessage()); } } - - return build.build(); } @Override diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java index b589b21d3da..96241546f6c 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java @@ -14,9 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.seatunnel.connectors.seatunnel.milvus.catalog; public class MilvusOptions { public static final String ENABLE_DYNAMIC_FIELD = "enableDynamicField"; + public static final String SHARDS_NUM = "shardsNum"; + public static final String PARTITION_KEY_FIELD = "partitionKeyField"; + public static final String PARTITION_NAMES = "partitionNames"; } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java index cd286c987df..8d874fc0ae3 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java @@ -23,6 +23,8 @@ import org.apache.seatunnel.api.sink.SchemaSaveMode; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA; import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA; @@ -32,6 +34,16 @@ public class MilvusSinkConfig extends MilvusCommonConfig { public static final Option DATABASE = Options.key("database").stringType().noDefaultValue().withDescription("database"); + public static final Option> COLLECTION_DESCRIPTION = + Options.key("collection_description") + .mapType() + .defaultValue(new HashMap<>()) + .withDescription("collection description"); + public static final Option PARTITION_KEY = + Options.key("partition_key") + .stringType() + .noDefaultValue() + .withDescription("Milvus partition key field"); public static final Option SCHEMA_SAVE_MODE = Options.key("schema_save_mode") @@ -70,4 +82,19 @@ public class MilvusSinkConfig extends MilvusCommonConfig { .intType() .defaultValue(1000) .withDescription("writer batch size"); + public static final Option RATE_LIMIT = + Options.key("rate_limit") + .intType() + .defaultValue(100000) + .withDescription("writer rate limit"); + public static final Option LOAD_COLLECTION = + Options.key("load_collection") + .booleanType() + .defaultValue(false) + .withDescription("if load collection"); + public static final Option CREATE_INDEX = + Options.key("create_index") + .booleanType() + .defaultValue(false) + .withDescription("if load collection"); } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java index b3efba279dc..94b98548386 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java @@ -33,4 +33,16 @@ public class MilvusSourceConfig extends MilvusCommonConfig { .stringType() .noDefaultValue() .withDescription("Milvus collection to read"); + + public static final Option BATCH_SIZE = + Options.key("batch_size") + .intType() + .defaultValue(1000) + .withDescription("writer batch size"); + + public static final Option RATE_LIMIT = + Options.key("rate_limit") + .intType() + .defaultValue(1000000) + .withDescription("writer rate limit"); } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java deleted file mode 100644 index 65027077957..00000000000 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java +++ /dev/null @@ -1,417 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.milvus.convert; - -import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.Column; -import org.apache.seatunnel.api.table.catalog.ConstraintKey; -import org.apache.seatunnel.api.table.catalog.PhysicalColumn; -import org.apache.seatunnel.api.table.catalog.PrimaryKey; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; -import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.TableSchema; -import org.apache.seatunnel.api.table.catalog.VectorIndex; -import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.type.ArrayType; -import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SqlType; -import org.apache.seatunnel.api.table.type.VectorType; -import org.apache.seatunnel.common.utils.BufferUtils; -import org.apache.seatunnel.common.utils.JsonUtils; -import org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusOptions; -import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig; -import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode; -import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; - -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.util.Lists; - -import com.google.gson.Gson; -import com.google.gson.JsonParser; -import com.google.protobuf.ProtocolStringList; -import io.milvus.client.MilvusServiceClient; -import io.milvus.common.utils.JacksonUtils; -import io.milvus.grpc.CollectionSchema; -import io.milvus.grpc.DataType; -import io.milvus.grpc.DescribeCollectionResponse; -import io.milvus.grpc.DescribeIndexResponse; -import io.milvus.grpc.FieldSchema; -import io.milvus.grpc.IndexDescription; -import io.milvus.grpc.KeyValuePair; -import io.milvus.grpc.ShowCollectionsResponse; -import io.milvus.grpc.ShowType; -import io.milvus.param.ConnectParam; -import io.milvus.param.R; -import io.milvus.param.collection.DescribeCollectionParam; -import io.milvus.param.collection.ShowCollectionsParam; -import io.milvus.param.index.DescribeIndexParam; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -public class MilvusConvertUtils { - - private static final String CATALOG_NAME = "Milvus"; - - private static final Gson gson = new Gson(); - - public static Map getSourceTables(ReadonlyConfig config) { - MilvusServiceClient client = null; - try { - client = - new MilvusServiceClient( - ConnectParam.newBuilder() - .withUri(config.get(MilvusSourceConfig.URL)) - .withToken(config.get(MilvusSourceConfig.TOKEN)) - .build()); - - String database = config.get(MilvusSourceConfig.DATABASE); - List collectionList = new ArrayList<>(); - if (StringUtils.isNotEmpty(config.get(MilvusSourceConfig.COLLECTION))) { - collectionList.add(config.get(MilvusSourceConfig.COLLECTION)); - } else { - R response = - client.showCollections( - ShowCollectionsParam.newBuilder() - .withDatabaseName(database) - .withShowType(ShowType.All) - .build()); - if (response.getStatus() != R.Status.Success.getCode()) { - throw new MilvusConnectorException( - MilvusConnectionErrorCode.SHOW_COLLECTIONS_ERROR); - } - - ProtocolStringList collections = response.getData().getCollectionNamesList(); - if (CollectionUtils.isEmpty(collections)) { - throw new MilvusConnectorException( - MilvusConnectionErrorCode.DATABASE_NO_COLLECTIONS, database); - } - collectionList.addAll(collections); - } - - Map map = new HashMap<>(); - for (String collection : collectionList) { - CatalogTable catalogTable = getCatalogTable(client, database, collection); - map.put(TablePath.of(database, collection), catalogTable); - } - return map; - } catch (Exception e) { - throw new CatalogException(e.getMessage(), e); - } finally { - if (client != null) { - client.close(); - } - } - } - - public static CatalogTable getCatalogTable( - MilvusServiceClient client, String database, String collection) { - R response = - client.describeCollection( - DescribeCollectionParam.newBuilder() - .withDatabaseName(database) - .withCollectionName(collection) - .build()); - - if (response.getStatus() != R.Status.Success.getCode()) { - throw new MilvusConnectorException(MilvusConnectionErrorCode.DESC_COLLECTION_ERROR); - } - - // collection column - DescribeCollectionResponse data = response.getData(); - CollectionSchema schema = data.getSchema(); - List columns = new ArrayList<>(); - for (FieldSchema fieldSchema : schema.getFieldsList()) { - columns.add(MilvusConvertUtils.convertColumn(fieldSchema)); - } - - // primary key - PrimaryKey primaryKey = buildPrimaryKey(schema.getFieldsList()); - - // index - R describeIndexResponseR = - client.describeIndex( - DescribeIndexParam.newBuilder() - .withDatabaseName(database) - .withCollectionName(collection) - .build()); - if (describeIndexResponseR.getStatus() != R.Status.Success.getCode()) { - throw new MilvusConnectorException(MilvusConnectionErrorCode.DESC_INDEX_ERROR); - } - DescribeIndexResponse indexResponse = describeIndexResponseR.getData(); - List vectorIndexes = buildVectorIndexes(indexResponse); - - // build tableSchema - TableSchema tableSchema = - TableSchema.builder() - .columns(columns) - .primaryKey(primaryKey) - .constraintKey( - ConstraintKey.of( - ConstraintKey.ConstraintType.VECTOR_INDEX_KEY, - "vector_index", - vectorIndexes)) - .build(); - - // build tableId - TableIdentifier tableId = TableIdentifier.of(CATALOG_NAME, database, collection); - - // build options info - Map options = new HashMap<>(); - options.put( - MilvusOptions.ENABLE_DYNAMIC_FIELD, String.valueOf(schema.getEnableDynamicField())); - - return CatalogTable.of( - tableId, tableSchema, options, new ArrayList<>(), schema.getDescription()); - } - - private static List buildVectorIndexes( - DescribeIndexResponse indexResponse) { - if (CollectionUtils.isEmpty(indexResponse.getIndexDescriptionsList())) { - return null; - } - - List list = new ArrayList<>(); - for (IndexDescription per : indexResponse.getIndexDescriptionsList()) { - Map paramsMap = - per.getParamsList().stream() - .collect( - Collectors.toMap(KeyValuePair::getKey, KeyValuePair::getValue)); - - VectorIndex index = - new VectorIndex( - per.getIndexName(), - per.getFieldName(), - paramsMap.get("index_type"), - paramsMap.get("metric_type")); - - list.add(index); - } - - return list; - } - - public static PrimaryKey buildPrimaryKey(List fields) { - for (FieldSchema field : fields) { - if (field.getIsPrimaryKey()) { - return PrimaryKey.of( - field.getName(), Lists.newArrayList(field.getName()), field.getAutoID()); - } - } - - return null; - } - - public static PhysicalColumn convertColumn(FieldSchema fieldSchema) { - DataType dataType = fieldSchema.getDataType(); - PhysicalColumn.PhysicalColumnBuilder builder = PhysicalColumn.builder(); - builder.name(fieldSchema.getName()); - builder.sourceType(dataType.name()); - builder.comment(fieldSchema.getDescription()); - - switch (dataType) { - case Bool: - builder.dataType(BasicType.BOOLEAN_TYPE); - break; - case Int8: - builder.dataType(BasicType.BYTE_TYPE); - break; - case Int16: - builder.dataType(BasicType.SHORT_TYPE); - break; - case Int32: - builder.dataType(BasicType.INT_TYPE); - break; - case Int64: - builder.dataType(BasicType.LONG_TYPE); - break; - case Float: - builder.dataType(BasicType.FLOAT_TYPE); - break; - case Double: - builder.dataType(BasicType.DOUBLE_TYPE); - break; - case VarChar: - builder.dataType(BasicType.STRING_TYPE); - for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { - if (keyValuePair.getKey().equals("max_length")) { - builder.columnLength(Long.parseLong(keyValuePair.getValue()) * 4); - break; - } - } - break; - case String: - case JSON: - builder.dataType(BasicType.STRING_TYPE); - break; - case Array: - builder.dataType(ArrayType.STRING_ARRAY_TYPE); - break; - case FloatVector: - builder.dataType(VectorType.VECTOR_FLOAT_TYPE); - for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { - if (keyValuePair.getKey().equals("dim")) { - builder.scale(Integer.valueOf(keyValuePair.getValue())); - break; - } - } - break; - case BinaryVector: - builder.dataType(VectorType.VECTOR_BINARY_TYPE); - for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { - if (keyValuePair.getKey().equals("dim")) { - builder.scale(Integer.valueOf(keyValuePair.getValue())); - break; - } - } - break; - case SparseFloatVector: - builder.dataType(VectorType.VECTOR_SPARSE_FLOAT_TYPE); - break; - case Float16Vector: - builder.dataType(VectorType.VECTOR_FLOAT16_TYPE); - for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { - if (keyValuePair.getKey().equals("dim")) { - builder.scale(Integer.valueOf(keyValuePair.getValue())); - break; - } - } - break; - case BFloat16Vector: - builder.dataType(VectorType.VECTOR_BFLOAT16_TYPE); - for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { - if (keyValuePair.getKey().equals("dim")) { - builder.scale(Integer.valueOf(keyValuePair.getValue())); - break; - } - } - break; - default: - throw new UnsupportedOperationException("Unsupported data type: " + dataType); - } - - return builder.build(); - } - - public static Object convertBySeaTunnelType(SeaTunnelDataType fieldType, Object value) { - SqlType sqlType = fieldType.getSqlType(); - switch (sqlType) { - case INT: - return Integer.parseInt(value.toString()); - case BIGINT: - return Long.parseLong(value.toString()); - case SMALLINT: - return Short.parseShort(value.toString()); - case STRING: - case DATE: - return value.toString(); - case FLOAT_VECTOR: - ByteBuffer floatVectorBuffer = (ByteBuffer) value; - Float[] floats = BufferUtils.toFloatArray(floatVectorBuffer); - return Arrays.stream(floats).collect(Collectors.toList()); - case BINARY_VECTOR: - case BFLOAT16_VECTOR: - case FLOAT16_VECTOR: - ByteBuffer vector = (ByteBuffer) value; - return gson.toJsonTree(vector.array()); - case SPARSE_FLOAT_VECTOR: - return JsonParser.parseString(JacksonUtils.toJsonString(value)).getAsJsonObject(); - case FLOAT: - return Float.parseFloat(value.toString()); - case BOOLEAN: - return Boolean.parseBoolean(value.toString()); - case DOUBLE: - return Double.parseDouble(value.toString()); - case ARRAY: - ArrayType arrayType = (ArrayType) fieldType; - switch (arrayType.getElementType().getSqlType()) { - case STRING: - String[] stringArray = (String[]) value; - return Arrays.asList(stringArray); - case INT: - Integer[] intArray = (Integer[]) value; - return Arrays.asList(intArray); - case BIGINT: - Long[] longArray = (Long[]) value; - return Arrays.asList(longArray); - case FLOAT: - Float[] floatArray = (Float[]) value; - return Arrays.asList(floatArray); - case DOUBLE: - Double[] doubleArray = (Double[]) value; - return Arrays.asList(doubleArray); - } - case ROW: - SeaTunnelRow row = (SeaTunnelRow) value; - return JsonUtils.toJsonString(row.getFields()); - case MAP: - return JacksonUtils.toJsonString(value); - default: - throw new MilvusConnectorException( - MilvusConnectionErrorCode.NOT_SUPPORT_TYPE, sqlType.name()); - } - } - - public static DataType convertSqlTypeToDataType(SqlType sqlType) { - switch (sqlType) { - case BOOLEAN: - return DataType.Bool; - case TINYINT: - return DataType.Int8; - case SMALLINT: - return DataType.Int16; - case INT: - return DataType.Int32; - case BIGINT: - return DataType.Int64; - case FLOAT: - return DataType.Float; - case DOUBLE: - return DataType.Double; - case STRING: - return DataType.VarChar; - case ARRAY: - return DataType.Array; - case FLOAT_VECTOR: - return DataType.FloatVector; - case BINARY_VECTOR: - return DataType.BinaryVector; - case FLOAT16_VECTOR: - return DataType.Float16Vector; - case BFLOAT16_VECTOR: - return DataType.BFloat16Vector; - case SPARSE_FLOAT_VECTOR: - return DataType.SparseFloatVector; - case DATE: - return DataType.VarChar; - case ROW: - return DataType.VarChar; - } - throw new CatalogException( - String.format("Not support convert to milvus type, sqlType is %s", sqlType)); - } -} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectionErrorCode.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectionErrorCode.java index 3acc3de804c..5aaee447ea6 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectionErrorCode.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectionErrorCode.java @@ -35,7 +35,12 @@ public enum MilvusConnectionErrorCode implements SeaTunnelErrorCode { CREATE_DATABASE_ERROR("MILVUS-13", "Create database error"), CREATE_COLLECTION_ERROR("MILVUS-14", "Create collection error"), CREATE_INDEX_ERROR("MILVUS-15", "Create index error"), - ; + INIT_CLIENT_ERROR("MILVUS-16", "Init milvus client error"), + WRITE_DATA_FAIL("MILVUS-17", "Write milvus data fail"), + READ_DATA_FAIL("MILVUS-18", "Read milvus data fail"), + LIST_PARTITIONS_FAILED("MILVUS-19", "Failed to list milvus partition"), + SHOW_PARTITION_ERROR("MILVUS-20", "Desc partition error"), + CREATE_PARTITION_ERROR("MILVUS-21", "Create partition error"); private final String code; private final String description; diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java new file mode 100644 index 00000000000..a35b2af2d01 --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.sink; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; +import org.apache.seatunnel.connectors.seatunnel.milvus.utils.MilvusConnectorUtils; +import org.apache.seatunnel.connectors.seatunnel.milvus.utils.sink.MilvusSinkConverter; + +import org.apache.commons.lang3.StringUtils; + +import com.google.gson.JsonObject; +import io.milvus.v2.client.ConnectConfig; +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.common.IndexParam; +import io.milvus.v2.service.collection.request.AlterCollectionReq; +import io.milvus.v2.service.collection.request.DescribeCollectionReq; +import io.milvus.v2.service.collection.request.GetLoadStateReq; +import io.milvus.v2.service.collection.request.LoadCollectionReq; +import io.milvus.v2.service.collection.response.DescribeCollectionResp; +import io.milvus.v2.service.index.request.CreateIndexReq; +import io.milvus.v2.service.partition.request.CreatePartitionReq; +import io.milvus.v2.service.partition.request.HasPartitionReq; +import io.milvus.v2.service.vector.request.InsertReq; +import io.milvus.v2.service.vector.request.UpsertReq; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.BATCH_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.CREATE_INDEX; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.ENABLE_AUTO_ID; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.ENABLE_UPSERT; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.LOAD_COLLECTION; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.RATE_LIMIT; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.TOKEN; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.URL; + +@Slf4j +public class MilvusBufferBatchWriter { + + private final CatalogTable catalogTable; + private final ReadonlyConfig config; + private final String collectionName; + private final Boolean autoId; + private final Boolean enableUpsert; + private Boolean hasPartitionKey; + + private MilvusClientV2 milvusClient; + private final MilvusSinkConverter milvusSinkConverter; + private int batchSize; + private volatile Map> milvusDataCache; + private final AtomicLong writeCache = new AtomicLong(); + private final AtomicLong writeCount = new AtomicLong(); + + private final List jsonFieldNames; + private String dynamicFieldName; + + public MilvusBufferBatchWriter(CatalogTable catalogTable, ReadonlyConfig config) + throws SeaTunnelException { + this.catalogTable = catalogTable; + this.config = config; + this.autoId = + getAutoId( + catalogTable.getTableSchema().getPrimaryKey(), config.get(ENABLE_AUTO_ID)); + this.enableUpsert = config.get(ENABLE_UPSERT); + this.batchSize = config.get(BATCH_SIZE); + this.collectionName = catalogTable.getTablePath().getTableName(); + this.milvusDataCache = new HashMap<>(); + this.milvusSinkConverter = new MilvusSinkConverter(); + + this.dynamicFieldName = MilvusConnectorUtils.getDynamicField(catalogTable); + this.jsonFieldNames = MilvusConnectorUtils.getJsonField(catalogTable); + + initMilvusClient(config); + } + /* + * set up the Milvus client + */ + private void initMilvusClient(ReadonlyConfig config) throws SeaTunnelException { + try { + log.info("begin to init Milvus client"); + String dbName = catalogTable.getTablePath().getDatabaseName(); + String collectionName = catalogTable.getTablePath().getTableName(); + + ConnectConfig connectConfig = + ConnectConfig.builder().uri(config.get(URL)).token(config.get(TOKEN)).build(); + this.milvusClient = new MilvusClientV2(connectConfig); + if (StringUtils.isNotEmpty(dbName)) { + milvusClient.useDatabase(dbName); + } + this.hasPartitionKey = + MilvusConnectorUtils.hasPartitionKey(milvusClient, collectionName); + // set rate limit + if (config.get(RATE_LIMIT) > 0) { + log.info("set rate limit for collection: " + collectionName); + Map properties = new HashMap<>(); + properties.put("collection.insertRate.max.mb", config.get(RATE_LIMIT).toString()); + properties.put("collection.upsertRate.max.mb", config.get(RATE_LIMIT).toString()); + AlterCollectionReq alterCollectionReq = + AlterCollectionReq.builder() + .collectionName(collectionName) + .properties(properties) + .build(); + milvusClient.alterCollection(alterCollectionReq); + } + try { + if (config.get(CREATE_INDEX)) { + // create index + log.info("create index for collection: " + collectionName); + DescribeCollectionResp describeCollectionResp = + milvusClient.describeCollection( + DescribeCollectionReq.builder() + .collectionName(collectionName) + .build()); + List indexParams = new ArrayList<>(); + for (String fieldName : describeCollectionResp.getVectorFieldNames()) { + IndexParam indexParam = + IndexParam.builder() + .fieldName(fieldName) + .metricType(IndexParam.MetricType.COSINE) + .build(); + indexParams.add(indexParam); + } + CreateIndexReq createIndexReq = + CreateIndexReq.builder() + .collectionName(collectionName) + .indexParams(indexParams) + .build(); + milvusClient.createIndex(createIndexReq); + } + } catch (Exception e) { + log.warn("create index failed, maybe index already exists"); + } + if (config.get(LOAD_COLLECTION) + && !milvusClient.getLoadState( + GetLoadStateReq.builder().collectionName(collectionName).build())) { + log.info("load collection: " + collectionName); + milvusClient.loadCollection( + LoadCollectionReq.builder().collectionName(collectionName).build()); + } + log.info("init Milvus client success"); + } catch (Exception e) { + log.error("init Milvus client failed", e); + throw new MilvusConnectorException(MilvusConnectionErrorCode.INIT_CLIENT_ERROR, e); + } + } + + private Boolean getAutoId(PrimaryKey primaryKey, Boolean enableAutoId) { + if (null != primaryKey && null != primaryKey.getEnableAutoId()) { + return primaryKey.getEnableAutoId(); + } else { + return enableAutoId; + } + } + + public void addToBatch(SeaTunnelRow element) { + // put data to cache by partition + if (StringUtils.isNotEmpty(element.getPartitionName()) + && !milvusDataCache.containsKey(element.getPartitionName())) { + String partitionName = element.getPartitionName(); + Boolean partitions = + milvusClient.hasPartition( + HasPartitionReq.builder() + .collectionName(collectionName) + .partitionName(partitionName) + .build()); + if (!partitions) { + log.info("create partition: " + partitionName); + CreatePartitionReq createPartitionReq = + CreatePartitionReq.builder() + .collectionName(collectionName) + .partitionName(partitionName) + .build(); + milvusClient.createPartition(createPartitionReq); + log.info("create partition success"); + } + } + JsonObject data = + milvusSinkConverter.buildMilvusData( + catalogTable, config, jsonFieldNames, dynamicFieldName, element); + String partitionName = + element.getPartitionName() == null ? "_default" : element.getPartitionName(); + this.milvusDataCache.computeIfAbsent(partitionName, k -> new ArrayList<>()); + milvusDataCache.get(partitionName).add(data); + writeCache.incrementAndGet(); + } + + public boolean needFlush() { + return this.writeCache.get() >= this.batchSize; + } + + public void flush() throws Exception { + log.info("Starting to put {} records to Milvus.", this.writeCache.get()); + // Flush the batch writer + // Get the number of records completed + if (this.milvusDataCache.isEmpty()) { + return; + } + writeData2Collection(); + log.info( + "Successfully put {} records to Milvus. Total records written: {}", + this.writeCache.get(), + this.writeCount.get()); + this.milvusDataCache = new HashMap<>(); + this.writeCache.set(0L); + } + + public void close() throws Exception { + String collectionName = catalogTable.getTablePath().getTableName(); + // set rate limit + Map properties = new HashMap<>(); + properties.put("collection.insertRate.max.mb", "-1"); + properties.put("collection.upsertRate.max.mb", "-1"); + AlterCollectionReq alterCollectionReq = + AlterCollectionReq.builder() + .collectionName(collectionName) + .properties(properties) + .build(); + milvusClient.alterCollection(alterCollectionReq); + this.milvusClient.close(10); + } + + private void writeData2Collection() throws Exception { + try { + for (String partitionName : milvusDataCache.keySet()) { + // default to use upsertReq, but upsert only works when autoID is disabled + List data = milvusDataCache.get(partitionName); + if (Objects.equals(partitionName, "_default") || hasPartitionKey) { + partitionName = null; + } + if (enableUpsert && !autoId) { + upsertWrite(partitionName, data); + } else { + insertWrite(partitionName, data); + } + } + } catch (Exception e) { + log.error("write data to Milvus failed", e); + log.error("error data: " + milvusDataCache); + throw new MilvusConnectorException(MilvusConnectionErrorCode.WRITE_DATA_FAIL); + } + writeCount.addAndGet(this.writeCache.get()); + } + + private void upsertWrite(String partitionName, List data) + throws InterruptedException { + UpsertReq upsertReq = + UpsertReq.builder().collectionName(this.collectionName).data(data).build(); + if (StringUtils.isNotEmpty(partitionName)) { + upsertReq.setPartitionName(partitionName); + } + try { + milvusClient.upsert(upsertReq); + } catch (Exception e) { + if (e.getMessage().contains("rate limit exceeded") + || e.getMessage().contains("received message larger than max")) { + if (data.size() > 10) { + log.warn("upsert data failed, retry in smaller chunks: {} ", data.size() / 2); + this.batchSize = this.batchSize / 2; + log.info("sleep 1 minute to avoid rate limit"); + // sleep 1 minute to avoid rate limit + Thread.sleep(60000); + log.info("sleep 1 minute success"); + // Split the data and retry in smaller chunks + List firstHalf = data.subList(0, data.size() / 2); + List secondHalf = data.subList(data.size() / 2, data.size()); + upsertWrite(partitionName, firstHalf); + upsertWrite(partitionName, secondHalf); + } else { + // If the data size is 10, throw the exception to avoid infinite recursion + throw new MilvusConnectorException( + MilvusConnectionErrorCode.WRITE_DATA_FAIL, + "upsert data failed," + " size down to 10, break", + e); + } + } else { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.WRITE_DATA_FAIL, + "upsert data failed with unknown exception", + e); + } + } + log.info("upsert data success"); + } + + private void insertWrite(String partitionName, List data) { + InsertReq insertReq = + InsertReq.builder().collectionName(this.collectionName).data(data).build(); + if (StringUtils.isNotEmpty(partitionName)) { + insertReq.setPartitionName(partitionName); + } + try { + milvusClient.insert(insertReq); + } catch (Exception e) { + if (e.getMessage().contains("rate limit exceeded") + || e.getMessage().contains("received message larger than max")) { + if (data.size() > 10) { + log.warn("insert data failed, retry in smaller chunks: {} ", data.size() / 2); + // Split the data and retry in smaller chunks + List firstHalf = data.subList(0, data.size() / 2); + List secondHalf = data.subList(data.size() / 2, data.size()); + this.batchSize = this.batchSize / 2; + insertWrite(partitionName, firstHalf); + insertWrite(partitionName, secondHalf); + } else { + // If the data size is 10, throw the exception to avoid infinite recursion + throw new MilvusConnectorException( + MilvusConnectionErrorCode.WRITE_DATA_FAIL, "insert data failed", e); + } + } else { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.WRITE_DATA_FAIL, + "insert data failed with unknown exception", + e); + } + } + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java index 10f4b6ca69d..9167d806df1 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java @@ -38,10 +38,13 @@ import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo; import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusSinkState; +import lombok.extern.slf4j.Slf4j; + import java.util.Collections; import java.util.List; import java.util.Optional; +@Slf4j public class MilvusSink implements SeaTunnelSink< SeaTunnelRow, @@ -61,7 +64,6 @@ public MilvusSink(ReadonlyConfig config, CatalogTable catalogTable) { @Override public SinkWriter createWriter( SinkWriter.Context context) { - return new MilvusSinkWriter(context, catalogTable, config, Collections.emptyList()); } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java index 8fee6ebc68f..98b2b46c3b4 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -21,74 +21,53 @@ import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.PrimaryKey; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig; -import org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch.MilvusBatchWriter; -import org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch.MilvusBufferBatchWriter; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo; import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusSinkState; -import io.milvus.v2.client.ConnectConfig; -import io.milvus.v2.client.MilvusClientV2; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.List; import java.util.Optional; -import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.BATCH_SIZE; - -@Slf4j /** MilvusSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Milvus. */ +@Slf4j public class MilvusSinkWriter implements SinkWriter { - private final Context context; - private final ReadonlyConfig config; - private MilvusBatchWriter batchWriter; + private final MilvusBufferBatchWriter batchWriter; + private ReadonlyConfig config; public MilvusSinkWriter( Context context, CatalogTable catalogTable, ReadonlyConfig config, List milvusSinkStates) { - this.context = context; + this.batchWriter = new MilvusBufferBatchWriter(catalogTable, config); this.config = config; - ConnectConfig connectConfig = - ConnectConfig.builder() - .uri(config.get(MilvusSinkConfig.URL)) - .token(config.get(MilvusSinkConfig.TOKEN)) - .dbName(config.get(MilvusSinkConfig.DATABASE)) - .build(); - this.batchWriter = - new MilvusBufferBatchWriter( - catalogTable, - config.get(BATCH_SIZE), - getAutoId(catalogTable.getTableSchema().getPrimaryKey()), - config.get(MilvusSinkConfig.ENABLE_UPSERT), - new MilvusClientV2(connectConfig)); + log.info("create Milvus sink writer success"); + log.info("MilvusSinkWriter config: " + config); } /** * write data to third party data receiver. * * @param element the data need be written. - * @throws IOException throw IOException when write data failed. */ @Override public void write(SeaTunnelRow element) { batchWriter.addToBatch(element); if (batchWriter.needFlush()) { - batchWriter.flush(); - } - } - - private Boolean getAutoId(PrimaryKey primaryKey) { - if (null != primaryKey && null != primaryKey.getEnableAutoId()) { - return primaryKey.getEnableAutoId(); - } else { - return config.get(MilvusSinkConfig.ENABLE_AUTO_ID); + try { + // Flush the batch writer + batchWriter.flush(); + } catch (Exception e) { + log.error("flush Milvus sink writer failed", e); + throw new MilvusConnectorException(MilvusConnectionErrorCode.WRITE_DATA_FAIL, e); + } } } @@ -102,7 +81,6 @@ private Boolean getAutoId(PrimaryKey primaryKey) { */ @Override public Optional prepareCommit() throws IOException { - batchWriter.flush(); return Optional.empty(); } @@ -122,9 +100,14 @@ public void abortPrepare() {} */ @Override public void close() throws IOException { - if (batchWriter != null) { + try { + log.info("Stopping Milvus Client"); batchWriter.flush(); batchWriter.close(); + log.info("Stop Milvus Client success"); + } catch (Exception e) { + log.error("Stop Milvus Client failed", e); + throw new MilvusConnectorException(MilvusConnectionErrorCode.CLOSE_CLIENT_ERROR, e); } } } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java deleted file mode 100644 index 91e04342dc6..00000000000 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch; - -import org.apache.seatunnel.api.table.type.SeaTunnelRow; - -public interface MilvusBatchWriter { - - void addToBatch(SeaTunnelRow element); - - boolean needFlush(); - - boolean flush(); - - void close(); -} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java deleted file mode 100644 index 46f4e7ce7c7..00000000000 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch; - -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.PrimaryKey; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.utils.SeaTunnelException; -import org.apache.seatunnel.connectors.seatunnel.milvus.convert.MilvusConvertUtils; -import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode; -import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; - -import org.apache.commons.collections4.CollectionUtils; - -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import io.milvus.v2.client.MilvusClientV2; -import io.milvus.v2.service.vector.request.InsertReq; -import io.milvus.v2.service.vector.request.UpsertReq; - -import java.util.ArrayList; -import java.util.List; - -import static org.apache.seatunnel.api.table.catalog.PrimaryKey.isPrimaryKeyField; - -public class MilvusBufferBatchWriter implements MilvusBatchWriter { - - private final int batchSize; - private final CatalogTable catalogTable; - private final Boolean autoId; - private final Boolean enableUpsert; - private final String collectionName; - private MilvusClientV2 milvusClient; - - private volatile List milvusDataCache; - private volatile int writeCount = 0; - private static final Gson GSON = new Gson(); - - public MilvusBufferBatchWriter( - CatalogTable catalogTable, - Integer batchSize, - Boolean autoId, - Boolean enableUpsert, - MilvusClientV2 milvusClient) { - this.catalogTable = catalogTable; - this.autoId = autoId; - this.enableUpsert = enableUpsert; - this.milvusClient = milvusClient; - this.collectionName = catalogTable.getTablePath().getTableName(); - this.batchSize = batchSize; - this.milvusDataCache = new ArrayList<>(batchSize); - } - - @Override - public void addToBatch(SeaTunnelRow element) { - JsonObject data = buildMilvusData(element); - milvusDataCache.add(data); - writeCount++; - } - - @Override - public boolean needFlush() { - return this.writeCount >= this.batchSize; - } - - @Override - public synchronized boolean flush() { - if (CollectionUtils.isEmpty(this.milvusDataCache)) { - return true; - } - writeData2Collection(); - this.milvusDataCache = new ArrayList<>(this.batchSize); - this.writeCount = 0; - return true; - } - - @Override - public void close() { - try { - this.milvusClient.close(10); - } catch (InterruptedException e) { - throw new SeaTunnelException(e); - } - } - - private JsonObject buildMilvusData(SeaTunnelRow element) { - SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType(); - PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey(); - - JsonObject data = new JsonObject(); - for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) { - String fieldName = seaTunnelRowType.getFieldNames()[i]; - - if (autoId && isPrimaryKeyField(primaryKey, fieldName)) { - continue; // if create table open AutoId, then don't need insert data with - // primaryKey field. - } - - SeaTunnelDataType fieldType = seaTunnelRowType.getFieldType(i); - Object value = element.getField(i); - if (null == value) { - throw new MilvusConnectorException( - MilvusConnectionErrorCode.FIELD_IS_NULL, fieldName); - } - - data.add( - fieldName, - GSON.toJsonTree(MilvusConvertUtils.convertBySeaTunnelType(fieldType, value))); - } - return data; - } - - private void writeData2Collection() { - // default to use upsertReq, but upsert only works when autoID is disabled - if (enableUpsert && !autoId) { - UpsertReq upsertReq = - UpsertReq.builder() - .collectionName(this.collectionName) - .data(this.milvusDataCache) - .build(); - milvusClient.upsert(upsertReq); - } else { - InsertReq insertReq = - InsertReq.builder() - .collectionName(this.collectionName) - .data(this.milvusDataCache) - .build(); - milvusClient.insert(insertReq); - } - } -} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java index 76ccfb743e5..abb7e9c898d 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java @@ -28,7 +28,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig; -import org.apache.seatunnel.connectors.seatunnel.milvus.convert.MilvusConvertUtils; +import org.apache.seatunnel.connectors.seatunnel.milvus.utils.MilvusConvertUtils; import java.util.ArrayList; import java.util.List; @@ -42,9 +42,10 @@ public class MilvusSource private final ReadonlyConfig config; private final Map sourceTables; - public MilvusSource(ReadonlyConfig sourceConfig) { - this.config = sourceConfig; - this.sourceTables = MilvusConvertUtils.getSourceTables(config); + public MilvusSource(ReadonlyConfig sourceConfing) { + this.config = sourceConfing; + MilvusConvertUtils milvusConvertUtils = new MilvusConvertUtils(sourceConfing); + this.sourceTables = milvusConvertUtils.getSourceTables(); } @Override diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java index 7464c652b31..0fcfb49e784 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java @@ -24,44 +24,49 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; -import org.apache.seatunnel.api.table.type.RowKind; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.exception.CommonErrorCode; -import org.apache.seatunnel.common.utils.BufferUtils; import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig; import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode; import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; +import org.apache.seatunnel.connectors.seatunnel.milvus.utils.source.MilvusSourceConverter; import org.apache.curator.shaded.com.google.common.collect.Lists; +import org.codehaus.plexus.util.StringUtils; + import io.milvus.client.MilvusServiceClient; import io.milvus.grpc.GetLoadStateResponse; import io.milvus.grpc.LoadState; +import io.milvus.grpc.QueryResults; import io.milvus.orm.iterator.QueryIterator; import io.milvus.param.ConnectParam; import io.milvus.param.R; +import io.milvus.param.RpcStatus; +import io.milvus.param.collection.AlterCollectionParam; import io.milvus.param.collection.GetLoadStateParam; import io.milvus.param.dml.QueryIteratorParam; +import io.milvus.param.dml.QueryParam; import io.milvus.response.QueryResultsWrapper; import lombok.extern.slf4j.Slf4j; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.Deque; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentLinkedDeque; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig.BATCH_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig.RATE_LIMIT; + @Slf4j public class MilvusSourceReader implements SourceReader { private final Deque pendingSplits = new ConcurrentLinkedDeque<>(); private final ReadonlyConfig config; private final Context context; - private Map sourceTables; + private final Map sourceTables; private MilvusServiceClient client; @@ -84,11 +89,36 @@ public void open() throws Exception { .withUri(config.get(MilvusSourceConfig.URL)) .withToken(config.get(MilvusSourceConfig.TOKEN)) .build()); + setRateLimit(config.get(RATE_LIMIT).toString()); + } + + private void setRateLimit(String rateLimit) { + log.info("Set rate limit: " + rateLimit); + for (Map.Entry entry : sourceTables.entrySet()) { + TablePath tablePath = entry.getKey(); + String collectionName = tablePath.getTableName(); + + AlterCollectionParam alterCollectionParam = + AlterCollectionParam.newBuilder() + .withDatabaseName(tablePath.getDatabaseName()) + .withCollectionName(collectionName) + .withProperty("collection.queryRate.max.qps", rateLimit) + .build(); + R response = client.alterCollection(alterCollectionParam); + if (response.getStatus() != R.Status.Success.getCode()) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED, response.getException()); + } + } + log.info("Set rate limit success"); } @Override public void close() throws IOException { + log.info("Close milvus source reader"); + setRateLimit("-1"); client.close(); + log.info("Close milvus source reader success"); } @Override @@ -96,7 +126,13 @@ public void pollNext(Collector output) throws Exception { synchronized (output.getCheckpointLock()) { MilvusSourceSplit split = pendingSplits.poll(); if (null != split) { - handleEveryRowInternal(split, output); + try { + log.info("Begin to read data from split: " + split); + pollNextData(split, output); + } catch (Exception e) { + log.error("Read data from split: " + split + " failed", e); + throw new MilvusConnectorException(MilvusConnectionErrorCode.READ_DATA_FAIL, e); + } } else { if (!noMoreSplit) { log.info("Milvus source wait split!"); @@ -113,9 +149,12 @@ public void pollNext(Collector output) throws Exception { Thread.sleep(1000L); } - private void handleEveryRowInternal(MilvusSourceSplit split, Collector output) { + private void pollNextData(MilvusSourceSplit split, Collector output) + throws InterruptedException { TablePath tablePath = split.getTablePath(); + String partitionName = split.getPartitionName(); TableSchema tableSchema = sourceTables.get(tablePath).getTableSchema(); + log.info("begin to read data from milvus, table schema: " + tableSchema); if (null == tableSchema) { throw new MilvusConnectorException( MilvusConnectionErrorCode.SOURCE_TABLE_SCHEMA_IS_NULL); @@ -136,129 +175,115 @@ private void handleEveryRowInternal(MilvusSourceSplit split, Collector response = client.queryIterator(param); - if (response.getStatus() != R.Status.Success.getCode()) { + R queryResultsR = client.query(queryParam.build()); + + if (queryResultsR.getStatus() != R.Status.Success.getCode()) { throw new MilvusConnectorException( MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED, loadStateResponse.getException()); } + QueryResultsWrapper wrapper = new QueryResultsWrapper(queryResultsR.getData()); + List records = wrapper.getRowRecords(); + log.info("Total records num: " + records.get(0).getFieldValues().get("count(*)")); - QueryIterator iterator = response.getData(); - while (true) { - List next = iterator.next(); - if (next == null || next.isEmpty()) { - break; - } else { - for (QueryResultsWrapper.RowRecord record : next) { - SeaTunnelRow seaTunnelRow = - convertToSeaTunnelRow(record, tableSchema, tablePath); - output.collect(seaTunnelRow); - } - } - } + long batchSize = (long) config.get(BATCH_SIZE); + queryIteratorData(tablePath, partitionName, tableSchema, output, batchSize); } - public SeaTunnelRow convertToSeaTunnelRow( - QueryResultsWrapper.RowRecord record, TableSchema tableSchema, TablePath tablePath) { - SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType(); - Object[] fields = new Object[record.getFieldValues().size()]; - Map fieldValuesMap = record.getFieldValues(); - String[] fieldNames = typeInfo.getFieldNames(); - for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) { - SeaTunnelDataType seaTunnelDataType = typeInfo.getFieldType(fieldIndex); - Object filedValues = fieldValuesMap.get(fieldNames[fieldIndex]); - switch (seaTunnelDataType.getSqlType()) { - case STRING: - fields[fieldIndex] = filedValues.toString(); - break; - case BOOLEAN: - if (filedValues instanceof Boolean) { - fields[fieldIndex] = filedValues; - } else { - fields[fieldIndex] = Boolean.valueOf(filedValues.toString()); - } - break; - case INT: - if (filedValues instanceof Integer) { - fields[fieldIndex] = filedValues; - } else { - fields[fieldIndex] = Integer.valueOf(filedValues.toString()); - } - break; - case BIGINT: - if (filedValues instanceof Long) { - fields[fieldIndex] = filedValues; - } else { - fields[fieldIndex] = Long.parseLong(filedValues.toString()); - } - break; - case FLOAT: - if (filedValues instanceof Float) { - fields[fieldIndex] = filedValues; - } else { - fields[fieldIndex] = Float.parseFloat(filedValues.toString()); - } - break; - case DOUBLE: - if (filedValues instanceof Double) { - fields[fieldIndex] = filedValues; - } else { - fields[fieldIndex] = Double.parseDouble(filedValues.toString()); - } - break; - case FLOAT_VECTOR: - if (filedValues instanceof List) { - List list = (List) filedValues; - Float[] arrays = new Float[list.size()]; - for (int i = 0; i < list.size(); i++) { - arrays[i] = Float.parseFloat(list.get(i).toString()); - } - fields[fieldIndex] = BufferUtils.toByteBuffer(arrays); - break; - } else { - throw new MilvusConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, - "Unexpected vector value: " + filedValues); - } - case BINARY_VECTOR: - case FLOAT16_VECTOR: - case BFLOAT16_VECTOR: - if (filedValues instanceof ByteBuffer) { - fields[fieldIndex] = filedValues; + private void queryIteratorData( + TablePath tablePath, + String partitionName, + TableSchema tableSchema, + Collector output, + long batchSize) + throws InterruptedException { + try { + MilvusSourceConverter sourceConverter = new MilvusSourceConverter(tableSchema); + + QueryIteratorParam.Builder param = + QueryIteratorParam.newBuilder() + .withDatabaseName(tablePath.getDatabaseName()) + .withCollectionName(tablePath.getTableName()) + .withOutFields(Lists.newArrayList("*")) + .withBatchSize(batchSize); + + if (StringUtils.isNotEmpty(partitionName)) { + param.withPartitionNames(Collections.singletonList(partitionName)); + } + + R response = client.queryIterator(param.build()); + if (response.getStatus() != R.Status.Success.getCode()) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED, response.getException()); + } + int maxFailRetry = 3; + QueryIterator iterator = response.getData(); + while (maxFailRetry > 0) { + try { + List next = iterator.next(); + if (next == null || next.isEmpty()) { break; } else { - throw new MilvusConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, - "Unexpected vector value: " + filedValues); + for (QueryResultsWrapper.RowRecord record : next) { + SeaTunnelRow seaTunnelRow = + sourceConverter.convertToSeaTunnelRow( + record, tableSchema, tablePath); + if (StringUtils.isNotEmpty(partitionName)) { + seaTunnelRow.setPartitionName(partitionName); + } + output.collect(seaTunnelRow); + } } - case SPARSE_FLOAT_VECTOR: - if (filedValues instanceof Map) { - fields[fieldIndex] = filedValues; - break; + } catch (Exception e) { + if (e.getMessage().contains("rate limit exceeded")) { + // for rateLimit, we can try iterator again after 30s, no need to update + // batch size directly + maxFailRetry--; + if (maxFailRetry == 0) { + log.error( + "Iterate next data from milvus failed, batchSize = {}, throw exception", + batchSize, + e); + throw new MilvusConnectorException( + MilvusConnectionErrorCode.READ_DATA_FAIL, e); + } + log.error( + "Iterate next data from milvus failed, batchSize = {}, will retry after 30 s, maxRetry: {}", + batchSize, + maxFailRetry, + e); + Thread.sleep(30000); } else { + // if this error, we need to reduce batch size and try again, so throw + // exception here throw new MilvusConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, - "Unexpected vector value: " + filedValues); + MilvusConnectionErrorCode.READ_DATA_FAIL, e); } - default: - throw new MilvusConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, - "Unexpected value: " + seaTunnelDataType.getSqlType().name()); + } + } + } catch (Exception e) { + if (e.getMessage().contains("rate limit exceeded") && batchSize > 10) { + log.error( + "Query Iterate data from milvus failed, retry from beginning with smaller batch size: {} after 30 s", + batchSize / 2, + e); + Thread.sleep(30000); + queryIteratorData(tablePath, partitionName, tableSchema, output, batchSize / 2); + } else { + throw new MilvusConnectorException(MilvusConnectionErrorCode.READ_DATA_FAIL, e); } } - - SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields); - seaTunnelRow.setTableId(tablePath.getFullName()); - seaTunnelRow.setRowKind(RowKind.INSERT); - return seaTunnelRow; } @Override @@ -268,7 +293,7 @@ public List snapshotState(long checkpointId) throws Exception @Override public void addSplits(List splits) { - log.info("Adding milvus splits to reader: {}", splits); + log.info("Adding milvus splits to reader: " + splits); pendingSplits.addAll(splits); } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplit.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplit.java index e79d74b6dc0..d448242d9aa 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplit.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplit.java @@ -29,6 +29,7 @@ public class MilvusSourceSplit implements SourceSplit { private TablePath tablePath; private String splitId; + private String partitionName; @Override public String splitId() { diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java index e01e9c8ad5d..dd251032999 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java @@ -22,8 +22,19 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode; import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; +import io.milvus.client.MilvusClient; +import io.milvus.client.MilvusServiceClient; +import io.milvus.grpc.DescribeCollectionResponse; +import io.milvus.grpc.FieldSchema; +import io.milvus.grpc.ShowPartitionsResponse; +import io.milvus.param.ConnectParam; +import io.milvus.param.R; +import io.milvus.param.collection.DescribeCollectionParam; +import io.milvus.param.partition.ShowPartitionsParam; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -92,17 +103,62 @@ public void run() throws Exception { } private Collection generateSplits(CatalogTable table) { - log.info("Start splitting table {} into chunks...", table.getTablePath()); - MilvusSourceSplit milvusSourceSplit = - MilvusSourceSplit.builder() - .splitId(createSplitId(table.getTablePath(), 0)) - .tablePath(table.getTablePath()) + log.info("Start splitting table {} into chunks by partition...", table.getTablePath()); + ConnectParam connectParam = + ConnectParam.newBuilder() + .withUri(config.get(MilvusSourceConfig.URL)) + .withToken(config.get(MilvusSourceConfig.TOKEN)) .build(); - - return Collections.singletonList(milvusSourceSplit); + MilvusClient client = new MilvusServiceClient(connectParam); + String database = table.getTablePath().getDatabaseName(); + String collection = table.getTablePath().getTableName(); + R describeCollectionResponseR = + client.describeCollection( + DescribeCollectionParam.newBuilder() + .withDatabaseName(database) + .withCollectionName(collection) + .build()); + boolean hasPartitionKey = + describeCollectionResponseR.getData().getSchema().getFieldsList().stream() + .anyMatch(FieldSchema::getIsPartitionKey); + List milvusSourceSplits = new ArrayList<>(); + if (!hasPartitionKey) { + ShowPartitionsParam showPartitionsParam = + ShowPartitionsParam.newBuilder() + .withDatabaseName(database) + .withCollectionName(collection) + .build(); + R showPartitionsResponseR = + client.showPartitions(showPartitionsParam); + if (showPartitionsResponseR.getStatus() != R.Status.Success.getCode()) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.LIST_PARTITIONS_FAILED, + "Failed to show partitions: " + showPartitionsResponseR.getMessage()); + } + List partitionList = showPartitionsResponseR.getData().getPartitionNamesList(); + for (String partitionName : partitionList) { + MilvusSourceSplit milvusSourceSplit = + MilvusSourceSplit.builder() + .tablePath(table.getTablePath()) + .splitId(createSplitId(table.getTablePath(), partitionName)) + .partitionName(partitionName) + .build(); + log.info("Generated split: {}", milvusSourceSplit); + milvusSourceSplits.add(milvusSourceSplit); + } + } else { + MilvusSourceSplit milvusSourceSplit = + MilvusSourceSplit.builder() + .tablePath(table.getTablePath()) + .splitId(createSplitId(table.getTablePath(), "0")) + .build(); + log.info("Generated split: {}", milvusSourceSplit); + milvusSourceSplits.add(milvusSourceSplit); + } + return milvusSourceSplits; } - protected String createSplitId(TablePath tablePath, int index) { + protected String createSplitId(TablePath tablePath, String index) { return String.format("%s-%s", tablePath, index); } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConnectorUtils.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConnectorUtils.java new file mode 100644 index 00000000000..4a17c1e0835 --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConnectorUtils.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.utils; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.common.constants.CommonOptions; + +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.service.collection.request.CreateCollectionReq; +import io.milvus.v2.service.collection.request.DescribeCollectionReq; +import io.milvus.v2.service.collection.response.DescribeCollectionResp; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +public class MilvusConnectorUtils { + + public static Boolean hasPartitionKey(MilvusClientV2 milvusClient, String collectionName) { + + DescribeCollectionResp describeCollectionResp = + milvusClient.describeCollection( + DescribeCollectionReq.builder().collectionName(collectionName).build()); + return describeCollectionResp.getCollectionSchema().getFieldSchemaList().stream() + .anyMatch(CreateCollectionReq.FieldSchema::getIsPartitionKey); + } + + public static String getDynamicField(CatalogTable catalogTable) { + List columns = catalogTable.getTableSchema().getColumns(); + Column dynamicField = null; + for (Column column : columns) { + if (column.getOptions() != null + && column.getOptions().containsKey(CommonOptions.METADATA.getName()) + && (Boolean) column.getOptions().get(CommonOptions.METADATA.getName())) { + // skip dynamic field + dynamicField = column; + } + } + return dynamicField == null ? null : dynamicField.getName(); + } + + public static List getJsonField(CatalogTable catalogTable) { + List columns = catalogTable.getTableSchema().getColumns(); + List jsonColumn = new ArrayList<>(); + for (Column column : columns) { + if (column.getOptions() != null + && column.getOptions().containsKey(CommonOptions.JSON.getName()) + && (Boolean) column.getOptions().get(CommonOptions.JSON.getName())) { + // skip dynamic field + jsonColumn.add(column.getName()); + } + } + return jsonColumn; + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java new file mode 100644 index 00000000000..7d42966eb67 --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.utils; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.VectorIndex; +import org.apache.seatunnel.common.constants.CommonOptions; +import org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusOptions; +import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; +import org.apache.seatunnel.connectors.seatunnel.milvus.utils.source.MilvusSourceConverter; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.util.Lists; + +import com.google.protobuf.ProtocolStringList; +import io.milvus.client.MilvusServiceClient; +import io.milvus.grpc.CollectionSchema; +import io.milvus.grpc.DescribeCollectionResponse; +import io.milvus.grpc.DescribeIndexResponse; +import io.milvus.grpc.FieldSchema; +import io.milvus.grpc.IndexDescription; +import io.milvus.grpc.KeyValuePair; +import io.milvus.grpc.ShowCollectionsResponse; +import io.milvus.grpc.ShowPartitionsResponse; +import io.milvus.grpc.ShowType; +import io.milvus.param.ConnectParam; +import io.milvus.param.R; +import io.milvus.param.collection.DescribeCollectionParam; +import io.milvus.param.collection.ShowCollectionsParam; +import io.milvus.param.index.DescribeIndexParam; +import io.milvus.param.partition.ShowPartitionsParam; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; + +@Slf4j +public class MilvusConvertUtils { + private final ReadonlyConfig config; + + public MilvusConvertUtils(ReadonlyConfig config) { + this.config = config; + } + + public Map getSourceTables() { + MilvusServiceClient client = + new MilvusServiceClient( + ConnectParam.newBuilder() + .withUri(config.get(MilvusSourceConfig.URL)) + .withToken(config.get(MilvusSourceConfig.TOKEN)) + .build()); + + String database = config.get(MilvusSourceConfig.DATABASE); + List collectionList = new ArrayList<>(); + if (StringUtils.isNotEmpty(config.get(MilvusSourceConfig.COLLECTION))) { + collectionList.add(config.get(MilvusSourceConfig.COLLECTION)); + } else { + R response = + client.showCollections( + ShowCollectionsParam.newBuilder() + .withDatabaseName(database) + .withShowType(ShowType.All) + .build()); + if (response.getStatus() != R.Status.Success.getCode()) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.SHOW_COLLECTIONS_ERROR); + } + + ProtocolStringList collections = response.getData().getCollectionNamesList(); + if (CollectionUtils.isEmpty(collections)) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.DATABASE_NO_COLLECTIONS, database); + } + collectionList.addAll(collections); + } + + Map map = new HashMap<>(); + for (String collection : collectionList) { + CatalogTable catalogTable = getCatalogTable(client, database, collection); + TablePath tablePath = TablePath.of(database, null, collection); + map.put(tablePath, catalogTable); + } + return map; + } + + public CatalogTable getCatalogTable( + MilvusServiceClient client, String database, String collection) { + R response = + client.describeCollection( + DescribeCollectionParam.newBuilder() + .withDatabaseName(database) + .withCollectionName(collection) + .build()); + + if (response.getStatus() != R.Status.Success.getCode()) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.DESC_COLLECTION_ERROR, response.getMessage()); + } + log.info( + "describe collection database: {}, collection: {}, response: {}", + database, + collection, + response); + // collection column + DescribeCollectionResponse collectionResponse = response.getData(); + CollectionSchema schema = collectionResponse.getSchema(); + List columns = new ArrayList<>(); + boolean existPartitionKeyField = false; + String partitionKeyField = null; + for (FieldSchema fieldSchema : schema.getFieldsList()) { + PhysicalColumn physicalColumn = MilvusSourceConverter.convertColumn(fieldSchema); + columns.add(physicalColumn); + if (fieldSchema.getIsPartitionKey()) { + existPartitionKeyField = true; + partitionKeyField = fieldSchema.getName(); + } + } + if (collectionResponse.getSchema().getEnableDynamicField()) { + Map options = new HashMap<>(); + + options.put(CommonOptions.METADATA.getName(), true); + PhysicalColumn dynamicColumn = + PhysicalColumn.builder() + .name(CommonOptions.METADATA.getName()) + .dataType(STRING_TYPE) + .options(options) + .build(); + columns.add(dynamicColumn); + } + + // primary key + PrimaryKey primaryKey = buildPrimaryKey(schema.getFieldsList()); + + // index + R describeIndexResponseR = + client.describeIndex( + DescribeIndexParam.newBuilder() + .withDatabaseName(database) + .withCollectionName(collection) + .build()); + if (describeIndexResponseR.getStatus() != R.Status.Success.getCode()) { + throw new MilvusConnectorException(MilvusConnectionErrorCode.DESC_INDEX_ERROR); + } + DescribeIndexResponse indexResponse = describeIndexResponseR.getData(); + List vectorIndexes = buildVectorIndexes(indexResponse); + + // build tableSchema + TableSchema tableSchema = + TableSchema.builder() + .columns(columns) + .primaryKey(primaryKey) + .constraintKey( + ConstraintKey.of( + ConstraintKey.ConstraintType.VECTOR_INDEX_KEY, + "vector_index", + vectorIndexes)) + .build(); + + // build tableId + String CATALOG_NAME = "Milvus"; + TableIdentifier tableId = TableIdentifier.of(CATALOG_NAME, database, null, collection); + // build options info + Map options = new HashMap<>(); + options.put( + MilvusOptions.ENABLE_DYNAMIC_FIELD, String.valueOf(schema.getEnableDynamicField())); + options.put(MilvusOptions.SHARDS_NUM, String.valueOf(collectionResponse.getShardsNum())); + if (existPartitionKeyField) { + options.put(MilvusOptions.PARTITION_KEY_FIELD, partitionKeyField); + } else { + fillPartitionNames(options, client, database, collection); + } + + return CatalogTable.of( + tableId, tableSchema, options, new ArrayList<>(), schema.getDescription()); + } + + private static void fillPartitionNames( + Map options, + MilvusServiceClient client, + String database, + String collection) { + // not exist partition key, will read partition + R partitionsResponseR = + client.showPartitions( + ShowPartitionsParam.newBuilder() + .withDatabaseName(database) + .withCollectionName(collection) + .build()); + if (partitionsResponseR.getStatus() != R.Status.Success.getCode()) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.SHOW_PARTITION_ERROR, + partitionsResponseR.getMessage()); + } + + ProtocolStringList partitionNamesList = + partitionsResponseR.getData().getPartitionNamesList(); + List list = new ArrayList<>(); + for (String partition : partitionNamesList) { + if (partition.equals("_default")) { + continue; + } + list.add(partition); + } + if (CollectionUtils.isEmpty(partitionNamesList)) { + return; + } + + options.put(MilvusOptions.PARTITION_NAMES, String.join(",", list)); + } + + private static List buildVectorIndexes( + DescribeIndexResponse indexResponse) { + if (CollectionUtils.isEmpty(indexResponse.getIndexDescriptionsList())) { + return null; + } + + List list = new ArrayList<>(); + for (IndexDescription per : indexResponse.getIndexDescriptionsList()) { + Map paramsMap = + per.getParamsList().stream() + .collect( + Collectors.toMap(KeyValuePair::getKey, KeyValuePair::getValue)); + + VectorIndex index = + new VectorIndex( + per.getIndexName(), + per.getFieldName(), + paramsMap.get("index_type"), + paramsMap.get("metric_type")); + + list.add(index); + } + + return list; + } + + public static PrimaryKey buildPrimaryKey(List fields) { + for (FieldSchema field : fields) { + if (field.getIsPrimaryKey()) { + return PrimaryKey.of( + field.getName(), Lists.newArrayList(field.getName()), field.getAutoID()); + } + } + + return null; + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/sink/MilvusSinkConverter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/sink/MilvusSinkConverter.java new file mode 100644 index 00000000000..18aa3dbccfd --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/sink/MilvusSinkConverter.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.utils.sink; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.common.constants.CommonOptions; +import org.apache.seatunnel.common.utils.BufferUtils; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; + +import org.apache.commons.lang3.StringUtils; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.milvus.common.utils.JacksonUtils; +import io.milvus.grpc.DataType; +import io.milvus.param.collection.FieldType; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.api.table.catalog.PrimaryKey.isPrimaryKeyField; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.ENABLE_AUTO_ID; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.ENABLE_DYNAMIC_FIELD; + +public class MilvusSinkConverter { + private static final Gson gson = new Gson(); + + public Object convertBySeaTunnelType( + SeaTunnelDataType fieldType, Boolean isJson, Object value) { + SqlType sqlType = fieldType.getSqlType(); + switch (sqlType) { + case INT: + return Integer.parseInt(value.toString()); + case TINYINT: + return Byte.parseByte(value.toString()); + case BIGINT: + return Long.parseLong(value.toString()); + case SMALLINT: + return Short.parseShort(value.toString()); + case STRING: + case DATE: + if (isJson) { + return gson.fromJson(value.toString(), JsonObject.class); + } + return value.toString(); + case FLOAT_VECTOR: + ByteBuffer floatVectorBuffer = (ByteBuffer) value; + Float[] floats = BufferUtils.toFloatArray(floatVectorBuffer); + return Arrays.stream(floats).collect(Collectors.toList()); + case BINARY_VECTOR: + case BFLOAT16_VECTOR: + case FLOAT16_VECTOR: + ByteBuffer binaryVector = (ByteBuffer) value; + return gson.toJsonTree(binaryVector.array()); + case SPARSE_FLOAT_VECTOR: + return JsonParser.parseString(JacksonUtils.toJsonString(value)).getAsJsonObject(); + case FLOAT: + return Float.parseFloat(value.toString()); + case BOOLEAN: + return Boolean.parseBoolean(value.toString()); + case DOUBLE: + return Double.parseDouble(value.toString()); + case ARRAY: + ArrayType arrayType = (ArrayType) fieldType; + switch (arrayType.getElementType().getSqlType()) { + case STRING: + String[] stringArray = (String[]) value; + return Arrays.asList(stringArray); + case SMALLINT: + Short[] shortArray = (Short[]) value; + return Arrays.asList(shortArray); + case TINYINT: + Byte[] byteArray = (Byte[]) value; + return Arrays.asList(byteArray); + case INT: + Integer[] intArray = (Integer[]) value; + return Arrays.asList(intArray); + case BIGINT: + Long[] longArray = (Long[]) value; + return Arrays.asList(longArray); + case FLOAT: + Float[] floatArray = (Float[]) value; + return Arrays.asList(floatArray); + case DOUBLE: + Double[] doubleArray = (Double[]) value; + return Arrays.asList(doubleArray); + } + case ROW: + SeaTunnelRow row = (SeaTunnelRow) value; + return JsonUtils.toJsonString(row.getFields()); + case MAP: + return JacksonUtils.toJsonString(value); + default: + throw new MilvusConnectorException( + MilvusConnectionErrorCode.NOT_SUPPORT_TYPE, sqlType.name()); + } + } + + public static FieldType convertToFieldType( + Column column, PrimaryKey primaryKey, String partitionKeyField, Boolean autoId) { + SeaTunnelDataType seaTunnelDataType = column.getDataType(); + DataType milvusDataType = convertSqlTypeToDataType(seaTunnelDataType.getSqlType()); + FieldType.Builder build = + FieldType.newBuilder().withName(column.getName()).withDataType(milvusDataType); + if (StringUtils.isNotEmpty(column.getComment())) { + build.withDescription(column.getComment()); + } + switch (seaTunnelDataType.getSqlType()) { + case ROW: + build.withMaxLength(65535); + break; + case DATE: + build.withMaxLength(20); + break; + case STRING: + if (column.getOptions() != null + && column.getOptions().get(CommonOptions.JSON.getName()) != null + && (Boolean) column.getOptions().get(CommonOptions.JSON.getName())) { + // check if is json + build.withDataType(DataType.JSON); + } else if (column.getColumnLength() == null || column.getColumnLength() == 0) { + build.withMaxLength(65535); + } else { + build.withMaxLength((int) (column.getColumnLength() / 4)); + } + break; + case ARRAY: + ArrayType arrayType = (ArrayType) column.getDataType(); + SeaTunnelDataType elementType = arrayType.getElementType(); + build.withElementType(convertSqlTypeToDataType(elementType.getSqlType())); + build.withMaxCapacity(4095); + switch (elementType.getSqlType()) { + case STRING: + if (column.getColumnLength() == null || column.getColumnLength() == 0) { + build.withMaxLength(65535); + } else { + build.withMaxLength((int) (column.getColumnLength() / 4)); + } + break; + } + break; + case BINARY_VECTOR: + case FLOAT_VECTOR: + case FLOAT16_VECTOR: + case BFLOAT16_VECTOR: + build.withDimension(column.getScale()); + break; + } + + // check is primaryKey + if (null != primaryKey && primaryKey.getColumnNames().contains(column.getName())) { + build.withPrimaryKey(true); + List integerTypes = new ArrayList<>(); + integerTypes.add(SqlType.INT); + integerTypes.add(SqlType.SMALLINT); + integerTypes.add(SqlType.TINYINT); + integerTypes.add(SqlType.BIGINT); + if (integerTypes.contains(seaTunnelDataType.getSqlType())) { + build.withDataType(DataType.Int64); + } else { + build.withDataType(DataType.VarChar); + build.withMaxLength(65535); + } + if (null != primaryKey.getEnableAutoId()) { + build.withAutoID(primaryKey.getEnableAutoId()); + } else { + build.withAutoID(autoId); + } + } + + // check is partitionKey + if (column.getName().equals(partitionKeyField)) { + build.withPartitionKey(true); + } + + return build.build(); + } + + public static DataType convertSqlTypeToDataType(SqlType sqlType) { + switch (sqlType) { + case BOOLEAN: + return DataType.Bool; + case TINYINT: + return DataType.Int8; + case SMALLINT: + return DataType.Int16; + case INT: + return DataType.Int32; + case BIGINT: + return DataType.Int64; + case FLOAT: + return DataType.Float; + case DOUBLE: + return DataType.Double; + case STRING: + return DataType.VarChar; + case ARRAY: + return DataType.Array; + case MAP: + return DataType.JSON; + case FLOAT_VECTOR: + return DataType.FloatVector; + case BINARY_VECTOR: + return DataType.BinaryVector; + case FLOAT16_VECTOR: + return DataType.Float16Vector; + case BFLOAT16_VECTOR: + return DataType.BFloat16Vector; + case SPARSE_FLOAT_VECTOR: + return DataType.SparseFloatVector; + case DATE: + return DataType.VarChar; + case ROW: + return DataType.VarChar; + } + throw new CatalogException( + String.format("Not support convert to milvus type, sqlType is %s", sqlType)); + } + + public JsonObject buildMilvusData( + CatalogTable catalogTable, + ReadonlyConfig config, + List jsonFields, + String dynamicField, + SeaTunnelRow element) { + SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType(); + PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey(); + Boolean autoId = config.get(ENABLE_AUTO_ID); + + JsonObject data = new JsonObject(); + Gson gson = new Gson(); + for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) { + String fieldName = seaTunnelRowType.getFieldNames()[i]; + Boolean isJson = jsonFields.contains(fieldName); + if (autoId && isPrimaryKeyField(primaryKey, fieldName)) { + continue; // if create table open AutoId, then don't need insert data with + // primaryKey field. + } + + SeaTunnelDataType fieldType = seaTunnelRowType.getFieldType(i); + Object value = element.getField(i); + if (null == value) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.FIELD_IS_NULL, fieldName); + } + // if the field is dynamic field, then parse the dynamic field + if (dynamicField != null + && dynamicField.equals(fieldName) + && config.get(ENABLE_DYNAMIC_FIELD)) { + JsonObject dynamicData = gson.fromJson(value.toString(), JsonObject.class); + dynamicData + .entrySet() + .forEach( + entry -> { + data.add(entry.getKey(), entry.getValue()); + }); + continue; + } + Object object = convertBySeaTunnelType(fieldType, isJson, value); + data.add(fieldName, gson.toJsonTree(object)); + } + return data; + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/source/MilvusSourceConverter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/source/MilvusSourceConverter.java new file mode 100644 index 00000000000..bda3f96a420 --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/source/MilvusSourceConverter.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.utils.source; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.api.table.type.VectorType; +import org.apache.seatunnel.common.constants.CommonOptions; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.common.utils.BufferUtils; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import io.milvus.grpc.DataType; +import io.milvus.grpc.FieldSchema; +import io.milvus.grpc.KeyValuePair; +import io.milvus.response.QueryResultsWrapper; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; + +public class MilvusSourceConverter { + private final List existField; + private Gson gson = new Gson(); + + public MilvusSourceConverter(TableSchema tableSchema) { + this.existField = + tableSchema.getColumns().stream() + .filter( + column -> + column.getOptions() == null + || !column.getOptions() + .containsValue(CommonOptions.METADATA)) + .map(Column::getName) + .collect(Collectors.toList()); + } + + public SeaTunnelRow convertToSeaTunnelRow( + QueryResultsWrapper.RowRecord record, TableSchema tableSchema, TablePath tablePath) { + // get field names and types + SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType(); + String[] fieldNames = typeInfo.getFieldNames(); + + Object[] seatunnelField = new Object[typeInfo.getTotalFields()]; + // get field values from source milvus + Map fieldValuesMap = record.getFieldValues(); + // filter dynamic field + JsonObject dynamicField = convertDynamicField(fieldValuesMap); + + for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) { + if (fieldNames[fieldIndex].equals(CommonOptions.METADATA.getName())) { + seatunnelField[fieldIndex] = dynamicField.toString(); + continue; + } + SeaTunnelDataType seaTunnelDataType = typeInfo.getFieldType(fieldIndex); + Object filedValues = fieldValuesMap.get(fieldNames[fieldIndex]); + switch (seaTunnelDataType.getSqlType()) { + case STRING: + seatunnelField[fieldIndex] = filedValues.toString(); + break; + case BOOLEAN: + if (filedValues instanceof Boolean) { + seatunnelField[fieldIndex] = filedValues; + } else { + seatunnelField[fieldIndex] = Boolean.valueOf(filedValues.toString()); + } + break; + case TINYINT: + if (filedValues instanceof Byte) { + seatunnelField[fieldIndex] = filedValues; + } else { + seatunnelField[fieldIndex] = Byte.parseByte(filedValues.toString()); + } + break; + case SMALLINT: + if (filedValues instanceof Short) { + seatunnelField[fieldIndex] = filedValues; + } else { + seatunnelField[fieldIndex] = Short.parseShort(filedValues.toString()); + } + case INT: + if (filedValues instanceof Integer) { + seatunnelField[fieldIndex] = filedValues; + } else { + seatunnelField[fieldIndex] = Integer.valueOf(filedValues.toString()); + } + break; + case BIGINT: + if (filedValues instanceof Long) { + seatunnelField[fieldIndex] = filedValues; + } else { + seatunnelField[fieldIndex] = Long.parseLong(filedValues.toString()); + } + break; + case FLOAT: + if (filedValues instanceof Float) { + seatunnelField[fieldIndex] = filedValues; + } else { + seatunnelField[fieldIndex] = Float.parseFloat(filedValues.toString()); + } + break; + case DOUBLE: + if (filedValues instanceof Double) { + seatunnelField[fieldIndex] = filedValues; + } else { + seatunnelField[fieldIndex] = Double.parseDouble(filedValues.toString()); + } + break; + case ARRAY: + if (filedValues instanceof List) { + List list = (List) filedValues; + ArrayType arrayType = (ArrayType) seaTunnelDataType; + SqlType elementType = arrayType.getElementType().getSqlType(); + switch (elementType) { + case STRING: + String[] arrays = new String[list.size()]; + for (int i = 0; i < list.size(); i++) { + arrays[i] = list.get(i).toString(); + } + seatunnelField[fieldIndex] = arrays; + break; + case BOOLEAN: + Boolean[] booleanArrays = new Boolean[list.size()]; + for (int i = 0; i < list.size(); i++) { + booleanArrays[i] = Boolean.valueOf(list.get(i).toString()); + } + seatunnelField[fieldIndex] = booleanArrays; + break; + case TINYINT: + Byte[] byteArrays = new Byte[list.size()]; + for (int i = 0; i < list.size(); i++) { + byteArrays[i] = Byte.parseByte(list.get(i).toString()); + } + seatunnelField[fieldIndex] = byteArrays; + break; + case SMALLINT: + Short[] shortArrays = new Short[list.size()]; + for (int i = 0; i < list.size(); i++) { + shortArrays[i] = Short.parseShort(list.get(i).toString()); + } + seatunnelField[fieldIndex] = shortArrays; + break; + case INT: + Integer[] intArrays = new Integer[list.size()]; + for (int i = 0; i < list.size(); i++) { + intArrays[i] = Integer.valueOf(list.get(i).toString()); + } + seatunnelField[fieldIndex] = intArrays; + break; + case BIGINT: + Long[] longArrays = new Long[list.size()]; + for (int i = 0; i < list.size(); i++) { + longArrays[i] = Long.parseLong(list.get(i).toString()); + } + seatunnelField[fieldIndex] = longArrays; + break; + case FLOAT: + Float[] floatArrays = new Float[list.size()]; + for (int i = 0; i < list.size(); i++) { + floatArrays[i] = Float.parseFloat(list.get(i).toString()); + } + seatunnelField[fieldIndex] = floatArrays; + break; + case DOUBLE: + Double[] doubleArrays = new Double[list.size()]; + for (int i = 0; i < list.size(); i++) { + doubleArrays[i] = Double.parseDouble(list.get(i).toString()); + } + seatunnelField[fieldIndex] = doubleArrays; + break; + default: + throw new MilvusConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unexpected array value: " + filedValues); + } + } else { + throw new MilvusConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unexpected array value: " + filedValues); + } + break; + case FLOAT_VECTOR: + if (filedValues instanceof List) { + List list = (List) filedValues; + Float[] arrays = new Float[list.size()]; + for (int i = 0; i < list.size(); i++) { + arrays[i] = Float.parseFloat(list.get(i).toString()); + } + seatunnelField[fieldIndex] = BufferUtils.toByteBuffer(arrays); + break; + } else { + throw new MilvusConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unexpected vector value: " + filedValues); + } + case BINARY_VECTOR: + case FLOAT16_VECTOR: + case BFLOAT16_VECTOR: + if (filedValues instanceof ByteBuffer) { + seatunnelField[fieldIndex] = filedValues; + break; + } else { + throw new MilvusConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unexpected vector value: " + filedValues); + } + case SPARSE_FLOAT_VECTOR: + if (filedValues instanceof Map) { + seatunnelField[fieldIndex] = filedValues; + break; + } else { + throw new MilvusConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unexpected vector value: " + filedValues); + } + default: + throw new MilvusConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unexpected value: " + seaTunnelDataType.getSqlType().name()); + } + } + + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(seatunnelField); + seaTunnelRow.setTableId(tablePath.getFullName()); + seaTunnelRow.setRowKind(RowKind.INSERT); + return seaTunnelRow; + } + + public static PhysicalColumn convertColumn(FieldSchema fieldSchema) { + DataType dataType = fieldSchema.getDataType(); + PhysicalColumn.PhysicalColumnBuilder builder = PhysicalColumn.builder(); + builder.name(fieldSchema.getName()); + builder.sourceType(dataType.name()); + builder.comment(fieldSchema.getDescription()); + + switch (dataType) { + case Bool: + builder.dataType(BasicType.BOOLEAN_TYPE); + break; + case Int8: + builder.dataType(BasicType.BYTE_TYPE); + break; + case Int16: + builder.dataType(BasicType.SHORT_TYPE); + break; + case Int32: + builder.dataType(BasicType.INT_TYPE); + break; + case Int64: + builder.dataType(BasicType.LONG_TYPE); + break; + case Float: + builder.dataType(BasicType.FLOAT_TYPE); + break; + case Double: + builder.dataType(BasicType.DOUBLE_TYPE); + break; + case VarChar: + builder.dataType(BasicType.STRING_TYPE); + for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { + if (keyValuePair.getKey().equals("max_length")) { + builder.columnLength(Long.parseLong(keyValuePair.getValue()) * 4); + break; + } + } + break; + case String: + builder.dataType(BasicType.STRING_TYPE); + break; + case JSON: + builder.dataType(STRING_TYPE); + Map options = new HashMap<>(); + options.put(CommonOptions.JSON.getName(), true); + builder.options(options); + break; + case Array: + builder.dataType(ArrayType.STRING_ARRAY_TYPE); + break; + case FloatVector: + builder.dataType(VectorType.VECTOR_FLOAT_TYPE); + for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { + if (keyValuePair.getKey().equals("dim")) { + builder.scale(Integer.valueOf(keyValuePair.getValue())); + break; + } + } + break; + case BinaryVector: + builder.dataType(VectorType.VECTOR_BINARY_TYPE); + for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { + if (keyValuePair.getKey().equals("dim")) { + builder.scale(Integer.valueOf(keyValuePair.getValue())); + break; + } + } + break; + case SparseFloatVector: + builder.dataType(VectorType.VECTOR_SPARSE_FLOAT_TYPE); + break; + case Float16Vector: + builder.dataType(VectorType.VECTOR_FLOAT16_TYPE); + for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { + if (keyValuePair.getKey().equals("dim")) { + builder.scale(Integer.valueOf(keyValuePair.getValue())); + break; + } + } + break; + case BFloat16Vector: + builder.dataType(VectorType.VECTOR_BFLOAT16_TYPE); + for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { + if (keyValuePair.getKey().equals("dim")) { + builder.scale(Integer.valueOf(keyValuePair.getValue())); + break; + } + } + break; + default: + throw new UnsupportedOperationException("Unsupported data type: " + dataType); + } + + return builder.build(); + } + + private JsonObject convertDynamicField(Map fieldValuesMap) { + JsonObject dynamicField = new JsonObject(); + for (Map.Entry entry : fieldValuesMap.entrySet()) { + if (!existField.contains(entry.getKey())) { + dynamicField.add(entry.getKey(), gson.toJsonTree(entry.getValue())); + } + } + return dynamicField; + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java index 037d4ba7424..5bdd987b6c9 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java @@ -84,6 +84,7 @@ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) { SeaTunnelRow outputRow = new SeaTunnelRow(outputDataArray); outputRow.setRowKind(inputRow.getRowKind()); outputRow.setTableId(inputRow.getTableId()); + outputRow.setPartitionName(inputRow.getPartitionName()); return outputRow; } @@ -110,9 +111,13 @@ protected TableSchema transformTableSchema() { value, oldColumn.getDataType(), oldColumn.getColumnLength(), + oldColumn.getScale(), oldColumn.isNullable(), oldColumn.getDefaultValue(), - oldColumn.getComment()); + oldColumn.getComment(), + oldColumn.getSourceType(), + oldColumn.getOptions()); + outputColumns.add(outputColumn); outputFieldNames.add(outputColumn.getName()); needReaderColIndex.add(fieldIndex);