diff --git a/plugin-mapping.properties b/plugin-mapping.properties index d2ee0b25830..8ab86c5b5e6 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -124,8 +124,6 @@ seatunnel.sink.Pulsar = connector-pulsar seatunnel.source.ObsFile = connector-file-obs seatunnel.sink.ObsFile = connector-file-obs seatunnel.sink.ActiveMQ = connector-activemq -seatunnel.source.Qdrant = connector-qdrant -seatunnel.sink.Qdrant = connector-qdrant seatunnel.source.Sls = connector-sls seatunnel.source.Typesense = connector-typesense seatunnel.sink.Typesense = connector-typesense @@ -152,4 +150,8 @@ seatunnel.source.Elasticsearch = connector-elasticsearch seatunnel.sink.Elasticsearch = connector-elasticsearch seatunnel.source.Jdbc = connector-jdbc seatunnel.sink.Jdbc = connector-jdbc +seatunnel.source.Pinecone = connector-pinecone +seatunnel.source.Qdrant = connector-qdrant +seatunnel.sink.Qdrant = connector-qdrant +seatunnel.source.Tencent = connector-tencent-vectordb diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java index 3eb9b39a19d..b1456347e04 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java @@ -254,6 +254,11 @@ public void createTableInternal(TablePath tablePath, CatalogTable catalogTable) TableSchema tableSchema = catalogTable.getTableSchema(); List fieldTypes = new ArrayList<>(); for (Column column : tableSchema.getColumns()) { + if(column.getOptions() != null && column.getOptions().containsKey(MilvusOptions.DYNAMIC_FIELD) + && (Boolean) column.getOptions().get(MilvusOptions.DYNAMIC_FIELD)){ + // skip dynamic 
field + continue; + } fieldTypes.add(convertToFieldType(column, tableSchema.getPrimaryKey(), partitionKeyField)); } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java index e8884a805e1..26f152ccbff 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java @@ -23,4 +23,5 @@ public class MilvusOptions { public static final String SHARDS_NUM = "shardsNum"; public static final String PARTITION_KEY_FIELD = "partitionKeyField"; public static final String PARTITION_NAMES = "partitionNames"; + public static final String DYNAMIC_FIELD = "isDynamicField"; } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java index 989a96f4080..dbb165a78b2 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java @@ -11,6 +11,4 @@ public interface MilvusBatchWriter { void flush() throws Exception; void close() throws Exception; - - long getRecordsWritten(); } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java index 51f2ca345bf..0aa1a719e0f 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java @@ -155,7 +155,6 @@ public void addToBatch(SeaTunnelRow element) { this.milvusDataCache.computeIfAbsent(partitionName, k -> new ArrayList<>()); milvusDataCache.get(partitionName).add(data); writeCache.incrementAndGet(); - writeCount.incrementAndGet(); } @Override @@ -165,15 +164,14 @@ public boolean needFlush() { @Override public void flush() throws Exception { - log.info("Starting to put {} records to Milvus.", this.batchSize); + log.info("Starting to put {} records to Milvus.", this.writeCache.get()); // Flush the batch writer // Get the number of records completed - long recordsWritten = getRecordsWritten(); - log.info("Successfully put {} records to Milvus. Total records written: {}", this.batchSize, recordsWritten); if (this.milvusDataCache.isEmpty()) { return; } writeData2Collection(); + log.info("Successfully put {} records to Milvus. 
Total records written: {}", this.writeCache.get(), this.writeCount.get()); this.milvusDataCache = new HashMap<>(); this.writeCache.set(0L); } @@ -194,11 +192,6 @@ public void close() throws Exception { } - @Override - public long getRecordsWritten() { - return this.writeCount.get(); - } - private JsonObject buildMilvusData(SeaTunnelRow element) { SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType(); PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey(); @@ -219,7 +212,8 @@ private JsonObject buildMilvusData(SeaTunnelRow element) { MilvusConnectionErrorCode.FIELD_IS_NULL, fieldName); } Gson gson = new Gson(); - data.add(fieldName, gson.toJsonTree(MilvusConvertUtils.convertBySeaTunnelType(fieldType, value))); + Object object = MilvusConvertUtils.convertBySeaTunnelType(fieldType, value); + data.add(fieldName, gson.toJsonTree(object)); } return data; } @@ -243,6 +237,7 @@ private void writeData2Collection() throws Exception { log.error("error data: " + milvusDataCache); throw new MilvusConnectorException(MilvusConnectionErrorCode.WRITE_DATA_FAIL); } + writeCount.addAndGet(this.writeCache.get()); } private void upsertWrite(String partitionName, List data) throws InterruptedException { @@ -259,21 +254,24 @@ private void upsertWrite(String partitionName, List data) throws Int } catch (Exception e) { if (e.getMessage().contains("rate limit exceeded") || e.getMessage().contains("received message larger than max")) { if (data.size() > 10) { - log.warn("upsert data failed, retry in smaller chunks"); + log.warn("upsert data failed, retry in smaller chunks: {} ", data.size()/2); + this.batchSize = this.batchSize / 2; + log.info("sleep 1 minute to avoid rate limit"); //sleep 1 minute to avoid rate limit Thread.sleep(60000); + log.info("sleep 1 minute success"); // Split the data and retry in smaller chunks List firstHalf = data.subList(0, data.size() / 2); List secondHalf = data.subList(data.size() / 2, data.size()); - this.batchSize = 
this.batchSize / 2; upsertWrite(partitionName, firstHalf); upsertWrite(partitionName, secondHalf); } else { // If the data size is 10, throw the exception to avoid infinite recursion - throw e; + throw new MilvusConnectorException(MilvusConnectionErrorCode.WRITE_DATA_FAIL, e.getMessage(), e); } } } + log.info("upsert data success"); } private void insertWrite(String partitionName, List data) { @@ -290,10 +288,11 @@ private void insertWrite(String partitionName, List data) { } catch (Exception e) { if (e.getMessage().contains("rate limit exceeded") || e.getMessage().contains("received message larger than max")) { if (data.size() > 10) { - log.warn("insert data failed, retry in smaller chunks"); + log.warn("insert data failed, retry in smaller chunks: {} ", data.size()/2); // Split the data and retry in smaller chunks List firstHalf = data.subList(0, data.size() / 2); List secondHalf = data.subList(data.size() / 2, data.size()); + this.batchSize = this.batchSize / 2; insertWrite(partitionName, firstHalf); insertWrite(partitionName, secondHalf); } else { diff --git a/seatunnel-connectors-v2/connector-pinecone/pom.xml b/seatunnel-connectors-v2/connector-pinecone/pom.xml new file mode 100644 index 00000000000..ee1ee67269a --- /dev/null +++ b/seatunnel-connectors-v2/connector-pinecone/pom.xml @@ -0,0 +1,46 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connectors-v2 + ${revision} + + + connector-pinecone + SeaTunnel : Connectors V2 : Pinecone + + + + io.pinecone + pinecone-client + 2.1.0 + + + com.google.code.gson + gson + 2.10.1 + + + + + diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/config/PineconeSourceConfig.java b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/config/PineconeSourceConfig.java new file mode 100644 index 00000000000..e0958341cb8 --- /dev/null +++ 
b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/config/PineconeSourceConfig.java @@ -0,0 +1,20 @@ +package org.apache.seatunnel.connectors.pinecone.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class PineconeSourceConfig { + public static final String CONNECTOR_IDENTITY = "Pinecone"; + + public static final Option API_KEY = + Options.key("api_key") + .stringType() + .noDefaultValue() + .withDescription("Pinecone token for authentication"); + + public static final Option INDEX = + Options.key("index") + .stringType() + .noDefaultValue() + .withDescription("Pinecone index name"); +} diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/exception/PineconeConnectionErrorCode.java b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/exception/PineconeConnectionErrorCode.java new file mode 100644 index 00000000000..97f87466d91 --- /dev/null +++ b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/exception/PineconeConnectionErrorCode.java @@ -0,0 +1,18 @@ +package org.apache.seatunnel.connectors.pinecone.exception; + +import lombok.Getter; +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +@Getter +public enum PineconeConnectionErrorCode implements SeaTunnelErrorCode { + SOURCE_TABLE_SCHEMA_IS_NULL("PINECONE-01", "Source table schema is null"), + READ_DATA_FAIL("PINECONE-02", "Read data fail"); + + private final String code; + private final String description; + + PineconeConnectionErrorCode(String code, String description) { + this.code = code; + this.description = description; + } +} diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/exception/PineconeConnectorException.java 
package org.apache.seatunnel.connectors.pinecone.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.pinecone.config.PineconeSourceConfig;
import org.apache.seatunnel.connectors.pinecone.utils.PineconeUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Batch (bounded) source that reads vectors from a Pinecone index.
 *
 * <p>Table metadata is discovered eagerly in the constructor via
 * {@link PineconeUtils#getSourceTables()}; splits are generated per namespace by
 * {@link PineconeSourceSplitEnumertor} and read by {@link PineconeSourceReader}.
 */
public class PineconeSource
        implements SeaTunnelSource<SeaTunnelRow, PineconeSourceSplit, PineconeSourceState>,
                SupportParallelism,
                SupportColumnProjection {

    private final ReadonlyConfig config;
    private final Map<TablePath, CatalogTable> sourceTables;

    public PineconeSource(ReadonlyConfig config) {
        this.config = config;
        PineconeUtils pineconeUtils = new PineconeUtils(config);
        this.sourceTables = pineconeUtils.getSourceTables();
    }

    /**
     * This source performs a one-shot scan of the index, so it is BOUNDED.
     *
     * <p>Bug fix: previously returned {@code null}. The reader only calls
     * {@code context.signalNoMoreElement()} when the boundedness is
     * {@code Boundedness.BOUNDED}, so a {@code null} here meant the job never
     * terminated (and callers of this method would NPE).
     *
     * @return {@link Boundedness#BOUNDED}.
     */
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    /**
     * Create source reader, used to produce data.
     *
     * @param readerContext reader context.
     * @return source reader.
     * @throws Exception when create reader failed.
     */
    @Override
    public SourceReader<SeaTunnelRow, PineconeSourceSplit> createReader(
            SourceReader.Context readerContext) throws Exception {
        return new PineconeSourceReader(readerContext, config, sourceTables);
    }

    /** @return one catalog table per discovered Pinecone index. */
    @Override
    public List<CatalogTable> getProducedCatalogTables() {
        return new ArrayList<>(sourceTables.values());
    }

    /**
     * Create the split enumerator for a fresh run (no checkpoint state).
     *
     * @param enumeratorContext enumerator context.
     * @return source split enumerator.
     * @throws Exception when create enumerator failed.
     */
    @Override
    public SourceSplitEnumerator<PineconeSourceSplit, PineconeSourceState> createEnumerator(
            SourceSplitEnumerator.Context<PineconeSourceSplit> enumeratorContext)
            throws Exception {
        return new PineconeSourceSplitEnumertor(enumeratorContext, config, sourceTables, null);
    }

    /**
     * Create the split enumerator when restoring from a checkpoint.
     *
     * @param enumeratorContext enumerator context.
     * @param checkpointState checkpoint state.
     * @return source split enumerator.
     * @throws Exception when create enumerator failed.
     */
    @Override
    public SourceSplitEnumerator<PineconeSourceSplit, PineconeSourceState> restoreEnumerator(
            SourceSplitEnumerator.Context<PineconeSourceSplit> enumeratorContext,
            PineconeSourceState checkpointState)
            throws Exception {
        return new PineconeSourceSplitEnumertor(
                enumeratorContext, config, sourceTables, checkpointState);
    }

    /** @return the plugin identifier, {@code "Pinecone"}. */
    @Override
    public String getPluginName() {
        return PineconeSourceConfig.CONNECTOR_IDENTITY;
    }
}
package org.apache.seatunnel.connectors.pinecone.source;

import io.pinecone.clients.Index;
import io.pinecone.clients.Pinecone;
import io.pinecone.proto.FetchResponse;
import io.pinecone.proto.ListItem;
import io.pinecone.proto.ListResponse;
import io.pinecone.proto.Vector;
import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.pinecone.exception.PineconeConnectionErrorCode;
import org.apache.seatunnel.connectors.pinecone.exception.PineconeConnectorException;
import org.apache.seatunnel.connectors.pinecone.utils.ConverterUtils;

import java.io.IOException;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors;

import static org.apache.seatunnel.connectors.pinecone.config.PineconeSourceConfig.API_KEY;

/**
 * Reads every vector of one Pinecone namespace per split: lists vector ids page by
 * page (page size 100), fetches the vectors for each page, converts them to
 * {@link SeaTunnelRow} and emits them.
 */
@Slf4j
public class PineconeSourceReader implements SourceReader<SeaTunnelRow, PineconeSourceSplit> {

    /** Page size used for Index.list pagination. */
    private static final int LIST_PAGE_SIZE = 100;

    private final Deque<PineconeSourceSplit> pendingSplits = new ConcurrentLinkedDeque<>();
    private final ReadonlyConfig config;
    private final Context context;
    private final Map<TablePath, CatalogTable> sourceTables;
    private Pinecone pinecone;

    private volatile boolean noMoreSplit;

    public PineconeSourceReader(
            Context readerContext,
            ReadonlyConfig config,
            Map<TablePath, CatalogTable> sourceTables) {
        this.context = readerContext;
        this.config = config;
        this.sourceTables = sourceTables;
    }

    /** Open the source reader: build the Pinecone client from the configured API key. */
    @Override
    public void open() throws Exception {
        pinecone = new Pinecone.Builder(config.get(API_KEY)).build();
    }

    /** Nothing to release; the Pinecone client holds no closable resources here. */
    @Override
    public void close() throws IOException {}

    /**
     * Read one pending split (a whole namespace) and emit all of its vectors.
     *
     * <p>Bug fixes vs. the previous version:
     * <ul>
     *   <li>The pagination token is now a local variable. It used to be a field, so
     *       the terminal token left over from one split made the read loop of every
     *       subsequent split exit immediately, silently skipping those splits.
     *   <li>Each page is emitted BEFORE the empty-token check. Previously the loop
     *       {@code break}-ed as soon as {@code pagination.getNext()} was empty,
     *       which dropped the final page of every namespace.
     * </ul>
     *
     * @param output output collector.
     * @throws Exception if reading from Pinecone fails.
     */
    @Override
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        synchronized (output.getCheckpointLock()) {
            PineconeSourceSplit split = pendingSplits.poll();
            if (null != split) {
                try {
                    log.info("Begin to read data from split: " + split);
                    TablePath tablePath = split.getTablePath();
                    String namespace = split.getPartitionName();
                    TableSchema tableSchema = sourceTables.get(tablePath).getTableSchema();
                    log.info("begin to read data from pinecone, table schema: " + tableSchema);
                    if (null == tableSchema) {
                        throw new PineconeConnectorException(
                                PineconeConnectionErrorCode.SOURCE_TABLE_SCHEMA_IS_NULL);
                    }
                    // NOTE(review): the empty (default "") namespace is skipped, as in the
                    // original loop condition — confirm this is intentional.
                    if (!namespace.isEmpty()) {
                        readNamespace(tablePath, namespace, output);
                    }
                } catch (Exception e) {
                    log.error("Read data from split: " + split + " failed", e);
                    throw new PineconeConnectorException(
                            PineconeConnectionErrorCode.READ_DATA_FAIL, e);
                }
            } else {
                if (!noMoreSplit) {
                    log.info("Pinecone source wait split!");
                }
            }
        }
        if (noMoreSplit
                && pendingSplits.isEmpty()
                && Boundedness.BOUNDED.equals(context.getBoundedness())) {
            // All splits consumed and no more will arrive: end the bounded job.
            log.info("Closed the bounded pinecone source");
            context.signalNoMoreElement();
        }
        Thread.sleep(1000L);
    }

    /** Page through one namespace, fetching and emitting every vector. */
    private void readNamespace(
            TablePath tablePath, String namespace, Collector<SeaTunnelRow> output) {
        Index index = pinecone.getIndexConnection(tablePath.getTableName());
        String paginationToken = null; // local: must not leak across splits
        while (true) {
            ListResponse listResponse =
                    paginationToken == null
                            ? index.list(namespace, LIST_PAGE_SIZE)
                            : index.list(namespace, LIST_PAGE_SIZE, paginationToken);
            List<String> ids =
                    listResponse.getVectorsList().stream()
                            .map(ListItem::getId)
                            .collect(Collectors.toList());
            // Emit the current page first; only then decide whether to stop.
            if (!ids.isEmpty()) {
                FetchResponse fetchResponse = index.fetch(ids, namespace);
                for (Map.Entry<String, Vector> entry :
                        fetchResponse.getVectorsMap().entrySet()) {
                    SeaTunnelRow row = ConverterUtils.convertToSeatunnelRow(entry.getValue());
                    row.setPartitionName(namespace);
                    row.setTableId(tablePath.getFullName());
                    output.collect(row);
                }
            }
            String next = listResponse.getPagination().getNext();
            if (next == null || next.isEmpty()) {
                break; // last page reached
            }
            paginationToken = next;
        }
    }

    /**
     * Snapshot the splits not yet processed so they are replayed after recovery.
     *
     * <p>Bug fix: previously returned {@code null}, which NPEs in engines that
     * serialize the returned list on checkpoint.
     *
     * @param checkpointId checkpoint Id.
     * @return the splits still pending on this reader.
     */
    @Override
    public List<PineconeSourceSplit> snapshotState(long checkpointId) throws Exception {
        return new java.util.ArrayList<>(pendingSplits);
    }

    /**
     * Add splits assigned by the enumerator to this reader's queue.
     *
     * @param splits split checkpoint state.
     */
    @Override
    public void addSplits(List<PineconeSourceSplit> splits) {
        log.info("Adding pinecone splits to reader: " + splits);
        pendingSplits.addAll(splits);
    }

    /**
     * Called when the enumerator will send no further splits; lets {@link #pollNext}
     * signal end-of-data once the queue drains.
     */
    @Override
    public void handleNoMoreSplits() {
        log.info("receive no more splits message, this pinecone reader will not add new split.");
        noMoreSplit = true;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
}
package org.apache.seatunnel.connectors.pinecone.source;

import io.pinecone.clients.Index;
import io.pinecone.clients.Pinecone;
import io.pinecone.proto.DescribeIndexStatsResponse;
import io.pinecone.proto.NamespaceSummary;
import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

import static org.apache.seatunnel.connectors.pinecone.config.PineconeSourceConfig.API_KEY;

/**
 * Split enumerator for the Pinecone source: one split per (index, namespace) pair,
 * assigned to readers by a stable hash of the split id.
 *
 * <p>NOTE(review): the class name misspells "Enumerator"; it is kept as-is because
 * {@code PineconeSource} references it by this name.
 */
@Slf4j
public class PineconeSourceSplitEnumertor
        implements SourceSplitEnumerator<PineconeSourceSplit, PineconeSourceState> {

    private final Map<TablePath, CatalogTable> tables;
    private final Context<PineconeSourceSplit> context;
    /** Tables whose splits have not been generated yet. */
    private final ConcurrentLinkedQueue<TablePath> pendingTables;
    /** Generated splits awaiting assignment, keyed by owner reader subtask. */
    private final Map<Integer, List<PineconeSourceSplit>> pendingSplits;
    /** Guards pendingTables/pendingSplits against concurrent snapshot/assign. */
    private final Object stateLock = new Object();
    private final Pinecone pinecone;

    private ReadonlyConfig config;

    public PineconeSourceSplitEnumertor(
            Context<PineconeSourceSplit> context,
            ReadonlyConfig config,
            Map<TablePath, CatalogTable> sourceTables,
            PineconeSourceState sourceState) {
        this.context = context;
        this.tables = sourceTables;
        this.config = config;
        if (sourceState == null) {
            // Fresh start: every discovered table still needs splitting.
            this.pendingTables = new ConcurrentLinkedQueue<>(tables.keySet());
            this.pendingSplits = new HashMap<>();
        } else {
            // Restore: resume from the checkpointed pending work.
            this.pendingTables = new ConcurrentLinkedQueue<>(sourceState.getPendingTables());
            this.pendingSplits = new HashMap<>(sourceState.getPendingSplits());
        }
        pinecone = new Pinecone.Builder(config.get(API_KEY)).build();
    }

    @Override
    public void open() {}

    /**
     * Executed once by the engine: split every pending table into per-namespace
     * splits, assign them, then signal no-more-splits to all readers.
     */
    @Override
    public void run() throws Exception {
        log.info("Starting pinecone split enumerator.");
        Set<Integer> readers = context.registeredReaders();
        while (!pendingTables.isEmpty()) {
            synchronized (stateLock) {
                TablePath tablePath = pendingTables.poll();
                log.info("begin to split table path: {}", tablePath);
                Collection<PineconeSourceSplit> splits = generateSplits(tables.get(tablePath));
                log.info("end to split table {} into {} splits.", tablePath, splits.size());
                addPendingSplit(splits);
            }
            synchronized (stateLock) {
                assignSplit(readers);
            }
        }
        log.info("No more splits to assign. Sending NoMoreSplitsEvent to reader {}.", readers);
        readers.forEach(context::signalNoMoreSplits);
    }

    /** Hand each reader the splits queued for it. */
    private void assignSplit(Collection<Integer> readers) {
        log.info("Assign pendingSplits to readers {}", readers);
        for (int reader : readers) {
            List<PineconeSourceSplit> assignmentForReader = pendingSplits.remove(reader);
            if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
                log.debug("Assign splits {} to reader {}", assignmentForReader, reader);
                context.assignSplit(reader, assignmentForReader);
            }
        }
    }

    /** Queue splits under their hash-derived owner reader. */
    private void addPendingSplit(Collection<PineconeSourceSplit> splits) {
        int readerCount = context.currentParallelism();
        for (PineconeSourceSplit split : splits) {
            int ownerReader = getSplitOwner(split.splitId(), readerCount);
            log.info("Assigning {} to {} reader.", split, ownerReader);
            pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split);
        }
    }

    /** One split per namespace of the table's Pinecone index. */
    private Collection<PineconeSourceSplit> generateSplits(CatalogTable catalogTable) {
        Index index = pinecone.getIndexConnection(catalogTable.getTablePath().getTableName());
        DescribeIndexStatsResponse describeIndexStatsResponse = index.describeIndexStats();
        Map<String, NamespaceSummary> namespaceSummaryMap =
                describeIndexStatsResponse.getNamespacesMap();
        List<PineconeSourceSplit> splits = new ArrayList<>();
        for (String namespace : namespaceSummaryMap.keySet()) {
            splits.add(
                    PineconeSourceSplit.builder()
                            .tablePath(catalogTable.getTablePath())
                            .splitId(
                                    catalogTable.getTablePath().getTableName()
                                            + "-"
                                            + namespace)
                            .partitionName(namespace)
                            .build());
        }
        return splits;
    }

    /** Stable, non-negative hash assignment of a split id to a reader index. */
    private static int getSplitOwner(String tp, int numReaders) {
        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    @Override
    public void close() throws IOException {}

    /**
     * Re-queue splits returned after a reader failure and reassign them if the
     * reader is still registered.
     *
     * @param splits The split to add back to the enumerator for reassignment.
     * @param subtaskId The id of the subtask to which the returned splits belong.
     */
    @Override
    public void addSplitsBack(List<PineconeSourceSplit> splits, int subtaskId) {
        if (!splits.isEmpty()) {
            synchronized (stateLock) {
                addPendingSplit(splits, subtaskId);
                if (context.registeredReaders().contains(subtaskId)) {
                    assignSplit(Collections.singletonList(subtaskId));
                } else {
                    log.warn(
                            "Reader {} is not registered. Pending splits {} are not assigned.",
                            subtaskId,
                            splits);
                }
            }
        }
        // Fixed copy-paste: message previously said "JdbcSourceSplitEnumerator".
        log.info("Add back splits {} to PineconeSourceSplitEnumerator.", splits.size());
    }

    private void addPendingSplit(Collection<PineconeSourceSplit> splits, int ownerReader) {
        pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).addAll(splits);
    }

    @Override
    public int currentUnassignedSplitSize() {
        return pendingTables.isEmpty() && pendingSplits.isEmpty() ? 0 : 1;
    }

    @Override
    public void handleSplitRequest(int subtaskId) {}

    /** Assign any already-queued splits to a newly registered reader. */
    @Override
    public void registerReader(int subtaskId) {
        // Fixed copy-paste: message previously said "MilvusSourceSplitEnumerator".
        log.info("Register reader {} to PineconeSourceSplitEnumerator.", subtaskId);
        if (!pendingSplits.isEmpty()) {
            synchronized (stateLock) {
                assignSplit(Collections.singletonList(subtaskId));
            }
        }
    }

    /**
     * Snapshot the not-yet-split tables and unassigned splits.
     *
     * @param checkpointId checkpoint id.
     */
    @Override
    public PineconeSourceState snapshotState(long checkpointId) throws Exception {
        synchronized (stateLock) {
            // Fixed raw-type "new ArrayList(...)" from the original.
            return new PineconeSourceState(
                    new ArrayList<>(pendingTables), new HashMap<>(pendingSplits));
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
}
io.pinecone.proto.Vector; +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import java.util.List; +import java.util.Map; + +public class ConverterUtils { + public static SeaTunnelRow convertToSeatunnelRow(Vector vector) { + Object[] fields = new Object[3]; + fields[0] = vector.getId(); + List floats = vector.getValuesList(); + Float[] arrays = floats.toArray(new Float[0]); + fields[1] = arrays; + Struct meta = vector.getMetadata(); + Gson gson = new Gson(); + JsonObject data = new JsonObject(); + for (Map.Entry entry : meta.getFieldsMap().entrySet()) { + data.add(entry.getKey(), convertValueToJsonElement(entry.getValue())); + } + fields[2] = data; + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields); + seaTunnelRow.setRowKind(RowKind.INSERT); + return seaTunnelRow; + } + + private static JsonElement convertValueToJsonElement(Value value) { + Gson gson = new Gson(); + switch (value.getKindCase()) { + case NULL_VALUE: + return gson.toJsonTree(null); // Null value + case NUMBER_VALUE: + return gson.toJsonTree(value.getNumberValue()); // Double value + case STRING_VALUE: + return gson.toJsonTree(value.getStringValue()); // String value + case BOOL_VALUE: + return gson.toJsonTree(value.getBoolValue()); // Boolean value + case STRUCT_VALUE: + // Convert Struct to a JsonObject + JsonObject structJson = new JsonObject(); + value.getStructValue().getFieldsMap().forEach((k, v) -> + structJson.add(k, convertValueToJsonElement(v)) + ); + return structJson; + case LIST_VALUE: + // Convert List to a JsonArray + return gson.toJsonTree( + value.getListValue().getValuesList().stream() + .map(ConverterUtils::convertValueToJsonElement) + .toArray() + ); + default: + return gson.toJsonTree(null); // Default or unsupported case + } + } +} diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/utils/PineconeUtils.java 
b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/utils/PineconeUtils.java new file mode 100644 index 00000000000..4e3201b9a51 --- /dev/null +++ b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/utils/PineconeUtils.java @@ -0,0 +1,77 @@ +package org.apache.seatunnel.connectors.pinecone.utils; + +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.protobuf.Struct; +import com.google.protobuf.Value; +import io.pinecone.clients.Index; +import io.pinecone.clients.Pinecone; +import io.pinecone.proto.Vector; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.*; +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.openapitools.control.client.model.IndexModel; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.seatunnel.api.table.type.BasicType.JSON_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; +import static org.apache.seatunnel.api.table.type.VectorType.VECTOR_FLOAT_TYPE; +import static org.apache.seatunnel.connectors.pinecone.config.PineconeSourceConfig.API_KEY; +import static org.apache.seatunnel.connectors.pinecone.config.PineconeSourceConfig.INDEX; + +public class PineconeUtils { + private ReadonlyConfig config; + Map sourceTables; + + public PineconeUtils(ReadonlyConfig config) { + this.config = config; + this.sourceTables = new HashMap<>(); + } + + public Map getSourceTables() { + Pinecone pinecone = new Pinecone.Builder(config.get(API_KEY)).build(); + String indexName = config.get(INDEX); + IndexModel indexMetadata = pinecone.describeIndex(indexName); + TablePath tablePath = TablePath.of("default", indexName); + + 
List columns = new ArrayList<>(); + + PhysicalColumn idColumn = PhysicalColumn.builder() + .name("id") + .dataType(STRING_TYPE) + .build(); + PhysicalColumn vectorColumn = PhysicalColumn.builder() + .name("vector") + .dataType(VECTOR_FLOAT_TYPE) + .scale(indexMetadata.getDimension()) + .build(); + Map options = new HashMap<>(); + options.put("isDynamicField", true); + PhysicalColumn dynamicColumn = PhysicalColumn.builder() + .name("meta") + .dataType(JSON_TYPE) + .options(options) + .build(); + columns.add(idColumn); + columns.add(vectorColumn); + columns.add(dynamicColumn); + + TableSchema tableSchema = TableSchema.builder() + .primaryKey(PrimaryKey.of("id", Lists.newArrayList("id"))) + .columns(columns) + .build(); + Map sourceTables = new HashMap<>(); + CatalogTable catalogTable = CatalogTable.of(TableIdentifier.of("pinecone", tablePath), + tableSchema, new HashMap<>(), new ArrayList<>(), ""); + sourceTables.put(tablePath, catalogTable); + return sourceTables; + } +} + diff --git a/seatunnel-connectors-v2/connector-qdrant/pom.xml b/seatunnel-connectors-v2/connector-qdrant/pom.xml index 686f0bdb7a4..5233f76fd4a 100644 --- a/seatunnel-connectors-v2/connector-qdrant/pom.xml +++ b/seatunnel-connectors-v2/connector-qdrant/pom.xml @@ -43,7 +43,7 @@ io.grpc grpc-protobuf - 1.65.1 + 1.59.1 diff --git a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantParameters.java b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantParameters.java index 1ae612fafbb..9efc02d99e5 100644 --- a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantParameters.java +++ b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantParameters.java @@ -42,6 +42,6 @@ public QdrantParameters(ReadonlyConfig config) { } public QdrantClient 
buildQdrantClient() { - return new QdrantClient(QdrantGrpcClient.newBuilder(host, port, useTls).build()); + return new QdrantClient(QdrantGrpcClient.newBuilder(host, port, useTls).withApiKey(this.apiKey).build()); } } diff --git a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceReader.java b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceReader.java index 2c371631295..64376b0092e 100644 --- a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceReader.java +++ b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceReader.java @@ -157,8 +157,13 @@ private SeaTunnelRow convertToSeaTunnelRow(Points.RetrievedPoint point) { case DOUBLE: fields[fieldIndex] = value.getDoubleValue(); break; - case BINARY_VECTOR: case FLOAT_VECTOR: + List floats = vector.getDataList(); + Float[] floats1 = new Float[floats.size()]; + floats.toArray(floats1); + fields[fieldIndex] = floats1; + break; + case BINARY_VECTOR: case FLOAT16_VECTOR: case BFLOAT16_VECTOR: List list = vector.getDataList(); diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/pom.xml b/seatunnel-connectors-v2/connector-tencent-vectordb/pom.xml new file mode 100644 index 00000000000..ceee972583a --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/pom.xml @@ -0,0 +1,47 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connectors-v2 + ${revision} + + + connector-tencent-vectordb + SeaTunnel : Connectors V2 : Pinecone + + + + com.tencent.tcvectordb + vectordatabase-sdk-java + 1.3.5 + + + + com.google.code.gson + gson + 2.10.1 + + + + + diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/config/TencentVectorDBSourceConfig.java 
b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/config/TencentVectorDBSourceConfig.java new file mode 100644 index 00000000000..7a3a7fda063 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/config/TencentVectorDBSourceConfig.java @@ -0,0 +1,43 @@ +package org.apache.seatunnel.connectors.tencent.vectordb.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class TencentVectorDBSourceConfig { + public static final String CONNECTOR_IDENTITY = "TencentVectorDB"; + + public static final Option URL = + Options.key("url") + .stringType() + .noDefaultValue() + .withDescription("url"); + + public static final Option USER_NAME = + Options.key("user_name") + .stringType() + .noDefaultValue() + .withDescription("user name for authentication"); + + public static final Option API_KEY = + Options.key("api_key") + .stringType() + .noDefaultValue() + .withDescription("token for authentication"); + + public static final Option DATABASE = + Options.key("database") + .stringType() + .noDefaultValue() + .withDescription("Tencent Vector DB database name"); + + public static final Option COLLECTION = + Options.key("collection") + .stringType() + .noDefaultValue() + .withDescription("Tencent Vector DB collection name"); + public static final Option BATCH_SIZE = + Options.key("batch_size") + .longType() + .defaultValue(100L) + .withDescription("Tencent Vector DB reader batch size"); +} diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/exception/TencentVectorDBConnectorErrorCode.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/exception/TencentVectorDBConnectorErrorCode.java new file mode 100644 index 
package org.apache.seatunnel.connectors.tencent.vectordb.exception;

import lombok.Getter;

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;

/**
 * Error codes raised by the Tencent VectorDB connector. Codes use the
 * {@code TC-VECTORDB-NN} namespace; getters are generated by Lombok {@code @Getter}.
 */
@Getter
public enum TencentVectorDBConnectorErrorCode implements SeaTunnelErrorCode {
    /** The catalog table for the split being read carries no schema. */
    SOURCE_TABLE_SCHEMA_IS_NULL("TC-VECTORDB-01", "Source table schema is null"),
    /** Querying documents from Tencent VectorDB failed. */
    READ_DATA_FAIL("TC-VECTORDB-02", "Read data fail");

    // Stable machine-readable code, e.g. "TC-VECTORDB-01".
    private final String code;
    // Human-readable summary shown in error messages.
    private final String description;

    TencentVectorDBConnectorErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }
}
seaTunnelErrorCode.getErrorMessage(), cause); + } +} diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSource.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSource.java new file mode 100644 index 00000000000..046dfbaa520 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSource.java @@ -0,0 +1,92 @@ +package org.apache.seatunnel.connectors.tencent.vectordb.source; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.source.*; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig; +import org.apache.seatunnel.connectors.tencent.vectordb.utils.ConnectorUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class TencentVectorDBSource implements SeaTunnelSource, + SupportParallelism, + SupportColumnProjection { + private final ReadonlyConfig config; + private final Map sourceTables; + + public TencentVectorDBSource(ReadonlyConfig config) { + this.config = config; + ConnectorUtils connectorUtils = new ConnectorUtils(config); + this.sourceTables = connectorUtils.getSourceTables(); + } + + /** + * Get the boundedness of this source. + * + * @return the boundedness of this source. + */ + @Override + public Boundedness getBoundedness() { + return null; + } + + /** + * Create source reader, used to produce data. + * + * @param readerContext reader context. + * @return source reader. + * @throws Exception when create reader failed. 
+ */ + @Override + public SourceReader createReader(SourceReader.Context readerContext) throws Exception { + return new TencentVectorDBSourceReader(readerContext, config, sourceTables); + } + + @Override + public List getProducedCatalogTables() { + return new ArrayList<>(sourceTables.values()); + } + + /** + * Create source split enumerator, used to generate splits. This method will be called only once + * when start a source. + * + * @param enumeratorContext enumerator context. + * @return source split enumerator. + * @throws Exception when create enumerator failed. + */ + @Override + public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception { + return new TencentVectorDBSourceSplitEnumertor(enumeratorContext, config, sourceTables, null); + } + + /** + * Create source split enumerator, used to generate splits. This method will be called when + * restore from checkpoint. + * + * @param enumeratorContext enumerator context. + * @param checkpointState checkpoint state. + * @return source split enumerator. + * @throws Exception when create enumerator failed. + */ + @Override + public SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, TencentVectorDBSourceState checkpointState) throws Exception { + return new TencentVectorDBSourceSplitEnumertor(enumeratorContext, config, sourceTables, checkpointState); + } + + /** + * Returns a unique identifier among same factory interfaces. + * + *

For consistency, an identifier should be declared as one lower case word (e.g. {@code + * kafka}). If multiple factories exist for different versions, a version should be appended + * using "-" (e.g. {@code elasticsearch-7}). + */ + @Override + public String getPluginName() { + return TencentVectorDBSourceConfig.CONNECTOR_IDENTITY; + } +} diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceFactory.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceFactory.java new file mode 100644 index 00000000000..44172039133 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceFactory.java @@ -0,0 +1,43 @@ +package org.apache.seatunnel.connectors.tencent.vectordb.source; + +import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; +import org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig; + +import java.io.Serializable; + +@Slf4j +@AutoService(Factory.class) +public class TencentVectorDBSourceFactory implements TableSourceFactory { + + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> (SeaTunnelSource) new TencentVectorDBSource(context.getOptions()); + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + 
.required(TencentVectorDBSourceConfig.API_KEY) + .optional() + .build(); + } + + @Override + public Class getSourceClass() { + return TencentVectorDBSource.class; + } + + @Override + public String factoryIdentifier() { + return TencentVectorDBSourceConfig.CONNECTOR_IDENTITY; + } +} diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceReader.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceReader.java new file mode 100644 index 00000000000..e0c613d917c --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceReader.java @@ -0,0 +1,174 @@ +package org.apache.seatunnel.connectors.tencent.vectordb.source; + +import com.tencent.tcvectordb.client.RPCVectorDBClient; +import com.tencent.tcvectordb.client.VectorDBClient; +import com.tencent.tcvectordb.model.Collection; +import com.tencent.tcvectordb.model.Database; +import com.tencent.tcvectordb.model.Document; +import com.tencent.tcvectordb.model.param.database.ConnectParam; +import com.tencent.tcvectordb.model.param.dml.QueryParam; +import com.tencent.tcvectordb.model.param.enums.ReadConsistencyEnum; +import lombok.extern.slf4j.Slf4j; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import 
org.apache.seatunnel.connectors.tencent.vectordb.exception.TencentVectorDBConnectorErrorCode; +import org.apache.seatunnel.connectors.tencent.vectordb.exception.TencentVectorDBConnectorException; +import org.apache.seatunnel.connectors.tencent.vectordb.utils.ConverterUtils; + +import java.io.IOException; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig.*; + +@Slf4j +public class TencentVectorDBSourceReader implements SourceReader { + private final Deque pendingSplits = new ConcurrentLinkedDeque<>(); + private final ReadonlyConfig config; + private final Context context; + private Map sourceTables; + private VectorDBClient client; + private final AtomicLong offSet = new AtomicLong(0); + + private volatile boolean noMoreSplit; + public TencentVectorDBSourceReader(Context readerContext, ReadonlyConfig config, Map sourceTables) { + this.context = readerContext; + this.config = config; + this.sourceTables = sourceTables; + } + + /** + * Open the source reader. + */ + @Override + public void open() throws Exception { + ConnectParam connectParam = ConnectParam.newBuilder() + .withUrl(config.get(URL)) + .withUsername(config.get(USER_NAME)) + .withKey(config.get(API_KEY)) + .withTimeout(30) + .build(); + client = new RPCVectorDBClient(connectParam, ReadConsistencyEnum.EVENTUAL_CONSISTENCY); + } + + /** + * Called to close the reader, in case it holds on to any resources, like threads or network + * connections. + */ + @Override + public void close() throws IOException { + } + + /** + * Generate the next batch of records. + * + * @param output output collector. + * @throws Exception if error occurs. 
+ */ + @Override + public void pollNext(Collector output) throws Exception { + synchronized (output.getCheckpointLock()) { + TencentVectorDBSourceSplit split = pendingSplits.poll(); + if (null != split) { + try { + log.info("Begin to read data from split: " + split); + TablePath tablePath = split.getTablePath(); + TableSchema tableSchema = sourceTables.get(tablePath).getTableSchema(); + log.info("begin to read data from pinecone, table schema: " + tableSchema); + if (null == tableSchema) { + throw new TencentVectorDBConnectorException( + TencentVectorDBConnectorErrorCode.SOURCE_TABLE_SCHEMA_IS_NULL); + } + Database database = client.database(tablePath.getDatabaseName()); + Collection collection = database.collection(tablePath.getTableName()); + while (true) { + QueryParam queryParam = QueryParam.newBuilder() + .withRetrieveVector(true) + .withLimit(config.get(BATCH_SIZE)) + .withOffset(offSet.get()) + .build(); + List documents = collection.query(queryParam); + if(documents.isEmpty()){ + break; + } + offSet.addAndGet(documents.size()); + for (Document document : documents) { + SeaTunnelRow row = ConverterUtils.convertToSeatunnelRow(document); + row.setTableId(tablePath.getFullName()); + output.collect(row); + } + } + } catch (Exception e) { + log.error("Read data from split: " + split + " failed", e); + throw new TencentVectorDBConnectorException( + TencentVectorDBConnectorErrorCode.READ_DATA_FAIL, e); + } + } else { + if (!noMoreSplit) { + log.info("Pinecone source wait split!"); + } + } + } + if (noMoreSplit + && pendingSplits.isEmpty() + && Boundedness.BOUNDED.equals(context.getBoundedness())) { + // signal to the source that we have reached the end of the data. + log.info("Closed the bounded pinecone source"); + context.signalNoMoreElement(); + } + Thread.sleep(1000L); + } + + /** + * Get the current split checkpoint state by checkpointId. + * + *

If the source is bounded, checkpoint is not triggered. + * + * @param checkpointId checkpoint Id. + * @return split checkpoint state. + * @throws Exception if error occurs. + */ + @Override + public List snapshotState(long checkpointId) throws Exception { + return null; + } + + /** + * Add the split checkpoint state to reader. + * + * @param splits split checkpoint state. + */ + @Override + public void addSplits(List splits) { + log.info("Adding pinecone splits to reader: " + splits); + pendingSplits.addAll(splits); + } + + /** + * This method is called when the reader is notified that it will not receive any further + * splits. + * + *

It is triggered when the enumerator calls {@link + * SourceSplitEnumerator.Context#signalNoMoreSplits(int)} with the reader's parallel subtask. + */ + @Override + public void handleNoMoreSplits() { + log.info("receive no more splits message, this milvus reader will not add new split."); + noMoreSplit = true; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + + } +} diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceSplit.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceSplit.java new file mode 100644 index 00000000000..129ae3dd6e3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceSplit.java @@ -0,0 +1,23 @@ +package org.apache.seatunnel.connectors.tencent.vectordb.source; + +import lombok.Data; +import lombok.experimental.SuperBuilder; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.TablePath; + +@Data +@SuperBuilder +public class TencentVectorDBSourceSplit implements SourceSplit { + private TablePath tablePath; + private String splitId; + private String partitionName; + /** + * Get the split id of this source split. + * + * @return id of this source split. 
+ */ + @Override + public String splitId() { + return splitId; + } +} diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceSplitEnumertor.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceSplitEnumertor.java new file mode 100644 index 00000000000..d0f83995165 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceSplitEnumertor.java @@ -0,0 +1,180 @@ +package org.apache.seatunnel.connectors.tencent.vectordb.source; + +import lombok.extern.slf4j.Slf4j; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Slf4j +public class TencentVectorDBSourceSplitEnumertor implements SourceSplitEnumerator { + private final Map tables; + private final Context context; + private final ConcurrentLinkedQueue pendingTables; + private final Map> pendingSplits; + private final Object stateLock = new Object(); + + private ReadonlyConfig config; + public TencentVectorDBSourceSplitEnumertor(Context context, ReadonlyConfig config, + Map sourceTables, TencentVectorDBSourceState sourceState) { + this.context = context; + this.tables = sourceTables; + this.config = config; + if (sourceState == null) { + this.pendingTables = new ConcurrentLinkedQueue<>(tables.keySet()); + this.pendingSplits = new HashMap<>(); + } else { + this.pendingTables = new ConcurrentLinkedQueue<>(sourceState.getPendingTables()); + this.pendingSplits = new 
HashMap<>(sourceState.getPendingSplits()); + } + } + + @Override + public void open() { + + } + + /** + * The method is executed by the engine only once. + */ + @Override + public void run() throws Exception { + log.info("Starting pinecone split enumerator."); + Set readers = context.registeredReaders(); + while (!pendingTables.isEmpty()) { + synchronized (stateLock) { + TablePath tablePath = pendingTables.poll(); + log.info("begin to split table path: {}", tablePath); + Collection splits = generateSplits(tables.get(tablePath)); + log.info("end to split table {} into {} splits.", tablePath, splits.size()); + + addPendingSplit(splits); + } + + synchronized (stateLock) { + assignSplit(readers); + } + } + + log.info("No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers); + readers.forEach(context::signalNoMoreSplits); + } + + private void assignSplit(Collection readers) { + log.info("Assign pendingSplits to readers {}", readers); + + for (int reader : readers) { + List assignmentForReader = pendingSplits.remove(reader); + if (assignmentForReader != null && !assignmentForReader.isEmpty()) { + log.debug("Assign splits {} to reader {}", assignmentForReader, reader); + context.assignSplit(reader, assignmentForReader); + } + } + } + + private void addPendingSplit(Collection splits) { + int readerCount = context.currentParallelism(); + for (TencentVectorDBSourceSplit split : splits) { + int ownerReader = getSplitOwner(split.splitId(), readerCount); + log.info("Assigning {} to {} reader.", split, ownerReader); + + pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split); + } + } + + private Collection generateSplits(CatalogTable catalogTable) { + TencentVectorDBSourceSplit tencentVectorDBSourceSplit = TencentVectorDBSourceSplit.builder() + .tablePath(catalogTable.getTablePath()) + .splitId(catalogTable.getTablePath().getTableName()) + .build(); + + return Collections.singletonList(tencentVectorDBSourceSplit); + } + + private 
static int getSplitOwner(String tp, int numReaders) { + return (tp.hashCode() & Integer.MAX_VALUE) % numReaders; + } + + /** + * Called to close the enumerator, in case it holds on to any resources, like threads or network + * connections. + */ + @Override + public void close() throws IOException { + + } + + /** + * Add a split back to the split enumerator. It will only happen when a {@link SourceReader} + * fails and there are splits assigned to it after the last successful checkpoint. + * + * @param splits The split to add back to the enumerator for reassignment. + * @param subtaskId The id of the subtask to which the returned splits belong. + */ + @Override + public void addSplitsBack(List splits, int subtaskId) { + if (!splits.isEmpty()) { + synchronized (stateLock) { + addPendingSplit(splits, subtaskId); + if (context.registeredReaders().contains(subtaskId)) { + assignSplit(Collections.singletonList(subtaskId)); + } else { + log.warn( + "Reader {} is not registered. Pending splits {} are not assigned.", + subtaskId, + splits); + } + } + } + log.info("Add back splits {} to TencentVectorDBSourceSplitEnumerator.", splits.size()); + + } + + private void addPendingSplit(Collection splits, int ownerReader) { + pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).addAll(splits); + } + + @Override + public int currentUnassignedSplitSize() { + return pendingTables.isEmpty() && pendingSplits.isEmpty() ? 0 : 1; + } + + @Override + public void handleSplitRequest(int subtaskId) { + + } + + @Override + public void registerReader(int subtaskId) { + log.info("Register reader {} to TencentVectorDBSourceSplitEnumerator.", subtaskId); + if (!pendingSplits.isEmpty()) { + synchronized (stateLock) { + assignSplit(Collections.singletonList(subtaskId)); + } + } + + } + + /** + * If the source is bounded, checkpoint is not triggered.
+ * + * @param checkpointId + */ + @Override + public TencentVectorDBSourceState snapshotState(long checkpointId) throws Exception { + synchronized (stateLock) { + return new TencentVectorDBSourceState( + new ArrayList<>(pendingTables), new HashMap<>(pendingSplits)); + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + + } +} diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceState.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceState.java new file mode 100644 index 00000000000..49a0e2be3d3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceState.java @@ -0,0 +1,16 @@ +package org.apache.seatunnel.connectors.tencent.vectordb.source; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.apache.seatunnel.api.table.catalog.TablePath; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +@Data +@AllArgsConstructor +public class TencentVectorDBSourceState implements Serializable { + private List pendingTables; + private Map> pendingSplits; +} diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/utils/ConnectorUtils.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/utils/ConnectorUtils.java new file mode 100644 index 00000000000..752edab21e9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/utils/ConnectorUtils.java @@ -0,0 +1,82 @@ +package org.apache.seatunnel.connectors.tencent.vectordb.utils; + +import com.google.common.collect.Lists; +import 
com.tencent.tcvectordb.client.RPCVectorDBClient; +import com.tencent.tcvectordb.client.VectorDBClient; +import com.tencent.tcvectordb.model.Collection; +import com.tencent.tcvectordb.model.Database; +import com.tencent.tcvectordb.model.param.collection.IndexField; +import com.tencent.tcvectordb.model.param.database.ConnectParam; +import com.tencent.tcvectordb.model.param.enums.ReadConsistencyEnum; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.*; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.seatunnel.api.table.type.BasicType.JSON_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; +import static org.apache.seatunnel.api.table.type.VectorType.VECTOR_FLOAT_TYPE; +import static org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig.*; + +public class ConnectorUtils { + private ReadonlyConfig config; + Map sourceTables; + + public ConnectorUtils(ReadonlyConfig config) { + this.config = config; + this.sourceTables = new HashMap<>(); + } + + public Map getSourceTables() { + ConnectParam connectParam = ConnectParam.newBuilder() + .withUrl(config.get(URL)) + .withUsername(config.get(USER_NAME)) + .withKey(config.get(API_KEY)) + .withTimeout(30) + .build(); + VectorDBClient client = new RPCVectorDBClient(connectParam, ReadConsistencyEnum.EVENTUAL_CONSISTENCY); + Database database = client.database(config.get(DATABASE)); + Collection collection = database.describeCollection(config.get(COLLECTION)); + TablePath tablePath = TablePath.of(config.get(DATABASE), config.get(COLLECTION)); + + List columns = new ArrayList<>(); + String primaryKey = "id"; + for( IndexField indexField : collection.getIndexes()){ + if(indexField.isPrimaryKey()){ + columns.add(PhysicalColumn.builder() + .name(indexField.getFieldName()) + .dataType(STRING_TYPE) + .build()); + primaryKey = 
indexField.getFieldName(); + }else if(indexField.isVectorField()){ + columns.add(PhysicalColumn.builder() + .name(indexField.getFieldName()) + .dataType(VECTOR_FLOAT_TYPE) + .scale(indexField.getDimension()) + .build()); + } + } + Map options = new HashMap<>(); + options.put("isDynamicField", true); + PhysicalColumn dynamicColumn = PhysicalColumn.builder() + .name("meta") + .dataType(JSON_TYPE) + .options(options) + .build(); + columns.add(dynamicColumn); + + TableSchema tableSchema = TableSchema.builder() + .primaryKey(PrimaryKey.of(primaryKey, Lists.newArrayList(primaryKey))) + .columns(columns) + .build(); + Map sourceTables = new HashMap<>(); + CatalogTable catalogTable = CatalogTable.of(TableIdentifier.of("tencent", tablePath), + tableSchema, new HashMap<>(), new ArrayList<>(), ""); + sourceTables.put(tablePath, catalogTable); + return sourceTables; + } +} + diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/utils/ConverterUtils.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/utils/ConverterUtils.java new file mode 100644 index 00000000000..30b34a34300 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/utils/ConverterUtils.java @@ -0,0 +1,37 @@ +package org.apache.seatunnel.connectors.tencent.vectordb.utils; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.tencent.tcvectordb.model.DocField; +import com.tencent.tcvectordb.model.Document; +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import java.util.List; +import java.util.Map; + +public class ConverterUtils { + public static SeaTunnelRow convertToSeatunnelRow(Document vector) { + Object[] fields = new Object[3]; + fields[0] = vector.getId(); + + // 
Convert each Double to Float + Float[] arrays = vector.getVector().stream().map(Double::floatValue).toArray(Float[]::new); + fields[1] = arrays; + List meta = vector.getDocFields(); + JsonObject data = new JsonObject(); + for (DocField entry : meta) { + data.add(entry.getName(), convertValueToJsonElement(entry.getValue())); + } + fields[2] = data; + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields); + seaTunnelRow.setRowKind(RowKind.INSERT); + return seaTunnelRow; + } + + private static JsonElement convertValueToJsonElement(Object value) { + Gson gson = new Gson(); + return gson.toJsonTree(value); + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 0b805349500..fbefc377c38 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -33,17 +33,20 @@ connector-common connector-console connector-fake + connector-file connector-milvus connector-jdbc - connector-kafka + connector-elasticsearch + connector-pinecone + connector-qdrant + connector-tencent-vectordb - connector-file @@ -79,7 +82,6 @@ - diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index dd951c15104..b5dfa82ff6b 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -183,6 +183,24 @@ ${project.version} provided + + org.apache.seatunnel + connector-pinecone + ${project.version} + provided + + + org.apache.seatunnel + connector-qdrant + ${project.version} + provided + + + org.apache.seatunnel + connector-tencent-vectordb + ${project.version} + provided + @@ -602,13 +620,6 @@ - - - - - - - diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-examples/seatunnel-engine-examples/pom.xml index cf862cc034b..167892f557d 100644 --- a/seatunnel-examples/seatunnel-engine-examples/pom.xml +++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml @@ -32,7 +32,7 @@ com.google.protobuf protobuf-java - 3.24.0 + 3.25.1 @@ -67,14 +67,32 @@ org.apache.seatunnel - connector-elasticsearch - ${project.version} 
- - - org.apache.seatunnel - connector-jdbc + connector-tencent-vectordb ${project.version} + + + + + + + + + + + + + + + + + + + + + + + org.postgresql postgresql @@ -93,11 +111,5 @@ ${project.version} - - org.apache.seatunnel - connector-assert - ${project.version} - -