From cd20d57332b649df23cac6cf7cf83d1936f4791e Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Mon, 5 Aug 2019 10:53:37 -0700 Subject: [PATCH] feat(ksql-connect): wiring for creating connectors (#3149) --- .../confluent/ksql/cli/console/Console.java | 8 ++ .../builder/ConnectorInfoTableBuilder.java | 30 ++++ .../builder/ErrorEntityTableBuilder.java | 30 ++++ .../rest/entity/CreateConnectorEntity.java | 66 +++++++++ .../ksql/rest/entity/ErrorEntity.java | 67 +++++++++ .../ksql/rest/entity/KsqlEntity.java | 4 +- .../server/execution/ConnectExecutor.java | 59 ++++++++ .../server/execution/CustomExecutors.java | 4 +- .../server/validation/CustomValidators.java | 4 +- .../server/execution/ConnectExecutorTest.java | 129 ++++++++++++++++++ 10 files changed, 398 insertions(+), 3 deletions(-) create mode 100644 ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ConnectorInfoTableBuilder.java create mode 100644 ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ErrorEntityTableBuilder.java create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/CreateConnectorEntity.java create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/ErrorEntity.java create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ConnectExecutor.java create mode 100644 ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index 0c4b18f640f2..f86ea7cf2771 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -27,6 +27,8 @@ import io.confluent.ksql.cli.console.table.Table; import io.confluent.ksql.cli.console.table.Table.Builder; import io.confluent.ksql.cli.console.table.builder.CommandStatusTableBuilder; +import io.confluent.ksql.cli.console.table.builder.ConnectorInfoTableBuilder; +import io.confluent.ksql.cli.console.table.builder.ErrorEntityTableBuilder; import io.confluent.ksql.cli.console.table.builder.ExecutionPlanTableBuilder; import io.confluent.ksql.cli.console.table.builder.FunctionNameListTableBuilder; import io.confluent.ksql.cli.console.table.builder.KafkaTopicsListTableBuilder; @@ -39,6 +41,8 @@ import io.confluent.ksql.json.JsonMapper; import io.confluent.ksql.rest.entity.ArgumentInfo; import io.confluent.ksql.rest.entity.CommandStatusEntity; +import io.confluent.ksql.rest.entity.CreateConnectorEntity; +import io.confluent.ksql.rest.entity.ErrorEntity; import io.confluent.ksql.rest.entity.ExecutionPlan; import io.confluent.ksql.rest.entity.FieldInfo; import io.confluent.ksql.rest.entity.FunctionDescriptionList; @@ -133,6 +137,10 @@ public class Console implements Closeable { tablePrinter(FunctionNameList.class, FunctionNameListTableBuilder::new)) .put(FunctionDescriptionList.class, Console::printFunctionDescription) + .put(CreateConnectorEntity.class, + tablePrinter(CreateConnectorEntity.class, ConnectorInfoTableBuilder::new)) + .put(ErrorEntity.class, + tablePrinter(ErrorEntity.class, ErrorEntityTableBuilder::new)) .build(); private static Handler1 tablePrinter( diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ConnectorInfoTableBuilder.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ConnectorInfoTableBuilder.java new file mode 100644 index 000000000000..b178a057afc6 --- /dev/null +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ConnectorInfoTableBuilder.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.cli.console.table.builder; + +import io.confluent.ksql.cli.console.table.Table; +import io.confluent.ksql.rest.entity.CreateConnectorEntity; + +public class ConnectorInfoTableBuilder implements TableBuilder { + + @Override + public Table buildTable(final CreateConnectorEntity entity) { + return new Table.Builder() + .withColumnHeaders("Message") + .withRow("Created connector " + entity.getInfo().name()) + .build(); + } +} diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ErrorEntityTableBuilder.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ErrorEntityTableBuilder.java new file mode 100644 index 000000000000..5aca7eb14541 --- /dev/null +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ErrorEntityTableBuilder.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.cli.console.table.builder; + +import io.confluent.ksql.cli.console.table.Table; +import io.confluent.ksql.rest.entity.ErrorEntity; + +public class ErrorEntityTableBuilder implements TableBuilder { + + @Override + public Table buildTable(final ErrorEntity entity) { + return new Table.Builder() + .withColumnHeaders("Error") + .withRow(entity.getErrorMessage()) + .build(); + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/CreateConnectorEntity.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/CreateConnectorEntity.java new file mode 100644 index 000000000000..ccaffa200dc2 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/CreateConnectorEntity.java @@ -0,0 +1,66 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Objects; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class CreateConnectorEntity extends KsqlEntity { + + private final ConnectorInfo info; + + @JsonCreator + public CreateConnectorEntity( + @JsonProperty("statementText") final String statementText, + @JsonProperty("info") final ConnectorInfo info + ) { + super(statementText); + this.info = Objects.requireNonNull(info, "info"); + } + + public ConnectorInfo getInfo() { + return info; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final CreateConnectorEntity that = (CreateConnectorEntity) o; + return Objects.equals(info, that.info); + } + + @Override + public int hashCode() { + return Objects.hash(info); + } + + @Override + public String toString() { + return "CreateConnectorEntity{" + + "info=" + info + + '}'; + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/ErrorEntity.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/ErrorEntity.java new file mode 100644 index 000000000000..0a4b7508b3d8 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/ErrorEntity.java @@ -0,0 +1,67 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.errorprone.annotations.Immutable; +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +@Immutable +public class ErrorEntity extends KsqlEntity { + + private final String errorMessage; + + @JsonCreator + public ErrorEntity( + @JsonProperty("statementText") final String statementText, + @JsonProperty("errorMessage") final String errorMessage + ) { + super(statementText); + this.errorMessage = Objects.requireNonNull(errorMessage, "errorMessage"); + } + + public String getErrorMessage() { + return errorMessage; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final ErrorEntity that = (ErrorEntity) o; + return Objects.equals(errorMessage, that.errorMessage); + } + + @Override + public int hashCode() { + return Objects.hash(errorMessage); + } + + @Override + public String toString() { + return "ErrorEntity{" + + "errorMessage='" + errorMessage + '\'' + + '}'; + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java index 0fd8cff0c8b0..c97abead169b 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java @@ -41,7 +41,9 @@ @JsonSubTypes.Type(value = SourceDescriptionList.class, name = "source_descriptions"), @JsonSubTypes.Type(value = QueryDescriptionList.class, name = "query_descriptions"), @JsonSubTypes.Type(value = FunctionDescriptionList.class, name = "describe_function"), - @JsonSubTypes.Type(value = FunctionNameList.class, name = "function_names") + @JsonSubTypes.Type(value = FunctionNameList.class, name = "function_names"), + @JsonSubTypes.Type(value = CreateConnectorEntity.class, name = "connector_info"), + @JsonSubTypes.Type(value = ErrorEntity.class, name = "error_entity") }) public abstract class KsqlEntity { private final String statementText; diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ConnectExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ConnectExecutor.java new file mode 100644 index 000000000000..6daa1bdaaa70 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ConnectExecutor.java @@ -0,0 +1,59 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.execution; + +import com.google.common.collect.Maps; +import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.parser.tree.CreateConnector; +import io.confluent.ksql.rest.entity.CreateConnectorEntity; +import io.confluent.ksql.rest.entity.ErrorEntity; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.services.ConnectClient; +import io.confluent.ksql.services.ConnectClient.ConnectResponse; +import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.statement.ConfiguredStatement; +import java.util.Optional; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; + +public final class ConnectExecutor { + + private ConnectExecutor() { } + + public static Optional execute( + final ConfiguredStatement statement, + final KsqlExecutionContext executionContext, + final ServiceContext serviceContext + ) { + final CreateConnector createConnector = statement.getStatement(); + final ConnectClient client = serviceContext.getConnectClient(); + + final ConnectResponse response = client.create( + createConnector.getName(), + Maps.transformValues(createConnector.getConfig(), l -> l.getValue().toString())); + + if (response.datum().isPresent()) { + return Optional.of( + new CreateConnectorEntity( + statement.getStatementText(), + response.datum().get() + ) + ); + } + + return response.error() + .map(err -> new ErrorEntity(statement.getStatementText(), err)); + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java index a21014ea4866..a4ef7bdd0852 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.engine.InsertValuesExecutor; +import io.confluent.ksql.parser.tree.CreateConnector; import io.confluent.ksql.parser.tree.DescribeFunction; import io.confluent.ksql.parser.tree.Explain; import io.confluent.ksql.parser.tree.InsertValues; @@ -61,7 +62,8 @@ public enum CustomExecutors { DESCRIBE_FUNCTION(DescribeFunction.class, DescribeFunctionExecutor::execute), SET_PROPERTY(SetProperty.class, PropertyExecutor::set), UNSET_PROPERTY(UnsetProperty.class, PropertyExecutor::unset), - INSERT_VALUES(InsertValues.class, insertValuesExecutor()); + INSERT_VALUES(InsertValues.class, insertValuesExecutor()), + CREATE_CONNECTOR(CreateConnector.class, ConnectExecutor::execute); public static final Map, StatementExecutor> EXECUTOR_MAP = ImmutableMap.copyOf( diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java index d99ed33d4fc4..a2421fefea10 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.engine.InsertValuesExecutor; +import io.confluent.ksql.parser.tree.CreateConnector; import io.confluent.ksql.parser.tree.DescribeFunction; import io.confluent.ksql.parser.tree.Explain; import io.confluent.ksql.parser.tree.InsertValues; @@ -64,8 +65,9 @@ public enum CustomValidators { LIST_FUNCTIONS(ListFunctions.class, StatementValidator.NO_VALIDATION), LIST_QUERIES(ListQueries.class, StatementValidator.NO_VALIDATION), LIST_PROPERTIES(ListProperties.class, StatementValidator.NO_VALIDATION), - INSERT_VALUES(InsertValues.class, new InsertValuesExecutor()::execute), + CREATE_CONNECTOR(CreateConnector.class, StatementValidator.NO_VALIDATION), + INSERT_VALUES(InsertValues.class, new InsertValuesExecutor()::execute), SHOW_COLUMNS(ShowColumns.class, ListSourceExecutor::columns), EXPLAIN(Explain.class, ExplainExecutor::execute), DESCRIBE_FUNCTION(DescribeFunction.class, DescribeFunctionExecutor::execute), diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java new file mode 100644 index 000000000000..44e4ff44d0af --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java @@ -0,0 +1,129 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.execution; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.CreateConnector; +import io.confluent.ksql.parser.tree.CreateConnector.Type; +import io.confluent.ksql.parser.tree.StringLiteral; +import io.confluent.ksql.rest.entity.CreateConnectorEntity; +import io.confluent.ksql.rest.entity.ErrorEntity; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.services.ConnectClient; +import io.confluent.ksql.services.ConnectClient.ConnectResponse; +import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.statement.ConfiguredStatement; +import io.confluent.ksql.util.KsqlConfig; +import java.util.Optional; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ConnectExecutorTest { + + private static final KsqlConfig CONFIG = new KsqlConfig(ImmutableMap.of()); + + private static final CreateConnector CREATE_CONNECTOR = new CreateConnector( + "foo", ImmutableMap.of("foo", new StringLiteral("bar")), Type.SOURCE); + + private static final ConfiguredStatement CREATE_CONNECTOR_CONFIGURED = + ConfiguredStatement.of( + PreparedStatement.of( + "CREATE SOURCE CONNECTOR foo WITH ('foo'='bar');", + CREATE_CONNECTOR), + ImmutableMap.of(), + CONFIG); + + @Mock + private ServiceContext serviceContext; + @Mock + private ConnectClient connectClient; + + @Before + public void setUp() { + when(serviceContext.getConnectClient()).thenReturn(connectClient); + } + + @Test + public void shouldPassInCorrectArgsToConnectClient() { + // Given: + givenSuccess(); + + // When: + ConnectExecutor.execute(CREATE_CONNECTOR_CONFIGURED, null, serviceContext); + + // Then: + verify(connectClient).create("foo", ImmutableMap.of("foo", "bar")); + } + + @Test + public void shouldReturnConnectorInfoEntityOnSuccess() { + // Given: + givenSuccess(); + + // When: + final Optional entity = ConnectExecutor + .execute(CREATE_CONNECTOR_CONFIGURED, null, serviceContext); + + // Then: + assertThat("Expected non-empty response", entity.isPresent()); + assertThat(entity.get(), instanceOf(CreateConnectorEntity.class)); + } + + @Test + public void shouldReturnErrorEntityOnError() { + // Given: + givenError(); + + // When: + final Optional entity = ConnectExecutor + .execute(CREATE_CONNECTOR_CONFIGURED, null, serviceContext); + + // Then: + assertThat("Expected non-empty response", entity.isPresent()); + assertThat(entity.get(), instanceOf(ErrorEntity.class)); + } + + private void givenSuccess() { + when(connectClient.create(anyString(), anyMap())) + .thenReturn(ConnectResponse.of( + new ConnectorInfo( + "foo", + ImmutableMap.of(), + ImmutableList.of(), + ConnectorType.SOURCE))); + } + + private void givenError() { + when(connectClient.create(anyString(), anyMap())) + .thenReturn(ConnectResponse.of("error!")); + } + +} \ No newline at end of file