diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 210dbb0285bb..57b629032410 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -168,6 +168,18 @@ public class KsqlConfig extends AbstractConfig { public static final String KSQL_SECURITY_EXTENSION_DOC = "A KSQL security extension class that " + "provides authorization to KSQL servers."; + public static final String KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR = "ksql.access.validator.enable"; + public static final String KSQL_ACCESS_VALIDATOR_ON = "on"; + public static final String KSQL_ACCESS_VALIDATOR_OFF = "off"; + public static final String KSQL_ACCESS_VALIDATOR_AUTO = "auto"; + public static final String KSQL_ACCESS_VALIDATOR_DOC = + "Config to enable/disable the topic access validator, which checks that KSQL can access " + + "the involved topics before committing to execute a statement. Possible values are " + + "\"on\", \"off\", and \"auto\". Setting to \"on\" enables the validator. Setting to " + + "\"off\" disables the validator. If set to \"auto\", KSQL will attempt to discover " + + "whether the Kafka cluster supports the required API, and enables the validator if " + + "it does."; + public static final Collection COMPATIBLY_BREAKING_CONFIG_DEFS = ImmutableList.of( new CompatibilityBreakingConfigDef( @@ -470,6 +482,17 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { null, ConfigDef.Importance.LOW, KSQL_CUSTOM_METRICS_EXTENSION_DOC + ).define( + KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR, + Type.STRING, + KSQL_ACCESS_VALIDATOR_AUTO, + ValidString.in( + KSQL_ACCESS_VALIDATOR_ON, + KSQL_ACCESS_VALIDATOR_OFF, + KSQL_ACCESS_VALIDATOR_AUTO + ), + ConfigDef.Importance.LOW, + KSQL_ACCESS_VALIDATOR_DOC ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/TopicAccessValidatorFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/TopicAccessValidatorFactory.java index f2403886945c..d3a6874e127f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/TopicAccessValidatorFactory.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/TopicAccessValidatorFactory.java @@ -15,9 +15,9 @@ package io.confluent.ksql.engine; -import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.services.KafkaClusterUtil; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlServerException; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ConfigEntry; @@ -28,15 +28,23 @@ public final class TopicAccessValidatorFactory { private static final Logger LOG = LoggerFactory.getLogger(TopicAccessValidatorFactory.class); private static final String KAFKA_AUTHORIZER_CLASS_NAME = "authorizer.class.name"; + private static final TopicAccessValidator DUMMY_VALIDATOR = (sc, metastore, statement) -> { }; private TopicAccessValidatorFactory() { - } public static TopicAccessValidator create( - final ServiceContext serviceContext, - final MetaStore metaStore + final KsqlConfig ksqlConfig, + final ServiceContext serviceContext ) { + final String enabled = ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR); + if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON)) { + LOG.info("Forcing topic access validator"); + return new AuthorizationTopicAccessValidator(); + } else if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF)) { + return DUMMY_VALIDATOR; + } + final AdminClient adminClient = serviceContext.getAdminClient(); if (isKafkaAuthorizerEnabled(adminClient)) { @@ -49,11 +57,7 @@ public static TopicAccessValidator create( + "version does not support authorizedOperations(). " + "KSQL topic authorization checks will not be enabled."); } - - // Dummy validator if a Kafka authorizer is not enabled - return (sc, metastore, statement) -> { - return; - }; + return DUMMY_VALIDATOR; } private static boolean isKafkaAuthorizerEnabled(final AdminClient adminClient) { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/TopicAccessValidatorFactoryTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/TopicAccessValidatorFactoryTest.java index 1a3b5f5081f1..085e2f5a3e59 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/TopicAccessValidatorFactoryTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/TopicAccessValidatorFactoryTest.java @@ -15,18 +15,20 @@ package io.confluent.ksql.engine; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.mock; -import static org.easymock.EasyMock.replay; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.any; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.util.KsqlConfig; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -36,24 +38,24 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; -import org.apache.kafka.clients.admin.DescribeClusterOptions; import org.apache.kafka.clients.admin.DescribeClusterResult; import org.apache.kafka.clients.admin.DescribeConfigsResult; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.config.ConfigResource; -import org.easymock.EasyMock; -import org.easymock.EasyMockRunner; -import org.easymock.Mock; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; -import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; -@RunWith(EasyMockRunner.class) public class TopicAccessValidatorFactoryTest { private static final String KAFKA_AUTHORIZER_CLASS_NAME = "authorizer.class.name"; + @Mock + private KsqlConfig ksqlConfig; @Mock private ServiceContext serviceContext; @Mock @@ -61,12 +63,16 @@ public class TopicAccessValidatorFactoryTest { private Node node; + @Rule + final public MockitoRule mockitoJUnit = MockitoJUnit.rule(); + @Before public void setUp() { node = new Node(1, "host", 9092); - expect(serviceContext.getAdminClient()).andReturn(adminClient); - replay(serviceContext); + when(serviceContext.getAdminClient()).thenReturn(adminClient); + when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR)) + .thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_AUTO); } @Test @@ -75,7 +81,10 @@ public void shouldReturnAuthorizationValidator() { givenKafkaAuthorizer("an-authorizer-class", Collections.emptySet()); // When: - final TopicAccessValidator validator = TopicAccessValidatorFactory.create(serviceContext, null); + final TopicAccessValidator validator = TopicAccessValidatorFactory.create( + ksqlConfig, + serviceContext + ); // Then assertThat(validator, is(instanceOf(AuthorizationTopicAccessValidator.class))); @@ -87,19 +96,59 @@ public void shouldReturnDummyValidator() { givenKafkaAuthorizer("", Collections.emptySet()); // When: - final TopicAccessValidator validator = TopicAccessValidatorFactory.create(serviceContext, null); + final TopicAccessValidator validator = TopicAccessValidatorFactory.create( + ksqlConfig, + serviceContext + ); // Then assertThat(validator, not(instanceOf(AuthorizationTopicAccessValidator.class))); } + @Test + public void shouldReturnDummyValidatorIfNotEnabled() { + // Given: + when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR)) + .thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF); + + // When: + final TopicAccessValidator validator = TopicAccessValidatorFactory.create( + ksqlConfig, + serviceContext + ); + + // Then: + assertThat(validator, not(instanceOf(AuthorizationTopicAccessValidator.class))); + verifyZeroInteractions(adminClient); + } + + @Test + public void shouldReturnAuthorizationValidatorIfEnabled() { + // Given: + when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR)) + .thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON); + + // When: + final TopicAccessValidator validator = TopicAccessValidatorFactory.create( + ksqlConfig, + serviceContext + ); + + // Then: + assertThat(validator, instanceOf(AuthorizationTopicAccessValidator.class)); + verifyZeroInteractions(adminClient); + } + @Test public void shouldReturnDummyValidatorIfAuthorizedOperationsReturnNull() { // Given: givenKafkaAuthorizer("an-authorizer-class", null); // When: - final TopicAccessValidator validator = TopicAccessValidatorFactory.create(serviceContext, null); + final TopicAccessValidator validator = TopicAccessValidatorFactory.create( + ksqlConfig, + serviceContext + ); // Then assertThat(validator, not(instanceOf(AuthorizationTopicAccessValidator.class))); @@ -109,24 +158,25 @@ private void givenKafkaAuthorizer( final String className, final Set authOperations ) { - expect(adminClient.describeCluster()).andReturn(describeClusterResult(authOperations)); - expect(adminClient.describeCluster(anyObject())) - .andReturn(describeClusterResult(authOperations)); - expect(adminClient.describeConfigs(describeBrokerRequest())) - .andReturn(describeBrokerResult(Collections.singletonList( + final DescribeClusterResult describeClusterResult = describeClusterResult(authOperations); + when(adminClient.describeCluster()).thenReturn(describeClusterResult); + when(adminClient.describeCluster(any())) + .thenReturn(describeClusterResult); + final DescribeConfigsResult describeConfigsResult = describeBrokerResult( + Collections.singletonList( new ConfigEntry(KAFKA_AUTHORIZER_CLASS_NAME, className) - ))); - - replay(adminClient); + ) + ); + when(adminClient.describeConfigs(describeBrokerRequest())) + .thenReturn(describeConfigsResult); } private DescribeClusterResult describeClusterResult(final Set authOperations) { final Collection nodes = Collections.singletonList(node); - final DescribeClusterResult describeClusterResult = EasyMock.mock(DescribeClusterResult.class); - expect(describeClusterResult.nodes()).andReturn(KafkaFuture.completedFuture(nodes)); - expect(describeClusterResult.authorizedOperations()) - .andReturn(KafkaFuture.completedFuture(authOperations)); - replay(describeClusterResult); + final DescribeClusterResult describeClusterResult = mock(DescribeClusterResult.class); + when(describeClusterResult.nodes()).thenReturn(KafkaFuture.completedFuture(nodes)); + when(describeClusterResult.authorizedOperations()) + .thenReturn(KafkaFuture.completedFuture(authOperations)); return describeClusterResult; } @@ -138,8 +188,7 @@ private DescribeConfigsResult describeBrokerResult(final List broke final DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class); final Map config = ImmutableMap.of( new ConfigResource(ConfigResource.Type.BROKER, node.idString()), new Config(brokerConfigs)); - expect(describeConfigsResult.all()).andReturn(KafkaFuture.completedFuture(config)).anyTimes(); - replay(describeConfigsResult); + when(describeConfigsResult.all()).thenReturn(KafkaFuture.completedFuture(config)); return describeConfigsResult; } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 37a1c0dcf6ac..07003bcaed9d 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -395,7 +395,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) { final StatementParser statementParser = new StatementParser(ksqlEngine); final TopicAccessValidator topicAccessValidator = - TopicAccessValidatorFactory.create(serviceContext, ksqlEngine.getMetaStore()); + TopicAccessValidatorFactory.create(ksqlConfig, serviceContext); container.addEndpoint( ServerEndpointConfig.Builder @@ -500,7 +500,7 @@ static KsqlRestApplication buildApplication( final KsqlSecurityExtension securityExtension = loadSecurityExtension(ksqlConfig); final TopicAccessValidator topicAccessValidator = - TopicAccessValidatorFactory.create(serviceContext, ksqlEngine.getMetaStore()); + TopicAccessValidatorFactory.create(ksqlConfig, serviceContext); final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( ksqlConfig,