From 818e0d343e58430e4565753915f0138cbda8e9a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Tue, 13 Jul 2021 09:11:48 -0500 Subject: [PATCH 1/2] refactor: wrap SchemaRegistry authorization errors into a KsqlSchemaAuthorizationException (#7783) --- .../ksql/security/AuthObjectType.java | 34 ++++ .../ksql/security/KsqlAccessValidator.java | 24 ++- .../security/KsqlAuthorizationProvider.java | 16 ++ .../KsqlAuthorizationValidatorFactory.java | 67 +++++-- .../KsqlAuthorizationValidatorImpl.java | 62 ++++++- .../security/KsqlBackendAccessValidator.java | 16 +- .../security/KsqlCacheAccessValidator.java | 93 +++++++--- .../security/KsqlProvidedAccessValidator.java | 61 +++++++ .../ksql/topic/SourceTopicsExtractor.java | 17 +- .../ksql/topic/TopicCreateInjector.java | 2 +- ...KsqlAuthorizationValidatorFactoryTest.java | 96 ++++++++-- .../KsqlAuthorizationValidatorImplTest.java | 164 ++++++++++++------ .../KsqlBackendAccessValidatorTest.java | 9 +- .../KsqlCacheAccessValidatorTest.java | 77 ++++++-- .../KsqlProvidedAccessValidatorTest.java | 69 ++++++++ .../ksql/topic/SourceTopicsExtractorTest.java | 44 ++--- .../ksql/rest/server/KsqlRestApplication.java | 6 +- .../java/io/confluent/ksql/api/AuthTest.java | 65 ++++++- .../MockKsqlSecurityExtension.java | 25 ++- 19 files changed, 760 insertions(+), 187 deletions(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/security/AuthObjectType.java create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlProvidedAccessValidator.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlProvidedAccessValidatorTest.java diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/security/AuthObjectType.java b/ksqldb-engine/src/main/java/io/confluent/ksql/security/AuthObjectType.java new file mode 100644 index 000000000000..476ffe73cda1 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/security/AuthObjectType.java @@ -0,0 +1,34 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.security; + +import static java.util.Objects.requireNonNull; + +public enum AuthObjectType { + TOPIC("Topic"), + SUBJECT("Subject"); + + private final String name; + + AuthObjectType(final String name) { + this.name = requireNonNull(name, "name"); + } + + @Override + public String toString() { + return name; + } +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAccessValidator.java b/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAccessValidator.java index 4f6f3912f093..87be8d77ff42 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAccessValidator.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAccessValidator.java @@ -22,12 +22,30 @@ */ public interface KsqlAccessValidator { /** - * Checks if an authenticated user provided by the {@code securityContext} has authorization - * to execute the {@code operation} on the kafka {@code topicName}. + * Checks if an authenticated user, provided by the {@code securityContext}, has authorization + * to execute all specified {@code actions} on the {@code topicName}. * * @param securityContext The context for the authenticated user. * @param topicName The topic name to check access. * @param operation The {@code AclOperation} to validate against the {@code topicName}. */ - void checkAccess(KsqlSecurityContext securityContext, String topicName, AclOperation operation); + void checkTopicAccess( + KsqlSecurityContext securityContext, + String topicName, + AclOperation operation + ); + + /** + * Checks if an authenticated user, provided by the {@code securityContext}, has authorization + * to execute all specified {@code actions} on the {@code subjectName}. + * + * @param securityContext The context for the authenticated user. + * @param subjectName The subject name to check access. + * @param operation The {@code AclOperation} to validate against the {@code subjectName}. + */ + void checkSubjectAccess( + KsqlSecurityContext securityContext, + String subjectName, + AclOperation operation + ); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationProvider.java b/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationProvider.java index 15de0d0c54a5..ec467fbe738a 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationProvider.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationProvider.java @@ -16,6 +16,8 @@ package io.confluent.ksql.security; import java.security.Principal; +import java.util.List; +import org.apache.kafka.common.acl.AclOperation; /** * Interface that provides authorization to KSQL. @@ -29,4 +31,18 @@ public interface KsqlAuthorizationProvider { * @param path The endpoint path to access, i.e. "/ksql", "/ksql/terminate", "/query"* */ void checkEndpointAccess(Principal user, String method, String path); + + /** + * Checks if the user (if available) with the {@code userSecurityContext} has the specified + * {@code privileges} on the the specified {@code objectType} and {@code objectName}. + * + * @param userSecurityContext The user security context which privileges will be checked + * @param objectType The object type to check for privileges + * @param objectName The object name to check for privileges + * @param privileges The list of privileges to check in the resource + */ + void checkPrivileges(KsqlSecurityContext userSecurityContext, + AuthObjectType objectType, + String objectName, + List privileges); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactory.java b/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactory.java index 1b567de19488..2c389425b406 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactory.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactory.java @@ -35,44 +35,73 @@ private KsqlAuthorizationValidatorFactory() { } public static Optional create( + final KsqlConfig ksqlConfig, + final ServiceContext serviceContext, + final Optional externalAuthorizationProvider + ) { + final Optional accessValidator = getAccessValidator( + ksqlConfig, + serviceContext, + externalAuthorizationProvider + ); + + return accessValidator.map(v -> + new KsqlAuthorizationValidatorImpl(cacheIfEnabled(ksqlConfig, v))); + } + + private static Optional getAccessValidator( + final KsqlConfig ksqlConfig, + final ServiceContext serviceContext, + final Optional externalAuthorizationProvider + ) { + if (externalAuthorizationProvider.isPresent()) { + return Optional.of(new KsqlProvidedAccessValidator(externalAuthorizationProvider.get())); + } else if (isTopicAccessValidatorEnabled(ksqlConfig, serviceContext)) { + return Optional.of(new KsqlBackendAccessValidator()); + } + + return Optional.empty(); + } + + private static KsqlAccessValidator cacheIfEnabled( + final KsqlConfig ksqlConfig, + final KsqlAccessValidator accessValidator + ) { + return isCacheEnabled(ksqlConfig) + ? new KsqlCacheAccessValidator(ksqlConfig, accessValidator) + : accessValidator; + } + + private static boolean isCacheEnabled(final KsqlConfig ksqlConfig) { + // The cache expiry time is used to decided whether to enable the cache or not + return ksqlConfig.getLong(KsqlConfig.KSQL_AUTH_CACHE_EXPIRY_TIME_SECS) > 0; + } + + private static boolean isTopicAccessValidatorEnabled( final KsqlConfig ksqlConfig, final ServiceContext serviceContext ) { final String enabled = ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR); if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON)) { - LOG.info("Forcing topic access validator"); - return Optional.of(createAuthorizationValidator(ksqlConfig)); + return true; } else if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF)) { - return Optional.empty(); + return false; } + // If KSQL_ACCESS_VALIDATOR_AUTO, then check if Kafka has an authorizer enabled final Admin adminClient = serviceContext.getAdminClient(); - if (isKafkaAuthorizerEnabled(adminClient)) { if (KafkaClusterUtil.isAuthorizedOperationsSupported(adminClient)) { LOG.info("KSQL topic authorization checks enabled."); - return Optional.of(createAuthorizationValidator(ksqlConfig)); + return true; } LOG.warn("The Kafka broker has an authorization service enabled, but the Kafka " + "version does not support authorizedOperations(). " + "KSQL topic authorization checks will not be enabled."); } - return Optional.empty(); - } - - private static KsqlAuthorizationValidator createAuthorizationValidator( - final KsqlConfig ksqlConfig - ) { - KsqlAccessValidator accessValidator = new KsqlBackendAccessValidator(); - - // The cache expiry time is used to decided whether to enable the cache or not - final long expiryTime = ksqlConfig.getLong(KsqlConfig.KSQL_AUTH_CACHE_EXPIRY_TIME_SECS); - if (expiryTime > 0) { - accessValidator = new KsqlCacheAccessValidator(ksqlConfig, accessValidator); - } - return new KsqlAuthorizationValidatorImpl(accessValidator); + return false; } private static boolean isKafkaAuthorizerEnabled(final Admin adminClient) { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationValidatorImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationValidatorImpl.java index 0a88d64b6c08..dfa395c80f54 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationValidatorImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlAuthorizationValidatorImpl.java @@ -15,6 +15,7 @@ package io.confluent.ksql.security; +import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.name.SourceName; @@ -24,8 +25,13 @@ import io.confluent.ksql.parser.tree.PrintTopic; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.serde.FormatFactory; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.SerdeFeature; import io.confluent.ksql.topic.SourceTopicsExtractor; +import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; +import java.util.Set; import org.apache.kafka.common.acl.AclOperation; /** @@ -69,10 +75,9 @@ private void validateQuery( final MetaStore metaStore, final Query query ) { - final SourceTopicsExtractor extractor = new SourceTopicsExtractor(metaStore); - extractor.process(query, null); - for (String kafkaTopic : extractor.getSourceTopics()) { - accessValidator.checkAccess(securityContext, kafkaTopic, AclOperation.READ); + for (KsqlTopic ksqlTopic : extractQueryTopics(query, metaStore)) { + checkTopicAccess(securityContext, ksqlTopic.getKafkaTopicName(), AclOperation.READ); + checkSchemaAccess(securityContext, ksqlTopic, AclOperation.READ); } } @@ -94,7 +99,7 @@ private void validateCreateAsSelect( // At this point, the topic should have been created by the TopicCreateInjector final String kafkaTopic = getCreateAsSelectSinkTopic(metaStore, createAsSelect); - accessValidator.checkAccess(securityContext, kafkaTopic, AclOperation.WRITE); + checkTopicAccess(securityContext, kafkaTopic, AclOperation.WRITE); } private void validateInsertInto( @@ -111,14 +116,17 @@ private void validateInsertInto( validateQuery(securityContext, metaStore, insertInto.getQuery()); final String kafkaTopic = getSourceTopicName(metaStore, insertInto.getTarget()); - accessValidator.checkAccess(securityContext, kafkaTopic, AclOperation.WRITE); + checkTopicAccess(securityContext, kafkaTopic, AclOperation.WRITE); } private void validatePrintTopic( final KsqlSecurityContext securityContext, final PrintTopic printTopic ) { - accessValidator.checkAccess(securityContext, printTopic.getTopic(), AclOperation.READ); + checkTopicAccess(securityContext, printTopic.getTopic(), AclOperation.READ); + + // SchemaRegistry permissions cannot be validated here because the schema is guessed when + // printing the topic by obtaining the first row and attempt to find the right schema } private void validateCreateSource( @@ -126,7 +134,10 @@ private void validateCreateSource( final CreateSource createSource ) { final String sourceTopic = createSource.getProperties().getKafkaTopic(); - accessValidator.checkAccess(securityContext, sourceTopic, AclOperation.READ); + checkTopicAccess(securityContext, sourceTopic, AclOperation.READ); + + // SchemaRegistry permissions are validated when SchemaRegisterInjector is called during CREATE + // operations. There's no need to validate the user has READ permissions here. } private String getSourceTopicName(final MetaStore metaStore, final SourceName streamOrTable) { @@ -146,4 +157,39 @@ private String getCreateAsSelectSinkTopic( return createAsSelect.getProperties().getKafkaTopic() .orElseGet(() -> getSourceTopicName(metaStore, createAsSelect.getName())); } + + private Set extractQueryTopics(final Query query, final MetaStore metaStore) { + final SourceTopicsExtractor extractor = new SourceTopicsExtractor(metaStore); + extractor.process(query, null); + return extractor.getSourceTopics(); + } + + private void checkTopicAccess( + final KsqlSecurityContext securityContext, + final String resourceName, + final AclOperation operation + ) { + accessValidator.checkTopicAccess(securityContext, resourceName, operation); + } + + private void checkSchemaAccess( + final KsqlSecurityContext securityContext, + final KsqlTopic ksqlTopic, + final AclOperation operation + ) { + + if (formatSupportsSchemaInference(ksqlTopic.getKeyFormat().getFormatInfo())) { + accessValidator.checkSubjectAccess(securityContext, + KsqlConstants.getSRSubject(ksqlTopic.getKafkaTopicName(), true), operation); + } + + if (formatSupportsSchemaInference(ksqlTopic.getValueFormat().getFormatInfo())) { + accessValidator.checkSubjectAccess(securityContext, + KsqlConstants.getSRSubject(ksqlTopic.getKafkaTopicName(), false), operation); + } + } + + private static boolean formatSupportsSchemaInference(final FormatInfo format) { + return FormatFactory.of(format).supportsFeature(SerdeFeature.SCHEMA_INFERENCE); + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlBackendAccessValidator.java b/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlBackendAccessValidator.java index ceb7eabf182a..c562dd3f3aae 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlBackendAccessValidator.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlBackendAccessValidator.java @@ -22,11 +22,11 @@ /** * An implementation of {@link KsqlAccessValidator} that provides authorization checks - * from the Kafka service. + * from the backend services. */ public class KsqlBackendAccessValidator implements KsqlAccessValidator { @Override - public void checkAccess( + public void checkTopicAccess( final KsqlSecurityContext securityContext, final String topicName, final AclOperation operation @@ -42,4 +42,16 @@ public void checkAccess( throw new KsqlTopicAuthorizationException(operation, Collections.singleton(topicName)); } } + + @Override + public void checkSubjectAccess( + final KsqlSecurityContext securityContext, + final String subjectName, + final AclOperation operation + ) { + // Nothing to do. Checking permissions for Schema Registry require an external authorization + // provider. Schema Registry does not have an API to list the allowed permissions for + // the user in a specified subject. + return; + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlCacheAccessValidator.java b/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlCacheAccessValidator.java index 1a4c72e18f28..a8174ea57945 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlCacheAccessValidator.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlCacheAccessValidator.java @@ -20,6 +20,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import io.confluent.ksql.exception.KsqlSchemaAuthorizationException; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.util.KsqlConfig; @@ -41,16 +42,19 @@ static class CacheKey { private static final String UNKNOWN_USER = ""; private final KsqlSecurityContext securityContext; - private final String topicName; + private final AuthObjectType authObjectType; + private final String objectName; private final AclOperation operation; CacheKey( final KsqlSecurityContext securityContext, - final String topicName, + final AuthObjectType authObjectType, + final String objectName, final AclOperation operation ) { this.securityContext = securityContext; - this.topicName = topicName; + this.authObjectType = authObjectType; + this.objectName = objectName; this.operation = operation; } @@ -62,7 +66,8 @@ public boolean equals(final Object o) { final CacheKey other = (CacheKey)o; return getUserName(securityContext).equals(getUserName(other.securityContext)) - && topicName.equals(other.topicName) + && authObjectType.equals(other.authObjectType) + && objectName.equals(other.objectName) && operation.code() == other.operation.code(); } @@ -70,7 +75,8 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash( getUserName(securityContext), - topicName, + authObjectType, + objectName, operation.code() ); } @@ -124,31 +130,76 @@ private CacheLoader buildCacheLoader() { return new CacheLoader() { @Override public CacheValue load(final CacheKey cacheKey) { - try { - backendValidator.checkAccess( - cacheKey.securityContext, - cacheKey.topicName, - cacheKey.operation - ); - } catch (KsqlTopicAuthorizationException e) { - return new CacheValue(!ALLOW_ACCESS, Optional.of(e)); + switch (cacheKey.authObjectType) { + case TOPIC: + return internalTopicAccessValidator(cacheKey); + case SUBJECT: + return internalSubjectAccessValidator(cacheKey); + default: + throw new IllegalStateException("Unknown access validator type: " + + cacheKey.authObjectType); } - - return new CacheValue(ALLOW_ACCESS, Optional.empty()); } }; } + private CacheValue internalTopicAccessValidator(final CacheKey cacheKey) { + try { + backendValidator.checkTopicAccess( + cacheKey.securityContext, + cacheKey.objectName, + cacheKey.operation + ); + } catch (final KsqlTopicAuthorizationException e) { + return new CacheValue(!ALLOW_ACCESS, Optional.of(e)); + } + + return new CacheValue(ALLOW_ACCESS, Optional.empty()); + } + + private CacheValue internalSubjectAccessValidator(final CacheKey cacheKey) { + try { + backendValidator.checkSubjectAccess( + cacheKey.securityContext, + cacheKey.objectName, + cacheKey.operation + ); + } catch (final KsqlSchemaAuthorizationException e) { + return new CacheValue(!ALLOW_ACCESS, Optional.of(e)); + } + + return new CacheValue(ALLOW_ACCESS, Optional.empty()); + } + + private void checkAccess(final CacheKey cacheKey) { + final CacheValue cacheValue = cache.getUnchecked(cacheKey); + if (!cacheValue.allowAccess) { + throw cacheValue.denialReason.get(); + } + } + @Override - public void checkAccess( + public void checkTopicAccess( final KsqlSecurityContext securityContext, final String topicName, final AclOperation operation ) { - final CacheKey cacheKey = new CacheKey(securityContext, topicName, operation); - final CacheValue cacheValue = cache.getUnchecked(cacheKey); - if (!cacheValue.allowAccess) { - throw cacheValue.denialReason.get(); - } + checkAccess(new CacheKey(securityContext, + AuthObjectType.TOPIC, + topicName, + operation)); + + } + + @Override + public void checkSubjectAccess( + final KsqlSecurityContext securityContext, + final String subjectName, + final AclOperation operation + ) { + checkAccess(new CacheKey(securityContext, + AuthObjectType.SUBJECT, + subjectName, + operation)); } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlProvidedAccessValidator.java b/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlProvidedAccessValidator.java new file mode 100644 index 000000000000..cd7e680dd4b2 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/security/KsqlProvidedAccessValidator.java @@ -0,0 +1,61 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.security; + +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.ImmutableList; +import org.apache.kafka.common.acl.AclOperation; + +/** + * An implementation of {@link KsqlAccessValidator} that provides authorization checks + * from a external authorization provider. + */ +public class KsqlProvidedAccessValidator implements KsqlAccessValidator { + private final KsqlAuthorizationProvider authorizationProvider; + + public KsqlProvidedAccessValidator(final KsqlAuthorizationProvider authorizationProvider) { + this.authorizationProvider = requireNonNull(authorizationProvider, "authorizationProvider"); + } + + @Override + public void checkTopicAccess( + final KsqlSecurityContext securityContext, + final String topicName, + final AclOperation operation + ) { + authorizationProvider.checkPrivileges( + securityContext, + AuthObjectType.TOPIC, + topicName, + ImmutableList.of(operation) + ); + } + + @Override + public void checkSubjectAccess( + final KsqlSecurityContext securityContext, + final String subjectName, + final AclOperation operation + ) { + authorizationProvider.checkPrivileges( + securityContext, + AuthObjectType.SUBJECT, + subjectName, + ImmutableList.of(operation) + ); + } +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/SourceTopicsExtractor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/SourceTopicsExtractor.java index 9b1c58320c52..f614775a58cd 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/SourceTopicsExtractor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/SourceTopicsExtractor.java @@ -15,6 +15,7 @@ package io.confluent.ksql.topic; +import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.name.SourceName; @@ -32,20 +33,20 @@ * Helper class that extracts all source topics from a query node. */ public class SourceTopicsExtractor extends DefaultTraversalVisitor { - private final Set sourceTopics = new HashSet<>(); + private final Set sourceTopics = new HashSet<>(); private final MetaStore metaStore; - private String primaryKafkaTopicName = null; + private KsqlTopic primarySourceTopic = null; public SourceTopicsExtractor(final MetaStore metaStore) { this.metaStore = metaStore; } - public String getPrimaryKafkaTopicName() { - return primaryKafkaTopicName; + public KsqlTopic getPrimarySourceTopic() { + return primarySourceTopic; } - public Set getSourceTopics() { + public Set getSourceTopics() { return Collections.unmodifiableSet(sourceTopics); } @@ -65,11 +66,11 @@ protected AstNode visitAliasedRelation(final AliasedRelation node, final Void co } // This method is called first with the primary kafka topic (or the node.getFrom() node) - if (primaryKafkaTopicName == null) { - primaryKafkaTopicName = source.getKafkaTopicName(); + if (primarySourceTopic == null) { + primarySourceTopic = source.getKsqlTopic(); } - sourceTopics.add(source.getKafkaTopicName()); + sourceTopics.add(source.getKsqlTopic()); return node; } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java index c2e9332d2707..3fe53fd3054a 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java @@ -147,7 +147,7 @@ private ConfiguredStatement injectForCreateAsSelec final SourceTopicsExtractor extractor = new SourceTopicsExtractor(metaStore); extractor.process(statement.getStatement().getQuery(), null); - final String sourceTopicName = extractor.getPrimaryKafkaTopicName(); + final String sourceTopicName = extractor.getPrimarySourceTopic().getKafkaTopicName(); topicPropertiesBuilder .withName(prefix + createAsSelect.getName().text()) diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactoryTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactoryTest.java index df7ec7c7f58d..01bf8d08a7fd 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactoryTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactoryTest.java @@ -59,6 +59,8 @@ public class KsqlAuthorizationValidatorFactoryTest { private ServiceContext serviceContext; @Mock private AdminClient adminClient; + @Mock + private KsqlAuthorizationProvider authorizationProvider; private Node node; @@ -72,30 +74,72 @@ public void setUp() { } @Test - public void shouldReturnAuthorizationValidator() { + public void shouldReturnProvidedAuthorizationValidatorWhenAuthorizationProviderIsNonEmpty() { + // Given: + givenKafkaAuthorizer("", Collections.emptySet()); + + // When: + final Optional validator = KsqlAuthorizationValidatorFactory.create( + ksqlConfig, + serviceContext, + Optional.of(authorizationProvider) + ); + + // Then + assertThat("validator should be present", validator.isPresent()); + assertThat(validator.get(), is(instanceOf(KsqlAuthorizationValidatorImpl.class))); + assertThat(((KsqlAuthorizationValidatorImpl)validator.get()).getAccessValidator(), + is(instanceOf(KsqlProvidedAccessValidator.class))); + } + + @Test + public void shouldReturnBackendAuthorizationValidatorWhenKafkaAuthorizerIsSet() { + // Given: + givenKafkaAuthorizer("an-authorizer-class", Collections.emptySet()); + + // When: + final Optional validator = KsqlAuthorizationValidatorFactory.create( + ksqlConfig, + serviceContext, + Optional.empty() + ); + + // Then + assertThat("validator should be present", validator.isPresent()); + assertThat(validator.get(), is(instanceOf(KsqlAuthorizationValidatorImpl.class))); + assertThat(((KsqlAuthorizationValidatorImpl)validator.get()).getAccessValidator(), + is(instanceOf(KsqlBackendAccessValidator.class))); + } + + @Test + public void shouldChooseProvidedAuthorizationValidatorOverKafkaBackendValidator() { // Given: givenKafkaAuthorizer("an-authorizer-class", Collections.emptySet()); // When: final Optional validator = KsqlAuthorizationValidatorFactory.create( ksqlConfig, - serviceContext + serviceContext, + Optional.of(authorizationProvider) ); // Then assertThat("validator should be present", validator.isPresent()); assertThat(validator.get(), is(instanceOf(KsqlAuthorizationValidatorImpl.class))); + assertThat(((KsqlAuthorizationValidatorImpl)validator.get()).getAccessValidator(), + is(instanceOf(KsqlProvidedAccessValidator.class))); } @Test - public void shouldReturnEmptyValidator() { + public void shouldReturnEmptyAuthorizationValidatorWhenNoAuthorizationProviderIsFound() { // Given: givenKafkaAuthorizer("", Collections.emptySet()); // When: final Optional validator = KsqlAuthorizationValidatorFactory.create( ksqlConfig, - serviceContext + serviceContext, + Optional.empty() ); // Then @@ -103,15 +147,17 @@ public void shouldReturnEmptyValidator() { } @Test - public void shouldReturnEmptyValidatorIfNotEnabled() { + public void shouldReturnEmptyAuthorizationValidatorKafkaAuthorizerIsSetButNotEnabled() { // Given: + givenKafkaAuthorizer("an-authorizer-class", Collections.emptySet()); when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR)) .thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF); // When: final Optional validator = KsqlAuthorizationValidatorFactory.create( ksqlConfig, - serviceContext + serviceContext, + Optional.empty() ); // Then: @@ -130,7 +176,8 @@ public void shouldReturnAuthorizationValidatorIfEnabled() { // When: final Optional validator = KsqlAuthorizationValidatorFactory.create( ksqlConfig, - serviceContext + serviceContext, + Optional.empty() ); // Then: @@ -142,7 +189,28 @@ public void shouldReturnAuthorizationValidatorIfEnabled() { } @Test - public void shouldReturnAuthorizationValidatorWithCacheExpiryTimeIsPositive() { + public void shouldReturnProvidedAuthorizationValidatorWhenCacheIsEnabled() { + // Given: + givenKafkaAuthorizer("", Collections.emptySet()); + when(ksqlConfig.getLong(KsqlConfig.KSQL_AUTH_CACHE_EXPIRY_TIME_SECS)).thenReturn(1L); + + // When: + final Optional validator = KsqlAuthorizationValidatorFactory.create( + ksqlConfig, + serviceContext, + Optional.of(authorizationProvider) + ); + + // Then: + assertThat("validator should be present", validator.isPresent()); + assertThat(validator.get(), is(instanceOf(KsqlAuthorizationValidatorImpl.class))); + assertThat(((KsqlAuthorizationValidatorImpl)validator.get()).getAccessValidator(), + is(instanceOf(KsqlCacheAccessValidator.class))); + verifyNoMoreInteractions(adminClient); + } + + @Test + public void shouldReturnAuthorizationValidatorWhenCacheIsEnabled() { // Given: when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR)) .thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON); @@ -152,7 +220,8 @@ public void shouldReturnAuthorizationValidatorWithCacheExpiryTimeIsPositive() { // When: final Optional validator = KsqlAuthorizationValidatorFactory.create( ksqlConfig, - serviceContext + serviceContext, + Optional.empty() ); // Then: @@ -171,7 +240,8 @@ public void shouldReturnEmptyValidatorIfAuthorizedOperationsReturnNull() { // When: final Optional validator = KsqlAuthorizationValidatorFactory.create( ksqlConfig, - serviceContext + serviceContext, + Optional.empty() ); // Then @@ -188,7 +258,8 @@ public void shouldReturnEmptyValidatorIfKafkaBrokerVersionTooLowButAuthorizerCla // When: final Optional validator = KsqlAuthorizationValidatorFactory.create( ksqlConfig, - serviceContext + serviceContext, + Optional.empty() ); // Then @@ -211,7 +282,8 @@ public void shouldReturnEmptyValidatorIfKafkaBrokerVersionTooLowAndExceptionWrap // When: final Optional validator = KsqlAuthorizationValidatorFactory.create( ksqlConfig, - serviceContext + serviceContext, + Optional.empty() ); // Then diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorImplTest.java index 4107e2498c59..7441d3acbe44 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorImplTest.java @@ -20,9 +20,9 @@ import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.doThrow; -import io.confluent.ksql.embedded.KsqlContext; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.KsqlEngineTestUtil; +import io.confluent.ksql.exception.KsqlSchemaAuthorizationException; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.function.InternalFunctionRegistry; @@ -59,10 +59,28 @@ public class KsqlAuthorizationValidatorImplTest { .valueColumn(ColumnName.of("F1"), SqlTypes.STRING) .build(); - private static final String STREAM_TOPIC_1 = "s1"; - private static final String STREAM_TOPIC_2 = "s2"; - private final static String TOPIC_1 = "topic1"; - private final static String TOPIC_2 = "topic2"; + private static final KeyFormat KAFKA_KEY_FORMAT = KeyFormat.nonWindowed(FormatInfo.of( + FormatFactory.KAFKA.name()), SerdeFeatures.of()); + + private static final ValueFormat KAFKA_VALUE_FORMAT = ValueFormat.of(FormatInfo.of( + FormatFactory.KAFKA.name()), SerdeFeatures.of()); + + private static final KeyFormat AVRO_KEY_FORMAT = KeyFormat.nonWindowed(FormatInfo.of( + FormatFactory.AVRO.name()), SerdeFeatures.of()); + + private static final ValueFormat AVRO_VALUE_FORMAT = ValueFormat.of(FormatInfo.of( + FormatFactory.AVRO.name()), SerdeFeatures.of()); + + private static final String KAFKA_STREAM_TOPIC = "kafka_stream"; + private static final String AVRO_STREAM_TOPIC = "avro_stream"; + private final static String KAFKA_TOPIC = "kafka_topic"; + private final static String AVRO_TOPIC = "avro_topic"; + + private final static KsqlTopic KAFKA_KSQL_TOPIC = + new KsqlTopic(KAFKA_TOPIC, KAFKA_KEY_FORMAT, KAFKA_VALUE_FORMAT); + + private final static KsqlTopic AVRO_KSQL_TOPIC = + new KsqlTopic(AVRO_TOPIC, AVRO_KEY_FORMAT, AVRO_VALUE_FORMAT); @Mock private KsqlAccessValidator accessValidator; @@ -82,8 +100,8 @@ public void setUp() { authorizationValidator = new KsqlAuthorizationValidatorImpl(accessValidator); securityContext = new KsqlSecurityContext(Optional.empty(), serviceContext); - givenStreamWithTopic(STREAM_TOPIC_1, TOPIC_1); - givenStreamWithTopic(STREAM_TOPIC_2, TOPIC_2); + givenStreamWithTopic(KAFKA_STREAM_TOPIC, KAFKA_KSQL_TOPIC); + givenStreamWithTopic(AVRO_STREAM_TOPIC, AVRO_KSQL_TOPIC); } @After @@ -98,7 +116,7 @@ private Statement givenStatement(final String sql) { @Test public void shouldSingleSelectWithReadPermissionsAllowed() { // Given: - final Statement statement = givenStatement("SELECT * FROM " + STREAM_TOPIC_1 + ";"); + final Statement statement = givenStatement("SELECT * FROM " + KAFKA_STREAM_TOPIC + ";"); // When/Then: authorizationValidator.checkAuthorization(securityContext, metaStore, statement); @@ -107,9 +125,9 @@ public void shouldSingleSelectWithReadPermissionsAllowed() { @Test public void shouldThrowWhenSingleSelectWithoutReadPermissionsDenied() { // Given: - givenAccessDenied(TOPIC_1, AclOperation.READ); + givenTopicAccessDenied(KAFKA_TOPIC, AclOperation.READ); final Statement statement = givenStatement(String.format( - "SELECT * FROM %s;", STREAM_TOPIC_1) + "SELECT * FROM %s;", KAFKA_STREAM_TOPIC) ); // When: @@ -120,7 +138,27 @@ public void shouldThrowWhenSingleSelectWithoutReadPermissionsDenied() { // Then: assertThat(e.getMessage(), containsString(String.format( - "Authorization denied to Read on topic(s): [%s]", TOPIC_1 + "Authorization denied to Read on topic(s): [%s]", KAFKA_TOPIC + ))); + } + + @Test + public void shouldThrowWhenSingleSelectWithoutSubjectReadPermissionsDenied() { + // Given: + givenSubjectAccessDenied(AVRO_TOPIC + "-key", AclOperation.READ); + final Statement statement = givenStatement(String.format( + "SELECT * FROM %s;", AVRO_STREAM_TOPIC) + ); + + // When: + final Exception e = assertThrows( + KsqlSchemaAuthorizationException.class, + () -> authorizationValidator.checkAuthorization(securityContext, metaStore, statement) + ); + + // Then: + assertThat(e.getMessage(), containsString(String.format( + "Authorization denied to Read on Schema Registry subject: [%s-key]", AVRO_TOPIC ))); } @@ -128,7 +166,7 @@ public void shouldThrowWhenSingleSelectWithoutReadPermissionsDenied() { public void shouldJoinSelectWithReadPermissionsAllowed() { // Given: final Statement statement = givenStatement(String.format( - "SELECT * FROM %s A JOIN %s B ON A.F1 = B.F1;", STREAM_TOPIC_1, STREAM_TOPIC_2) + "SELECT * FROM %s A JOIN %s B ON A.F1 = B.F1;", KAFKA_STREAM_TOPIC, AVRO_STREAM_TOPIC) ); // When/Then: @@ -138,9 +176,9 @@ public void shouldJoinSelectWithReadPermissionsAllowed() { @Test public void shouldThrowWhenJoinSelectWithoutReadPermissionsDenied() { // Given: - givenAccessDenied(TOPIC_1, AclOperation.READ); + givenTopicAccessDenied(KAFKA_TOPIC, AclOperation.READ); final Statement statement = givenStatement(String.format( - "SELECT * FROM %s A JOIN %s B ON A.F1 = B.F1;", STREAM_TOPIC_1, STREAM_TOPIC_2) + "SELECT * FROM %s A JOIN %s B ON A.F1 = B.F1;", KAFKA_STREAM_TOPIC, AVRO_STREAM_TOPIC) ); // When: @@ -151,16 +189,36 @@ public void shouldThrowWhenJoinSelectWithoutReadPermissionsDenied() { // Then: assertThat(e.getMessage(), containsString(String.format( - "Authorization denied to Read on topic(s): [%s]", TOPIC_1 + "Authorization denied to Read on topic(s): [%s]", KAFKA_TOPIC + ))); + } + + @Test + public void shouldThrowWhenJoinSelectWithoutSubjectReadPermissionsDenied() { + // Given: + givenSubjectAccessDenied(AVRO_TOPIC + "-value", AclOperation.READ); + final Statement statement = givenStatement(String.format( + "SELECT * FROM %s A JOIN %s B ON A.F1 = B.F1;", KAFKA_STREAM_TOPIC, AVRO_STREAM_TOPIC) + ); + + // When: + final Exception e = assertThrows( + KsqlSchemaAuthorizationException.class, + () -> authorizationValidator.checkAuthorization(securityContext, metaStore, statement) + ); + + // Then: + assertThat(e.getMessage(), containsString(String.format( + "Authorization denied to Read on Schema Registry subject: [%s-value]", AVRO_TOPIC ))); } @Test public void shouldThrowWhenJoinWithOneRightTopicWithReadPermissionsDenied() { // Given: - givenAccessDenied(TOPIC_2, AclOperation.READ); + givenTopicAccessDenied(AVRO_TOPIC, AclOperation.READ); final Statement statement = givenStatement(String.format( - "SELECT * FROM %s A JOIN %s B ON A.F1 = B.F1;", STREAM_TOPIC_1, STREAM_TOPIC_2) + "SELECT * FROM %s A JOIN %s B ON A.F1 = B.F1;", KAFKA_STREAM_TOPIC, AVRO_STREAM_TOPIC) ); // When: @@ -171,16 +229,16 @@ public void shouldThrowWhenJoinWithOneRightTopicWithReadPermissionsDenied() { // Then: assertThat(e.getMessage(), containsString(String.format( - "Authorization denied to Read on topic(s): [%s]", TOPIC_2 + "Authorization denied to Read on topic(s): [%s]", AVRO_TOPIC ))); } @Test public void shouldThrowWhenJoinWitOneLeftTopicWithReadPermissionsDenied() { // Given: - givenAccessDenied(TOPIC_1, AclOperation.READ); + givenTopicAccessDenied(KAFKA_TOPIC, AclOperation.READ); final Statement statement = givenStatement(String.format( - "SELECT * FROM %s A JOIN %s B ON A.F1 = B.F1;", STREAM_TOPIC_1, STREAM_TOPIC_2) + "SELECT * FROM %s A JOIN %s B ON A.F1 = B.F1;", KAFKA_STREAM_TOPIC, AVRO_STREAM_TOPIC) ); // When: @@ -191,7 +249,7 @@ public void shouldThrowWhenJoinWitOneLeftTopicWithReadPermissionsDenied() { // Then: assertThat(e.getMessage(), containsString(String.format( - "Authorization denied to Read on topic(s): [%s]", TOPIC_1 + "Authorization denied to Read on topic(s): [%s]", KAFKA_TOPIC ))); } @@ -199,7 +257,7 @@ public void shouldThrowWhenJoinWitOneLeftTopicWithReadPermissionsDenied() { public void shouldInsertIntoWithAllPermissionsAllowed() { // Given: final Statement statement = givenStatement(String.format( - "INSERT INTO %s SELECT * FROM %s;", STREAM_TOPIC_2, STREAM_TOPIC_1) + "INSERT INTO %s SELECT * FROM %s;", AVRO_STREAM_TOPIC, KAFKA_STREAM_TOPIC) ); // When/then: @@ -209,9 +267,9 @@ public void shouldInsertIntoWithAllPermissionsAllowed() { @Test public void shouldThrowWhenInsertIntoWithOnlyReadPermissionsAllowed() { // Given: - givenAccessDenied(TOPIC_2, AclOperation.WRITE); + givenTopicAccessDenied(AVRO_TOPIC, AclOperation.WRITE); final Statement statement = givenStatement(String.format( - "INSERT INTO %s SELECT * FROM %s;", STREAM_TOPIC_2, STREAM_TOPIC_1) + "INSERT INTO %s SELECT * FROM %s;", AVRO_STREAM_TOPIC, KAFKA_STREAM_TOPIC) ); // When: @@ -222,16 +280,16 @@ public void shouldThrowWhenInsertIntoWithOnlyReadPermissionsAllowed() { // Then: assertThat(e.getMessage(), containsString(String.format( - "Authorization denied to Write on topic(s): [%s]", TOPIC_2 + "Authorization denied to Write on topic(s): [%s]", AVRO_TOPIC ))); } @Test public void shouldThrowWhenInsertIntoWithOnlyWritePermissionsAllowed() { // Given: - givenAccessDenied(TOPIC_1, AclOperation.READ); + givenTopicAccessDenied(KAFKA_TOPIC, AclOperation.READ); final Statement statement = givenStatement(String.format( - "INSERT INTO %s SELECT * FROM %s;", STREAM_TOPIC_2, STREAM_TOPIC_1) + "INSERT INTO %s SELECT * FROM %s;", AVRO_STREAM_TOPIC, KAFKA_STREAM_TOPIC) ); // When: @@ -242,16 +300,16 @@ public void shouldThrowWhenInsertIntoWithOnlyWritePermissionsAllowed() { // Then: assertThat(e.getMessage(), containsString(String.format( - "Authorization denied to Read on topic(s): [%s]", TOPIC_1 + "Authorization denied to Read on topic(s): [%s]", KAFKA_TOPIC ))); } @Test public void shouldThrowWhenCreateAsSelectWithoutReadPermissionsDenied() { // Given: - givenAccessDenied(TOPIC_1, AclOperation.READ); + givenTopicAccessDenied(KAFKA_TOPIC, AclOperation.READ); final Statement statement = givenStatement(String.format( - "CREATE STREAM newStream AS SELECT * FROM %s;", STREAM_TOPIC_1) + "CREATE STREAM newStream AS SELECT * FROM %s;", KAFKA_STREAM_TOPIC) ); // When: @@ -262,7 +320,7 @@ public void shouldThrowWhenCreateAsSelectWithoutReadPermissionsDenied() { // Then: assertThat(e.getMessage(), containsString(String.format( - "Authorization denied to Read on topic(s): [%s]", TOPIC_1 + "Authorization denied to Read on topic(s): [%s]", KAFKA_TOPIC ))); } @@ -270,7 +328,7 @@ public void shouldThrowWhenCreateAsSelectWithoutReadPermissionsDenied() { public void shouldCreateAsSelectExistingTopicWithWritePermissionsAllowed() { // Given: final Statement statement = givenStatement(String.format( - "CREATE STREAM %s AS SELECT * FROM %s;", STREAM_TOPIC_2, STREAM_TOPIC_1) + "CREATE STREAM %s AS SELECT * FROM %s;", AVRO_STREAM_TOPIC, KAFKA_STREAM_TOPIC) ); // When/Then: @@ -280,9 +338,9 @@ public void shouldCreateAsSelectExistingTopicWithWritePermissionsAllowed() { @Test public void shouldThrowWhenCreateAsSelectExistingStreamWithoutWritePermissionsDenied() { // Given: - givenAccessDenied(TOPIC_2, AclOperation.WRITE); + givenTopicAccessDenied(AVRO_TOPIC, AclOperation.WRITE); final Statement statement = givenStatement(String.format( - "CREATE STREAM %s AS SELECT * FROM %s;", STREAM_TOPIC_2, STREAM_TOPIC_1) + "CREATE STREAM %s AS SELECT * FROM %s;", AVRO_STREAM_TOPIC, KAFKA_STREAM_TOPIC) ); // When: @@ -293,7 +351,7 @@ public void shouldThrowWhenCreateAsSelectExistingStreamWithoutWritePermissionsDe // Then: assertThat(e.getMessage(), containsString(String.format( - "Authorization denied to Write on topic(s): [%s]", TOPIC_2 + "Authorization denied to Write on topic(s): [%s]", AVRO_TOPIC ))); } @@ -302,7 +360,7 @@ public void shouldCreateAsSelectWithTopicAndWritePermissionsAllowed() { // Given: final Statement statement = givenStatement(String.format( "CREATE STREAM newStream WITH (kafka_topic='%s') AS SELECT * FROM %s;", - TOPIC_2, STREAM_TOPIC_1) + AVRO_TOPIC, KAFKA_STREAM_TOPIC) ); // When/Then: @@ -312,7 +370,7 @@ public void shouldCreateAsSelectWithTopicAndWritePermissionsAllowed() { @Test public void shouldPrintTopicWithReadPermissionsAllowed() { // Given: - final Statement statement = givenStatement(String.format("Print '%s';", TOPIC_1)); + final Statement statement = givenStatement(String.format("Print '%s';", KAFKA_TOPIC)); // When/Then authorizationValidator.checkAuthorization(securityContext, metaStore, statement); @@ -321,8 +379,8 @@ public void shouldPrintTopicWithReadPermissionsAllowed() { @Test public void shouldThrowWhenPrintTopicWithoutReadPermissionsDenied() { // Given: - givenAccessDenied(TOPIC_1, AclOperation.READ); - final Statement statement = givenStatement(String.format("Print '%s';", TOPIC_1)); + givenTopicAccessDenied(KAFKA_TOPIC, AclOperation.READ); + final Statement statement = givenStatement(String.format("Print '%s';", KAFKA_TOPIC)); // When: final Exception e = assertThrows( @@ -332,7 +390,7 @@ public void shouldThrowWhenPrintTopicWithoutReadPermissionsDenied() { // Then: assertThat(e.getMessage(), containsString(String.format( - "Authorization denied to Read on topic(s): [%s]", TOPIC_1 + "Authorization denied to Read on topic(s): [%s]", KAFKA_TOPIC ))); } @@ -340,7 +398,7 @@ public void shouldThrowWhenPrintTopicWithoutReadPermissionsDenied() { public void shouldCreateSourceWithReadPermissionsAllowed() { // Given: final Statement statement = givenStatement(String.format( - "CREATE STREAM s1 WITH (kafka_topic='%s', value_format='JSON');", TOPIC_1) + "CREATE STREAM s1 WITH (kafka_topic='%s', value_format='JSON');", KAFKA_TOPIC) ); // When/Then: @@ -350,9 +408,9 @@ public void shouldCreateSourceWithReadPermissionsAllowed() { @Test public void shouldThrowWhenCreateSourceWithoutReadPermissionsDenied() { // Given: - givenAccessDenied(TOPIC_1, AclOperation.READ); + givenTopicAccessDenied(KAFKA_TOPIC, AclOperation.READ); final Statement statement = givenStatement(String.format( - "CREATE STREAM s1 WITH (kafka_topic='%s', value_format='JSON');", TOPIC_1) + "CREATE STREAM s1 WITH (kafka_topic='%s', value_format='JSON');", KAFKA_TOPIC) ); // When: @@ -363,25 +421,21 @@ public void shouldThrowWhenCreateSourceWithoutReadPermissionsDenied() { // Then: assertThat(e.getMessage(), containsString(String.format( - "Authorization denied to Read on topic(s): [%s]", TOPIC_1 + "Authorization denied to Read on topic(s): [%s]", KAFKA_TOPIC ))); } - private void givenAccessDenied(final String topicName, final AclOperation operation) { + private void givenTopicAccessDenied(final String topicName, final AclOperation operation) { doThrow(new KsqlTopicAuthorizationException(operation, Collections.singleton(topicName))) - .when(accessValidator).checkAccess(securityContext, topicName, operation); + .when(accessValidator).checkTopicAccess(securityContext, topicName, operation); } - private void givenStreamWithTopic( - final String streamName, - final String topicName - ) { - final KsqlTopic sourceTopic = new KsqlTopic( - topicName, - KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of()), - ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()), SerdeFeatures.of()) - ); + private void givenSubjectAccessDenied(final String subjectName, final AclOperation operation) { + doThrow(new KsqlSchemaAuthorizationException(operation, subjectName)) + .when(accessValidator).checkSubjectAccess(securityContext, subjectName, operation); + } + private void givenStreamWithTopic(final String streamName, final KsqlTopic sourceTopic) { final KsqlStream streamSource = new KsqlStream<>( "", SourceName.of(streamName.toUpperCase()), diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlBackendAccessValidatorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlBackendAccessValidatorTest.java index 210aa08de697..14c6844b64c9 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlBackendAccessValidatorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlBackendAccessValidatorTest.java @@ -15,7 +15,6 @@ package io.confluent.ksql.security; -import static org.apache.kafka.common.acl.AclOperation.READ; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertThrows; @@ -72,7 +71,7 @@ public void shouldAllowIfAuthorizedOperationsIsNull() { givenTopicPermissions(TOPIC_1, null); // When/Then: - accessValidator.checkAccess(securityContext, TOPIC_NAME_1, AclOperation.READ); + accessValidator.checkTopicAccess(securityContext, TOPIC_NAME_1, AclOperation.READ); } @Test @@ -81,7 +80,7 @@ public void shouldAllowIfAuthorizedOperationsContainsREAD() { givenTopicPermissions(TOPIC_1, Collections.singleton(AclOperation.READ)); // When/Then: - accessValidator.checkAccess(securityContext, TOPIC_NAME_1, AclOperation.READ); + accessValidator.checkTopicAccess(securityContext, TOPIC_NAME_1, AclOperation.READ); } @Test @@ -92,7 +91,7 @@ public void shouldDenyIfAuthorizedOperationsDoesNotContainREAD() { // When: final Exception e = assertThrows( KsqlTopicAuthorizationException.class, - () -> accessValidator.checkAccess(securityContext, TOPIC_NAME_1, AclOperation.READ) + () -> accessValidator.checkTopicAccess(securityContext, TOPIC_NAME_1, AclOperation.READ) ); // Then: @@ -110,7 +109,7 @@ public void shouldThrowExceptionWhenDescribeTopicFails() { // When: assertThrows( KafkaResponseGetFailedException.class, - () -> accessValidator.checkAccess(securityContext, TOPIC_NAME_1, READ) + () -> accessValidator.checkTopicAccess(securityContext, TOPIC_NAME_1, AclOperation.READ) ); } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlCacheAccessValidatorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlCacheAccessValidatorTest.java index 23fee571a61c..c4cfaec1072c 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlCacheAccessValidatorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlCacheAccessValidatorTest.java @@ -15,7 +15,6 @@ package io.confluent.ksql.security; -import static org.apache.kafka.common.acl.AclOperation.READ; import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; @@ -24,6 +23,7 @@ import static org.mockito.Mockito.when; import com.google.common.base.Ticker; +import io.confluent.ksql.exception.KsqlSchemaAuthorizationException; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.util.KsqlConfig; import java.util.concurrent.TimeUnit; @@ -37,6 +37,7 @@ @RunWith(MockitoJUnitRunner.class) public class KsqlCacheAccessValidatorTest { private static final String TOPIC_1 = "topic1"; + private static final String SUBJECT_1 = "subject1"; private static final long ONE_SEC_IN_NS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS); @Mock @@ -60,39 +61,76 @@ public void setUp() { } @Test - public void shouldCheckBackendValidatorOnFirstRequest() { + public void shouldCheckBackendValidatorOnFirstTopicAccessRequest() { // When - cache.checkAccess(securityContext, TOPIC_1, AclOperation.READ); + cache.checkTopicAccess(securityContext, TOPIC_1, AclOperation.READ); // Then verify(backendValidator, times(1)) - .checkAccess(securityContext, TOPIC_1, AclOperation.READ); + .checkTopicAccess(securityContext, TOPIC_1, AclOperation.READ); verifyNoMoreInteractions(backendValidator); } @Test - public void shouldCheckCacheValidatorOnSecondRequest() { + public void shouldCheckCacheValidatorOnSecondTopicAccessRequest() { // When - cache.checkAccess(securityContext, TOPIC_1, AclOperation.READ); + cache.checkTopicAccess(securityContext, TOPIC_1, AclOperation.READ); when(fakeTicker.read()).thenReturn(ONE_SEC_IN_NS); - cache.checkAccess(securityContext, TOPIC_1, AclOperation.READ); + cache.checkTopicAccess(securityContext, TOPIC_1, AclOperation.READ); // Then verify(backendValidator, times(1)) - .checkAccess(securityContext, TOPIC_1, AclOperation.READ); + .checkTopicAccess(securityContext, TOPIC_1, AclOperation.READ); verifyNoMoreInteractions(backendValidator); } @Test - public void shouldThrowAuthorizationExceptionWhenBackendValidatorIsDenied() { + public void shouldCheckBackendValidatorOnFirstSubjectAccessRequest() { + // When + cache.checkSubjectAccess(securityContext, SUBJECT_1, AclOperation.READ); + + // Then + verify(backendValidator, times(1)) + .checkSubjectAccess(securityContext, SUBJECT_1, AclOperation.READ); + verifyNoMoreInteractions(backendValidator); + } + + @Test + public void shouldCheckCacheValidatorOnSecondSubjectAccessRequest() { + // When + cache.checkSubjectAccess(securityContext, SUBJECT_1, AclOperation.READ); + when(fakeTicker.read()).thenReturn(ONE_SEC_IN_NS); + cache.checkSubjectAccess(securityContext, SUBJECT_1, AclOperation.READ); + + // Then + verify(backendValidator, times(1)) + .checkSubjectAccess(securityContext, SUBJECT_1, AclOperation.READ); + verifyNoMoreInteractions(backendValidator); + } + + @Test + public void shouldThrowAuthorizationExceptionWhenBackendTopicValidatorIsDenied() { // Given doThrow(KsqlTopicAuthorizationException.class).when(backendValidator) - .checkAccess(securityContext, TOPIC_1, READ); + .checkTopicAccess(securityContext, TOPIC_1, AclOperation.READ); // When: assertThrows( KsqlTopicAuthorizationException.class, - () -> cache.checkAccess(securityContext, TOPIC_1, READ) + () -> cache.checkTopicAccess(securityContext, TOPIC_1, AclOperation.READ) + ); + } + + @Test + public void shouldThrowAuthorizationExceptionWhenBackendSubjectValidatorIsDenied() { + // Given + doThrow(KsqlSchemaAuthorizationException.class).when(backendValidator) + .checkSubjectAccess(securityContext, SUBJECT_1, AclOperation.READ); + + // When: + assertThrows( + KsqlSchemaAuthorizationException.class, + () -> cache.checkSubjectAccess(securityContext, SUBJECT_1, AclOperation.READ) ); } @@ -100,12 +138,25 @@ public void shouldThrowAuthorizationExceptionWhenBackendValidatorIsDenied() { public void shouldThrowExceptionWhenBackendValidatorThrowsAnException() { // Given doThrow(RuntimeException.class).when(backendValidator) - .checkAccess(securityContext, TOPIC_1, READ); + .checkTopicAccess(securityContext, TOPIC_1, AclOperation.READ); + + // When: + assertThrows( + RuntimeException.class, + () -> cache.checkTopicAccess(securityContext, TOPIC_1, AclOperation.READ) + ); + } + + @Test + public void shouldThrowExceptionWhenBackendSubjectValidatorThrowsAnException() { + // Given + doThrow(RuntimeException.class).when(backendValidator) + .checkSubjectAccess(securityContext, SUBJECT_1, AclOperation.READ); // When: assertThrows( RuntimeException.class, - () -> cache.checkAccess(securityContext, TOPIC_1, READ) + () -> cache.checkSubjectAccess(securityContext, SUBJECT_1, AclOperation.READ) ); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlProvidedAccessValidatorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlProvidedAccessValidatorTest.java new file mode 100644 index 000000000000..84af01841103 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlProvidedAccessValidatorTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.security; + +import com.google.common.collect.ImmutableList; +import org.apache.kafka.common.acl.AclOperation; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.class) +public class KsqlProvidedAccessValidatorTest { + private KsqlProvidedAccessValidator accessValidator; + + @Mock + private KsqlSecurityContext securityContext; + @Mock + private KsqlAuthorizationProvider authorizationProvider; + + @Before + public void setup() { + accessValidator = new KsqlProvidedAccessValidator(authorizationProvider); + } + + @Test + public void shouldCheckTopicPrivilegesOnProvidedAccessValidator() { + // When + accessValidator.checkTopicAccess(securityContext, "topic1", AclOperation.WRITE); + + // Then + verify(authorizationProvider, times(1)) + .checkPrivileges( + securityContext, + AuthObjectType.TOPIC, + "topic1", + ImmutableList.of(AclOperation.WRITE)); + } + + @Test + public void shouldCheckSubjectTopicPrivilegesOnProvidedAccessValidator() { + // When + accessValidator.checkSubjectAccess(securityContext, "subject1", AclOperation.READ); + + // Then + verify(authorizationProvider, times(1)) + .checkPrivileges( + securityContext, + AuthObjectType.SUBJECT, + "subject1", + ImmutableList.of(AclOperation.READ)); + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/SourceTopicsExtractorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/SourceTopicsExtractorTest.java index f7fb980c2875..5ef8661491e3 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/SourceTopicsExtractorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/SourceTopicsExtractorTest.java @@ -20,7 +20,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThrows; -import static org.mockito.Mockito.when; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.KsqlEngineTestUtil; @@ -40,11 +39,11 @@ import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.serde.SerdeFeatures; import io.confluent.ksql.serde.ValueFormat; -import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlException; import java.util.Optional; -import org.apache.kafka.clients.admin.TopicDescription; +import java.util.stream.Collectors; + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -60,17 +59,23 @@ public class SourceTopicsExtractorTest { .valueColumn(ColumnName.of("F1"), SqlTypes.STRING) .build(); + private static final KsqlTopic TOPIC_1 = new KsqlTopic( + "topic1", + KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of()), + ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()), SerdeFeatures.of()) + ); + + private static final KsqlTopic TOPIC_2 = new KsqlTopic( + "topic2", + KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of()), + ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()), SerdeFeatures.of()) + ); + private static final String STREAM_TOPIC_1 = "s1"; private static final String STREAM_TOPIC_2 = "s2"; @Mock private ServiceContext serviceContext; - @Mock - private KafkaTopicClient kafkaTopicClient; - @Mock - private TopicDescription TOPIC_1; - @Mock - private TopicDescription TOPIC_2; private SourceTopicsExtractor extractor; private KsqlEngine ksqlEngine; @@ -82,10 +87,7 @@ public void setUp() { ksqlEngine = KsqlEngineTestUtil.createKsqlEngine(serviceContext, metaStore); extractor = new SourceTopicsExtractor(metaStore); - givenTopic("topic1", TOPIC_1); givenStreamWithTopic(STREAM_TOPIC_1, TOPIC_1); - - givenTopic("topic2", TOPIC_2); givenStreamWithTopic(STREAM_TOPIC_2, TOPIC_2); } @@ -107,7 +109,7 @@ public void shouldExtractTopicFromSimpleSelect() { extractor.process(statement, null); // Then: - assertThat(extractor.getPrimaryKafkaTopicName(), is(TOPIC_1.name())); + assertThat(extractor.getPrimarySourceTopic(), is(TOPIC_1)); } @Test @@ -121,7 +123,7 @@ public void shouldExtractPrimaryTopicFromJoinSelect() { extractor.process(statement, null); // Then: - assertThat(extractor.getPrimaryKafkaTopicName(), is(TOPIC_1.name())); + assertThat(extractor.getPrimarySourceTopic(), is(TOPIC_1)); } @Test @@ -135,7 +137,7 @@ public void shouldExtractJoinTopicsFromJoinSelect() { extractor.process(statement, null); // Then: - assertThat(extractor.getSourceTopics(), contains(TOPIC_1.name(), TOPIC_2.name())); + assertThat(extractor.getSourceTopics(), contains(TOPIC_2, TOPIC_1)); } @Test @@ -156,14 +158,8 @@ public void shouldFailIfSourceTopicNotInMetastore() { private void givenStreamWithTopic( final String streamName, - final TopicDescription topicDescription + final KsqlTopic sourceTopic ) { - final KsqlTopic sourceTopic = new KsqlTopic( - topicDescription.name(), - KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of()), - ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()), SerdeFeatures.of()) - ); - final KsqlStream streamSource = new KsqlStream<>( "", SourceName.of(streamName.toUpperCase()), @@ -175,8 +171,4 @@ private void givenStreamWithTopic( metaStore.putSource(streamSource, false); } - - private static void givenTopic(final String topicName, final TopicDescription topicDescription) { - when(topicDescription.name()).thenReturn(topicName); - } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 1c4a846f6a3b..99bb7c3aab37 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -313,7 +313,8 @@ public void startAsync() { this.serverMetadataResource = ServerMetadataResource.create(serviceContext, ksqlConfigNoPort); final StatementParser statementParser = new StatementParser(ksqlEngine); final Optional authorizationValidator = - KsqlAuthorizationValidatorFactory.create(ksqlConfigNoPort, serviceContext); + KsqlAuthorizationValidatorFactory.create(ksqlConfigNoPort, serviceContext, + securityExtension.getAuthorizationProvider()); final Errors errorHandler = new Errors(restConfig.getConfiguredInstance( KsqlRestConfig.KSQL_SERVER_ERROR_MESSAGES, ErrorMessages.class @@ -759,7 +760,8 @@ static KsqlRestApplication buildApplication( restConfig); final Optional authorizationValidator = - KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext); + KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext, + securityExtension.getAuthorizationProvider()); final Errors errorHandler = new Errors(restConfig.getConfiguredInstance( KsqlRestConfig.KSQL_SERVER_ERROR_MESSAGES, diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/AuthTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/AuthTest.java index 1d1cf851a963..5b9747740004 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/AuthTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/AuthTest.java @@ -27,7 +27,9 @@ import io.confluent.ksql.api.utils.InsertsResponse; import io.confluent.ksql.api.utils.QueryResponse; import io.confluent.ksql.rest.server.KsqlRestConfig; +import io.confluent.ksql.security.AuthObjectType; import io.confluent.ksql.security.KsqlAuthorizationProvider; +import io.confluent.ksql.security.KsqlSecurityContext; import io.confluent.ksql.security.KsqlSecurityExtension; import io.confluent.ksql.security.KsqlUserContextProvider; import io.confluent.ksql.test.util.TestBasicJaasConfig; @@ -42,12 +44,15 @@ import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.client.WebClient; import java.security.Principal; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; + +import org.apache.kafka.common.acl.AclOperation; import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; @@ -546,9 +551,23 @@ private void shouldAllowAccessWithoutAuthentication( stopServer(); stopClient(); AtomicReference authorizationCallReference = new AtomicReference<>(false); - this.authorizationProvider = (user, method, path) -> { - authorizationCallReference.set(true); + this.authorizationProvider = new KsqlAuthorizationProvider() { + @Override + public void checkEndpointAccess(final Principal user, + final String method, + final String path) { + authorizationCallReference.set(true); + } + + @Override + public void checkPrivileges(final KsqlSecurityContext securityContext, + final AuthObjectType objectType, + final String objectName, + final List privileges) { + // Not required for vert.x authX as it only authorizes endpoints + } }; + createServer(createServerConfig()); client = createClient(); action.run(); @@ -563,12 +582,26 @@ private void shouldAllowAccessWithPermissionCheck(final String expectedUser, AtomicReference principalAtomicReference = new AtomicReference<>(); AtomicReference methodAtomicReference = new AtomicReference<>(); AtomicReference pathAtomicReference = new AtomicReference<>(); - this.authorizationProvider = (user, method, path) -> { - throwIfNullPrincipal(user); - principalAtomicReference.set(user); - methodAtomicReference.set(method); - pathAtomicReference.set(path); + this.authorizationProvider = new KsqlAuthorizationProvider() { + @Override + public void checkEndpointAccess(final Principal user, + final String method, + final String path) { + throwIfNullPrincipal(user); + principalAtomicReference.set(user); + methodAtomicReference.set(method); + pathAtomicReference.set(path); + } + + @Override + public void checkPrivileges(final KsqlSecurityContext securityContext, + final AuthObjectType objectType, + final String objectName, + final List privileges) { + // Not required for vert.x authX as it only authorizes endpoints + } }; + createServer(createServerConfig()); client = createClient(); action.run(); @@ -581,9 +614,23 @@ private void shouldNotAllowAccessIfPermissionCheckThrowsException( ExceptionThrowingRunnable runnable) throws Exception { stopServer(); stopClient(); - this.authorizationProvider = (user, method, path) -> { - throw new KsqlException("Forbidden"); + this.authorizationProvider = new KsqlAuthorizationProvider() { + @Override + public void checkEndpointAccess(final Principal user, + final String method, + final String path) { + throw new KsqlException("Forbidden"); + } + + @Override + public void checkPrivileges(final KsqlSecurityContext securityContext, + final AuthObjectType objectType, + final String objectName, + final List privileges) { + // Not required for vert.x authX as it only authorizes endpoints + } }; + createServer(createServerConfig()); client = createClient(); runnable.run(); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/MockKsqlSecurityExtension.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/MockKsqlSecurityExtension.java index b14ffcdfb7cd..616f74bf1732 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/MockKsqlSecurityExtension.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/MockKsqlSecurityExtension.java @@ -1,9 +1,14 @@ package io.confluent.ksql.rest.integration; +import io.confluent.ksql.security.AuthObjectType; import io.confluent.ksql.security.KsqlAuthorizationProvider; +import io.confluent.ksql.security.KsqlSecurityContext; import io.confluent.ksql.security.KsqlSecurityExtension; import io.confluent.ksql.security.KsqlUserContextProvider; import io.confluent.ksql.util.KsqlConfig; +import org.apache.kafka.common.acl.AclOperation; +import java.security.Principal; +import java.util.List; import java.util.Optional; /** @@ -22,9 +27,23 @@ public void initialize(final KsqlConfig ksqlConfig) { @Override public Optional getAuthorizationProvider() { - return Optional.of( - (user, method, path) -> MockKsqlSecurityExtension.provider - .checkEndpointAccess(user, method, path)); + return Optional.of(new KsqlAuthorizationProvider() { + @Override + public void checkEndpointAccess(final Principal user, + final String method, + final String path) { + MockKsqlSecurityExtension.provider.checkEndpointAccess(user, method, path); + } + + @Override + public void checkPrivileges(final KsqlSecurityContext securityContext, + final AuthObjectType objectType, + final String objectName, + final List privileges) { + MockKsqlSecurityExtension.provider + .checkPrivileges(securityContext, objectType, objectName, privileges); + } + }); } @Override From 0e222e113fca77f146c9ab172d39b8160bfa75ae Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Thu, 22 Jul 2021 14:49:46 -0500 Subject: [PATCH 2/2] in progress --- config/ksql-server.properties | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/config/ksql-server.properties b/config/ksql-server.properties index 29bec1bec28d..8077ee8564be 100644 --- a/config/ksql-server.properties +++ b/config/ksql-server.properties @@ -14,6 +14,34 @@ # #------ Endpoint config ------- +#------ RBAC ------------------ + +# Enable KSQL authorization and impersonation +ksql.security.extension.class=io.confluent.ksql.security.KsqlConfluentSecurityExtension + +# Metadata URL and access credentials (shared by Security handlers and KSQL security extension) +# The `ksql:ksql` user:password are pre-configured in the `login.properties` you pasted in the Kafka server.properties +confluent.metadata.bootstrap.server.urls=http://localhost:8090 +confluent.metadata.http.auth.credentials.provider=BASIC +confluent.metadata.basic.auth.user.info=ksql:ksql + +# Configure the plugin (found in the confluent-security-plugins) +# You must add the `confluent-security-plugins` to the ksqlDB classpath `KSQL_CLASSPATH` +ksql.authentication.plugin.class=io.confluent.ksql.security.VertxBearerOrBasicAuthenticationPlugin + +# Replace {PUBLIC_KEY} with the location of the `publickey.pem` file of this repository +# (i.e. /path/to/devel-tools/rbac/config/creds/publickey.pem) +# This is the same public key shared with the Kafka server. It is used only to validate client tokens. +public.key.path=/home/sergio/Tools/devel-tools/rbac/config/creds/publickey.pem + +# Configure the connection to the Kafka server +# The `ksql:ksql` user:password is pre-configured in the `login.properties`. +# Note: You must configure RBAC roles-bindings to allow `ksql` to access Kafka topics later. +security.protocol=SASL_PLAINTEXT +sasl.mechanism=OAUTHBEARER +sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ + metadataServerUrls="http://localhost:8090" username="ksql" password="ksql"; +sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler ### HTTP ### # The URL the KSQL server will listen on: @@ -78,5 +106,7 @@ compression.type=snappy #------ Schema Registry ------- # Uncomment and complete the following to enable KSQL's integration to the Confluent Schema Registry: -# ksql.schema.registry.url=http://localhost:8081 +ksql.schema.registry.url=http://localhost:8081 +ksql.schema.registry.basic.auth.credentials.source=USER_INFO +ksql.schema.registry.basic.auth.user.info=ksql:ksql