From 0bf31eb5e69f4844c5ebb0931ab523c428332057 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 14 Aug 2019 12:39:03 -0700 Subject: [PATCH] feat: add SHOW CONNECTORS functionality (#3210) --- .../confluent/ksql/cli/console/Console.java | 4 + .../builder/ConnectorListTableBuilder.java | 44 +++++ .../ksql/cli/console/ConsoleTest.java | 45 +++++ .../io/confluent/ksql/parser/SqlBase.g4 | 2 + .../io/confluent/ksql/parser/AstBuilder.java | 17 ++ .../ksql/parser/tree/AstVisitor.java | 1 + .../ksql/parser/tree/ListConnectors.java | 66 ++++++++ .../ksql/parser/tree/ListConnectorsTest.java | 42 +++++ .../ksql/rest/entity/ConnectorList.java | 62 +++++++ .../ksql/rest/entity/KsqlEntity.java | 1 + .../ksql/rest/entity/SimpleConnectorInfo.java | 84 +++++++++ .../server/execution/CustomExecutors.java | 2 + .../execution/ListConnectorsExecutor.java | 109 ++++++++++++ .../server/validation/CustomValidators.java | 2 + .../execution/ListConnectorsExecutorTest.java | 159 ++++++++++++++++++ 15 files changed, 640 insertions(+) create mode 100644 ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ConnectorListTableBuilder.java create mode 100644 ksql-parser/src/main/java/io/confluent/ksql/parser/tree/ListConnectors.java create mode 100644 ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ListConnectorsTest.java create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/ConnectorList.java create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SimpleConnectorInfo.java create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutor.java create mode 100644 ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index 3426ad47b3b8..6040b33e2a63 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -28,6 +28,7 @@ import io.confluent.ksql.cli.console.table.Table.Builder; import io.confluent.ksql.cli.console.table.builder.CommandStatusTableBuilder; import io.confluent.ksql.cli.console.table.builder.ConnectorInfoTableBuilder; +import io.confluent.ksql.cli.console.table.builder.ConnectorListTableBuilder; import io.confluent.ksql.cli.console.table.builder.ErrorEntityTableBuilder; import io.confluent.ksql.cli.console.table.builder.ExecutionPlanTableBuilder; import io.confluent.ksql.cli.console.table.builder.FunctionNameListTableBuilder; @@ -41,6 +42,7 @@ import io.confluent.ksql.json.JsonMapper; import io.confluent.ksql.rest.entity.ArgumentInfo; import io.confluent.ksql.rest.entity.CommandStatusEntity; +import io.confluent.ksql.rest.entity.ConnectorList; import io.confluent.ksql.rest.entity.CreateConnectorEntity; import io.confluent.ksql.rest.entity.ErrorEntity; import io.confluent.ksql.rest.entity.ExecutionPlan; @@ -144,6 +146,8 @@ public class Console implements Closeable { Console::printFunctionDescription) .put(CreateConnectorEntity.class, tablePrinter(CreateConnectorEntity.class, ConnectorInfoTableBuilder::new)) + .put(ConnectorList.class, + tablePrinter(ConnectorList.class, ConnectorListTableBuilder::new)) .put(ErrorEntity.class, tablePrinter(ErrorEntity.class, ErrorEntityTableBuilder::new)) .build(); diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ConnectorListTableBuilder.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ConnectorListTableBuilder.java new file mode 100644 index 000000000000..1fb90cc75f7a --- /dev/null +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ConnectorListTableBuilder.java @@ -0,0 +1,44 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.cli.console.table.builder; + +import com.google.common.collect.ImmutableList; +import io.confluent.ksql.cli.console.table.Table; +import io.confluent.ksql.rest.entity.ConnectorList; +import java.util.List; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; + +public class ConnectorListTableBuilder implements TableBuilder { + + private static final List HEADERS = ImmutableList.of( + "Connector Name", "Type", "Class" + ); + + + @Override + public Table buildTable(final ConnectorList entity) { + return new Table.Builder() + .withColumnHeaders(HEADERS) + .withRows(entity.getConnectors() + .stream() + .map(info -> ImmutableList.of( + info.getName(), + ObjectUtils.defaultIfNull(info.getType(), ConnectorType.UNKNOWN).name(), + ObjectUtils.defaultIfNull(info.getClassName(), "")))) + .build(); + } +} diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java index c826f99829bf..543b3dba4ea0 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java @@ -36,6 +36,7 @@ import io.confluent.ksql.rest.entity.ArgumentInfo; import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.rest.entity.CommandStatusEntity; +import io.confluent.ksql.rest.entity.ConnectorList; import io.confluent.ksql.rest.entity.EntityQueryId; import io.confluent.ksql.rest.entity.ExecutionPlan; import io.confluent.ksql.rest.entity.FieldInfo; @@ -49,6 +50,7 @@ import io.confluent.ksql.rest.entity.Queries; import io.confluent.ksql.rest.entity.RunningQuery; import io.confluent.ksql.rest.entity.SchemaInfo; +import io.confluent.ksql.rest.entity.SimpleConnectorInfo; import io.confluent.ksql.rest.entity.SourceDescription; import io.confluent.ksql.rest.entity.SourceDescriptionEntity; import io.confluent.ksql.rest.entity.SourceInfo; @@ -70,6 +72,7 @@ import java.util.function.Supplier; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; @@ -698,6 +701,48 @@ public void testSortedPrintTablesList() throws IOException { } } + @Test + public void shouldPrintConnectorsList() throws IOException { + // Given: + final KsqlEntityList entities = new KsqlEntityList(ImmutableList.of( + new ConnectorList( + "statement", + ImmutableList.of(), + ImmutableList.of( + new SimpleConnectorInfo("foo", ConnectorType.SOURCE, "clazz"), + new SimpleConnectorInfo("bar", null, null) + )) + )); + + // When: + console.printKsqlEntityList(entities); + + // Then: + final String output = terminal.getOutputString(); + if (console.getOutputFormat() == OutputFormat.JSON) { + assertThat(output, is("" + + "[ {\n" + + " \"@type\" : \"connector_list\",\n" + + " \"statementText\" : \"statement\",\n" + + " \"warnings\" : [ ],\n" + + " \"connectors\" : [ {\n" + + " \"name\" : \"foo\",\n" + + " \"type\" : \"source\",\n" + + " \"className\" : \"clazz\"\n" + + " }, {\n" + + " \"name\" : \"bar\"\n" + + " } ]\n" + + "} ]\n")); + } else { + assertThat(output, is("\n" + + " Connector Name | Type | Class \n" + + "----------------------------------\n" + + " foo | SOURCE | clazz \n" + + " bar | UNKNOWN | \n" + + "----------------------------------\n")); + } + } + @Test public void testPrintExecuptionPlan() throws IOException { // Given: diff --git a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index a6a1786ed0c2..424d4624be56 100644 --- a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -38,6 +38,7 @@ statement | (LIST | SHOW) STREAMS EXTENDED? #listStreams | (LIST | SHOW) TABLES EXTENDED? #listTables | (LIST | SHOW) FUNCTIONS #listFunctions + | (LIST | SHOW) (SOURCE | SINK)? CONNECTORS #listConnectors | DESCRIBE EXTENDED? qualifiedName #showColumns | DESCRIBE FUNCTION qualifiedName #describeFunction | PRINT (qualifiedName | STRING) printClause #printTopic @@ -431,6 +432,7 @@ SCRIPT: 'SCRIPT'; DECIMAL: 'DECIMAL'; KEY: 'KEY'; CONNECTOR: 'CONNECTOR'; +CONNECTORS: 'CONNECTORS'; SINK: 'SINK'; SOURCE: 'SOURCE'; diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 9cbd221a4911..a3f120888575 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -58,6 +58,7 @@ import io.confluent.ksql.parser.SqlBaseParser.InsertValuesContext; import io.confluent.ksql.parser.SqlBaseParser.IntervalClauseContext; import io.confluent.ksql.parser.SqlBaseParser.LimitClauseContext; +import io.confluent.ksql.parser.SqlBaseParser.ListConnectorsContext; import io.confluent.ksql.parser.SqlBaseParser.SingleStatementContext; import io.confluent.ksql.parser.SqlBaseParser.TablePropertiesContext; import io.confluent.ksql.parser.SqlBaseParser.TablePropertyContext; @@ -83,6 +84,8 @@ import io.confluent.ksql.parser.tree.Join; import io.confluent.ksql.parser.tree.JoinCriteria; import io.confluent.ksql.parser.tree.JoinOn; +import io.confluent.ksql.parser.tree.ListConnectors; +import io.confluent.ksql.parser.tree.ListConnectors.Scope; import io.confluent.ksql.parser.tree.ListFunctions; import io.confluent.ksql.parser.tree.ListProperties; import io.confluent.ksql.parser.tree.ListQueries; @@ -585,6 +588,20 @@ public Node visitListFunctions(final SqlBaseParser.ListFunctionsContext ctx) { return new ListFunctions(getLocation(ctx)); } + @Override + public Node visitListConnectors(final ListConnectorsContext ctx) { + final ListConnectors.Scope scope; + if (ctx.SOURCE() != null) { + scope = Scope.SOURCE; + } else if (ctx.SINK() != null) { + scope = Scope.SINK; + } else { + scope = Scope.ALL; + } + + return new ListConnectors(getLocation(ctx), scope); + } + @Override public Node visitTerminateQuery(final SqlBaseParser.TerminateQueryContext context) { return new TerminateQuery(getLocation(context), context.qualifiedName().getText()); diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java index 30ee9cf110cd..0851f046e530 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java @@ -154,4 +154,5 @@ protected R visitGroupingElement(final GroupingElement node, final C context) { protected R visitSimpleGroupBy(final SimpleGroupBy node, final C context) { return visitGroupingElement(node, context); } + } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/ListConnectors.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/ListConnectors.java new file mode 100644 index 000000000000..fba0eea86c5e --- /dev/null +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/ListConnectors.java @@ -0,0 +1,66 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.tree; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.parser.NodeLocation; +import java.util.Objects; +import java.util.Optional; + +@Immutable +public class ListConnectors extends Statement { + + private final Scope scope; + + public enum Scope { + ALL, + SOURCE, + SINK + } + + public ListConnectors(final Optional location, final Scope scope) { + super(location); + this.scope = Objects.requireNonNull(scope, "scope"); + } + + public Scope getScope() { + return scope; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ListConnectors that = (ListConnectors) o; + return scope == that.scope; + } + + @Override + public int hashCode() { + return Objects.hash(scope); + } + + @Override + public String toString() { + return "ListConnectors{" + + "scope=" + scope + + '}'; + } +} diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ListConnectorsTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ListConnectorsTest.java new file mode 100644 index 000000000000..96a05385a147 --- /dev/null +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ListConnectorsTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.tree; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +import com.google.common.testing.EqualsTester; +import io.confluent.ksql.parser.NodeLocation; +import io.confluent.ksql.parser.tree.ListConnectors.Scope; +import java.util.Optional; +import org.junit.Test; + +public class ListConnectorsTest { + + @Test + public void shouldImplementHashCodeAndEquals() { + new EqualsTester() + .addEqualityGroup( + new ListConnectors(Optional.empty(), Scope.ALL), + new ListConnectors(Optional.of(new NodeLocation(1, 1)), Scope.ALL) + ) + .addEqualityGroup( + new ListConnectors(Optional.empty(), Scope.SOURCE) + ) + .testEquals(); + } + +} \ No newline at end of file diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/ConnectorList.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/ConnectorList.java new file mode 100644 index 000000000000..b7f37c1112a1 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/ConnectorList.java @@ -0,0 +1,62 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.Immutable; +import java.util.List; +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +@Immutable +public class ConnectorList extends KsqlEntity { + + private final ImmutableList connectors; + + @JsonCreator + public ConnectorList( + @JsonProperty("statementText") final String statementText, + @JsonProperty("warnings") final List warnings, + @JsonProperty("connectors") final List connectors + ) { + super(statementText, warnings); + this.connectors = ImmutableList.copyOf(Objects.requireNonNull(connectors, "connectors")); + } + + public ImmutableList getConnectors() { + return connectors; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ConnectorList that = (ConnectorList) o; + return Objects.equals(connectors, that.connectors); + } + + @Override + public int hashCode() { + return Objects.hash(connectors); + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java index a20e2a11229f..821a0181fce1 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java @@ -44,6 +44,7 @@ @JsonSubTypes.Type(value = FunctionDescriptionList.class, name = "describe_function"), @JsonSubTypes.Type(value = FunctionNameList.class, name = "function_names"), @JsonSubTypes.Type(value = CreateConnectorEntity.class, name = "connector_info"), + @JsonSubTypes.Type(value = ConnectorList.class, name = "connector_list"), @JsonSubTypes.Type(value = ErrorEntity.class, name = "error_entity") }) public abstract class KsqlEntity { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SimpleConnectorInfo.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SimpleConnectorInfo.java new file mode 100644 index 000000000000..c15e35b76813 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SimpleConnectorInfo.java @@ -0,0 +1,84 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Objects; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; + +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(Include.NON_NULL) +public class SimpleConnectorInfo { + + private final String name; + private final ConnectorType type; + private final String className; + + @JsonCreator + public SimpleConnectorInfo( + @JsonProperty("name") final String name, + @JsonProperty("type") final ConnectorType type, + @JsonProperty("className") final String className + ) { + this.name = Objects.requireNonNull(name, "name"); + this.type = type; + this.className = className; + } + + public String getName() { + return name; + } + + public ConnectorType getType() { + return type; + } + + public String getClassName() { + return className; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final SimpleConnectorInfo that = (SimpleConnectorInfo) o; + return Objects.equals(name, that.name) + && type == that.type + && Objects.equals(className, that.className); + } + + @Override + public int hashCode() { + return Objects.hash(name, type, className); + } + + @Override + public String toString() { + return "SimpleConnectorInfo{" + + "name='" + name + '\'' + + ", type=" + type + + ", className='" + className + '\'' + + '}'; + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java index a4ef7bdd0852..0eae931c08ff 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java @@ -22,6 +22,7 @@ import io.confluent.ksql.parser.tree.DescribeFunction; import io.confluent.ksql.parser.tree.Explain; import io.confluent.ksql.parser.tree.InsertValues; +import io.confluent.ksql.parser.tree.ListConnectors; import io.confluent.ksql.parser.tree.ListFunctions; import io.confluent.ksql.parser.tree.ListProperties; import io.confluent.ksql.parser.tree.ListQueries; @@ -56,6 +57,7 @@ public enum CustomExecutors { LIST_FUNCTIONS(ListFunctions.class, ListFunctionsExecutor::execute), LIST_QUERIES(ListQueries.class, ListQueriesExecutor::execute), LIST_PROPERTIES(ListProperties.class, ListPropertiesExecutor::execute), + LIST_CONNECTORS(ListConnectors.class, ListConnectorsExecutor::execute), SHOW_COLUMNS(ShowColumns.class, ListSourceExecutor::columns), EXPLAIN(Explain.class, ExplainExecutor::execute), diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutor.java new file mode 100644 index 000000000000..c7a693684451 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutor.java @@ -0,0 +1,109 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.execution; + +import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.parser.tree.ListConnectors; +import io.confluent.ksql.parser.tree.ListConnectors.Scope; +import io.confluent.ksql.rest.entity.ConnectorList; +import io.confluent.ksql.rest.entity.ErrorEntity; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.KsqlWarning; +import io.confluent.ksql.rest.entity.SimpleConnectorInfo; +import io.confluent.ksql.services.ConnectClient; +import io.confluent.ksql.services.ConnectClient.ConnectResponse; +import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.statement.ConfiguredStatement; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; + +public final class ListConnectorsExecutor { + + private ListConnectorsExecutor() { } + + @SuppressWarnings("OptionalGetWithoutIsPresent") + public static Optional execute( + final ConfiguredStatement configuredStatement, + final KsqlExecutionContext ksqlExecutionContext, + final ServiceContext serviceContext + ) { + final ConnectClient connectClient = serviceContext.getConnectClient(); + final ConnectResponse> connectors = serviceContext.getConnectClient().connectors(); + if (connectors.error().isPresent()) { + return Optional.of(new ErrorEntity( + configuredStatement.getStatementText(), + connectors.error().get() + )); + } + + final List infos = new ArrayList<>(); + final List warnings = new ArrayList<>(); + final Scope scope = configuredStatement.getStatement().getScope(); + for (final String name : connectors.datum().get()) { + final ConnectResponse response = connectClient.describe(name); + if (response.datum().filter(i -> inScope(i.type(), scope)).isPresent()) { + infos.add(fromConnectorInfoResponse(name, response) + ); + } else if (response.error().isPresent()) { + if (scope == Scope.ALL) { + infos.add(new SimpleConnectorInfo(name, ConnectorType.UNKNOWN, null)); + } + warnings.add( + new KsqlWarning( + String.format( + "Could not describe connector %s: %s", + name, + response.error().get()))); + } + } + + return Optional.of( + new ConnectorList( + configuredStatement.getStatementText(), + warnings, + infos) + ); + } + + private static boolean inScope(final ConnectorType type, final Scope scope) { + switch (scope) { + case SOURCE: return type == ConnectorType.SOURCE; + case SINK: return type == ConnectorType.SINK; + case ALL: return true; + default: throw new IllegalArgumentException("Unexpected scope: " + scope); + } + } + + @SuppressWarnings("OptionalGetWithoutIsPresent") + private static SimpleConnectorInfo fromConnectorInfoResponse( + final String name, + final ConnectResponse response + ) { + if (response.error().isPresent()) { + return new SimpleConnectorInfo(name, null, null); + } + + final ConnectorInfo info = response.datum().get(); + return new SimpleConnectorInfo( + name, + info.type(), + info.config().get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java index a2421fefea10..3ffafd4f4e66 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java @@ -22,6 +22,7 @@ import io.confluent.ksql.parser.tree.DescribeFunction; import io.confluent.ksql.parser.tree.Explain; import io.confluent.ksql.parser.tree.InsertValues; +import io.confluent.ksql.parser.tree.ListConnectors; import io.confluent.ksql.parser.tree.ListFunctions; import io.confluent.ksql.parser.tree.ListProperties; import io.confluent.ksql.parser.tree.ListQueries; @@ -65,6 +66,7 @@ public enum CustomValidators { LIST_FUNCTIONS(ListFunctions.class, StatementValidator.NO_VALIDATION), LIST_QUERIES(ListQueries.class, StatementValidator.NO_VALIDATION), LIST_PROPERTIES(ListProperties.class, StatementValidator.NO_VALIDATION), + LIST_CONNECTORS(ListConnectors.class, StatementValidator.NO_VALIDATION), CREATE_CONNECTOR(CreateConnector.class, StatementValidator.NO_VALIDATION), INSERT_VALUES(InsertValues.class, new InsertValuesExecutor()::execute), diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java new file mode 100644 index 000000000000..54c162f3ecd7 --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java @@ -0,0 +1,159 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.execution; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.ListConnectors; +import io.confluent.ksql.parser.tree.ListConnectors.Scope; +import io.confluent.ksql.rest.entity.ConnectorList; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.KsqlWarning; +import io.confluent.ksql.rest.entity.SimpleConnectorInfo; +import io.confluent.ksql.services.ConnectClient; +import io.confluent.ksql.services.ConnectClient.ConnectResponse; +import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.statement.ConfiguredStatement; +import io.confluent.ksql.util.KsqlConfig; +import java.util.Optional; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@SuppressWarnings("OptionalGetWithoutIsPresent") +@RunWith(MockitoJUnitRunner.class) +public class ListConnectorsExecutorTest { + + private static final String CONNECTOR_CLASS = "class"; + + private static final ConnectorInfo INFO = new ConnectorInfo( + "connector", + ImmutableMap.of(ConnectorConfig.CONNECTOR_CLASS_CONFIG, CONNECTOR_CLASS), + ImmutableList.of(), + ConnectorType.SOURCE + ); + + @Mock + private KsqlExecutionContext engine; + @Mock + private ServiceContext serviceContext; + @Mock + private ConnectClient connectClient; + + @Before + public void setUp() { + when(serviceContext.getConnectClient()).thenReturn(connectClient); + when(connectClient.describe("connector")) + .thenReturn(ConnectResponse.of(INFO)); + when(connectClient.describe("connector2")) + .thenReturn(ConnectResponse.of("DANGER WILL ROBINSON.")); + } + + @Test + public void shouldListValidConnector() { + // Given: + when(connectClient.connectors()) + .thenReturn(ConnectResponse.of(ImmutableList.of("connector"))); + final ConfiguredStatement statement = ConfiguredStatement.of( + PreparedStatement.of("", new ListConnectors(Optional.empty(), Scope.ALL)), + ImmutableMap.of(), + new KsqlConfig(ImmutableMap.of()) + ); + + // When: + final Optional entity = ListConnectorsExecutor + .execute(statement, engine, serviceContext); + + // Then: + assertThat("expected response!", entity.isPresent()); + final ConnectorList connectorList = (ConnectorList) entity.get(); + + assertThat(connectorList, is(new ConnectorList( + "", + ImmutableList.of(), + ImmutableList.of( + new SimpleConnectorInfo("connector", ConnectorType.SOURCE, CONNECTOR_CLASS) + ) + ))); + } + + @Test + public void shouldFilterNonMatchingConnectors() { + // Given: + when(connectClient.connectors()) + .thenReturn(ConnectResponse.of(ImmutableList.of("connector", "connector2"))); + final ConfiguredStatement statement = ConfiguredStatement.of( + PreparedStatement.of("", new ListConnectors(Optional.empty(), Scope.SINK)), + ImmutableMap.of(), + new KsqlConfig(ImmutableMap.of()) + ); + + // When: + final Optional entity = ListConnectorsExecutor + .execute(statement, engine, serviceContext); + + // Then: + assertThat("expected response!", entity.isPresent()); + final ConnectorList connectorList = (ConnectorList) entity.get(); + + assertThat(connectorList, is(new ConnectorList( + "", + ImmutableList.of(), + ImmutableList.of() + ))); + } + + @Test + public void shouldListInvalidConnectorWithNoInfo() { + // Given: + when(connectClient.connectors()) + .thenReturn(ConnectResponse.of(ImmutableList.of("connector2"))); + final ConfiguredStatement statement = ConfiguredStatement.of( + PreparedStatement.of("", new ListConnectors(Optional.empty(), Scope.ALL)), + ImmutableMap.of(), + new KsqlConfig(ImmutableMap.of()) + ); + + // When: + final Optional entity = ListConnectorsExecutor + .execute(statement, engine, serviceContext); + + // Then: + assertThat("expected response!", entity.isPresent()); + final ConnectorList connectorList = (ConnectorList) entity.get(); + + assertThat(connectorList, is(new ConnectorList( + "", + ImmutableList.of( + new KsqlWarning("Could not describe connector connector2: DANGER WILL ROBINSON.")), + ImmutableList.of( + new SimpleConnectorInfo("connector2", ConnectorType.UNKNOWN, null) + ) + ))); + } + +} \ No newline at end of file