-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Perform Schema Registry permissions checks #3323
Conversation
@@ -137,8 +137,7 @@ private static SchemaResult notFound(final String topicName) { | |||
+ "\t-> Use the REST API to list available subjects" | |||
+ "\t" + DocumentationLinks.SR_REST_GETSUBJECTS_DOC_URL | |||
+ System.lineSeparator() | |||
+ "- You do not have permissions to access the Schema Registry.Subject: " | |||
+ topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this because the subject name is already displayed on the full message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @spena
The change makes sense. I've noted a few nits.
However, this change sees similar logic peppered through any use of the SchemaRegistryClient
in the code base. Any new use of the client will also need to have the same exception conversion logic, which people are likely to forget.
Would a better approach not implement our own SchemaRegistryClient
that internally handles the exception conversion, and ensure we wrap the true client in ours. Such wrapping should be done in one place, e.g. SchemaRegistryClientFactory
.
This approach would tidy up the code, removing the duplication and would be more future proof
What do you think?
} | ||
} | ||
|
||
private boolean isAvroSchema(final DataSource dataSource) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: static
return dataSource.getKsqlTopic().getValueFormat().getFormat() == Format.AVRO; | ||
} | ||
|
||
private boolean isSchemaAccessUnauthorized(final Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: static
errorMessage = String.format( | ||
"Not authorized to access Schema Registry subject: [%s]", | ||
topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX | ||
throw new KsqlSchemaAuthorizationException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure how the ACLs work on SR, but we should correctly set the ACL in this exception so the user knows what acls they need to set.
import org.apache.kafka.common.acl.AclOperation; | ||
|
||
public class KsqlSchemaAuthorizationException extends RuntimeException { | ||
public KsqlSchemaAuthorizationException(final Set<String> unauthorizedSchema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we change this to:
public KsqlSchemaAuthorizationException(final Set<String> unauthorizedSchema) { | |
public KsqlSchemaAuthorizationException(final String unauthorizedSchema) { |
As it looks to be always called with a single schema
|
||
public KsqlSchemaAuthorizationException( | ||
final AclOperation operation, | ||
final Set<String> unauthorizedSchema |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final Set<String> unauthorizedSchema | |
final String unauthorizedSchema |
Again, it looks to be always called with a single schema
@@ -137,8 +137,7 @@ private static SchemaResult notFound(final String topicName) { | |||
+ "\t-> Use the REST API to list available subjects" | |||
+ "\t" + DocumentationLinks.SR_REST_GETSUBJECTS_DOC_URL | |||
+ System.lineSeparator() | |||
+ "- You do not have permissions to access the Schema Registry.Subject: " | |||
+ topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX | |||
+ "- You do not have permissions to access the Schema Registry.Subject" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+ "- You do not have permissions to access the Schema Registry.Subject" | |
+ "- You do not have permissions to access the Schema Registry." |
@big-andy-coates Thanks for the review. Your idea of wrapping the exception in our own I'll submit another PR with those changes. |
@big-andy-coates Btw, I created a JIRA for the SchemaRegistry team to fix the output error of the I was looking on how to create our own Anyway, I wonder if we should wait for the SR fix instead? I am pushing the team to commit those changes to in 5.4. Regardless the changes on the SR side or on our side, the |
bec0518
to
80560cd
Compare
To be clear, my suggestion is to wrap the SR client and handle the exceptions in one place, rather than throughout the code.
You could raise a suitable PR yourself against the SR project
Yeah, that'll work. There's a definite performance hit for using a proxy, so you might want to check if anything is on the critical path. The alternative is to implement the |
public class KsqlSchemaAuthorizationException extends RuntimeException { | ||
public KsqlSchemaAuthorizationException(final String unauthorizedSchema) { | ||
super( | ||
"Authorization denied to access Schema Registry subject(s): " + unauthorizedSchema |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "Not authorized to access" or "Denied access to"
ExecutorUtil.executeWithRetries(() -> schemaRegistryClient.deleteSubject(subject), ALWAYS); | ||
ExecutorUtil.executeWithRetries(() -> { | ||
try { | ||
schemaRegistryClient.deleteSubject(subject); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kind of a small point, but we could make this endpoint snappier if we just return immediately if we get a 403 or 401 (or any 4xx for that matter). Retries certainly won't help in those cases. We could either have the function return an Optional and throw if we get something back. Or change executeWithRetries
to support a whitelist of exceptions to not retry on.
extractor.process(query, null); | ||
for (String kafkaTopic : extractor.getSourceTopics()) { | ||
checkAccess(serviceContext, kafkaTopic, AclOperation.READ); | ||
for (KsqlTopic topic : extractQueryTopics(query, metaStore)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we also check for read access on the schema in this case?
final String subject = topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX; | ||
|
||
try { | ||
schemaRegistryClient.getLatestSchemaMetadata(subject); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this succeeding imply that any operation (READ/WRITE/etc) will work?
Closing this PR. Working on this other #7773 |
Description
This change makes KSQL perform Schema access permissions checks on the statement to execute in order to provide better authorization error messages when the User and/or the KSQL service principal do not have access to the Schema.
First, when SR is configured with Authorization, the following invalid error was displayed on KSQL when it cannot access the schema which was not helping users to recognize it as an access denied aerror:
This PR changes the message to like:
It also perform the check for the KSQL service principal, and the error will look like:
SR checks are done in
KsqlAuthorizationValidatorImpl
(where Topic permissions checks are done). This class checks access to the schema of some statements that use AVRO topics only.How to review it?
KsqlSchemaAuthorizationException
which will be used throw the desired error message.KsqlSchemaAuthorizationException
on places where SR is denying access + when doing permissions checks onKsqlAuthorizationValidatorImpl
Testing done
Reviewer checklist