Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add config for enabling topic access validator #3079

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_SECURITY_EXTENSION_DOC = "A KSQL security extension class that "
+ "provides authorization to KSQL servers.";

public static final String KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR = "ksql.access.validator.enable";
public static final String KSQL_ACCESS_VALIDATOR_ON = "on";
public static final String KSQL_ACCESS_VALIDATOR_OFF = "off";
public static final String KSQL_ACCESS_VALIDATOR_AUTO = "auto";
public static final String KSQL_ACCESS_VALIDATOR_DOC =
"Config to enable/disable the topic access validator, which checks that KSQL can access "
+ "the involved topics before committing to execute a statement. Possible values are "
+ "\"on\", \"off\", and \"auto\". Setting to \"on\" enables the validator. Setting to "
+ "\"off\" disables the validator. If set to \"auto\", KSQL will attempt to discover "
+ "whether the Kafka cluster supports the required API, and enables the validator if "
+ "it does.";

public static final Collection<CompatibilityBreakingConfigDef> COMPATIBLY_BREAKING_CONFIG_DEFS
= ImmutableList.of(
new CompatibilityBreakingConfigDef(
Expand Down Expand Up @@ -470,6 +482,17 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
null,
ConfigDef.Importance.LOW,
KSQL_CUSTOM_METRICS_EXTENSION_DOC
).define(
KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR,
Type.STRING,
KSQL_ACCESS_VALIDATOR_AUTO,
ValidString.in(
KSQL_ACCESS_VALIDATOR_ON,
KSQL_ACCESS_VALIDATOR_OFF,
KSQL_ACCESS_VALIDATOR_AUTO
),
ConfigDef.Importance.LOW,
KSQL_ACCESS_VALIDATOR_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

package io.confluent.ksql.engine;

import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.services.KafkaClusterUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlServerException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConfigEntry;
Expand All @@ -28,15 +28,23 @@
public final class TopicAccessValidatorFactory {
private static final Logger LOG = LoggerFactory.getLogger(TopicAccessValidatorFactory.class);
private static final String KAFKA_AUTHORIZER_CLASS_NAME = "authorizer.class.name";
private static final TopicAccessValidator DUMMY_VALIDATOR = (sc, metastore, statement) -> { };

private TopicAccessValidatorFactory() {

}

public static TopicAccessValidator create(
final ServiceContext serviceContext,
final MetaStore metaStore
final KsqlConfig ksqlConfig,
final ServiceContext serviceContext
) {
final String enabled = ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR);
if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional:
Is case-sensitive a requirement in all KSQL configurations? Should we allow the use of any case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only other similar config (ksql.named.internal.topics) is case-sensitive. I don't feel particularly strongly one way or another.

LOG.info("Forcing topic access validator");
return new AuthorizationTopicAccessValidator();
} else if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF)) {
return DUMMY_VALIDATOR;
}

final AdminClient adminClient = serviceContext.getAdminClient();

if (isKafkaAuthorizerEnabled(adminClient)) {
Expand All @@ -49,11 +57,7 @@ public static TopicAccessValidator create(
+ "version does not support authorizedOperations(). "
+ "KSQL topic authorization checks will not be enabled.");
}

// Dummy validator if a Kafka authorizer is not enabled
return (sc, metastore, statement) -> {
return;
};
return DUMMY_VALIDATOR;
}

private static boolean isKafkaAuthorizerEnabled(final AdminClient adminClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@

package io.confluent.ksql.engine;

import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -36,37 +38,41 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

@RunWith(EasyMockRunner.class)
public class TopicAccessValidatorFactoryTest {
private static final String KAFKA_AUTHORIZER_CLASS_NAME = "authorizer.class.name";

@Mock
private KsqlConfig ksqlConfig;
@Mock
private ServiceContext serviceContext;
@Mock
private AdminClient adminClient;

private Node node;

@Rule
final public MockitoRule mockitoJUnit = MockitoJUnit.rule();

@Before
public void setUp() {
node = new Node(1, "host", 9092);

expect(serviceContext.getAdminClient()).andReturn(adminClient);
replay(serviceContext);
when(serviceContext.getAdminClient()).thenReturn(adminClient);
when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR))
.thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_AUTO);
}

