Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add config to disable pull queries when validation is required #3879

Merged
merged 3 commits into from
Nov 18, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public class CliTest {
.builder(CLUSTER::bootstrapServers)
.withProperty(KsqlConfig.SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY,
KsqlConstants.defaultSinkWindowChangeLogAdditionalRetention + 1)
.withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true)
.build();

@ClassRule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public final class ImmutableProperties {
.add(KsqlConfig.KSQL_EXT_DIR)
.add(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)
.add(KsqlConfig.KSQL_QUERY_PULL_ENABLE_CONFIG)
.add(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG)
.addAll(KsqlConfig.SSL_CONFIG_NAMES)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ public class KsqlConfig extends AbstractConfig {
+ "\"off\" disables the validator. If set to \"auto\", KSQL will attempt to discover "
+ "whether the Kafka cluster supports the required API, and enables the validator if "
+ "it does.";
public static final String KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG =
"ksql.query.pull.skip.access.validator";
public static final boolean KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DEFAULT = false;
public static final String KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC = "If \"true\", KSQL will "
+ " NOT enforce access validation checks for pull queries, which could expose Kafka topics"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validation checks here is a bit vague. Can we be more explicit? This is really about skips authorization checks. The docs and the name of this config should reflect this.

I think the actual functionality is that with this set to the default false KSQL won't support pull queries when running against a secure Kafka. With it set to true KSQL won't check the user has access to the topics underlying the the materialized state the pull query is accessing.

Is that right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just matched it to the config above. I think this is okay, since these aspects are explained in the other config anyway.

+ " which are secured with ACLs. Please enable only after careful consideration."
+ " If \"false\", KSQL pull queries will fail against a secure Kafka cluster";

public static final String KSQL_QUERY_PULL_ENABLE_CONFIG = "ksql.query.pull.enable";
public static final String KSQL_QUERY_PULL_ENABLE_DOC =
Expand Down Expand Up @@ -604,6 +611,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DOC
).define(
KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG,
Type.BOOLEAN,
KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DEFAULT,
Importance.LOW,
KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlServerException;
import java.util.Optional;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
Expand All @@ -29,37 +30,35 @@ public final class KsqlAuthorizationValidatorFactory {
private static final Logger LOG = LoggerFactory
.getLogger(KsqlAuthorizationValidatorFactory.class);
private static final String KAFKA_AUTHORIZER_CLASS_NAME = "authorizer.class.name";
private static final KsqlAuthorizationValidator DUMMY_VALIDATOR =
(sc, metastore, statement) -> { };

private KsqlAuthorizationValidatorFactory() {
}

public static KsqlAuthorizationValidator create(
public static Optional<KsqlAuthorizationValidator> create(
final KsqlConfig ksqlConfig,
final ServiceContext serviceContext
) {
final String enabled = ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR);
if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON)) {
LOG.info("Forcing topic access validator");
return new KsqlAuthorizationValidatorImpl();
return Optional.of(new KsqlAuthorizationValidatorImpl());
} else if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF)) {
return DUMMY_VALIDATOR;
return Optional.empty();
}

final Admin adminClient = serviceContext.getAdminClient();

if (isKafkaAuthorizerEnabled(adminClient)) {
if (KafkaClusterUtil.isAuthorizedOperationsSupported(adminClient)) {
LOG.info("KSQL topic authorization checks enabled.");
return new KsqlAuthorizationValidatorImpl();
return Optional.of(new KsqlAuthorizationValidatorImpl());
}

LOG.warn("The Kafka broker has an authorization service enabled, but the Kafka "
+ "version does not support authorizedOperations(). "
+ "KSQL topic authorization checks will not be enabled.");
}
return DUMMY_VALIDATOR;
return Optional.empty();
}

