-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Perform Schema Registry permissions checks #3323
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright 2019 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.exception; | ||
|
||
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.kafka.common.acl.AclOperation; | ||
|
||
public class KsqlSchemaAuthorizationException extends RuntimeException { | ||
public KsqlSchemaAuthorizationException(final String unauthorizedSchema) { | ||
super( | ||
"Authorization denied to access Schema Registry subject(s): " + unauthorizedSchema | ||
); | ||
} | ||
|
||
public KsqlSchemaAuthorizationException( | ||
final AclOperation operation, | ||
final String unauthorizedSchema | ||
) { | ||
super(String.format( | ||
"Authorization denied to %s on Schema Registry subject(s): %s", | ||
StringUtils.capitalize(operation.toString().toLowerCase()), unauthorizedSchema | ||
)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -137,8 +137,7 @@ private static SchemaResult notFound(final String topicName) { | |
+ "\t-> Use the REST API to list available subjects" | ||
+ "\t" + DocumentationLinks.SR_REST_GETSUBJECTS_DOC_URL | ||
+ System.lineSeparator() | ||
+ "- You do not have permissions to access the Schema Registry.Subject: " | ||
+ topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed this because the subject name is already displayed on the full message. |
||
+ "- You do not have permissions to access the Schema Registry." | ||
+ "\t-> See " + DocumentationLinks.SCHEMA_REGISTRY_SECURITY_DOC_URL)); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,9 +18,13 @@ | |
import static io.confluent.ksql.util.ExecutorUtil.RetryBehaviour.ALWAYS; | ||
|
||
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; | ||
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; | ||
import io.confluent.ksql.exception.KsqlSchemaAuthorizationException; | ||
import io.confluent.ksql.util.ExecutorUtil; | ||
import io.confluent.ksql.util.KsqlConstants; | ||
import java.util.stream.Stream; | ||
import org.apache.http.HttpStatus; | ||
import org.apache.kafka.common.acl.AclOperation; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -64,7 +68,24 @@ private static Stream<String> getSubjectNames( | |
public static void deleteSubjectWithRetries( | ||
final SchemaRegistryClient schemaRegistryClient, | ||
final String subject) throws Exception { | ||
ExecutorUtil.executeWithRetries(() -> schemaRegistryClient.deleteSubject(subject), ALWAYS); | ||
ExecutorUtil.executeWithRetries(() -> { | ||
try { | ||
schemaRegistryClient.deleteSubject(subject); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Kind of a small point, but we could make this endpoint snappier if we just return immediately if we get a 403 or 401 (or any 4xx for that matter). Retries certainly won't help in those cases. We could either have the function return an Optional and throw if we get something back. Or change |
||
} catch (final RestClientException e) { | ||
switch (e.getStatus()) { | ||
case HttpStatus.SC_UNAUTHORIZED: | ||
case HttpStatus.SC_FORBIDDEN: | ||
throw new KsqlSchemaAuthorizationException( | ||
AclOperation.DELETE, | ||
subject | ||
); | ||
default: | ||
throw e; | ||
} | ||
} catch (final Exception e) { | ||
throw e; | ||
} | ||
}, ALWAYS); | ||
} | ||
|
||
private static Stream<String> getInternalSubjectNames( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,11 @@ | |
|
||
package io.confluent.ksql.security; | ||
|
||
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; | ||
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; | ||
import io.confluent.ksql.exception.KsqlSchemaAuthorizationException; | ||
import io.confluent.ksql.exception.KsqlTopicAuthorizationException; | ||
import io.confluent.ksql.execution.ddl.commands.KsqlTopic; | ||
import io.confluent.ksql.metastore.MetaStore; | ||
import io.confluent.ksql.metastore.model.DataSource; | ||
import io.confluent.ksql.parser.tree.CreateAsSelect; | ||
|
@@ -24,11 +28,15 @@ | |
import io.confluent.ksql.parser.tree.PrintTopic; | ||
import io.confluent.ksql.parser.tree.Query; | ||
import io.confluent.ksql.parser.tree.Statement; | ||
import io.confluent.ksql.serde.Format; | ||
import io.confluent.ksql.services.KafkaTopicClient; | ||
import io.confluent.ksql.services.ServiceContext; | ||
import io.confluent.ksql.topic.SourceTopicsExtractor; | ||
import io.confluent.ksql.util.KsqlConstants; | ||
import io.confluent.ksql.util.KsqlException; | ||
import java.util.Collections; | ||
import java.util.Set; | ||
import org.apache.http.HttpStatus; | ||
import org.apache.kafka.common.acl.AclOperation; | ||
|
||
/** | ||
|
@@ -62,10 +70,12 @@ private void validateQuery( | |
final MetaStore metaStore, | ||
final Query query | ||
) { | ||
final SourceTopicsExtractor extractor = new SourceTopicsExtractor(metaStore); | ||
extractor.process(query, null); | ||
for (String kafkaTopic : extractor.getSourceTopics()) { | ||
checkAccess(serviceContext, kafkaTopic, AclOperation.READ); | ||
for (KsqlTopic topic : extractQueryTopics(query, metaStore)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't we also check for read access on the schema in this case? |
||
checkTopicAccess( | ||
topic.getKafkaTopicName(), | ||
AclOperation.READ, | ||
serviceContext.getTopicClient() | ||
); | ||
} | ||
} | ||
|
||
|
@@ -87,7 +97,8 @@ private void validateCreateAsSelect( | |
|
||
// At this point, the topic should have been created by the TopicCreateInjector | ||
final String kafkaTopic = getCreateAsSelectSinkTopic(metaStore, createAsSelect); | ||
checkAccess(serviceContext, kafkaTopic, AclOperation.WRITE); | ||
checkTopicAccess(kafkaTopic, AclOperation.WRITE, serviceContext.getTopicClient()); | ||
checkSchemaAccess(kafkaTopic, AclOperation.WRITE, serviceContext.getSchemaRegistryClient()); | ||
} | ||
|
||
private void validateInsertInto( | ||
|
@@ -103,26 +114,34 @@ private void validateInsertInto( | |
|
||
validateQuery(serviceContext, metaStore, insertInto.getQuery()); | ||
|
||
final String kafkaTopic = getSourceTopicName(metaStore, insertInto.getTarget().name()); | ||
checkAccess(serviceContext, kafkaTopic, AclOperation.WRITE); | ||
final String kafkaTopic = getSinkTopicName(metaStore, insertInto.getTarget().name()); | ||
checkTopicAccess(kafkaTopic, AclOperation.WRITE, serviceContext.getTopicClient()); | ||
} | ||
|
||
private void validatePrintTopic( | ||
final ServiceContext serviceContext, | ||
final PrintTopic printTopic | ||
final ServiceContext serviceContext, | ||
final PrintTopic printTopic | ||
) { | ||
checkAccess(serviceContext, printTopic.getTopic().toString(), AclOperation.READ); | ||
checkTopicAccess( | ||
printTopic.getTopic().toString(), | ||
AclOperation.READ, | ||
serviceContext.getTopicClient() | ||
); | ||
} | ||
|
||
private void validateCreateSource( | ||
final ServiceContext serviceContext, | ||
final CreateSource createSource | ||
) { | ||
final String sourceTopic = createSource.getProperties().getKafkaTopic(); | ||
checkAccess(serviceContext, sourceTopic, AclOperation.READ); | ||
checkTopicAccess(sourceTopic, AclOperation.READ, serviceContext.getTopicClient()); | ||
|
||
if (createSource.getProperties().getValueFormat() == Format.AVRO) { | ||
checkSchemaAccess(sourceTopic, AclOperation.READ, serviceContext.getSchemaRegistryClient()); | ||
} | ||
} | ||
|
||
private String getSourceTopicName(final MetaStore metaStore, final String streamOrTable) { | ||
private String getSinkTopicName(final MetaStore metaStore, final String streamOrTable) { | ||
final DataSource<?> dataSource = metaStore.getSource(streamOrTable); | ||
if (dataSource == null) { | ||
throw new KsqlException("Cannot validate for topic access from an unknown stream/table: " | ||
|
@@ -132,31 +151,55 @@ private String getSourceTopicName(final MetaStore metaStore, final String stream | |
return dataSource.getKafkaTopicName(); | ||
} | ||
|
||
/** | ||
* Checks if the ServiceContext has access to the topic with the specified AclOperation. | ||
*/ | ||
private void checkAccess( | ||
final ServiceContext serviceContext, | ||
private void checkTopicAccess( | ||
final String topicName, | ||
final AclOperation operation | ||
final AclOperation operation, | ||
final KafkaTopicClient topicClient | ||
) { | ||
final Set<AclOperation> authorizedOperations = serviceContext.getTopicClient() | ||
.describeTopic(topicName).authorizedOperations(); | ||
final Set<AclOperation> authorizedOperations = topicClient.describeTopic(topicName) | ||
.authorizedOperations(); | ||
|
||
// Kakfa 2.2 or lower do not support authorizedOperations(). In case of running on a | ||
// unsupported broker version, then the authorizeOperation will be null. | ||
if (authorizedOperations != null && !authorizedOperations.contains(operation)) { | ||
// This error message is similar to what Kafka throws when it cannot access the topic | ||
// due to an authorization error. I used this message to keep a consistent message. | ||
throw new KsqlTopicAuthorizationException(operation, Collections.singleton(topicName)); | ||
} | ||
} | ||
|
||
private void checkSchemaAccess( | ||
final String topicName, | ||
final AclOperation operation, | ||
final SchemaRegistryClient schemaRegistryClient | ||
) { | ||
final String subject = topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX; | ||
|
||
try { | ||
schemaRegistryClient.getLatestSchemaMetadata(subject); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this succeeding imply that any operation (READ/WRITE/etc) will work? |
||
} catch (final RestClientException e) { | ||
switch (e.getStatus()) { | ||
case HttpStatus.SC_UNAUTHORIZED: | ||
case HttpStatus.SC_FORBIDDEN: | ||
throw new KsqlSchemaAuthorizationException(operation, subject); | ||
default: | ||
// Do nothing. We assume the NOT FOUND and other errors are caught and displayed | ||
// in different place | ||
} | ||
} catch (final Exception e) { | ||
throw new KsqlException(e); | ||
} | ||
} | ||
|
||
private String getCreateAsSelectSinkTopic( | ||
final MetaStore metaStore, | ||
final CreateAsSelect createAsSelect | ||
) { | ||
return createAsSelect.getProperties().getKafkaTopic() | ||
.orElseGet(() -> getSourceTopicName(metaStore, createAsSelect.getName().name())); | ||
.orElseGet(() -> getSinkTopicName(metaStore, createAsSelect.getName().name())); | ||
} | ||
|
||
private Set<KsqlTopic> extractQueryTopics(final Query query, final MetaStore metaStore) { | ||
final SourceTopicsExtractor extractor = new SourceTopicsExtractor(metaStore); | ||
extractor.process(query, null); | ||
return extractor.getKsqlTopics(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,10 +17,12 @@ | |
|
||
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; | ||
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; | ||
import io.confluent.ksql.exception.KsqlSchemaAuthorizationException; | ||
import io.confluent.ksql.serde.Format; | ||
import java.io.IOException; | ||
|
||
import org.apache.http.HttpStatus; | ||
import org.apache.kafka.common.acl.AclOperation; | ||
|
||
public final class AvroUtil { | ||
|
||
|
@@ -86,16 +88,15 @@ private static boolean isValidAvroSchemaForTopic( | |
return true; | ||
} | ||
|
||
String errorMessage = e.getMessage(); | ||
if (e.getStatus() == HttpStatus.SC_UNAUTHORIZED || e.getStatus() == HttpStatus.SC_FORBIDDEN) { | ||
errorMessage = String.format( | ||
"Not authorized to access Schema Registry subject: [%s]", | ||
throw new KsqlSchemaAuthorizationException( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure how the ACLs work on SR, but we should correctly set the ACL in this exception so the user knows what acls they need to set. |
||
AclOperation.WRITE, | ||
topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX | ||
); | ||
} | ||
|
||
throw new KsqlException(String.format( | ||
"Could not connect to Schema Registry service: %s", errorMessage | ||
"Could not connect to Schema Registry service: %s", e.getMessage() | ||
)); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "Not authorized to access" or "Denied access to"