Skip to content

Commit

Permalink
feat(ksql-connect): introduce syntax for CREATE CONNECTOR (syntax onl…
Browse files Browse the repository at this point in the history
…y) (#3139)
  • Loading branch information
agavra authored Jul 30, 2019
1 parent e5c063e commit e823659
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ statement
(WITH tableProperties)? #createTable
| CREATE TABLE (IF NOT EXISTS)? qualifiedName
(WITH tableProperties)? AS query #createTableAs
| CREATE (SINK | SOURCE) CONNECTOR identifier WITH tableProperties #createConnector
| INSERT INTO qualifiedName query (PARTITION BY identifier)? #insertInto
| INSERT INTO qualifiedName (columns)? VALUES values #insertValues
| DROP STREAM (IF EXISTS)? qualifiedName (DELETE TOPIC)? #dropStream
Expand Down Expand Up @@ -87,7 +88,7 @@ tableProperties
;

tableProperty
: identifier EQ literal
: (identifier | STRING) EQ literal
;

printClause
Expand Down Expand Up @@ -316,6 +317,7 @@ nonReserved
| EXPLAIN | ANALYZE | TYPE
| SET | RESET
| IF
| CONNECTOR | SOURCE | SINK
| KEY
;

Expand Down Expand Up @@ -428,6 +430,9 @@ RUN: 'RUN';
SCRIPT: 'SCRIPT';
DECIMAL: 'DECIMAL';
KEY: 'KEY';
CONNECTOR: 'CONNECTOR';
SINK: 'SINK';
SOURCE: 'SOURCE';

IF: 'IF';

Expand Down
31 changes: 27 additions & 4 deletions ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.SqlBaseParser.CreateConnectorContext;
import io.confluent.ksql.parser.SqlBaseParser.InsertValuesContext;
import io.confluent.ksql.parser.SqlBaseParser.IntervalClauseContext;
import io.confluent.ksql.parser.SqlBaseParser.LimitClauseContext;
Expand All @@ -42,6 +43,8 @@
import io.confluent.ksql.parser.tree.BooleanLiteral;
import io.confluent.ksql.parser.tree.Cast;
import io.confluent.ksql.parser.tree.ComparisonExpression;
import io.confluent.ksql.parser.tree.CreateConnector;
import io.confluent.ksql.parser.tree.CreateConnector.Type;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTable;
Expand Down Expand Up @@ -195,10 +198,16 @@ private Map<String, Literal> processTableProperties(
final ImmutableMap.Builder<String, Literal> properties = ImmutableMap.builder();
if (tablePropertiesContext != null) {
for (final TablePropertyContext prop : tablePropertiesContext.tableProperty()) {
properties.put(
ParserUtil.getIdentifierText(prop.identifier()),
(Literal) visit(prop.literal())
);
if (prop.identifier() != null) {
properties.put(
ParserUtil.getIdentifierText(prop.identifier()),
(Literal) visit(prop.literal())
);
} else {
properties.put(
ParserUtil.unquote(prop.STRING().getText(), "'"),
(Literal) visit(prop.literal()));
}
}
}
return properties.build();
Expand Down Expand Up @@ -265,6 +274,20 @@ public Node visitCreateTableAs(final SqlBaseParser.CreateTableAsContext context)
);
}

@Override
public Node visitCreateConnector(final CreateConnectorContext context) {
final Map<String, Literal> properties = processTableProperties(context.tableProperties());
final String name = ParserUtil.getIdentifierText(context.identifier());
final CreateConnector.Type type = context.SOURCE() != null ? Type.SOURCE : Type.SINK;

return new CreateConnector(
getLocation(context),
name,
properties,
type
);
}

@Override
public Node visitInsertInto(final SqlBaseParser.InsertIntoContext context) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.tree;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import jdk.nashorn.internal.ir.annotations.Immutable;

