Skip to content

Commit

Permalink
feat(ksql-connect): wiring for creating connectors (#3149)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Aug 5, 2019
1 parent 355cc6d commit cd20d57
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.cli.console.table.Table.Builder;
import io.confluent.ksql.cli.console.table.builder.CommandStatusTableBuilder;
import io.confluent.ksql.cli.console.table.builder.ConnectorInfoTableBuilder;
import io.confluent.ksql.cli.console.table.builder.ErrorEntityTableBuilder;
import io.confluent.ksql.cli.console.table.builder.ExecutionPlanTableBuilder;
import io.confluent.ksql.cli.console.table.builder.FunctionNameListTableBuilder;
import io.confluent.ksql.cli.console.table.builder.KafkaTopicsListTableBuilder;
Expand All @@ -39,6 +41,8 @@
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.rest.entity.ArgumentInfo;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.CreateConnectorEntity;
import io.confluent.ksql.rest.entity.ErrorEntity;
import io.confluent.ksql.rest.entity.ExecutionPlan;
import io.confluent.ksql.rest.entity.FieldInfo;
import io.confluent.ksql.rest.entity.FunctionDescriptionList;
Expand Down Expand Up @@ -133,6 +137,10 @@ public class Console implements Closeable {
tablePrinter(FunctionNameList.class, FunctionNameListTableBuilder::new))
.put(FunctionDescriptionList.class,
Console::printFunctionDescription)
.put(CreateConnectorEntity.class,
tablePrinter(CreateConnectorEntity.class, ConnectorInfoTableBuilder::new))
.put(ErrorEntity.class,
tablePrinter(ErrorEntity.class, ErrorEntityTableBuilder::new))
.build();

private static <T extends KsqlEntity> Handler1<KsqlEntity, Console> tablePrinter(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.cli.console.table.builder;

import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.rest.entity.CreateConnectorEntity;

public class ConnectorInfoTableBuilder implements TableBuilder<CreateConnectorEntity> {

@Override
public Table buildTable(final CreateConnectorEntity entity) {
return new Table.Builder()
.withColumnHeaders("Message")
.withRow("Created connector " + entity.getInfo().name())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.cli.console.table.builder;

import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.rest.entity.ErrorEntity;

public class ErrorEntityTableBuilder implements TableBuilder<ErrorEntity> {

@Override
public Table buildTable(final ErrorEntity entity) {
return new Table.Builder()
.withColumnHeaders("Error")
.withRow(entity.getErrorMessage())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.entity;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;

@JsonIgnoreProperties(ignoreUnknown = true)
public class CreateConnectorEntity extends KsqlEntity {

private final ConnectorInfo info;

@JsonCreator
public CreateConnectorEntity(
@JsonProperty("statementText") final String statementText,
@JsonProperty("info") final ConnectorInfo info
) {
super(statementText);
this.info = Objects.requireNonNull(info, "info");
}

public ConnectorInfo getInfo() {
return info;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

final CreateConnectorEntity that = (CreateConnectorEntity) o;
return Objects.equals(info, that.info);
}

@Override
public int hashCode() {
return Objects.hash(info);
}

@Override
public String toString() {
return "CreateConnectorEntity{"
+ "info=" + info
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.entity;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.errorprone.annotations.Immutable;
import java.util.Objects;

@JsonIgnoreProperties(ignoreUnknown = true)
@Immutable
public class ErrorEntity extends KsqlEntity {

private final String errorMessage;

@JsonCreator
public ErrorEntity(
@JsonProperty("statementText") final String statementText,
@JsonProperty("errorMessage") final String errorMessage
) {
super(statementText);
this.errorMessage = Objects.requireNonNull(errorMessage, "errorMessage");
}

public String getErrorMessage() {
return errorMessage;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

final ErrorEntity that = (ErrorEntity) o;
return Objects.equals(errorMessage, that.errorMessage);
}

@Override
public int hashCode() {
return Objects.hash(errorMessage);
}

@Override
public String toString() {
return "ErrorEntity{"
+ "errorMessage='" + errorMessage + '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
@JsonSubTypes.Type(value = SourceDescriptionList.class, name = "source_descriptions"),
@JsonSubTypes.Type(value = QueryDescriptionList.class, name = "query_descriptions"),
@JsonSubTypes.Type(value = FunctionDescriptionList.class, name = "describe_function"),
@JsonSubTypes.Type(value = FunctionNameList.class, name = "function_names")
@JsonSubTypes.Type(value = FunctionNameList.class, name = "function_names"),
@JsonSubTypes.Type(value = CreateConnectorEntity.class, name = "connector_info"),
@JsonSubTypes.Type(value = ErrorEntity.class, name = "error_entity")
})
public abstract class KsqlEntity {
private final String statementText;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.execution;

import com.google.common.collect.Maps;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.parser.tree.CreateConnector;
import io.confluent.ksql.rest.entity.CreateConnectorEntity;
import io.confluent.ksql.rest.entity.ErrorEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.services.ConnectClient;
import io.confluent.ksql.services.ConnectClient.ConnectResponse;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import java.util.Optional;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;

public final class ConnectExecutor {

private ConnectExecutor() { }

public static Optional<KsqlEntity> execute(
final ConfiguredStatement<CreateConnector> statement,
final KsqlExecutionContext executionContext,
final ServiceContext serviceContext
) {
final CreateConnector createConnector = statement.getStatement();
final ConnectClient client = serviceContext.getConnectClient();

final ConnectResponse<ConnectorInfo> response = client.create(
createConnector.getName(),
Maps.transformValues(createConnector.getConfig(), l -> l.getValue().toString()));

if (response.datum().isPresent()) {
return Optional.of(
new CreateConnectorEntity(
statement.getStatementText(),
response.datum().get()
)
);
}

return response.error()
.map(err -> new ErrorEntity(statement.getStatementText(), err));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.engine.InsertValuesExecutor;
import io.confluent.ksql.parser.tree.CreateConnector;
import io.confluent.ksql.parser.tree.DescribeFunction;
import io.confluent.ksql.parser.tree.Explain;
import io.confluent.ksql.parser.tree.InsertValues;
Expand Down Expand Up @@ -61,7 +62,8 @@ public enum CustomExecutors {
DESCRIBE_FUNCTION(DescribeFunction.class, DescribeFunctionExecutor::execute),
SET_PROPERTY(SetProperty.class, PropertyExecutor::set),
UNSET_PROPERTY(UnsetProperty.class, PropertyExecutor::unset),
INSERT_VALUES(InsertValues.class, insertValuesExecutor());
INSERT_VALUES(InsertValues.class, insertValuesExecutor()),
CREATE_CONNECTOR(CreateConnector.class, ConnectExecutor::execute);

public static final Map<Class<? extends Statement>, StatementExecutor<?>> EXECUTOR_MAP =
ImmutableMap.copyOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.engine.InsertValuesExecutor;
import io.confluent.ksql.parser.tree.CreateConnector;
import io.confluent.ksql.parser.tree.DescribeFunction;
import io.confluent.ksql.parser.tree.Explain;
import io.confluent.ksql.parser.tree.InsertValues;
Expand Down Expand Up @@ -64,8 +65,9 @@ public enum CustomValidators {
LIST_FUNCTIONS(ListFunctions.class, StatementValidator.NO_VALIDATION),
LIST_QUERIES(ListQueries.class, StatementValidator.NO_VALIDATION),
LIST_PROPERTIES(ListProperties.class, StatementValidator.NO_VALIDATION),
INSERT_VALUES(InsertValues.class, new InsertValuesExecutor()::execute),
CREATE_CONNECTOR(CreateConnector.class, StatementValidator.NO_VALIDATION),

INSERT_VALUES(InsertValues.class, new InsertValuesExecutor()::execute),
SHOW_COLUMNS(ShowColumns.class, ListSourceExecutor::columns),
EXPLAIN(Explain.class, ExplainExecutor::execute),
DESCRIBE_FUNCTION(DescribeFunction.class, DescribeFunctionExecutor::execute),
Expand Down
Loading

0 comments on commit cd20d57

Please sign in to comment.