Skip to content

Commit

Permalink
refactor: wrap SchemaRegistry authorization errors into a KsqlSchemaA…
Browse files Browse the repository at this point in the history
…uthorizationException (confluentinc#7783)
  • Loading branch information
spena authored and ConfluentJenkins committed Jul 26, 2021
1 parent f3076fe commit 818e0d3
Show file tree
Hide file tree
Showing 19 changed files with 760 additions and 187 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.security;

import static java.util.Objects.requireNonNull;

public enum AuthObjectType {
TOPIC("Topic"),
SUBJECT("Subject");

private final String name;

AuthObjectType(final String name) {
this.name = requireNonNull(name, "name");
}

@Override
public String toString() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,30 @@
*/
public interface KsqlAccessValidator {
/**
* Checks if an authenticated user provided by the {@code securityContext} has authorization
* to execute the {@code operation} on the kafka {@code topicName}.
* Checks if an authenticated user, provided by the {@code securityContext}, has authorization
* to execute all specified {@code actions} on the {@code topicName}.
*
* @param securityContext The context for the authenticated user.
* @param topicName The topic name to check access.
* @param operation The {@code AclOperation} to validate against the {@code topicName}.
*/
void checkAccess(KsqlSecurityContext securityContext, String topicName, AclOperation operation);
void checkTopicAccess(
KsqlSecurityContext securityContext,
String topicName,
AclOperation operation
);

/**
* Checks if an authenticated user, provided by the {@code securityContext}, has authorization
* to execute all specified {@code actions} on the {@code subjectName}.
*
* @param securityContext The context for the authenticated user.
* @param subjectName The subject name to check access.
* @param operation The {@code AclOperation} to validate against the {@code subjectName}.
*/
void checkSubjectAccess(
KsqlSecurityContext securityContext,
String subjectName,
AclOperation operation
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.confluent.ksql.security;

import java.security.Principal;
import java.util.List;
import org.apache.kafka.common.acl.AclOperation;

/**
* Interface that provides authorization to KSQL.
Expand All @@ -29,4 +31,18 @@ public interface KsqlAuthorizationProvider {
* @param path The endpoint path to access, i.e. "/ksql", "/ksql/terminate", "/query"*
*/
void checkEndpointAccess(Principal user, String method, String path);

/**
* Checks if the user (if available) with the {@code userSecurityContext} has the specified
* {@code privileges} on the the specified {@code objectType} and {@code objectName}.
*
* @param userSecurityContext The user security context which privileges will be checked
* @param objectType The object type to check for privileges
* @param objectName The object name to check for privileges
* @param privileges The list of privileges to check in the resource
*/
void checkPrivileges(KsqlSecurityContext userSecurityContext,
AuthObjectType objectType,
String objectName,
List<AclOperation> privileges);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,44 +35,73 @@ private KsqlAuthorizationValidatorFactory() {
}

public static Optional<KsqlAuthorizationValidator> create(
final KsqlConfig ksqlConfig,
final ServiceContext serviceContext,
final Optional<KsqlAuthorizationProvider> externalAuthorizationProvider
) {
final Optional<KsqlAccessValidator> accessValidator = getAccessValidator(
ksqlConfig,
serviceContext,
externalAuthorizationProvider
);

return accessValidator.map(v ->
new KsqlAuthorizationValidatorImpl(cacheIfEnabled(ksqlConfig, v)));
}

private static Optional<KsqlAccessValidator> getAccessValidator(
final KsqlConfig ksqlConfig,
final ServiceContext serviceContext,
final Optional<KsqlAuthorizationProvider> externalAuthorizationProvider
) {
if (externalAuthorizationProvider.isPresent()) {
return Optional.of(new KsqlProvidedAccessValidator(externalAuthorizationProvider.get()));
} else if (isTopicAccessValidatorEnabled(ksqlConfig, serviceContext)) {
return Optional.of(new KsqlBackendAccessValidator());
}

return Optional.empty();
}

private static KsqlAccessValidator cacheIfEnabled(
final KsqlConfig ksqlConfig,
final KsqlAccessValidator accessValidator
) {
return isCacheEnabled(ksqlConfig)
? new KsqlCacheAccessValidator(ksqlConfig, accessValidator)
: accessValidator;
}

private static boolean isCacheEnabled(final KsqlConfig ksqlConfig) {
// The cache expiry time is used to decided whether to enable the cache or not
return ksqlConfig.getLong(KsqlConfig.KSQL_AUTH_CACHE_EXPIRY_TIME_SECS) > 0;
}

private static boolean isTopicAccessValidatorEnabled(
final KsqlConfig ksqlConfig,
final ServiceContext serviceContext
) {
final String enabled = ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR);
if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON)) {
LOG.info("Forcing topic access validator");
return Optional.of(createAuthorizationValidator(ksqlConfig));
return true;
} else if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF)) {
return Optional.empty();
return false;
}

// If KSQL_ACCESS_VALIDATOR_AUTO, then check if Kafka has an authorizer enabled
final Admin adminClient = serviceContext.getAdminClient();

if (isKafkaAuthorizerEnabled(adminClient)) {
if (KafkaClusterUtil.isAuthorizedOperationsSupported(adminClient)) {
LOG.info("KSQL topic authorization checks enabled.");
return Optional.of(createAuthorizationValidator(ksqlConfig));
return true;
}

LOG.warn("The Kafka broker has an authorization service enabled, but the Kafka "
+ "version does not support authorizedOperations(). "
+ "KSQL topic authorization checks will not be enabled.");
}
return Optional.empty();
}

