Skip to content

Commit

Permalink
Merge pull request #18 from zilliztech/nianliuu
Browse files Browse the repository at this point in the history
nianliuu
  • Loading branch information
nianliuu authored Oct 16, 2024
2 parents b14c0ac + 61cae8e commit 41a7f92
Show file tree
Hide file tree
Showing 19 changed files with 271 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public AstraDBSource(ReadonlyConfig config) {
*/
@Override
public Boundedness getBoundedness() {
return null;
return Boundedness.BOUNDED;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ public void write(SeaTunnelRow element) {
}

@Override
public void close() {}
public void close() {
log.info("ConsoleSinkWriter closed");
}

private String fieldsInfo(SeaTunnelRowType seaTunnelRowType) {
String[] fieldsInfo = new String[seaTunnelRowType.getTotalFields()];
Expand Down
2 changes: 1 addition & 1 deletion seatunnel-connectors-v2/connector-milvus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>2.4.3</version>
<version>2.4.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ private void initMilvusClient(ReadonlyConfig config) throws SeaTunnelException {
log.info("begin to init Milvus client");
String dbName = catalogTable.getTablePath().getDatabaseName();
String collectionName = catalogTable.getTablePath().getTableName();
this.hasPartitionKey = MilvusConnectorUtils.hasPartitionKey(config.get(URL), config.get(TOKEN), dbName, collectionName);

ConnectConfig connectConfig = ConnectConfig.builder()
.uri(config.get(URL))
Expand All @@ -80,6 +79,7 @@ private void initMilvusClient(ReadonlyConfig config) throws SeaTunnelException {
if(StringUtils.isNotEmpty(dbName)) {
milvusClient.useDatabase(dbName);
}
this.hasPartitionKey = MilvusConnectorUtils.hasPartitionKey(milvusClient, collectionName);
// set rate limit
if(config.get(RATE_LIMIT) > 0) {
log.info("set rate limit for collection: " + collectionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,94 +7,22 @@
import io.milvus.param.ConnectParam;
import io.milvus.param.R;
import io.milvus.param.collection.DescribeCollectionParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import io.milvus.v2.service.collection.request.DescribeCollectionReq;
import io.milvus.v2.service.collection.response.DescribeCollectionResp;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MilvusConnectorUtils {
private static final Logger log = LoggerFactory.getLogger(MilvusConnectorUtils.class);
private static final SecretKey SECRET_KEY = generateSecretKey();

public static String encryptToken(String token) {
try {
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
cipher.init(Cipher.ENCRYPT_MODE, SECRET_KEY);

byte[] encryptedBytes = cipher.doFinal(token.getBytes(StandardCharsets.UTF_8));
return Base64.getEncoder().encodeToString(encryptedBytes);
} catch (Exception e) {
// Handle encryption errors
log.error("encryption error" + e.getMessage());
return null;
}
}

public static String decryptToken(String token) {
try {
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
cipher.init(Cipher.DECRYPT_MODE, SECRET_KEY);

byte[] encryptedBytes = Base64.getDecoder().decode(token);
byte[] decryptedBytes = cipher.doFinal(encryptedBytes);

return new String(decryptedBytes, StandardCharsets.UTF_8);
} catch (Exception e) {
// Handle decryption errors
log.error("decryption error" + e.getMessage());
return null;
}
}

public static boolean isEncrypted(String token) {
try {
// Check if the string is a valid Base64 encoded string
byte[] decodedBytes = Base64.getDecoder().decode(token);

// Try to decrypt the string (this will throw an exception if it fails)
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
cipher.init(Cipher.DECRYPT_MODE, SECRET_KEY);
cipher.doFinal(decodedBytes);

// If decryption was successful, we assume the string was encrypted
return true;
} catch (Exception e) {
// If any exception is caught, the string is likely not encrypted
return false;
}
}

public static SecretKey generateSecretKey() {
try {
KeyGenerator keyGenerator = KeyGenerator.getInstance("AES");
SecureRandom secureRandom = new SecureRandom();
keyGenerator.init(128, secureRandom);
return keyGenerator.generateKey();
} catch (NoSuchAlgorithmException e) {
log.error(e.getMessage());
return null;
}
}
public static Boolean hasPartitionKey(MilvusClientV2 milvusClient, String collectionName) {

public static Boolean hasPartitionKey(String url, String token, String dbName, String collectionName) {
ConnectParam connectParam = ConnectParam.newBuilder()
.withUri(url)
.withToken(token)
.build();
MilvusClient milvusClient = new MilvusServiceClient(connectParam);
R<DescribeCollectionResponse> describeCollectionResponseR = milvusClient.describeCollection(
DescribeCollectionParam.newBuilder()
.withDatabaseName(dbName)
.withCollectionName(collectionName)
DescribeCollectionResp describeCollectionResp = milvusClient.describeCollection(
DescribeCollectionReq.builder()
.collectionName(collectionName)
.build());
boolean hasPartitionKey = describeCollectionResponseR.getData().getSchema().getFieldsList().stream().anyMatch(FieldSchema::getIsPartitionKey);
milvusClient.close();
return hasPartitionKey;
return describeCollectionResp.getCollectionSchema().getFieldSchemaList().stream()
.anyMatch(CreateCollectionReq.FieldSchema::getIsPartitionKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import java.util.*;
import java.util.stream.Collectors;

import static org.apache.seatunnel.api.table.type.BasicType.JSON_TYPE;

@Slf4j
public class MilvusConvertUtils {

Expand Down Expand Up @@ -110,7 +112,7 @@ public CatalogTable getCatalogTable(
.build());

if (response.getStatus() != R.Status.Success.getCode()) {
throw new MilvusConnectorException(MilvusConnectionErrorCode.DESC_COLLECTION_ERROR);
throw new MilvusConnectorException(MilvusConnectionErrorCode.DESC_COLLECTION_ERROR, response.getMessage());
}
log.info("describe collection database: {}, collection: {}, response: {}", database, collection, response);
// collection column
Expand All @@ -126,6 +128,17 @@ public CatalogTable getCatalogTable(
partitionKeyField = fieldSchema.getName();
}
}
if(collectionResponse.getSchema().getEnableDynamicField()){
Map<String, Object> options = new HashMap<>();

options.put("isDynamicField", true);
PhysicalColumn dynamicColumn = PhysicalColumn.builder()
.name("meta")
.dataType(JSON_TYPE)
.options(options)
.build();
columns.add(dynamicColumn);
}

// primary key
PrimaryKey primaryKey = buildPrimaryKey(schema.getFieldsList());
Expand Down
52 changes: 42 additions & 10 deletions seatunnel-connectors-v2/connector-pinecone/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,61 @@
<artifactId>seatunnel-connectors-v2</artifactId>
<version>${revision}</version>
</parent>
<repositories>
<repository>
<id>github</id>
<url>https://maven.pkg.github.com/nianliuu/pinecone-java-client</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<artifactId>connector-pinecone</artifactId>
<name>SeaTunnel : Connectors V2 : Pinecone</name>

<properties>
<!-- Specify the gRPC version -->
<grpc.version>1.59.1</grpc.version>
<protobuf.version>3.25.1</protobuf.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.pinecone</groupId>
<artifactId>pinecone-client</artifactId>
<version>2.1.0-fix</version>
<version>2.1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.1</version>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public PineconeSource(ReadonlyConfig config) {
*/
@Override
public Boundedness getBoundedness() {
return null;
return Boundedness.BOUNDED;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.pinecone.clients.Index;
import io.pinecone.clients.Pinecone;
import io.pinecone.proto.*;
import io.pinecone.proto.Vector;
import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
Expand All @@ -17,10 +18,7 @@
import org.apache.seatunnel.connectors.pinecone.utils.ConverterUtils;

import java.io.IOException;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -90,6 +88,9 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
}
List<ListItem> vectorsList = listResponse.getVectorsList();
List<String> ids = vectorsList.stream().map(ListItem::getId).collect(Collectors.toList());
if(ids.isEmpty()){
break;
}
FetchResponse fetchResponse = index.fetch(ids, namespace);
Map<String, Vector> vectorMap = fetchResponse.getVectorsMap();
for (Map.Entry<String, Vector> entry : vectorMap.entrySet()) {
Expand Down Expand Up @@ -134,7 +135,7 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
*/
@Override
public List<PineconeSourceSplit> snapshotState(long checkpointId) throws Exception {
return null;
return new ArrayList<>(pendingSplits);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -24,7 +25,6 @@

public class ConverterUtils {
public static SeaTunnelRow convertToSeatunnelRow(TableSchema tableSchema, Vector vector) {
SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
Object[] fields = new Object[typeInfo.getTotalFields()];
List<String> fieldNames = Arrays.stream(typeInfo.getFieldNames()).collect(Collectors.toList());
Expand All @@ -46,12 +46,15 @@ public static SeaTunnelRow convertToSeatunnelRow(TableSchema tableSchema, Vector
fields[fieldIndex] = BufferUtils.toByteBuffer(floatArray);
} else if (typeInfo.getFieldType(fieldIndex).equals(VECTOR_SPARSE_FLOAT_TYPE)) {
// Convert SparseVector to a ByteBuffer
Map<Long, Float> spraseMap = vector.getSparseValues().getIndicesList().stream().collect(Collectors.toMap(
index -> (long) vector.getSparseValues().getIndices(index),
index -> vector.getSparseValues().getValues(index)
));
Map<Long, Float> sparseMap = new HashMap<>();
int count = vector.getSparseValues().getIndicesCount();
for (int i = 0; i < count; i++) {
long index = vector.getSparseValues().getIndices(i);
float value = vector.getSparseValues().getValues(i);
sparseMap.put(index, value);
}

fields[fieldIndex] = spraseMap;
fields[fieldIndex] = sparseMap;

}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package org.apache.seatunnel.connectors.pinecone.utils;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import io.pinecone.clients.Index;
import io.pinecone.clients.Pinecone;
import io.pinecone.proto.FetchResponse;
Expand All @@ -14,8 +8,7 @@
import io.pinecone.proto.Vector;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.*;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.openapitools.control.client.model.IndexModel;

import java.util.ArrayList;
Expand Down Expand Up @@ -50,6 +43,10 @@ public Map<TablePath, CatalogTable> getSourceTables() {
ListResponse listResponse = index.list();
List<ListItem> vectorsList = listResponse.getVectorsList();
List<String> ids = vectorsList.stream().map(ListItem::getId).collect(Collectors.toList());
if (ids.isEmpty()) {
// no data in the index
return sourceTables;
}
FetchResponse fetchResponse = index.fetch(ids);
Map<String, Vector> vectorMap = fetchResponse.getVectorsMap();
Vector vector = vectorMap.entrySet().stream().iterator().next().getValue();
Expand Down Expand Up @@ -80,7 +77,8 @@ public Map<TablePath, CatalogTable> getSourceTables() {
.scale(indexMetadata.getDimension())
.build();
columns.add(vectorColumn);
} else if (!vector.getSparseValues().getValuesList().isEmpty()) {
}
if (!vector.getSparseValues().getIndicesList().isEmpty()) {
PhysicalColumn sparseVectorColumn = PhysicalColumn.builder()
.name("sparse_vector")
.dataType(VECTOR_SPARSE_FLOAT_TYPE)
Expand Down
Loading

0 comments on commit 41a7f92

Please sign in to comment.