Skip to content

Commit

Permalink
feat: add config for enabling topic access validator (#3079)
Browse files Browse the repository at this point in the history
Adds a config called ksql.access.validator.enable for enabling the
topic access validator. The possible values for this config are "on",
"off", and "auto". "on" is used to enable the validator, "off"
is used to disable it, and "auto" can be set to have ksql query kafka
properties to auto-discover whether or not the validator can be used (
this is the behaviour before this patch, and is the current default
value for this setting). This config is useful for enabling the validator
when using ksql against multi-tenant kafka, where ksql will not have
access to the broker's configs.

This patch also switches the topic validator factory test to use mockito
for mocking, instead of easymock.
  • Loading branch information
rodesai authored Jul 23, 2019
1 parent 7647e05 commit 440e247
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 42 deletions.
23 changes: 23 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_SECURITY_EXTENSION_DOC = "A KSQL security extension class that "
+ "provides authorization to KSQL servers.";

public static final String KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR = "ksql.access.validator.enable";
public static final String KSQL_ACCESS_VALIDATOR_ON = "on";
public static final String KSQL_ACCESS_VALIDATOR_OFF = "off";
public static final String KSQL_ACCESS_VALIDATOR_AUTO = "auto";
public static final String KSQL_ACCESS_VALIDATOR_DOC =
"Config to enable/disable the topic access validator, which checks that KSQL can access "
+ "the involved topics before committing to execute a statement. Possible values are "
+ "\"on\", \"off\", and \"auto\". Setting to \"on\" enables the validator. Setting to "
+ "\"off\" disables the validator. If set to \"auto\", KSQL will attempt to discover "
+ "whether the Kafka cluster supports the required API, and enables the validator if "
+ "it does.";

public static final Collection<CompatibilityBreakingConfigDef> COMPATIBLY_BREAKING_CONFIG_DEFS
= ImmutableList.of(
new CompatibilityBreakingConfigDef(
Expand Down Expand Up @@ -470,6 +482,17 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
null,
ConfigDef.Importance.LOW,
KSQL_CUSTOM_METRICS_EXTENSION_DOC
).define(
KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR,
Type.STRING,
KSQL_ACCESS_VALIDATOR_AUTO,
ValidString.in(
KSQL_ACCESS_VALIDATOR_ON,
KSQL_ACCESS_VALIDATOR_OFF,
KSQL_ACCESS_VALIDATOR_AUTO
),
ConfigDef.Importance.LOW,
KSQL_ACCESS_VALIDATOR_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

package io.confluent.ksql.engine;

import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.services.KafkaClusterUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlServerException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConfigEntry;
Expand All @@ -28,15 +28,23 @@
public final class TopicAccessValidatorFactory {
private static final Logger LOG = LoggerFactory.getLogger(TopicAccessValidatorFactory.class);
private static final String KAFKA_AUTHORIZER_CLASS_NAME = "authorizer.class.name";
private static final TopicAccessValidator DUMMY_VALIDATOR = (sc, metastore, statement) -> { };

private TopicAccessValidatorFactory() {

}

public static TopicAccessValidator create(
final ServiceContext serviceContext,
final MetaStore metaStore
final KsqlConfig ksqlConfig,
final ServiceContext serviceContext
) {
final String enabled = ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR);
if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON)) {
LOG.info("Forcing topic access validator");
return new AuthorizationTopicAccessValidator();
} else if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF)) {
return DUMMY_VALIDATOR;
}

final AdminClient adminClient = serviceContext.getAdminClient();

if (isKafkaAuthorizerEnabled(adminClient)) {
Expand All @@ -49,11 +57,7 @@ public static TopicAccessValidator create(
+ "version does not support authorizedOperations(). "
+ "KSQL topic authorization checks will not be enabled.");
}

// Dummy validator if a Kafka authorizer is not enabled
return (sc, metastore, statement) -> {
return;
};
return DUMMY_VALIDATOR;
}

private static boolean isKafkaAuthorizerEnabled(final AdminClient adminClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@

package io.confluent.ksql.engine;

import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -36,37 +38,41 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

@RunWith(EasyMockRunner.class)
public class TopicAccessValidatorFactoryTest {
private static final String KAFKA_AUTHORIZER_CLASS_NAME = "authorizer.class.name";

@Mock
private KsqlConfig ksqlConfig;
@Mock
private ServiceContext serviceContext;
@Mock
private AdminClient adminClient;

private Node node;

@Rule
final public MockitoRule mockitoJUnit = MockitoJUnit.rule();

@Before
public void setUp() {
node = new Node(1, "host", 9092);

expect(serviceContext.getAdminClient()).andReturn(adminClient);
replay(serviceContext);
when(serviceContext.getAdminClient()).thenReturn(adminClient);
when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR))
.thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_AUTO);
}

