Skip to content

Commit

Permalink
feat: add SHOW CONNECTORS functionality (#3210)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Aug 14, 2019
1 parent e1865a4 commit 0bf31eb
Show file tree
Hide file tree
Showing 15 changed files with 640 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.ksql.cli.console.table.Table.Builder;
import io.confluent.ksql.cli.console.table.builder.CommandStatusTableBuilder;
import io.confluent.ksql.cli.console.table.builder.ConnectorInfoTableBuilder;
import io.confluent.ksql.cli.console.table.builder.ConnectorListTableBuilder;
import io.confluent.ksql.cli.console.table.builder.ErrorEntityTableBuilder;
import io.confluent.ksql.cli.console.table.builder.ExecutionPlanTableBuilder;
import io.confluent.ksql.cli.console.table.builder.FunctionNameListTableBuilder;
Expand All @@ -41,6 +42,7 @@
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.rest.entity.ArgumentInfo;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.CreateConnectorEntity;
import io.confluent.ksql.rest.entity.ErrorEntity;
import io.confluent.ksql.rest.entity.ExecutionPlan;
Expand Down Expand Up @@ -144,6 +146,8 @@ public class Console implements Closeable {
Console::printFunctionDescription)
.put(CreateConnectorEntity.class,
tablePrinter(CreateConnectorEntity.class, ConnectorInfoTableBuilder::new))
.put(ConnectorList.class,
tablePrinter(ConnectorList.class, ConnectorListTableBuilder::new))
.put(ErrorEntity.class,
tablePrinter(ErrorEntity.class, ErrorEntityTableBuilder::new))
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.cli.console.table.builder;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.rest.entity.ConnectorList;
import java.util.List;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;

public class ConnectorListTableBuilder implements TableBuilder<ConnectorList> {

private static final List<String> HEADERS = ImmutableList.of(
"Connector Name", "Type", "Class"
);


@Override
public Table buildTable(final ConnectorList entity) {
return new Table.Builder()
.withColumnHeaders(HEADERS)
.withRows(entity.getConnectors()
.stream()
.map(info -> ImmutableList.of(
info.getName(),
ObjectUtils.defaultIfNull(info.getType(), ConnectorType.UNKNOWN).name(),
ObjectUtils.defaultIfNull(info.getClassName(), ""))))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.confluent.ksql.rest.entity.ArgumentInfo;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.EntityQueryId;
import io.confluent.ksql.rest.entity.ExecutionPlan;
import io.confluent.ksql.rest.entity.FieldInfo;
Expand All @@ -49,6 +50,7 @@
import io.confluent.ksql.rest.entity.Queries;
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.entity.SchemaInfo;
import io.confluent.ksql.rest.entity.SimpleConnectorInfo;
import io.confluent.ksql.rest.entity.SourceDescription;
import io.confluent.ksql.rest.entity.SourceDescriptionEntity;
import io.confluent.ksql.rest.entity.SourceInfo;
Expand All @@ -70,6 +72,7 @@
import java.util.function.Supplier;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -698,6 +701,48 @@ public void testSortedPrintTablesList() throws IOException {
}
}

@Test
public void shouldPrintConnectorsList() throws IOException {
// Given:
final KsqlEntityList entities = new KsqlEntityList(ImmutableList.of(
new ConnectorList(
"statement",
ImmutableList.of(),
ImmutableList.of(
new SimpleConnectorInfo("foo", ConnectorType.SOURCE, "clazz"),
new SimpleConnectorInfo("bar", null, null)
))
));

// When:
console.printKsqlEntityList(entities);

// Then:
final String output = terminal.getOutputString();
if (console.getOutputFormat() == OutputFormat.JSON) {
assertThat(output, is(""
+ "[ {\n"
+ " \"@type\" : \"connector_list\",\n"
+ " \"statementText\" : \"statement\",\n"
+ " \"warnings\" : [ ],\n"
+ " \"connectors\" : [ {\n"
+ " \"name\" : \"foo\",\n"
+ " \"type\" : \"source\",\n"
+ " \"className\" : \"clazz\"\n"
+ " }, {\n"
+ " \"name\" : \"bar\"\n"
+ " } ]\n"
+ "} ]\n"));
} else {
assertThat(output, is("\n"
+ " Connector Name | Type | Class \n"
+ "----------------------------------\n"
+ " foo | SOURCE | clazz \n"
+ " bar | UNKNOWN | \n"
+ "----------------------------------\n"));
}
}

@Test
public void testPrintExecuptionPlan() throws IOException {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ statement
| (LIST | SHOW) STREAMS EXTENDED? #listStreams
| (LIST | SHOW) TABLES EXTENDED? #listTables
| (LIST | SHOW) FUNCTIONS #listFunctions
| (LIST | SHOW) (SOURCE | SINK)? CONNECTORS #listConnectors
| DESCRIBE EXTENDED? qualifiedName #showColumns
| DESCRIBE FUNCTION qualifiedName #describeFunction
| PRINT (qualifiedName | STRING) printClause #printTopic
Expand Down Expand Up @@ -431,6 +432,7 @@ SCRIPT: 'SCRIPT';
DECIMAL: 'DECIMAL';
KEY: 'KEY';
CONNECTOR: 'CONNECTOR';
CONNECTORS: 'CONNECTORS';
SINK: 'SINK';
SOURCE: 'SOURCE';

Expand Down
17 changes: 17 additions & 0 deletions ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.confluent.ksql.parser.SqlBaseParser.InsertValuesContext;
import io.confluent.ksql.parser.SqlBaseParser.IntervalClauseContext;
import io.confluent.ksql.parser.SqlBaseParser.LimitClauseContext;
import io.confluent.ksql.parser.SqlBaseParser.ListConnectorsContext;
import io.confluent.ksql.parser.SqlBaseParser.SingleStatementContext;
import io.confluent.ksql.parser.SqlBaseParser.TablePropertiesContext;
import io.confluent.ksql.parser.SqlBaseParser.TablePropertyContext;
Expand All @@ -83,6 +84,8 @@
import io.confluent.ksql.parser.tree.Join;
import io.confluent.ksql.parser.tree.JoinCriteria;
import io.confluent.ksql.parser.tree.JoinOn;
import io.confluent.ksql.parser.tree.ListConnectors;
import io.confluent.ksql.parser.tree.ListConnectors.Scope;
import io.confluent.ksql.parser.tree.ListFunctions;
import io.confluent.ksql.parser.tree.ListProperties;
import io.confluent.ksql.parser.tree.ListQueries;
Expand Down Expand Up @@ -585,6 +588,20 @@ public Node visitListFunctions(final SqlBaseParser.ListFunctionsContext ctx) {
return new ListFunctions(getLocation(ctx));
}

@Override
public Node visitListConnectors(final ListConnectorsContext ctx) {
final ListConnectors.Scope scope;
if (ctx.SOURCE() != null) {
scope = Scope.SOURCE;
} else if (ctx.SINK() != null) {
scope = Scope.SINK;
} else {
scope = Scope.ALL;
}

return new ListConnectors(getLocation(ctx), scope);
}

@Override
public Node visitTerminateQuery(final SqlBaseParser.TerminateQueryContext context) {
return new TerminateQuery(getLocation(context), context.qualifiedName().getText());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,5 @@ protected R visitGroupingElement(final GroupingElement node, final C context) {
protected R visitSimpleGroupBy(final SimpleGroupBy node, final C context) {
return visitGroupingElement(node, context);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.tree;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.parser.NodeLocation;
import java.util.Objects;
import java.util.Optional;

@Immutable
public class ListConnectors extends Statement {

private final Scope scope;

public enum Scope {
ALL,
SOURCE,
SINK
}

public ListConnectors(final Optional<NodeLocation> location, final Scope scope) {
super(location);
this.scope = Objects.requireNonNull(scope, "scope");
}

public Scope getScope() {
return scope;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ListConnectors that = (ListConnectors) o;
return scope == that.scope;
}

@Override
public int hashCode() {
return Objects.hash(scope);
}

@Override
public String toString() {
return "ListConnectors{"
+ "scope=" + scope
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.tree;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

import com.google.common.testing.EqualsTester;
import io.confluent.ksql.parser.NodeLocation;
import io.confluent.ksql.parser.tree.ListConnectors.Scope;
import java.util.Optional;
import org.junit.Test;

public class ListConnectorsTest {

@Test
public void shouldImplementHashCodeAndEquals() {
new EqualsTester()
.addEqualityGroup(
new ListConnectors(Optional.empty(), Scope.ALL),
new ListConnectors(Optional.of(new NodeLocation(1, 1)), Scope.ALL)
)
.addEqualityGroup(
new ListConnectors(Optional.empty(), Scope.SOURCE)
)
.testEquals();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.entity;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.Immutable;
import java.util.List;
import java.util.Objects;

@JsonIgnoreProperties(ignoreUnknown = true)
@Immutable
public class ConnectorList extends KsqlEntity {

private final ImmutableList<SimpleConnectorInfo> connectors;

@JsonCreator
public ConnectorList(
@JsonProperty("statementText") final String statementText,
@JsonProperty("warnings") final List<KsqlWarning> warnings,
@JsonProperty("connectors") final List<SimpleConnectorInfo> connectors
) {
super(statementText, warnings);
this.connectors = ImmutableList.copyOf(Objects.requireNonNull(connectors, "connectors"));
}

public ImmutableList<SimpleConnectorInfo> getConnectors() {
return connectors;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ConnectorList that = (ConnectorList) o;
return Objects.equals(connectors, that.connectors);
}

@Override
public int hashCode() {
return Objects.hash(connectors);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
@JsonSubTypes.Type(value = FunctionDescriptionList.class, name = "describe_function"),
@JsonSubTypes.Type(value = FunctionNameList.class, name = "function_names"),
@JsonSubTypes.Type(value = CreateConnectorEntity.class, name = "connector_info"),
@JsonSubTypes.Type(value = ConnectorList.class, name = "connector_list"),
@JsonSubTypes.Type(value = ErrorEntity.class, name = "error_entity")
})
public abstract class KsqlEntity {
Expand Down
Loading

0 comments on commit 0bf31eb

Please sign in to comment.