Skip to content

Commit

Permalink
feat: expose query status through EXPLAIN (#3570)
Browse files Browse the repository at this point in the history
* feat(3208): expose query status through EXPLAIN

With this commit the status of any persistent query is exposed as part of the response from `EXPLAIN <some-query-id>;`

For example,

```
ksql>EXPLAIN CSAS_ID_0_1;

ID                   : CSAS_ID_0_1
SQL                  : CREATE STREAM ID_0 WITH (KAFKA_TOPIC='ID_0', PARTITIONS=1, REPLICAS=1) AS SELECT *
FROM ORDER_KSTREAM ORDER_KSTREAM
EMIT CHANGES;
Status               : REBALANCING

 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 ORDERTIME   | BIGINT
 ORDERID     | VARCHAR(STRING)
 ITEMID      | VARCHAR(STRING)
 ORDERUNITS  | DOUBLE
 TIMESTAMP   | VARCHAR(STRING)
 PRICEARRAY  | ARRAY<DOUBLE>
 KEYVALUEMAP | MAP<STRING, DOUBLE>
-----------------------------------------
...
```

Note the new `Status               : REBALANCING` part.

* chore: fix tests
  • Loading branch information
big-andy-coates authored Oct 15, 2019
1 parent c0bfa41 commit 8ef82eb
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,9 @@ private void printQueryDescription(final QueryDescription query) {
if (query.getStatementText().length() > 0) {
writer().println(String.format("%-20s : %s", "SQL", query.getStatementText()));
}
if (query.getState().isPresent()) {
writer().println(String.format("%-20s : %s", "Status", query.getState().get()));
}
writer().println();
printSchema(query.getFields(), "");
printQuerySources(query);
Expand Down
60 changes: 44 additions & 16 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -120,6 +121,8 @@ public class CliTest {
private static final String SERVER_OVERRIDE = "SERVER";
private static final String SESSION_OVERRIDE = "SESSION";

private static final Pattern QUERY_ID_PATTERN = Pattern.compile("with query ID: (\\S+)");

private static final Pattern WRITE_QUERIES = Pattern
.compile(".*The following queries write into this source: \\[(.+)].*", Pattern.DOTALL);

Expand Down Expand Up @@ -779,7 +782,7 @@ public void shouldListFunctions() {
}

@Test
public void shouldDescribeScalarFunction() throws Exception {
public void shouldDescribeScalarFunction() {
final String expectedOutput =
"Name : TIMESTAMPTOSTRING\n"
+ "Author : Confluent\n"
Expand Down Expand Up @@ -815,7 +818,7 @@ public void shouldDescribeScalarFunction() throws Exception {
}

@Test
public void shouldDescribeOverloadedScalarFunction() throws Exception {
public void shouldDescribeOverloadedScalarFunction() {
// Given:
localCli.handleLine("describe function substring;");

Expand Down Expand Up @@ -847,7 +850,7 @@ public void shouldDescribeOverloadedScalarFunction() throws Exception {
}

@Test
public void shouldDescribeAggregateFunction() throws Exception {
public void shouldDescribeAggregateFunction() {
final String expectedSummary =
"Name : TOPK\n" +
"Author : confluent\n" +
Expand All @@ -868,27 +871,42 @@ public void shouldDescribeAggregateFunction() throws Exception {
}

@Test
public void shouldPrintErrorIfCantFindFunction() throws Exception {
public void shouldExplainQueryId() {
// Given:
localCli.handleLine("CREATE STREAM " + streamName + " "
+ "AS SELECT * FROM " + orderDataProvider.kstreamName() + ";");

final String queryId = extractQueryId(terminal.getOutputString());

final String explain = "EXPLAIN " + queryId + ";";

// When:
localCli.handleLine(explain);

// Then:
assertThat(terminal.getOutputString(), containsString(queryId));
assertThat(terminal.getOutputString(), containsString("Status"));
assertThat(terminal.getOutputString(),
either(containsString(": REBALANCING"))
.or(containsString("RUNNING")));

dropStream(streamName);
}

@Test
public void shouldPrintErrorIfCantFindFunction() {
localCli.handleLine("describe function foobar;");

assertThat(terminal.getOutputString(),
containsString("Can't find any functions with the name 'foobar'"));
}

@Test
public void shouldHandleSetPropertyAsPartOfMultiStatementLine() throws Exception {
// Given:
final String csas =
"CREATE STREAM " + streamName + " "
+ "AS SELECT * FROM " + orderDataProvider.kstreamName() + ";";

public void shouldHandleSetPropertyAsPartOfMultiStatementLine() {
// When:
localCli
.handleLine("set 'auto.offset.reset'='earliest'; " + csas);
localCli.handleLine("set 'auto.offset.reset'='earliest';");

// Then:
dropStream(streamName);

assertThat(terminal.getOutputString(),
containsString("Successfully changed local property 'auto.offset.reset' to 'earliest'"));
}
Expand Down Expand Up @@ -1082,7 +1100,9 @@ private static CommandStatusEntity stubCommandStatusEntityWithSeqNum(final long
}

private void givenCommandSequenceNumber(
final KsqlRestClient mockRestClient, final long seqNum) throws Exception {
final KsqlRestClient mockRestClient,
final long seqNum
) {
final CommandStatusEntity stubEntity = stubCommandStatusEntityWithSeqNum(seqNum);
when(mockRestClient.makeKsqlRequest(anyString(), anyLong())).thenReturn(
RestResponse.successful(
Expand All @@ -1093,7 +1113,9 @@ private void givenCommandSequenceNumber(
}

private void assertLastCommandSequenceNumber(
final KsqlRestClient mockRestClient, final long seqNum) throws Exception {
final KsqlRestClient mockRestClient,
final long seqNum
) {
// Given:
reset(mockRestClient);
final String statementText = "list streams;";
Expand Down Expand Up @@ -1169,6 +1191,12 @@ private static Matcher<Iterable<? extends Iterable<? extends String>>> containsR
return Matchers.contains(rows);
}

private static String extractQueryId(final String outputString) {
final java.util.regex.Matcher matcher = QUERY_ID_PATTERN.matcher(outputString);
assertThat("Could not find query id in: " + outputString, matcher.find());
return matcher.group(1);
}

private static class TestRowCaptor implements RowCaptor {

private ImmutableList.Builder<List<String>> rows = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.serde.Format;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -123,10 +122,6 @@ public SourceName getSinkName() {
return sinkName;
}

public Format getResultTopicFormat() {
return resultTopic.getValueFormat().getFormat();
}

public String getSchemasDescription() {
return schemas.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand All @@ -37,17 +38,26 @@ public static QueryDescription forQueryMetadata(final QueryMetadata queryMetadat
persistentQuery.getQueryId(),
persistentQuery,
ImmutableSet.of(persistentQuery.getSinkName()),
false
false,
Optional.of(persistentQuery.getState())
);
}
return create(new QueryId(""), queryMetadata, Collections.emptySet(), true);

return create(
new QueryId(""),
queryMetadata,
Collections.emptySet(),
true,
Optional.empty()
);
}

private static QueryDescription create(
final QueryId id,
final QueryMetadata queryMetadata,
final Set<SourceName> sinks,
final boolean valueSchemaOnly
final boolean valueSchemaOnly,
final Optional<String> state
) {
return new QueryDescription(
id,
Expand All @@ -57,8 +67,8 @@ private static QueryDescription create(
sinks.stream().map(SourceName::name).collect(Collectors.toSet()),
queryMetadata.getTopologyDescription(),
queryMetadata.getExecutionPlan(),
queryMetadata.getOverriddenProperties()
queryMetadata.getOverriddenProperties(),
state
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.rest.entity;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isEmptyString;
Expand Down Expand Up @@ -44,9 +45,9 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -87,6 +88,7 @@ public class QueryDescriptionFactoryTest {
@Before
public void setUp() {
when(topology.describe()).thenReturn(topologyDescription);
when(queryStreams.state()).thenReturn(State.RUNNING);

transientQuery = new TransientQueryMetadata(
SQL_TEXT,
Expand Down Expand Up @@ -127,60 +129,70 @@ public void setUp() {

@Test
public void shouldHaveEmptyQueryIdFromTransientQuery() {
Assert.assertThat(transientQueryDescription.getId().getId(), is(isEmptyString()));
assertThat(transientQueryDescription.getId().getId(), is(isEmptyString()));
}

@Test
public void shouldHaveQueryIdForPersistentQuery() {
Assert.assertThat(persistentQueryDescription.getId().getId(), is(QUERY_ID.getId()));
assertThat(persistentQueryDescription.getId().getId(), is(QUERY_ID.getId()));
}

@Test
public void shouldExposeExecutionPlan() {
Assert.assertThat(transientQueryDescription.getExecutionPlan(), is("execution plan"));
Assert.assertThat(persistentQueryDescription.getExecutionPlan(), is("execution plan"));
assertThat(transientQueryDescription.getExecutionPlan(), is("execution plan"));
assertThat(persistentQueryDescription.getExecutionPlan(), is("execution plan"));
}

@Test
public void shouldExposeSources() {
Assert.assertThat(transientQueryDescription.getSources(), is(SOURCE_NAMES.stream().map(SourceName::name).collect(Collectors.toSet())));
Assert.assertThat(persistentQueryDescription.getSources(), is(SOURCE_NAMES.stream().map(SourceName::name).collect( Collectors.toSet())));
assertThat(transientQueryDescription.getSources(), is(SOURCE_NAMES.stream().map(SourceName::name).collect(Collectors.toSet())));
assertThat(persistentQueryDescription.getSources(), is(SOURCE_NAMES.stream().map(SourceName::name).collect( Collectors.toSet())));
}

@Test
public void shouldExposeStatementText() {
Assert.assertThat(transientQueryDescription.getStatementText(), is(SQL_TEXT));
Assert.assertThat(persistentQueryDescription.getStatementText(), is(SQL_TEXT));
assertThat(transientQueryDescription.getStatementText(), is(SQL_TEXT));
assertThat(persistentQueryDescription.getStatementText(), is(SQL_TEXT));
}

@Test
public void shouldExposeTopology() {
Assert.assertThat(transientQueryDescription.getTopology(), is(TOPOLOGY_TEXT));
Assert.assertThat(persistentQueryDescription.getTopology(), is(TOPOLOGY_TEXT));
assertThat(transientQueryDescription.getTopology(), is(TOPOLOGY_TEXT));
assertThat(persistentQueryDescription.getTopology(), is(TOPOLOGY_TEXT));
}

@Test
public void shouldExposeOverridenProperties() {
Assert.assertThat(transientQueryDescription.getOverriddenProperties(), is(PROP_OVERRIDES));
Assert.assertThat(persistentQueryDescription.getOverriddenProperties(), is(PROP_OVERRIDES));
assertThat(transientQueryDescription.getOverriddenProperties(), is(PROP_OVERRIDES));
assertThat(persistentQueryDescription.getOverriddenProperties(), is(PROP_OVERRIDES));
}

@Test
public void shouldExposeValueFieldsForTransientQueries() {
Assert.assertThat(transientQueryDescription.getFields(), contains(
assertThat(transientQueryDescription.getFields(), contains(
new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)),
new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null))));
}

@Test
public void shouldExposeAllFieldsForPersistentQueries() {
Assert.assertThat(persistentQueryDescription.getFields(), contains(
assertThat(persistentQueryDescription.getFields(), contains(
new FieldInfo("ROWTIME", new SchemaInfo(SqlBaseType.BIGINT, null, null)),
new FieldInfo("ROWKEY", new SchemaInfo(SqlBaseType.STRING, null, null)),
new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)),
new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null))));
}

@Test
public void shouldReportPersistentQueriesStatus() {
assertThat(persistentQueryDescription.getState(), is(Optional.of("RUNNING")));
}

@Test
public void shouldNotReportTransientQueriesStatus() {
assertThat(transientQueryDescription.getState(), is(Optional.empty()));
}

@Test
public void shouldHandleRowTimeInValueSchemaForTransientQuery() {
// Given:
Expand Down Expand Up @@ -208,7 +220,7 @@ public void shouldHandleRowTimeInValueSchemaForTransientQuery() {
transientQueryDescription = QueryDescriptionFactory.forQueryMetadata(transientQuery);

// Then:
Assert.assertThat(transientQueryDescription.getFields(), contains(
assertThat(transientQueryDescription.getFields(), contains(
new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)),
new FieldInfo("ROWTIME", new SchemaInfo(SqlBaseType.BIGINT, null, null)),
new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null))));
Expand Down Expand Up @@ -241,7 +253,7 @@ public void shouldHandleRowKeyInValueSchemaForTransientQuery() {
transientQueryDescription = QueryDescriptionFactory.forQueryMetadata(transientQuery);

// Then:
Assert.assertThat(transientQueryDescription.getFields(), contains(
assertThat(transientQueryDescription.getFields(), contains(
new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)),
new FieldInfo("ROWKEY", new SchemaInfo(SqlBaseType.STRING, null, null)),
new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,17 @@
@RunWith(MockitoJUnitRunner.class)
public class ExplainExecutorTest {

@Rule public final TemporaryEngine engine = new TemporaryEngine();
@Rule public ExpectedException expectedException = ExpectedException.none();
@Rule
public final TemporaryEngine engine = new TemporaryEngine();
@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void shouldExplainQueryId() {
// Given:
final ConfiguredStatement<?> explain = engine.configure("EXPLAIN id;");
final PersistentQueryMetadata metadata = givenPersistentQuery("id");
when(metadata.getState()).thenReturn("Running");

KsqlEngine engine = mock(KsqlEngine.class);
when(engine.getPersistentQuery(metadata.getQueryId())).thenReturn(Optional.of(metadata));
Expand All @@ -68,7 +71,6 @@ public void shouldExplainQueryId() {
assertThat(query.getQueryDescription(), equalTo(QueryDescriptionFactory.forQueryMetadata(metadata)));
}


@Test
public void shouldExplainPersistentStatement() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void shouldListQueriesExtended() {
// Given
final ConfiguredStatement<?> showQueries = engine.configure("SHOW QUERIES EXTENDED;");
final PersistentQueryMetadata metadata = givenPersistentQuery("id");
when(metadata.getState()).thenReturn("Running");

final KsqlEngine engine = mock(KsqlEngine.class);
when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(metadata));
Expand Down
Loading

0 comments on commit 8ef82eb

Please sign in to comment.