Skip to content

Commit

Permalink
feat: custom auth configs for ksql connector requests (#8476)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Dec 13, 2021
1 parent 4b0572a commit 7a9a61a
Show file tree
Hide file tree
Showing 19 changed files with 622 additions and 49 deletions.
60 changes: 58 additions & 2 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,31 @@ public class KsqlConfig extends AbstractConfig {

public static final String SCHEMA_REGISTRY_URL_PROPERTY = "ksql.schema.registry.url";

public static final String CONNECT_URL_PROPERTY = "ksql.connect.url";
public static final String KSQL_CONNECT_PREFIX = "ksql.connect.";

public static final String CONNECT_WORKER_CONFIG_FILE_PROPERTY = "ksql.connect.worker.config";
public static final String CONNECT_URL_PROPERTY = KSQL_CONNECT_PREFIX + "url";

public static final String CONNECT_WORKER_CONFIG_FILE_PROPERTY =
KSQL_CONNECT_PREFIX + "worker.config";

public static final String CONNECT_BASIC_AUTH_CREDENTIALS_SOURCE_PROPERTY =
KSQL_CONNECT_PREFIX + "basic.auth.credentials.source";
public static final String BASIC_AUTH_CREDENTIALS_SOURCE_NONE = "NONE";
public static final String BASIC_AUTH_CREDENTIALS_SOURCE_FILE = "FILE";
private static final ConfigDef.ValidString BASIC_AUTH_CREDENTIALS_SOURCE_VALIDATOR =
ConfigDef.ValidString.in(
BASIC_AUTH_CREDENTIALS_SOURCE_NONE,
BASIC_AUTH_CREDENTIALS_SOURCE_FILE
);
public static final String BASIC_AUTH_CREDENTIALS_USERNAME = "username";
public static final String BASIC_AUTH_CREDENTIALS_PASSWORD = "password";

public static final String CONNECT_BASIC_AUTH_CREDENTIALS_FILE_PROPERTY =
KSQL_CONNECT_PREFIX + "basic.auth.credentials.file";
public static final String CONNECT_BASIC_AUTH_CREDENTIALS_RELOAD_PROPERTY =
KSQL_CONNECT_PREFIX + "basic.auth.credentials.reload";
public static final String CONNECT_BASIC_AUTH_FAIL_ON_UNREADABLE_CREDENTIALS =
KSQL_CONNECT_PREFIX + "basic.auth.credentials.fail.on.unreadable";

public static final String KSQL_ENABLE_UDFS = "ksql.udfs.enabled";

Expand Down Expand Up @@ -783,6 +805,40 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
+ "will prevent connect from starting up embedded within KSQL. For more information"
+ " on configuring connect, see "
+ "https://docs.confluent.io/current/connect/userguide.html#configuring-workers."
).define(
CONNECT_BASIC_AUTH_CREDENTIALS_SOURCE_PROPERTY,
ConfigDef.Type.STRING,
BASIC_AUTH_CREDENTIALS_SOURCE_NONE,
BASIC_AUTH_CREDENTIALS_SOURCE_VALIDATOR,
Importance.LOW,
"If providing explicit basic auth credentials for ksqlDB to use when sending connector "
+ "requests, this config specifies how credentials should be loaded. Valid "
+ "options are 'FILE' in order to specify the username and password in a "
+ "properties file, or 'NONE' to indicate that custom basic auth should "
+ "not be used. If 'NONE', ksqlDB will forward the auth header, if present, "
+ "on the incoming ksql request to Connect."
).define(
CONNECT_BASIC_AUTH_CREDENTIALS_FILE_PROPERTY,
ConfigDef.Type.STRING,
"",
Importance.LOW,
"If '" + CONNECT_BASIC_AUTH_CREDENTIALS_SOURCE_PROPERTY + "' is set to 'FILE', "
+ "then this config specifies the path to the credentials file."
).define(
CONNECT_BASIC_AUTH_CREDENTIALS_RELOAD_PROPERTY,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
"If true, basic auth credentials for connector auth will automatically reload "
+ "on file change (creation or modification). File deletion is not monitored and "
+ "old credentials will continue to be used in this case."
).define(
CONNECT_BASIC_AUTH_FAIL_ON_UNREADABLE_CREDENTIALS,
ConfigDef.Type.BOOLEAN,
true,
ConfigDef.Importance.LOW,
"If true, failure to load basic auth credentials for connector auth will result "
+ "in an error. If false, failure will result in a warn log and empty credentials."
).define(
KSQL_ENABLE_UDFS,
ConfigDef.Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.vertx.core.http.HttpHeaders;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -99,7 +101,7 @@ public ConnectResponse<ConnectorInfo> create(
config);

final ConnectResponse<ConnectorInfo> connectResponse = withRetries(() -> Request
.post(connectUri.resolve(CONNECTORS))
.post(resolveUri(CONNECTORS))
.setHeaders(headers())
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
Expand Down Expand Up @@ -135,7 +137,7 @@ public ConnectResponse<ConfigInfos> validate(
config);

final ConnectResponse<ConfigInfos> connectResponse = withRetries(() -> Request
.put(connectUri.resolve(String.format(VALIDATE_CONNECTOR, plugin)))
.put(resolveUri(String.format(VALIDATE_CONNECTOR, plugin)))
.setHeaders(headers())
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
Expand Down Expand Up @@ -164,7 +166,7 @@ public ConnectResponse<List<String>> connectors() {
LOG.debug("Issuing request to Kafka Connect at URI {} to list connectors", connectUri);

final ConnectResponse<List<String>> connectResponse = withRetries(() -> Request
.get(connectUri.resolve(CONNECTORS))
.get(resolveUri(CONNECTORS))
.setHeaders(headers())
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
Expand All @@ -189,7 +191,7 @@ public ConnectResponse<List<ConnectorPluginInfo>> connectorPlugins() {
LOG.debug("Issuing request to Kafka Connect at URI {} to list connector plugins", connectUri);

final ConnectResponse<List<ConnectorPluginInfo>> connectResponse = withRetries(() -> Request
.get(connectUri.resolve(CONNECTOR_PLUGINS))
.get(resolveUri(CONNECTOR_PLUGINS))
.setHeaders(headers())
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
Expand All @@ -215,7 +217,7 @@ public ConnectResponse<ConnectorStateInfo> status(final String connector) {
connector);

final ConnectResponse<ConnectorStateInfo> connectResponse = withRetries(() -> Request
.get(connectUri.resolve(CONNECTORS + "/" + connector + STATUS))
.get(resolveUri(CONNECTORS + "/" + connector + STATUS))
.setHeaders(headers())
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
Expand All @@ -241,7 +243,7 @@ public ConnectResponse<ConnectorInfo> describe(final String connector) {
connectUri, connector);

final ConnectResponse<ConnectorInfo> connectResponse = withRetries(() -> Request
.get(connectUri.resolve(String.format("%s/%s", CONNECTORS, connector)))
.get(resolveUri(String.format("%s/%s", CONNECTORS, connector)))
.setHeaders(headers())
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
Expand All @@ -267,7 +269,7 @@ public ConnectResponse<String> delete(final String connector) {
connectUri, connector);

final ConnectResponse<String> connectResponse = withRetries(() -> Request
.delete(connectUri.resolve(String.format("%s/%s", CONNECTORS, connector)))
.delete(resolveUri(String.format("%s/%s", CONNECTORS, connector)))
.setHeaders(headers())
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
Expand All @@ -293,7 +295,7 @@ public ConnectResponse<Map<String, Map<String, List<String>>>> topics(final Stri
connectUri, connector);

final ConnectResponse<Map<String, Map<String, List<String>>>> connectResponse = withRetries(
() -> Request.get(connectUri.resolve(CONNECTORS + "/" + connector + TOPICS))
() -> Request.get(resolveUri(CONNECTORS + "/" + connector + TOPICS))
.setHeaders(headers())
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
Expand All @@ -313,12 +315,35 @@ public ConnectResponse<Map<String, Map<String, List<String>>>> topics(final Stri
}
}

@VisibleForTesting
Optional<String> getAuthHeader() {
return authHeader;
}

private Header[] headers() {
return authHeader.isPresent()
? new Header[]{new BasicHeader(HttpHeaders.AUTHORIZATION.toString(), authHeader.get())}
: new Header[]{};
}

private String resolveUri(final String relativePath) {
try {
return new URI(
connectUri.getScheme(),
connectUri.getUserInfo(),
connectUri.getHost(),
connectUri.getPort(),
// concatenate relative path to existing path in order to support relative resolution;
// in contrast, URI.resolve() will resolve a path such as `/connectors` as an
// absolute path only.
Paths.get(connectUri.getPath(), relativePath).toString(),
connectUri.getQuery(),
connectUri.getFragment()).toString();
} catch (URISyntaxException e) {
throw new KsqlServerException("Failed to resolve URI", e);
}
}

@SuppressWarnings("unchecked")
private static <T> ConnectResponse<T> withRetries(final Callable<ConnectResponse<T>> action) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.services;

import io.confluent.ksql.services.ConnectClient.ConnectClientFactory;
import io.confluent.ksql.util.FileWatcher;
import io.confluent.ksql.util.KsqlConfig;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Paths;
import java.util.Base64;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Factory for managing logic for creating Connect clients, including the auth header
* that should be sent with connector requests.
*
* <p>If a custom auth header is specified
* as part of the ksqlDB server config, then:
* <ul>
* <li>the header is loaded into {@code connectAuthHeader} the first time {@code get()}
* is called.</li>
* <li>if configured, a file watcher thread will be started to monitor for changes
* to the auth credentials. This file watcher will be started when the credentials
* are first loaded.</li>
* </ul>
*
* <p>If no custom auth header is specified, then the auth header of the incoming ksql
* request, if present, will be sent with the connector request instead.
*/
public class DefaultConnectClientFactory implements ConnectClientFactory {

private static final Logger log = LoggerFactory.getLogger(DefaultConnectClientFactory.class);

private final KsqlConfig ksqlConfig;
private volatile Optional<String> connectAuthHeader;
private FileWatcher credentialsFileWatcher;

public DefaultConnectClientFactory(
final KsqlConfig ksqlConfig
) {
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
}

@Override
public synchronized DefaultConnectClient get(final Optional<String> ksqlAuthHeader) {
if (connectAuthHeader == null) {
connectAuthHeader = buildAuthHeader();
}

return new DefaultConnectClient(
ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY),
// if no explicit header specified, then forward incoming request header
connectAuthHeader.isPresent() ? connectAuthHeader : ksqlAuthHeader
);
}

@Override
public synchronized void close() {
if (credentialsFileWatcher != null) {
credentialsFileWatcher.shutdown();
}
}

private Optional<String> buildAuthHeader() {
// custom basic auth credentials
if (ksqlConfig.getString(KsqlConfig.CONNECT_BASIC_AUTH_CREDENTIALS_SOURCE_PROPERTY)
.equalsIgnoreCase(KsqlConfig.BASIC_AUTH_CREDENTIALS_SOURCE_FILE)) {
final String credentialsFile =
ksqlConfig.getString(KsqlConfig.CONNECT_BASIC_AUTH_CREDENTIALS_FILE_PROPERTY);
final boolean failOnUnreadableCreds =
ksqlConfig.getBoolean(KsqlConfig.CONNECT_BASIC_AUTH_FAIL_ON_UNREADABLE_CREDENTIALS);

if (ksqlConfig.getBoolean(KsqlConfig.CONNECT_BASIC_AUTH_CREDENTIALS_RELOAD_PROPERTY)) {
startBasicAuthFileWatcher(credentialsFile, failOnUnreadableCreds);
}

return buildBasicAuthHeader(credentialsFile, failOnUnreadableCreds);
} else {
return Optional.empty();
}
}

private void startBasicAuthFileWatcher(
final String filePath,
final boolean failOnUnreadableCreds
) {
try {
credentialsFileWatcher = new FileWatcher(Paths.get(filePath), () -> {
connectAuthHeader = buildBasicAuthHeader(filePath, failOnUnreadableCreds);
});
credentialsFileWatcher.start();
log.info("Enabled automatic connector credentials reload for location: " + filePath);
} catch (java.io.IOException e) {
log.error("Failed to enable automatic connector credentials reload: " + e.getMessage());
}
}

private static Optional<String> buildBasicAuthHeader(
final String credentialsPath,
final boolean failOnUnreadableCredentials
) {
if (credentialsPath == null || credentialsPath.isEmpty()) {
throw new ConfigException(String.format("'%s' cannot be empty if '%s' is set to '%s'",
KsqlConfig.CONNECT_BASIC_AUTH_CREDENTIALS_FILE_PROPERTY,
KsqlConfig.CONNECT_BASIC_AUTH_CREDENTIALS_SOURCE_PROPERTY,
KsqlConfig.BASIC_AUTH_CREDENTIALS_SOURCE_FILE));
}

final Properties credentials = new Properties();
try (FileInputStream inputStream = new FileInputStream(credentialsPath)) {
credentials.load(inputStream);

if (credentials.containsKey(KsqlConfig.BASIC_AUTH_CREDENTIALS_USERNAME)
&& credentials.containsKey(KsqlConfig.BASIC_AUTH_CREDENTIALS_PASSWORD)) {
final String userInfo = credentials.getProperty(KsqlConfig.BASIC_AUTH_CREDENTIALS_USERNAME)
+ ":" + credentials.getProperty(KsqlConfig.BASIC_AUTH_CREDENTIALS_PASSWORD);
return Optional.of("Basic " + Base64.getEncoder()
.encodeToString(userInfo.getBytes(Charset.defaultCharset())));
} else {
if (failOnUnreadableCredentials) {
throw new ConfigException(
"Provided credentials file doesn't provide username and password");
} else {
log.warn("Provided credentials file doesn't provide username and password");
return Optional.empty();
}
}
} catch (IOException e) {
if (failOnUnreadableCredentials) {
throw new ConfigException(e.getMessage());
} else {
log.warn("Failed to load credentials file: " + e.getMessage());
return Optional.empty();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public static ServiceContext create(
new KsqlSchemaRegistryClientFactory(
ksqlConfig,
Collections.emptyMap())::get,
() -> new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY),
Optional.empty()),
() -> new DefaultConnectClientFactory(ksqlConfig).get(Optional.empty()),
ksqlClientSupplier
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.server;
package io.confluent.ksql.util;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
Expand All @@ -33,7 +33,8 @@
// https://gist.github.com/danielflower/f54c2fe42d32356301c68860a4ab21ed
// https://github.com/confluentinc/rest-utils/blob/master/core/src/main/java/io/confluent/rest/FileWatcher.java
/**
* Watches a file and calls a callback when it is changed.
* Watches a file and calls a callback when it is changed. Only file creation and modification
* are watched for; deletion has no effect.
*/
public class FileWatcher extends Thread {

Expand Down
Loading

0 comments on commit 7a9a61a

Please sign in to comment.