Skip to content

Commit

Permalink
feat: enable Kafla ACL authorization checks for Pull Queries (#4187)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena authored Jan 10, 2020
1 parent f991752 commit 5ee1e9e
Show file tree
Hide file tree
Showing 10 changed files with 32 additions and 91 deletions.
1 change: 0 additions & 1 deletion ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ public class CliTest {
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withProperty(KsqlConfig.SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY,
KsqlConstants.defaultSinkWindowChangeLogAdditionalRetention + 1)
.withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true)
.build();

@ClassRule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public final class ImmutableProperties {
.add(KsqlConfig.KSQL_EXT_DIR)
.add(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)
.add(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG)
.add(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG)
.addAll(KsqlConfig.SSL_CONFIG_NAMES)
.build();

Expand Down
13 changes: 0 additions & 13 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,6 @@ public class KsqlConfig extends AbstractConfig {
+ "\"off\" disables the validator. If set to \"auto\", KSQL will attempt to discover "
+ "whether the Kafka cluster supports the required API, and enables the validator if "
+ "it does.";
public static final String KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG =
"ksql.query.pull.skip.access.validator";
public static final boolean KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DEFAULT = false;
public static final String KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC = "If \"true\", KSQL will "
+ " NOT enforce access validation checks for pull queries, which could expose Kafka topics"
+ " which are secured with ACLs. Please enable only after careful consideration."
+ " If \"false\", KSQL pull queries will fail against a secure Kafka cluster";

public static final String KSQL_PULL_QUERIES_ENABLE_CONFIG = "ksql.pull.queries.enable";
public static final String KSQL_QUERY_PULL_ENABLE_DOC =
Expand Down Expand Up @@ -503,12 +496,6 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DOC
).define(
KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG,
Type.BOOLEAN,
KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DEFAULT,
Importance.LOW,
KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC
).define(
KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public class RestQueryTranslationTest {
private static final TestKsqlRestApp REST_APP = TestKsqlRestApp
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withProperty(KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true)
.withStaticServiceContext(TEST_HARNESS::getServiceContext)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -181,35 +180,24 @@ private Response handleStatement(
final PreparedStatement<?> statement
) {
try {
final Consumer<KsqlAuthorizationValidator> authValidationConsumer =
ksqlAuthorizationValidator -> ksqlAuthorizationValidator.checkAuthorization(
authorizationValidator.ifPresent(validator ->
validator.checkAuthorization(
securityContext,
ksqlEngine.getMetaStore(),
statement.getStatement()
);
statement.getStatement())
);

if (statement.getStatement() instanceof Query) {
final PreparedStatement<Query> queryStmt = (PreparedStatement<Query>) statement;

if (queryStmt.getStatement().isPullQuery()) {
final boolean skipAccessValidation = ksqlConfig.getBoolean(
KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG);
if (authorizationValidator.isPresent() && !skipAccessValidation) {
return Errors.badRequest("Pull queries are not currently supported when "
+ "access validation against Kafka is configured. If you really want to "
+ "bypass this limitation please set "
+ KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG + "=true "
+ KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC);
}

return handlePullQuery(
securityContext.getServiceContext(),
queryStmt,
request.getStreamsProperties()
);
}

authorizationValidator.ifPresent(authValidationConsumer);
return handlePushQuery(
securityContext.getServiceContext(),
queryStmt,
Expand All @@ -218,7 +206,6 @@ private Response handleStatement(
}

if (statement.getStatement() instanceof PrintTopic) {
authorizationValidator.ifPresent(authValidationConsumer);
return handlePrintTopic(
securityContext.getServiceContext(),
request.getStreamsProperties(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,23 +370,11 @@ private PreparedStatement<?> parseStatement(final KsqlRequest request) {
}

private void validateKafkaAuthorization(final Statement statement) {
if (statement instanceof Query && ((Query) statement).isPullQuery()) {
final boolean skipAccessValidation = ksqlConfig.getBoolean(
KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG);
if (authorizationValidator.isPresent() && !skipAccessValidation) {
throw new KsqlException("Pull queries are not currently supported when "
+ "access validation against Kafka is configured. If you really want to "
+ "bypass this limitation please set "
+ KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG + "=true "
+ KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC);
}
} else {
authorizationValidator.ifPresent(validator -> validator.checkAuthorization(
securityContext,
ksqlEngine.getMetaStore(),
statement)
);
}
authorizationValidator.ifPresent(validator -> validator.checkAuthorization(
securityContext,
ksqlEngine.getMetaStore(),
statement)
);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ public class PullQueryFunctionalTest {
.withBasicCredentials(USER_WITH_ACCESS, USER_WITH_ACCESS_PWD)
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.STATE_DIR_CONFIG, getNewStateDir())
.withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true)
.withProperty(RestConfig.AUTHENTICATION_METHOD_CONFIG, RestConfig.AUTHENTICATION_METHOD_BASIC)
.withProperty(RestConfig.AUTHENTICATION_REALM_CONFIG, PROPS_JAAS_REALM)
.withProperty(RestConfig.AUTHENTICATION_ROLES_CONFIG, KSQL_CLUSTER_ID)
Expand All @@ -123,7 +122,6 @@ public class PullQueryFunctionalTest {
.withBasicCredentials(USER_WITH_ACCESS, USER_WITH_ACCESS_PWD)
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.STATE_DIR_CONFIG, getNewStateDir())
.withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true)
.withProperty(RestConfig.AUTHENTICATION_METHOD_CONFIG, RestConfig.AUTHENTICATION_METHOD_BASIC)
.withProperty(RestConfig.AUTHENTICATION_REALM_CONFIG, PROPS_JAAS_REALM)
.withProperty(RestConfig.AUTHENTICATION_ROLES_CONFIG, KSQL_CLUSTER_ID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ public class RestApiTest {
.withProperty("security.protocol", "SASL_SSL")
.withProperty("sasl.mechanism", "PLAIN")
.withProperty("sasl.jaas.config", SecureKafkaHelper.buildJaasConfig(NORMAL_USER))
.withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true)
.withProperties(ClientTrustStore.trustStoreProps())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -286,8 +285,7 @@ public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumb
public void shouldNotCreateExternalClientsForPullQuery() {
// Given
testResource.configure(new KsqlConfig(ImmutableMap.of(
StreamsConfig.APPLICATION_SERVER_CONFIG, "something:1",
KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true
StreamsConfig.APPLICATION_SERVER_CONFIG, "something:1"
)));

// When:
Expand All @@ -304,44 +302,24 @@ public void shouldNotCreateExternalClientsForPullQuery() {
}

@Test
public void shouldThrowExceptionForPullQueryIfValidating() {
// When:
final Response response = testResource.streamQuery(
securityContext,
new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), null)
);

// Then:
assertThat(response.getStatus(), is(Errors.badRequest("").getStatus()));
assertThat(response.getEntity(), is(instanceOf(KsqlErrorMessage.class)));
final KsqlErrorMessage expectedEntity = (KsqlErrorMessage) response.getEntity();
assertThat(
expectedEntity.getMessage(),
containsString(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG)
);
}

@Test
public void shouldPassCheckForPullQueryIfNotValidating() {
// Given
testResource.configure(new KsqlConfig(ImmutableMap.of(
StreamsConfig.APPLICATION_SERVER_CONFIG, "something:1",
KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true
)));
public void shouldReturnForbiddenKafkaAccessForPullQueryAuthorizationDenied() {
// Given:
when(mockStatementParser.<Query>parseSingleStatement(PULL_QUERY_STRING))
.thenReturn(query);
doThrow(
new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(TOPIC_NAME)))
.when(authorizationValidator).checkAuthorization(any(), any(), any());

// When:
final Response response = testResource.streamQuery(
securityContext,
new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), null)
);

// Then:
assertThat(response.getStatus(), is(Errors.badRequest("").getStatus()));
final KsqlErrorMessage expectedEntity = (KsqlErrorMessage) response.getEntity();
assertThat(
expectedEntity.getMessage(),
not(containsString(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG))
);
final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity();
final KsqlErrorMessage expectedEntity = (KsqlErrorMessage) AUTHORIZATION_ERROR_RESPONSE.getEntity();
assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus());
assertEquals(responseEntity.getMessage(), expectedEntity.getMessage());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,6 @@ public void shouldReturnErrorMessageWhenTopicAuthorizationException() throws Exc
@Test
public void shouldHandlePullQuery() {
// Given:
when(ksqlConfig.getBoolean(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG))
.thenReturn(true);
givenQueryIs(QueryType.PULL);
givenRequestIs(query);

Expand All @@ -445,17 +443,26 @@ public void shouldHandlePullQuery() {
}

@Test
public void shouldFailPullQueryIfValidating() throws Exception {
public void shouldFailPullQueryIfTopicAuthorizationIsDenied() throws Exception {
// Given:
final String errorMessage = "authorization error";
givenQueryIs(QueryType.PULL);
givenRequestIs(query);
when(errorsHandler.kafkaAuthorizationErrorMessage(any(TopicAuthorizationException.class)))
.thenReturn(errorMessage);
doThrow(new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton("topic")))
.when(authorizationValidator).checkAuthorization(
argThat(securityContext ->
securityContext.getServiceContext() == serviceContext),
eq(metaStore),
eq(query));

// When:
wsQueryEndpoint.onOpen(session, null);

// Then:
verifyClosedContainingReason(
"Pull queries are not currently supported",
errorMessage,
CloseCodes.CANNOT_ACCEPT
);
}
Expand Down

0 comments on commit 5ee1e9e

Please sign in to comment.