Skip to content

Commit

Permalink
feat: enable variable substitution for java client (#7335)
Browse files Browse the repository at this point in the history
* feat: enable variable substitution for java client

* fix unit test

* address review comments

* add variable sunstitution unit tests for all methods

* address review comments
  • Loading branch information
Zara Lim authored Apr 6, 2021
1 parent c0e1e73 commit c82a072
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 16 deletions.
37 changes: 37 additions & 0 deletions docs/developer-guide/ksqldb-clients/java-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Use the Java client to:
- [Describe specific streams and tables](#describe-source)
- [Get metadata about the ksqlDB cluster](#server-info)
- [Manage, list and describe connectors](#connector-operations)
- [Define variables for substitution](#variable-substitution)

Get started below or skip to the end for full-fledged [examples](#tutorial-examples).

Expand Down Expand Up @@ -771,6 +772,42 @@ System.out.println(description.name()
);
```

Define variables for substitution<a name="variable-substitution"></a>
---------------------------------------------------------------

Starting with ksqlDB 0.18.0, users can define session variables by calling the [`define()`](/api/io/confluent/ksql/api/client/Client.html#define(java.lang.String,boolean,java.lang.Object)) method and
reference them in other functions by wrapping the variable name in `${}`. The [`undefine()`](/api/io/confluent/ksql/api/client/Client.html#undefine(java.lang.String)) method
undefines a session variable, and [`getVariables()`](/api/io/confluent/ksql/api/client/Client.html#getVariables()) returns a map of the currently defined variables
and their values. Substitution is supported for the following functions:
* `streamQuery`
* `executeQuery`
* `executeStatement`
* `describeSource`
* `createConnector`
* `dropConnector`
* `describeConnector`

### Example Usage ###
Define a new variable:
```java
client.define("topic", "stream-topic");
```

Use a variable in `executeStatement`:
```java
client.executeStatement("CREATE STREAM S (NAME STRING, AGE INTEGER) WITH (kafka_topic='${topic}', value_format='json');");
```

Undefine a variable:
```java
client.undefine("topic");
```

Get all variables:
```java
Map<String, Object> variables = client.getVariables();
```

Tutorial Examples<a name="tutorial-examples"></a>
-------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,32 @@ CompletableFuture<Void> createConnector(
*/
CompletableFuture<ConnectorDescription> describeConnector(String connectorName);

/**
* Define a session variable which can be referenced in sql commands by wrapping the variable name
* with {@code ${}}.
*
* <p>If the variable is already defined, the existing value will be overridden.
*
* @param variable the name of the variable
* @param value the value represented by the variable
*/
void define(String variable, Object value);

/**
* Undefine a session variable. {@code '${<variable name>}'} will no longer be replaced in other
* functions.
*
* <p>If the variable is not defined, then this method call is a no-op.
*
* @param variable the name of the variable to undefine
*/
void undefine(String variable);

/**
* @return a map of the session variables and values used for variable substitution.
*/
Map<String, Object> getVariables();

/**
* Closes the underlying HTTP client.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.nio.charset.Charset;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -79,6 +80,7 @@ public class ClientImpl implements Client {
private final SocketAddress serverSocketAddress;
private final String basicAuthHeader;
private final boolean ownedVertx;
private final Map<String, Object> sessionVariables;

/**
* {@code Client} instances should be created via {@link Client#create(ClientOptions)}, NOT via
Expand All @@ -105,6 +107,7 @@ private ClientImpl(final ClientOptions clientOptions, final Vertx vertx,
this.basicAuthHeader = createBasicAuthHeader(clientOptions);
this.serverSocketAddress =
SocketAddress.inetSocketAddress(clientOptions.getPort(), clientOptions.getHost());
this.sessionVariables = new HashMap<>();
}

@Override
Expand Down Expand Up @@ -217,7 +220,10 @@ public CompletableFuture<ExecuteStatementResult> executeStatement(

makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", sql).put("streamsProperties", properties),
new JsonObject()
.put("ksql", sql)
.put("streamsProperties", properties)
.put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
response,
Expand Down Expand Up @@ -295,7 +301,9 @@ public CompletableFuture<SourceDescription> describeSource(final String sourceNa

makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", "describe " + sourceName + ";"),
new JsonObject()
.put("ksql", "describe " + sourceName + ";")
.put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
response, cf, AdminResponseHandlers::handleDescribeSourceResponse)
Expand Down Expand Up @@ -334,8 +342,10 @@ public CompletableFuture<Void> createConnector(

makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql",
String.format("CREATE %s CONNECTOR %s WITH (%s);", type, name, connectorConfigs)),
new JsonObject()
.put("ksql",
String.format("CREATE %s CONNECTOR %s WITH (%s);", type, name, connectorConfigs))
.put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
response, cf, ConnectorCommandResponseHandler::handleCreateConnectorResponse)
Expand All @@ -350,7 +360,9 @@ public CompletableFuture<Void> dropConnector(final String name) {

makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", "drop connector " + name + ";"),
new JsonObject()
.put("ksql", "drop connector " + name + ";")
.put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
response, cf, ConnectorCommandResponseHandler::handleDropConnectorResponse)
Expand Down Expand Up @@ -380,7 +392,9 @@ public CompletableFuture<ConnectorDescription> describeConnector(final String na

makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", "describe connector " + name + ";"),
new JsonObject()
.put("ksql", "describe connector " + name + ";")
.put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
response, cf, ConnectorCommandResponseHandler::handleDescribeConnectorsResponse)
Expand All @@ -389,6 +403,23 @@ public CompletableFuture<ConnectorDescription> describeConnector(final String na
return cf;
}

@Override
public void define(final String variable, final Object value) {
sessionVariables.put(variable, value);
}

@Override
public void undefine(final String variable) {
if (sessionVariables.containsKey(variable)) {
sessionVariables.remove(variable);
}
}

@Override
public Map<String, Object> getVariables() {
return new HashMap<>(sessionVariables);
}

@Override
public void close() {
httpClient.close();
Expand All @@ -413,7 +444,10 @@ private <T extends CompletableFuture<?>> void makeQueryRequest(
final T cf,
final StreamedResponseHandlerSupplier<T> responseHandlerSupplier
) {
final JsonObject requestBody = new JsonObject().put("sql", sql).put("properties", properties);
final JsonObject requestBody = new JsonObject()
.put("sql", sql)
.put("properties", properties)
.put("sessionVariables", sessionVariables);

makePostRequest(
QUERY_STREAM_ENDPOINT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

package io.confluent.ksql.api.client.impl;

import static io.confluent.ksql.api.client.impl.AdminResponseHandlers.isCreateConnectorResponse;
import static io.confluent.ksql.api.client.impl.AdminResponseHandlers.isDropConnectorResponse;

import io.confluent.ksql.api.client.ConnectorDescription;
import io.confluent.ksql.api.client.ConnectorInfo;
import io.vertx.core.json.JsonArray;
Expand All @@ -33,7 +36,7 @@ static void handleCreateConnectorResponse(
final JsonObject connectorInfoEntity,
final CompletableFuture<Void> cf
) {
if (connectorInfoEntity.getString("@type").equals("connector_info")) {
if (isCreateConnectorResponse(connectorInfoEntity)) {
cf.complete(null);
} else {
cf.completeExceptionally(new IllegalStateException(
Expand All @@ -46,7 +49,7 @@ static void handleDropConnectorResponse(
final JsonObject dropConnectorResponseEntity,
final CompletableFuture<Void> cf
) {
if (dropConnectorResponseEntity.getString("@type").equals("drop_connector")) {
if (isDropConnectorResponse(dropConnectorResponseEntity)) {
cf.complete(null);
} else {
cf.completeExceptionally(new IllegalStateException(
Expand Down
Loading

0 comments on commit c82a072

Please sign in to comment.