Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): Java client with push + pull query support #5200

Merged
merged 20 commits into from
May 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions ksqldb-api-client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2020 Confluent Inc.
~
~ Licensed under the Confluent Community License (the "License"); you may not use
~ this file except in compliance with the License. You may obtain a copy of the
~ License at
~
~ http://www.confluent.io/confluent-community-license
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OF ANY KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>


<parent>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-parent</artifactId>
<version>6.0.0-SNAPSHOT</version>
</parent>

<artifactId>ksqldb-api-client</artifactId>

<dependencies>
<dependency>
<groupId>io.confluent.ksql</groupId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is ok for now. But before we ship I think it's important that we don't depend on any ksqlDB server side stuff - otherwise a whole load of dependencies will be pulled into the client jar, which will it hard to use by users.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had a look. The main dependencies right now are:

  • QueryResponseMetadata, which is used by QueryResponseHandler in order to deserialize the object
  • BufferedPublisher, which is extended by QueryResultImpl
  • BaseSubscriber, which is extended by PollableSubscriber

What's your recommendation for removing these dependencies? I see the value in not having the client depend on any of the server modules but I also don't think it makes sense to duplicate these classes. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will need to factor out the reactive streams base classes into their own module - e.g. "reactive-common" and have both the server and client depend on that.
For QueryResponseMetaData we could follow the pattern of the old REST API and put the shared classes in their own package (i.e. like rest-model). Or perhaps we should use rest-model?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got'cha, this makes a lot of sense. Will do in a follow-up PR.

<artifactId>ksqldb-rest-app</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>

<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>${reactive-streams.version}</version>
</dependency>

<!-- Required for running tests -->

<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-rest-app</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-test-util</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!-- Only required to provide LoginModule implementation in tests -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-jaas</artifactId>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

import io.confluent.ksql.api.client.impl.ClientImpl;
import io.vertx.core.Vertx;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;

public interface Client {

/**
* Execute a query (push or pull) and receive the results one row at a time.
*
* @param sql statement of query to execute.
* @return query result.
*/
CompletableFuture<QueryResult> streamQuery(String sql);

/**
* Execute a query (push or pull) and receive the results one row at a time.
*
* @param sql statement of query to execute.
* @param properties query properties.
* @return query result.
*/
CompletableFuture<QueryResult> streamQuery(String sql, Map<String, Object> properties);

/**
* Execute a query (push or pull) and receive all result rows together, once the query has
* completed.
*
* @param sql statement of query to execute.
* @return query result.
*/
CompletableFuture<List<Row>> executeQuery(String sql);

/**
* Execute a query (push or pull) and receive all result rows together, once the query has
* completed.
*
* @param sql statement of query to execute.
* @param properties query properties.
* @return query result.
*/
CompletableFuture<List<Row>> executeQuery(String sql, Map<String, Object> properties);

CompletableFuture<Void> insertInto(String streamName, Map<String, Object> row);

Publisher<InsertAck> streamInserts(String streamName, Publisher<List<Object>> insertsPublisher);

void close();

static Client create(ClientOptions clientOptions) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice this method is not used in tests. I think it's better to use the interface method to create a client than directly instantiating ClientImpl. This enables us to change the implementation more easily without breaking clients. We should consider hiding the constructor of ClientImpl (e.g. making package protected or private and indirecting through a factory)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd also add a version of create that takes a Vertx instance. This allows the client to use any existing Vert.x the user might already be using in their app, thus alllowing thread pools to be reused etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call on the motivation for using the interface method over directly instantiating ClientImpl. I've updated the tests, and also applied the analogous change to ClientOptionsImpl.

I'm not seeing a way to make the constructor for ClientImpl package private, though. Client.java is in a different package from ClientImpl.java so if Client.java is able to instantiate ClientImpl, then ClientImpl must have a public method for instantiation, whether that's a constructor or a factory method. How do people usually work around this?

I'd also add a version of create that takes a Vertx instance.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a look how it's done in Vert.x https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/Vertx.java#L86

Basically the interface uses a static factory instance to actually create the implemention. The ServiceHelper is used to load the factory at run-time by scanning the classpath for implementations of the factory. The factory itself is in the same package as VertxImpl so the VertxImpl constructor can be package protected. It's a bit convoluted and may be overkill for us right now, might be sufficient to not worry about hiding the constructor but perhaps adding javadoc to it saying it should not be instantiated directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm interesting, the VertxImpl constructor is package protected but the factory implementation (in the same package) is still public: https://github.com/eclipse-vertx/vert.x/blob/3.8/src/main/java/io/vertx/core/impl/VertxFactoryImpl.java#L23
so if a user wanted to circumvent the intent of the VertxImpl constructor being private they could still do so by calling new VertxImplFactory().vertx(), right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably, but the intention here isn't to protect against malicious users - it's pretty trivial to construct any object, even if it has an inaccessible private/protected/package protected constructor, using reflection. The idea is to nudge users to the right API :)