@Test
Expand All @@ -75,7 +81,10 @@ public void shouldReturnAuthorizationValidator() {
givenKafkaAuthorizer("an-authorizer-class", Collections.emptySet());

// When:
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(serviceContext, null);
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then
assertThat(validator, is(instanceOf(AuthorizationTopicAccessValidator.class)));
Expand All @@ -87,19 +96,59 @@ public void shouldReturnDummyValidator() {
givenKafkaAuthorizer("", Collections.emptySet());

// When:
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(serviceContext, null);
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then
assertThat(validator, not(instanceOf(AuthorizationTopicAccessValidator.class)));
}

@Test
public void shouldReturnDummyValidatorIfNotEnabled() {
// Given:
when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR))
.thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF);

// When:
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then:
assertThat(validator, not(instanceOf(AuthorizationTopicAccessValidator.class)));
verifyZeroInteractions(adminClient);
}

@Test
public void shouldReturnAuthorizationValidatorIfEnabled() {
// Given:
when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR))
.thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON);

// When:
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then:
assertThat(validator, instanceOf(AuthorizationTopicAccessValidator.class));
verifyZeroInteractions(adminClient);
}

@Test
public void shouldReturnDummyValidatorIfAuthorizedOperationsReturnNull() {
// Given:
givenKafkaAuthorizer("an-authorizer-class", null);

// When:
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(serviceContext, null);
final TopicAccessValidator validator = TopicAccessValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then
assertThat(validator, not(instanceOf(AuthorizationTopicAccessValidator.class)));
Expand All @@ -109,24 +158,25 @@ private void givenKafkaAuthorizer(
final String className,
final Set<AclOperation> authOperations
) {
expect(adminClient.describeCluster()).andReturn(describeClusterResult(authOperations));
expect(adminClient.describeCluster(anyObject()))
.andReturn(describeClusterResult(authOperations));
expect(adminClient.describeConfigs(describeBrokerRequest()))
.andReturn(describeBrokerResult(Collections.singletonList(
final DescribeClusterResult describeClusterResult = describeClusterResult(authOperations);
when(adminClient.describeCluster()).thenReturn(describeClusterResult);
when(adminClient.describeCluster(any()))
.thenReturn(describeClusterResult);
final DescribeConfigsResult describeConfigsResult = describeBrokerResult(
Collections.singletonList(
new ConfigEntry(KAFKA_AUTHORIZER_CLASS_NAME, className)
)));

replay(adminClient);
)
);
when(adminClient.describeConfigs(describeBrokerRequest()))
.thenReturn(describeConfigsResult);
}

private DescribeClusterResult describeClusterResult(final Set<AclOperation> authOperations) {
final Collection<Node> nodes = Collections.singletonList(node);
final DescribeClusterResult describeClusterResult = EasyMock.mock(DescribeClusterResult.class);
expect(describeClusterResult.nodes()).andReturn(KafkaFuture.completedFuture(nodes));
expect(describeClusterResult.authorizedOperations())
.andReturn(KafkaFuture.completedFuture(authOperations));
replay(describeClusterResult);
final DescribeClusterResult describeClusterResult = mock(DescribeClusterResult.class);
when(describeClusterResult.nodes()).thenReturn(KafkaFuture.completedFuture(nodes));
when(describeClusterResult.authorizedOperations())
.thenReturn(KafkaFuture.completedFuture(authOperations));
return describeClusterResult;
}

Expand All @@ -138,8 +188,7 @@ private DescribeConfigsResult describeBrokerResult(final List<ConfigEntry> broke
final DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class);
final Map<ConfigResource, Config> config = ImmutableMap.of(
new ConfigResource(ConfigResource.Type.BROKER, node.idString()), new Config(brokerConfigs));
expect(describeConfigsResult.all()).andReturn(KafkaFuture.completedFuture(config)).anyTimes();
replay(describeConfigsResult);
when(describeConfigsResult.all()).thenReturn(KafkaFuture.completedFuture(config));
return describeConfigsResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) {

final StatementParser statementParser = new StatementParser(ksqlEngine);
final TopicAccessValidator topicAccessValidator =
TopicAccessValidatorFactory.create(serviceContext, ksqlEngine.getMetaStore());
TopicAccessValidatorFactory.create(ksqlConfig, serviceContext);

container.addEndpoint(
ServerEndpointConfig.Builder
Expand Down Expand Up @@ -500,7 +500,7 @@ static KsqlRestApplication buildApplication(
final KsqlSecurityExtension securityExtension = loadSecurityExtension(ksqlConfig);

final TopicAccessValidator topicAccessValidator =
TopicAccessValidatorFactory.create(serviceContext, ksqlEngine.getMetaStore());
TopicAccessValidatorFactory.create(ksqlConfig, serviceContext);

final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
ksqlConfig,
Expand Down