From a15c0decbdf943407cb6cd45a0d39578e2825ffc Mon Sep 17 00:00:00 2001 From: Nian Liu Date: Tue, 22 Oct 2024 18:02:01 +0800 Subject: [PATCH] add pinecone source connector for seatunnel --- .../api/table/type/SeaTunnelRow.java | 5 + .../common/constants/CommonOptions.java | 15 ++ .../connector-pinecone/pom.xml | 86 +++++++ .../pinecone/config/PineconeSourceConfig.java | 43 ++++ .../PineconeConnectionErrorCode.java | 35 +++ .../exception/PineconeConnectorException.java | 31 +++ .../pinecone/source/PineconeSource.java | 114 +++++++++ .../source/PineconeSourceFactory.java | 60 +++++ .../pinecone/source/PineconeSourceReader.java | 193 ++++++++++++++++ .../pinecone/source/PineconeSourceSplit.java | 40 ++++ .../source/PineconeSourceSplitEnumertor.java | 217 ++++++++++++++++++ .../pinecone/source/PineconeSourceState.java | 33 +++ .../pinecone/utils/ConverterUtils.java | 111 +++++++++ .../pinecone/utils/PineconeUtils.java | 140 +++++++++++ seatunnel-connectors-v2/pom.xml | 1 + 15 files changed, 1124 insertions(+) create mode 100644 seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CommonOptions.java create mode 100644 seatunnel-connectors-v2/connector-pinecone/pom.xml create mode 100644 seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/config/PineconeSourceConfig.java create mode 100644 seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/exception/PineconeConnectionErrorCode.java create mode 100644 seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/exception/PineconeConnectorException.java create mode 100644 seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSource.java create mode 100644 seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceFactory.java create mode 100644 
seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceReader.java create mode 100644 seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceSplit.java create mode 100644 seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceSplitEnumertor.java create mode 100644 seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceState.java create mode 100644 seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/utils/ConverterUtils.java create mode 100644 seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/utils/PineconeUtils.java diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java index 10a5b33a935..4d5fd08b6dd 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.api.table.type; +import lombok.Data; + import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Arrays; @@ -24,6 +26,7 @@ import java.util.Objects; /** SeaTunnel row type. */ +@Data public final class SeaTunnelRow implements Serializable { private static final long serialVersionUID = -1L; /** Table identifier. 
*/ @@ -35,6 +38,8 @@ public final class SeaTunnelRow implements Serializable { private volatile int size; + private String partitionName; + public SeaTunnelRow(int arity) { this.fields = new Object[arity]; }
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CommonOptions.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CommonOptions.java
new file mode 100644
index 00000000000..d95d50dfa9d
--- /dev/null
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CommonOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.constants;
+
+import lombok.Getter;
+
+/**
+ * Shared option names. Each constant carries a display name that is exposed through the
+ * Lombok-generated {@code getName()} accessor.
+ */
+@Getter
+public enum CommonOptions {
+    JSON("Json"),
+    METADATA("Metadata");
+
+    /** Display name of the option. */
+    private final String name;
+
+    CommonOptions(String name) {
+        this.name = name;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-pinecone/pom.xml b/seatunnel-connectors-v2/connector-pinecone/pom.xml new file mode 100644 index 00000000000..3932dded74a --- /dev/null +++ b/seatunnel-connectors-v2/connector-pinecone/pom.xml @@ -0,0 +1,86 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connectors-v2 + ${revision} + + connector-pinecone + SeaTunnel : Connectors V2 : Pinecone + + + + 1.59.1 + 3.25.1 + + + + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + + io.grpc + grpc-netty + ${grpc.version} + + + io.grpc + grpc-netty-shaded + ${grpc.version} + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + + + + + io.pinecone + pinecone-client + 2.1.0-SNAPSHOT + + + com.google.code.gson + gson + 2.10.1 + + + com.google.protobuf + protobuf-java + 3.25.1 + + + + + diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/config/PineconeSourceConfig.java b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/config/PineconeSourceConfig.java new file mode 100644 index 00000000000..40182203df5 --- 
/dev/null
+++ b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/config/PineconeSourceConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.pinecone.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+/** Option definitions and constants of the Pinecone source connector. */
+public class PineconeSourceConfig {
+    /** Identifier returned as both the plugin name and the factory identifier. */
+    public static final String CONNECTOR_IDENTITY = "Pinecone";
+
+    /** API key handed to the Pinecone client builder; has no default value. */
+    public static final Option<String> API_KEY =
+            Options.key("api_key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Pinecone token for authentication");
+
+    /** Name of the Pinecone index; has no default value. */
+    public static final Option<String> INDEX =
+            Options.key("index")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Pinecone index name");
+
+    /** Page size the reader passes to each list call; defaults to 100. */
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription("Number of vector ids listed and fetched per read request");
+}
diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/exception/PineconeConnectionErrorCode.java b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/exception/PineconeConnectionErrorCode.java 
new file mode 100644 index 00000000000..31eab6b08e1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/exception/PineconeConnectionErrorCode.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.connectors.pinecone.exception;
+
+import lombok.Getter;
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+/**
+ * Error codes raised by the Pinecone connector. The {@code code} and {@code description}
+ * accessors are generated by Lombok's {@code @Getter}.
+ */
+@Getter
+public enum PineconeConnectionErrorCode implements SeaTunnelErrorCode {
+    SOURCE_TABLE_SCHEMA_IS_NULL("PINECONE-01", "Source table schema is null"),
+    READ_DATA_FAIL("PINECONE-02", "Read data fail");
+
+    /** Short identifier of the error, e.g. {@code PINECONE-01}. */
+    private final String code;
+    /** Human-readable explanation of the error. */
+    private final String description;
+
+    PineconeConnectionErrorCode(String errorCode, String errorDescription) {
+        this.code = errorCode;
+        this.description = errorDescription;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/exception/PineconeConnectorException.java b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/exception/PineconeConnectorException.java
new file mode 100644
index 00000000000..f4a7071eebe
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/exception/PineconeConnectorException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.pinecone.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+/**
+ * Runtime exception thrown by the Pinecone connector. The message is always taken from the
+ * supplied error code's {@code getErrorMessage()}.
+ */
+public class PineconeConnectorException extends SeaTunnelRuntimeException {
+    /**
+     * Creates an exception described solely by an error code.
+     *
+     * @param errorCode code identifying the failure
+     */
+    public PineconeConnectorException(SeaTunnelErrorCode errorCode) {
+        super(errorCode, errorCode.getErrorMessage());
+    }
+
+    /**
+     * Creates an exception described by an error code and the underlying cause.
+     *
+     * @param errorCode code identifying the failure
+     * @param rootCause the throwable that triggered this failure
+     */
+    public PineconeConnectorException(SeaTunnelErrorCode errorCode, Throwable rootCause) {
+        super(errorCode, errorCode.getErrorMessage(), rootCause);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSource.java b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSource.java
new file mode 100644
index 00000000000..28a4f3082ba
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSource.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.pinecone.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.pinecone.config.PineconeSourceConfig;
+import org.apache.seatunnel.connectors.pinecone.utils.PineconeUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Bounded SeaTunnel source that reads vectors from Pinecone.
+ *
+ * <p>The set of source tables is resolved once, in the constructor, through {@link
+ * PineconeUtils#getSourceTables()} and then shared with every reader and enumerator created by
+ * this source.
+ */
+public class PineconeSource
+        implements SeaTunnelSource<SeaTunnelRow, PineconeSourceSplit, PineconeSourceState>,
+                SupportParallelism,
+                SupportColumnProjection {
+    private final ReadonlyConfig config;
+    private final Map<TablePath, CatalogTable> sourceTables;
+
+    /**
+     * Builds the source and resolves its tables.
+     *
+     * @param config connector options.
+     */
+    public PineconeSource(ReadonlyConfig config) {
+        this.config = config;
+        // NOTE(review): getSourceTables() presumably contacts Pinecone; confirm in PineconeUtils.
+        PineconeUtils pineconeUtils = new PineconeUtils(config);
+        this.sourceTables = pineconeUtils.getSourceTables();
+    }
+
+    /**
+     * Get the boundedness of this source.
+     *
+     * @return always {@link Boundedness#BOUNDED}.
+     */
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    /**
+     * Create source reader, used to produce data.
+     *
+     * @param readerContext reader context.
+     * @return source reader.
+     * @throws Exception when create reader failed.
+     */
+    @Override
+    public SourceReader<SeaTunnelRow, PineconeSourceSplit> createReader(
+            SourceReader.Context readerContext) throws Exception {
+        return new PineconeSourceReader(readerContext, config, sourceTables);
+    }
+
+    /**
+     * Lists the tables this source produces.
+     *
+     * @return a new list holding every resolved catalog table.
+     */
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return new ArrayList<>(sourceTables.values());
+    }
+
+    /**
+     * Create source split enumerator, used to generate splits. This method will be called only once
+     * when start a source.
+     *
+     * @param enumeratorContext enumerator context.
+     * @return source split enumerator.
+     * @throws Exception when create enumerator failed.
+     */
+    @Override
+    public SourceSplitEnumerator<PineconeSourceSplit, PineconeSourceState> createEnumerator(
+            SourceSplitEnumerator.Context<PineconeSourceSplit> enumeratorContext)
+            throws Exception {
+        return new PineconeSourceSplitEnumertor(enumeratorContext, config, sourceTables, null);
+    }
+
+    /**
+     * Create source split enumerator, used to generate splits. This method will be called when
+     * restore from checkpoint.
+     *
+     * @param enumeratorContext enumerator context.
+     * @param checkpointState checkpoint state.
+     * @return source split enumerator.
+     * @throws Exception when create enumerator failed.
+     */
+    @Override
+    public SourceSplitEnumerator<PineconeSourceSplit, PineconeSourceState> restoreEnumerator(
+            SourceSplitEnumerator.Context<PineconeSourceSplit> enumeratorContext,
+            PineconeSourceState checkpointState)
+            throws Exception {
+        return new PineconeSourceSplitEnumertor(
+                enumeratorContext, config, sourceTables, checkpointState);
+    }
+
+    /**
+     * Returns a unique identifier among same factory interfaces.
+     *
+     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
+     * kafka}). If multiple factories exist for different versions, a version should be appended
+     * using "-" (e.g. {@code elasticsearch-7}).
+     */
+    @Override
+    public String getPluginName() {
+        return PineconeSourceConfig.CONNECTOR_IDENTITY;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceFactory.java b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceFactory.java
new file mode 100644
index 00000000000..21dcd80e313
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.pinecone.source;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.pinecone.config.PineconeSourceConfig;
+
+import java.io.Serializable;
+
+/** Factory that registers the Pinecone source under the connector identity. */
+@Slf4j
+@AutoService(Factory.class)
+public class PineconeSourceFactory implements TableSourceFactory {
+
+    /**
+     * Creates a lazily-evaluated {@link PineconeSource} built from the context options.
+     *
+     * @param context factory context carrying the connector options.
+     * @return a table source whose supplier instantiates {@link PineconeSource}.
+     */
+    @Override
+    @SuppressWarnings("unchecked") // PineconeSource fixes the type arguments the caller infers.
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>) new PineconeSource(context.getOptions());
+    }
+
+    /**
+     * Declares the options understood by this connector.
+     *
+     * @return rule with {@code api_key} required and {@code index} / {@code batch_size} optional.
+     */
+    @Override
+    public OptionRule optionRule() {
+        // NOTE(review): INDEX has no default and likely should be required; confirm against
+        // PineconeUtils before tightening, since that would reject configs accepted today.
+        return OptionRule.builder()
+                .required(PineconeSourceConfig.API_KEY)
+                .optional(PineconeSourceConfig.INDEX, PineconeSourceConfig.BATCH_SIZE)
+                .build();
+    }
+
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return PineconeSource.class;
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return PineconeSourceConfig.CONNECTOR_IDENTITY;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceReader.java b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceReader.java
new file mode 100644
index 00000000000..8b3e6b7d89e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceReader.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.pinecone.source; + +import io.pinecone.clients.Index; +import io.pinecone.clients.Pinecone; +import io.pinecone.proto.FetchResponse; +import io.pinecone.proto.ListItem; +import io.pinecone.proto.ListResponse; +import io.pinecone.proto.Pagination; +import io.pinecone.proto.Vector; +import lombok.extern.slf4j.Slf4j; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import static org.apache.seatunnel.connectors.pinecone.config.PineconeSourceConfig.API_KEY; +import static org.apache.seatunnel.connectors.pinecone.config.PineconeSourceConfig.BATCH_SIZE; +import org.apache.seatunnel.connectors.pinecone.exception.PineconeConnectionErrorCode; +import org.apache.seatunnel.connectors.pinecone.exception.PineconeConnectorException; +import 
org.apache.seatunnel.connectors.pinecone.utils.ConverterUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.Collectors;
+
+/**
+ * Reader that drains its assigned splits one per {@link #pollNext(Collector)} call. For each
+ * split it pages through the vector ids of the split's namespace, fetches the vectors of each
+ * page, converts them to rows and emits them.
+ */
+@Slf4j
+public class PineconeSourceReader implements SourceReader<SeaTunnelRow, PineconeSourceSplit> {
+    private final Deque<PineconeSourceSplit> pendingSplits = new ConcurrentLinkedDeque<>();
+    private final ReadonlyConfig config;
+    private final Context context;
+    private final Map<TablePath, CatalogTable> sourceTables;
+    private Pinecone pinecone;
+
+    private volatile boolean noMoreSplit;
+
+    public PineconeSourceReader(
+            Context readerContext,
+            ReadonlyConfig config,
+            Map<TablePath, CatalogTable> sourceTables) {
+        this.context = readerContext;
+        this.config = config;
+        this.sourceTables = sourceTables;
+    }
+
+    /**
+     * Open the source reader by building the Pinecone client from the configured API key.
+     */
+    @Override
+    public void open() throws Exception {
+        pinecone = new Pinecone.Builder(config.get(API_KEY)).build();
+    }
+
+    /**
+     * Called to close the reader, in case it holds on to any resources, like threads or network
+     * connections.
+     */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Generate the next batch of records: reads one pending split completely, if there is one,
+     * and signals end of data once no splits remain and no more will arrive.
+     *
+     * @param output output collector.
+     * @throws Exception if error occurs.
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        synchronized (output.getCheckpointLock()) {
+            PineconeSourceSplit split = pendingSplits.poll();
+            if (null != split) {
+                try {
+                    readSplit(split, output);
+                } catch (Exception e) {
+                    log.error("Read data from split: " + split + " failed", e);
+                    throw new PineconeConnectorException(
+                            PineconeConnectionErrorCode.READ_DATA_FAIL, e);
+                }
+            } else {
+                if (!noMoreSplit) {
+                    log.info("Pinecone source wait split!");
+                }
+            }
+        }
+        if (noMoreSplit
+                && pendingSplits.isEmpty()
+                && Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded pinecone source");
+            context.signalNoMoreElement();
+        }
+        Thread.sleep(1000L);
+    }
+
+    /**
+     * Reads every vector of one split and emits it as a row.
+     *
+     * <p>The pagination token is local to this call: each split starts listing from the first
+     * page, and a token left over from one namespace can never be sent to another.
+     *
+     * @param split the split (table path plus namespace) to read.
+     * @param output collector receiving the converted rows.
+     */
+    private void readSplit(PineconeSourceSplit split, Collector<SeaTunnelRow> output) {
+        log.info("Begin to read data from split: " + split);
+        TablePath tablePath = split.getTablePath();
+        String namespace = split.getNamespace();
+        CatalogTable catalogTable = sourceTables.get(tablePath);
+        TableSchema tableSchema = catalogTable == null ? null : catalogTable.getTableSchema();
+        log.info("begin to read data from pinecone, table schema: " + tableSchema);
+        if (null == tableSchema) {
+            throw new PineconeConnectorException(
+                    PineconeConnectionErrorCode.SOURCE_TABLE_SCHEMA_IS_NULL);
+        }
+        Index index = pinecone.getIndexConnection(tablePath.getTableName());
+        int batchSize = config.get(BATCH_SIZE);
+        String nextToken = null;
+        do {
+            // The first page is requested without a token; later pages continue from it.
+            ListResponse listResponse =
+                    nextToken == null
+                            ? index.list(namespace, batchSize)
+                            : index.list(namespace, batchSize, nextToken);
+            List<String> ids =
+                    listResponse.getVectorsList().stream()
+                            .map(ListItem::getId)
+                            .collect(Collectors.toList());
+            if (ids.isEmpty()) {
+                break;
+            }
+            FetchResponse fetchResponse = index.fetch(ids, namespace);
+            for (Vector vector : fetchResponse.getVectorsMap().values()) {
+                SeaTunnelRow row = ConverterUtils.convertToSeatunnelRow(tableSchema, vector);
+                row.setPartitionName(namespace);
+                row.setTableId(tablePath.getFullName());
+                output.collect(row);
+            }
+            Pagination pagination = listResponse.getPagination();
+            nextToken = pagination.getNext();
+            // An empty token marks the last page.
+        } while (!Objects.equals(nextToken, ""));
+    }
+
+    /**
+     * Get the current split checkpoint state by checkpointId.
+     *
+     * <p>If the source is bounded, checkpoint is not triggered.
+     *
+     * @param checkpointId checkpoint Id.
+     * @return a copy of the splits that have not been read yet.
+     * @throws Exception if error occurs.
+     */
+    @Override
+    public List<PineconeSourceSplit> snapshotState(long checkpointId) throws Exception {
+        return new ArrayList<>(pendingSplits);
+    }
+
+    /**
+     * Add the split checkpoint state to reader.
+     *
+     * @param splits split checkpoint state.
+     */
+    @Override
+    public void addSplits(List<PineconeSourceSplit> splits) {
+        log.info("Adding pinecone splits to reader: " + splits);
+        pendingSplits.addAll(splits);
+    }
+
+    /**
+     * This method is called when the reader is notified that it will not receive any further
+     * splits.
+     *
+     * <p>It is triggered when the enumerator calls {@link
+     * SourceSplitEnumerator.Context#signalNoMoreSplits(int)} with the reader's parallel subtask.
+     */
+    @Override
+    public void handleNoMoreSplits() {
+        log.info("receive no more splits message, this pinecone reader will not add new split.");
+        noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceSplit.java b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceSplit.java
new file mode 100644
index 00000000000..07f0352b098
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceSplit.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.pinecone.source;
+
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+/**
+ * One unit of read work: a namespace of the index identified by {@code tablePath}. Accessors and
+ * the builder are generated by Lombok.
+ */
+@Data
+@SuperBuilder
+public class PineconeSourceSplit implements SourceSplit {
+    // Table whose name the reader uses to open the index connection.
+    private TablePath tablePath;
+    // Identifier returned by splitId(); the enumerator builds it as "<tableName>-<namespace>".
+    private String splitId;
+    // Pinecone namespace listed and fetched for this split.
+    private String namespace;
+    /**
+     * Get the split id of this source split.
+     *
+     * @return id of this source split.
+     */
+    @Override
+    public String splitId() {
+        return splitId;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceSplitEnumertor.java b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceSplitEnumertor.java
new file mode 100644
index 00000000000..18a6b7fdbaf
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceSplitEnumertor.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.pinecone.source;
+
+import io.pinecone.clients.Index;
+import io.pinecone.clients.Pinecone;
+import io.pinecone.proto.DescribeIndexStatsResponse;
+import io.pinecone.proto.NamespaceSummary;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import static org.apache.seatunnel.connectors.pinecone.config.PineconeSourceConfig.API_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Split enumerator of the Pinecone source. Every pending table is turned into one split per
+ * namespace reported by the index statistics; each split is assigned to the reader selected by
+ * hashing its split id.
+ */
+@Slf4j
+public class PineconeSourceSplitEnumertor
+        implements SourceSplitEnumerator<PineconeSourceSplit, PineconeSourceState> {
+    private final Map<TablePath, CatalogTable> tables;
+    private final Context<PineconeSourceSplit> context;
+    private final ConcurrentLinkedQueue<TablePath> pendingTables;
+    private final Map<Integer, List<PineconeSourceSplit>> pendingSplits;
+    /** Guards {@link #pendingSplits} and the table-to-split conversion. */
+    private final Object stateLock = new Object();
+    private final Pinecone pinecone;
+
+    private final ReadonlyConfig config;
+
+    /**
+     * Creates the enumerator, either fresh or restored.
+     *
+     * @param context enumerator context.
+     * @param config connector options; the API key is used to build the Pinecone client.
+     * @param sourceTables all tables of the source, keyed by path.
+     * @param sourceState restored state, or {@code null} to start with every table pending.
+     */
+    public PineconeSourceSplitEnumertor(
+            Context<PineconeSourceSplit> context,
+            ReadonlyConfig config,
+            Map<TablePath, CatalogTable> sourceTables,
+            PineconeSourceState sourceState) {
+        this.context = context;
+        this.tables = sourceTables;
+        this.config = config;
+        if (sourceState == null) {
+            this.pendingTables = new ConcurrentLinkedQueue<>(tables.keySet());
+            this.pendingSplits = new HashMap<>();
+        } else {
+            this.pendingTables = new ConcurrentLinkedQueue<>(sourceState.getPendingTables());
+            this.pendingSplits = new HashMap<>(sourceState.getPendingSplits());
+        }
+        pinecone = new Pinecone.Builder(config.get(API_KEY)).build();
+    }
+
+    @Override
+    public void open() {
+
+    }
+
+    /**
+     * The method is executed by the engine only once. It splits every pending table, assigns the
+     * splits to the readers registered when it started, then tells those readers that no more
+     * splits will follow.
+     */
+    @Override
+    public void run() throws Exception {
+        log.info("Starting pinecone split enumerator.");
+        Set<Integer> readers = context.registeredReaders();
+        while (!pendingTables.isEmpty()) {
+            synchronized (stateLock) {
+                TablePath tablePath = pendingTables.poll();
+                if (tablePath == null) {
+                    // The queue was drained between the emptiness check and the poll.
+                    break;
+                }
+                log.info("begin to split table path: {}", tablePath);
+                Collection<PineconeSourceSplit> splits = generateSplits(tables.get(tablePath));
+                log.info("end to split table {} into {} splits.", tablePath, splits.size());
+
+                addPendingSplit(splits);
+            }
+
+            synchronized (stateLock) {
+                assignSplit(readers);
+            }
+        }
+
+        log.info("No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers);
+        readers.forEach(context::signalNoMoreSplits);
+    }
+
+    /** Hands each listed reader the splits currently pending for it, removing them from the map. */
+    private void assignSplit(Collection<Integer> readers) {
+        log.info("Assign pendingSplits to readers {}", readers);
+
+        for (int reader : readers) {
+            List<PineconeSourceSplit> assignmentForReader = pendingSplits.remove(reader);
+            if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
+                log.debug("Assign splits {} to reader {}", assignmentForReader, reader);
+                context.assignSplit(reader, assignmentForReader);
+            }
+        }
+    }
+
+    /** Queues each split under the reader chosen by hashing its split id. */
+    private void addPendingSplit(Collection<PineconeSourceSplit> splits) {
+        int readerCount = context.currentParallelism();
+        for (PineconeSourceSplit split : splits) {
+            int ownerReader = getSplitOwner(split.splitId(), readerCount);
+            log.info("Assigning {} to {} reader.", split, ownerReader);
+
+            pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split);
+        }
+    }
+
+    /**
+     * Builds one split per namespace of the table's index.
+     *
+     * @param catalogTable table whose name identifies the Pinecone index.
+     * @return the splits, with ids of the form {@code <tableName>-<namespace>}.
+     */
+    private Collection<PineconeSourceSplit> generateSplits(CatalogTable catalogTable) {
+        Index index = pinecone.getIndexConnection(catalogTable.getTablePath().getTableName());
+        DescribeIndexStatsResponse describeIndexStatsResponse = index.describeIndexStats();
+        Map<String, NamespaceSummary> namespaceSummaryMap =
+                describeIndexStatsResponse.getNamespacesMap();
+        List<PineconeSourceSplit> splits = new ArrayList<>();
+        for (String namespace : namespaceSummaryMap.keySet()) {
+            PineconeSourceSplit pineconeSourceSplit =
+                    PineconeSourceSplit.builder()
+                            .tablePath(catalogTable.getTablePath())
+                            .splitId(catalogTable.getTablePath().getTableName() + "-" + namespace)
+                            .namespace(namespace)
+                            .build();
+            splits.add(pineconeSourceSplit);
+        }
+        return splits;
+    }
+
+    /** Maps a split id to a reader index in {@code [0, numReaders)}; the mask keeps it non-negative. */
+    private static int getSplitOwner(String tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    /**
+     * Called to close the enumerator, in case it holds on to any resources, like threads or network
+     * connections.
+     */
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    /**
+     * Add a split back to the split enumerator. It will only happen when a {@link SourceReader}
+     * fails and there are splits assigned to it after the last successful checkpoint.
+     *
+     * @param splits The split to add back to the enumerator for reassignment.
+     * @param subtaskId The id of the subtask to which the returned splits belong.
+     */
+    @Override
+    public void addSplitsBack(List<PineconeSourceSplit> splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            synchronized (stateLock) {
+                addPendingSplit(splits, subtaskId);
+                if (context.registeredReaders().contains(subtaskId)) {
+                    assignSplit(Collections.singletonList(subtaskId));
+                } else {
+                    log.warn(
+                            "Reader {} is not registered. Pending splits {} are not assigned.",
+                            subtaskId,
+                            splits);
+                }
+            }
+        }
+        log.info("Add back splits {} to PineconeSourceSplitEnumerator.", splits.size());
+
+    }
+
+    /** Queues the given splits under an explicitly chosen reader. */
+    private void addPendingSplit(Collection<PineconeSourceSplit> splits, int ownerReader) {
+        pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).addAll(splits);
+    }
+
+    /**
+     * Reports whether unassigned work remains.
+     *
+     * @return 0 when no table and no split is pending, otherwise 1 (a flag, not an actual count).
+     */
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingTables.isEmpty() && pendingSplits.isEmpty() ? 0 : 1;
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+
+    }
+
+    /**
+     * Assigns any splits already pending for a newly registered reader.
+     *
+     * @param subtaskId id of the reader that registered.
+     */
+    @Override
+    public void registerReader(int subtaskId) {
+        log.info("Register reader {} to PineconeSourceSplitEnumerator.", subtaskId);
+        if (!pendingSplits.isEmpty()) {
+            synchronized (stateLock) {
+                assignSplit(Collections.singletonList(subtaskId));
+            }
+        }
+
+    }
+
+    /**
+     * If the source is bounded, checkpoint is not triggered.
+     *
+     * @param checkpointId checkpoint Id.
+     * @return copies of the pending tables and pending splits.
+     */
+    @Override
+    public PineconeSourceState snapshotState(long checkpointId) throws Exception {
+        synchronized (stateLock) {
+            return new PineconeSourceState(
+                    new ArrayList<>(pendingTables), new HashMap<>(pendingSplits));
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceState.java b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceState.java
new file mode 100644
index 00000000000..7d908f086f1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/source/PineconeSourceState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.pinecone.source; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.apache.seatunnel.api.table.catalog.TablePath; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +@Data +@AllArgsConstructor +public class PineconeSourceState implements Serializable { + private List pendingTables; + private Map> pendingSplits; +} diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/utils/ConverterUtils.java b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/utils/ConverterUtils.java new file mode 100644 index 00000000000..36fc203484d --- /dev/null +++ b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/utils/ConverterUtils.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.pinecone.utils; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.protobuf.Struct; +import com.google.protobuf.Value; +import io.pinecone.proto.Vector; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import static org.apache.seatunnel.api.table.type.VectorType.VECTOR_FLOAT_TYPE; +import static org.apache.seatunnel.api.table.type.VectorType.VECTOR_SPARSE_FLOAT_TYPE; +import org.apache.seatunnel.common.constants.CommonOptions; +import org.apache.seatunnel.common.utils.BufferUtils; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class ConverterUtils { + public static SeaTunnelRow convertToSeatunnelRow(TableSchema tableSchema, Vector vector) { + SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType(); + Object[] fields = new Object[typeInfo.getTotalFields()]; + List fieldNames = Arrays.stream(typeInfo.getFieldNames()).collect(Collectors.toList()); + + for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) { + if (fieldNames.get(fieldIndex).equals("id")) { + fields[fieldIndex] = vector.getId(); + } else if (fieldNames.get(fieldIndex).equals(CommonOptions.METADATA.getName())) { + Struct meta = vector.getMetadata(); + JsonObject data = new JsonObject(); + for (Map.Entry entry : meta.getFieldsMap().entrySet()) { + data.add(entry.getKey(), convertValueToJsonElement(entry.getValue())); + } + fields[fieldIndex] = data.toString(); + } else if (typeInfo.getFieldType(fieldIndex).equals(VECTOR_FLOAT_TYPE)) { + List floats = vector.getValuesList(); + // Convert List to Float[] + Float[] floatArray = floats.toArray(new Float[0]); + 
fields[fieldIndex] = BufferUtils.toByteBuffer(floatArray); + } else if (typeInfo.getFieldType(fieldIndex).equals(VECTOR_SPARSE_FLOAT_TYPE)) { + // Convert SparseVector to a ByteBuffer + Map sparseMap = new HashMap<>(); + int count = vector.getSparseValues().getIndicesCount(); + for (int i = 0; i < count; i++) { + long index = vector.getSparseValues().getIndices(i); + float value = vector.getSparseValues().getValues(i); + sparseMap.put(index, value); + } + + fields[fieldIndex] = sparseMap; + + } + } + + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields); + seaTunnelRow.setRowKind(RowKind.INSERT); + return seaTunnelRow; + } + + private static JsonElement convertValueToJsonElement(Value value) { + Gson gson = new Gson(); + switch (value.getKindCase()) { + case NULL_VALUE: + return gson.toJsonTree(null); // Null value + case NUMBER_VALUE: + return gson.toJsonTree(value.getNumberValue()); // Double value + case STRING_VALUE: + return gson.toJsonTree(value.getStringValue()); // String value + case BOOL_VALUE: + return gson.toJsonTree(value.getBoolValue()); // Boolean value + case STRUCT_VALUE: + // Convert Struct to a JsonObject + JsonObject structJson = new JsonObject(); + value.getStructValue().getFieldsMap().forEach((k, v) -> + structJson.add(k, convertValueToJsonElement(v)) + ); + return structJson; + case LIST_VALUE: + // Convert List to a JsonArray + return gson.toJsonTree( + value.getListValue().getValuesList().stream() + .map(ConverterUtils::convertValueToJsonElement) + .toArray() + ); + default: + return gson.toJsonTree(null); // Default or unsupported case + } + } +} diff --git a/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/utils/PineconeUtils.java b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/utils/PineconeUtils.java new file mode 100644 index 00000000000..233ad611f8c --- /dev/null +++ 
b/seatunnel-connectors-v2/connector-pinecone/src/main/java/org/apache/seatunnel/connectors/pinecone/utils/PineconeUtils.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.pinecone.utils; + +import io.pinecone.clients.Index; +import io.pinecone.clients.Pinecone; +import io.pinecone.proto.DescribeIndexStatsResponse; +import io.pinecone.proto.FetchResponse; +import io.pinecone.proto.ListItem; +import io.pinecone.proto.ListResponse; +import io.pinecone.proto.NamespaceSummary; +import io.pinecone.proto.Vector; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; +import static org.apache.seatunnel.api.table.type.VectorType.VECTOR_FLOAT_TYPE; +import static 
org.apache.seatunnel.api.table.type.VectorType.VECTOR_SPARSE_FLOAT_TYPE; +import org.apache.seatunnel.common.constants.CommonOptions; +import static org.apache.seatunnel.connectors.pinecone.config.PineconeSourceConfig.API_KEY; +import static org.apache.seatunnel.connectors.pinecone.config.PineconeSourceConfig.INDEX; +import org.openapitools.control.client.model.IndexModel; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class PineconeUtils { + private ReadonlyConfig config; + Map sourceTables; + + public PineconeUtils(ReadonlyConfig config) { + this.config = config; + this.sourceTables = new HashMap<>(); + } + + public Map getSourceTables() { + Pinecone pinecone = new Pinecone.Builder(config.get(API_KEY)).build(); + String indexName = config.get(INDEX); + IndexModel indexMetadata = pinecone.describeIndex(indexName); + TablePath tablePath = TablePath.of("default", indexName); + + Index index = pinecone.getIndexConnection(indexName); + + Vector vector = null; + + DescribeIndexStatsResponse describeIndexStatsResponse = index.describeIndexStats(); + Map namespaceSummaryMap = describeIndexStatsResponse.getNamespacesMap(); + for(Map.Entry entry : namespaceSummaryMap.entrySet()) { + NamespaceSummary namespaceSummary = entry.getValue(); + if (namespaceSummary.getVectorCount() != 0) { + ListResponse listResponse = index.list(entry.getKey(), 10); + List vectorsList = listResponse.getVectorsList(); + List ids = vectorsList.stream().map(ListItem::getId).collect(Collectors.toList()); + if (ids.isEmpty()) { + // no data in the index + return sourceTables; + } + FetchResponse fetchResponse = index.fetch(ids, entry.getKey()); + Map vectorMap = fetchResponse.getVectorsMap(); + vector = vectorMap.entrySet().stream().iterator().next().getValue(); + break; + } + } + if(vector == null) { + // no data in the index + return sourceTables; + } + + List 
columns = new ArrayList<>(); + + PhysicalColumn idColumn = PhysicalColumn.builder() + .name("id") + .dataType(STRING_TYPE) + .build(); + + columns.add(idColumn); + + Map options = new HashMap<>(); + + options.put(CommonOptions.METADATA.getName(), true); + PhysicalColumn dynamicColumn = PhysicalColumn.builder() + .name(CommonOptions.METADATA.getName()) + .dataType(STRING_TYPE) + .options(options) + .build(); + columns.add(dynamicColumn); + + if(!vector.getValuesList().isEmpty()) { + PhysicalColumn vectorColumn = PhysicalColumn.builder() + .name("vector") + .dataType(VECTOR_FLOAT_TYPE) + .scale(indexMetadata.getDimension()) + .build(); + columns.add(vectorColumn); + } + if (!vector.getSparseValues().getIndicesList().isEmpty()) { + PhysicalColumn sparseVectorColumn = PhysicalColumn.builder() + .name("sparse_vector") + .dataType(VECTOR_SPARSE_FLOAT_TYPE) + .scale(indexMetadata.getDimension()) + .build(); + columns.add(sparseVectorColumn); + } + + TableSchema tableSchema = TableSchema.builder() + .primaryKey(PrimaryKey.of("id", Collections.singletonList("id"))) + .columns(columns) + .build(); + Map sourceTables = new HashMap<>(); + CatalogTable catalogTable = CatalogTable.of(TableIdentifier.of("pinecone", tablePath), + tableSchema, new HashMap<>(), new ArrayList<>(), ""); + sourceTables.put(tablePath, catalogTable); + return sourceTables; + } +} + diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index cf7314e619a..821bf64bb87 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -82,6 +82,7 @@ connector-qdrant connector-sls connector-typesense + connector-pinecone