return new ClientImpl(clientOptions);
}

static Client create(ClientOptions clientOptions, Vertx vertx) {
return new ClientImpl(clientOptions, vertx);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

import io.confluent.ksql.api.client.impl.ClientOptionsImpl;

public interface ClientOptions {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe also expose the Vert.x HttpClientOptions? There are probably other settings (e.g. pool size) that users might want to tweak.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to understand the expected behavior if a user provides HttpClienttOptions: will ClientOptionsImpl update the provided HttpClienttOptions according to the other fields, or will the user-provided HttpClientOptions be used directly?

The latter doesn't seem very user-friendly since then the user would be responsible for duplicating the work of ClientImpl in populating HttpClientOptions based on ClientOptions, but the former also seems confusing since the user would have to know which HttpClientOptions properties will be overridden and which won't.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably expose the HttpClientOptions directly and not have similar methods on ClientOptions at all. I.e. only have options on ClientOptions if they're not related to HTTP. If you're not comfortable exposing the HttpClientOptions directly you could wrap them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I.e. only have options on ClientOptions if they're not related to HTTP.

What counts as "not related to HTTP"? Of the options exposed so far (host, port, useTls, trustStore, keyStore, and basicAuth), all of them have equivalents in the Vert.x HttpClientOptions besides basicAuth. If we were to only expose HttpClientOptions and not have similar options on ClientOptions then ClientOptions would become

public interface ClientOptions {

  ClientOptions setBasicAuthCredentials(String username, String password);

  boolean isUseBasicAuth();

  String getBasicAuthUsername();

  String getBasicAuthPassword();

  ClientOptions copy();

  static ClientOptions create(HttpClientOptions httpClientOptions) {
    return new ClientOptionsImpl(httpClientOptions);
  }

Is this what you're proposing? I feel like I've misunderstood.

If you're not comfortable exposing the HttpClientOptions directly you could wrap them.

I assume this means creating a wrapper type around HttpClientOptions, rather than wrapping the individual methods of HttpClientOptions into ClientOptions (as exposing too many options in ClientOptions feels like it'll overwhelm the user). If we were to create a wrapper type around HttpClientOptions, would it not be better to leave the more commonly used methods in ClientOptions itself (as is currently the case) and only wrap the other options in HttpClientOptions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to change my mind on this one. I think it's ok to just use our own ClientOptions and not to expose HttpClientOptions. If we find we need to expose more properties over time on ClientOptions we can do that.

(Aside: BTW I think we should support token auth on the client too, not just basic auth)


ClientOptions setHost(String host);

ClientOptions setPort(int port);

ClientOptions setUseTls(boolean useTls);

ClientOptions setUseClientAuth(boolean useClientAuth);

ClientOptions setVerifyHost(boolean verifyHost);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These additional TLS options were needed to get the tests working. In a follow-up PR they'll either be removed in favor of exposing Vert.x HttpClientOptions (pending discussion in #5200 (comment)) or I'll refactor all the TLS options into a separate interface in order to clean up this one.


ClientOptions setTrustAll(boolean trustAll);

ClientOptions setTrustStore(String trustStorePath);

ClientOptions setTrustStorePassword(String trustStorePassword);

ClientOptions setKeyStore(String keyStorePath);

ClientOptions setKeyStorePassword(String keyStorePassword);

ClientOptions setBasicAuthCredentials(String username, String password);

ClientOptions setExecuteQueryMaxResultRows(int maxRows);

String getHost();

int getPort();

boolean isUseTls();

boolean isUseClientAuth();

boolean isVerifyHost();

boolean isTrustAll();

boolean isUseBasicAuth();

String getTrustStore();

String getTrustStorePassword();

String getKeyStore();

String getKeyStorePassword();

String getBasicAuthUsername();

String getBasicAuthPassword();

int getExecuteQueryMaxResultRows();

ClientOptions copy();

static ClientOptions create() {
return new ClientOptionsImpl();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

public interface InsertAck {

int getNum();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

import java.util.List;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;

/**
* The result of a query (push or pull), streamed one row at time. Records may be consumed by either
* subscribing to the publisher or polling (blocking) for one record at a time. These two methods of
* consumption are mutually exclusive; only one method may be used (per QueryResult).
*/
public interface QueryResult extends Publisher<Row> {

List<String> columnNames();

List<String> columnTypes();

String queryID();

/**
* Block until a row becomes available.
*
* @return the row.
*/
Row poll();

/**
* Block until a row becomes available or the timeout has elapsed.
*
* @param timeout amount of to wait for a row. Non-positive values are interpreted as no timeout.
* @param timeUnit unit for timeout param.
* @return the row, if available; else, null.
*/
Row poll(long timeout, TimeUnit timeUnit);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vcrfxia As mentioned on the KLIP: why do we not use Duration instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the bump. I've updated the KLIP and will update the code in a future PR (along with a multitude of other feedback from the KLIP).


boolean isComplete();

void close();

}
Loading