Skip to content

Commit

Permalink
feat: Implement concurrency and relations resolving (#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
erezrokah authored Aug 24, 2023
1 parent 1fe0c7f commit 0a470b7
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 54 deletions.
21 changes: 21 additions & 0 deletions lib/src/main/java/io/cloudquery/memdb/MemDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,27 @@ public void resolve(
}
})
.transform(TransformWithClass.builder(Table2Data.class).pkField("id").build())
.relations(
List.of(
Table.builder()
.name("table2_child")
.resolver(
new TableResolver() {

@Override
public void resolve(
ClientMeta clientMeta,
Resource parent,
TableOutputStream stream) {
String parentName = parent.get("name").toString();
stream.write(
Table2ChildData.builder().name(parentName + "_name1").build());
stream.write(
Table2ChildData.builder().name(parentName + "_name2").build());
}
})
.transform(TransformWithClass.builder(Table2ChildData.class).build())
.build()))
.build());
}

Expand Down
2 changes: 1 addition & 1 deletion lib/src/main/java/io/cloudquery/memdb/Spec.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static Spec fromJSON(String json) throws JsonMappingException, JsonProces
ObjectMapper objectMapper = new ObjectMapper();
Spec spec = objectMapper.readValue(json, Spec.class);
if (spec.getConcurrency() == 0) {
spec.setConcurrency(10000);
spec.setConcurrency(100);
}
return spec;
}
Expand Down
10 changes: 10 additions & 0 deletions lib/src/main/java/io/cloudquery/memdb/Table2ChildData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.cloudquery.memdb;

import lombok.Builder;
import lombok.Getter;

/**
 * Row model for the in-memory "table2_child" relation table.
 *
 * <p>Lombok's {@code @Builder} generates the fluent builder used by the table
 * resolver in {@code MemDB}, and {@code @Getter} exposes {@code getName()}.
 */
@Builder
@Getter
public class Table2ChildData {
  // Single column value; the parent resolver fills it as "<parentName>_name1" / "_name2".
  private String name;
}

This file was deleted.

83 changes: 62 additions & 21 deletions lib/src/main/java/io/cloudquery/scheduler/Scheduler.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package io.cloudquery.scheduler;

