diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java index 2f65c8e8fd8f..7d5c0d1854af 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java @@ -38,8 +38,6 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlRequestConfig; -import io.confluent.ksql.util.KsqlServerException; -import io.confluent.ksql.util.KsqlStatementException; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -121,7 +119,7 @@ public CompletableFuture handlePullQuery( LOG.debug("Unable to execute pull query: {}. All nodes are dead or exceed max allowed lag.", statement.getStatementText()); throw new MaterializationException(String.format( - "Unable to execute pull query %s. All nodes are dead or exceed max allowed lag.", + "Unable to execute pull query \"%s\". All nodes are dead or exceed max allowed lag.", statement.getStatementText())); } @@ -169,16 +167,13 @@ private void executeRounds( // Make requests to each host, specifying the partitions we're interested in from // this host. - final Map> futures = new LinkedHashMap<>(); + final Map> futures = new LinkedHashMap<>(); for (Map.Entry> entry : groupedByHost.entrySet()) { final KsqlNode node = entry.getKey(); futures.put(node, executorService.submit( - () -> { - routeQuery.routeQuery( - node, entry.getValue(), statement, serviceContext, routingOptions, - pullQueryMetrics, pullPhysicalPlan, outputSchema, queryId, pullQueryQueue); - return null; - } + () -> routeQuery.routeQuery( + node, entry.getValue(), statement, serviceContext, routingOptions, + pullQueryMetrics, pullPhysicalPlan, outputSchema, queryId, pullQueryQueue) )); } @@ -186,15 +181,23 @@ private void executeRounds( // the locations to the nextRoundRemaining list. final ImmutableList.Builder nextRoundRemaining = ImmutableList.builder(); - for (Map.Entry> entry : futures.entrySet()) { - final Future future = entry.getValue(); + for (Map.Entry> entry : futures.entrySet()) { + final Future future = entry.getValue(); final KsqlNode node = entry.getKey(); + RoutingResult routingResult = null; try { - future.get(); + routingResult = future.get(); } catch (ExecutionException e) { LOG.warn("Error routing query {} to host {} at timestamp {} with exception {}", statement.getStatementText(), node, System.currentTimeMillis(), e.getCause()); + throw new MaterializationException(String.format( + "Unable to execute pull query \"%s\". %s", + statement.getStatementText(), e.getCause().getMessage())); + } + if (routingResult == RoutingResult.STANDBY_FALLBACK) { nextRoundRemaining.addAll(groupedByHost.get(node)); + } else { + Preconditions.checkState(routingResult == RoutingResult.SUCCESS); } } remainingLocations = nextRoundRemaining.build(); @@ -225,7 +228,7 @@ private static Map> groupByHost( // If one of the partitions required is out of nodes, then we cannot continue. if (round >= location.getNodes().size()) { throw new MaterializationException(String.format( - "Unable to execute pull query: %s. Exhausted standby hosts to try.", + "Unable to execute pull query: \"%s\". Exhausted standby hosts to try.", statement.getStatementText())); } final KsqlNode nextHost = location.getNodes().get(round); @@ -236,7 +239,7 @@ private static Map> groupByHost( @VisibleForTesting interface RouteQuery { - void routeQuery( + RoutingResult routeQuery( KsqlNode node, List locations, ConfiguredStatement statement, @@ -251,7 +254,7 @@ void routeQuery( } @VisibleForTesting - static void executeOrRouteQuery( + static RoutingResult executeOrRouteQuery( final KsqlNode node, final List locations, final ConfiguredStatement statement, @@ -273,29 +276,41 @@ static void executeOrRouteQuery( pullQueryMetrics .ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordLocalRequests(1)); pullPhysicalPlan.execute(locations, pullQueryQueue, rowFactory); - } catch (Exception e) { - LOG.error("Error executing query {} locally at node {} with exception", + return RoutingResult.SUCCESS; + } catch (StandbyFallbackException e) { + LOG.warn("Error executing query {} locally at node {}. Falling back to standby state which " + + "may return stale results", statement.getStatementText(), node, e.getCause()); + return RoutingResult.STANDBY_FALLBACK; + } catch (Exception e) { + LOG.error("Error executing query {} locally at node {}", + statement.getStatementText(), node.location(), e.getCause()); throw new KsqlException( - String.format("Error executing query %s locally at node %s", - statement.getStatementText(), node), + String.format("Error executing query locally at node %s: %s", node.location(), + e.getMessage()), e ); } } else { try { LOG.debug("Query {} routed to host {} at timestamp {}.", - statement.getStatementText(), node.location(), System.currentTimeMillis()); + statement.getStatementText(), node.location(), System.currentTimeMillis()); pullQueryMetrics .ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordRemoteRequests(1)); forwardTo(node, locations, statement, serviceContext, pullQueryQueue, rowFactory, outputSchema); + return RoutingResult.SUCCESS; + } catch (StandbyFallbackException e) { + LOG.warn("Error forwarding query {} to node {}. Falling back to standby state which may " + + "return stale results", + statement.getStatementText(), node.location(), e.getCause()); + return RoutingResult.STANDBY_FALLBACK; } catch (Exception e) { - LOG.error("Error forwarding query {} to node {} with exception {}", + LOG.error("Error forwarding query {} to node {}", statement.getStatementText(), node, e.getCause()); throw new KsqlException( - String.format("Error forwarding query %s to node %s", - statement.getStatementText(), node), + String.format("Error forwarding query to node %s: %s", node.location(), + e.getMessage()), e ); } @@ -322,38 +337,62 @@ private static void forwardTo( KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING, true, KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST, true, KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS, partitions); - final RestResponse response = serviceContext - .getKsqlClient() - .makeQueryRequest( - owner.location(), - statement.getStatementText(), - statement.getSessionConfig().getOverrides(), - requestProperties, - streamedRowsHandler(owner, statement, requestProperties, pullQueryQueue, rowFactory, - outputSchema) - ); + final RestResponse response; + + try { + response = serviceContext + .getKsqlClient() + .makeQueryRequest( + owner.location(), + statement.getStatementText(), + statement.getSessionConfig().getOverrides(), + requestProperties, + streamedRowsHandler(owner, pullQueryQueue, rowFactory, outputSchema) + ); + } catch (Exception e) { + // If we threw some explicit exception, then let it bubble up. All of the row handling is + // wrapped in a KsqlException, so any intentional exception or bug will be surfaced. + final KsqlException ksqlException = causedByKsqlException(e); + if (ksqlException != null) { + throw ksqlException; + } + // If we get some kind of unknown error, we assume it's network or other error from the + // KsqlClient and try standbys + throw new StandbyFallbackException(String.format( + "Forwarding pull query request [%s, %s] failed with error %s ", + statement.getSessionConfig().getOverrides(), requestProperties, + e.getMessage()), e); + } if (response.isErroneous()) { - throw new KsqlServerException(String.format( - "Forwarding pull query request [%s, %s, %s] to node %s failed with error %s ", - statement.getStatement(), statement.getSessionConfig().getOverrides(), requestProperties, - owner, response.getErrorMessage())); + throw new KsqlException(String.format( + "Forwarding pull query request [%s, %s] failed with error %s ", + statement.getSessionConfig().getOverrides(), requestProperties, + response.getErrorMessage())); } final int numRows = response.getResponse(); if (numRows == 0) { - throw new KsqlServerException(String.format( - "Forwarding pull query request [%s, %s, %s] to node %s failed due to invalid " + throw new KsqlException(String.format( + "Forwarding pull query request [%s, %s] failed due to invalid " + "empty response from forwarding call, expected a header row.", - statement.getStatement(), statement.getSessionConfig().getOverrides(), requestProperties, - owner)); + statement.getSessionConfig().getOverrides(), requestProperties)); + } + } + + private static KsqlException causedByKsqlException(final Exception e) { + Throwable throwable = e; + while (throwable != null) { + if (throwable instanceof KsqlException) { + return (KsqlException) throwable; + } + throwable = throwable.getCause(); } + return null; } private static Consumer> streamedRowsHandler( final KsqlNode owner, - final ConfiguredStatement statement, - final Map requestProperties, final PullQueryQueue pullQueryQueue, final BiFunction, LogicalSchema, PullQueryRow> rowFactory, final LogicalSchema outputSchema @@ -361,44 +400,42 @@ private static Consumer> streamedRowsHandler( final AtomicInteger processedRows = new AtomicInteger(0); final AtomicReference
header = new AtomicReference<>(); return streamedRows -> { - if (streamedRows == null || streamedRows.isEmpty()) { - return; - } - final List rows = new ArrayList<>(); - - // If this is the first row overall, skip the header - final int previousProcessedRows = processedRows.getAndAdd(streamedRows.size()); - for (int i = 0; i < streamedRows.size(); i++) { - final StreamedRow row = streamedRows.get(i); - if (i == 0 && previousProcessedRows == 0) { - final Optional
optionalHeader = row.getHeader(); - optionalHeader.ifPresent(h -> validateSchema(outputSchema, h.getSchema(), owner)); - optionalHeader.ifPresent(header::set); - continue; + try { + if (streamedRows == null || streamedRows.isEmpty()) { + return; } + final List rows = new ArrayList<>(); - if (row.getErrorMessage().isPresent()) { - throw new KsqlStatementException( - row.getErrorMessage().get().getMessage(), - statement.getStatementText() - ); - } + // If this is the first row overall, skip the header + final int previousProcessedRows = processedRows.getAndAdd(streamedRows.size()); + for (int i = 0; i < streamedRows.size(); i++) { + final StreamedRow row = streamedRows.get(i); + if (i == 0 && previousProcessedRows == 0) { + final Optional
optionalHeader = row.getHeader(); + optionalHeader.ifPresent(h -> validateSchema(outputSchema, h.getSchema(), owner)); + optionalHeader.ifPresent(header::set); + continue; + } - if (!row.getRow().isPresent()) { - throw new KsqlServerException(String.format( - "Forwarding pull query request [%s, %s, %s] to node %s failed due to " - + "missing row data.", - statement.getStatement(), statement.getSessionConfig().getOverrides(), - requestProperties, owner)); - } + if (row.getErrorMessage().isPresent()) { + // If we receive an error that's not a network error, we let that bubble up. + throw new KsqlException(row.getErrorMessage().get().getMessage()); + } - final List r = row.getRow().get().getColumns(); - Preconditions.checkNotNull(header.get()); - rows.add(rowFactory.apply(r, header.get().getSchema())); - } + if (!row.getRow().isPresent()) { + throw new KsqlException("Missing row data on row " + i + " of chunk"); + } + + final List r = row.getRow().get().getColumns(); + Preconditions.checkNotNull(header.get()); + rows.add(rowFactory.apply(r, header.get().getSchema())); + } - if (!pullQueryQueue.acceptRows(rows)) { - LOG.info("Failed to queue all rows"); + if (!pullQueryQueue.acceptRows(rows)) { + LOG.error("Failed to queue all rows"); + } + } catch (Exception e) { + throw new KsqlException("Error handling streamed rows: " + e.getMessage(), e); } }; } @@ -414,4 +451,9 @@ private static void validateSchema( forwardedSchema, forwardedNode, expectedSchema)); } } + + private enum RoutingResult { + SUCCESS, + STANDBY_FALLBACK + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/StandbyFallbackException.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/StandbyFallbackException.java new file mode 100644 index 000000000000..9ef774f9dc29 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/StandbyFallbackException.java @@ -0,0 +1,37 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.physical.pull; + +import io.confluent.ksql.util.KsqlException; + +/** + * This exception is thrown to indicate that pull queries should fallback on the next standby in + * line. + */ +public class StandbyFallbackException extends KsqlException { + + public StandbyFallbackException(final Throwable cause) { + super(cause); + } + + public StandbyFallbackException(final String message) { + super(message); + } + + public StandbyFallbackException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java index 7e06abeff888..7f609fd49061 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java @@ -27,6 +27,9 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory; import io.confluent.ksql.execution.streams.RoutingOptions; import io.confluent.ksql.execution.streams.materialization.Locator; @@ -38,16 +41,22 @@ import io.confluent.ksql.physical.pull.HARouting.RouteQuery; import io.confluent.ksql.query.PullQueryQueue; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.rest.client.RestResponse; +import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.services.SimpleKsqlClient; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; -import java.util.ArrayList; +import io.confluent.ksql.util.KsqlRequestConfig; +import java.net.URI; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -90,6 +99,8 @@ public class HARoutingTest { @Mock private LogicalSchema logicalSchema; @Mock + private LogicalSchema logicalSchema2; + @Mock private RoutingFilterFactory routingFilterFactory; @Mock private PullPhysicalPlan pullPhysicalPlan; @@ -101,6 +112,8 @@ public class HARoutingTest { private RouteQuery routeQuery; @Mock private KsqlConfig ksqlConfig; + @Mock + private SimpleKsqlClient ksqlClient; private PullQueryQueue pullQueryQueue = new PullQueryQueue(); @@ -108,15 +121,29 @@ public class HARoutingTest { @Before public void setUp() { + when(pullPhysicalPlan.getMaterialization()).thenReturn(materialization); + when(pullPhysicalPlan.getMaterialization().locator()).thenReturn(locator); when(statement.getStatementText()).thenReturn("foo"); + when(statement.getSessionConfig()).thenReturn(SessionConfig.of(ksqlConfig, + ImmutableMap.of())); + when(node1.isLocal()).thenReturn(true); + when(node2.isLocal()).thenReturn(false); + when(node1.location()).thenReturn(URI.create("http://node1:8088")); + when(node2.location()).thenReturn(URI.create("http://node2:8089")); when(location1.getNodes()).thenReturn(ImmutableList.of(node1, node2)); when(location2.getNodes()).thenReturn(ImmutableList.of(node2, node1)); when(location3.getNodes()).thenReturn(ImmutableList.of(node1, node2)); when(location4.getNodes()).thenReturn(ImmutableList.of(node2, node1)); + when(location1.getPartition()).thenReturn(1); + when(location2.getPartition()).thenReturn(2); + when(location3.getPartition()).thenReturn(3); + when(location4.getPartition()).thenReturn(4); // We require at least two threads, one for the orchestrator, and the other for the partitions. when(ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG)).thenReturn(2); + + when(serviceContext.getKsqlClient()).thenReturn(ksqlClient); haRouting = new HARouting( - routingFilterFactory, Optional.empty(), ksqlConfig, routeQuery); + routingFilterFactory, Optional.empty(), ksqlConfig); } @After @@ -129,29 +156,25 @@ public void tearDown() { @Test public void shouldCallRouteQuery_success() throws InterruptedException, ExecutionException { // Given: - List locations = ImmutableList.of(location1, location2, location3, location4); - when(pullPhysicalPlan.getMaterialization()).thenReturn(materialization); - when(pullPhysicalPlan.getMaterialization().locator()).thenReturn(locator); - when(pullPhysicalPlan.getMaterialization().locator().locate( - pullPhysicalPlan.getKeys(), - routingOptions, - routingFilterFactory - )).thenReturn(locations); - List> locationsQueried = new ArrayList<>(); - doAnswer(inv -> { - locationsQueried.add(inv.getArgument(1)); - PullQueryQueue queue = inv.getArgument(9); + locate(location1, location2, location3, location4); + doAnswer(i -> { + final PullQueryQueue queue = i.getArgument(1); queue.acceptRow(PQ_ROW1); return null; - }).when(routeQuery).routeQuery(eq(node1), any(), any(), any(), any(), any(), any(), any(), - any(), any()); - doAnswer(inv -> { - locationsQueried.add(inv.getArgument(1)); - PullQueryQueue queue = inv.getArgument(9); - queue.acceptRow(PQ_ROW2); - return null; - }).when(routeQuery).routeQuery(eq(node2), any(), any(), any(), any(), any(), any(), any(), - any(), any()); + }).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location1, location3)), any(), any()); + when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any())).thenAnswer( + i -> { + Map requestProperties = i.getArgument(3); + Consumer> rowConsumer = i.getArgument(4); + assertThat(requestProperties.get(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS), + is ("2,4")); + rowConsumer.accept( + ImmutableList.of( + StreamedRow.header(queryId, logicalSchema), + StreamedRow.pullRow(GenericRow.fromList(ROW2), Optional.empty()))); + return RestResponse.successful(200, 2); + } + ); // When: CompletableFuture future = haRouting.handlePullQuery( @@ -160,14 +183,7 @@ public void shouldCallRouteQuery_success() throws InterruptedException, Executio future.get(); // Then: - verify(routeQuery).routeQuery(eq(node1), any(), any(), any(), any(), any(), any(), any(), any(), - any()); - assertThat(locationsQueried.get(0).get(0), is(location1)); - assertThat(locationsQueried.get(0).get(1), is(location3)); - verify(routeQuery).routeQuery(eq(node2), any(), any(), any(), any(), any(), any(), any(), any(), - any()); - assertThat(locationsQueried.get(1).get(0), is(location2)); - assertThat(locationsQueried.get(1).get(1), is(location4)); + verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1, location3)), any(), any()); assertThat(pullQueryQueue.size(), is(2)); assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW1)); @@ -177,35 +193,37 @@ public void shouldCallRouteQuery_success() throws InterruptedException, Executio @Test public void shouldCallRouteQuery_twoRound() throws InterruptedException, ExecutionException { // Given: - List locations = ImmutableList.of(location1, location2, location3, location4); - when(pullPhysicalPlan.getMaterialization()).thenReturn(materialization); - when(pullPhysicalPlan.getMaterialization().locator()).thenReturn(locator); - when(pullPhysicalPlan.getMaterialization().locator().locate( - pullPhysicalPlan.getKeys(), - routingOptions, - routingFilterFactory - )).thenReturn(locations); - List> locationsQueried = new ArrayList<>(); - doAnswer(inv -> { - locationsQueried.add(inv.getArgument(1)); - throw new RuntimeException("Error!"); - }).when(routeQuery).routeQuery(eq(node1), any(), any(), any(), any(), any(), any(), any(), - any(), any()); - doAnswer(new Answer() { - private int count = 0; - - public Object answer(InvocationOnMock invocation) { - locationsQueried.add(invocation.getArgument(1)); - PullQueryQueue queue = invocation.getArgument(9); - if (++count == 1) { - queue.acceptRow(PQ_ROW2); - } else { - queue.acceptRow(PQ_ROW1); + locate(location1, location2, location3, location4); + doAnswer(i -> { + throw new StandbyFallbackException("Error!"); + }).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location1, location3)), any(), any()); + when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any())).thenAnswer( + new Answer() { + private int count = 0; + + public Object answer(InvocationOnMock i) { + Map requestProperties = i.getArgument(3); + Consumer> rowConsumer = i.getArgument(4); + if (++count == 1) { + assertThat(requestProperties.get( + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS), is ("2,4")); + rowConsumer.accept( + ImmutableList.of( + StreamedRow.header(queryId, logicalSchema), + StreamedRow.pullRow(GenericRow.fromList(ROW2), Optional.empty()))); + } else { + assertThat(requestProperties.get( + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS), is ("1,3")); + rowConsumer.accept( + ImmutableList.of( + StreamedRow.header(queryId, logicalSchema), + StreamedRow.pullRow(GenericRow.fromList(ROW1), Optional.empty()))); + } + + return RestResponse.successful(200, 2); + } } - return null; - } - }).when(routeQuery).routeQuery(eq(node2), any(), any(), any(), any(), any(), any(), any(), - any(), any()); + ); // When: CompletableFuture future = haRouting.handlePullQuery(serviceContext, pullPhysicalPlan, @@ -213,16 +231,8 @@ public Object answer(InvocationOnMock invocation) { future.get(); // Then: - verify(routeQuery).routeQuery(eq(node1), any(), any(), any(), any(), any(), any(), any(), any(), - any()); - assertThat(locationsQueried.get(0).get(0), is(location1)); - assertThat(locationsQueried.get(0).get(1), is(location3)); - verify(routeQuery, times(2)).routeQuery(eq(node2), any(), any(), any(), any(), any(), any(), - any(), any(), any()); - assertThat(locationsQueried.get(1).get(0), is(location2)); - assertThat(locationsQueried.get(1).get(1), is(location4)); - assertThat(locationsQueried.get(2).get(0), is(location1)); - assertThat(locationsQueried.get(2).get(1), is(location3)); + verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1, location3)), any(), any()); + verify(ksqlClient, times(2)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any()); assertThat(pullQueryQueue.size(), is(2)); assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW2)); @@ -230,37 +240,65 @@ public Object answer(InvocationOnMock invocation) { } @Test - public void shouldCallRouteQuery_allFail() { + public void shouldCallRouteQuery_twoRound_networkError() + throws InterruptedException, ExecutionException { // Given: - List locations = ImmutableList.of(location1, location2, location3, location4); - when(pullPhysicalPlan.getMaterialization()).thenReturn(materialization); - when(pullPhysicalPlan.getMaterialization().locator()).thenReturn(locator); - when(pullPhysicalPlan.getMaterialization().locator().locate( - pullPhysicalPlan.getKeys(), - routingOptions, - routingFilterFactory - )).thenReturn(locations); - List> locationsQueried = new ArrayList<>(); - doAnswer(inv -> { - locationsQueried.add(inv.getArgument(1)); - throw new RuntimeException("Error!"); - }).when(routeQuery).routeQuery(eq(node1), any(), any(), any(), any(), any(), any(), any(), - any(), any()); - doAnswer(new Answer() { - private int count = 0; - - public Object answer(InvocationOnMock invocation) { - locationsQueried.add(invocation.getArgument(1)); - PullQueryQueue queue = invocation.getArgument(9); - if (++count == 1) { - queue.acceptRow(PQ_ROW2); - } else { - throw new RuntimeException("Error!"); + locate(location2); + when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any())).thenAnswer( + i -> { + throw new RuntimeException("Network error!"); } - return null; - } - }).when(routeQuery).routeQuery(eq(node2), any(), any(), any(), any(), any(), any(), any(), - any(), any()); + ); + doAnswer(i -> { + final PullQueryQueue queue = i.getArgument(1); + queue.acceptRow(PQ_ROW1); + return null; + }).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location2)), any(), any()); + + // When: + CompletableFuture future = haRouting.handlePullQuery(serviceContext, pullPhysicalPlan, + statement, routingOptions, logicalSchema, queryId, pullQueryQueue); + future.get(); + + // Then: + verify(ksqlClient, times(1)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any()); + verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location2)), any(), any()); + + assertThat(pullQueryQueue.size(), is(1)); + assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW1)); + } + + @Test + public void shouldCallRouteQuery_allStandbysFail() { + // Given: + locate(location1, location2, location3, location4); + doAnswer(i -> { + throw new StandbyFallbackException("Error1!"); + }).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location1, location3)), any(), any()); + when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any())).thenAnswer( + new Answer() { + private int count = 0; + + public Object answer(InvocationOnMock i) { + Map requestProperties = i.getArgument(3); + Consumer> rowConsumer = i.getArgument(4); + if (++count == 1) { + assertThat(requestProperties.get( + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS), is ("2,4")); + rowConsumer.accept( + ImmutableList.of( + StreamedRow.header(queryId, logicalSchema), + StreamedRow.pullRow(GenericRow.fromList(ROW2), Optional.empty()))); + } else { + assertThat(requestProperties.get( + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS), is ("1,3")); + throw new RuntimeException("Error2!"); + } + + return RestResponse.successful(200, 2); + } + } + ); // When: final Exception e = assertThrows( @@ -273,18 +311,10 @@ public Object answer(InvocationOnMock invocation) { ); // Then: - verify(routeQuery).routeQuery(eq(node1), any(), any(), any(), any(), any(), any(), any(), any(), - any()); - assertThat(locationsQueried.get(0).get(0), is(location1)); - assertThat(locationsQueried.get(0).get(1), is(location3)); - verify(routeQuery, times(2)).routeQuery(eq(node2), any(), any(), any(), any(), any(), any(), - any(), any(), any()); - assertThat(locationsQueried.get(1).get(0), is(location2)); - assertThat(locationsQueried.get(1).get(1), is(location4)); - assertThat(locationsQueried.get(2).get(0), is(location1)); - assertThat(locationsQueried.get(2).get(1), is(location3)); - - assertThat(e.getCause().getMessage(), containsString("Unable to execute pull query: foo. " + verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1, location3)), any(), any()); + verify(ksqlClient, times(2)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any()); + + assertThat(e.getCause().getMessage(), containsString("Unable to execute pull query: \"foo\". " + "Exhausted standby hosts to try.")); } @@ -292,14 +322,7 @@ public Object answer(InvocationOnMock invocation) { public void shouldCallRouteQuery_allFiltered() { // Given: when(location1.getNodes()).thenReturn(ImmutableList.of()); - List locations = ImmutableList.of(location1, location2, location3, location4); - when(pullPhysicalPlan.getMaterialization()).thenReturn(materialization); - when(pullPhysicalPlan.getMaterialization().locator()).thenReturn(locator); - when(pullPhysicalPlan.getMaterialization().locator().locate( - pullPhysicalPlan.getKeys(), - routingOptions, - routingFilterFactory - )).thenReturn(locations); + locate(location1, location2, location3, location4); // When: final Exception e = assertThrows( @@ -310,6 +333,139 @@ public void shouldCallRouteQuery_allFiltered() { // Then: assertThat(e.getMessage(), containsString( - "Unable to execute pull query foo. All nodes are dead or exceed max allowed lag.")); + "Unable to execute pull query \"foo\". All nodes are dead or exceed max allowed lag.")); + } + + @Test + public void forwardingError_errorRow() { + // Given: + locate(location2); + when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any())).thenAnswer( + i -> { + Map requestProperties = i.getArgument(3); + Consumer> rowConsumer = i.getArgument(4); + assertThat(requestProperties.get(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS), + is ("2")); + rowConsumer.accept( + ImmutableList.of( + StreamedRow.header(queryId, logicalSchema), + StreamedRow.error(new RuntimeException("Row Error!"), 500))); + return RestResponse.successful(200, 2); + } + ); + + // When: + CompletableFuture future = haRouting.handlePullQuery( + serviceContext, pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId, + pullQueryQueue); + final Exception e = assertThrows( + ExecutionException.class, + future::get + ); + + // Then: + assertThat(pullQueryQueue.size(), is(0)); + assertThat(e.getMessage(), containsString("Row Error!")); + } + + @Test + public void forwardingError_authError() { + // Given: + locate(location2); + when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any())).thenAnswer( + i -> { + Map requestProperties = i.getArgument(3); + Consumer> rowConsumer = i.getArgument(4); + assertThat(requestProperties.get(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS), + is ("2")); + rowConsumer.accept(ImmutableList.of()); + return RestResponse.erroneous(401, "Authentication Error"); + } + ); + + // When: + CompletableFuture future = haRouting.handlePullQuery( + serviceContext, pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId, + pullQueryQueue); + final Exception e = assertThrows( + ExecutionException.class, + future::get + ); + + // Then: + assertThat(pullQueryQueue.size(), is(0)); + assertThat(e.getMessage(), containsString("Authentication Error")); + } + + @Test + public void forwardingError_noRows() { + // Given: + locate(location2); + when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any())).thenAnswer( + i -> { + Map requestProperties = i.getArgument(3); + Consumer> rowConsumer = i.getArgument(4); + assertThat(requestProperties.get(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS), + is ("2")); + rowConsumer.accept(ImmutableList.of()); + return RestResponse.successful(200, 0); + } + ); + + // When: + CompletableFuture future = haRouting.handlePullQuery( + serviceContext, pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId, + pullQueryQueue); + final Exception e = assertThrows( + ExecutionException.class, + future::get + ); + + // Then: + assertThat(pullQueryQueue.size(), is(0)); + assertThat(e.getMessage(), + containsString("empty response from forwarding call, expected a header row")); + } + + @Test + public void forwardingError_invalidSchema() { + // Given: + locate(location2); + when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any())).thenAnswer( + i -> { + Map requestProperties = i.getArgument(3); + Consumer> rowConsumer = i.getArgument(4); + assertThat(requestProperties.get(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS), + is ("2")); + rowConsumer.accept( + ImmutableList.of( + StreamedRow.header(queryId, logicalSchema2), + StreamedRow.error(new RuntimeException("Row Error!"), 500))); + return RestResponse.successful(200, 2); + } + ); + + // When: + CompletableFuture future = haRouting.handlePullQuery( + serviceContext, pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId, + pullQueryQueue); + final Exception e = assertThrows( + ExecutionException.class, + future::get + ); + + // Then: + assertThat(pullQueryQueue.size(), is(0)); + assertThat(e.getMessage(), + containsString("Schemas logicalSchema2 from host node2 differs from schema logicalSchema")); + } + + private void locate(final KsqlPartitionLocation... locations) { + List locationsList = ImmutableList.copyOf(locations); + when(pullPhysicalPlan.getMaterialization().locator().locate( + pullPhysicalPlan.getKeys(), + routingOptions, + routingFilterFactory + )).thenReturn(locationsList); } }