@Immutable
public class CreateConnector extends Statement {

public enum Type {
SOURCE,
SINK
}

private final String name;
private final ImmutableMap<String, Literal> config;
private final Type type;

public CreateConnector(
final Optional<NodeLocation> location,
final String name,
final Map<String, Literal> config,
final Type type
) {
super(location);
this.name = Objects.requireNonNull(name, "name");
this.config = ImmutableMap.copyOf(Objects.requireNonNull(config, "config"));
this.type = Objects.requireNonNull(type, "type");
}

public CreateConnector(
final String name,
final Map<String, Literal> config,
final Type type
) {
this(Optional.empty(), name, config, type);
}


public String getName() {
return name;
}

public Map<String, Literal> getConfig() {
return config;
}

public Type getType() {
return type;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

final CreateConnector that = (CreateConnector) o;
return Objects.equals(name, that.name)
&& Objects.equals(config, that.config)
&& Objects.equals(type, that.type);
}

@Override
public int hashCode() {
return Objects.hash(name, config, type);
}

@Override
public String toString() {
return "CreateConnector{"
+ "name='" + name + '\''
+ ", config=" + config
+ ", type=" + type
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNot.not;
Expand All @@ -43,6 +44,7 @@
import io.confluent.ksql.parser.tree.AllColumns;
import io.confluent.ksql.parser.tree.ArithmeticUnaryExpression;
import io.confluent.ksql.parser.tree.ComparisonExpression;
import io.confluent.ksql.parser.tree.CreateConnector;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTable;
Expand All @@ -66,6 +68,7 @@
import io.confluent.ksql.parser.tree.SetProperty;
import io.confluent.ksql.parser.tree.SingleColumn;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.StringLiteral;
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SqlBaseType;
Expand Down Expand Up @@ -1140,6 +1143,32 @@ public void shouldThrowHelpfulErrorMessageIfKeyFieldNotQuoted() {
KsqlParserTestUtil.buildSingleAst("CREATE STREAM S (ID INT) WITH (KEY=ID);", metaStore);
}

@Test
public void shouldBuildCreateSourceConnectorStatement() {
// When:
final PreparedStatement<CreateConnector> createExternal =
KsqlParserTestUtil.buildSingleAst(
"CREATE SOURCE CONNECTOR foo WITH ('foo.bar'='foo');", metaStore);

// Then:
assertThat(createExternal.getStatement().getConfig(), hasEntry("foo.bar", new StringLiteral("foo")));
assertThat(createExternal.getStatement().getName(), is("FOO"));
assertThat(createExternal.getStatement().getType(), is(CreateConnector.Type.SOURCE));
}

@Test
public void shouldBuildCreateSinkConnectorStatement() {
// When:
final PreparedStatement<CreateConnector> createExternal =
KsqlParserTestUtil.buildSingleAst(
"CREATE SINK CONNECTOR foo WITH (\"foo.bar\"='foo');", metaStore);

// Then:
assertThat(createExternal.getStatement().getConfig(), hasEntry("foo.bar", new StringLiteral("foo")));
assertThat(createExternal.getStatement().getName(), is("FOO"));
assertThat(createExternal.getStatement().getType(), is(CreateConnector.Type.SINK));
}

private static SearchedCaseExpression getSearchedCaseExpressionFromCsas(final Statement statement) {
final Query query = ((CreateStreamAsSelect) statement).getQuery();
final Expression caseExpression = ((SingleColumn) query.getSelect().getSelectItems().get(0)).getExpression();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.tree;

import com.google.common.collect.ImmutableMap;
import com.google.common.testing.EqualsTester;
import java.util.Map;
import java.util.Optional;
import org.junit.Test;

public class CreateConnectorTest {

private static final NodeLocation SOME_LOCATION = new NodeLocation(0, 0);
private static final NodeLocation OTHER_LOCATION = new NodeLocation(1, 0);

private static final String NAME = "foo";
private static final String OTHER_NAME = "bar";

private static final Map<String, Literal> CONFIG = ImmutableMap.of("foo", new StringLiteral("bar"));
private static final Map<String, Literal> OTHER_CONFIG = ImmutableMap.of("foo", new StringLiteral("baz"));

@Test
public void testEquals() {
new EqualsTester()
.addEqualityGroup(
new CreateConnector(Optional.of(SOME_LOCATION), NAME, CONFIG, CreateConnector.Type.SOURCE),
new CreateConnector(Optional.of(OTHER_LOCATION), NAME, CONFIG, CreateConnector.Type.SOURCE),
new CreateConnector(NAME, CONFIG, CreateConnector.Type.SOURCE)
)
.addEqualityGroup(
new CreateConnector(OTHER_NAME, CONFIG, CreateConnector.Type.SOURCE)
)
.addEqualityGroup(
new CreateConnector(NAME, OTHER_CONFIG, CreateConnector.Type.SOURCE)
)
.addEqualityGroup(
new CreateConnector(NAME, CONFIG, CreateConnector.Type.SINK)
)
.testEquals();
}

}

0 comments on commit e823659

Please sign in to comment.