Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
vpapavas committed Sep 25, 2020
1 parent b635f18 commit db74090
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private QueryPublisher createPullQueryPublisher(
final long startTimeNanos
) {
final PullQueryResult result = pullQueryExecutor.execute(
statement, serviceContext, Optional.of(false), pullQueryMetrics);
statement, ImmutableMap.of(), serviceContext, Optional.of(false), pullQueryMetrics);
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
final TableRows tableRows = result.getTableRows();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlNode;
import io.confluent.ksql.parser.tree.Query;
Expand Down Expand Up @@ -66,7 +67,7 @@ public synchronized void subscribe(final Subscriber<Collection<StreamedRow>> sub
subscriber,
() -> {
final PullQueryResult result = pullQueryExecutor.execute(
query, serviceContext, Optional.of(false), pullQueryMetrics);
query, ImmutableMap.of(), serviceContext, Optional.of(false), pullQueryMetrics);
//Record latency at microsecond scale
pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics
.recordLatency(startTimeNanos));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ private EndpointResponse handlePullQuery(
final ConfiguredStatement<Query> configured = ConfiguredStatement
.of(statement, SessionConfig.of(ksqlConfig, configOverrides));

final PullQueryResult result = pullQueryExecutor
.execute(configured, serviceContext, isInternalRequest, pullQueryMetrics);
final PullQueryResult result = pullQueryExecutor.execute(
configured, requestProperties, serviceContext, isInternalRequest, pullQueryMetrics);
final TableRows tableRows = result.getTableRows();
final Optional<KsqlHostInfoEntity> host = result.getSourceNode()
.map(KsqlNode::location)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void shouldThrowExceptionIfConfigDisabled() {
final Exception e = assertThrows(
KsqlStatementException.class,
() -> pullQueryExecutor.execute(
query, engine.getServiceContext(), Optional.empty(), Optional.empty())
query, ImmutableMap.of(), engine.getServiceContext(), Optional.empty(), Optional.empty())
);

// Then:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void setUp() {
TIME_NANOS);

PullQueryResult result = new PullQueryResult(entity, Optional.empty());
when(pullQueryExecutor.execute(any(), any(), any(), any())).thenReturn(result);
when(pullQueryExecutor.execute(any(), any(), any(), any(), any())).thenReturn(result);
when(entity.getSchema()).thenReturn(SCHEMA);

doAnswer(callRequestAgain()).when(subscriber).onNext(any());
Expand All @@ -110,7 +110,7 @@ public void shouldRunQueryWithCorrectParams() {
subscription.request(1);

// Then:
verify(pullQueryExecutor).execute(statement, serviceContext, Optional.of(false), Optional.empty());
verify(pullQueryExecutor).execute(statement, ImmutableMap.of(), serviceContext, Optional.of(false), Optional.empty());
}

@Test
Expand All @@ -123,7 +123,7 @@ public void shouldOnlyExecuteOnce() {

// Then:
verify(subscriber).onNext(any());
verify(pullQueryExecutor).execute(statement, serviceContext, Optional.of(false), Optional.empty());
verify(pullQueryExecutor).execute(statement, ImmutableMap.of(), serviceContext, Optional.of(false), Optional.empty());
}

@Test
Expand Down Expand Up @@ -158,7 +158,7 @@ public void shouldCallOnErrorOnFailure() {
// Given:
givenSubscribed();
final Throwable e = new RuntimeException("Boom!");
when(pullQueryExecutor.execute(any(), any(), any(), any())).thenThrow(e);
when(pullQueryExecutor.execute(any(), any(), any(), any(), any())).thenThrow(e);

// When:
subscription.request(1);
Expand Down

0 comments on commit db74090

Please sign in to comment.