Skip to content

Commit

Permalink
refactor: move custom schema registry not configured error messages t…
Browse files Browse the repository at this point in the history
…o rest endpoints (#4348)
  • Loading branch information
stevenpyzhang authored Jan 18, 2020
1 parent 0d542c5 commit 4a74d24
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import org.apache.kafka.common.config.ConfigException;

public class KsqlSchemaRegistryNotConfiguredException extends ConfigException {

public KsqlSchemaRegistryNotConfiguredException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,29 @@
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;

import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlSchemaRegistryNotConfiguredException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.config.ConfigException;

/**
* Implements the SchemaRegistryClient interface. Used as default when the Schema Registry URL isn't
* specified in {@link io.confluent.ksql.util.KsqlConfig}
*/
public class DefaultSchemaRegistryClient implements SchemaRegistryClient {

private final ConfigException configException;

public DefaultSchemaRegistryClient(final ErrorMessages errorMessages) {
configException = new ConfigException(errorMessages.schemaRegistryUnconfiguredErrorMessage());

public static final String SCHEMA_REGISTRY_CONFIG_NOT_SET =
"KSQL is not configured to use a schema registry. To enable it, please set "
+ KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY;

private final KsqlSchemaRegistryNotConfiguredException configException;

public DefaultSchemaRegistryClient() {
configException =
new KsqlSchemaRegistryNotConfiguredException(SCHEMA_REGISTRY_CONFIG_NOT_SET);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
import org.apache.kafka.common.network.Mode;
Expand All @@ -39,7 +37,6 @@ public class KsqlSchemaRegistryClientFactory {
private final SchemaRegistryClientFactory schemaRegistryClientFactory;
private final Map<String, String> httpHeaders;
private final String schemaRegistryUrl;
private final ErrorMessages errorMessages;

interface SchemaRegistryClientFactory {
CachedSchemaRegistryClient create(RestService service,
Expand All @@ -50,20 +47,17 @@ CachedSchemaRegistryClient create(RestService service,

public KsqlSchemaRegistryClientFactory(
final KsqlConfig config,
final ErrorMessages errorMessages,
final Map<String, String> schemaRegistryHttpHeaders
) {
this(config, errorMessages, newSchemaRegistrySslFactory(config), schemaRegistryHttpHeaders);
this(config, newSchemaRegistrySslFactory(config), schemaRegistryHttpHeaders);
}

public KsqlSchemaRegistryClientFactory(
final KsqlConfig config,
final ErrorMessages errorMessages,
final SslFactory sslFactory,
final Map<String, String> schemaRegistryHttpHeaders
) {
this(config,
errorMessages,
() -> new RestService(config.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY)),
sslFactory,
CachedSchemaRegistryClient::new,
Expand All @@ -76,7 +70,6 @@ public KsqlSchemaRegistryClientFactory(

@VisibleForTesting
KsqlSchemaRegistryClientFactory(final KsqlConfig config,
final ErrorMessages errorMessages,
final Supplier<RestService> serviceSupplier,
final SslFactory sslFactory,
final SchemaRegistryClientFactory schemaRegistryClientFactory,
Expand All @@ -89,7 +82,6 @@ public KsqlSchemaRegistryClientFactory(
this.schemaRegistryClientFactory = schemaRegistryClientFactory;
this.httpHeaders = httpHeaders;
this.schemaRegistryUrl = config.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY).trim();
this.errorMessages = Objects.requireNonNull(errorMessages, "errorMessages");
}

/**
Expand All @@ -109,7 +101,7 @@ static void configureSslFactory(final KsqlConfig config, final SslFactory sslFac

public SchemaRegistryClient get() {
if (schemaRegistryUrl.equals("")) {
return new DefaultSchemaRegistryClient(errorMessages);
return new DefaultSchemaRegistryClient();
}

final RestService restService = serviceSupplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.confluent.ksql.services;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Collections;
Expand All @@ -38,7 +37,6 @@ public static ServiceContext create(
new DefaultKafkaClientSupplier(),
new KsqlSchemaRegistryClientFactory(
ksqlConfig,
new DefaultErrorMessages(),
Collections.emptyMap())::get,
() -> new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY),
Optional.empty()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -61,8 +60,6 @@ public class KsqlSchemaRegistryClientFactoryTest {
private SslFactory sslFactory;
@Mock
private SslEngineBuilder sslEngineBuilder;
@Mock
private ErrorMessages errorMessages;

@Mock
private KsqlSchemaRegistryClientFactory.SchemaRegistryClientFactory srClientFactory;
Expand Down Expand Up @@ -139,9 +136,9 @@ public void shouldUseDefaultSchemaRegistryClientWhenUrlNotSpecified() {

// When:
SchemaRegistryClient client1 = new KsqlSchemaRegistryClientFactory(
config1, errorMessages, restServiceSupplier, sslFactory, srClientFactory, Collections.emptyMap()).get();
config1, restServiceSupplier, sslFactory, srClientFactory, Collections.emptyMap()).get();
SchemaRegistryClient client2 = new KsqlSchemaRegistryClientFactory(
config2, errorMessages, restServiceSupplier, sslFactory, srClientFactory, Collections.emptyMap()).get();
config2, restServiceSupplier, sslFactory, srClientFactory, Collections.emptyMap()).get();

// Then:
assertThat(client1, instanceOf(DefaultSchemaRegistryClient.class));
Expand All @@ -165,7 +162,7 @@ public void shouldPassBasicAuthCredentialsToSchemaRegistryClient() {

// When:
new KsqlSchemaRegistryClientFactory(
config, errorMessages, restServiceSupplier, sslFactory, srClientFactory, Collections.emptyMap()).get();
config, restServiceSupplier, sslFactory, srClientFactory, Collections.emptyMap()).get();

// Then:
verify(restService).setSslSocketFactory(isA(SSL_CONTEXT.getSocketFactory().getClass()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,10 +489,7 @@ static KsqlRestApplication buildApplication(
) {
final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory =
new KsqlSchemaRegistryClientFactory(ksqlConfig, restConfig.getConfiguredInstance(
KsqlRestConfig.KSQL_SERVER_ERROR_MESSAGES,
ErrorMessages.class
),Collections.emptyMap())::get;
new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap())::get;
final ServiceContext serviceContext = new LazyServiceContext(() ->
RestServiceContextFactory.create(ksqlConfig, Optional.empty(),
schemaRegistryClientFactory));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.rest.client.BasicCredentials;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.RestResponse;
Expand All @@ -39,7 +37,6 @@
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.entity.TablesList;
import io.confluent.ksql.rest.server.context.KsqlSecurityContextBinder;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.security.KsqlSecurityContext;
Expand Down Expand Up @@ -446,7 +443,7 @@ private Builder(final Supplier<String> bootstrapServers) {
() -> defaultServiceContext(bootstrapServers, buildBaseConfig(additionalProps));
this.securityContextBinder = (config, securityExtension) ->
new KsqlSecurityContextBinder(config, securityExtension,
new KsqlSchemaRegistryClientFactory(config, new DefaultErrorMessages(), Collections.emptyMap())::get);
new KsqlSchemaRegistryClientFactory(config, Collections.emptyMap())::get);
}

@SuppressWarnings("unused") // Part of public API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,17 @@

package io.confluent.ksql.rest;

import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.ErrorMessageUtil;

public class DefaultErrorMessages implements ErrorMessages {

@Override
public String kafkaAuthorizationErrorMessage(final Exception e) {
return e.getMessage();
return ErrorMessageUtil.buildErrorMessage(e);
}

@Override
public String schemaRegistryUnconfiguredErrorMessage() {
return "KSQL is not configured to use a schema registry. To enable it, please set "
+ KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY;
public String schemaRegistryUnconfiguredErrorMessage(final Exception e) {
return ErrorMessageUtil.buildErrorMessage(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ public interface ErrorMessages {

String kafkaAuthorizationErrorMessage(Exception e);

String schemaRegistryUnconfiguredErrorMessage();
String schemaRegistryUnconfiguredErrorMessage(Exception e);
}
20 changes: 20 additions & 0 deletions ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.PRECONDITION_REQUIRED;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
import static javax.ws.rs.core.Response.Status.UNAUTHORIZED;

import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage;
import io.confluent.ksql.util.KsqlSchemaRegistryNotConfiguredException;
import java.util.Objects;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.common.errors.TopicAuthorizationException;

Expand All @@ -45,6 +48,9 @@ public final class Errors {
public static final int ERROR_CODE_FORBIDDEN_KAFKA_ACCESS =
toErrorCode(FORBIDDEN.getStatusCode()) + 1;

public static final int ERROR_CODE_SCHEMA_REGISTRY_UNCOFIGURED =
toErrorCode(PRECONDITION_REQUIRED.getStatusCode()) + 1;

public static final int ERROR_CODE_NOT_FOUND = toErrorCode(NOT_FOUND.getStatusCode());

public static final int ERROR_CODE_SERVER_SHUTTING_DOWN =
Expand Down Expand Up @@ -91,6 +97,13 @@ private Response constructAccessDeniedFromKafkaResponse(final String errorMessag
.build();
}

private Response constructSchemaRegistryNotConfiguredResponse(final String errorMessage) {
return Response
.status(PRECONDITION_REQUIRED)
.entity(new KsqlErrorMessage(ERROR_CODE_SCHEMA_REGISTRY_UNCOFIGURED, errorMessage))
.build();
}

public static Response badRequest(final String msg) {
return Response
.status(BAD_REQUEST)
Expand Down Expand Up @@ -203,6 +216,11 @@ public Response accessDeniedFromKafkaResponse(final Exception e) {
return constructAccessDeniedFromKafkaResponse(errorMessages.kafkaAuthorizationErrorMessage(e));
}

public Response schemaRegistryNotConfiguredResponse(final Exception e) {
return constructSchemaRegistryNotConfiguredResponse(
errorMessages.schemaRegistryUnconfiguredErrorMessage(e));
}

public String kafkaAuthorizationErrorMessage(final Exception e) {
return errorMessages.kafkaAuthorizationErrorMessage(e);
}
Expand All @@ -213,6 +231,8 @@ public Response generateResponse(
) {
if (ExceptionUtils.indexOfType(e, TopicAuthorizationException.class) >= 0) {
return accessDeniedFromKafkaResponse(e);
} else if (ExceptionUtils.indexOfType(e, KsqlSchemaRegistryNotConfiguredException.class) >= 0) {
return schemaRegistryNotConfiguredResponse(e);
} else {
return defaultResponse;
}
Expand Down
Loading

0 comments on commit 4a74d24

Please sign in to comment.