Skip to content

Commit

Permalink
fix: Bubble up errors from HARouting unless using StandbyFallbackExce…
Browse files Browse the repository at this point in the history
…ption (#7238)

* fix: Bubble up errors from HARouting unless explicit use of StandbyFallbackException
  • Loading branch information
AlanConfluent authored Mar 26, 2021
1 parent b0a25c1 commit ec12516
Show file tree
Hide file tree
Showing 3 changed files with 431 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlRequestConfig;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.KsqlStatementException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -121,7 +119,7 @@ public CompletableFuture<Void> handlePullQuery(
LOG.debug("Unable to execute pull query: {}. All nodes are dead or exceed max allowed lag.",
statement.getStatementText());
throw new MaterializationException(String.format(
"Unable to execute pull query %s. All nodes are dead or exceed max allowed lag.",
"Unable to execute pull query \"%s\". All nodes are dead or exceed max allowed lag.",
statement.getStatementText()));
}

Expand Down Expand Up @@ -169,32 +167,37 @@ private void executeRounds(

// Make requests to each host, specifying the partitions we're interested in from
// this host.
final Map<KsqlNode, Future<Void>> futures = new LinkedHashMap<>();
final Map<KsqlNode, Future<RoutingResult>> futures = new LinkedHashMap<>();
for (Map.Entry<KsqlNode, List<KsqlPartitionLocation>> entry : groupedByHost.entrySet()) {
final KsqlNode node = entry.getKey();
futures.put(node, executorService.submit(
() -> {
routeQuery.routeQuery(
node, entry.getValue(), statement, serviceContext, routingOptions,
pullQueryMetrics, pullPhysicalPlan, outputSchema, queryId, pullQueryQueue);
return null;
}
() -> routeQuery.routeQuery(
node, entry.getValue(), statement, serviceContext, routingOptions,
pullQueryMetrics, pullPhysicalPlan, outputSchema, queryId, pullQueryQueue)
));
}

// Go through all of the results of the requests, either aggregating rows or adding
// the locations to the nextRoundRemaining list.
final ImmutableList.Builder<KsqlPartitionLocation> nextRoundRemaining
= ImmutableList.builder();
for (Map.Entry<KsqlNode, Future<Void>> entry : futures.entrySet()) {
final Future<Void> future = entry.getValue();
for (Map.Entry<KsqlNode, Future<RoutingResult>> entry : futures.entrySet()) {
final Future<RoutingResult> future = entry.getValue();
final KsqlNode node = entry.getKey();
RoutingResult routingResult = null;
try {
future.get();
routingResult = future.get();
} catch (ExecutionException e) {
LOG.warn("Error routing query {} to host {} at timestamp {} with exception {}",
statement.getStatementText(), node, System.currentTimeMillis(), e.getCause());
throw new MaterializationException(String.format(
"Unable to execute pull query \"%s\". %s",
statement.getStatementText(), e.getCause().getMessage()));
}
if (routingResult == RoutingResult.STANDBY_FALLBACK) {
nextRoundRemaining.addAll(groupedByHost.get(node));
} else {
Preconditions.checkState(routingResult == RoutingResult.SUCCESS);
}
}
remainingLocations = nextRoundRemaining.build();
Expand Down Expand Up @@ -225,7 +228,7 @@ private static Map<KsqlNode, List<KsqlPartitionLocation>> groupByHost(
// If one of the partitions required is out of nodes, then we cannot continue.
if (round >= location.getNodes().size()) {
throw new MaterializationException(String.format(
"Unable to execute pull query: %s. Exhausted standby hosts to try.",
"Unable to execute pull query: \"%s\". Exhausted standby hosts to try.",
statement.getStatementText()));
}
final KsqlNode nextHost = location.getNodes().get(round);
Expand All @@ -236,7 +239,7 @@ private static Map<KsqlNode, List<KsqlPartitionLocation>> groupByHost(

@VisibleForTesting
interface RouteQuery {
void routeQuery(
RoutingResult routeQuery(
KsqlNode node,
List<KsqlPartitionLocation> locations,
ConfiguredStatement<Query> statement,
Expand All @@ -251,7 +254,7 @@ void routeQuery(
}

@VisibleForTesting
static void executeOrRouteQuery(
static RoutingResult executeOrRouteQuery(
final KsqlNode node,
final List<KsqlPartitionLocation> locations,
final ConfiguredStatement<Query> statement,
Expand All @@ -273,29 +276,41 @@ static void executeOrRouteQuery(
pullQueryMetrics
.ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordLocalRequests(1));
pullPhysicalPlan.execute(locations, pullQueryQueue, rowFactory);
} catch (Exception e) {
LOG.error("Error executing query {} locally at node {} with exception",
return RoutingResult.SUCCESS;
} catch (StandbyFallbackException e) {
LOG.warn("Error executing query {} locally at node {}. Falling back to standby state which "
+ "may return stale results",
statement.getStatementText(), node, e.getCause());
return RoutingResult.STANDBY_FALLBACK;
} catch (Exception e) {
LOG.error("Error executing query {} locally at node {}",
statement.getStatementText(), node.location(), e.getCause());
throw new KsqlException(
String.format("Error executing query %s locally at node %s",
statement.getStatementText(), node),
String.format("Error executing query locally at node %s: %s", node.location(),
e.getMessage()),
e
);
}
} else {
try {
LOG.debug("Query {} routed to host {} at timestamp {}.",
statement.getStatementText(), node.location(), System.currentTimeMillis());
statement.getStatementText(), node.location(), System.currentTimeMillis());
pullQueryMetrics
.ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordRemoteRequests(1));
forwardTo(node, locations, statement, serviceContext, pullQueryQueue, rowFactory,
outputSchema);
return RoutingResult.SUCCESS;
} catch (StandbyFallbackException e) {
LOG.warn("Error forwarding query {} to node {}. Falling back to standby state which may "
+ "return stale results",
statement.getStatementText(), node.location(), e.getCause());
return RoutingResult.STANDBY_FALLBACK;
} catch (Exception e) {
LOG.error("Error forwarding query {} to node {} with exception {}",
LOG.error("Error forwarding query {} to node {}",
statement.getStatementText(), node, e.getCause());
throw new KsqlException(
String.format("Error forwarding query %s to node %s",
statement.getStatementText(), node),
String.format("Error forwarding query to node %s: %s", node.location(),
e.getMessage()),
e
);
}
Expand All @@ -322,83 +337,105 @@ private static void forwardTo(
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING, true,
KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST, true,
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS, partitions);
final RestResponse<Integer> response = serviceContext
.getKsqlClient()
.makeQueryRequest(
owner.location(),
statement.getStatementText(),
statement.getSessionConfig().getOverrides(),
requestProperties,
streamedRowsHandler(owner, statement, requestProperties, pullQueryQueue, rowFactory,
outputSchema)
);
final RestResponse<Integer> response;

try {
response = serviceContext
.getKsqlClient()
.makeQueryRequest(
owner.location(),
statement.getStatementText(),
statement.getSessionConfig().getOverrides(),
requestProperties,
streamedRowsHandler(owner, pullQueryQueue, rowFactory, outputSchema)
);
} catch (Exception e) {
// If we threw some explicit exception, then let it bubble up. All of the row handling is
// wrapped in a KsqlException, so any intentional exception or bug will be surfaced.
final KsqlException ksqlException = causedByKsqlException(e);
if (ksqlException != null) {
throw ksqlException;
}
// If we get some kind of unknown error, we assume it's network or other error from the
// KsqlClient and try standbys
throw new StandbyFallbackException(String.format(
"Forwarding pull query request [%s, %s] failed with error %s ",
statement.getSessionConfig().getOverrides(), requestProperties,
e.getMessage()), e);
}

if (response.isErroneous()) {
throw new KsqlServerException(String.format(
"Forwarding pull query request [%s, %s, %s] to node %s failed with error %s ",
statement.getStatement(), statement.getSessionConfig().getOverrides(), requestProperties,
owner, response.getErrorMessage()));
throw new KsqlException(String.format(
"Forwarding pull query request [%s, %s] failed with error %s ",
statement.getSessionConfig().getOverrides(), requestProperties,
response.getErrorMessage()));
}

final int numRows = response.getResponse();
if (numRows == 0) {
throw new KsqlServerException(String.format(
"Forwarding pull query request [%s, %s, %s] to node %s failed due to invalid "
throw new KsqlException(String.format(
"Forwarding pull query request [%s, %s] failed due to invalid "
+ "empty response from forwarding call, expected a header row.",
statement.getStatement(), statement.getSessionConfig().getOverrides(), requestProperties,
owner));
statement.getSessionConfig().getOverrides(), requestProperties));
}
}

/**
 * Searches the cause chain of the given exception, starting with the exception itself, for a
 * {@link KsqlException}.
 *
 * @param e the exception whose cause chain is inspected
 * @return the first {@code KsqlException} encountered in the chain, or {@code null} when neither
 *     {@code e} nor any of its causes is a {@code KsqlException}
 */
private static KsqlException causedByKsqlException(final Exception e) {
  for (Throwable current = e; current != null; current = current.getCause()) {
    if (current instanceof KsqlException) {
      return (KsqlException) current;
    }
  }
  // Reached the end of the chain without finding a KsqlException.
  return null;
}

private static Consumer<List<StreamedRow>> streamedRowsHandler(
final KsqlNode owner,
final ConfiguredStatement<Query> statement,
final Map<String, Object> requestProperties,
final PullQueryQueue pullQueryQueue,
final BiFunction<List<?>, LogicalSchema, PullQueryRow> rowFactory,
final LogicalSchema outputSchema
) {
final AtomicInteger processedRows = new AtomicInteger(0);
final AtomicReference<Header> header = new AtomicReference<>();
return streamedRows -> {
if (streamedRows == null || streamedRows.isEmpty()) {
return;
}
final List<PullQueryRow> rows = new ArrayList<>();

// If this is the first row overall, skip the header
final int previousProcessedRows = processedRows.getAndAdd(streamedRows.size());
for (int i = 0; i < streamedRows.size(); i++) {
final StreamedRow row = streamedRows.get(i);
if (i == 0 && previousProcessedRows == 0) {
final Optional<Header> optionalHeader = row.getHeader();
optionalHeader.ifPresent(h -> validateSchema(outputSchema, h.getSchema(), owner));
optionalHeader.ifPresent(header::set);
continue;
try {
if (streamedRows == null || streamedRows.isEmpty()) {
return;
}
final List<PullQueryRow> rows = new ArrayList<>();

if (row.getErrorMessage().isPresent()) {
throw new KsqlStatementException(
row.getErrorMessage().get().getMessage(),
statement.getStatementText()
);
}
// If this is the first row overall, skip the header
final int previousProcessedRows = processedRows.getAndAdd(streamedRows.size());
for (int i = 0; i < streamedRows.size(); i++) {
final StreamedRow row = streamedRows.get(i);
if (i == 0 && previousProcessedRows == 0) {
final Optional<Header> optionalHeader = row.getHeader();
optionalHeader.ifPresent(h -> validateSchema(outputSchema, h.getSchema(), owner));
optionalHeader.ifPresent(header::set);
continue;
}

if (!row.getRow().isPresent()) {
throw new KsqlServerException(String.format(
"Forwarding pull query request [%s, %s, %s] to node %s failed due to "
+ "missing row data.",
statement.getStatement(), statement.getSessionConfig().getOverrides(),
requestProperties, owner));
}
if (row.getErrorMessage().isPresent()) {
// If we receive an error that's not a network error, we let that bubble up.
throw new KsqlException(row.getErrorMessage().get().getMessage());
}

final List<?> r = row.getRow().get().getColumns();
Preconditions.checkNotNull(header.get());
rows.add(rowFactory.apply(r, header.get().getSchema()));
}
if (!row.getRow().isPresent()) {
throw new KsqlException("Missing row data on row " + i + " of chunk");
}

final List<?> r = row.getRow().get().getColumns();
Preconditions.checkNotNull(header.get());
rows.add(rowFactory.apply(r, header.get().getSchema()));
}

if (!pullQueryQueue.acceptRows(rows)) {
LOG.info("Failed to queue all rows");
if (!pullQueryQueue.acceptRows(rows)) {
LOG.error("Failed to queue all rows");
}
} catch (Exception e) {
throw new KsqlException("Error handling streamed rows: " + e.getMessage(), e);
}
};
}
Expand All @@ -414,4 +451,9 @@ private static void validateSchema(
forwardedSchema, forwardedNode, expectedSchema));
}
}

/**
 * Outcome reported by a single routing attempt (local execution or forwarding) for one node.
 */
private enum RoutingResult {
/** The query was executed locally or forwarded to the node without an exception. */
SUCCESS,
/**
 * A StandbyFallbackException was caught for this node; the caller re-queues the node's
 * partition locations so they are attempted again in the next round.
 */
STANDBY_FALLBACK
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.physical.pull;

import io.confluent.ksql.util.KsqlException;

/**
* Thrown to indicate that a pull query should fall back to the next standby in line rather than
* fail outright.
*/
public class StandbyFallbackException extends KsqlException {

/**
* Creates the exception from an underlying cause only.
*
* @param cause the failure that triggered the fallback
*/
public StandbyFallbackException(final Throwable cause) {
super(cause);
}

/**
* Creates the exception with a descriptive message and no cause.
*
* @param message description of why the fallback is required
*/
public StandbyFallbackException(final String message) {
super(message);
}

/**
* Creates the exception with both a descriptive message and the underlying cause.
*
* @param message description of why the fallback is required
* @param cause the failure that triggered the fallback
*/
public StandbyFallbackException(final String message, final Throwable cause) {
super(message, cause);
}
}
Loading

0 comments on commit ec12516

Please sign in to comment.