
feat: make SchemaRegistry permission validations on KSQL requests #7773

Merged · 2 commits · Jul 27, 2021
32 changes: 31 additions & 1 deletion config/ksql-server.properties
@@ -14,6 +14,34 @@
#

#------ Endpoint config -------
#------ RBAC ------------------

# Enable KSQL authorization and impersonation
ksql.security.extension.class=io.confluent.ksql.security.KsqlConfluentSecurityExtension

# Metadata URL and access credentials (shared by Security handlers and KSQL security extension)
# The `ksql:ksql` user:password pair is pre-configured in the `login.properties` file you set up for the Kafka server.properties
confluent.metadata.bootstrap.server.urls=http://localhost:8090
confluent.metadata.http.auth.credentials.provider=BASIC
confluent.metadata.basic.auth.user.info=ksql:ksql

# Configure the plugin (found in the confluent-security-plugins)
# You must add the `confluent-security-plugins` to the ksqlDB classpath `KSQL_CLASSPATH`
ksql.authentication.plugin.class=io.confluent.ksql.security.VertxBearerOrBasicAuthenticationPlugin

# Set `public.key.path` below to the location of the `publickey.pem` file of this repository
# (i.e. /path/to/devel-tools/rbac/config/creds/publickey.pem)
# This is the same public key shared with the Kafka server. It is used only to validate client tokens.
public.key.path=/home/sergio/Tools/devel-tools/rbac/config/creds/publickey.pem

# Configure the connection to the Kafka server
# The `ksql:ksql` user:password is pre-configured in the `login.properties`.
# Note: You must configure RBAC role bindings to allow `ksql` to access Kafka topics later.
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
metadataServerUrls="http://localhost:8090" username="ksql" password="ksql";
sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler

### HTTP ###
# The URL the KSQL server will listen on:
@@ -78,5 +106,7 @@ compression.type=snappy
#------ Schema Registry -------

# Uncomment and complete the following to enable KSQL's integration with the Confluent Schema Registry:
# ksql.schema.registry.url=http://localhost:8081
ksql.schema.registry.url=http://localhost:8081
ksql.schema.registry.basic.auth.credentials.source=USER_INFO
ksql.schema.registry.basic.auth.user.info=ksql:ksql
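
Side note on the two new `basic.auth` entries: they are the standard Schema Registry client credentials that ksqlDB hands to its internal SR client. Below is a minimal standalone sketch (not part of this change) that exercises the same URL and credentials with the stock `CachedSchemaRegistryClient` API, handy for verifying that the `ksql` principal can actually reach Schema Registry:

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import java.util.HashMap;
import java.util.Map;

public final class SchemaRegistryAuthCheck {
  public static void main(final String[] args) throws Exception {
    // Same credentials and URL as the ksql-server.properties entries above.
    final Map<String, Object> configs = new HashMap<>();
    configs.put("basic.auth.credentials.source", "USER_INFO");
    configs.put("basic.auth.user.info", "ksql:ksql");

    // 100 is the schema cache capacity; the value is arbitrary for this check.
    final SchemaRegistryClient client =
        new CachedSchemaRegistryClient("http://localhost:8081", 100, configs);

    // Fails with an authorization error if the `ksql` principal lacks Schema Registry permissions.
    System.out.println(client.getAllSubjects());
  }
}
```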

@@ -0,0 +1,34 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.security;

import static java.util.Objects.requireNonNull;

public enum AuthObjectType {
TOPIC("Topic"),
SUBJECT("Subject");

private final String name;

AuthObjectType(final String name) {
this.name = requireNonNull(name, "name");
}

@Override
public String toString() {
return name;
}
}
@@ -22,12 +22,30 @@
*/
public interface KsqlAccessValidator {
/**
* Checks if an authenticated user provided by the {@code securityContext} has authorization
* to execute the {@code operation} on the kafka {@code topicName}.
* Checks if an authenticated user, provided by the {@code securityContext}, has authorization
* to execute the specified {@code operation} on the {@code topicName}.
*
* @param securityContext The context for the authenticated user.
* @param topicName The topic name to check access.
* @param operation The {@code AclOperation} to validate against the {@code topicName}.
*/
void checkAccess(KsqlSecurityContext securityContext, String topicName, AclOperation operation);
void checkTopicAccess(
KsqlSecurityContext securityContext,
String topicName,
AclOperation operation
);

/**
* Checks if an authenticated user, provided by the {@code securityContext}, has authorization
* to execute the specified {@code operation} on the {@code subjectName}.
*
* @param securityContext The context for the authenticated user.
* @param subjectName The subject name to check access.
* @param operation The {@code AclOperation} to validate against the {@code subjectName}.
*/
void checkSubjectAccess(
KsqlSecurityContext securityContext,
String subjectName,
AclOperation operation
);
}
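
To make the widened contract concrete, here is a minimal illustrative implementation (not part of this PR) that permits everything and merely logs the checks it receives; real implementations, such as `KsqlBackendAccessValidator` further down, signal denial by throwing an authorization exception:

```java
package io.confluent.ksql.security;

import org.apache.kafka.common.acl.AclOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative only: allows every request and records the checks it receives.
public class LoggingAccessValidator implements KsqlAccessValidator {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingAccessValidator.class);

  @Override
  public void checkTopicAccess(
      final KsqlSecurityContext securityContext,
      final String topicName,
      final AclOperation operation
  ) {
    LOG.debug("Topic access check: topic={}, operation={}", topicName, operation);
  }

  @Override
  public void checkSubjectAccess(
      final KsqlSecurityContext securityContext,
      final String subjectName,
      final AclOperation operation
  ) {
    LOG.debug("Subject access check: subject={}, operation={}", subjectName, operation);
  }
}
```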
@@ -16,6 +16,8 @@
package io.confluent.ksql.security;

import java.security.Principal;
import java.util.List;
import org.apache.kafka.common.acl.AclOperation;

/**
* Interface that provides authorization to KSQL.
@@ -29,4 +31,18 @@ public interface KsqlAuthorizationProvider {
* @param path The endpoint path to access, e.g. "/ksql", "/ksql/terminate", "/query"
*/
*/
void checkEndpointAccess(Principal user, String method, String path);

/**
* Checks if the user (if available) with the {@code userSecurityContext} has the specified
* {@code privileges} on the specified {@code objectType} and {@code objectName}.
*
* @param userSecurityContext The user security context whose privileges will be checked
* @param objectType The object type to check for privileges
* @param objectName The object name to check for privileges
* @param privileges The list of privileges to check on the resource
*/
void checkPrivileges(KsqlSecurityContext userSecurityContext,
AuthObjectType objectType,
String objectName,
List<AclOperation> privileges);
}
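
For illustration only: an external provider backed by an RBAC-style service could implement the new method roughly as below. `PermissionClient` is a hypothetical stand-in for whatever the Confluent security plugins actually call, and the plain `RuntimeException` is a placeholder for the proper authorization exception.

```java
package io.confluent.ksql.security;

import java.security.Principal;
import java.util.List;
import org.apache.kafka.common.acl.AclOperation;

public class RbacStyleAuthorizationProvider implements KsqlAuthorizationProvider {

  // Hypothetical client for the external authorization service (e.g. an RBAC/MDS-style backend).
  public interface PermissionClient {
    boolean isEndpointAllowed(Principal user, String method, String path);

    boolean isResourceAllowed(
        KsqlSecurityContext context, String resourceType, String resourceName, AclOperation op);
  }

  private final PermissionClient client;

  public RbacStyleAuthorizationProvider(final PermissionClient client) {
    this.client = client;
  }

  @Override
  public void checkEndpointAccess(final Principal user, final String method, final String path) {
    if (!client.isEndpointAllowed(user, method, path)) {
      throw new RuntimeException(String.format("Denied %s %s", method, path));
    }
  }

  @Override
  public void checkPrivileges(
      final KsqlSecurityContext userSecurityContext,
      final AuthObjectType objectType,
      final String objectName,
      final List<AclOperation> privileges
  ) {
    // Deny as soon as any requested privilege is missing.
    // AuthObjectType.toString() yields "Topic" or "Subject", matching the resource types above.
    for (final AclOperation privilege : privileges) {
      if (!client.isResourceAllowed(
          userSecurityContext, objectType.toString(), objectName, privilege)) {
        throw new RuntimeException(String.format(
            "Denied %s on %s %s", privilege, objectType, objectName));
      }
    }
  }
}
```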
@@ -35,44 +35,73 @@ private KsqlAuthorizationValidatorFactory() {
}

public static Optional<KsqlAuthorizationValidator> create(
final KsqlConfig ksqlConfig,
final ServiceContext serviceContext,
final Optional<KsqlAuthorizationProvider> externalAuthorizationProvider
) {
final Optional<KsqlAccessValidator> accessValidator = getAccessValidator(
ksqlConfig,
serviceContext,
externalAuthorizationProvider
);

return accessValidator.map(v ->
new KsqlAuthorizationValidatorImpl(cacheIfEnabled(ksqlConfig, v)));
}

private static Optional<KsqlAccessValidator> getAccessValidator(
final KsqlConfig ksqlConfig,
final ServiceContext serviceContext,
final Optional<KsqlAuthorizationProvider> externalAuthorizationProvider
) {
if (externalAuthorizationProvider.isPresent()) {
return Optional.of(new KsqlProvidedAccessValidator(externalAuthorizationProvider.get()));
} else if (isTopicAccessValidatorEnabled(ksqlConfig, serviceContext)) {
Contributor:

Should we also have a config for enabling SR auth?

return Optional.of(new KsqlBackendAccessValidator());
}

return Optional.empty();
}

private static KsqlAccessValidator cacheIfEnabled(
final KsqlConfig ksqlConfig,
final KsqlAccessValidator accessValidator
) {
return isCacheEnabled(ksqlConfig)
? new KsqlCacheAccessValidator(ksqlConfig, accessValidator)
: accessValidator;
}

private static boolean isCacheEnabled(final KsqlConfig ksqlConfig) {
// The cache expiry time is used to decide whether to enable the cache or not
return ksqlConfig.getLong(KsqlConfig.KSQL_AUTH_CACHE_EXPIRY_TIME_SECS) > 0;
}

private static boolean isTopicAccessValidatorEnabled(
final KsqlConfig ksqlConfig,
final ServiceContext serviceContext
) {
final String enabled = ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR);
if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON)) {
LOG.info("Forcing topic access validator");
return Optional.of(createAuthorizationValidator(ksqlConfig));
return true;
} else if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF)) {
return Optional.empty();
return false;
}

// If KSQL_ACCESS_VALIDATOR_AUTO, then check if Kafka has an authorizer enabled
final Admin adminClient = serviceContext.getAdminClient();

if (isKafkaAuthorizerEnabled(adminClient)) {
if (KafkaClusterUtil.isAuthorizedOperationsSupported(adminClient)) {
LOG.info("KSQL topic authorization checks enabled.");
return Optional.of(createAuthorizationValidator(ksqlConfig));
return true;
}

LOG.warn("The Kafka broker has an authorization service enabled, but the Kafka "
+ "version does not support authorizedOperations(). "
+ "KSQL topic authorization checks will not be enabled.");
}
return Optional.empty();
}

private static KsqlAuthorizationValidator createAuthorizationValidator(
final KsqlConfig ksqlConfig
) {
KsqlAccessValidator accessValidator = new KsqlBackendAccessValidator();

// The cache expiry time is used to decided whether to enable the cache or not
final long expiryTime = ksqlConfig.getLong(KsqlConfig.KSQL_AUTH_CACHE_EXPIRY_TIME_SECS);
if (expiryTime > 0) {
accessValidator = new KsqlCacheAccessValidator(ksqlConfig, accessValidator);
}

return new KsqlAuthorizationValidatorImpl(accessValidator);
return false;
}

private static boolean isKafkaAuthorizerEnabled(final Admin adminClient) {
@@ -15,6 +15,7 @@

package io.confluent.ksql.security;

import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
@@ -24,8 +25,13 @@
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.topic.SourceTopicsExtractor;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.util.Set;
import org.apache.kafka.common.acl.AclOperation;

/**
@@ -69,10 +75,9 @@ private void validateQuery(
final MetaStore metaStore,
final Query query
) {
final SourceTopicsExtractor extractor = new SourceTopicsExtractor(metaStore);
extractor.process(query, null);
for (String kafkaTopic : extractor.getSourceTopics()) {
accessValidator.checkAccess(securityContext, kafkaTopic, AclOperation.READ);
for (KsqlTopic ksqlTopic : extractQueryTopics(query, metaStore)) {
Contributor:
This is not a comment but a question for my own understanding: it seems KsqlAuthorizationValidator is wrapping a KsqlAccessValidator here, can we just merge these two interfaces? I guess it boils down to:

  • What are the conceptual differences of these two?
  • Would KsqlAccessValidator ever need to be leveraged directly without the KsqlAuthorizationValidator?

Member (author):
KsqlAuthorizationValidator validates that the user can execute a specified statement. This class gathers the relevant information, such as topics and schemas, and then validates each resource based on the type of statement (query, create, insert, ...).

Initially, the validation was part of KsqlAuthorizationValidator, but I split it out when I introduced an authorization cache, which lives in KsqlCacheAccessValidator. The cache wraps another validator when caching is enabled. The existing validator, besides the cache, is KsqlBackendAccessValidator, which checks ACLs against the backend service (i.e. Kafka). The new one is KsqlProvidedAccessValidator, which checks with an external service, such as RBAC. SR permissions will be checked by the KSQL confluent security plugins, which can either use RBAC or call the SR /permissions endpoint to check for ACLs (both are part of the confluent security plugins).
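
A rough paraphrase of the wiring described above, using the classes from this PR; the decision inputs are taken as plain parameters here rather than read from `KsqlConfig` as the real factory does:

```java
package io.confluent.ksql.security;

import io.confluent.ksql.util.KsqlConfig;
import java.util.Optional;

final class AuthorizationWiringSketch {

  static Optional<KsqlAuthorizationValidator> wire(
      final KsqlConfig ksqlConfig,
      final Optional<KsqlAuthorizationProvider> externalProvider,
      final boolean topicChecksEnabled,
      final long cacheExpirySecs
  ) {
    final KsqlAccessValidator access;
    if (externalProvider.isPresent()) {
      access = new KsqlProvidedAccessValidator(externalProvider.get()); // RBAC-style provider wins
    } else if (topicChecksEnabled) {
      access = new KsqlBackendAccessValidator();                        // ask Kafka for ACLs
    } else {
      return Optional.empty();                                          // no authorization checks
    }

    final KsqlAccessValidator maybeCached = cacheExpirySecs > 0
        ? new KsqlCacheAccessValidator(ksqlConfig, access)              // memoize allow/deny results
        : access;

    return Optional.of(new KsqlAuthorizationValidatorImpl(maybeCached));
  }
}
```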

checkTopicAccess(securityContext, ksqlTopic.getKafkaTopicName(), AclOperation.READ);
checkSchemaAccess(securityContext, ksqlTopic, AclOperation.READ);
}
}

@@ -94,7 +99,7 @@ private void validateCreateAsSelect(

// At this point, the topic should have been created by the TopicCreateInjector
final String kafkaTopic = getCreateAsSelectSinkTopic(metaStore, createAsSelect);
accessValidator.checkAccess(securityContext, kafkaTopic, AclOperation.WRITE);
checkTopicAccess(securityContext, kafkaTopic, AclOperation.WRITE);
}

private void validateInsertInto(
@@ -111,22 +116,28 @@ private void validateInsertInto(
validateQuery(securityContext, metaStore, insertInto.getQuery());

final String kafkaTopic = getSourceTopicName(metaStore, insertInto.getTarget());
accessValidator.checkAccess(securityContext, kafkaTopic, AclOperation.WRITE);
checkTopicAccess(securityContext, kafkaTopic, AclOperation.WRITE);
}

private void validatePrintTopic(
final KsqlSecurityContext securityContext,
final PrintTopic printTopic
) {
accessValidator.checkAccess(securityContext, printTopic.getTopic(), AclOperation.READ);
checkTopicAccess(securityContext, printTopic.getTopic(), AclOperation.READ);

// SchemaRegistry permissions cannot be validated here because the schema is guessed when
// printing the topic by obtaining the first row and attempting to find the right schema
}

private void validateCreateSource(
final KsqlSecurityContext securityContext,
final CreateSource createSource
) {
final String sourceTopic = createSource.getProperties().getKafkaTopic();
accessValidator.checkAccess(securityContext, sourceTopic, AclOperation.READ);
checkTopicAccess(securityContext, sourceTopic, AclOperation.READ);

// SchemaRegistry permissions are validated when SchemaRegisterInjector is called during CREATE
// operations. There's no need to validate that the user has READ permissions here.
}

private String getSourceTopicName(final MetaStore metaStore, final SourceName streamOrTable) {
@@ -146,4 +157,39 @@ private String getCreateAsSelectSinkTopic(
return createAsSelect.getProperties().getKafkaTopic()
.orElseGet(() -> getSourceTopicName(metaStore, createAsSelect.getName()));
}

private Set<KsqlTopic> extractQueryTopics(final Query query, final MetaStore metaStore) {
final SourceTopicsExtractor extractor = new SourceTopicsExtractor(metaStore);
extractor.process(query, null);
return extractor.getSourceTopics();
}

private void checkTopicAccess(
final KsqlSecurityContext securityContext,
final String resourceName,
final AclOperation operation
) {
accessValidator.checkTopicAccess(securityContext, resourceName, operation);
}

private void checkSchemaAccess(
final KsqlSecurityContext securityContext,
final KsqlTopic ksqlTopic,
final AclOperation operation
) {

if (formatSupportsSchemaInference(ksqlTopic.getKeyFormat().getFormatInfo())) {
accessValidator.checkSubjectAccess(securityContext,
KsqlConstants.getSRSubject(ksqlTopic.getKafkaTopicName(), true), operation);
}

if (formatSupportsSchemaInference(ksqlTopic.getValueFormat().getFormatInfo())) {
accessValidator.checkSubjectAccess(securityContext,
KsqlConstants.getSRSubject(ksqlTopic.getKafkaTopicName(), false), operation);
}
}

private static boolean formatSupportsSchemaInference(final FormatInfo format) {
return FormatFactory.of(format).supportsFeature(SerdeFeature.SCHEMA_INFERENCE);
}
}
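
A concrete example of what `checkSchemaAccess` ends up asking for, assuming the usual `<topic>-key` / `<topic>-value` subject naming behind `KsqlConstants.getSRSubject` (`accessValidator` and `securityContext` refer to the field and parameter in the class above): for a query reading a stream over topic `orders` with an AVRO value format (schema inference supported) and a KAFKA key format (no schema inference), only the value subject is checked.

```java
// Value format supports schema inference -> exactly one subject check is performed.
accessValidator.checkSubjectAccess(
    securityContext,
    "orders-value",          // KsqlConstants.getSRSubject("orders", false), assuming default naming
    AclOperation.READ);

// If the key format also supported schema inference (e.g. AVRO keys), a second check would run:
// accessValidator.checkSubjectAccess(securityContext, "orders-key", AclOperation.READ);
```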
@@ -22,11 +22,11 @@

/**
* An implementation of {@link KsqlAccessValidator} that provides authorization checks
* from the Kafka service.
* from the backend services.
*/
public class KsqlBackendAccessValidator implements KsqlAccessValidator {
@Override
public void checkAccess(
public void checkTopicAccess(
final KsqlSecurityContext securityContext,
final String topicName,
final AclOperation operation
@@ -42,4 +42,16 @@ public void checkAccess(
throw new KsqlTopicAuthorizationException(operation, Collections.singleton(topicName));
}
}

@Override
public void checkSubjectAccess(
final KsqlSecurityContext securityContext,
final String subjectName,
final AclOperation operation
) {
// Nothing to do. Checking permissions for Schema Registry requires an external authorization
// provider. Schema Registry does not have an API to list the allowed permissions for
// the user on a specified subject.
return;
}
}
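
The counterpart mentioned in the review discussion, `KsqlProvidedAccessValidator`, is not shown in this excerpt; a minimal sketch consistent with the interfaces above, delegating both topic and subject checks to the external `KsqlAuthorizationProvider`, would look roughly like this:

```java
package io.confluent.ksql.security;

import java.util.Collections;
import org.apache.kafka.common.acl.AclOperation;

public class KsqlProvidedAccessValidator implements KsqlAccessValidator {
  private final KsqlAuthorizationProvider authorizationProvider;

  public KsqlProvidedAccessValidator(final KsqlAuthorizationProvider authorizationProvider) {
    this.authorizationProvider = authorizationProvider;
  }

  @Override
  public void checkTopicAccess(
      final KsqlSecurityContext securityContext,
      final String topicName,
      final AclOperation operation
  ) {
    // Delegate topic checks to the external provider (e.g. RBAC).
    authorizationProvider.checkPrivileges(
        securityContext, AuthObjectType.TOPIC, topicName, Collections.singletonList(operation));
  }

  @Override
  public void checkSubjectAccess(
      final KsqlSecurityContext securityContext,
      final String subjectName,
      final AclOperation operation
  ) {
    // Unlike the backend validator above, subject checks can be enforced here because the
    // external provider knows how to evaluate Schema Registry permissions.
    authorizationProvider.checkPrivileges(
        securityContext, AuthObjectType.SUBJECT, subjectName, Collections.singletonList(operation));
  }
}
```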