import com.google.protobuf.ByteString;
import io.cloudquery.helper.ArrowHelper;
import io.cloudquery.plugin.v3.Sync;
import io.cloudquery.schema.ClientMeta;
import io.cloudquery.schema.Resource;
import io.cloudquery.schema.Table;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.Builder;
import lombok.NonNull;
import org.apache.logging.log4j.Logger;
Expand All @@ -20,8 +25,58 @@ public class Scheduler {
private int concurrency;
private boolean deterministicCqId;

public void sync() {
/**
 * Resolves the given tables concurrently and then recursively resolves each
 * table's relation tables against the rows it produced.
 *
 * <p>A fresh fixed-size pool is created per nesting level, bounded by
 * {@code min(tables.size(), concurrency)}; the concurrency budget is halved
 * (minimum 1) for each deeper level so nested relations cannot multiply the
 * total thread count unboundedly. Resolution failures for a single table are
 * logged and forwarded to {@code syncStream.onError} without stopping the
 * sibling tasks already submitted.
 *
 * @param tables      tables to resolve at this level; {@code null} or empty is a no-op
 * @param parent      parent resource when resolving relation tables, or {@code null} at top level
 * @param concurrency maximum number of worker threads for this level
 * @throws InterruptedException if interrupted while waiting for the level to finish
 */
private void resolveTables(List<Table> tables, Resource parent, int concurrency)
    throws InterruptedException {
  if (tables == null || tables.isEmpty()) {
    return;
  }
  // Never spawn more threads than there are tables to work on.
  ExecutorService executor = Executors.newFixedThreadPool(Math.min(tables.size(), concurrency));
  for (Table table : tables) {
    final int nextLevelConcurrency = Math.max(1, concurrency / 2);
    executor.submit(
        () -> {
          try {
            // Fixed: a separator space was missing between "parent" and the parent table name.
            String tableMessage =
                parent != null
                    ? "table " + table.getName() + " of parent " + parent.getTable().getName()
                    : "table " + table.getName();

            logger.info("resolving {}", tableMessage);
            if (!table.getResolver().isPresent()) {
              logger.error("no resolver for {}", tableMessage);
              return;
            }

            // Collect this table's rows, emit each as an insert message, then
            // recurse into the relations using the emitted row as the parent.
            SchedulerTableOutputStream schedulerTableOutputStream =
                new SchedulerTableOutputStream(table, parent, client, logger);
            table.getResolver().get().resolve(client, parent, schedulerTableOutputStream);

            for (Resource resource : schedulerTableOutputStream.getResources()) {
              ByteString record = resource.encode();
              Sync.MessageInsert insert =
                  Sync.MessageInsert.newBuilder().setRecord(record).build();
              Sync.Response response = Sync.Response.newBuilder().setInsert(insert).build();
              syncStream.onNext(response);
              resolveTables(table.getRelations(), resource, nextLevelConcurrency);
            }

            logger.info("resolved {}", tableMessage);
          } catch (Exception e) {
            logger.error("Failed to resolve table: {}", table.getName(), e);
            syncStream.onError(e);
          }
        });
  }
  // Stop accepting new work and block until every table at this level is done.
  executor.shutdown();
  executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
}

public void sync() {
for (Table table : Table.flattenTables(tables)) {
try {
logger.info("sending migrate message for table: {}", table.getName());
Sync.MessageMigrateTable migrateTable =
Expand All @@ -34,26 +89,12 @@ public void sync() {
}
}

for (Table table : tables) {
try {
logger.info("resolving table: {}", table.getName());
if (!table.getResolver().isPresent()) {
logger.error("no resolver for table: {}", table.getName());
continue;
}
SchedulerTableOutputStream schedulerTableOutputStream =
SchedulerTableOutputStream.builder()
.table(table)
.client(client)
.logger(logger)
.syncStream(syncStream)
.build();
table.getResolver().get().resolve(client, null, schedulerTableOutputStream);
logger.info("resolved table: {}", table.getName());
} catch (Exception e) {
syncStream.onError(e);
return;
}
try {
resolveTables(this.tables, null, this.concurrency);
} catch (InterruptedException e) {
logger.error("Failed to resolve tables", e);
syncStream.onError(e);
return;
}

syncStream.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,52 +1,71 @@
package io.cloudquery.scheduler;

import com.google.protobuf.ByteString;
import io.cloudquery.plugin.TableOutputStream;
import io.cloudquery.plugin.v3.Sync;
import io.cloudquery.schema.ClientMeta;
import io.cloudquery.schema.Column;
import io.cloudquery.schema.Resource;
import io.cloudquery.schema.Table;
import io.cloudquery.transformers.TransformerException;
import io.grpc.stub.StreamObserver;
import lombok.Builder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.apache.logging.log4j.Logger;

@Builder
public class SchedulerTableOutputStream implements TableOutputStream {
private static final int RESOURCE_RESOLVE_CONCURRENCY = 100;
private static final int RESOURCE_RESOLVE_TIMEOUT_MINUTES = 10;
@NonNull private final Table table;
private final Resource parent;
@NonNull private final ClientMeta client;
@NonNull private final Logger logger;
@NonNull private final StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream;

private List<Resource> resources = new ArrayList<Resource>();

private ExecutorService executor;

/**
 * Creates an output stream that buffers the resources written by {@code table}'s resolver.
 *
 * @param table  table whose rows are written through this stream (non-null)
 * @param parent parent resource when resolving a relation table, or {@code null} at top level
 * @param client plugin client handed to column resolvers (non-null)
 * @param logger logger for per-column resolution messages (non-null)
 */
public SchedulerTableOutputStream(
    @NonNull Table table, Resource parent, @NonNull ClientMeta client, @NonNull Logger logger) {
  this.table = table;
  this.parent = parent;
  this.client = client;
  this.logger = logger;
  // Fixed-size pool used by write() to resolve columns; drained in getResources().
  this.executor = Executors.newFixedThreadPool(RESOURCE_RESOLVE_CONCURRENCY);
}

@Override
public void write(Object data) {
  // Wrap the raw item in a Resource bound to this table and (optional) parent row.
  Resource resource = Resource.builder().table(table).parent(parent).item(data).build();
  for (Column column : table.getColumns()) {
    // NOTE(review): this inline try/catch resolves each column synchronously and is
    // immediately followed by an executor.submit(...) that resolves the SAME column
    // again on the pool. The inline pass looks like leftover pre-concurrency code
    // (diff residue) — confirm that one of the two paths should be removed.
    try {
      logger.info("resolving column: {}", column.getName());
      if (column.getResolver() == null) {
        logger.error("no resolver for column: {}", column.getName());
        continue;
      }
      column.getResolver().resolve(client, resource, column);
      logger.info("resolved column: {}", column.getName());
    } catch (TransformerException e) {
      logger.error("Failed to resolve column: {}", column.getName(), e);
      return; // aborts the whole write: the resource is NOT added to the buffer
    }
    // Asynchronous path: resolve the column on the shared pool; completion is
    // awaited later in getResources() via shutdown()/awaitTermination().
    executor.submit(
        new Runnable() {
          @Override
          public void run() {
            try {
              logger.debug("resolving column: {}", column.getName());
              if (column.getResolver() == null) {
                logger.error("no resolver for column: {}", column.getName());
                return;
              }
              column.getResolver().resolve(client, resource, column);
              logger.debug("resolved column: {}", column.getName());
              return;
            } catch (TransformerException e) {
              // Errors are logged only; the resource is still buffered by the caller.
              logger.error("Failed to resolve column: {}", column.getName(), e);
              return;
            }
          }
        });
  }
  // Buffer the resource; the scheduler reads it back through getResources().
  resources.add(resource);
}

try {
ByteString record = resource.encode();
Sync.MessageInsert insert = Sync.MessageInsert.newBuilder().setRecord(record).build();
Sync.Response response = Sync.Response.newBuilder().setInsert(insert).build();
syncStream.onNext(response);
} catch (Exception e) {
logger.error("Failed to encode resource: {}", resource, e);
return;
}
/**
 * Waits for all pending column-resolution tasks to finish, then returns the
 * buffered resources in the order they were written.
 *
 * <p>After this call the internal executor is shut down, so the stream must
 * not be written to again.
 *
 * @return the resources buffered by {@code write}
 * @throws InterruptedException if interrupted while waiting for the tasks
 */
public List<Resource> getResources() throws InterruptedException {
  // TODO: Optimize this to not wait for all resources to complete
  // NOTE(review): awaitTermination's boolean result is discarded — a timeout
  // after RESOURCE_RESOLVE_TIMEOUT_MINUTES silently returns partially resolved
  // resources; consider surfacing that to the caller.
  executor.shutdown();
  executor.awaitTermination(RESOURCE_RESOLVE_TIMEOUT_MINUTES, TimeUnit.MINUTES);
  return this.resources;
}
}
4 changes: 2 additions & 2 deletions lib/src/main/java/io/cloudquery/schema/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.cloudquery.transformers.TransformerException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -28,7 +28,7 @@ public interface Transform {
}

public static List<Table> flattenTables(List<Table> tables) {
Map<String, Table> flattenMap = new HashMap<>();
Map<String, Table> flattenMap = new LinkedHashMap<>();
for (Table table : tables) {
Table newTable = table.toBuilder().relations(Collections.emptyList()).build();
flattenMap.put(newTable.name, newTable);
Expand Down

0 comments on commit 0a470b7

Please sign in to comment.