Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding metric-collection for DESCRIBE EXTEND #475

Merged
merged 15 commits into from
Dec 1, 2017
Merged
145 changes: 85 additions & 60 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Scanner;
import java.util.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -277,65 +272,20 @@ private void handleStatements(String line)
String statementText = KsqlEngine.getStatementString(statementContext);
if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext
|| statementContext.statement() instanceof SqlBaseParser.PrintTopicContext) {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
consecutiveStatements = new StringBuilder();
}
if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext) {
handleStreamedQuery(statementText);
} else {
handlePrintedTopic(statementText);
}
consecutiveStatements = printOrDisplayQueryResults(consecutiveStatements, statementContext, statementText);

} else if (statementContext.statement() instanceof SqlBaseParser.ListPropertiesContext) {
listProperties(statementText);

KsqlEntityList ksqlEntityList = restClient.makeKsqlRequest(statementText).getResponse();
PropertiesList propertiesList = (PropertiesList) ksqlEntityList.get(0);
propertiesList.getProperties().putAll(restClient.getLocalProperties());
terminal.printKsqlEntityList(
Arrays.asList(propertiesList)
);
} else if (statementContext.statement() instanceof SqlBaseParser.SetPropertyContext) {
SqlBaseParser.SetPropertyContext setPropertyContext =
(SqlBaseParser.SetPropertyContext) statementContext.statement();
String property = AstBuilder.unquote(setPropertyContext.STRING(0).getText(), "'");
String value = AstBuilder.unquote(setPropertyContext.STRING(1).getText(), "'");
setProperty(property, value);
setProperty(statementContext);

} else if (statementContext.statement() instanceof SqlBaseParser.UnsetPropertyContext) {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
consecutiveStatements = new StringBuilder();
}
SqlBaseParser.UnsetPropertyContext unsetPropertyContext =
(SqlBaseParser.UnsetPropertyContext) statementContext.statement();
String property = AstBuilder.unquote(unsetPropertyContext.STRING().getText(), "'");
unsetProperty(property);
consecutiveStatements = unsetProperty(consecutiveStatements, statementContext);
} else if (statementContext.statement() instanceof SqlBaseParser.RunScriptContext) {
SqlBaseParser.RunScriptContext runScriptContext =
(SqlBaseParser.RunScriptContext) statementContext.statement();
String schemaFilePath = AstBuilder.unquote(runScriptContext.STRING().getText(), "'");
String fileContent;
try {
fileContent = new String(Files.readAllBytes(Paths.get(schemaFilePath)), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new KsqlException(" Could not read statements from file: " + schemaFilePath + ". "
+ "Details: " + e.getMessage(), e);
}
setProperty(DdlConfig.SCHEMA_FILE_CONTENT_PROPERTY, fileContent);
printKsqlResponse(
restClient.makeKsqlRequest(statementText)
);
runScript(statementContext, statementText);
} else if (statementContext.statement() instanceof SqlBaseParser.RegisterTopicContext) {
CliUtils cliUtils = new CliUtils();
Optional<String> avroSchema = cliUtils.getAvroSchemaIfAvroTopic(
(SqlBaseParser.RegisterTopicContext) statementContext.statement());
if (avroSchema.isPresent()) {
setProperty(DdlConfig.AVRO_SCHEMA, avroSchema.get());
}
consecutiveStatements.append(statementText);
registerTopic(consecutiveStatements, statementContext, statementText);
} else {
consecutiveStatements.append(statementText);
}
Expand All @@ -347,13 +297,88 @@ private void handleStatements(String line)
}
}

private void registerTopic(StringBuilder consecutiveStatements, SqlBaseParser.SingleStatementContext statementContext, String statementText) {
CliUtils cliUtils = new CliUtils();
Optional<String> avroSchema = cliUtils.getAvroSchemaIfAvroTopic(
(SqlBaseParser.RegisterTopicContext) statementContext.statement());
if (avroSchema.isPresent()) {
setProperty(DdlConfig.AVRO_SCHEMA, avroSchema.get());
}
consecutiveStatements.append(statementText);
}

private void runScript(SqlBaseParser.SingleStatementContext statementContext, String statementText) throws IOException {
SqlBaseParser.RunScriptContext runScriptContext =
(SqlBaseParser.RunScriptContext) statementContext.statement();
String schemaFilePath = AstBuilder.unquote(runScriptContext.STRING().getText(), "'");
String fileContent;
try {
fileContent = new String(Files.readAllBytes(Paths.get(schemaFilePath)), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new KsqlException(" Could not read statements from file: " + schemaFilePath + ". "
+ "Details: " + e.getMessage(), e);
}
setProperty(DdlConfig.SCHEMA_FILE_CONTENT_PROPERTY, fileContent);
printKsqlResponse(
restClient.makeKsqlRequest(statementText)
);
}

private StringBuilder unsetProperty(StringBuilder consecutiveStatements, SqlBaseParser.SingleStatementContext statementContext) throws IOException {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
consecutiveStatements = new StringBuilder();
}
SqlBaseParser.UnsetPropertyContext unsetPropertyContext =
(SqlBaseParser.UnsetPropertyContext) statementContext.statement();
String property = AstBuilder.unquote(unsetPropertyContext.STRING().getText(), "'");
unsetProperty(property);
return consecutiveStatements;
}

