Skip to content

Commit

Permalink
refactor: wrap SchemaRegistry authorization errors into a KsqlSchemaA…
Browse files Browse the repository at this point in the history
…uthorizationException (#7783)
  • Loading branch information
spena authored Jul 13, 2021
1 parent e05c6ce commit 54eb419
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.engine.generic.GenericRecordFactory;
import io.confluent.ksql.engine.generic.KsqlGenericRecord;
import io.confluent.ksql.exception.KsqlSchemaAuthorizationException;
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
Expand Down Expand Up @@ -274,7 +275,7 @@ private byte[] serializeKey(
FormatFactory.fromName(dataSource.getKsqlTopic().getKeyFormat().getFormat()),
topicName,
true,
"write",
AclOperation.WRITE,
e);
LOG.error("Could not serialize key.", e);
throw new KsqlException("Could not serialize key: " + keyValue, e);
Expand Down Expand Up @@ -311,7 +312,8 @@ private static void ensureKeySchemasMatch(
true);

} catch (final KsqlException e) {
maybeThrowSchemaRegistryAuthError(format, dataSource.getKafkaTopicName(), true, "read", e);
maybeThrowSchemaRegistryAuthError(format, dataSource.getKafkaTopicName(), true,
AclOperation.READ, e);
throw new KsqlException("Could not determine that insert values operations is safe; "
+ "operation potentially overrides existing key schema in schema registry.", e);
}
Expand Down Expand Up @@ -355,7 +357,7 @@ private byte[] serializeValue(
FormatFactory.fromName(dataSource.getKsqlTopic().getValueFormat().getFormat()),
topicName,
false,
"write",
AclOperation.WRITE,
e);
LOG.error("Could not serialize value.", e);
throw new KsqlException("Could not serialize value: " + row + ". " + e.getMessage(), e);
Expand All @@ -366,7 +368,7 @@ private static void maybeThrowSchemaRegistryAuthError(
final Format format,
final String topicName,
final boolean isKey,
final String op,
final AclOperation op,
final Exception e
) {
if (format.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
Expand All @@ -375,11 +377,10 @@ private static void maybeThrowSchemaRegistryAuthError(
switch (((RestClientException) rootCause).getStatus()) {
case HttpStatus.SC_UNAUTHORIZED:
case HttpStatus.SC_FORBIDDEN:
throw new KsqlException(String.format(
"Not authorized to %s Schema Registry subject: [%s]",
throw new KsqlSchemaAuthorizationException(
op,
KsqlConstants.getSRSubject(topicName, isKey)
));
);
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.exception;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.acl.AclOperation;

public class KsqlSchemaAuthorizationException extends RuntimeException {
public KsqlSchemaAuthorizationException(final AclOperation operation, final String objectName) {
super(String.format("Authorization denied to %s on Schema Registry subject: [%s]",
StringUtils.capitalize(
operation.toString().toLowerCase()),
objectName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufSerializer;
import io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.exception.KsqlSchemaAuthorizationException;
import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand;
import io.confluent.ksql.parser.properties.with.SourcePropertiesUtil;
import io.confluent.ksql.parser.tree.CreateAsSelect;
Expand All @@ -50,6 +51,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.common.acl.AclOperation;

public class SchemaRegisterInjector implements Injector {

Expand Down Expand Up @@ -229,6 +231,16 @@ private void registerSchema(
}
}
} catch (IOException | RestClientException e) {
if (SchemaRegistryUtil.isAuthErrorCode(e)) {
final AclOperation deniedOperation = SchemaRegistryUtil.getDeniedOperation(e.getMessage());

if (deniedOperation != AclOperation.UNKNOWN) {
throw new KsqlSchemaAuthorizationException(
deniedOperation,
subject);
}
}

throw new KsqlStatementException(
"Could not register schema for topic: " + e.getMessage(),
statementText,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,9 @@ private static SchemaResult incorrectFormat(
private static SchemaResult notFound(final String topicName, final boolean isKey) {
final String subject = getSRSubject(topicName, isKey);
return SchemaResult.failure(new KsqlException(
"Schema for message " + (isKey ? "keys" : "values") + " on topic " + topicName
"Schema for message " + (isKey ? "keys" : "values") + " on topic '" + topicName + "'"
+ " does not exist in the Schema Registry."
+ System.lineSeparator()
+ "Subject: " + subject
+ System.lineSeparator()
+ "Possible causes include:"
Expand All @@ -196,7 +197,7 @@ private static SchemaResult notFound(final String topicName, final boolean isKey
+ "\t-> Use the REST API to list available subjects"
+ "\t" + DocumentationLinks.SR_REST_GETSUBJECTS_DOC_URL
+ System.lineSeparator()
+ "- You do not have permissions to access the Schema Registry.Subject: " + subject
+ "- You do not have permissions to access the Schema Registry."
+ System.lineSeparator()
+ "\t-> See " + DocumentationLinks.SCHEMA_REGISTRY_SECURITY_DOC_URL));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,26 @@
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.exception.KsqlSchemaAuthorizationException;
import io.confluent.ksql.util.ExecutorUtil;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.kafka.common.acl.AclOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class SchemaRegistryUtil {

private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryUtil.class);

private static final Pattern DENIED_OPERATION_STRING_PATTERN =
Pattern.compile("User is denied operation (.*) on .*");

@VisibleForTesting
public static final int SUBJECT_NOT_FOUND_ERROR_CODE = 40401;

Expand Down Expand Up @@ -69,10 +77,21 @@ private static Stream<String> getSubjectNames(
public static void deleteSubjectWithRetries(
final SchemaRegistryClient schemaRegistryClient,
final String subject) throws Exception {
ExecutorUtil.executeWithRetries(
() -> schemaRegistryClient.deleteSubject(subject),
error -> !isSubjectNotFoundErrorCode(error)
);
try {
ExecutorUtil.executeWithRetries(
() -> schemaRegistryClient.deleteSubject(subject),
error -> isRetriableError(error)
);
} catch (final RestClientException e) {
if (isAuthErrorCode(e)) {
throw new KsqlSchemaAuthorizationException(
AclOperation.DELETE,
subject
);
}

throw e;
}
}


Expand All @@ -93,7 +112,7 @@ public static Optional<SchemaMetadata> getLatestSchema(
return getLatestSchema(srClient, subject);
}

private static Optional<SchemaMetadata> getLatestSchema(
public static Optional<SchemaMetadata> getLatestSchema(
final SchemaRegistryClient srClient,
final String subject
) {
Expand All @@ -104,22 +123,68 @@ private static Optional<SchemaMetadata> getLatestSchema(
if (isSubjectNotFoundErrorCode(e)) {
return Optional.empty();
}

if (isAuthErrorCode(e)) {
final AclOperation deniedOperation = SchemaRegistryUtil.getDeniedOperation(e.getMessage());

if (deniedOperation != AclOperation.UNKNOWN) {
throw new KsqlSchemaAuthorizationException(
deniedOperation,
subject
);
}
}

throw new KsqlException("Could not get latest schema for subject " + subject, e);
}
}

public static AclOperation getDeniedOperation(final String errorMessage) {
final Matcher matcher = DENIED_OPERATION_STRING_PATTERN.matcher(errorMessage);
if (matcher.matches()) {
return AclOperation.fromString(matcher.group(1));
} else {
return AclOperation.UNKNOWN;
}
}

public static boolean isSubjectNotFoundErrorCode(final Throwable error) {
return (error instanceof RestClientException
&& ((RestClientException) error).getErrorCode() == SUBJECT_NOT_FOUND_ERROR_CODE);
}

public static boolean isAuthErrorCode(final Throwable error) {
return (error instanceof RestClientException
&& ((((RestClientException) error).getStatus() == HttpStatus.SC_UNAUTHORIZED)
|| ((RestClientException) error).getStatus() == HttpStatus.SC_FORBIDDEN));
}

private static boolean isRetriableError(final Throwable error) {
return !isSubjectNotFoundErrorCode(error) && !isAuthErrorCode(error);
}

private static void hardDeleteSubjectWithRetries(
final SchemaRegistryClient schemaRegistryClient,
final String subject) throws Exception {
ExecutorUtil.executeWithRetries(
() -> schemaRegistryClient.deleteSubject(subject, true),
error -> !isSubjectNotFoundErrorCode(error)
);
try {
ExecutorUtil.executeWithRetries(
() -> schemaRegistryClient.deleteSubject(subject, true),
error -> isRetriableError(error)
);
} catch (final RestClientException e) {
if (isAuthErrorCode(e)) {
final AclOperation deniedOperation = SchemaRegistryUtil.getDeniedOperation(e.getMessage());

if (deniedOperation != AclOperation.UNKNOWN) {
throw new KsqlSchemaAuthorizationException(
deniedOperation,
subject
);
}
}

throw e;
}
}

private static Stream<String> getInternalSubjectNames(
Expand Down Expand Up @@ -164,4 +229,6 @@ private static void tryDeleteInternalSubject(
+ ", subject: " + subjectName, e);
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,8 @@ public void shouldThrowWhenNotAuthorizedToReadKeySchemaToSR() throws Exception {

// Then:
assertThat(e.getMessage(), containsString(
"Not authorized to read Schema Registry subject: [" + KsqlConstants.getSRSubject(TOPIC_NAME, true)));
"Authorization denied to Read on Schema Registry subject: ["
+ KsqlConstants.getSRSubject(TOPIC_NAME, true)));
}

@Test
Expand Down Expand Up @@ -1006,7 +1007,8 @@ public void shouldThrowWhenNotAuthorizedToWriteKeySchemaToSR() throws Exception

// Then:
assertThat(e.getMessage(), containsString(
"Not authorized to write Schema Registry subject: [" + KsqlConstants.getSRSubject(TOPIC_NAME, true)));
"Authorization denied to Write on Schema Registry subject: ["
+ KsqlConstants.getSRSubject(TOPIC_NAME, true)));
}

@Test
Expand Down Expand Up @@ -1042,7 +1044,8 @@ public void shouldThrowWhenNotAuthorizedToWriteValSchemaToSR() throws Exception

// Then:
assertThat(e.getMessage(), containsString(
"Not authorized to write Schema Registry subject: [" + KsqlConstants.getSRSubject(TOPIC_NAME, false)));
"Authorization denied to Write on Schema Registry subject: ["
+ KsqlConstants.getSRSubject(TOPIC_NAME, false)));
}

private static ConfiguredStatement<InsertValues> givenInsertValues(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;
Expand All @@ -40,6 +41,7 @@
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.exception.KsqlSchemaAuthorizationException;
import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.plan.Formats;
Expand Down Expand Up @@ -296,6 +298,25 @@ public void shouldPropagateErrorOnFailureToPlanQuery() {
"Could not determine output schema for query due to error: fail!"));
}

@Test
public void shouldThrowAuthorizationException() throws Exception {
// Given:
givenStatement("CREATE STREAM sink WITH(value_format='AVRO') AS SELECT * FROM SOURCE;");
when(schemaRegistryClient.register(anyString(), any(ParsedSchema.class)))
.thenThrow(new RestClientException(
"User is denied operation Write on Subject", 403, 40301));

// When:
final Exception e = assertThrows(
KsqlSchemaAuthorizationException.class,
() -> injector.inject(statement)
);

// Then:
assertThat(e.getMessage(), equalTo(
"Authorization denied to Write on Schema Registry subject: [SINK-key]"));
}

@Test
public void shouldPropagateErrorOnSRClientError() throws Exception {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ private void verifyFailureMessage(final SchemaResult result,
final String keyOrValue = isKey ? "keys" : "values";
final String keyOrValueSuffix = isKey ? "key" : "value";
assertThat(result.failureReason.get().getMessage(), is(
"Schema for message " + keyOrValue + " on topic " + TOPIC_NAME + " does not exist in the Schema Registry.Subject: "
+ TOPIC_NAME + "-" + keyOrValueSuffix + System.lineSeparator()
"Schema for message " + keyOrValue + " on topic '" + TOPIC_NAME + "' does not exist in the Schema Registry." + System.lineSeparator()
+ "Subject: " + TOPIC_NAME + "-" + keyOrValueSuffix + System.lineSeparator()
+ "Possible causes include:" + System.lineSeparator()
+ "- The topic itself does not exist" + System.lineSeparator()
+ "\t-> Use SHOW TOPICS; to check" + System.lineSeparator()
Expand All @@ -301,7 +301,7 @@ private void verifyFailureMessage(final SchemaResult result,
+ "\t-> See https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html" + System.lineSeparator()
+ "- The schema is registered on a different instance of the Schema Registry" + System.lineSeparator()
+ "\t-> Use the REST API to list available subjects\thttps://docs.confluent.io/current/schema-registry/docs/api.html#get--subjects" + System.lineSeparator()
+ "- You do not have permissions to access the Schema Registry.Subject: " + TOPIC_NAME + "-" + keyOrValueSuffix + System.lineSeparator()
+ "- You do not have permissions to access the Schema Registry." + System.lineSeparator()
+ "\t-> See https://docs.confluent.io/current/schema-registry/docs/security.html"));
}

Expand Down
Loading

0 comments on commit 54eb419

Please sign in to comment.