private static boolean isKafkaAuthorizerEnabled(final Admin adminClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public DefaultServiceContext(
private DefaultServiceContext(
final KafkaClientSupplier kafkaClientSupplier,
final Supplier<Admin> adminClientSupplier,
final Function<Supplier<Admin>, KafkaTopicClient> topicClientSupplier,
final Function<Supplier<Admin>, KafkaTopicClient> topicClientProvider,
final Supplier<SchemaRegistryClient> srClientSupplier,
final Supplier<ConnectClient> connectClientSupplier,
final Supplier<SimpleKsqlClient> ksqlClientSupplier
Expand All @@ -100,7 +100,7 @@ private DefaultServiceContext(
this.kafkaClientSupplier = requireNonNull(kafkaClientSupplier, "kafkaClientSupplier");

this.topicClientSupplier = new MemoizedSupplier<>(
() -> topicClientSupplier.apply(this.adminClientSupplier));
() -> topicClientProvider.apply(this.adminClientSupplier));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyZeroInteractions;
Expand All @@ -31,6 +30,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
Expand Down Expand Up @@ -78,44 +78,45 @@ public void shouldReturnAuthorizationValidator() {
givenKafkaAuthorizer("an-authorizer-class", Collections.emptySet());

// When:
final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create(
final Optional<KsqlAuthorizationValidator> validator = KsqlAuthorizationValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then
assertThat(validator, is(instanceOf(KsqlAuthorizationValidatorImpl.class)));
assertThat("validator should be present", validator.isPresent());
assertThat(validator.get(), is(instanceOf(KsqlAuthorizationValidatorImpl.class)));
}

@Test
public void shouldReturnDummyValidator() {
public void shouldReturnEmptyValidator() {
// Given:
givenKafkaAuthorizer("", Collections.emptySet());

// When:
final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create(
final Optional<KsqlAuthorizationValidator> validator = KsqlAuthorizationValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then
assertThat(validator, not(instanceOf(KsqlAuthorizationValidatorImpl.class)));
assertThat(validator, is(Optional.empty()));
}

@Test
public void shouldReturnDummyValidatorIfNotEnabled() {
public void shouldReturnEmptyValidatorIfNotEnabled() {
// Given:
when(ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR))
.thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF);

// When:
final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create(
final Optional<KsqlAuthorizationValidator> validator = KsqlAuthorizationValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then:
assertThat(validator, not(instanceOf(KsqlAuthorizationValidatorImpl.class)));
assertThat(validator, is(Optional.empty()));
verifyZeroInteractions(adminClient);
}

Expand All @@ -126,29 +127,30 @@ public void shouldReturnAuthorizationValidatorIfEnabled() {
.thenReturn(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON);

// When:
final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create(
final Optional<KsqlAuthorizationValidator> validator = KsqlAuthorizationValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then:
assertThat(validator, instanceOf(KsqlAuthorizationValidatorImpl.class));
assertThat("validator should be present", validator.isPresent());
assertThat(validator.get(), is(instanceOf(KsqlAuthorizationValidatorImpl.class)));
verifyZeroInteractions(adminClient);
}

@Test
public void shouldReturnDummyValidatorIfAuthorizedOperationsReturnNull() {
public void shouldReturnEmptyValidatorIfAuthorizedOperationsReturnNull() {
// Given:
givenKafkaAuthorizer("an-authorizer-class", null);

// When:
final KsqlAuthorizationValidator validator = KsqlAuthorizationValidatorFactory.create(
final Optional<KsqlAuthorizationValidator> validator = KsqlAuthorizationValidatorFactory.create(
ksqlConfig,
serviceContext
);

// Then
assertThat(validator, not(instanceOf(KsqlAuthorizationValidatorImpl.class)));
assertThat(validator, is(Optional.empty()));
}

private void givenKafkaAuthorizer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class RestQueryTranslationTest {
private static final TestKsqlRestApp REST_APP = TestKsqlRestApp
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withProperty(KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true)
.withStaticServiceContext(TEST_HARNESS::getServiceContext)
.build();

Expand Down Expand Up @@ -106,7 +107,7 @@ public void tearDown() {

@Test
public void shouldBuildAndExecuteQueries() {
try (RestTestExecutor testExecutor = textExecutor()) {
try (RestTestExecutor testExecutor = testExecutor()) {
testExecutor.buildAndExecuteQuery(testCase);
} catch (final AssertionError e) {
throw new AssertionError(e.getMessage()
Expand All @@ -119,7 +120,7 @@ public void shouldBuildAndExecuteQueries() {
}
}

private static RestTestExecutor textExecutor() {
private static RestTestExecutor testExecutor() {
return new RestTestExecutor(
REST_APP.getListeners().get(0),
TEST_HARNESS.getKafkaCluster(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,12 +402,17 @@ private void waitForWarmStateStores(

final ImmutableList<Response> expectedResponse = ImmutableList.of(queryResponse);
final ImmutableList<String> statements = ImmutableList.of(querySql);
final long waitMs = 10;

final long threshold = System.currentTimeMillis() + MAX_STATIC_WARMUP.toMillis();
while (System.currentTimeMillis() < threshold) {
final RestResponse<QueryStream> resp = restClient.makeQueryRequest(querySql, null);
if (resp.isErroneous()) {
Thread.yield();
try {
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
Thread.sleep(waitMs);
} catch (InterruptedException e) {
// ignore
}
LOG.info("Server responded with an error code to a pull query. "
+ "This could be because the materialized store is not yet warm.");
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) {
);

final StatementParser statementParser = new StatementParser(ksqlEngine);
final KsqlAuthorizationValidator authorizationValidator =
final Optional<KsqlAuthorizationValidator> authorizationValidator =
KsqlAuthorizationValidatorFactory.create(ksqlConfigNoPort, serviceContext);

container.addEndpoint(
Expand Down Expand Up @@ -496,7 +496,7 @@ static KsqlRestApplication buildApplication(

final KsqlSecurityExtension securityExtension = loadSecurityExtension(ksqlConfig);

final KsqlAuthorizationValidator authorizationValidator =
final Optional<KsqlAuthorizationValidator> authorizationValidator =
KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext);

final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ public class DistributingExecutor {
private final CommandQueue commandQueue;
private final Duration distributedCmdResponseTimeout;
private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
private final KsqlAuthorizationValidator authorizationValidator;
private final Optional<KsqlAuthorizationValidator> authorizationValidator;
private final RequestValidator requestValidator;

public DistributingExecutor(
final CommandQueue commandQueue,
final Duration distributedCmdResponseTimeout,
final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
final KsqlAuthorizationValidator authorizationValidator,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final RequestValidator requestValidator
) {
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
Expand Down Expand Up @@ -154,15 +154,17 @@ private void checkAuthorization(
final MetaStore metaStore = serverExecutionContext.getMetaStore();

// Check the User will be permitted to execute this statement
authorizationValidator.checkAuthorization(userServiceContext, metaStore, statement);
authorizationValidator.ifPresent(
validator ->
validator.checkAuthorization(userServiceContext, metaStore, statement));

try {
// Check the KSQL service principal will be permitted too
authorizationValidator.checkAuthorization(
serverExecutionContext.getServiceContext(),
metaStore,
statement
);
authorizationValidator.ifPresent(
validator -> validator.checkAuthorization(
serverExecutionContext.getServiceContext(),
metaStore,
statement));
} catch (final Exception e) {
throw new KsqlServerException("The KSQL server is not permitted to execute the command", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.regex.PatternSyntaxException;
Expand Down Expand Up @@ -97,7 +98,7 @@ public class KsqlResource implements KsqlConfigurable {
private final Duration distributedCmdResponseTimeout;
private final ActivenessRegistrar activenessRegistrar;
private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
private final KsqlAuthorizationValidator authorizationValidator;
private final Optional<KsqlAuthorizationValidator> authorizationValidator;
private RequestValidator validator;
private RequestHandler handler;

Expand All @@ -107,7 +108,7 @@ public KsqlResource(
final CommandQueue commandQueue,
final Duration distributedCmdResponseTimeout,
final ActivenessRegistrar activenessRegistrar,
final KsqlAuthorizationValidator authorizationValidator
final Optional<KsqlAuthorizationValidator> authorizationValidator
) {
this(
ksqlEngine,
Expand All @@ -125,7 +126,7 @@ public KsqlResource(
final Duration distributedCmdResponseTimeout,
final ActivenessRegistrar activenessRegistrar,
final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
final KsqlAuthorizationValidator authorizationValidator
final Optional<KsqlAuthorizationValidator> authorizationValidator
) {
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
Expand Down
Loading