feat: Porting #3879, #3696 fixes for pull queries against secure clusters (#3980)

* refactor: lazy initialization of clients (admin, sr, ksql, connect) (#3696)
- Made client creation lazy by memoizing the client suppliers.

* feat: add config to disable pull queries when validating (#3879)

fixes #3863

- Added `ksql.query.pull.skip.access.validator` to control whether pull queries may run without access validation
- By default, pull queries error out if auth validation is required
- Replaced DUMMY_VALIDATOR with an Optional<KsqlAuthorizationValidator> returned by KsqlAuthorizationValidatorFactory
- Fixed some tests, added test cases
- Applied to both the `query` and websocket endpoints (see the sketch just below)
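
For illustration, a minimal sketch of the resulting gate (PullQueryGuard, checkPullQueryAllowed, and the AuthorizationCheck placeholder are hypothetical names; the placeholder merely mirrors the (sc, metastore, statement) shape of the real KsqlAuthorizationValidator shown in the diff below):

import java.util.Optional;

// Hypothetical stand-in for KsqlAuthorizationValidator, matching its lambda shape.
@FunctionalInterface
interface AuthorizationCheck {
  void check(Object serviceContext, Object metaStore, Object statement);
}

final class PullQueryGuard {

  private final boolean skipAccessValidator;             // ksql.query.pull.skip.access.validator
  private final Optional<AuthorizationCheck> validator;  // empty when no authorizer is configured

  PullQueryGuard(
      final boolean skipAccessValidator,
      final Optional<AuthorizationCheck> validator
  ) {
    this.skipAccessValidator = skipAccessValidator;
    this.validator = validator;
  }

  // Pull queries bypass per-statement authorization, so if an authorizer is
  // active they must fail fast unless the operator explicitly opted out.
  void checkPullQueryAllowed() {
    if (validator.isPresent() && !skipAccessValidator) {
      throw new IllegalStateException(
          "Pull queries are disabled against a secure Kafka cluster unless "
              + "ksql.query.pull.skip.access.validator is set to true");
    }
  }
}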
vinothchandar authored Nov 27, 2019
1 parent 2ce0ede commit 99d0d44
Showing 34 changed files with 489 additions and 193 deletions.
1 change: 1 addition & 0 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
@@ -129,6 +129,7 @@ public class CliTest {
.builder(CLUSTER::bootstrapServers)
.withProperty(KsqlConfig.SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY,
KsqlConstants.defaultSinkWindowChangeLogAdditionalRetention + 1)
.withProperty(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true)
.build();

@ClassRule
@@ -30,6 +30,7 @@ public final class ImmutableProperties {
.add(KsqlConfig.KSQL_EXT_DIR)
.add(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)
.add(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG)
.add(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG)
.addAll(KsqlConfig.SSL_CONFIG_NAMES)
.build();

13 changes: 13 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
@@ -196,6 +196,13 @@ public class KsqlConfig extends AbstractConfig {
+ "\"off\" disables the validator. If set to \"auto\", KSQL will attempt to discover "
+ "whether the Kafka cluster supports the required API, and enables the validator if "
+ "it does.";
public static final String KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG =
"ksql.query.pull.skip.access.validator";
public static final boolean KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DEFAULT = false;
public static final String KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC = "If \"true\", KSQL will "
+ " NOT enforce access validation checks for pull queries, which could expose Kafka topics"
+ " which are secured with ACLs. Please enable only after careful consideration."
+ " If \"false\", KSQL pull queries will fail against a secure Kafka cluster";

public static final String KSQL_PULL_QUERIES_ENABLE_CONFIG = "ksql.pull.queries.enable";
public static final String KSQL_PULL_QUERIES_ENABLE_DOC =
@@ -584,6 +591,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_PULL_QUERIES_ENABLE_DEFAULT,
Importance.LOW,
KSQL_PULL_QUERIES_ENABLE_DOC
).define(
KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG,
Type.BOOLEAN,
KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DEFAULT,
Importance.LOW,
KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
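As a quick usage sketch (hypothetical wiring; a real server would layer in many more properties), the new flag is read back like any other boolean config:

import io.confluent.ksql.util.KsqlConfig;
import java.util.Collections;

final KsqlConfig config = new KsqlConfig(Collections.singletonMap(
    KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG, true));

// AbstractConfig exposes typed getters; the flag is defined as Type.BOOLEAN above.
final boolean skipValidator =
    config.getBoolean(KsqlConfig.KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_CONFIG);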
@@ -76,7 +76,7 @@ public static KsqlContext create(
) {
Objects.requireNonNull(ksqlConfig, "ksqlConfig cannot be null.");
final ServiceContext serviceContext =
ServiceContextFactory.create(ksqlConfig, DisabledKsqlClient.instance());
ServiceContextFactory.create(ksqlConfig, DisabledKsqlClient::instance);
final MutableFunctionRegistry functionRegistry = new InternalFunctionRegistry();
UserFunctionLoader.newInstance(ksqlConfig, functionRegistry, ".").load();
final ServiceInfo serviceInfo = ServiceInfo.create(ksqlConfig);
@@ -19,6 +19,7 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlServerException;
import java.util.Optional;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -29,37 +30,35 @@ public final class KsqlAuthorizationValidatorFactory {
private static final Logger LOG = LoggerFactory
.getLogger(KsqlAuthorizationValidatorFactory.class);
private static final String KAFKA_AUTHORIZER_CLASS_NAME = "authorizer.class.name";
private static final KsqlAuthorizationValidator DUMMY_VALIDATOR =
(sc, metastore, statement) -> { };

private KsqlAuthorizationValidatorFactory() {
}

public static KsqlAuthorizationValidator create(
public static Optional<KsqlAuthorizationValidator> create(
final KsqlConfig ksqlConfig,
final ServiceContext serviceContext
) {
final String enabled = ksqlConfig.getString(KsqlConfig.KSQL_ENABLE_TOPIC_ACCESS_VALIDATOR);
if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_ON)) {
LOG.info("Forcing topic access validator");
return new KsqlAuthorizationValidatorImpl();
return Optional.of(new KsqlAuthorizationValidatorImpl());
} else if (enabled.equals(KsqlConfig.KSQL_ACCESS_VALIDATOR_OFF)) {
return DUMMY_VALIDATOR;
return Optional.empty();
}

final Admin adminClient = serviceContext.getAdminClient();

if (isKafkaAuthorizerEnabled(adminClient)) {
if (KafkaClusterUtil.isAuthorizedOperationsSupported(adminClient)) {
LOG.info("KSQL topic authorization checks enabled.");
return new KsqlAuthorizationValidatorImpl();
return Optional.of(new KsqlAuthorizationValidatorImpl());
}

LOG.warn("The Kafka broker has an authorization service enabled, but the Kafka "
+ "version does not support authorizedOperations(). "
+ "KSQL topic authorization checks will not be enabled.");
}
return DUMMY_VALIDATOR;
return Optional.empty();
}

private static boolean isKafkaAuthorizerEnabled(final Admin adminClient) {
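Since create now returns Optional<KsqlAuthorizationValidator>, call sites gate the check with ifPresent instead of invoking a no-op. A minimal caller sketch (the checkAuthorization method name and the MetaStore/Statement parameter types are assumptions inferred from the (sc, metastore, statement) lambda above):

void validateQuery(
    final KsqlConfig ksqlConfig,
    final ServiceContext serviceContext,
    final MetaStore metaStore,
    final Statement statement
) {
  final Optional<KsqlAuthorizationValidator> validator =
      KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext);

  // An empty Optional replaces the old no-op DUMMY_VALIDATOR: no authorizer, no check.
  validator.ifPresent(v -> v.checkAuthorization(serviceContext, metaStore, statement));
}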
@@ -17,8 +17,11 @@

import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.util.KsqlConfig;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.streams.KafkaClientSupplier;
@@ -29,38 +32,85 @@
public class DefaultServiceContext implements ServiceContext {

private final KafkaClientSupplier kafkaClientSupplier;
private final Admin adminClient;
private final KafkaTopicClient topicClient;
private final Supplier<SchemaRegistryClient> srClientFactory;
private final SchemaRegistryClient srClient;
private final ConnectClient connectClient;
private final SimpleKsqlClient ksqlClient;
private final MemoizedSupplier<Admin> adminClientSupplier;
private final MemoizedSupplier<KafkaTopicClient> topicClientSupplier;
private final Supplier<SchemaRegistryClient> srClientFactorySupplier;
private final MemoizedSupplier<SchemaRegistryClient> srClient;
private final MemoizedSupplier<ConnectClient> connectClientSupplier;
private final MemoizedSupplier<SimpleKsqlClient> ksqlClientSupplier;

public DefaultServiceContext(
final KafkaClientSupplier kafkaClientSupplier,
final Admin adminClient,
final Supplier<Admin> adminClientSupplier,
final Supplier<SchemaRegistryClient> srClientSupplier,
final Supplier<ConnectClient> connectClientSupplier,
final Supplier<SimpleKsqlClient> ksqlClientSupplier
) {
this(
kafkaClientSupplier,
adminClientSupplier,
KafkaTopicClientImpl::new,
srClientSupplier,
connectClientSupplier,
ksqlClientSupplier
);
}

@VisibleForTesting
public DefaultServiceContext(
final KafkaClientSupplier kafkaClientSupplier,
final Supplier<Admin> adminClientSupplier,
final KafkaTopicClient topicClient,
final Supplier<SchemaRegistryClient> srClientFactory,
final ConnectClient connectClient,
final SimpleKsqlClient ksqlClient
final Supplier<SchemaRegistryClient> srClientSupplier,
final Supplier<ConnectClient> connectClientSupplier,
final Supplier<SimpleKsqlClient> ksqlClientSupplier
) {
this(
kafkaClientSupplier,
adminClientSupplier,
adminSupplier -> topicClient,
srClientSupplier,
connectClientSupplier,
ksqlClientSupplier
);
}

private DefaultServiceContext(
final KafkaClientSupplier kafkaClientSupplier,
final Supplier<Admin> adminClientSupplier,
final Function<Supplier<Admin>, KafkaTopicClient> topicClientProvider,
final Supplier<SchemaRegistryClient> srClientSupplier,
final Supplier<ConnectClient> connectClientSupplier,
final Supplier<SimpleKsqlClient> ksqlClientSupplier
) {
requireNonNull(adminClientSupplier, "adminClientSupplier");
this.adminClientSupplier = new MemoizedSupplier<>(adminClientSupplier);

this.srClientFactorySupplier = requireNonNull(srClientSupplier, "srClientSupplier");

requireNonNull(connectClientSupplier, "connectClientSupplier");
this.connectClientSupplier = new MemoizedSupplier<>(
connectClientSupplier);

requireNonNull(ksqlClientSupplier, "ksqlClientSupplier");
this.ksqlClientSupplier = new MemoizedSupplier<>(ksqlClientSupplier);

this.srClient = new MemoizedSupplier<>(srClientSupplier);

this.kafkaClientSupplier = requireNonNull(kafkaClientSupplier, "kafkaClientSupplier");
this.adminClient = requireNonNull(adminClient, "adminClient");
this.topicClient = requireNonNull(topicClient, "topicClient");
this.srClientFactory = requireNonNull(srClientFactory, "srClientFactory");
this.srClient = requireNonNull(srClientFactory.get(), "srClient");
this.connectClient = requireNonNull(connectClient, "connectClient");
this.ksqlClient = requireNonNull(ksqlClient, "ksqlClient");

this.topicClientSupplier = new MemoizedSupplier<>(
() -> topicClientProvider.apply(this.adminClientSupplier));
}

@Override
public Admin getAdminClient() {
return adminClient;
return adminClientSupplier.get();
}

@Override
public KafkaTopicClient getTopicClient() {
return topicClient;
return topicClientSupplier.get();
}

@Override
@@ -70,26 +120,49 @@ public KafkaClientSupplier getKafkaClientSupplier() {

@Override
public SchemaRegistryClient getSchemaRegistryClient() {
return srClient;
return srClient.get();
}

@Override
public Supplier<SchemaRegistryClient> getSchemaRegistryClientFactory() {
return srClientFactory;
return srClientFactorySupplier;
}

@Override
public ConnectClient getConnectClient() {
return connectClient;
return connectClientSupplier.get();
}

@Override
public SimpleKsqlClient getKsqlClient() {
return ksqlClient;
return ksqlClientSupplier.get();
}

@Override
public void close() {
adminClient.close();
if (adminClientSupplier.isInitialized()) {
adminClientSupplier.get().close();
}
}


static final class MemoizedSupplier<T> implements Supplier<T> {

private final Supplier<T> supplier;
private volatile boolean initialized = false;

MemoizedSupplier(final Supplier<T> supplier) {
this.supplier = Suppliers.memoize(supplier::get);
}

@Override
public T get() {
initialized = true;
return supplier.get();
}

boolean isInitialized() {
return initialized;
}
}
}
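
For illustration, a self-contained sketch of the memoization pattern behind MemoizedSupplier (Guava's Suppliers.memoize runs the delegate at most once and caches the result; the demo names are hypothetical):

import com.google.common.base.Suppliers;
import java.util.function.Supplier;

public final class MemoizeDemo {
  public static void main(final String[] args) {
    // Suppliers.memoize expects a Guava Supplier, which is why MemoizedSupplier
    // above adapts a java.util.function.Supplier via supplier::get.
    final Supplier<String> client = Suppliers.memoize(() -> {
      System.out.println("creating expensive client...");  // printed exactly once
      return "connected";
    });

    System.out.println(client.get());  // first call creates the client
    System.out.println(client.get());  // cached result, no second creation
  }
}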
@@ -33,6 +33,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -70,15 +71,17 @@ public class KafkaTopicClientImpl implements KafkaTopicClient {
private static final String DEFAULT_REPLICATION_PROP = "default.replication.factor";
private static final String DELETE_TOPIC_ENABLE = "delete.topic.enable";

private final Admin adminClient;
private final Supplier<Admin> adminClient;

/**
* Construct a topic client from an existing admin client.
* Note, the admin client is shared between all methods of this class, i.e the admin client
* is created only once and then reused.
*
* @param adminClient the admin client.
* @param sharedAdminClient the admin client .
*/
public KafkaTopicClientImpl(final Admin adminClient) {
this.adminClient = Objects.requireNonNull(adminClient, "adminClient");
public KafkaTopicClientImpl(final Supplier<Admin> sharedAdminClient) {
this.adminClient = Objects.requireNonNull(sharedAdminClient, "sharedAdminClient");
}

@Override
@@ -108,7 +111,7 @@ public void createTopic(
);

ExecutorUtil.executeWithRetries(
() -> adminClient.createTopics(
() -> adminClient.get().createTopics(
Collections.singleton(newTopic),
createOptions
).all().get(),
@@ -165,7 +168,7 @@ public boolean isTopicExists(final String topic) {
public Set<String> listTopicNames() {
try {
return ExecutorUtil.executeWithRetries(
() -> adminClient.listTopics().names().get(),
() -> adminClient.get().listTopics().names().get(),
ExecutorUtil.RetryBehaviour.ON_RETRYABLE);
} catch (final Exception e) {
throw new KafkaResponseGetFailedException("Failed to retrieve Kafka Topic names", e);
@@ -184,7 +187,7 @@ public Set<String> listNonInternalTopicNames() {
public Map<String, TopicDescription> describeTopics(final Collection<String> topicNames) {
try {
return ExecutorUtil.executeWithRetries(
() -> adminClient.describeTopics(
() -> adminClient.get().describeTopics(
topicNames,
new DescribeTopicsOptions().includeAuthorizedOperations(true)
).all().get(),
Expand Down Expand Up @@ -228,7 +231,7 @@ public boolean addTopicConfig(final String topicName, final Map<String, ?> overr
Collections.singletonMap(resource, entries);

ExecutorUtil.executeWithRetries(
() -> adminClient.incrementalAlterConfigs(request).all().get(),
() -> adminClient.get().incrementalAlterConfigs(request).all().get(),
ExecutorUtil.RetryBehaviour.ON_RETRYABLE);

return true;
@@ -263,7 +266,7 @@ public void deleteTopics(final Collection<String> topicsToDelete) {
return;
}

final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
final DeleteTopicsResult deleteTopicsResult = adminClient.get().deleteTopics(topicsToDelete);
final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
final List<String> failList = Lists.newArrayList();
final List<Pair<String, Throwable>> exceptionList = Lists.newArrayList();
@@ -315,7 +318,7 @@ public void deleteInternalTopics(final String applicationId) {
}

private Config getConfig() {
return KafkaClusterUtil.getConfig(adminClient);
return KafkaClusterUtil.getConfig(adminClient.get());
}

private static boolean isInternalTopic(final String topicName, final String applicationId) {
@@ -344,7 +347,7 @@ private Map<String, String> topicConfig(final String topicName,

try {
final Config config = ExecutorUtil.executeWithRetries(
() -> adminClient.describeConfigs(request).all().get(),
() -> adminClient.get().describeConfigs(request).all().get(),
ExecutorUtil.RetryBehaviour.ON_RETRYABLE).get(resource);
return config.entries().stream()
.filter(e -> includeDefaults
@@ -374,7 +377,7 @@ private boolean addTopicConfigLegacy(final String topicName, final Map<String, ?
Collections.singletonMap(resource, new Config(entries));

ExecutorUtil.executeWithRetries(
() -> adminClient.alterConfigs(request).all().get(),
() -> adminClient.get().alterConfigs(request).all().get(),
ExecutorUtil.RetryBehaviour.ON_RETRYABLE);

return true;
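A minimal construction sketch under the new contract (the bootstrap address is a placeholder; AdminClient.create is the standard Kafka clients factory):

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

final Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Lazy by construction: no broker connection is opened until the first topic
// operation calls adminClient.get() inside KafkaTopicClientImpl.
final KafkaTopicClient topicClient =
    new KafkaTopicClientImpl(() -> AdminClient.create(props));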