Skip to content

Commit

Permalink
feat: Allow to plug-in custom error handling for Connect server errors (
Browse files Browse the repository at this point in the history
#8480)

* ConnectExecutor has an immutable error handler as part of its state
* ConnectExecutor#execute is an instance method now
* CustomExecutors is a class now rather than an enum of static methods
  • Loading branch information
Gerrrr authored Dec 16, 2021
1 parent be89c60 commit c4b4d67
Show file tree
Hide file tree
Showing 32 changed files with 1,094 additions and 271 deletions.
13 changes: 13 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ public class KsqlConfig extends AbstractConfig {
"Custom extension to allow for more fine-grained control of connector requests made by "
+ "ksqlDB. Extensions should implement the ConnectRequestHeadersExtension interface.";

public static final String KSQL_CONNECT_SERVER_ERROR_HANDLER =
KSQL_CONNECT_PREFIX + "error.handler";
public static final String KSQL_CONNECT_SERVER_ERROR_HANDLER_DEFAULT = null;
private static final String KSQL_CONNECT_SERVER_ERROR_HANDLER_DOC =
"A class that allows the ksqlDB server to customize error handling from connector requests.";

public static final String KSQL_ENABLE_UDFS = "ksql.udfs.enabled";

public static final String KSQL_EXT_DIR = "ksql.extension.dir";
Expand Down Expand Up @@ -1330,6 +1336,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_HEADERS_COLUMNS_ENABLED_DOC
)
.define(
KSQL_CONNECT_SERVER_ERROR_HANDLER,
Type.CLASS,
KSQL_CONNECT_SERVER_ERROR_HANDLER_DEFAULT,
Importance.LOW,
KSQL_CONNECT_SERVER_ERROR_HANDLER_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
import io.confluent.ksql.rest.server.computation.CommandStore;
import io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.rest.server.execution.ConnectServerErrors;
import io.confluent.ksql.rest.server.execution.DefaultConnectServerErrors;
import io.confluent.ksql.rest.server.query.QueryExecutor;
import io.confluent.ksql.rest.server.resources.ClusterStatusResource;
import io.confluent.ksql.rest.server.resources.HealthCheckResource;
Expand Down Expand Up @@ -807,6 +809,8 @@ static KsqlRestApplication buildApplication(
ErrorMessages.class
));

final ConnectServerErrors connectErrorHandler = loadConnectErrorHandler(ksqlConfig);

final Optional<LagReportingAgent> lagReportingAgent =
initializeLagReportingAgent(restConfig, ksqlEngine, serviceContext);
final Optional<HeartbeatAgent> heartbeatAgent =
Expand Down Expand Up @@ -925,6 +929,7 @@ static KsqlRestApplication buildApplication(
versionChecker::updateLastRequestTime,
authorizationValidator,
errorHandler,
connectErrorHandler,
denyListPropertyValidator
);

Expand Down Expand Up @@ -1103,6 +1108,15 @@ private static Optional<AuthenticationPlugin> loadAuthenticationPlugin(
return authenticationPlugin;
}

private static ConnectServerErrors loadConnectErrorHandler(
final KsqlConfig ksqlConfig) {
return Optional.ofNullable(
ksqlConfig.getConfiguredInstance(
KsqlConfig.KSQL_CONNECT_SERVER_ERROR_HANDLER,
ConnectServerErrors.class
)).orElse(new DefaultConnectServerErrors());
}

private void displayWelcomeMessage() {

final Console console = System.console();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;

public final class ConnectExecutor {
private final ConnectServerErrors connectErrorHandler;

private ConnectExecutor() {
ConnectExecutor(final ConnectServerErrors connectErrorHandler) {
this.connectErrorHandler = connectErrorHandler;
}

public static StatementExecutorResponse execute(
public StatementExecutorResponse execute(
final ConfiguredStatement<CreateConnector> statement,
final SessionProperties sessionProperties,
final KsqlExecutionContext executionContext,
Expand Down Expand Up @@ -90,8 +92,7 @@ public static StatementExecutorResponse execute(
}
}

return StatementExecutorResponse.handled(response.error()
.map(err -> new ErrorEntity(statement.getStatementText(), err)));
return StatementExecutorResponse.handled(connectErrorHandler.handle(statement, response));
}

private static List<String> validate(final CreateConnector createConnector,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.execution;

import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;

import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.services.ConnectClient.ConnectResponse;
import io.confluent.ksql.statement.ConfiguredStatement;
import java.util.Optional;

/**
* An interface that allows to plug-in custom error handling for Connect server errors, such as 403
* Forbidden or 401 Unauthorized.
*/
public interface ConnectServerErrors {

/**
* Handles error response for a create connector request. This method dispatches to specific
* methods based on error codes.
*
* @param statement the executed statement
* @param response the failed response
* @return the optional {@link KsqlEntity} that represents server error
*/
default Optional<KsqlEntity> handle(
final ConfiguredStatement<? extends Statement> statement,
final ConnectResponse<?> response) {
if (response.httpCode() == FORBIDDEN.code()) {
return handleForbidden(statement, response);
} else if (response.httpCode() == UNAUTHORIZED.code()) {
return handleUnauthorized(statement, response);
} else {
return handleDefault(statement, response);
}
}

/**
* This method allows altering error response on 403 Forbidden.
*
* @param statement the executed statement
* @param response the failed response
* @return the optional {@code KsqlEntity} that represents server error
*/
Optional<KsqlEntity> handleForbidden(
ConfiguredStatement<? extends Statement> statement,
ConnectResponse<?> response);

/**
* This method allows altering error response on 401 Unauthorized.
*
* @param statement the executed statement
* @param response the failed response
* @return the optional {@code KsqlEntity} that represents server error
*/
Optional<KsqlEntity> handleUnauthorized(
ConfiguredStatement<? extends Statement> statement,
ConnectResponse<?> response);

/**
* This method is a fall-back for errors that are not handled by the error-specific methods.
*
* @param statement the executed statement
* @param response the failed response
* @return the optional {@code KsqlEntity} that represents server error
*/
Optional<KsqlEntity> handleDefault(
ConfiguredStatement<? extends Statement> statement,
ConnectResponse<?> response);
}
Loading

0 comments on commit c4b4d67

Please sign in to comment.