From d6544f2e7e0c228a885e23aa424740eb74fd5bf5 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Tue, 26 Oct 2021 18:06:15 +0100 Subject: [PATCH] added test for ws, made ct in client string --- .../io/confluent/ksql/api/client/Row.java | 3 - .../ksql/api/client/impl/ClientImpl.java | 38 ++--- .../impl/ExecuteQueryResponseHandler.java | 11 +- .../ksql/api/client/impl/RowImpl.java | 16 -- .../impl/StreamQueryResponseHandler.java | 19 ++- .../client/impl/StreamedQueryResultImpl.java | 4 + .../ksql/api/client/impl/RowImplTest.java | 4 +- .../integration/ClientIntegrationTest.java | 74 +++------ .../RestClientIntegrationTest.java | 10 +- .../confluent/ksql/util/KeyValueMetadata.java | 9 ++ .../ksql/util/KsqlRequestConfig.java | 12 +- .../io/confluent/ksql/util/RowMetadata.java | 9 ++ .../ksql/physical/pull/PullQueryRow.java | 11 ++ .../ksql/api/impl/BlockingQueryPublisher.java | 1 - .../ksql/api/impl/QueryEndpoint.java | 16 +- .../streaming/PullQueryPublisher.java | 16 +- .../streaming/PullQueryStreamWriter.java | 2 +- .../streaming/StreamedQueryResource.java | 20 ++- .../resources/streaming/WSQueryEndpoint.java | 23 ++- .../PullQueryMetricsWSFunctionalTest.java | 8 +- .../ksql/rest/integration/RestApiTest.java | 144 ++++++++++++++++-- .../integration/RestIntegrationTestUtil.java | 13 +- .../ksql/rest/client/KsqlRestClient.java | 2 +- .../ksql/rest/entity/StreamedRowTest.java | 4 +- .../ksql/rest/entity/StreamedRow.java | 27 ++-- 25 files changed, 315 insertions(+), 181 deletions(-) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Row.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Row.java index f7f89d45c614..d299abe2cfbb 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Row.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Row.java @@ -17,7 +17,6 @@ import java.math.BigDecimal; import java.util.List; -import java.util.Optional; /** * A single record, returned as part of a query result. @@ -278,6 +277,4 @@ public interface Row { * @throws IllegalArgumentException if the column name is invalid */ KsqlArray getKsqlArray(String columnName); - - Optional getSerializedConsistencyVector(); } \ No newline at end of file diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java index 1001dc4eaccd..d506c90696e3 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java @@ -20,8 +20,6 @@ import static io.netty.handler.codec.http.HttpResponseStatus.OK; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.api.client.AcksPublisher; import io.confluent.ksql.api.client.BatchedQueryResult; import io.confluent.ksql.api.client.Client; @@ -66,6 +64,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; import org.reactivestreams.Publisher; @@ -87,10 +86,11 @@ public class ClientImpl implements Client { private final String basicAuthHeader; private final boolean ownedVertx; private final Map sessionVariables; - private final ConsistencyOffsetVector consistencyOffsetVector; private final Map requestProperties; private final LocalPropertyParser parser; + private AtomicReference serializedConsistencyVector; + /** * {@code Client} instances should be created via {@link Client#create(ClientOptions)}, NOT via * this constructor. @@ -117,7 +117,7 @@ private ClientImpl(final ClientOptions clientOptions, final Vertx vertx, this.serverSocketAddress = SocketAddress.inetSocketAddress(clientOptions.getPort(), clientOptions.getHost()); this.sessionVariables = new HashMap<>(); - this.consistencyOffsetVector = new ConsistencyOffsetVector(); + this.serializedConsistencyVector = new AtomicReference<>(); this.requestProperties = new HashMap<>(); this.parser = new LocalPropertyParser(); } @@ -134,12 +134,13 @@ public CompletableFuture streamQuery( ) { if (ConsistencyOffsetVector.isConsistencyVectorEnabled(requestProperties)) { requestProperties.put( - KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN, - getConsistencyOffsetVector().serialize()); + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR, + serializedConsistencyVector.get()); } final CompletableFuture cf = new CompletableFuture<>(); makeQueryRequest(sql, properties, cf, - (ctx, rp, fut, req) -> new StreamQueryResponseHandler(ctx, rp, fut)); + (ctx, rp, fut, req) -> new StreamQueryResponseHandler( + ctx, rp, fut, serializedConsistencyVector)); return cf; } @@ -155,8 +156,8 @@ public BatchedQueryResult executeQuery( ) { if (ConsistencyOffsetVector.isConsistencyVectorEnabled(requestProperties)) { requestProperties.put( - KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN, - getConsistencyOffsetVector().serialize()); + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR, + serializedConsistencyVector.get()); } final BatchedQueryResult result = new BatchedQueryResultImpl(); makeQueryRequest( @@ -164,7 +165,8 @@ public BatchedQueryResult executeQuery( properties, result, (context, recordParser, cf, request) -> new ExecuteQueryResponseHandler( - context, recordParser, cf, clientOptions.getExecuteQueryMaxResultRows()) + context, recordParser, cf, clientOptions.getExecuteQueryMaxResultRows(), + serializedConsistencyVector) ); return result; } @@ -440,24 +442,15 @@ public Map getVariables() { return new HashMap<>(sessionVariables); } - public Object getRequestProperty(final String key) { - return requestProperties.get(key); - } - public void setRequestProperty(final String key, final Object value) { Objects.requireNonNull(value, "value"); final Object parsed = parser.parse(key, value); requestProperties.put(key, parsed); } - public Map toMap() { - return ImmutableMap.copyOf(requestProperties); - } - @VisibleForTesting - @SuppressFBWarnings(value = "EI_EXPOSE_REP") - public ConsistencyOffsetVector getConsistencyOffsetVector() { - return consistencyOffsetVector; + public String getSerializedConsistencyVector() { + return serializedConsistencyVector.get(); } @Override @@ -585,7 +578,7 @@ private HttpClientRequest configureBasicAuth(final HttpClientRequest request) { return request.putHeader(AUTHORIZATION.toString(), basicAuthHeader); } - private static > void handleStreamedResponse( + private > void handleStreamedResponse( final HttpClientResponse response, final T cf, final StreamedResponseHandlerSupplier responseHandlerSupplier) { @@ -593,7 +586,6 @@ private static > void handleStreamedResponse( final RecordParser recordParser = RecordParser.newDelimited("\n", response); final ResponseHandler responseHandler = responseHandlerSupplier.get(Vertx.currentContext(), recordParser, cf, response.request()); - recordParser.handler(responseHandler::handleBodyBuffer); recordParser.endHandler(responseHandler::handleBodyEnd); recordParser.exceptionHandler(responseHandler::handleException); diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ExecuteQueryResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ExecuteQueryResponseHandler.java index b8e65f635257..01c35bf465fd 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ExecuteQueryResponseHandler.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ExecuteQueryResponseHandler.java @@ -28,7 +28,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,15 +42,18 @@ public class ExecuteQueryResponseHandler extends QueryResponseHandler columnNames; private List columnTypes; private Map columnNameToIndex; + private AtomicReference serializedConsistencyVector; ExecuteQueryResponseHandler( final Context context, final RecordParser recordParser, final BatchedQueryResult cf, - final int maxRows) { + final int maxRows, + final AtomicReference serializedCV) { super(context, recordParser, cf); this.maxRows = maxRows; this.rows = new ArrayList<>(); + this.serializedConsistencyVector = Objects.requireNonNull(serializedCV, "serializedCV"); } @Override @@ -65,8 +69,7 @@ protected void handleRow(final Buffer buff) { final Row row; final Object json = buff.toJson(); if (json instanceof String) { - row = new RowImpl(Optional.of((String)json)); - addToList(row); + serializedConsistencyVector.set((String)json); } else if (json instanceof JsonArray) { final JsonArray values = new JsonArray(buff); row = new RowImpl(columnNames, columnTypes, values, columnNameToIndex); diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/RowImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/RowImpl.java index 7512fd67b3b8..8538a8037a35 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/RowImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/RowImpl.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; public class RowImpl implements Row { @@ -35,7 +34,6 @@ public class RowImpl implements Row { private final ImmutableList columnTypes; private final KsqlArray values; private final ImmutableMap columnNameToIndex; - private final Optional serializedConsistencyVector; public RowImpl( final List columnNames, @@ -46,15 +44,6 @@ public RowImpl( this.columnTypes = ImmutableList.copyOf(Objects.requireNonNull(columnTypes)); this.values = new KsqlArray(Objects.requireNonNull(values).getList()); this.columnNameToIndex = ImmutableMap.copyOf(Objects.requireNonNull(columnNameToIndex)); - this.serializedConsistencyVector = Optional.empty(); - } - - public RowImpl(final Optional serializedConsistencyVector) { - this.columnNames = null; - this.columnTypes = null; - this.values = null; - this.columnNameToIndex = null; - this.serializedConsistencyVector = Objects.requireNonNull(serializedConsistencyVector); } @Override @@ -189,11 +178,6 @@ public KsqlArray getKsqlArray(final String columnName) { return getKsqlArray(indexFromName(columnName)); } - @Override - public Optional getSerializedConsistencyVector() { - return serializedConsistencyVector; - } - private int indexFromName(final String columnName) { final Integer index = columnNameToIndex.get(columnName); if (index == null) { diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamQueryResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamQueryResponseHandler.java index 23e7e1593ada..af5e85d0ae4a 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamQueryResponseHandler.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamQueryResponseHandler.java @@ -26,8 +26,9 @@ import io.vertx.core.json.JsonObject; import io.vertx.core.parsetools.RecordParser; import java.util.Map; -import java.util.Optional; +import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; public class StreamQueryResponseHandler extends QueryResponseHandler> { @@ -35,10 +36,13 @@ public class StreamQueryResponseHandler private StreamedQueryResultImpl queryResult; private Map columnNameToIndex; private boolean paused; + private AtomicReference serializedConsistencyVector; StreamQueryResponseHandler(final Context context, final RecordParser recordParser, - final CompletableFuture cf) { + final CompletableFuture cf, + final AtomicReference serializedCV) { super(context, recordParser, cf); + this.serializedConsistencyVector = Objects.requireNonNull(serializedCV, "serializedCV"); } @Override @@ -58,17 +62,12 @@ protected void handleRow(final Buffer buff) { if (queryResult == null) { throw new IllegalStateException("handleRow called before metadata processed"); } - final Object json = buff.toJson(); final Row row; if (json instanceof String) { - row = new RowImpl(Optional.of((String)json)); - final boolean full = queryResult.accept(row); - if (full && !paused) { - recordParser.pause(); - queryResult.drainHandler(this::publisherReceptive); - paused = true; - } + // This is the serialized consistency vector + // Don't add it to the publisher's buffer since the user should not see it + serializedConsistencyVector.set((String)json); } else if (json instanceof JsonArray) { row = new RowImpl( queryResult.columnNames(), diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamedQueryResultImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamedQueryResultImpl.java index 0bb5e9a1ee23..d37828481111 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamedQueryResultImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamedQueryResultImpl.java @@ -123,6 +123,10 @@ public void handleError(final Exception e) { sendError(e); } + public boolean isPolling() { + return polling; + } + private void handleErrorWhilePolling(final Throwable t) { log.error("Unexpected error while polling: " + t); } diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/RowImplTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/RowImplTest.java index dbbc9f49f595..ce939f943567 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/RowImplTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/RowImplTest.java @@ -27,11 +27,9 @@ import io.confluent.ksql.api.client.KsqlArray; import io.confluent.ksql.api.client.KsqlObject; import io.confluent.ksql.api.client.util.RowUtil; -import io.netty.buffer.ByteBuf; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import java.math.BigDecimal; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import org.junit.Before; @@ -63,6 +61,7 @@ public class RowImplTest { .add("04:40:34.789"); private RowImpl row; + private RowImpl rowWithCV; @Before public void setUp() { @@ -224,5 +223,4 @@ public void shouldImplementHashCodeAndEquals() { ) .testEquals(); } - } \ No newline at end of file diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index 8946b3c99cb7..c69d3ea4a41c 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -31,7 +31,6 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; @@ -450,9 +449,6 @@ public void shouldStreamPullQueryOnTableAsync() throws Exception { public void shouldRoundTripCVWhenPullQueryOnTableAsync() throws Exception { // Given ((ClientImpl)client).setRequestProperty(KsqlConfig.KSQL_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR_ENABLED, true); - ((ClientImpl)client).getConsistencyOffsetVector().setVersion(1); - ((ClientImpl)client).getConsistencyOffsetVector().setOffsetVector( - ImmutableMap.of("Vicky", ImmutableMap.of(1, 1L, 2, 2L, 3, 3L))); // When final StreamedQueryResult streamedQueryResult = client.streamQuery(PULL_QUERY_ON_TABLE).get(); @@ -460,11 +456,14 @@ public void shouldRoundTripCVWhenPullQueryOnTableAsync() throws Exception { // Then shouldReceiveRows( streamedQueryResult, - 2 , - ClientIntegrationTest::verifyRowsWithConsistencyVector, + 1, + (v) -> {}, //do nothing true ); + assertThat(((ClientImpl)client).getSerializedConsistencyVector(), is(notNullValue())); + final String serializedCV = ((ClientImpl)client).getSerializedConsistencyVector(); + verifyConsistencyVector(serializedCV); assertThatEventually(streamedQueryResult::isComplete, is(true)); } @@ -472,9 +471,6 @@ public void shouldRoundTripCVWhenPullQueryOnTableAsync() throws Exception { public void shouldNotRoundTripCVWhenPullQueryOnTableAsync() throws Exception { // Given ((ClientImpl)client).setRequestProperty(KsqlConfig.KSQL_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR_ENABLED, false); - ((ClientImpl)client).getConsistencyOffsetVector().setVersion(1); - ((ClientImpl)client).getConsistencyOffsetVector().setOffsetVector( - ImmutableMap.of("Vicky", ImmutableMap.of(1, 1L, 2, 2L, 3, 3L))); // When final StreamedQueryResult streamedQueryResult = client.streamQuery(PULL_QUERY_ON_TABLE).get(); @@ -483,7 +479,7 @@ public void shouldNotRoundTripCVWhenPullQueryOnTableAsync() throws Exception { shouldReceiveRows( streamedQueryResult, 1 , - ClientIntegrationTest::verifyRowsWithoutConsistencyVector, + (v) -> {}, //do nothing true ); @@ -511,23 +507,16 @@ public void shouldStreamPullQueryOnTableSync() throws Exception { public void shouldRoundTripCVWhenPullQueryOnTableSync() throws Exception { // Given ((ClientImpl)client).setRequestProperty(KsqlConfig.KSQL_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR_ENABLED, true); - ((ClientImpl)client).getConsistencyOffsetVector().setVersion(1); - ((ClientImpl)client).getConsistencyOffsetVector().setOffsetVector( - ImmutableMap.of("Vicky", ImmutableMap.of(1, 1L, 2, 2L, 3, 3L))); - client.streamQuery(PULL_QUERY_ON_TABLE).get(); // When final StreamedQueryResult streamedQueryResult = client.streamQuery(PULL_QUERY_ON_TABLE).get(); - - // Then - // Poll row of data streamedQueryResult.poll(); - // Next row is CT - final Row row = streamedQueryResult.poll(); - assertThat(row, not(nullValue())); - assertThat(row.getSerializedConsistencyVector(), not(Optional.empty())); - verifyConsistencyVector(row); + // Then + assertThatEventually(() -> ((ClientImpl)client).getSerializedConsistencyVector(), + is(notNullValue())); + final String serializedCV = ((ClientImpl)client).getSerializedConsistencyVector(); + verifyConsistencyVector(serializedCV); assertThatEventually(streamedQueryResult::isComplete, is(true)); } @@ -637,34 +626,30 @@ public void shouldExecutePullQueryWithVariables() throws Exception { public void shouldRoundTripCVWhenExecutePullQuery() throws Exception { // Given ((ClientImpl)client).setRequestProperty(KsqlConfig.KSQL_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR_ENABLED, true); - ((ClientImpl)client).getConsistencyOffsetVector().setVersion(1); - ((ClientImpl)client).getConsistencyOffsetVector().setOffsetVector( - ImmutableMap.of("Vicky", ImmutableMap.of(1, 1L, 2, 2L, 3, 3L))); // When final BatchedQueryResult batchedQueryResult = client.executeQuery(PULL_QUERY_ON_TABLE); + batchedQueryResult.get(); // Then assertThat(batchedQueryResult.queryID().get(), is(nullValue())); - - verifyRowsWithConsistencyVector(batchedQueryResult.get()); + assertThat(((ClientImpl)client).getSerializedConsistencyVector(), is(notNullValue())); + final String serializedCV = ((ClientImpl)client).getSerializedConsistencyVector(); + verifyConsistencyVector(serializedCV); } @Test public void shouldNotRoundTripCVWhenExecutePullQuery() throws Exception { // Given ((ClientImpl)client).setRequestProperty(KsqlConfig.KSQL_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR_ENABLED, false); - ((ClientImpl)client).getConsistencyOffsetVector().setVersion(1); - ((ClientImpl)client).getConsistencyOffsetVector().setOffsetVector( - ImmutableMap.of("Vicky", ImmutableMap.of(1, 1L, 2, 2L, 3, 3L))); // When final BatchedQueryResult batchedQueryResult = client.executeQuery(PULL_QUERY_ON_TABLE); + batchedQueryResult.get(); // Then assertThat(batchedQueryResult.queryID().get(), is(nullValue())); - - verifyRowsWithoutConsistencyVector(batchedQueryResult.get()); + assertThat(((ClientImpl)client).getSerializedConsistencyVector(), is(nullValue())); } @@ -1248,30 +1233,11 @@ private static void verifyStreamRowWithIndex(final Row row, final int index) { assertThat(obj.toString(), is(obj.toJsonString())); } - private static void verifyRowsWithConsistencyVector(final List rows) { - assertThat(rows, hasSize(2)); - assertThat(rows.get(1).getSerializedConsistencyVector(), not(Optional.empty())); - final ConsistencyOffsetVector cvResponse = new ConsistencyOffsetVector(); - cvResponse.deserialize(rows.get(1).getSerializedConsistencyVector().get()); - assertThat(cvResponse.getVersion(), is(2)); - assertThat(cvResponse.getOffsetVector().keySet(), hasSize(2)); - assertThat(cvResponse.getTopicOffsets("dummy").keySet(), hasSize(3)); - assertThat(cvResponse.getTopicOffsets("dummy").get(5), is(5L)); - assertThat(cvResponse.getTopicOffsets("dummy").get(6), is(6L)); - assertThat(cvResponse.getTopicOffsets("dummy").get(7), is(7L)); - } - - private static void verifyRowsWithoutConsistencyVector(final List rows) { - assertThat(rows, hasSize(1)); - assertThat(rows.get(0).getSerializedConsistencyVector(), is(Optional.empty())); - } - - - private static void verifyConsistencyVector(final Row row) { + private static void verifyConsistencyVector(final String serializedCV) { final ConsistencyOffsetVector cvResponse = new ConsistencyOffsetVector(); - cvResponse.deserialize(row.getSerializedConsistencyVector().get()); + cvResponse.deserialize(serializedCV); assertThat(cvResponse.getVersion(), is(2)); - assertThat(cvResponse.getOffsetVector().keySet(), hasSize(2)); + assertThat(cvResponse.getOffsetVector().keySet(), hasSize(1)); assertThat(cvResponse.getTopicOffsets("dummy").keySet(), hasSize(3)); assertThat(cvResponse.getTopicOffsets("dummy").get(5), is(5L)); assertThat(cvResponse.getTopicOffsets("dummy").get(6), is(6L)); diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/RestClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/RestClientIntegrationTest.java index 7dffca041a52..de61126d0715 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/RestClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/RestClientIntegrationTest.java @@ -251,13 +251,11 @@ public void shouldRoundTripConsistencyVectorWhenEnabled() throws Exception { ksqlRestClient.makeQueryRequestStreamed(PULL_QUERY_ON_TABLE, 1L); final List rows = getElementsFromPublisher(4, response.getResponse()); - System.out.println(rows); assertThat(rows, hasSize(3)); - assertThat(rows.get(2).getSerializedCV().get(), not(Optional.empty())); - final String serialized = rows.get(2).getSerializedCV().get(); + assertThat(rows.get(2).getSerializedConsistencyToken().get(), not(Optional.empty())); + final String serialized = rows.get(2).getSerializedConsistencyToken().get(); final ConsistencyOffsetVector cvResponse = new ConsistencyOffsetVector(); cvResponse.deserialize(serialized); - System.out.println(cvResponse); assertThat(cvResponse.getVersion(), is(2)); assertThat(cvResponse.getOffsetVector().keySet(), hasSize(2)); assertThat(cvResponse.getTopicOffsets("dummy").keySet(), hasSize(3)); @@ -278,8 +276,8 @@ public void shouldNotRoundTripConsistencyVectorWhenDisabled() throws Exception { final List rows = getElementsFromPublisher(4, response.getResponse()); assertThat(rows, hasSize(2)); - assertThat(rows.get(0).getSerializedCV(), is(Optional.empty())); - assertThat(rows.get(1).getSerializedCV(), is(Optional.empty())); + assertThat(rows.get(0).getSerializedConsistencyToken(), is(Optional.empty())); + assertThat(rows.get(1).getSerializedConsistencyToken(), is(Optional.empty())); } diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KeyValueMetadata.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KeyValueMetadata.java index 1d5fdf95822c..5d164cea16c2 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KeyValueMetadata.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KeyValueMetadata.java @@ -74,4 +74,13 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(keyValue, rowMetadata); } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("KeyValueMetadata{"); + sb.append("keyValue=").append(keyValue); + sb.append(", rowMetadata=").append(rowMetadata); + sb.append('}'); + return sb.toString(); + } } diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java index f5f363bff9c2..9e6378c26809 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java @@ -63,10 +63,10 @@ public class KsqlRequestConfig extends AbstractConfig { "Indicates whether a connecting node expects to be at the start of the registry data. After a" + "rebalance, this ensures we don't miss any data."; - public static final String KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN = + public static final String KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR = "request.ksql.query.pull.consistency.token"; - public static final String KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN_DEFAULT = ""; - private static final String KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN_DOC = + public static final String KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR_DEFAULT = ""; + private static final String KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR__DOC = "Indicates the offsets of the last read."; @@ -109,11 +109,11 @@ private static ConfigDef buildConfigDef() { ConfigDef.Importance.LOW, KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DOC ).define( - KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN, + KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR, Type.STRING, - KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN_DEFAULT, + KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR_DEFAULT, ConfigDef.Importance.LOW, - KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN_DOC + KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR__DOC ); return configDef; } diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/RowMetadata.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/RowMetadata.java index 9a82a136017b..151f20b16be4 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/RowMetadata.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/RowMetadata.java @@ -59,4 +59,13 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(pushOffsetsRange, consistencyOffsetVector); } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("RowMetadata{"); + sb.append("pushOffsetsRange=").append(pushOffsetsRange); + sb.append(", consistencyOffsetVector=").append(consistencyOffsetVector); + sb.append('}'); + return sb.toString(); + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryRow.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryRow.java index 73e84243163f..91836f80183f 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryRow.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryRow.java @@ -64,6 +64,17 @@ public GenericRow getGenericRow() { return toGenericRow(row); } + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("PullQueryRow{"); + sb.append("row=").append(row); + sb.append(", schema=").append(schema); + sb.append(", sourceNode=").append(sourceNode); + sb.append(", consistencyOffsetVector=").append(consistencyOffsetVector); + sb.append('}'); + return sb.toString(); + } + private static GenericRow toGenericRow(final List values) { return new GenericRow().appendAll(values); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/BlockingQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/BlockingQueryPublisher.java index be75c180b6b6..6fb88155b51e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/BlockingQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/BlockingQueryPublisher.java @@ -170,7 +170,6 @@ private void doSend() { int num = 0; while (getDemand() > 0 && !queue.isEmpty()) { if (num < SEND_MAX_BATCH_SIZE) { - System.out.println("Poll next row"); doOnNext(queue.poll()); if (complete && isPullQuery && !addedCT.get()) { queryHandle.getConsistencyOffsetVector().ifPresent( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index 0a820d358d1e..82a4fc6e2bd8 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -151,11 +151,17 @@ public QueryPublisher createQueryPublisher( final DataSource.DataSourceType dataSourceType = dataSource.getDataSourceType(); Optional consistencyOffsetVector = Optional.empty(); if (requestProperties.containsKey( - KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN)) { - final ConsistencyOffsetVector ct = new ConsistencyOffsetVector(); - ct.deserialize(((String)requestProperties.get( - KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN))); - consistencyOffsetVector = Optional.of(ct); + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR)) { + final ConsistencyOffsetVector cv = new ConsistencyOffsetVector(); + consistencyOffsetVector = Optional.of(cv); + final String serializedCV = (String)requestProperties.get( + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR); + // The consistency vector is not initialized on the first request, needs the first response + // from the server + if (serializedCV != null) { + cv.deserialize(((String) requestProperties.get( + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR))); + } } switch (dataSourceType) { case KTABLE: diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index 573ad7d82fd4..bfb64516c16c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -194,7 +194,11 @@ private PullQuerySubscription( this.subscriber = requireNonNull(subscriber, "subscriber"); this.result = requireNonNull(result, "result"); - result.onCompletion(v -> setDone()); + result.onCompletion(v -> { + result.getConsistencyOffsetVector().ifPresent( + result.getPullQueryQueue()::putConsistencyVector); + setDone(); + }); result.onException(this::setError); } @@ -206,7 +210,15 @@ Collection poll() { return null; } else { return rows.stream() - .map(kv -> StreamedRow.pushRow(kv.getKeyValue().value())) + .map(kv -> { + if (kv.getRowMetadata().isPresent() + && kv.getRowMetadata().get().getConsistencyOffsetVector().isPresent()) { + return StreamedRow.consistencyToken(Optional.of( + kv.getRowMetadata().get().getConsistencyOffsetVector().get().serialize())); + } else { + return StreamedRow.pushRow(kv.getKeyValue().value()); + } + }) .collect(Collectors.toCollection(Lists::newLinkedList)); } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryStreamWriter.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryStreamWriter.java index 1dc833bb88d8..aba5f6189b19 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryStreamWriter.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryStreamWriter.java @@ -179,7 +179,7 @@ private void writeRow( } StreamedRow streamedRow = null; if (row.getConsistencyOffsetVector().isPresent()) { - streamedRow = StreamedRow.consistencyOffsetVector(Optional.of( + streamedRow = StreamedRow.consistencyToken(Optional.of( row.getConsistencyOffsetVector().get().serialize())); } else { streamedRow = StreamedRow.pullRow(row.getGenericRow(), toKsqlHostInfo(row.getSourceNode())); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 946ab721fb9e..72e87e7ddb4f 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -491,13 +491,19 @@ private EndpointResponse handleTablePullQuery( pullBandRateLimiter.allow(KsqlQueryType.PULL); final Optional optionalDecrementer = Optional.ofNullable(decrementer); - Optional optionalCT = Optional.empty(); + Optional consistencyOffsetVector = Optional.empty(); if (requestProperties.containsKey( - KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN)) { - final ConsistencyOffsetVector ct = new ConsistencyOffsetVector(); - ct.deserialize(((String)requestProperties.get( - KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN))); - optionalCT = Optional.of(ct); + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR)) { + final ConsistencyOffsetVector cv = new ConsistencyOffsetVector(); + consistencyOffsetVector = Optional.of(cv); + final String serializedCV = (String) requestProperties.get( + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR); + // The consistency vector is not initialized on the first request, needs the first response + // from the server + if (serializedCV != null) { + cv.deserialize(((String) requestProperties.get( + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR))); + } } final PullQueryResult result = ksqlEngine.executeTablePullQuery( @@ -509,7 +515,7 @@ private EndpointResponse handleTablePullQuery( plannerOptions, pullQueryMetrics, true, - optionalCT + consistencyOffsetVector ); resultForMetrics.set(result); result.onCompletionOrException( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index 9f60f7da6dce..bfae5e0c0b42 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -81,8 +81,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public class WSQueryEndpoint { - + // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling private static final Logger log = LoggerFactory.getLogger(WSQueryEndpoint.class); private final KsqlConfig ksqlConfig; @@ -301,12 +302,20 @@ private void handleQuery(final RequestContext info, final Query query, final DataSource dataSource = analysis.getFrom().getDataSource(); final DataSource.DataSourceType dataSourceType = dataSource.getDataSourceType(); final Map requestProperties = info.request.getRequestProperties(); - final Optional consistencyOffsetVector = - requestProperties.containsKey(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN) - ? Optional.of((ConsistencyOffsetVector)requestProperties.get( - KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN)) - : Optional.empty(); - + Optional consistencyOffsetVector = Optional.empty(); + if (requestProperties.containsKey( + KsqlConfig.KSQL_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR_ENABLED)) { + final ConsistencyOffsetVector cv = new ConsistencyOffsetVector(); + consistencyOffsetVector = Optional.of(cv); + final String serializedCV = (String) requestProperties.get( + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR); + // The consistency vector is not initialized on the first request, needs the first response + // from the server + if (serializedCV != null) { + cv.deserialize(((String) requestProperties.get( + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR))); + } + } switch (dataSourceType) { case KTABLE: { new PullQueryPublisher( diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryMetricsWSFunctionalTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryMetricsWSFunctionalTest.java index 9d0b004378ee..8e40c9c3886c 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryMetricsWSFunctionalTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryMetricsWSFunctionalTest.java @@ -224,7 +224,9 @@ public void shouldVerifyMetrics() { "SELECT COUNT, USERID from " + AGG_TABLE + " WHERE USERID='" + AN_AGG_KEY + "';", Optional.of(MediaType.APPLICATION_JSON), Optional.of(MediaType.APPLICATION_JSON), - Optional.of(SUPER_USER) + Optional.of(SUPER_USER), + Optional.empty(), + Optional.empty() ); RestIntegrationTestUtil.makeWsRequest( @@ -232,7 +234,9 @@ public void shouldVerifyMetrics() { "SELECT * from " + PAGE_VIEW_STREAM + " WHERE PAGEID='" + A_STREAM_KEY + "';", Optional.of(MediaType.APPLICATION_JSON), Optional.of(MediaType.APPLICATION_JSON), - Optional.of(SUPER_USER) + Optional.of(SUPER_USER), + Optional.empty(), + Optional.empty() ); // Then: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java index 8ba1bbbebacf..a1d8383bda80 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java @@ -278,7 +278,9 @@ public void shouldExecutePushQueryThatReturnsStreamOverWebSocketWithV1ContentTyp final List messages = makeWebSocketRequest( "SELECT * from " + PAGE_VIEW_STREAM + " EMIT CHANGES LIMIT " + LIMIT + ";", KsqlMediaType.KSQL_V1_JSON.mediaType(), - KsqlMediaType.KSQL_V1_JSON.mediaType() + KsqlMediaType.KSQL_V1_JSON.mediaType(), + Optional.empty(), + Optional.empty() ); // Then: @@ -303,7 +305,9 @@ public void shouldExecutePushQueryThatReturnsTableOverWebSocketWithV1ContentType final List messages = makeWebSocketRequest( "SELECT VAL from " + TOMBSTONE_TABLE + " EMIT CHANGES LIMIT " + LIMIT + ";", KsqlMediaType.KSQL_V1_JSON.mediaType(), - KsqlMediaType.KSQL_V1_JSON.mediaType() + KsqlMediaType.KSQL_V1_JSON.mediaType(), + Optional.empty(), + Optional.empty() ); // Then: @@ -326,7 +330,9 @@ public void shouldExecutePushQueryThatReturnsStreamOverWebSocketWithJsonContentT final List messages = makeWebSocketRequest( "SELECT * from " + PAGE_VIEW_STREAM + " EMIT CHANGES LIMIT " + LIMIT + ";", MediaType.APPLICATION_JSON, - MediaType.APPLICATION_JSON + MediaType.APPLICATION_JSON, + Optional.empty(), + Optional.empty() ); // Then: @@ -351,7 +357,9 @@ public void shouldExecutePullQueryThatReturnsStreamOverWebSocketWithJsonContentT final List messages = makeWebSocketRequest( "SELECT * from " + PAGE_VIEW_STREAM + ";", MediaType.APPLICATION_JSON, - MediaType.APPLICATION_JSON + MediaType.APPLICATION_JSON, + Optional.empty(), + Optional.empty() ); // Then: @@ -381,7 +389,9 @@ public void shouldExecutePushQueryThatReturnsTableOverWebSocketWithJsonContentTy final List messages = makeWebSocketRequest( "SELECT VAL from " + TOMBSTONE_TABLE + " EMIT CHANGES LIMIT " + LIMIT + ";", MediaType.APPLICATION_JSON, - MediaType.APPLICATION_JSON + MediaType.APPLICATION_JSON, + Optional.empty(), + Optional.empty() ); // Then: @@ -750,7 +760,9 @@ public void shouldExecutePullQueryOverWebSocketWithV1ContentType() { final Supplier> call = () -> makeWebSocketRequest( "SELECT * from " + AGG_TABLE + " WHERE USERID='" + AN_AGG_KEY + "';", KsqlMediaType.KSQL_V1_JSON.mediaType(), - KsqlMediaType.KSQL_V1_JSON.mediaType() + KsqlMediaType.KSQL_V1_JSON.mediaType(), + Optional.empty(), + Optional.empty() ); // Then: @@ -765,13 +777,63 @@ public void shouldExecutePullQueryOverWebSocketWithV1ContentType() { assertThat(messages.get(1), is("{\"row\":{\"columns\":[\"USER_1\",1]}}")); } + @Test + public void shouldRoundTripCVPullQueryOverWebSocketWithV1ContentType() { + // When: + Map requestProperties = ImmutableMap.of( + KsqlConfig.KSQL_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR_ENABLED, true); + final Supplier> call = () -> makeWebSocketRequest( + "SELECT * from " + AGG_TABLE + " WHERE USERID='" + AN_AGG_KEY + "';", + KsqlMediaType.KSQL_V1_JSON.mediaType(), + KsqlMediaType.KSQL_V1_JSON.mediaType(), + Optional.empty(), + Optional.of(requestProperties) + ); + + // Then: + final List messages = assertThatEventually(call, hasSize(HEADER + 3)); + assertValidJsonMessages(messages); + assertThat(messages.get(2), is("{\"serializedConsistencyToken\":\"rO0ABXNyAC5pby5jb25mbHVlbnQu" + + "a3NxbC51dGlsLkNvbnNpc3RlbmN5T2Zmc2V0VmVjdG9yJ25ZG1n38BwC" + + "AANJAAd2ZXJzaW9uTAAMb2Zmc2V0VmVjdG9ydAAPTGphdmEvdXRpbC9N" + + "YXA7TAAGcndMb2NrdAAqTGphdmEvdXRpbC9jb25jdXJyZW50L2xvY2tz" + + "L1JlYWRXcml0ZUxvY2s7eHAAAAACc3IAEWphdmEudXRpbC5IYXNoTWFwB" + + "QfawcMWYNEDAAJGAApsb2FkRmFjdG9ySQAJdGhyZXNob2xkeHA/QAAAAA" + + "AADHcIAAAAEAAAAAF0AAVkdW1teXNxAH4ABD9AAAAAAAAGdwgAAAAIAAA" + + "AA3NyABFqYXZhLmxhbmcuSW50ZWdlchLioKT3gYc4AgABSQAFdmFsdWV4" + + "cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAAFc3IADmphd" + + "mEubGFuZy5Mb25nO4vkkMyPI98CAAFKAAV2YWx1ZXhxAH4ACQAAAAAAAA" + + "AFc3EAfgAIAAAABnNxAH4ACwAAAAAAAAAGc3EAfgAIAAAAB3NxAH4ACwA" + + "AAAAAAAAHeHhzcgAxamF2YS51dGlsLmNvbmN1cnJlbnQubG9ja3MuUmVl" + + "bnRyYW50UmVhZFdyaXRlTG9ja5711QDwtWhMAgADTAAKcmVhZGVyTG9ja" + + "3QAPExqYXZhL3V0aWwvY29uY3VycmVudC9sb2Nrcy9SZWVudHJhbnRSZW" + + "FkV3JpdGVMb2NrJFJlYWRMb2NrO0wABHN5bmN0ADhMamF2YS91dGlsL2N" + + "vbmN1cnJlbnQvbG9ja3MvUmVlbnRyYW50UmVhZFdyaXRlTG9jayRTeW5j" + + "O0wACndyaXRlckxvY2t0AD1MamF2YS91dGlsL2NvbmN1cnJlbnQvbG9ja" + + "3MvUmVlbnRyYW50UmVhZFdyaXRlTG9jayRXcml0ZUxvY2s7eHBzcgA6am" + + "F2YS51dGlsLmNvbmN1cnJlbnQubG9ja3MuUmVlbnRyYW50UmVhZFdyaXR" + + "lTG9jayRSZWFkTG9ja6zWi7SYGWhMAgABTAAEc3luY3EAfgATeHBzcgA9" + + "amF2YS51dGlsLmNvbmN1cnJlbnQubG9ja3MuUmVlbnRyYW50UmVhZFdya" + + "XRlTG9jayROb25mYWlyU3luY47DL86PHQNjAgAAeHIANmphdmEudXRpbC" + + "5jb25jdXJyZW50LmxvY2tzLlJlZW50cmFudFJlYWRXcml0ZUxvY2skU3l" + + "uY1es4MU/QSu5AgAAeHIANWphdmEudXRpbC5jb25jdXJyZW50LmxvY2tz" + + "LkFic3RyYWN0UXVldWVkU3luY2hyb25pemVyZlWoQ3U/UuMCAAFJAAVzd" + + "GF0ZXhyADZqYXZhLnV0aWwuY29uY3VycmVudC5sb2Nrcy5BYnN0cmFjdE" + + "93bmFibGVTeW5jaHJvbml6ZXIz36+5rW1vqQIAAHhwAAAAAHEAfgAcc3I" + + "AO2phdmEudXRpbC5jb25jdXJyZW50LmxvY2tzLlJlZW50cmFudFJlYWRX" + + "cml0ZUxvY2skV3JpdGVMb2NrurdCaD99aEwCAAFMAARzeW5jcQB+ABN4c" + + "HEAfgAc\"}")); + } + @Test public void shouldExecutePullQueryOverWebSocketWithJsonContentType() { // When: final Supplier> call = () -> makeWebSocketRequest( "SELECT COUNT, USERID from " + AGG_TABLE + " WHERE USERID='" + AN_AGG_KEY + "';", MediaType.APPLICATION_JSON, - MediaType.APPLICATION_JSON + MediaType.APPLICATION_JSON, + Optional.empty(), + Optional.empty() ); // Then: @@ -786,13 +848,63 @@ public void shouldExecutePullQueryOverWebSocketWithJsonContentType() { is("{\"row\":{\"columns\":[1,\"USER_1\"]}}")); } + @Test + public void shouldRoundTripCVPullQueryOverWebSocketWithJsonContentType() { + // When: + Map requestProperties = ImmutableMap.of( + KsqlConfig.KSQL_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR_ENABLED, true); + final Supplier> call = () -> makeWebSocketRequest( + "SELECT COUNT, USERID from " + AGG_TABLE + " WHERE USERID='" + AN_AGG_KEY + "';", + MediaType.APPLICATION_JSON, + MediaType.APPLICATION_JSON, + Optional.empty(), + Optional.of(requestProperties) + ); + + // Then: + final List messages = assertThatEventually(call, hasSize(HEADER + 3)); + assertValidJsonMessages(messages); + assertThat(messages.get(2), is("{\"serializedConsistencyToken\":\"rO0ABXNyAC5pby5jb25mbHVlbnQu" + + "a3NxbC51dGlsLkNvbnNpc3RlbmN5T2Zmc2V0VmVjdG9yJ25ZG1n38BwC" + + "AANJAAd2ZXJzaW9uTAAMb2Zmc2V0VmVjdG9ydAAPTGphdmEvdXRpbC9N" + + "YXA7TAAGcndMb2NrdAAqTGphdmEvdXRpbC9jb25jdXJyZW50L2xvY2tz" + + "L1JlYWRXcml0ZUxvY2s7eHAAAAACc3IAEWphdmEudXRpbC5IYXNoTWFwB" + + "QfawcMWYNEDAAJGAApsb2FkRmFjdG9ySQAJdGhyZXNob2xkeHA/QAAAAA" + + "AADHcIAAAAEAAAAAF0AAVkdW1teXNxAH4ABD9AAAAAAAAGdwgAAAAIAAA" + + "AA3NyABFqYXZhLmxhbmcuSW50ZWdlchLioKT3gYc4AgABSQAFdmFsdWV4" + + "cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAAFc3IADmphd" + + "mEubGFuZy5Mb25nO4vkkMyPI98CAAFKAAV2YWx1ZXhxAH4ACQAAAAAAAA" + + "AFc3EAfgAIAAAABnNxAH4ACwAAAAAAAAAGc3EAfgAIAAAAB3NxAH4ACwA" + + "AAAAAAAAHeHhzcgAxamF2YS51dGlsLmNvbmN1cnJlbnQubG9ja3MuUmVl" + + "bnRyYW50UmVhZFdyaXRlTG9ja5711QDwtWhMAgADTAAKcmVhZGVyTG9ja" + + "3QAPExqYXZhL3V0aWwvY29uY3VycmVudC9sb2Nrcy9SZWVudHJhbnRSZW" + + "FkV3JpdGVMb2NrJFJlYWRMb2NrO0wABHN5bmN0ADhMamF2YS91dGlsL2N" + + "vbmN1cnJlbnQvbG9ja3MvUmVlbnRyYW50UmVhZFdyaXRlTG9jayRTeW5j" + + "O0wACndyaXRlckxvY2t0AD1MamF2YS91dGlsL2NvbmN1cnJlbnQvbG9ja" + + "3MvUmVlbnRyYW50UmVhZFdyaXRlTG9jayRXcml0ZUxvY2s7eHBzcgA6am" + + "F2YS51dGlsLmNvbmN1cnJlbnQubG9ja3MuUmVlbnRyYW50UmVhZFdyaXR" + + "lTG9jayRSZWFkTG9ja6zWi7SYGWhMAgABTAAEc3luY3EAfgATeHBzcgA9" + + "amF2YS51dGlsLmNvbmN1cnJlbnQubG9ja3MuUmVlbnRyYW50UmVhZFdya" + + "XRlTG9jayROb25mYWlyU3luY47DL86PHQNjAgAAeHIANmphdmEudXRpbC" + + "5jb25jdXJyZW50LmxvY2tzLlJlZW50cmFudFJlYWRXcml0ZUxvY2skU3l" + + "uY1es4MU/QSu5AgAAeHIANWphdmEudXRpbC5jb25jdXJyZW50LmxvY2tz" + + "LkFic3RyYWN0UXVldWVkU3luY2hyb25pemVyZlWoQ3U/UuMCAAFJAAVzd" + + "GF0ZXhyADZqYXZhLnV0aWwuY29uY3VycmVudC5sb2Nrcy5BYnN0cmFjdE" + + "93bmFibGVTeW5jaHJvbml6ZXIz36+5rW1vqQIAAHhwAAAAAHEAfgAcc3I" + + "AO2phdmEudXRpbC5jb25jdXJyZW50LmxvY2tzLlJlZW50cmFudFJlYWRX" + + "cml0ZUxvY2skV3JpdGVMb2NrurdCaD99aEwCAAFMAARzeW5jcQB+ABN4c" + + "HEAfgAc\"}")); + } + @Test public void shouldReturnCorrectSchemaForPullQueryWithOnlyKeyInSelect() { // When: final Supplier> call = () -> makeWebSocketRequest( "SELECT USERID from " + AGG_TABLE + " WHERE USERID='" + AN_AGG_KEY + "';", MediaType.APPLICATION_JSON, - MediaType.APPLICATION_JSON + MediaType.APPLICATION_JSON, + Optional.empty(), + Optional.empty() ); // Then: @@ -812,7 +924,9 @@ public void shouldReturnCorrectSchemaForPullQueryWithOnlyValueColumnInSelect() { final Supplier> call = () -> makeWebSocketRequest( "SELECT COUNT from " + AGG_TABLE + " WHERE USERID='" + AN_AGG_KEY + "';", MediaType.APPLICATION_JSON, - MediaType.APPLICATION_JSON + MediaType.APPLICATION_JSON, + Optional.empty(), + Optional.empty() ); // Then: @@ -892,7 +1006,9 @@ public void shouldPrintTopicOverWebSocket() { final List messages = makeWebSocketRequest( "PRINT '" + PAGE_VIEW_TOPIC + "' FROM BEGINNING LIMIT " + LIMIT + ";", MediaType.APPLICATION_JSON, - MediaType.APPLICATION_JSON); + MediaType.APPLICATION_JSON, + Optional.empty(), + Optional.empty()); // Then: assertThat(messages, hasSize(LIMIT + 1)); @@ -1001,14 +1117,18 @@ private static HttpResponse rawRestRequest( private static List makeWebSocketRequest( final String sql, final String mediaType, - final String contentType + final String contentType, + final Optional> overrides, + final Optional> requestProperties ) { return RestIntegrationTestUtil.makeWsRequest( REST_APP.getWsListener(), sql, Optional.of(mediaType), Optional.of(contentType), - Optional.of(SUPER_USER) + Optional.of(SUPER_USER), + overrides, + requestProperties ); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java index e45c06649e00..eda30c9f0f00 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java @@ -508,7 +508,9 @@ public static List makeWsRequest( final String sql, final Optional mediaType, final Optional contentType, - final Optional credentials + final Optional credentials, + final Optional> overrides, + final Optional> requestProperties ) { Vertx vertx = Vertx.vertx(); HttpClient httpClient = null; @@ -516,7 +518,7 @@ public static List makeWsRequest( httpClient = vertx.createHttpClient(); final String uri = baseUri.toString() + "/ws/query?request=" - + buildStreamingRequest(sql, Optional.empty()); + + buildStreamingRequest(sql, overrides, requestProperties); final MultiMap headers = MultiMap.caseInsensitiveMultiMap(); @@ -569,7 +571,7 @@ public static CompletableFuture makeWsRequest( final HttpClient httpClient = vertx.createHttpClient(); final String uri = baseUri.toString() + "/ws/query?request=" - + buildStreamingRequest(sql, Optional.of(overrides)); + + buildStreamingRequest(sql, Optional.of(overrides), Optional.empty()); final MultiMap headers = MultiMap.caseInsensitiveMultiMap(); @@ -613,9 +615,10 @@ private static String buildBasicAuthHeader(final Credentials credentials) { } private static String buildStreamingRequest(final String sql, - Optional> overrides) { + Optional> overrides, Optional> requestProperties + ) { KsqlRequest request = new KsqlRequest(sql, overrides.orElse(Collections.emptyMap()), - Collections.emptyMap(), null); + requestProperties.orElse(Collections.emptyMap()), null); final String requestStr; try { requestStr = ApiJsonMapper.INSTANCE.get().writeValueAsString(request); diff --git a/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java b/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java index f01c04be6600..adece28fcecb 100644 --- a/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java +++ b/ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java @@ -203,7 +203,7 @@ public RestResponse> makeQueryRequestStreamed( final Map requestProperties = new HashMap<>(); if (ConsistencyOffsetVector.isConsistencyVectorEnabled(localProperties.toMap())) { requestProperties.put( - KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN, + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR, getConsistencyOffsetVector().serialize()); } if (properties != null) { diff --git a/ksqldb-rest-client/src/test/java/io/confluent/ksql/rest/entity/StreamedRowTest.java b/ksqldb-rest-client/src/test/java/io/confluent/ksql/rest/entity/StreamedRowTest.java index b2451083995b..054042ad6f13 100644 --- a/ksqldb-rest-client/src/test/java/io/confluent/ksql/rest/entity/StreamedRowTest.java +++ b/ksqldb-rest-client/src/test/java/io/confluent/ksql/rest/entity/StreamedRowTest.java @@ -179,10 +179,10 @@ public void shouldRoundTripFinalMessage() throws Exception { @Test public void shouldRoundTripConsistencyVectorRow() throws Exception { CONSISTENCY_TOKEN.addTopicOffsets("table1", ImmutableMap.of(1, 1L, 2, 2L)); - final StreamedRow row = StreamedRow.consistencyOffsetVector(Optional.of(CONSISTENCY_TOKEN.serialize())); + final StreamedRow row = StreamedRow.consistencyToken(Optional.of(CONSISTENCY_TOKEN.serialize())); final String expectedJson = - "{\"serializedCT\":\"rO0ABXNyAC5pby5jb25mbHVlbnQua3NxbC51dGlsLkNvbnNpc3RlbmN5T2Zmc2V0VmVjdG9yJ" + "{\"serializedCV\":\"rO0ABXNyAC5pby5jb25mbHVlbnQua3NxbC51dGlsLkNvbnNpc3RlbmN5T2Zmc2V0VmVjdG9yJ" + "25ZG1n38BwCAANJAAd2ZXJzaW9uTAAMb2Zmc2V0VmVjdG9ydAAPTGphdmEvdXRpbC9NYXA7TAAGcndMb2NrdAAq" + "TGphdmEvdXRpbC9jb25jdXJyZW50L2xvY2tzL1JlYWRXcml0ZUxvY2s7eHAAAAAAc3IAEWphdmEudXRpbC5IYXN" + "oTWFwBQfawcMWYNEDAAJGAApsb2FkRmFjdG9ySQAJdGhyZXNob2xkeHA/QAAAAAAADHcIAAAAEAAAAAF0AAZ0YW" diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/StreamedRow.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/StreamedRow.java index e656bb9d967e..609de0d8a947 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/StreamedRow.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/StreamedRow.java @@ -53,7 +53,7 @@ public final class StreamedRow { private final Optional finalMessage; private final Optional sourceHost; private final Optional continuationToken; - private final Optional serializedCV; + private final Optional serializedConsistencyToken; /** * The header used in queries. @@ -166,8 +166,11 @@ public static StreamedRow finalMessage(final String finalMessage) { ); } - public static StreamedRow consistencyOffsetVector( - final Optional serializedCV + /** + * Row that contains the serialized consistency offset vector + */ + public static StreamedRow consistencyToken( + final Optional serializedConsistencyToken ) { return new StreamedRow( Optional.empty(), @@ -176,7 +179,7 @@ public static StreamedRow consistencyOffsetVector( Optional.empty(), Optional.empty(), Optional.empty(), - serializedCV + serializedConsistencyToken ); } @@ -189,7 +192,7 @@ private StreamedRow( @JsonProperty("finalMessage") final Optional finalMessage, @JsonProperty("sourceHost") final Optional sourceHost, @JsonProperty("continuationToken") final Optional continuationToken, - @JsonProperty("serializedCV") final Optional serializedCV + @JsonProperty("serializedConsistencyToken") final Optional serializedConsistencyToken ) { this.header = requireNonNull(header, "header"); this.row = requireNonNull(row, "row"); @@ -197,8 +200,10 @@ private StreamedRow( this.finalMessage = requireNonNull(finalMessage, "finalMessage"); this.sourceHost = requireNonNull(sourceHost, "sourceHost"); this.continuationToken = requireNonNull(continuationToken, "continuationToken"); - this.serializedCV = requireNonNull(serializedCV, "serializedCV"); - checkUnion(header, row, errorMessage, finalMessage, continuationToken, serializedCV); + this.serializedConsistencyToken = requireNonNull( + serializedConsistencyToken, "serializedConsistencyToken"); + checkUnion(header, row, errorMessage, finalMessage, continuationToken, + serializedConsistencyToken); } public Optional
getHeader() { @@ -225,8 +230,8 @@ public Optional getContinuationToken() { return continuationToken; } - public Optional getSerializedCV() { - return serializedCV; + public Optional getSerializedConsistencyToken() { + return serializedConsistencyToken; } @JsonIgnore @@ -249,13 +254,13 @@ public boolean equals(final Object o) { && Objects.equals(finalMessage, that.finalMessage) && Objects.equals(sourceHost, that.sourceHost) && Objects.equals(continuationToken, that.continuationToken) - && Objects.equals(serializedCV, that.serializedCV); + && Objects.equals(serializedConsistencyToken, that.serializedConsistencyToken); } @Override public int hashCode() { return Objects.hash(header, row, errorMessage, finalMessage, sourceHost, continuationToken, - serializedCV); + serializedConsistencyToken); } @Override