Skip to content

Commit

Permalink
refactor: clean up statement handling loop in CLI (MINOR) (#3352)
Browse files Browse the repository at this point in the history
  • Loading branch information
big-andy-coates authored Sep 17, 2019
1 parent 6be6d37 commit 62f165b
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 103 deletions.
168 changes: 76 additions & 92 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import io.confluent.ksql.parser.DefaultKsqlParser;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.SqlBaseParser;
import io.confluent.ksql.parser.SqlBaseParser.PrintTopicContext;
import io.confluent.ksql.parser.SqlBaseParser.QueryStatementContext;
import io.confluent.ksql.parser.SqlBaseParser.SingleStatementContext;
import io.confluent.ksql.parser.SqlBaseParser.SetPropertyContext;
import io.confluent.ksql.parser.SqlBaseParser.StatementContext;
import io.confluent.ksql.parser.SqlBaseParser.UnsetPropertyContext;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.KsqlRestClient.QueryStream;
Expand All @@ -40,12 +43,13 @@
import io.confluent.ksql.rest.entity.QueryDescriptionEntity;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.ErrorMessageUtil;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.HandlerMaps;
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMap2;
import io.confluent.ksql.util.HandlerMaps.Handler2;
import io.confluent.ksql.util.ParserUtil;
import io.confluent.ksql.util.Version;
import io.confluent.ksql.util.WelcomeMsgUtils;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.List;
Expand All @@ -61,6 +65,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.commons.compress.utils.IOUtils;
import org.jline.reader.EndOfFileException;
import org.jline.reader.UserInterruptException;
import org.jline.terminal.Terminal;
Expand All @@ -71,6 +76,16 @@ public class Cli implements KsqlRequestExecutor, Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(Cli.class);

private static final ClassHandlerMap2<StatementContext, Cli, String> STATEMENT_HANDLERS =
HandlerMaps
.forClass(StatementContext.class)
.withArgTypes(Cli.class, String.class)
.put(QueryStatementContext.class, Cli::handleQuery)
.put(PrintTopicContext.class, Cli::handlePrintedTopic)
.put(SetPropertyContext.class, Cli::setPropertyFromCtxt)
.put(UnsetPropertyContext.class, Cli::unsetPropertyFromCtxt)
.build();

private final ExecutorService queryStreamExecutorService;

private final Long streamedQueryRowLimit;
Expand Down Expand Up @@ -121,11 +136,11 @@ public static Cli build(

@Override
public void makeKsqlRequest(final String statements) {
try {
printKsqlResponse(makeKsqlRequest(statements, restClient::makeKsqlRequest));
} catch (IOException e) {
throw new KsqlException(e);
if (statements.isEmpty()) {
return;
}

printKsqlResponse(makeKsqlRequest(statements, restClient::makeKsqlRequest));
}

private <R> RestResponse<R> makeKsqlRequest(
Expand Down Expand Up @@ -210,7 +225,7 @@ public void close() {
terminal.close();
}

void handleLine(final String line) throws Exception {
void handleLine(final String line) {
final String trimmedLine = Optional.ofNullable(line).orElse("").trim();
if (trimmedLine.isEmpty()) {
return;
Expand Down Expand Up @@ -247,74 +262,33 @@ private String readLine() {
}
}

private void handleStatements(final String line)
throws InterruptedException, IOException, ExecutionException {

private void handleStatements(final String line) {
final List<ParsedStatement> statements =
new DefaultKsqlParser().parse(line);

StringBuilder consecutiveStatements = new StringBuilder();
for (final ParsedStatement statement : statements) {
final SingleStatementContext statementContext = statement.getStatement();
final String statementText = statement.getStatementText();

if (statementContext.statement() instanceof SqlBaseParser.QueryStatementContext) {

final QueryStatementContext queryContext =
(QueryStatementContext) statementContext.statement();

if (queryContext.query().EMIT() != null) {
consecutiveStatements = printOrDisplayQueryResults(
consecutiveStatements,
statementContext,
statementText
);
} else {
makeKsqlRequest(statementText);
}
} else if (statementContext.statement() instanceof SqlBaseParser.PrintTopicContext) {
consecutiveStatements = printOrDisplayQueryResults(
consecutiveStatements,
statementContext,
statementText
);

} else if (statementContext.statement() instanceof SqlBaseParser.ListPropertiesContext) {
makeKsqlRequest(statementText);

} else if (statementContext.statement() instanceof SqlBaseParser.SetPropertyContext) {
setProperty(statementContext);
final StringBuilder consecutiveStatements = new StringBuilder();
for (final ParsedStatement parsed : statements) {
final StatementContext statementContext = parsed.getStatement().statement();
final String statementText = parsed.getStatementText();

} else if (statementContext.statement() instanceof SqlBaseParser.UnsetPropertyContext) {
consecutiveStatements = unsetProperty(consecutiveStatements, statementContext);
final Handler2<StatementContext, Cli, String> handler = STATEMENT_HANDLERS
.get(statementContext.getClass());

} else {
if (handler == null) {
consecutiveStatements.append(statementText);
} else {
makeKsqlRequest(consecutiveStatements.toString());
consecutiveStatements.setLength(0);

handler.handle(this, statementText, statementContext);
}
}
if (consecutiveStatements.length() != 0) {
makeKsqlRequest(consecutiveStatements.toString());
}
}

private StringBuilder printOrDisplayQueryResults(
final StringBuilder consecutiveStatements,
final SqlBaseParser.SingleStatementContext statementContext,
final String statementText
) throws InterruptedException, IOException, ExecutionException {
if (consecutiveStatements.length() != 0) {
makeKsqlRequest(consecutiveStatements.toString());
consecutiveStatements.setLength(0);
}
if (statementContext.statement() instanceof SqlBaseParser.QueryStatementContext) {
handleStreamedQuery(statementText);
} else {
handlePrintedTopic(statementText);
}
return consecutiveStatements;
}

private void printKsqlResponse(final RestResponse<KsqlEntityList> response) throws IOException {
private void printKsqlResponse(final RestResponse<KsqlEntityList> response) {
if (response.isSuccessful()) {
final KsqlEntityList ksqlEntities = response.getResponse();
boolean noErrorFromServer = true;
Expand All @@ -337,8 +311,11 @@ private void printKsqlResponse(final RestResponse<KsqlEntityList> response) thro
}
}

@SuppressWarnings("try")
private void handleStreamedQuery(final String query) throws IOException {
@SuppressWarnings({"try", "unused"})
private void handleStreamedQuery(
final String query,
final SqlBaseParser.QueryStatementContext ignored
) {
final RestResponse<KsqlEntityList> explainResponse = restClient
.makeKsqlRequest("EXPLAIN " + query);
if (!explainResponse.isSuccessful()) {
Expand All @@ -360,7 +337,7 @@ private void handleStreamedQuery(final String query) throws IOException {
terminal.printErrorMessage(queryResponse.getErrorMessage());
} else {
try (KsqlRestClient.QueryStream queryStream = queryResponse.getResponse();
StatusClosable ignored = terminal.setStatusMessage("Press CTRL-C to interrupt")) {
StatusClosable toClose = terminal.setStatusMessage("Press CTRL-C to interrupt")) {
streamResults(queryStream, fields);
}
}
Expand All @@ -372,14 +349,10 @@ private void streamResults(
) {
final Future<?> queryStreamFuture = queryStreamExecutorService.submit(() -> {
for (long rowsRead = 0; limitNotReached(rowsRead) && queryStream.hasNext(); rowsRead++) {
try {
final StreamedRow row = queryStream.next();
terminal.printStreamedRow(row, fields);
if (row.isTerminal()) {
break;
}
} catch (final IOException exception) {
throw new RuntimeException(exception);
final StreamedRow row = queryStream.next();
terminal.printStreamedRow(row, fields);
if (row.isTerminal()) {
break;
}
}
});
Expand Down Expand Up @@ -416,15 +389,28 @@ private boolean limitNotReached(final long rowsRead) {
return streamedQueryRowLimit == null || rowsRead < streamedQueryRowLimit;
}

@SuppressWarnings("try")
private void handlePrintedTopic(final String printTopic)
throws InterruptedException, ExecutionException, IOException {
private void handleQuery(
final String statement,
final SqlBaseParser.QueryStatementContext query
) {
if (query.query().EMIT() == null) {
makeKsqlRequest(statement);
} else {
handleStreamedQuery(statement, query);
}
}

@SuppressWarnings({"try", "unused"})
private void handlePrintedTopic(
final String printTopic,
final SqlBaseParser.PrintTopicContext ignored
) {
final RestResponse<InputStream> topicResponse =
makeKsqlRequest(printTopic, restClient::makePrintTopicRequest);

if (topicResponse.isSuccessful()) {
try (Scanner topicStreamScanner = new Scanner(topicResponse.getResponse(), UTF_8.name());
StatusClosable ignored = terminal.setStatusMessage("Press CTRL-C to interrupt")
StatusClosable toClose = terminal.setStatusMessage("Press CTRL-C to interrupt")
) {
final Future<?> topicPrintFuture = queryStreamExecutorService.submit(() -> {
while (!Thread.currentThread().isInterrupted() && topicStreamScanner.hasNextLine()) {
Expand All @@ -444,9 +430,11 @@ private void handlePrintedTopic(final String printTopic)
try {
topicPrintFuture.get();
} catch (final CancellationException exception) {
topicResponse.getResponse().close();
IOUtils.closeQuietly(topicResponse.getResponse());
terminal.writer().println("Topic printing ceased");
terminal.flush();
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
} else {
Expand All @@ -455,9 +443,11 @@ private void handlePrintedTopic(final String printTopic)
}
}

private void setProperty(final SqlBaseParser.SingleStatementContext statementContext) {
final SqlBaseParser.SetPropertyContext setPropertyContext =
(SqlBaseParser.SetPropertyContext) statementContext.statement();
@SuppressWarnings("unused")
private void setPropertyFromCtxt(
final String ignored,
final SqlBaseParser.SetPropertyContext setPropertyContext
) {
final String property = ParserUtil.unquote(setPropertyContext.STRING(0).getText(), "'");
final String value = ParserUtil.unquote(setPropertyContext.STRING(1).getText(), "'");
setProperty(property, value);
Expand All @@ -476,19 +466,13 @@ private void setProperty(final String property, final String value) {
terminal.flush();
}

private StringBuilder unsetProperty(
final StringBuilder consecutiveStatements,
final SqlBaseParser.SingleStatementContext statementContext
@SuppressWarnings("unused")
private void unsetPropertyFromCtxt(
final String ignored,
final SqlBaseParser.UnsetPropertyContext unsetPropertyContext
) {
if (consecutiveStatements.length() != 0) {
makeKsqlRequest(consecutiveStatements.toString());
consecutiveStatements.setLength(0);
}
final SqlBaseParser.UnsetPropertyContext unsetPropertyContext =
(SqlBaseParser.UnsetPropertyContext) statementContext.statement();
final String property = ParserUtil.unquote(unsetPropertyContext.STRING().getText(), "'");
unsetProperty(property);
return consecutiveStatements;
}

private void unsetProperty(final String property) {
Expand Down
22 changes: 14 additions & 8 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.eclipse.jetty.io.RuntimeIOException;
import org.jline.terminal.Terminal.Signal;
import org.jline.terminal.Terminal.SignalHandler;
import org.slf4j.Logger;
Expand Down Expand Up @@ -300,7 +301,7 @@ public List<HistoryEntry> getHistory() {
return Collections.unmodifiableList(terminal.getHistory());
}

public void printErrorMessage(final KsqlErrorMessage errorMessage) throws IOException {
public void printErrorMessage(final KsqlErrorMessage errorMessage) {
if (errorMessage instanceof KsqlStatementErrorMessage) {
printKsqlEntityList(((KsqlStatementErrorMessage)errorMessage).getEntities());
}
Expand All @@ -315,7 +316,7 @@ public void printError(final String shortMsg, final String fullMsg) {
public void printStreamedRow(
final StreamedRow row,
final List<FieldInfo> fields
) throws IOException {
) {
if (row.getErrorMessage() != null) {
printErrorMessage(row.getErrorMessage());
return;
Expand All @@ -341,7 +342,7 @@ public void printStreamedRow(
}
}

public void printKsqlEntityList(final List<KsqlEntity> entityList) throws IOException {
public void printKsqlEntityList(final List<KsqlEntity> entityList) {
switch (outputFormat) {
case JSON:
printAsJson(entityList);
Expand All @@ -364,7 +365,7 @@ public void printKsqlEntityList(final List<KsqlEntity> entityList) throws IOExce
}
}

public void printRowHeader(final List<FieldInfo> fields) throws IOException {
public void printRowHeader(final List<FieldInfo> fields) {
switch (outputFormat) {
case JSON:
break;
Expand Down Expand Up @@ -769,16 +770,21 @@ private static String splitLongLine(final String input, final int maxLineLength)
return output.toString();
}

private void printAsJson(final Object o) throws IOException {
private void printAsJson(final Object o) {
if (!((o instanceof PropertiesList || (o instanceof KsqlEntityList)))) {
log.warn(
"Unexpected result class: '{}' found in printAsJson",
o.getClass().getCanonicalName()
);
}
objectMapper.writerWithDefaultPrettyPrinter().writeValue(writer(), o);
writer().println();
flush();

try {
objectMapper.writerWithDefaultPrettyPrinter().writeValue(writer(), o);
writer().println();
flush();
} catch (final IOException e) {
throw new RuntimeIOException("Failed to write to console", e);
}
}

static class NoOpRowCaptor implements RowCaptor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.streams.MaterializedFactory;
import io.confluent.ksql.execution.streams.StreamsUtil;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.KsqlAggregateFunction;
import io.confluent.ksql.metastore.model.KeyField;
Expand All @@ -54,8 +56,6 @@
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.execution.streams.MaterializedFactory;
import io.confluent.ksql.execution.streams.StreamsUtil;
import io.confluent.ksql.util.KsqlConfig;
import java.time.Duration;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.MaterializedFactory;
import io.confluent.ksql.util.KsqlConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsConfig;
Expand Down

0 comments on commit 62f165b

Please sign in to comment.