Skip to content

Commit

Permalink
fix: incorrect SR authorization message is displayed (#3186)
Browse files Browse the repository at this point in the history
Fixes a few error messages caused by SchemaRegistry authorization errors.
  • Loading branch information
spena authored Aug 8, 2019
1 parent dd3eb5f commit b3b6c82
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public final class DocumentationLinks {
public static final String SR_REST_GETSUBJECTS_DOC_URL = SCHEMA_REGISTRY_API_DOC_URL
+ "#get--subjects";

public static final String SCHEMA_REGISTRY_SECURITY_DOC_URL = SCHEMA_REGISTRY_DOCS_ROOT_URL
+ "security.html";

private DocumentationLinks() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Streams;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
Expand All @@ -33,13 +34,15 @@
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SqlValueCoercer;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KeySerdeFactory;
import io.confluent.ksql.serde.ValueSerdeFactory;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import java.time.Duration;
Expand All @@ -52,6 +55,8 @@
import java.util.concurrent.Future;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.HttpStatus;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
Expand Down Expand Up @@ -142,21 +147,21 @@ public void execute(
final KsqlConfig config = statement.getConfig()
.cloneWithPropertyOverwrite(statement.getOverrides());

final RowData row = extractRow(insertValues, dataSource);
final byte[] key = serializeKey(row.key, dataSource, config, serviceContext);
final byte[] value = serializeValue(row.value, dataSource, config, serviceContext);
try {
final RowData row = extractRow(insertValues, dataSource);
final byte[] key = serializeKey(row.key, dataSource, config, serviceContext);
final byte[] value = serializeValue(row.value, dataSource, config, serviceContext);

final String topicName = dataSource.getKafkaTopicName();
final String topicName = dataSource.getKafkaTopicName();

final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
topicName,
null,
row.ts,
key,
value
);
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
topicName,
null,
row.ts,
key,
value
);

try {
producer.sendRecord(record, serviceContext, config.getProducerClientConfigProps());
} catch (final Exception e) {
throw new KsqlException("Failed to insert values into stream/table: "
Expand Down Expand Up @@ -342,9 +347,27 @@ private byte[] serializeValue(
NoopProcessingLogContext.INSTANCE
);

final String topicName = dataSource.getKafkaTopicName();

try {
return valueSerde.serializer().serialize(dataSource.getKafkaTopicName(), row);
return valueSerde.serializer().serialize(topicName, row);
} catch (final Exception e) {
if (dataSource.getKsqlTopic().getValueFormat().getFormat() == Format.AVRO) {
final Throwable rootCause = ExceptionUtils.getRootCause(e);
if (rootCause instanceof RestClientException) {
switch (((RestClientException) rootCause).getStatus()) {
case HttpStatus.SC_UNAUTHORIZED:
case HttpStatus.SC_FORBIDDEN:
throw new KsqlException(String.format(
"Not authorized to write Schema Registry subject: [%s]",
topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX
));
default:
break;
}
}
}

throw new KsqlException("Could not serialize row: " + row, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,15 @@ private Optional<SchemaMetadata> getSchema(

return Optional.of(srClient.getLatestSchemaMetadata(subject));
} catch (final RestClientException e) {
if (e.getStatus() == HttpStatus.SC_NOT_FOUND) {
return Optional.empty();
switch (e.getStatus()) {
case HttpStatus.SC_NOT_FOUND:
case HttpStatus.SC_UNAUTHORIZED:
case HttpStatus.SC_FORBIDDEN:
return Optional.empty();
default:
throw new KsqlException("Schema registry fetch for topic "
+ topicName + " request failed.", e);
}

throw new KsqlException("Schema registry fetch for topic "
+ topicName + " request failed.", e);
} catch (final Exception e) {
throw new KsqlException("Schema registry fetch for topic "
+ topicName + " request failed.", e);
Expand Down Expand Up @@ -132,7 +135,11 @@ private static SchemaResult notFound(final String topicName) {
+ System.lineSeparator()
+ "- The schema is registered on a different instance of the Schema Registry"
+ "\t-> Use the REST API to list available subjects"
+ "\t" + DocumentationLinks.SR_REST_GETSUBJECTS_DOC_URL));
+ "\t" + DocumentationLinks.SR_REST_GETSUBJECTS_DOC_URL
+ System.lineSeparator()
+ "- You do not have permissions to access the Schema Registry.Subject: "
+ topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX
+ "\t-> See " + DocumentationLinks.SCHEMA_REGISTRY_SECURITY_DOC_URL));
}

private static SchemaResult notCompatible(
Expand Down
11 changes: 10 additions & 1 deletion ksql-engine/src/main/java/io/confluent/ksql/util/AvroUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,17 @@ private static boolean isValidAvroSchemaForTopic(
// See https://github.com/confluentinc/schema-registry/issues/951
return true;
}

String errorMessage = e.getMessage();
if (e.getStatus() == HttpStatus.SC_UNAUTHORIZED || e.getStatus() == HttpStatus.SC_FORBIDDEN) {
errorMessage = String.format(
"Not authorized to access Schema Registry subject: [%s]",
topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX
);
}

throw new KsqlException(String.format(
"Could not connect to Schema Registry service: %s", e.getMessage()
"Could not connect to Schema Registry service: %s", errorMessage
));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package io.confluent.ksql.engine;

import static org.hamcrest.Matchers.containsString;
import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -485,7 +487,7 @@ public void shouldThrowOnSerializingKeyError() {

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Could not serialize key");
expectedException.expectCause(hasMessage(containsString("Could not serialize key")));

// When:
executor.execute(statement, engine, serviceContext);
Expand All @@ -507,7 +509,7 @@ public void shouldThrowOnSerializingValueError() {

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Could not serialize row");
expectedException.expectCause(hasMessage(containsString("Could not serialize row")));

// When:
executor.execute(statement, engine, serviceContext);
Expand All @@ -525,7 +527,7 @@ public void shouldThrowIfRowKeyAndKeyDoNotMatch() {

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Expected ROWKEY and COL0 to match");
expectedException.expectCause(hasMessage(containsString("Expected ROWKEY and COL0 to match")));

// When:
executor.execute(statement, engine, serviceContext);
Expand All @@ -542,7 +544,7 @@ public void shouldThrowIfNotEnoughValuesSuppliedWithNoSchema() {

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Expected a value for each column");
expectedException.expectCause(hasMessage(containsString("Expected a value for each column")));

// When:
executor.execute(statement, engine, serviceContext);
Expand All @@ -562,7 +564,7 @@ public void shouldFailOnDowncast() {

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Expected type INTEGER for field");
expectedException.expectCause(hasMessage(containsString("Expected type INTEGER for field")));

// When:
executor.execute(statement, engine, serviceContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,40 @@ public void shouldReturnErrorFromGetValueWithIdSchemaIfNotFound() throws Excepti
+ " does not exist in the Schema Registry."));
}

@Test
public void shouldReturnErrorFromGetValueIfUnauthorized() throws Exception {
// Given:
when(srClient.getSchemaMetadata(any(), anyInt()))
.thenThrow(unauthorizedException());

// When:
final SchemaResult result = supplier.getValueSchema(TOPIC_NAME, Optional.of(42));

// Then:
assertThat(result.schemaAndId, is(Optional.empty()));
assertThat(result.failureReason, is(not(Optional.empty())));
assertThat(result.failureReason.get().getMessage(), containsString(
"Avro schema for message values on topic " + TOPIC_NAME
+ " does not exist in the Schema Registry."));
}

@Test
public void shouldReturnErrorFromGetValueIfForbidden() throws Exception {
// Given:
when(srClient.getSchemaMetadata(any(), anyInt()))
.thenThrow(forbiddenException());

// When:
final SchemaResult result = supplier.getValueSchema(TOPIC_NAME, Optional.of(42));

// Then:
assertThat(result.schemaAndId, is(Optional.empty()));
assertThat(result.failureReason, is(not(Optional.empty())));
assertThat(result.failureReason.get().getMessage(), containsString(
"Avro schema for message values on topic " + TOPIC_NAME
+ " does not exist in the Schema Registry."));
}

@Test
public void shouldThrowFromGetValueSchemaOnOtherRestExceptions() throws Exception {
// Given:
Expand Down Expand Up @@ -298,4 +332,12 @@ public void shouldReturnSchemaFromGetValueSchemaIfFound() {
private static Throwable notFoundException() {
return new RestClientException("no found", HttpStatus.SC_NOT_FOUND, -1);
}

private static Throwable unauthorizedException() {
return new RestClientException("unauthorized", HttpStatus.SC_UNAUTHORIZED, -1);
}

private static Throwable forbiddenException() {
return new RestClientException("forbidden", HttpStatus.SC_FORBIDDEN, -1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.confluent.ksql.util;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -203,11 +204,30 @@ public void shouldReturnValidEvolutionIfSubjectNotRegistered() throws Exception
}

@Test
public void shouldThrowOnAnyOtherEvolutionSrException() throws Exception {
public void shouldThrowOnSrAuthorizationErrors() throws Exception {
// Given:
when(srClient.testCompatibility(any(), any()))
.thenThrow(new RestClientException("Unknown subject", 403, 40401));

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Could not connect to Schema Registry service");
expectedException.expectMessage(containsString(String.format(
"Not authorized to access Schema Registry subject: [%s]",
persistentQuery.getResultTopic().getKafkaTopicName()
+ KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX
)));

// When:
AvroUtil.isValidSchemaEvolution(persistentQuery, srClient);
}

@Test
public void shouldThrowOnAnyOtherEvolutionSrException() throws Exception {
// Given:
when(srClient.testCompatibility(any(), any()))
.thenThrow(new RestClientException("Unknown subject", 500, 40401));

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Could not connect to Schema Registry service");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void shouldPropegateInsertValuesExecutorError() throws Exception {

// Then:
assertThat(errContent.toString(UTF_8),
containsString("Test failed: Expected type INTEGER for field ID but got 14.5\n"));
containsString("Test failed: Failed to insert values into stream/table: TEST\n"));
}

private void runTestCaseAndAssertPassed(
Expand Down

0 comments on commit b3b6c82

Please sign in to comment.