Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement getTables #71

Merged
merged 5 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ plugins {
id "maven-publish"
}

ext {
javaMainClass = "io.cloudquery.MainClass"
}

group 'io.cloudquery'
// x-release-please-start-version
version = '0.0.1'
Expand Down Expand Up @@ -34,6 +38,7 @@ dependencies {
implementation "io.grpc:grpc-services:1.57.1"
implementation "io.grpc:grpc-testing:1.57.1"
implementation "io.cloudquery:plugin-pb-java:0.0.5"
implementation "org.apache.arrow:arrow-memory-core:12.0.1"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR, but maybe we should introduce version variables since we have multiple imports for several packages now.

implementation "org.apache.arrow:arrow-vector:12.0.1"

implementation 'org.apache.logging.log4j:log4j-api:2.20.0'
Expand All @@ -44,11 +49,12 @@ dependencies {
testImplementation('org.junit.jupiter:junit-jupiter-api:5.10.0')
testImplementation('org.mockito:mockito-core:5.4.0')
testImplementation('org.mockito:mockito-junit-jupiter:5.4.0')
testImplementation('org.apache.arrow:arrow-memory-netty:12.0.1')
testImplementation('nl.jqno.equalsverifier:equalsverifier:3.15')
testRuntimeOnly('org.junit.jupiter:junit-jupiter-engine:5.10.0')

testImplementation 'org.assertj:assertj-core:3.24.2'

runtimeOnly "org.apache.arrow:arrow-memory-netty:12.0.1"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

test {
Expand Down Expand Up @@ -83,3 +89,11 @@ publishing {
}
}
}

task runMemDBServe(type: JavaExec) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this so I can debug the MemDB plugin via VSCode without having to manually configure class paths

group = "Execution"
description = "Start the MemDB plugin server"
classpath = sourceSets.main.runtimeClasspath
main = javaMainClass
args = ["serve"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.cloudquery.memdb.MemDB;
import io.cloudquery.server.PluginServe;

public class Main {
public class MainClass {
public static void main(String[] args) {
PluginServe serve = PluginServe.builder().plugin(new MemDB()).args(args).build();
int exitCode = serve.Serve();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
package io.cloudquery.internal.servers.plugin.v3;

import io.cloudquery.plugin.v3.PluginGrpc.PluginImplBase;
import io.cloudquery.schema.Table;
import io.cloudquery.plugin.v3.Write;
import io.grpc.stub.StreamObserver;

import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Schema;
import com.google.protobuf.ByteString;

import io.cloudquery.plugin.Plugin;

public class PluginServer extends PluginImplBase {
Expand All @@ -9,4 +25,100 @@ public class PluginServer extends PluginImplBase {
public PluginServer(Plugin plugin) {
this.plugin = plugin;
}

@Override
public void getName(io.cloudquery.plugin.v3.GetName.Request request,
StreamObserver<io.cloudquery.plugin.v3.GetName.Response> responseObserver) {
responseObserver
.onNext(io.cloudquery.plugin.v3.GetName.Response.newBuilder().setName(plugin.getName()).build());
responseObserver.onCompleted();
}

@Override
public void getVersion(io.cloudquery.plugin.v3.GetVersion.Request request,
StreamObserver<io.cloudquery.plugin.v3.GetVersion.Response> responseObserver) {
responseObserver.onNext(
io.cloudquery.plugin.v3.GetVersion.Response.newBuilder().setVersion(plugin.getVersion()).build());
responseObserver.onCompleted();
}

@Override
public void init(io.cloudquery.plugin.v3.Init.Request request,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be implemented later

StreamObserver<io.cloudquery.plugin.v3.Init.Response> responseObserver) {
plugin.init();
responseObserver.onNext(io.cloudquery.plugin.v3.Init.Response.newBuilder().build());
responseObserver.onCompleted();
}

@Override
public void getTables(io.cloudquery.plugin.v3.GetTables.Request request,
StreamObserver<io.cloudquery.plugin.v3.GetTables.Response> responseObserver) {
try {
List<Table> tables = plugin.tables();
List<ByteString> byteStrings = new ArrayList<>();
for (Table table : tables) {
try (BufferAllocator bufferAllocator = new RootAllocator()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Schema schema = table.toArrowSchema();
VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator);
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (ArrowStreamWriter writer = new ArrowStreamWriter(schemaRoot, null,
Channels.newChannel(out))) {
writer.start();
writer.end();
Comment on lines +66 to +67
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was expecting a call to a write method here, or is the call to end writing data?

Copy link
Member Author

@erezrokah erezrokah Aug 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, start writes the schema which is what we want here. We don't need to write any records (this will happen during sync) so there's no write.
https://github.com/apache/arrow/blob/6357c9f2419d5b0717e62adc8233c649e10de34b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java#L183

byteStrings.add(ByteString.copyFrom(out.toByteArray()));
}
}
}
}
responseObserver
.onNext(io.cloudquery.plugin.v3.GetTables.Response.newBuilder().addAllTables(byteStrings).build());
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(e);
}
}

@Override
public void sync(io.cloudquery.plugin.v3.Sync.Request request,
StreamObserver<io.cloudquery.plugin.v3.Sync.Response> responseObserver) {
plugin.sync();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be implemented later

responseObserver.onNext(io.cloudquery.plugin.v3.Sync.Response.newBuilder().build());
responseObserver.onCompleted();
}

@Override
public void read(io.cloudquery.plugin.v3.Read.Request request,
StreamObserver<io.cloudquery.plugin.v3.Read.Response> responseObserver) {
plugin.read();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be implemented later

responseObserver.onNext(io.cloudquery.plugin.v3.Read.Response.newBuilder().build());
responseObserver.onCompleted();
}

@Override
public StreamObserver<Write.Request> write(StreamObserver<Write.Response> responseObserver) {
plugin.write();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be implemented later

return new StreamObserver<>() {
@Override
public void onNext(Write.Request request) {
}

@Override
public void onError(Throwable t) {
}

@Override
public void onCompleted() {
responseObserver.onNext(Write.Response.newBuilder().build());
responseObserver.onCompleted();
}
};
}

@Override
public void close(io.cloudquery.plugin.v3.Close.Request request,
StreamObserver<io.cloudquery.plugin.v3.Close.Response> responseObserver) {
plugin.close();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be implemented later

responseObserver.onNext(io.cloudquery.plugin.v3.Close.Response.newBuilder().build());
responseObserver.onCompleted();
}
}
44 changes: 44 additions & 0 deletions lib/src/main/java/io/cloudquery/memdb/MemDB.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,53 @@
package io.cloudquery.memdb;

import java.util.List;

import io.cloudquery.plugin.Plugin;
import io.cloudquery.schema.Table;
import io.cloudquery.schema.Column;
import io.cloudquery.schema.SchemaException;

import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;

public class MemDB extends Plugin {
private List<Table> allTables = List.of(
Table.builder().name("table1").columns(List.of(Column.builder().name("name1").type(new Utf8()).build()))
.build(),
Table.builder().name("table2").columns(List.of(Column.builder().name("name1").type(new Utf8()).build()))
.build());

public MemDB() {
super("memdb", "0.0.1");
}

@Override
public void init() {
// do nothing
}

@Override
public List<Table> tables() throws SchemaException {
return Table.filterDFS(allTables, List.of("*"), List.of(), false);
}

@Override
public void sync() {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'Sync'");
}

@Override
public void read() {
throw new UnsupportedOperationException("Unimplemented method 'Read'");
}

@Override
public void write() {
throw new UnsupportedOperationException("Unimplemented method 'Write'");
}

@Override
public void close() {
// do nothing
}
}
26 changes: 24 additions & 2 deletions lib/src/main/java/io/cloudquery/plugin/Plugin.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,36 @@
package io.cloudquery.plugin;

import lombok.AllArgsConstructor;
import java.util.List;

import org.apache.logging.log4j.Logger;

import io.cloudquery.schema.SchemaException;
import io.cloudquery.schema.Table;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;

@RequiredArgsConstructor
@Getter
@AllArgsConstructor
public abstract class Plugin {
@NonNull
protected final String name;
@NonNull
protected final String version;
@Setter
protected Logger logger;

public abstract void init();

public abstract List<Table> tables() throws SchemaException;

public abstract void sync();

public abstract void read();

public abstract void write();

public abstract void close();

}
5 changes: 3 additions & 2 deletions lib/src/main/java/io/cloudquery/schema/Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.cloudquery.scalar.ValidationException;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -17,10 +18,10 @@ public class Resource {
private final List<Scalar<?>> data;

@Builder(toBuilder = true)
public Resource(Table table, Resource parent, Object item) {
public Resource(@NonNull Table table, Resource parent, Object item) {
this.item = item;
this.parent = parent;
this.table = table != null ? table : Table.builder().build();
this.table = table;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forces a table when creating a resource

this.data = new ArrayList<>();

for (Column column : this.table.getColumns()) {
Expand Down
Loading