Skip to content

Commit

Permalink
feat: Adds support to /query-stream for http1.1 and StreamedRow json …
Browse files Browse the repository at this point in the history
…format (#8449)

* feat: Adds support to /query-stream for http1.1 and StreamedRow json format
  • Loading branch information
AlanConfluent authored Dec 16, 2021
1 parent 3580fe6 commit dd61db3
Show file tree
Hide file tree
Showing 30 changed files with 603 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void shouldStreamPullQueryAsync() throws Exception {
// Then
assertThat(streamedQueryResult.columnNames(), is(DEFAULT_COLUMN_NAMES));
assertThat(streamedQueryResult.columnTypes(), is(DEFAULT_COLUMN_TYPES));
assertThat(streamedQueryResult.queryID(), is(nullValue()));
assertThat(streamedQueryResult.queryID(), is(notNullValue()));

shouldReceiveRows(streamedQueryResult, true);

Expand All @@ -233,7 +233,7 @@ public void shouldStreamPullQuerySync() throws Exception {
// Then
assertThat(streamedQueryResult.columnNames(), is(DEFAULT_COLUMN_NAMES));
assertThat(streamedQueryResult.columnTypes(), is(DEFAULT_COLUMN_TYPES));
assertThat(streamedQueryResult.queryID(), is(nullValue()));
assertThat(streamedQueryResult.queryID(), is(notNullValue()));

for (int i = 0; i < DEFAULT_JSON_ROWS.size(); i++) {
final Row row = streamedQueryResult.poll();
Expand Down Expand Up @@ -475,7 +475,7 @@ public void shouldExecutePullQuery() throws Exception {
final BatchedQueryResult batchedQueryResult = javaClient.executeQuery(DEFAULT_PULL_QUERY);

// Then
assertThat(batchedQueryResult.queryID().get(), is(nullValue()));
assertThat(batchedQueryResult.queryID().get(), is(notNullValue()));

verifyRows(batchedQueryResult.get());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ public void shouldStreamPullQueryOnTableAsync() throws Exception {
// Then
assertThat(streamedQueryResult.columnNames(), is(PULL_QUERY_COLUMN_NAMES));
assertThat(streamedQueryResult.columnTypes(), is(PULL_QUERY_COLUMN_TYPES));
assertThat(streamedQueryResult.queryID(), is(nullValue()));
assertThat(streamedQueryResult.queryID(), is(notNullValue()));

shouldReceiveRows(
streamedQueryResult,
Expand All @@ -470,7 +470,7 @@ public void shouldStreamPullQueryOnTableSync() throws Exception {
// Then
assertThat(streamedQueryResult.columnNames(), is(PULL_QUERY_COLUMN_NAMES));
assertThat(streamedQueryResult.columnTypes(), is(PULL_QUERY_COLUMN_TYPES));
assertThat(streamedQueryResult.queryID(), is(nullValue()));
assertThat(streamedQueryResult.queryID(), is(notNullValue()));

final Row row = streamedQueryResult.poll();
verifyPullQueryRow(row);
Expand Down Expand Up @@ -564,7 +564,7 @@ public void shouldExecutePullQuery() throws Exception {
final BatchedQueryResult batchedQueryResult = client.executeQuery(PULL_QUERY_ON_TABLE);

// Then
assertThat(batchedQueryResult.queryID().get(), is(nullValue()));
assertThat(batchedQueryResult.queryID().get(), is(notNullValue()));

verifyPullQueryRows(batchedQueryResult.get());
}
Expand All @@ -577,7 +577,7 @@ public void shouldExecutePullQueryWithVariables() throws Exception {
final BatchedQueryResult batchedQueryResult = client.executeQuery("SELECT ${value} from ${AGG_TABLE} WHERE K=STRUCT(F1 := ARRAY['a']);");

// Then
assertThat(batchedQueryResult.queryID().get(), is(nullValue()));
assertThat(batchedQueryResult.queryID().get(), is(notNullValue()));
assertThat(batchedQueryResult.get().get(0).getBoolean(1), is(false));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void shouldRoundTripCVWhenExecutePullQuery() throws Exception {

// Then
assertThat(rows, hasSize(1));
assertThat(batchedQueryResult.queryID().get(), is(nullValue()));
assertThat(batchedQueryResult.queryID().get(), is(notNullValue()));
assertThatEventually(() -> ((ClientImpl)client).getSerializedConsistencyVector(),
is(notNullValue()));
final String serializedCV = ((ClientImpl)client).getSerializedConsistencyVector();
Expand Down Expand Up @@ -280,7 +280,7 @@ public void shouldNotRoundTripCVWhenExecutePullQuery() throws Exception {
batchedQueryResult.get();

// Then
assertThat(batchedQueryResult.queryID().get(), is(nullValue()));
assertThat(batchedQueryResult.queryID().get(), is(notNullValue()));
assertThat(((ClientImpl)client).getSerializedConsistencyVector(), is(isEmptyString()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.TransientQueryQueue;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.ConsistencyOffsetVector;
import io.confluent.ksql.util.KeyValueMetadata;
import io.vertx.core.Context;
Expand Down Expand Up @@ -97,6 +98,11 @@ public List<String> getColumnTypes() {
return new ArrayList<>();
}

@Override
public LogicalSchema getLogicalSchema() {
return LogicalSchema.builder().build();
}

@Override
public void start() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,19 @@ public class PullQueryQueue implements BlockingRowQueue {
private final OptionalInt limit;

/**
* The callback run when we've hit the end of the data. Specifically, this happens when
* {@link #close()} is called.
* The callback run when we've hit the limit. Specifically, this happens when
* {@link #closeInternal(boolean)}} is called with true.
*/
private LimitHandler limitHandler;
/**
* Callback is checked before enqueueing new rows and called when new rows are actually added.
*/
private Runnable queuedCallback;
/**
* The callback run when we've hit the end of the data. Specifically, this happens when
* {@link #close()} is called.
*/
private CompletionHandler completionHandler;

public PullQueryQueue(final OptionalInt limit) {
this(BLOCKING_QUEUE_CAPACITY, DEFAULT_OFFER_TIMEOUT_MS, limit);
Expand All @@ -80,6 +85,7 @@ public PullQueryQueue(
final OptionalInt limit) {
this.queuedCallback = () -> { };
this.limitHandler = () -> { };
this.completionHandler = () -> { };
this.rowQueue = new ArrayBlockingQueue<>(queueSizeLimit);
this.offerTimeoutMs = offerTimeoutMs;
this.limit = limit;
Expand All @@ -92,8 +98,7 @@ public void setLimitHandler(final LimitHandler limitHandler) {

@Override
public void setCompletionHandler(final CompletionHandler completionHandler) {
// not currently used in pull queries, although future refactoring might be able to
// take advantage of this mechanism.
this.completionHandler = completionHandler;
}

@Override
Expand Down Expand Up @@ -156,16 +161,24 @@ public boolean isEmpty() {
* wants to end pull queries prematurely, such as when the client connection closes, this should
* also be called then.
*/
@Override
public void close() {
private void closeInternal(final boolean limitHit) {
if (!closed.getAndSet(true)) {
// Unlike limits based on a number of rows which can be checked and possibly triggered after
// every queuing of a row, pull queries just declare they've reached their limit when close is
// called.
limitHandler.limitReached();
if (limitHit) {
limitHandler.limitReached();
} else {
completionHandler.complete();
}
}
}

@Override
public void close() {
closeInternal(false);
}

public boolean isClosed() {
return closed.get();
}
Expand Down Expand Up @@ -210,7 +223,7 @@ public boolean acceptRow(final PullQueryRow row) {
return false;
}
if (limit.isPresent() && totalRowsQueued.get() >= limit.getAsInt()) {
close();
closeInternal(true);
return false;
}

Expand All @@ -219,7 +232,7 @@ public boolean acceptRow(final PullQueryRow row) {
totalRowsQueued.incrementAndGet();
queuedCallback.run();
if (limit.isPresent() && totalRowsQueued.get() >= limit.getAsInt()) {
close();
closeInternal(true);
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class PullQueryQueueTest {
@Mock
private LimitHandler limitHandler;
@Mock
private CompletionHandler completionHandler;
@Mock
private Runnable queuedCallback;
private PullQueryQueue queue;
private ScheduledExecutorService executorService;
Expand Down Expand Up @@ -116,13 +118,13 @@ public void shouldPoll() throws Exception {
}

@Test
public void shouldCallLimitHandlerOnClose() {
public void shouldCallCompleteOnClose() {
// When:
queue.close();
queue.close();

// Then:
verify(limitHandler, times(1)).limitReached();
verify(completionHandler, times(1)).complete();
}


Expand Down Expand Up @@ -213,6 +215,7 @@ private void givenQueue() {
queue = new PullQueryQueue(QUEUE_SIZE, 1, OptionalInt.empty());

queue.setLimitHandler(limitHandler);
queue.setCompletionHandler(completionHandler);
queue.setQueuedCallback(queuedCallback);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.query.PullQueryQueue;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.reactive.BasePublisher;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KeyValueMetadata;
import io.vertx.core.Context;
import io.vertx.core.WorkerExecutor;
Expand Down Expand Up @@ -55,8 +56,10 @@ public class BlockingQueryPublisher extends BasePublisher<KeyValueMetadata<List<
private QueryHandle queryHandle;
private ImmutableList<String> columnNames;
private ImmutableList<String> columnTypes;
private LogicalSchema logicalSchema;
private QueryId queryId;
private boolean complete;
private boolean hitLimit;
private volatile boolean closed;

public BlockingQueryPublisher(final Context ctx,
Expand All @@ -69,6 +72,7 @@ public void setQueryHandle(final QueryHandle queryHandle, final boolean isPullQu
final boolean isScalablePushQuery) {
this.columnNames = ImmutableList.copyOf(queryHandle.getColumnNames());
this.columnTypes = ImmutableList.copyOf(queryHandle.getColumnTypes());
this.logicalSchema = queryHandle.getLogicalSchema();
this.queue = queryHandle.getQueue();
this.isPullQuery = isPullQuery;
this.isScalablePushQuery = isScalablePushQuery;
Expand All @@ -81,6 +85,7 @@ public void setQueryHandle(final QueryHandle queryHandle, final boolean isPullQu
maybeSend();
}
complete = true;
hitLimit = true;
// This allows us to hit the limit without having to queue one last row
if (queue.isEmpty()) {
ctx.runOnContext(v -> sendComplete());
Expand Down Expand Up @@ -119,6 +124,11 @@ public List<String> getColumnTypes() {
return columnTypes;
}

@Override
public LogicalSchema geLogicalSchema() {
return logicalSchema;
}

public void close() {
if (closed) {
return;
Expand All @@ -144,6 +154,11 @@ public QueryId queryId() {
return queryId;
}

@Override
public boolean hitLimit() {
return hitLimit;
}

@Override
protected void maybeSend() {
ctx.runOnContext(v -> doSend());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.confluent.ksql.rest.server.query.QueryExecutor;
import io.confluent.ksql.rest.server.query.QueryMetadataHolder;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.utils.FormatOptions;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
Expand Down Expand Up @@ -183,6 +184,11 @@ public List<String> getColumnTypes() {
return colTypesFromSchema(queryMetadata.getLogicalSchema().value());
}

@Override
public LogicalSchema getLogicalSchema() {
return queryMetadata.getLogicalSchema();
}

@Override
public void start() {
queryMetadata.start();
Expand Down Expand Up @@ -239,6 +245,11 @@ public List<String> getColumnTypes() {
return colTypesFromSchema(result.getSchema().columns());
}

@Override
public LogicalSchema getLogicalSchema() {
return result.getSchema();
}

@Override
public void start() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ public QueryStreamResponseWriter writeConsistencyToken(final ConsistencyToken co
return this;
}

@Override
public QueryStreamResponseWriter writeCompletionMessage(final String completionMessage) {
return this;
}

@Override
public QueryStreamResponseWriter writeLimitMessage() {
return this;
}


@Override
public void end() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ public QueryStreamResponseWriter writeConsistencyToken(final ConsistencyToken co
return this;
}

@Override
public QueryStreamResponseWriter writeCompletionMessage(final String completionMessage) {
return this;
}

@Override
public QueryStreamResponseWriter writeLimitMessage() {
return this;
}

private void writeBuffer(final Buffer buffer) {
final Buffer buff = Buffer.buffer().appendByte((byte) ',');
Expand Down
Loading

0 comments on commit dd61db3

Please sign in to comment.