private static KsqlAuthorizationValidator createAuthorizationValidator(
final KsqlConfig ksqlConfig
) {
KsqlAccessValidator accessValidator = new KsqlBackendAccessValidator();

// The cache expiry time is used to decided whether to enable the cache or not
final long expiryTime = ksqlConfig.getLong(KsqlConfig.KSQL_AUTH_CACHE_EXPIRY_TIME_SECS);
if (expiryTime > 0) {
accessValidator = new KsqlCacheAccessValidator(ksqlConfig, accessValidator);
}

return new KsqlAuthorizationValidatorImpl(accessValidator);
return false;
}

private static boolean isKafkaAuthorizerEnabled(final Admin adminClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.security;

import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
Expand All @@ -24,8 +25,13 @@
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.topic.SourceTopicsExtractor;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.util.Set;
import org.apache.kafka.common.acl.AclOperation;

/**
Expand Down Expand Up @@ -69,10 +75,9 @@ private void validateQuery(
final MetaStore metaStore,
final Query query
) {
final SourceTopicsExtractor extractor = new SourceTopicsExtractor(metaStore);
extractor.process(query, null);
for (String kafkaTopic : extractor.getSourceTopics()) {
accessValidator.checkAccess(securityContext, kafkaTopic, AclOperation.READ);
for (KsqlTopic ksqlTopic : extractQueryTopics(query, metaStore)) {
checkTopicAccess(securityContext, ksqlTopic.getKafkaTopicName(), AclOperation.READ);
checkSchemaAccess(securityContext, ksqlTopic, AclOperation.READ);
}
}

Expand All @@ -94,7 +99,7 @@ private void validateCreateAsSelect(

// At this point, the topic should have been created by the TopicCreateInjector
final String kafkaTopic = getCreateAsSelectSinkTopic(metaStore, createAsSelect);
accessValidator.checkAccess(securityContext, kafkaTopic, AclOperation.WRITE);
checkTopicAccess(securityContext, kafkaTopic, AclOperation.WRITE);
}

private void validateInsertInto(
Expand All @@ -111,22 +116,28 @@ private void validateInsertInto(
validateQuery(securityContext, metaStore, insertInto.getQuery());

final String kafkaTopic = getSourceTopicName(metaStore, insertInto.getTarget());
accessValidator.checkAccess(securityContext, kafkaTopic, AclOperation.WRITE);
checkTopicAccess(securityContext, kafkaTopic, AclOperation.WRITE);
}

private void validatePrintTopic(
final KsqlSecurityContext securityContext,
final PrintTopic printTopic
) {
accessValidator.checkAccess(securityContext, printTopic.getTopic(), AclOperation.READ);
checkTopicAccess(securityContext, printTopic.getTopic(), AclOperation.READ);

// SchemaRegistry permissions cannot be validated here because the schema is guessed when
// printing the topic by obtaining the first row and attempt to find the right schema
}

private void validateCreateSource(
final KsqlSecurityContext securityContext,
final CreateSource createSource
) {
final String sourceTopic = createSource.getProperties().getKafkaTopic();
accessValidator.checkAccess(securityContext, sourceTopic, AclOperation.READ);
checkTopicAccess(securityContext, sourceTopic, AclOperation.READ);

// SchemaRegistry permissions are validated when SchemaRegisterInjector is called during CREATE
// operations. There's no need to validate the user has READ permissions here.
}

private String getSourceTopicName(final MetaStore metaStore, final SourceName streamOrTable) {
Expand All @@ -146,4 +157,39 @@ private String getCreateAsSelectSinkTopic(
return createAsSelect.getProperties().getKafkaTopic()
.orElseGet(() -> getSourceTopicName(metaStore, createAsSelect.getName()));
}

private Set<KsqlTopic> extractQueryTopics(final Query query, final MetaStore metaStore) {
final SourceTopicsExtractor extractor = new SourceTopicsExtractor(metaStore);
extractor.process(query, null);
return extractor.getSourceTopics();
}

private void checkTopicAccess(
final KsqlSecurityContext securityContext,
final String resourceName,
final AclOperation operation
) {
accessValidator.checkTopicAccess(securityContext, resourceName, operation);
}

private void checkSchemaAccess(
final KsqlSecurityContext securityContext,
final KsqlTopic ksqlTopic,
final AclOperation operation
) {

if (formatSupportsSchemaInference(ksqlTopic.getKeyFormat().getFormatInfo())) {
accessValidator.checkSubjectAccess(securityContext,
KsqlConstants.getSRSubject(ksqlTopic.getKafkaTopicName(), true), operation);
}

if (formatSupportsSchemaInference(ksqlTopic.getValueFormat().getFormatInfo())) {
accessValidator.checkSubjectAccess(securityContext,
KsqlConstants.getSRSubject(ksqlTopic.getKafkaTopicName(), false), operation);
}
}

private static boolean formatSupportsSchemaInference(final FormatInfo format) {
return FormatFactory.of(format).supportsFeature(SerdeFeature.SCHEMA_INFERENCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@

/**
* An implementation of {@link KsqlAccessValidator} that provides authorization checks
* from the Kafka service.
* from the backend services.
*/
public class KsqlBackendAccessValidator implements KsqlAccessValidator {
@Override
public void checkAccess(
public void checkTopicAccess(
final KsqlSecurityContext securityContext,
final String topicName,
final AclOperation operation
Expand All @@ -42,4 +42,16 @@ public void checkAccess(
throw new KsqlTopicAuthorizationException(operation, Collections.singleton(topicName));
}
}

@Override
public void checkSubjectAccess(
final KsqlSecurityContext securityContext,
final String subjectName,
final AclOperation operation
) {
// Nothing to do. Checking permissions for Schema Registry require an external authorization
// provider. Schema Registry does not have an API to list the allowed permissions for
// the user in a specified subject.
return;
}
}
Loading

0 comments on commit 818e0d3

Please sign in to comment.