Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Resolve CQId, add CQIds to MemDB plugin #95

Merged
merged 2 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/src/main/java/io/cloudquery/helper/ArrowHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public class ArrowHelper {

private static void setVectorData(FieldVector vector, Object data) {
vector.allocateNew();
if (data == null) {
vector.setNull(0);
return;
}
if (vector instanceof BigIntVector bigIntVector) {
bigIntVector.set(0, (long) data);
return;
Expand Down
5 changes: 4 additions & 1 deletion lib/src/main/java/io/cloudquery/memdb/MemDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,10 @@ public void close() {
/**
 * Initializes the plugin state from a JSON spec and returns a new {@code MemDBClient}.
 *
 * <p>The spec is parsed with {@code Spec.fromJSON}, the table list is loaded via
 * {@code getTables()}, passed to {@code Tables.transformTables}, and {@code addCQIDs()} is then
 * invoked on every table before the client is created.
 *
 * @param spec JSON text handed to {@code Spec.fromJSON}
 * @param options client options; not read anywhere in this method body
 * @return a newly constructed {@code MemDBClient}
 * @throws Exception declared by the signature; the visible body does not throw explicitly
 */
public ClientMeta newClient(String spec, NewClientOptions options) throws Exception {
this.spec = Spec.fromJSON(spec);
this.allTables = getTables();
// NOTE(review): the next two lines differ only by the `this.` qualifier. In this diff view the
// unqualified call looks like the removed side of the change and the qualified call its
// replacement — confirm that only one transformTables call exists in the real file.
Tables.transformTables(allTables);
Tables.transformTables(this.allTables);
// Run addCQIDs() on each table after the transform step and before the client is handed out.
for (Table table : this.allTables) {
table.addCQIDs();
}
return new MemDBClient();
}
}
1 change: 1 addition & 0 deletions lib/src/main/java/io/cloudquery/scheduler/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public void run() {
table.getResolver().get().resolve(client, parent, schedulerTableOutputStream);

for (Resource resource : schedulerTableOutputStream.getResources()) {
resource.resolveCQId(deterministicCqId);
ByteString record = resource.encode();
Sync.MessageInsert insert =
Sync.MessageInsert.newBuilder().setRecord(record).build();
Expand Down
47 changes: 47 additions & 0 deletions lib/src/main/java/io/cloudquery/schema/Resource.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package io.cloudquery.schema;

import com.google.common.base.Objects;
import com.google.protobuf.ByteString;
import io.cloudquery.helper.ArrowHelper;
import io.cloudquery.scalar.Scalar;
import io.cloudquery.scalar.ValidationException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
Expand Down Expand Up @@ -44,4 +51,44 @@ public Scalar<?> get(String columnName) {
/**
 * Encodes this resource by delegating to {@link ArrowHelper#encode}.
 *
 * @return the encoded bytes produced by {@code ArrowHelper}
 * @throws IOException if the helper fails while encoding
 */
public ByteString encode() throws IOException {
  final ByteString encoded = ArrowHelper.encode(this);
  return encoded;
}

/**
 * Writes {@code value} into this resource's CQ id column.
 *
 * <p>When the table has no column named {@code Column.CQ_ID_COLUMN.getName()} (the lookup
 * returns {@code -1}) the call is a silent no-op.
 *
 * @param value the id to store
 * @throws ValidationException if the target scalar rejects the value
 */
public void setCqId(UUID value) throws ValidationException {
  final String cqIdColumn = Column.CQ_ID_COLUMN.getName();
  final int position = table.indexOfColumn(cqIdColumn);
  if (position != -1) {
    this.data.get(position).set(value);
  }
}

/**
 * Resolves the CQ id of this resource and stores it through {@link #setCqId(UUID)}.
 *
 * <p>A random UUID is used when {@code deterministicCqId} is {@code false}, or when every
 * primary key of the table is the CQ id column itself (this includes a table with no primary
 * keys, because {@code allMatch} is true for an empty stream). Otherwise the id is derived from
 * a SHA-1 digest over the primary key names in sorted order, each name immediately followed by
 * the {@code toString()} of its value, both encoded as UTF-8.
 *
 * <p>NOTE: the deterministic id is the first 16 bytes of the SHA-1 digest taken verbatim. No
 * UUID version or variant bits are set, so the result is not a strict name-based (version 5)
 * UUID. The byte sequence fed to the digest must stay exactly as is, otherwise previously
 * generated ids change.
 *
 * @param deterministicCqId whether to derive the id from the primary key values
 * @throws ValidationException propagated from {@link #setCqId(UUID)}
 * @throws NoSuchAlgorithmException if the runtime provides no SHA-1 {@link MessageDigest}
 */
public void resolveCQId(boolean deterministicCqId)
    throws ValidationException, NoSuchAlgorithmException {
  if (!deterministicCqId) {
    // The random UUID is only generated on the paths that actually use it.
    this.setCqId(UUID.randomUUID());
    return;
  }

  final String cqIdName = Column.CQ_ID_COLUMN.getName();
  // Mutable copy so the names can be sorted without touching the table's own collection.
  List<String> pks = new ArrayList<>(this.table.primaryKeys());
  boolean cqOnlyPK = pks.stream().allMatch(pk -> Objects.equal(pk, cqIdName));
  if (cqOnlyPK) {
    // Nothing but the CQ id itself to hash, so fall back to a random id.
    this.setCqId(UUID.randomUUID());
    return;
  }

  // Sorting makes the digest independent of the order in which primary keys are declared.
  Collections.sort(pks);
  MessageDigest digest = MessageDigest.getInstance("SHA-1");
  for (String pk : pks) {
    digest.update(pk.getBytes(StandardCharsets.UTF_8));
    digest.update(this.get(pk).toString().getBytes(StandardCharsets.UTF_8));
  }

  // SHA-1 yields 20 bytes; the first 16 become the two halves of the UUID.
  ByteBuffer byteBuffer = ByteBuffer.wrap(digest.digest());
  long mostSig = byteBuffer.getLong();
  long leastSig = byteBuffer.getLong();
  this.setCqId(new UUID(mostSig, leastSig));
}
}
42 changes: 42 additions & 0 deletions lib/src/test/java/io/cloudquery/schema/ResourceTest.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package io.cloudquery.schema;

import static org.junit.Assert.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;

import io.cloudquery.scalar.ValidationException;
import io.cloudquery.types.UUIDType;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.apache.arrow.vector.types.pojo.ArrowType;
Expand Down Expand Up @@ -41,4 +45,42 @@ public void shouldSetAndGetDataTypes() throws ValidationException {
resource.set(column1.getName(), UUID);
assertEquals(UUID, resource.get(column1.getName()).get());
}

/**
 * With deterministic ids disabled, resolving the CQ id must leave a non-null value in the CQ id
 * column whose runtime class matches that of the {@code UUID} fixture declared elsewhere in this
 * test class.
 */
@Test
public void shouldResolveRandomCQId() throws ValidationException, NoSuchAlgorithmException {
  final String cqIdColumn = Column.CQ_ID_COLUMN.getName();

  final Table testTable = Table.builder().name("test").build();
  testTable.addCQIDs();

  final Resource row = Resource.builder().table(testTable).build();
  row.resolveCQId(false);

  assertNotNull(row.get(cqIdColumn).get());

  final String expectedType = UUID.getClass().getName();
  final String actualType = row.get(cqIdColumn).get().getClass().getName();
  assertEquals(expectedType, actualType);
}

/**
 * With deterministic ids enabled and two primary key columns ("name" and "id"), the resolved CQ
 * id must equal the pinned value below for the inputs "test" and 1000.
 */
@Test
public void shouldResolveDeterministicCqId()
    throws ValidationException, NoSuchAlgorithmException {
  final Column nameColumn =
      Column.builder().name("name").primaryKey(true).type(ArrowType.Utf8.INSTANCE).build();
  final Column idColumn =
      Column.builder().primaryKey(true).name("id").type(new ArrowType.Int(64, true)).build();

  final Table testTable =
      Table.builder()
          .name("test")
          .columns(new ArrayList<Column>(Arrays.asList(nameColumn, idColumn)))
          .build();
  testTable.addCQIDs();

  final Resource row = Resource.builder().table(testTable).build();
  row.set(nameColumn.getName(), "test");
  row.set(idColumn.getName(), 1000);
  row.resolveCQId(true);

  final String resolved = row.get(Column.CQ_ID_COLUMN.getName()).toString();
  assertEquals("a63a6152-e1d8-470f-f118-e5fa4874cb2d", resolved);
}
}