From db7409073c6e089358ce4eeb62ac90217f3e3da6 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Fri, 25 Sep 2020 14:50:39 -0700 Subject: [PATCH] rebase --- .../java/io/confluent/ksql/api/impl/QueryEndpoint.java | 2 +- .../server/resources/streaming/PullQueryPublisher.java | 3 ++- .../server/resources/streaming/StreamedQueryResource.java | 4 ++-- .../ksql/rest/server/execution/PullQueryExecutorTest.java | 2 +- .../resources/streaming/PullQueryPublisherTest.java | 8 ++++---- 5 files changed, 10 insertions(+), 9 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index 311dbc0f5814..a430784b6eea 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -109,7 +109,7 @@ private QueryPublisher createPullQueryPublisher( final long startTimeNanos ) { final PullQueryResult result = pullQueryExecutor.execute( - statement, serviceContext, Optional.of(false), pullQueryMetrics); + statement, ImmutableMap.of(), serviceContext, Optional.of(false), pullQueryMetrics); pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); final TableRows tableRows = result.getTableRows(); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index bf113e168cd0..f2a5b3517741 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.streams.materialization.Locator.KsqlNode; import io.confluent.ksql.parser.tree.Query; @@ -66,7 +67,7 @@ public synchronized void subscribe(final Subscriber> sub subscriber, () -> { final PullQueryResult result = pullQueryExecutor.execute( - query, serviceContext, Optional.of(false), pullQueryMetrics); + query, ImmutableMap.of(), serviceContext, Optional.of(false), pullQueryMetrics); //Record latency at microsecond scale pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics .recordLatency(startTimeNanos)); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 4d2e9546f050..710f5be92a6f 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -263,8 +263,8 @@ private EndpointResponse handlePullQuery( final ConfiguredStatement configured = ConfiguredStatement .of(statement, SessionConfig.of(ksqlConfig, configOverrides)); - final PullQueryResult result = pullQueryExecutor - .execute(configured, serviceContext, isInternalRequest, pullQueryMetrics); + final PullQueryResult result = pullQueryExecutor.execute( + configured, requestProperties, serviceContext, isInternalRequest, pullQueryMetrics); final TableRows tableRows = result.getTableRows(); final Optional host = result.getSourceNode() .map(KsqlNode::location) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java index 2c348471597f..ac8f21a26be5 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java @@ -81,7 +81,7 @@ public void shouldThrowExceptionIfConfigDisabled() { final Exception e = assertThrows( KsqlStatementException.class, () -> pullQueryExecutor.execute( - query, engine.getServiceContext(), Optional.empty(), Optional.empty()) + query, ImmutableMap.of(), engine.getServiceContext(), Optional.empty(), Optional.empty()) ); // Then: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java index 53389092ece9..09b5fca11fbe 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java @@ -86,7 +86,7 @@ public void setUp() { TIME_NANOS); PullQueryResult result = new PullQueryResult(entity, Optional.empty()); - when(pullQueryExecutor.execute(any(), any(), any(), any())).thenReturn(result); + when(pullQueryExecutor.execute(any(), any(), any(), any(), any())).thenReturn(result); when(entity.getSchema()).thenReturn(SCHEMA); doAnswer(callRequestAgain()).when(subscriber).onNext(any()); @@ -110,7 +110,7 @@ public void shouldRunQueryWithCorrectParams() { subscription.request(1); // Then: - verify(pullQueryExecutor).execute(statement, serviceContext, Optional.of(false), Optional.empty()); + verify(pullQueryExecutor).execute(statement, ImmutableMap.of(), serviceContext, Optional.of(false), Optional.empty()); } @Test @@ -123,7 +123,7 @@ public void shouldOnlyExecuteOnce() { // Then: verify(subscriber).onNext(any()); - verify(pullQueryExecutor).execute(statement, serviceContext, Optional.of(false), Optional.empty()); + verify(pullQueryExecutor).execute(statement, ImmutableMap.of(), serviceContext, Optional.of(false), Optional.empty()); } @Test @@ -158,7 +158,7 @@ public void shouldCallOnErrorOnFailure() { // Given: givenSubscribed(); final Throwable e = new RuntimeException("Boom!"); - when(pullQueryExecutor.execute(any(), any(), any(), any())).thenThrow(e); + when(pullQueryExecutor.execute(any(), any(), any(), any(), any())).thenThrow(e); // When: subscription.request(1);