From a0f381b1bce8ae58b658f51463001256d2fc3721 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Fri, 11 Sep 2020 08:42:54 -0700 Subject: [PATCH] feat: Support IF NOT EXISTS on CREATE TYPE (#6173) --- .../ksql/ddl/commands/CommandFactories.java | 2 +- .../ksql/ddl/commands/DdlCommandExec.java | 13 ++- .../ddl/commands/RegisterTypeFactory.java | 26 ++++- .../engine/rewrite/StatementRewriter.java | 3 +- .../ddl/commands/CommandFactoriesTest.java | 3 +- .../ksql/ddl/commands/DdlCommandExecTest.java | 29 ++++++ .../ddl/commands/RegisterTypeFactoryTest.java | 94 ++++++++++++++++++- .../ksql/metastore/MetaStoreImpl.java | 4 +- .../ksql/metastore/TypeRegistry.java | 6 +- .../ksql/metastore/TypeRegistryImpl.java | 11 +-- .../io/confluent/ksql/parser/SqlBase.g4 | 2 +- .../io/confluent/ksql/parser/AstBuilder.java | 3 +- .../confluent/ksql/parser/SqlFormatter.java | 3 + .../ksql/parser/tree/RegisterType.java | 18 +++- 14 files changed, 181 insertions(+), 36 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java index cc9fa6943a9b..aee919d6bcb2 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java @@ -64,7 +64,7 @@ public CommandFactories(final ServiceContext serviceContext, final MetaStore met this( new CreateSourceFactory(serviceContext), new DropSourceFactory(metaStore), - new RegisterTypeFactory(), + new RegisterTypeFactory(metaStore), new DropTypeFactory(metaStore) ); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java index 5fb37172aefb..20ea9eed40f8 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java @@ -110,11 +110,14 @@ public DdlCommandResult executeDropSource(final DropSourceCommand dropSource) { public DdlCommandResult executeRegisterType(final RegisterTypeCommand registerType) { final String name = registerType.getTypeName(); final SqlType type = registerType.getType(); - metaStore.registerType(name, type); - return new DdlCommandResult( - true, - "Registered custom type with name '" + name + "' and SQL type " + type - ); + final boolean wasRegistered = metaStore.registerType(name, type); + return wasRegistered + ? new DdlCommandResult( + true, + "Registered custom type with name '" + name + "' and SQL type " + type) + : new DdlCommandResult( + true, + name + " is already registered with type " + metaStore.resolveType(name).get()); } @Override diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/RegisterTypeFactory.java b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/RegisterTypeFactory.java index 42d61fd487c8..8887f84100af 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/RegisterTypeFactory.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/RegisterTypeFactory.java @@ -15,17 +15,33 @@ package io.confluent.ksql.ddl.commands; +import static java.util.Objects.requireNonNull; + import io.confluent.ksql.execution.ddl.commands.RegisterTypeCommand; +import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.tree.RegisterType; +import io.confluent.ksql.schema.ksql.types.SqlType; +import io.confluent.ksql.util.KsqlException; public final class RegisterTypeFactory { - RegisterTypeFactory() { + private final MetaStore metaStore; + + RegisterTypeFactory(final MetaStore metaStore) { + this.metaStore = requireNonNull(metaStore, "metaStore"); } public RegisterTypeCommand create(final RegisterType statement) { - return new RegisterTypeCommand( - statement.getType().getSqlType(), - statement.getName() - ); + final String name = statement.getName(); + final boolean ifNotExists = statement.getIfNotExists(); + final SqlType type = statement.getType().getSqlType(); + + if (!ifNotExists && metaStore.resolveType(name).isPresent()) { + throw new KsqlException( + "Cannot register custom type '" + name + "' " + + "since it is already registered with type: " + metaStore.resolveType(name).get() + ); + } + + return new RegisterTypeCommand(type, name); } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriter.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriter.java index 0269aac9c407..18a549b17dec 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriter.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriter.java @@ -477,7 +477,8 @@ public AstNode visitRegisterType(final RegisterType node, final C context) { return new RegisterType( node.getLocation(), node.getName(), - (Type) processExpression(node.getType(), context) + (Type) processExpression(node.getType(), context), + node.getIfNotExists() ); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java index 39f1c7bb50a0..7fc40055da5f 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java @@ -251,7 +251,8 @@ public void shouldCreateCommandForRegisterType() { final RegisterType ddlStatement = new RegisterType( Optional.empty(), "alias", - new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()) + new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()), + true ); // When: diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java index ceb6745bad9c..762e2a6ca0de 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java @@ -13,6 +13,7 @@ import io.confluent.ksql.execution.ddl.commands.DropSourceCommand; import io.confluent.ksql.execution.ddl.commands.DropTypeCommand; import io.confluent.ksql.execution.ddl.commands.KsqlTopic; +import io.confluent.ksql.execution.ddl.commands.RegisterTypeCommand; import io.confluent.ksql.execution.plan.Formats; import io.confluent.ksql.execution.timestamp.TimestampColumn; import io.confluent.ksql.function.InternalFunctionRegistry; @@ -22,6 +23,7 @@ import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.serde.FormatFactory; import io.confluent.ksql.serde.FormatInfo; @@ -65,6 +67,7 @@ public class DdlCommandExecTest { private CreateTableCommand createTable; private DropSourceCommand dropSource; private DropTypeCommand dropType; + private RegisterTypeCommand registerType; private final MutableMetaStore metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry()); @@ -75,6 +78,8 @@ public class DdlCommandExecTest { private KsqlStream source; @Mock private WindowInfo windowInfo; + @Mock + private SqlType type; private DdlCommandExec cmdExec; @@ -89,6 +94,7 @@ public void setup() { cmdExec = new DdlCommandExec(metaStore); dropType = new DropTypeCommand("type"); + registerType = new RegisterTypeCommand(type,"type"); } @Test @@ -243,6 +249,29 @@ public void shouldDropSource() { ); } + @Test + public void shouldRegisterType() { + // When: + final DdlCommandResult result = cmdExec.execute(SQL_TEXT, registerType, false); + + // Then: + assertThat("Expected successful resolution", result.isSuccess()); + assertThat(result.getMessage(), is("Registered custom type with name 'type' and SQL type " + type)); + } + + @Test + public void shouldNotRegisterExistingType() { + // Given: + metaStore.registerType("type", SqlTypes.STRING); + + // When: + final DdlCommandResult result = cmdExec.execute(SQL_TEXT, registerType, false); + + // Then: + assertThat("Expected successful resolution", result.isSuccess()); + assertThat(result.getMessage(), is("type is already registered with type STRING")); + } + @Test public void shouldDropExistingType() { // Given: diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/RegisterTypeFactoryTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/RegisterTypeFactoryTest.java index b003821375b5..0afaff30198e 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/RegisterTypeFactoryTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/RegisterTypeFactoryTest.java @@ -17,26 +17,86 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.when; import io.confluent.ksql.execution.ddl.commands.RegisterTypeCommand; import io.confluent.ksql.execution.expression.tree.Type; +import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.tree.RegisterType; import io.confluent.ksql.schema.ksql.types.SqlBaseType; import io.confluent.ksql.schema.ksql.types.SqlPrimitiveType; import io.confluent.ksql.schema.ksql.types.SqlStruct; +import io.confluent.ksql.schema.ksql.types.SqlType; +import io.confluent.ksql.util.KsqlException; import java.util.Optional; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.class) public class RegisterTypeFactoryTest { - private final RegisterTypeFactory factory = new RegisterTypeFactory(); + private static final String EXISTING_TYPE = "existing_type"; + private static final String NOT_EXISTING_TYPE = "not_existing_type"; + private RegisterTypeFactory factory; + + @Mock + private MetaStore metaStore; + @Mock + private SqlType customType; + + @Before + public void setUp() { + when(metaStore.resolveType(EXISTING_TYPE)).thenReturn(Optional.of(customType)); + factory = new RegisterTypeFactory(metaStore); + } + + @Test + public void shouldCreateCommandForRegisterTypeWhenIfNotExitsSet() { + // Given: + final RegisterType ddlStatement = new RegisterType( + Optional.empty(), + NOT_EXISTING_TYPE, + new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()), + true + ); + + // When: + final RegisterTypeCommand result = factory.create(ddlStatement); + + // Then: + assertThat(result.getType(), equalTo(ddlStatement.getType().getSqlType())); + assertThat(result.getTypeName(), equalTo(NOT_EXISTING_TYPE)); + } + + @Test + public void shouldCreateCommandForRegisterTypeWhenIfNotExitsNotSet() { + // Given: + final RegisterType ddlStatement = new RegisterType( + Optional.empty(), + NOT_EXISTING_TYPE, + new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()), + false + ); + + // When: + final RegisterTypeCommand result = factory.create(ddlStatement); + + // Then: + assertThat(result.getType(), equalTo(ddlStatement.getType().getSqlType())); + assertThat(result.getTypeName(), equalTo(NOT_EXISTING_TYPE)); + } @Test - public void shouldCreateCommandForRegisterType() { + public void shouldNotThrowOnRegisterExistingTypeWhenIfNotExistsSet() { // Given: final RegisterType ddlStatement = new RegisterType( Optional.empty(), - "alias", - new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()) + EXISTING_TYPE, + new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()), + true ); // When: @@ -44,6 +104,30 @@ public void shouldCreateCommandForRegisterType() { // Then: assertThat(result.getType(), equalTo(ddlStatement.getType().getSqlType())); - assertThat(result.getTypeName(), equalTo("alias")); + assertThat(result.getTypeName(), equalTo(EXISTING_TYPE)); + } + + @Test + public void shouldThrowOnRegisterExistingTypeWhenIfNotExistsNotSet() { + // Given: + final RegisterType ddlStatement = new RegisterType( + Optional.empty(), + EXISTING_TYPE, + new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()), + false + ); + + // When: + final Exception e = assertThrows( + KsqlException.class, + () -> factory.create(ddlStatement) + ); + + // Then: + assertThat( + e.getMessage(), + equalTo("Cannot register custom type '" + + EXISTING_TYPE + "' since it is already registered with type: " + customType) + ); } } diff --git a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MetaStoreImpl.java b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MetaStoreImpl.java index 1392b5ec95e7..cb237354cb8b 100644 --- a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MetaStoreImpl.java +++ b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MetaStoreImpl.java @@ -273,8 +273,8 @@ private Stream streamSources(final Set sourceNames) { } @Override - public void registerType(final String name, final SqlType type) { - typeRegistry.registerType(name, type); + public boolean registerType(final String name, final SqlType type) { + return typeRegistry.registerType(name, type); } @Override diff --git a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/TypeRegistry.java b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/TypeRegistry.java index 7a6020b727ee..e98b66cc7c5a 100644 --- a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/TypeRegistry.java +++ b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/TypeRegistry.java @@ -33,7 +33,7 @@ public interface TypeRegistry { * @param name the name, must be unique * @param type the schema to associate it with */ - void registerType(String name, SqlType type); + boolean registerType(String name, SqlType type); /** * @param name the previously registered name @@ -78,7 +78,9 @@ public SqlType getType() { */ TypeRegistry EMPTY = new TypeRegistry() { @Override - public void registerType(final String name, final SqlType type) { } + public boolean registerType(final String name, final SqlType type) { + return false; + } @Override public boolean deleteType(final String name) { diff --git a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/TypeRegistryImpl.java b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/TypeRegistryImpl.java index 62e2b312954f..3f132884f124 100644 --- a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/TypeRegistryImpl.java +++ b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/TypeRegistryImpl.java @@ -16,7 +16,6 @@ package io.confluent.ksql.metastore; import io.confluent.ksql.schema.ksql.types.SqlType; -import io.confluent.ksql.util.KsqlException; import java.util.Iterator; import java.util.Map; import java.util.Optional; @@ -27,14 +26,8 @@ public class TypeRegistryImpl implements TypeRegistry { private final Map typeRegistry = new ConcurrentHashMap<>(); @Override - public void registerType(final String name, final SqlType type) { - final SqlType oldValue = typeRegistry.putIfAbsent(name.toUpperCase(), type); - if (oldValue != null) { - throw new KsqlException( - "Cannot register custom type '" + name + "' " - + "since it is already registered with type: " + type - ); - } + public boolean registerType(final String name, final SqlType type) { + return typeRegistry.putIfAbsent(name.toUpperCase(), type) == null; } @Override diff --git a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index 6f35f2a4b03b..f4b738a7c6f1 100644 --- a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -80,7 +80,7 @@ statement | DROP TABLE (IF EXISTS)? sourceName (DELETE TOPIC)? #dropTable | DROP CONNECTOR (IF EXISTS)? identifier #dropConnector | EXPLAIN (statement | identifier) #explain - | CREATE TYPE identifier AS type #registerType + | CREATE TYPE (IF NOT EXISTS)? identifier AS type #registerType | DROP TYPE (IF EXISTS)? identifier #dropType ; diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 80fe423872d4..bcc8f0e576ed 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -1239,7 +1239,8 @@ public Node visitRegisterType(final RegisterTypeContext context) { return new RegisterType( getLocation(context), ParserUtil.getIdentifierText(context.identifier()), - typeParser.getType(context.type()) + typeParser.getType(context.type()), + context.EXISTS() != null ); } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java index 7438d4730ea3..475944a99aed 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java @@ -448,6 +448,9 @@ private void visitExtended() { @Override public Void visitRegisterType(final RegisterType node, final Integer context) { builder.append("CREATE TYPE "); + if (node.getIfNotExists()) { + builder.append("IF NOT EXISTS "); + } builder.append(FORMAT_OPTIONS.escape(node.getName())); builder.append(" AS "); builder.append(formatExpression(node.getType())); diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/RegisterType.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/RegisterType.java index af6c2f33ac0a..65c4b1f44294 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/RegisterType.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/RegisterType.java @@ -24,11 +24,17 @@ public class RegisterType extends Statement implements ExecutableDdlStatement { private final Type type; private final String name; + private final boolean ifNotExists; - public RegisterType(final Optional location, final String name, final Type type) { + public RegisterType( + final Optional location, + final String name, final Type type, + final boolean ifNotExists + ) { super(location); this.name = Objects.requireNonNull(name, "name"); this.type = Objects.requireNonNull(type, "type"); + this.ifNotExists = Objects.requireNonNull(ifNotExists, "ifNotExists"); } public Type getType() { @@ -39,6 +45,10 @@ public String getName() { return name; } + public boolean getIfNotExists() { + return ifNotExists; + } + @Override public R accept(final AstVisitor visitor, final C context) { return visitor.visitRegisterType(this, context); @@ -54,12 +64,13 @@ public boolean equals(final Object o) { } final RegisterType that = (RegisterType) o; return Objects.equals(type, that.type) - && Objects.equals(name, that.name); + && Objects.equals(name, that.name) + && ifNotExists == that.ifNotExists; } @Override public int hashCode() { - return Objects.hash(type, name); + return Objects.hash(type, name, ifNotExists); } @Override @@ -67,6 +78,7 @@ public String toString() { return "RegisterType{" + "type=" + type + ", name='" + name + '\'' + + ", ifNotExists=" + ifNotExists + '}'; } }