Skip to content

Commit

Permalink
feat: new CLI params to provide credentials for cloud connector manag…
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Feb 3, 2022
1 parent 8329dc4 commit a33b1e6
Show file tree
Hide file tree
Showing 27 changed files with 624 additions and 61 deletions.
6 changes: 4 additions & 2 deletions ksqldb-cli/src/main/java/io/confluent/ksql/Ksql.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ private KsqlRestClient buildClient(
final Map<String, String> clientProps = PropertiesUtil.applyOverrides(configProps, systemProps);
final String server = options.getServer();
final Optional<BasicCredentials> creds = options.getUserNameAndPassword();
final Optional<BasicCredentials> ccloudApiKey = options.getCCloudApiKey();

return clientBuilder.build(server, localProps, clientProps, creds);
return clientBuilder.build(server, localProps, clientProps, creds, ccloudApiKey);
}

private static Map<String, String> stripClientSideProperties(final Map<String, String> props) {
Expand All @@ -153,7 +154,8 @@ KsqlRestClient build(
String serverAddress,
Map<String, ?> localProperties,
Map<String, String> clientProps,
Optional<BasicCredentials> creds
Optional<BasicCredentials> creds,
Optional<BasicCredentials> ccloudApiKey
);
}

Expand Down
99 changes: 93 additions & 6 deletions ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.SqlBaseParser;
import io.confluent.ksql.parser.SqlBaseParser.CreateConnectorContext;
import io.confluent.ksql.parser.SqlBaseParser.DefineVariableContext;
import io.confluent.ksql.parser.SqlBaseParser.DescribeConnectorContext;
import io.confluent.ksql.parser.SqlBaseParser.DropConnectorContext;
import io.confluent.ksql.parser.SqlBaseParser.ListConnectorPluginsContext;
import io.confluent.ksql.parser.SqlBaseParser.ListConnectorsContext;
import io.confluent.ksql.parser.SqlBaseParser.ListVariablesContext;
import io.confluent.ksql.parser.SqlBaseParser.PrintTopicContext;
import io.confluent.ksql.parser.SqlBaseParser.QueryStatementContext;
Expand All @@ -37,10 +42,11 @@
import io.confluent.ksql.reactive.BaseSubscriber;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.KsqlRestClientException;
import io.confluent.ksql.rest.client.KsqlUnsupportedServerException;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.client.StreamPublisher;
import io.confluent.ksql.rest.client.exception.KsqlMissingCredentialsException;
import io.confluent.ksql.rest.client.exception.KsqlRestClientException;
import io.confluent.ksql.rest.client.exception.KsqlUnsupportedServerException;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
Expand All @@ -52,7 +58,9 @@
import io.confluent.ksql.util.ErrorMessageUtil;
import io.confluent.ksql.util.HandlerMaps;
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMap2;
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMapR2;
import io.confluent.ksql.util.HandlerMaps.Handler2;
import io.confluent.ksql.util.HandlerMaps.HandlerR2;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlVersion;
import io.confluent.ksql.util.KsqlVersion.VersionType;
Expand All @@ -66,6 +74,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -95,6 +104,23 @@ public class Cli implements KsqlRequestExecutor, Closeable {

private static final KsqlParser KSQL_PARSER = new DefaultKsqlParser();

// validators return an Optional<RuntimeException> error message representing the
// validation error, if any.
private static final ClassHandlerMapR2<StatementContext, Cli, Void, Optional>
STATEMENT_VALIDATORS =
HandlerMaps
.forClass(StatementContext.class)
.withArgTypes(Cli.class, Void.class)
.withReturnType(Optional.class)
.put(DefineVariableContext.class, Cli::defineVariableFromCtxtSandbox)
.put(UndefineVariableContext.class, Cli::undefineVariableFromCtxtSandbox)
.put(CreateConnectorContext.class, Cli::validateConnectorRequest)
.put(DropConnectorContext.class, Cli::validateConnectorRequest)
.put(DescribeConnectorContext.class, Cli::validateConnectorRequest)
.put(ListConnectorsContext.class, Cli::validateConnectorRequest)
.put(ListConnectorPluginsContext.class, Cli::validateConnectorRequest)
.build();

private static final ClassHandlerMap2<StatementContext, Cli, String> STATEMENT_HANDLERS =
HandlerMaps
.forClass(StatementContext.class)
Expand All @@ -106,6 +132,11 @@ public class Cli implements KsqlRequestExecutor, Closeable {
.put(DefineVariableContext.class, Cli::defineVariableFromCtxt)
.put(UndefineVariableContext.class, Cli::undefineVariableFromCtxt)
.put(ListVariablesContext.class, Cli::listVariablesFromCtxt)
.put(CreateConnectorContext.class, Cli::handleConnectorRequest)
.put(DropConnectorContext.class, Cli::handleConnectorRequest)
.put(DescribeConnectorContext.class, Cli::handleConnectorRequest)
.put(ListConnectorsContext.class, Cli::handleConnectorRequest)
.put(ListConnectorPluginsContext.class, Cli::handleConnectorRequest)
.build();

private final Long streamedQueryRowLimit;
Expand All @@ -116,6 +147,7 @@ public class Cli implements KsqlRequestExecutor, Closeable {
private final RemoteServerState remoteServerState;

private final Map<String, String> sessionVariables;
private Map<String, String> sandboxedSessionVariables;

public static Cli build(
final Long streamedQueryRowLimit,
Expand Down Expand Up @@ -416,9 +448,14 @@ private boolean isVariableSubstitutionEnabled() {
return KsqlConfig.KSQL_VARIABLE_SUBSTITUTION_ENABLE_DEFAULT;
}

private ParsedStatement substituteVariables(final ParsedStatement statement) {
private ParsedStatement substituteVariables(
final ParsedStatement statement,
final boolean isSandbox
) {
if (isVariableSubstitutionEnabled()) {
final String replacedStmt = VariableSubstitutor.substitute(statement, sessionVariables);
final String replacedStmt = isSandbox
? VariableSubstitutor.substitute(statement, sandboxedSessionVariables)
: VariableSubstitutor.substitute(statement, sessionVariables);
return KSQL_PARSER.parse(replacedStmt).get(0);
} else {
return statement;
Expand All @@ -427,9 +464,25 @@ private ParsedStatement substituteVariables(final ParsedStatement statement) {

private void handleStatements(final String line) {
final List<ParsedStatement> statements = KSQL_PARSER.parse(line);
final StringBuilder consecutiveStatements = new StringBuilder();

statements.stream().map(this::substituteVariables).forEach(parsed -> {
// validate all before executing any
sandboxedSessionVariables = new HashMap<>(sessionVariables);
statements.stream().map(stmt -> this.substituteVariables(stmt, true)).forEach(parsed -> {
final StatementContext statementContext = parsed.getStatement().statement();

final HandlerR2<StatementContext, Cli, Void, Optional> validator =
STATEMENT_VALIDATORS.get(statementContext.getClass());
if (validator != null) {
final Optional<?> validationError = validator.handle(this, null, statementContext);
validationError.map(o -> (RuntimeException)o).ifPresent(error -> {
throw error;
});
}
});

// execute statements
final StringBuilder consecutiveStatements = new StringBuilder();
statements.stream().map(stmt -> this.substituteVariables(stmt, false)).forEach(parsed -> {
final StatementContext statementContext = parsed.getStatement().statement();
final String statementText = parsed.getStatementText();

Expand Down Expand Up @@ -474,6 +527,23 @@ private void printKsqlResponse(final RestResponse<KsqlEntityList> response) {
}
}

private <C extends SqlBaseParser.StatementContext>
Optional<RuntimeException> validateConnectorRequest(final C context) {
if (restClient.getIsCCloudServer() && !restClient.getHasCCloudApiKey()) {
return Optional.of(new KsqlMissingCredentialsException("In order to use ksqlDB's connector "
+ "management capabilities with a Confluent Cloud ksqlDB server, launch the "
+ "ksqlDB command line with the additional flags '--confluent-api-key' and "
+ "'--confluent-api-secret' to pass a Confluent Cloud API key."));
} else {
return Optional.empty();
}
}

private <C extends SqlBaseParser.StatementContext>
void handleConnectorRequest(final String statement, final C context) {
printKsqlResponse(makeKsqlRequest(statement, restClient::makeConnectorRequest));
}

@SuppressWarnings({"try", "unused"}) // ignored param is required to compile.
private void handleQuery(
final String statement,
Expand Down Expand Up @@ -614,6 +684,15 @@ private void defineVariableFromCtxt(
sessionVariables.put(variableName, variableValue);
}

private Optional<RuntimeException> defineVariableFromCtxtSandbox(
final DefineVariableContext context
) {
final String variableName = context.variableName().getText();
final String variableValue = ParserUtil.unquote(context.variableValue().getText(), "'");
sandboxedSessionVariables.put(variableName, variableValue);
return Optional.empty();
}

@SuppressWarnings("unused")
private void undefineVariableFromCtxt(
final String ignored,
Expand All @@ -628,6 +707,14 @@ private void undefineVariableFromCtxt(
}
}

private Optional<RuntimeException> undefineVariableFromCtxtSandbox(
final UndefineVariableContext context
) {
final String variableName = context.variableName().getText();
sandboxedSessionVariables.remove(variableName);
return Optional.empty();
}

@SuppressWarnings("unused")
private void listVariablesFromCtxt(
final String ignored,
Expand Down
46 changes: 44 additions & 2 deletions ksqldb-cli/src/main/java/io/confluent/ksql/cli/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public class Options {
private static final String FILE_SHORT_OPTION = "-f";
private static final String DEFINE_OPTION = "--define";
private static final String DEFINE_SHORT_OPTION = "-d";
private static final String CONFIGURATION_FILE_OPTION_NAME = "--config-file";
private static final String CONFLUENT_API_KEY_OPTION = "--confluent-api-key";
private static final String CONFLUENT_API_SECRET_OPTION = "--confluent-api-secret";

// Only here so that the help message generated by Help.help() is accurate
@Inject
Expand All @@ -62,8 +65,6 @@ public class Options {
description = "The address of the Ksql server to connect to (ex: http://confluent.io:9098)")
private String server = "http://localhost:8088";

private static final String CONFIGURATION_FILE_OPTION_NAME = "--config-file";

@SuppressWarnings("unused") // Accessed via reflection
@Option(
name = CONFIGURATION_FILE_OPTION_NAME,
Expand Down Expand Up @@ -138,6 +139,27 @@ public class Options {
description = "Define variables for the CLI session (equivalent to the DEFINE statement).")
private List<String> definedVars = null;

@Option(
name = CONFLUENT_API_KEY_OPTION,
description =
"If you're connecting to a Confluent Cloud ksqlDB server and would like to use "
+ "ksqlDB's connector management capabilities, then provide your Confluent "
+ "Cloud API key here. The API key secret must be specified separately with the "
+ CONFLUENT_API_SECRET_OPTION
+ " flag")
private String ccloudApiKey = "";

@Option(
name = CONFLUENT_API_SECRET_OPTION,
description =
"If you're connecting to a Confluent Cloud ksqlDB server and would like to use "
+ "ksqlDB's connector management capabilities, then provide your Confluent "
+ "Cloud API key secret here. The API key itself must be specified separately "
+ "with the "
+ CONFLUENT_API_KEY_OPTION
+ " flag")
private String ccloudApiSecret = "";

public static Options parse(final String...args) throws IOException {
final SingleCommand<Options> optionsParser = SingleCommand.singleCommand(Options.class);

Expand Down Expand Up @@ -217,6 +239,26 @@ public Optional<BasicCredentials> getUserNameAndPassword() {
return Optional.of(BasicCredentials.of(userName, password));
}

public Optional<BasicCredentials> getCCloudApiKey() {
if (ccloudApiKey.isEmpty() != ccloudApiSecret.isEmpty()) {
throw new ConfigException(
"You must specify both an API key and the associated secret. If you are not "
+ "connecting to a Confluent Cloud ksqlDB server or do not plan to use "
+ "ksqlDB's connector management capabilities, then do not specify either"
+ "of the "
+ CONFLUENT_API_KEY_OPTION
+ " or the "
+ CONFLUENT_API_SECRET_OPTION
+ " flags on the command line");
}

if (ccloudApiKey.isEmpty()) {
return Optional.empty();
}

return Optional.of(BasicCredentials.of(ccloudApiKey, ccloudApiSecret));
}

public Optional<String> getExecute() {
if (execute == null || execute.isEmpty()) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
import io.confluent.ksql.links.DocumentationLinks;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.KsqlRestClientException;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.client.exception.KsqlRestClientException;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.ServerInfo;
import io.confluent.ksql.util.ErrorMessageUtil;
import io.confluent.ksql.util.Event;
import java.io.PrintWriter;
Expand All @@ -41,6 +42,9 @@ public final class RemoteServerSpecificCommand implements CliSpecificCommand {
+ "\tChange the current server to <server>" + System.lineSeparator()
+ "\t example: \"server http://my.awesome.server.com:9098;\"";

private static final String CCLOUD_KSQL_SERVICE_ID_PREFIX = "pksqlc-";
private static final String CCLOUD_KSQL_ADDRESS_SUBSTRING = ".cloud";

private final KsqlRestClient restClient;
private final Event resetCliForNewServer;

Expand Down Expand Up @@ -88,7 +92,7 @@ public static void validateClient(
final KsqlRestClient restClient
) {
try {
final RestResponse<?> restResponse = restClient.getServerInfo();
final RestResponse<ServerInfo> restResponse = restClient.getServerInfo();
if (restResponse.isErroneous()) {
final KsqlErrorMessage ksqlError = restResponse.getErrorMessage();
if (Errors.toStatusCode(ksqlError.getErrorCode()) == NOT_ACCEPTABLE.code()) {
Expand All @@ -97,6 +101,8 @@ public static void validateClient(
}
writer.format(
"Couldn't connect to the KSQL server: %s%n%n", ksqlError.getMessage());
} else {
maybeSetIsCCloudServer(restClient, restResponse.getResponse());
}
} catch (final IllegalArgumentException exception) {
writer.println("Server URL must begin with protocol (e.g., http:// or https://)");
Expand Down Expand Up @@ -127,4 +133,21 @@ public static void validateClient(
writer.flush();
}
}

private static void maybeSetIsCCloudServer(
final KsqlRestClient restClient,
final ServerInfo serverInfo) {
final String ksqlServiceId = serverInfo.getKsqlServiceId();
final String ksqlServerAddress = restClient.getServerAddress().toString();
restClient.setIsCCloudServer(isCCloudServer(ksqlServiceId, ksqlServerAddress));
}

private static boolean isCCloudServer(
final String ksqlServiceId,
final String ksqlServerAddress
) {
return ksqlServiceId.startsWith(CCLOUD_KSQL_SERVICE_ID_PREFIX)
&& ksqlServerAddress.contains(CCLOUD_KSQL_ADDRESS_SUBSTRING)
&& ksqlServerAddress.contains(ksqlServiceId);
}
}
10 changes: 5 additions & 5 deletions ksqldb-cli/src/test/java/io/confluent/ksql/KsqlTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void setUp() {
ksql = new Ksql(options, systemProps, clientBuilder, cliBuilder);

when(options.getOutputFormat()).thenReturn(OutputFormat.TABULAR);
when(clientBuilder.build(any(), any(), any(), any())).thenReturn(client);
when(clientBuilder.build(any(), any(), any(), any(), any())).thenReturn(client);
when(cliBuilder.build(any(), any(), any(), any())).thenReturn(cli);
}

Expand Down Expand Up @@ -134,7 +134,7 @@ public void shouldBuildClientWithCorrectServerAddress() {
ksql.run();

// Then:
verify(clientBuilder).build(eq("in a galaxy far far away"), any(), any(), any());
verify(clientBuilder).build(eq("in a galaxy far far away"), any(), any(), any(), any());
}

@Test
Expand All @@ -152,7 +152,7 @@ public void shouldSupportSslConfigInConfigFile() throws Exception {
verify(clientBuilder).build(any(), any(), eq(ImmutableMap.of(
"ssl.truststore.location", "some/path",
"ssl.truststore.password", "letmein"
)), any());
)), any(), any());
}

@Test
Expand All @@ -175,7 +175,7 @@ public void shouldUseSslConfigInSystemConfigInPreferenceToAnyInConfigFile() thro
verify(clientBuilder).build(any(), any(), eq(ImmutableMap.of(
"ssl.truststore.location", "some/path",
"ssl.truststore.password", "letmein"
)), any());
)), any(), any());
}

@Test
Expand All @@ -191,7 +191,7 @@ public void shouldStripSslConfigFromConfigFileWhenMakingLocalProperties() throws
ksql.run();

// Then:
verify(clientBuilder).build(any(), eq(ImmutableMap.of("some.other.setting", "value")), any(), any());
verify(clientBuilder).build(any(), eq(ImmutableMap.of("some.other.setting", "value")), any(), any(), any());
}

private void givenConfigFile(final String content) throws Exception {
Expand Down
Loading

0 comments on commit a33b1e6

Please sign in to comment.