@Test
Expand All @@ -75,7 +81,10 @@ public void shouldReturnAuthorizationValidator() {
givenKafkaAuthorizer("an-authorizer-class", Collections.emptySet());

// When:
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(serviceContext, null);
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then
assertThat(validator, is(instanceOf(AuthorizationTopicAccessValidator.class)));
Expand All @@ -87,19 +96,59 @@ public void shouldReturnDummyValidator() {
givenKafkaAuthorizer("", Collections.emptySet());

// When:
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(serviceContext, null);
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then
assertThat(validator, not(instanceOf(AuthorizationTopicAccessValidator.class)));
}

@Test
public void shouldReturnDummyValidatorIfNotEnabled() {
// Given:
when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR))
.thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF);

// When:
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then:
assertThat(validator, not(instanceOf(AuthorizationTopicAccessValidator.class)));
verifyZeroInteractions(adminClient);
}

@Test
public void shouldReturnAuthorizationValidatorIfEnabled() {
// Given:
when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR))
.thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON);

// When:
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then:
assertThat(validator, instanceOf(AuthorizationTopicAccessValidator.class));
verifyZeroInteractions(adminClient);
}

@Test
public void shouldReturnDummyValidatorIfAuthorizedOperationsReturnNull() {
// Given:
givenKafkaAuthorizer("an-authorizer-class", null);

// When:
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(serviceContext, null);
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then
assertThat(validator, not(instanceOf(AuthorizationTopicAccessValidator.class)));
Expand All @@ -109,24 +158,25 @@ private void givenKafkaAuthorizer(
final String className,
final Set<AclOperation> authOperations
) {
expect(adminClient.describeCluster()).andReturn(describeClusterResult(authOperations));
expect(adminClient.describeCluster(anyObject()))
.andReturn(describeClusterResult(authOperations));
expect(adminClient.describeConfigs(describeBrokerRequest()))
.andReturn(describeBrokerResult(Collections.singletonList(
final DescribeClusterResult describeClusterResult = describeClusterResult(authOperations);
when(adminClient.describeCluster()).thenReturn(describeClusterResult);
when(adminClient.describeCluster(any()))
.thenReturn(describeClusterResult);
final DescribeConfigsResult describeConfigsResult = describeBrokerResult(
Collections.singletonList(
new ConfigEntry(KAFKA_AUTHORIZER_CLASS_NAME, className)
)));

replay(adminClient);
)
);
when(adminClient.describeConfigs(describeBrokerRequest()))
.thenReturn(describeConfigsResult);
}

private DescribeClusterResult describeClusterResult(final Set<AclOperation> authOperations) {
final Collection<Node> nodes = Collections.singletonList(node);
final DescribeClusterResult describeClusterResult = EasyMock.mock(DescribeClusterResult.class);
expect(describeClusterResult.nodes()).andReturn(KafkaFuture.completedFuture(nodes));
expect(describeClusterResult.authorizedOperations())
.andReturn(KafkaFuture.completedFuture(authOperations));
replay(describeClusterResult);
final DescribeClusterResult describeClusterResult = mock(DescribeClusterResult.class);
when(describeClusterResult.nodes()).thenReturn(KafkaFuture.completedFuture(nodes));
when(describeClusterResult.authorizedOperations())
.thenReturn(KafkaFuture.completedFuture(authOperations));
return describeClusterResult;
}

Expand All @@ -138,8 +188,7 @@ private DescribeConfigsResult describeBrokerResult(final List<ConfigEntry> broke
final DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class);
final Map<ConfigResource, Config> config = ImmutableMap.of(
new ConfigResource(ConfigResource.Type.BROKER, node.idString()), new Config(brokerConfigs));
expect(describeConfigsResult.all()).andReturn(KafkaFuture.completedFuture(config)).anyTimes();
replay(describeConfigsResult);
when(describeConfigsResult.all()).thenReturn(KafkaFuture.completedFuture(config));
return describeConfigsResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) {

final StatementParser statementParser = new StatementParser(ksqlEngine);
final TopicAccessValidator topicAccessValidator =
TopicAccessValidatorFactory.create(serviceContext, ksqlEngine.getMetaStore());
TopicAccessValidatorFactory.create(ksqlConfig, serviceContext);

container.addEndpoint(
ServerEndpointConfig.Builder
Expand Down Expand Up @@ -500,7 +500,7 @@ static KsqlRestApplication buildApplication(
final KsqlSecurityExtension securityExtension = loadSecurityExtension(ksqlConfig);

final TopicAccessValidator topicAccessValidator =
TopicAccessValidatorFactory.create(serviceContext, ksqlEngine.getMetaStore());
TopicAccessValidatorFactory.create(ksqlConfig, serviceContext);

final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
ksqlConfig,
Expand Down

0 comments on commit 440e247

Please sign in to comment.