describeConnector(String connectorName);
+ /**
+ * Define a session variable which can be referenced in sql commands by wrapping the variable name
+ * with {@code ${}}.
+ *
+ * If the variable is already defined, the existing value will be overridden.
+ *
+ * @param variable the name of the variable
+ * @param value the value represented by the variable
+ */
+ void define(String variable, Object value);
+
+ /**
+ * Undefine a session variable. {@code '${}'} will no longer be replaced in other
+ * functions.
+ *
+ * If the variable is not defined, then this method call is a no-op.
+ *
+ * @param variable the name of the variable to undefine
+ */
+ void undefine(String variable);
+
+ /**
+ * @return a map of the session variables and values used for variable substitution.
+ */
+ Map getVariables();
+
/**
* Closes the underlying HTTP client.
*/
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java
index b1e990a8f8eb..b8099c4e39d5 100644
--- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java
@@ -54,6 +54,7 @@
import java.nio.charset.Charset;
import java.util.Base64;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -79,6 +80,7 @@ public class ClientImpl implements Client {
private final SocketAddress serverSocketAddress;
private final String basicAuthHeader;
private final boolean ownedVertx;
+ private final Map sessionVariables;
/**
* {@code Client} instances should be created via {@link Client#create(ClientOptions)}, NOT via
@@ -105,6 +107,7 @@ private ClientImpl(final ClientOptions clientOptions, final Vertx vertx,
this.basicAuthHeader = createBasicAuthHeader(clientOptions);
this.serverSocketAddress =
SocketAddress.inetSocketAddress(clientOptions.getPort(), clientOptions.getHost());
+ this.sessionVariables = new HashMap<>();
}
@Override
@@ -217,7 +220,10 @@ public CompletableFuture executeStatement(
makePostRequest(
KSQL_ENDPOINT,
- new JsonObject().put("ksql", sql).put("streamsProperties", properties),
+ new JsonObject()
+ .put("ksql", sql)
+ .put("streamsProperties", properties)
+ .put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
response,
@@ -295,7 +301,9 @@ public CompletableFuture describeSource(final String sourceNa
makePostRequest(
KSQL_ENDPOINT,
- new JsonObject().put("ksql", "describe " + sourceName + ";"),
+ new JsonObject()
+ .put("ksql", "describe " + sourceName + ";")
+ .put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
response, cf, AdminResponseHandlers::handleDescribeSourceResponse)
@@ -334,8 +342,10 @@ public CompletableFuture createConnector(
makePostRequest(
KSQL_ENDPOINT,
- new JsonObject().put("ksql",
- String.format("CREATE %s CONNECTOR %s WITH (%s);", type, name, connectorConfigs)),
+ new JsonObject()
+ .put("ksql",
+ String.format("CREATE %s CONNECTOR %s WITH (%s);", type, name, connectorConfigs))
+ .put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
response, cf, ConnectorCommandResponseHandler::handleCreateConnectorResponse)
@@ -350,7 +360,9 @@ public CompletableFuture dropConnector(final String name) {
makePostRequest(
KSQL_ENDPOINT,
- new JsonObject().put("ksql", "drop connector " + name + ";"),
+ new JsonObject()
+ .put("ksql", "drop connector " + name + ";")
+ .put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
response, cf, ConnectorCommandResponseHandler::handleDropConnectorResponse)
@@ -380,7 +392,9 @@ public CompletableFuture describeConnector(final String na
makePostRequest(
KSQL_ENDPOINT,
- new JsonObject().put("ksql", "describe connector " + name + ";"),
+ new JsonObject()
+ .put("ksql", "describe connector " + name + ";")
+ .put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
response, cf, ConnectorCommandResponseHandler::handleDescribeConnectorsResponse)
@@ -389,6 +403,23 @@ public CompletableFuture describeConnector(final String na
return cf;
}
+ @Override
+ public void define(final String variable, final Object value) {
+ sessionVariables.put(variable, value);
+ }
+
+ @Override
+ public void undefine(final String variable) {
+ if (sessionVariables.containsKey(variable)) {
+ sessionVariables.remove(variable);
+ }
+ }
+
+ @Override
+ public Map getVariables() {
+ return new HashMap<>(sessionVariables);
+ }
+
@Override
public void close() {
httpClient.close();
@@ -413,7 +444,10 @@ private > void makeQueryRequest(
final T cf,
final StreamedResponseHandlerSupplier responseHandlerSupplier
) {
- final JsonObject requestBody = new JsonObject().put("sql", sql).put("properties", properties);
+ final JsonObject requestBody = new JsonObject()
+ .put("sql", sql)
+ .put("properties", properties)
+ .put("sessionVariables", sessionVariables);
makePostRequest(
QUERY_STREAM_ENDPOINT,
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java
index e3c8275114f8..67c4044e8fcc 100644
--- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java
@@ -15,6 +15,9 @@
package io.confluent.ksql.api.client.impl;
+import static io.confluent.ksql.api.client.impl.AdminResponseHandlers.isCreateConnectorResponse;
+import static io.confluent.ksql.api.client.impl.AdminResponseHandlers.isDropConnectorResponse;
+
import io.confluent.ksql.api.client.ConnectorDescription;
import io.confluent.ksql.api.client.ConnectorInfo;
import io.vertx.core.json.JsonArray;
@@ -33,7 +36,7 @@ static void handleCreateConnectorResponse(
final JsonObject connectorInfoEntity,
final CompletableFuture cf
) {
- if (connectorInfoEntity.getString("@type").equals("connector_info")) {
+ if (isCreateConnectorResponse(connectorInfoEntity)) {
cf.complete(null);
} else {
cf.completeExceptionally(new IllegalStateException(
@@ -46,7 +49,7 @@ static void handleDropConnectorResponse(
final JsonObject dropConnectorResponseEntity,
final CompletableFuture cf
) {
- if (dropConnectorResponseEntity.getString("@type").equals("drop_connector")) {
+ if (isDropConnectorResponse(dropConnectorResponseEntity)) {
cf.complete(null);
} else {
cf.completeExceptionally(new IllegalStateException(
diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java
index 1d9fab734b77..e5108c55538d 100644
--- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java
+++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java
@@ -30,7 +30,6 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
@@ -1475,30 +1474,179 @@ public void shouldDescribeConnector() throws Exception {
}
@Test
- public void shouldCreateConnector() {
+ public void shouldCreateConnector() throws Exception {
// Given
final CreateConnectorEntity entity = new CreateConnectorEntity("create connector;",
new ConnectorInfo("name", Collections.emptyMap(), Collections.emptyList(), SOURCE_TYPE));
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));
// When:
- final CompletableFuture result = javaClient.createConnector("name", true, Collections.EMPTY_MAP);
+ javaClient.createConnector("name", true, Collections.EMPTY_MAP).get();
// Then:
- assertTrue(result.complete(null));
+ assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR name WITH ();"));
}
@Test
- public void shouldDropConnector() {
+ public void shouldDropConnector() throws Exception {
// Given
final DropConnectorEntity entity = new DropConnectorEntity("drop connector;", "name");
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));
// When:
- final CompletableFuture result = javaClient.dropConnector("name");
+ javaClient.dropConnector("name").get();
// Then:
- assertTrue(result.complete(null));
+ assertThat(testEndpoints.getLastSql(), is("drop connector name;"));
+ }
+
+ @Test
+ public void shouldStoreVariables() {
+ // When:
+ javaClient.define("a", "aaa");
+ javaClient.define("a", "a");
+ javaClient.define("b", 5);
+ javaClient.define("c", "c");
+ javaClient.undefine("c");
+ javaClient.undefine("d");
+
+ // Then:
+ assertThat(javaClient.getVariables().size(), is(2));
+ assertThat(javaClient.getVariables().get("a"), is("a"));
+ assertThat(javaClient.getVariables().get("b"), is(5));
+ }
+
+ @Test
+ public void shouldSendSessionVariablesToKsqlEndpoint() throws Exception {
+ // Given:
+ javaClient.define("a", "a");
+ final CommandStatusEntity entity = new CommandStatusEntity(
+ "CSAS;",
+ new CommandId("STREAM", "FOO", "CREATE"),
+ new CommandStatus(
+ CommandStatus.Status.SUCCESS,
+ "Success",
+ Optional.of(new QueryId("CSAS_0"))
+ ),
+ 0L
+ );
+ testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));
+
+ // When:
+ javaClient.executeStatement("CSAS;").get();
+
+ // Then:
+ assertThat(testEndpoints.getLastSessionVariables(), is(new JsonObject().put("a", "a")));
+ }
+
+ @Test
+ public void shouldSendSessionVariablesWithExecuteQuery() throws Exception {
+ // Given
+ javaClient.define("a", "a");
+
+ // When
+ javaClient.executeQuery("query;").get();
+
+ // Then
+ assertThat(testEndpoints.getLastSessionVariables(), is(new JsonObject().put("a", "a")));
+ }
+
+ @Test
+ public void shouldSendSessionVariablesWithStreamQuery() throws Exception {
+ // Given
+ javaClient.define("a", "a");
+
+ // When
+ javaClient.streamQuery("query;").get();
+
+ // Then
+ assertThat(testEndpoints.getLastSessionVariables(), is(new JsonObject().put("a", "a")));
+ }
+
+ @Test
+ public void shouldSendSessionVariablesWithDescribeSource() throws Exception {
+ // Given
+ javaClient.define("a", "a");
+ final io.confluent.ksql.rest.entity.SourceDescription sd =
+ new io.confluent.ksql.rest.entity.SourceDescription(
+ "name",
+ Optional.of(WindowType.TUMBLING),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ "TABLE",
+ "",
+ "",
+ "",
+ false,
+ "KAFKA",
+ "JSON",
+ "topic",
+ 4,
+ 1,
+ "sql",
+ Collections.emptyList(),
+ ImmutableList.of("s1", "s2")
+ );
+ final SourceDescriptionEntity entity = new SourceDescriptionEntity(
+ "describe source;", sd, Collections.emptyList());
+ testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));
+
+ // When
+ javaClient.describeSource("source").get();
+
+ // Then
+ assertThat(testEndpoints.getLastSessionVariables(), is(new JsonObject().put("a", "a")));
+ }
+
+ @Test
+ public void shouldSendSessionVariablesWithCreateConnector() throws Exception {
+ // Given
+ javaClient.define("a", "a");
+ final CreateConnectorEntity entity = new CreateConnectorEntity("create connector;",
+ new ConnectorInfo("name", Collections.emptyMap(), Collections.emptyList(), SOURCE_TYPE));
+ testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));
+
+ // When:
+ javaClient.createConnector("name", true, Collections.EMPTY_MAP).get();
+
+ // Then:
+ assertThat(testEndpoints.getLastSessionVariables(), is(new JsonObject().put("a", "a")));
+ }
+
+ @Test
+ public void shouldSendSessionVariablesWithDescribeConnector() throws Exception {
+ // Given:
+ javaClient.define("a", "a");
+ final ConnectorDescription entity = new ConnectorDescription("describe connector;",
+ "connectorClass",
+ new ConnectorStateInfo(
+ "name",
+ new ConnectorState("state", "worker", "msg"),
+ Collections.emptyList(),
+ SOURCE_TYPE),
+ Collections.emptyList(), Collections.singletonList("topic"), Collections.emptyList());
+ testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));
+
+ // When:
+ javaClient.describeConnector("name").get();
+
+ // Then:
+ assertThat(testEndpoints.getLastSessionVariables(), is(new JsonObject().put("a", "a")));
+ }
+
+ @Test
+ public void shouldSendSessionVariablesWithDropConnector() throws Exception {
+ // Given:
+ javaClient.define("a", "a");
+ final DropConnectorEntity entity = new DropConnectorEntity("drop connector;", "name");
+ testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));
+
+ // When:
+ javaClient.dropConnector("name").get();
+
+ // Then:
+ assertThat(testEndpoints.getLastSessionVariables(), is(new JsonObject().put("a", "a")));
}
protected Client createJavaClient() {
diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java
index 11a12491a241..7bb2505b4783 100644
--- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java
+++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java
@@ -451,6 +451,18 @@ public void shouldExecutePullQuery() throws Exception {
verifyPullQueryRows(batchedQueryResult.get());
}
+ @Test
+ public void shouldExecutePullQueryWithVariables() throws Exception {
+ // When
+ client.define("AGG_TABLE", AGG_TABLE);
+ client.define("value", false);
+ final BatchedQueryResult batchedQueryResult = client.executeQuery("SELECT ${value} from ${AGG_TABLE} WHERE K=STRUCT(F1 := ARRAY['a']);");
+
+ // Then
+ assertThat(batchedQueryResult.queryID().get(), is(nullValue()));
+ assertThat(batchedQueryResult.get().get(0).getBoolean(1), is(false));
+ }
+
@Test
public void shouldExecutePushWithLimitQuery() throws Exception {
// When
@@ -462,6 +474,19 @@ public void shouldExecutePushWithLimitQuery() throws Exception {
verifyStreamRows(batchedQueryResult.get(), PUSH_QUERY_LIMIT_NUM_ROWS);
}
+ @Test
+ public void shouldExecutePushQueryWithVariables() throws Exception {
+ // When
+ client.define("TEST_STREAM", TEST_STREAM);
+ client.define("number", 4567);
+ final BatchedQueryResult batchedQueryResult =
+ client.executeQuery("SELECT ${number} FROM ${TEST_STREAM} EMIT CHANGES LIMIT " + PUSH_QUERY_LIMIT_NUM_ROWS + ";");
+
+ // Then
+ assertThat(batchedQueryResult.queryID().get(), is(notNullValue()));
+ assertThat(batchedQueryResult.get().get(0).getInteger(1), is(4567));
+ }
+
@Test
public void shouldHandleErrorResponseFromExecuteQuery() {
// When
@@ -1098,6 +1123,25 @@ public void shouldCreateConnector() throws Exception {
);
}
+ @Test
+ public void shouldCreateConnectorWithVariables() throws Exception {
+ // When:
+ client.define("class", MOCK_SOURCE_CLASS);
+ client.createConnector("FOO", true, ImmutableMap.of("connector.class", "${class}")).get();
+
+ // Then:
+ assertThatEventually(
+ () -> {
+ try {
+ return (client.describeConnector("FOO").get()).state();
+ } catch (InterruptedException | ExecutionException e) {
+ return null;
+ }
+ },
+ is("RUNNING")
+ );
+ }
+
private Client createClient() {
final ClientOptions clientOptions = ClientOptions.create()
.setHost("localhost")
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java
index e8c234e19329..ef73c38d3dea 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java
@@ -124,6 +124,7 @@ public synchronized CompletableFuture executeKsqlRequest(
final ApiSecurityContext apiSecurityContext) {
this.lastSql = request.getKsql();
this.lastProperties = new JsonObject(request.getConfigOverrides());
+ this.lastSessionVariables = new JsonObject(request.getSessionVariables());
this.lastApiSecurityContext = apiSecurityContext;
CompletableFuture cf = new CompletableFuture<>();