From 33c80bea34f29d604b2c035490f50c68bb56ae29 Mon Sep 17 00:00:00 2001 From: erezrokah Date: Thu, 24 Aug 2023 10:44:15 +0200 Subject: [PATCH] feat: Resolve CQId, add cq IDs to MemDB plugin --- .../io/cloudquery/helper/ArrowHelper.java | 4 ++ .../main/java/io/cloudquery/memdb/MemDB.java | 5 +- .../io/cloudquery/scheduler/Scheduler.java | 1 + .../java/io/cloudquery/schema/Resource.java | 47 +++++++++++++++++++ .../io/cloudquery/schema/ResourceTest.java | 42 +++++++++++++++++ 5 files changed, 98 insertions(+), 1 deletion(-) diff --git a/lib/src/main/java/io/cloudquery/helper/ArrowHelper.java b/lib/src/main/java/io/cloudquery/helper/ArrowHelper.java index 3dc8ff7..1fc11f1 100644 --- a/lib/src/main/java/io/cloudquery/helper/ArrowHelper.java +++ b/lib/src/main/java/io/cloudquery/helper/ArrowHelper.java @@ -58,6 +58,10 @@ public class ArrowHelper { private static void setVectorData(FieldVector vector, Object data) { vector.allocateNew(); + if (data == null) { + vector.setNull(0); + return; + } if (vector instanceof BigIntVector bigIntVector) { bigIntVector.set(0, (long) data); return; diff --git a/lib/src/main/java/io/cloudquery/memdb/MemDB.java b/lib/src/main/java/io/cloudquery/memdb/MemDB.java index cd53cf2..26a100e 100644 --- a/lib/src/main/java/io/cloudquery/memdb/MemDB.java +++ b/lib/src/main/java/io/cloudquery/memdb/MemDB.java @@ -142,7 +142,10 @@ public void close() { public ClientMeta newClient(String spec, NewClientOptions options) throws Exception { this.spec = Spec.fromJSON(spec); this.allTables = getTables(); - Tables.transformTables(allTables); + Tables.transformTables(this.allTables); + for (Table table : this.allTables) { + table.addCQIDs(); + } return new MemDBClient(); } } diff --git a/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java b/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java index ba734f7..11c0574 100644 --- a/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java +++ b/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java @@ -54,6 +54,7 @@ public void run() { table.getResolver().get().resolve(client, parent, schedulerTableOutputStream); for (Resource resource : schedulerTableOutputStream.getResources()) { + resource.resolveCQId(deterministicCqId); ByteString record = resource.encode(); Sync.MessageInsert insert = Sync.MessageInsert.newBuilder().setRecord(record).build(); diff --git a/lib/src/main/java/io/cloudquery/schema/Resource.java b/lib/src/main/java/io/cloudquery/schema/Resource.java index e2b6c2a..66e532d 100644 --- a/lib/src/main/java/io/cloudquery/schema/Resource.java +++ b/lib/src/main/java/io/cloudquery/schema/Resource.java @@ -1,12 +1,19 @@ package io.cloudquery.schema; +import com.google.common.base.Objects; import com.google.protobuf.ByteString; import io.cloudquery.helper.ArrowHelper; import io.cloudquery.scalar.Scalar; import io.cloudquery.scalar.ValidationException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.UUID; import lombok.Builder; import lombok.Getter; import lombok.NonNull; @@ -44,4 +51,44 @@ public Scalar get(String columnName) { public ByteString encode() throws IOException { return ArrowHelper.encode(this); } + + public void setCqId(UUID value) throws ValidationException { + int index = table.indexOfColumn(Column.CQ_ID_COLUMN.getName()); + if (index == -1) { + return; + } + this.data.get(index).set(value); + } + + public void resolveCQId(boolean deterministicCqId) + throws ValidationException, NoSuchAlgorithmException { + UUID randomUUID = UUID.randomUUID(); + if (!deterministicCqId) { + this.setCqId(randomUUID); + return; + } + + // Use an array list to support sorting + ArrayList pks = new ArrayList<>(this.table.primaryKeys()); + boolean cqOnlyPK = + pks.stream().allMatch((pk) -> Objects.equal(pk, Column.CQ_ID_COLUMN.getName())); + if (cqOnlyPK) { + this.setCqId(randomUUID); + return; + } + + Collections.sort(pks); + // Generate uuid v5 (same as sha-1) + MessageDigest digest = MessageDigest.getInstance("SHA-1"); + for (String pk : pks) { + digest.update(pk.getBytes(StandardCharsets.UTF_8)); + digest.update(this.get(pk).toString().getBytes(StandardCharsets.UTF_8)); + } + + ByteBuffer byteBuffer = ByteBuffer.wrap(digest.digest()); + long mostSig = byteBuffer.getLong(); + long leastSig = byteBuffer.getLong(); + this.setCqId(new UUID(mostSig, leastSig)); + return; + } } diff --git a/lib/src/test/java/io/cloudquery/schema/ResourceTest.java b/lib/src/test/java/io/cloudquery/schema/ResourceTest.java index ae388ef..ff5c983 100644 --- a/lib/src/test/java/io/cloudquery/schema/ResourceTest.java +++ b/lib/src/test/java/io/cloudquery/schema/ResourceTest.java @@ -1,11 +1,15 @@ package io.cloudquery.schema; +import static org.junit.Assert.assertNotNull; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import io.cloudquery.scalar.ValidationException; import io.cloudquery.types.UUIDType; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.UUID; import org.apache.arrow.vector.types.pojo.ArrowType; @@ -41,4 +45,42 @@ public void shouldSetAndGetDataTypes() throws ValidationException { resource.set(column1.getName(), UUID); assertEquals(UUID, resource.get(column1.getName()).get()); } + + @Test + public void shouldResolveRandomCQId() throws ValidationException, NoSuchAlgorithmException { + Table table = Table.builder().name("test").build(); + table.addCQIDs(); + + Resource resource = Resource.builder().table(table).build(); + resource.resolveCQId(false); + + assertNotNull(resource.get(Column.CQ_ID_COLUMN.getName()).get()); + assertEquals( + UUID.getClass().getName(), + resource.get(Column.CQ_ID_COLUMN.getName()).get().getClass().getName()); + } + + @Test + public void shouldResolveDeterministicCqId() + throws ValidationException, NoSuchAlgorithmException { + Column column1 = + Column.builder().name("name").primaryKey(true).type(ArrowType.Utf8.INSTANCE).build(); + Column column2 = + Column.builder().primaryKey(true).name("id").type(new ArrowType.Int(64, true)).build(); + Table table = + Table.builder() + .name("test") + .columns(new ArrayList(Arrays.asList(column1, column2))) + .build(); + table.addCQIDs(); + + Resource resource = Resource.builder().table(table).build(); + resource.set(column1.getName(), "test"); + resource.set(column2.getName(), 1000); + resource.resolveCQId(true); + + assertEquals( + "a63a6152-e1d8-470f-f118-e5fa4874cb2d", + resource.get(Column.CQ_ID_COLUMN.getName()).toString()); + } }