private StringBuilder printOrDisplayQueryResults(StringBuilder consecutiveStatements, SqlBaseParser.SingleStatementContext statementContext, String statementText) throws IOException, InterruptedException, ExecutionException {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
consecutiveStatements = new StringBuilder();
}
if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext) {
handleStreamedQuery(statementText);
} else {
handlePrintedTopic(statementText);
}
return consecutiveStatements;
}

private void setProperty(SqlBaseParser.SingleStatementContext statementContext) {
SqlBaseParser.SetPropertyContext setPropertyContext =
(SqlBaseParser.SetPropertyContext) statementContext.statement();
String property = AstBuilder.unquote(setPropertyContext.STRING(0).getText(), "'");
String value = AstBuilder.unquote(setPropertyContext.STRING(1).getText(), "'");
setProperty(property, value);
}

private void listProperties(String statementText) throws IOException {
KsqlEntityList ksqlEntityList = restClient.makeKsqlRequest(statementText).getResponse();
PropertiesList propertiesList = (PropertiesList) ksqlEntityList.get(0);
propertiesList.getProperties().putAll(restClient.getLocalProperties());
terminal.printKsqlEntityList(
Collections.singletonList(propertiesList)
);
}

private void printKsqlResponse(RestResponse<KsqlEntityList> response) throws IOException {
if (response.isSuccessful()) {
KsqlEntityList ksqlEntities = response.getResponse();
boolean noErrorFromServer = true;
for (KsqlEntity entity : ksqlEntities) {
if (entity instanceof ErrorMessageEntity) {
terminal.printErrorMessage(((ErrorMessageEntity) entity).getErrorMessage());
ErrorMessageEntity errorMsg = (ErrorMessageEntity) entity;
terminal.printErrorMessage(errorMsg.getErrorMessage());
LOGGER.error(errorMsg.getErrorMessage().getMessage());
noErrorFromServer = false;
} else if (entity instanceof CommandStatusEntity &&
(((CommandStatusEntity) entity).getCommandStatus().getStatus() == CommandStatus.Status.ERROR)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,19 +420,27 @@ private void printAsTable(KsqlEntity ksqlEntity) {
)).collect(Collectors.toList());
} else if (ksqlEntity instanceof Queries) {
List<Queries.RunningQuery> runningQueries = ((Queries) ksqlEntity).getQueries();
columnHeaders = Arrays.asList("Query ID", "Kafka Topic", "Query String");
columnHeaders = Arrays.asList("Query ID", "Kafka Topic", "Query String", "Statistics");
rowValues = runningQueries.stream()
.map(runningQuery -> Arrays.asList(
Long.toString(runningQuery.getId()),
runningQuery.getKafkaTopic(),
runningQuery.getQueryString()
runningQuery.getQueryString(),
runningQuery.getStatistics()
)).collect(Collectors.toList());
} else if (ksqlEntity instanceof SourceDescription) {
List<SourceDescription.FieldSchemaInfo> fields = ((SourceDescription) ksqlEntity).getSchema();
SourceDescription ksqlEntity1 = (SourceDescription) ksqlEntity;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we name this variable sourceDescription?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep

List<SourceDescription.FieldSchemaInfo> fields = ksqlEntity1.getSchema();
columnHeaders = Arrays.asList("Field", "Type");
rowValues = fields.stream()
.map(field -> Arrays.asList(field.getName(), field.getType()))
.collect(Collectors.toList());

rowValues.add(Arrays.asList("------","--------"));
rowValues.add(Arrays.asList("",""));
rowValues.add(Arrays.asList("Statistics", ksqlEntity1.getStatistics()));


} else if (ksqlEntity instanceof TopicDescription) {
columnHeaders = new ArrayList<>();
columnHeaders.add("Topic Name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ public void testPrintKSqlEntityList() throws IOException {
properties.put("k3", true);

List<Queries.RunningQuery> queries = new ArrayList<>();
queries.add(new Queries.RunningQuery("select * from t1", "TestTopic", 1));
queries.add(new Queries.RunningQuery("select * from t1", "TestTopic", "stats", 1));

for (int i = 0; i < 5; i++) {
KsqlEntityList entityList = new KsqlEntityList(Arrays.asList(
new CommandStatusEntity("e", "topic/1", "SUCCESS", "Success Message"),
new ErrorMessageEntity("e", new FakeException()),
new PropertiesList("e", properties),
new Queries("e", queries),
new SourceDescription("e", "TestSource", buildTestSchema(i), DataSource.DataSourceType.KTABLE, "key", "2000-01-01"),
new SourceDescription("e", "TestSource", buildTestSchema(i), DataSource.DataSourceType.KTABLE, "key", "2000-01-01", ""),
new TopicDescription("e", "TestTopic", "TestKafkaTopic", "AVRO", "schemaString"),
new StreamsList("e", Arrays.asList(new StreamsList.StreamInfo("TestStream", "TestTopic", "AVRO"))),
new TablesList("e", Arrays.asList(new TablesList.TableInfo("TestTable", "TestTopic", "JSON", false))),
Expand Down
8 changes: 8 additions & 0 deletions ksql-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
<groupId>io.confluent.ksql</groupId>
<artifactId>ksql-common</artifactId>
</dependency>
<dependency>
<groupId>io.confluent.support</groupId>
<artifactId>support-metrics-common</artifactId>
</dependency>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksql-serde</artifactId>
Expand Down Expand Up @@ -121,6 +125,10 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>common-metrics</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Loading