Skip to content

Commit

Permalink
feat: Support IF NOT EXISTS on CREATE TYPE (#6173)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zara Lim authored Sep 11, 2020
1 parent 9d4cc6d commit a0f381b
Show file tree
Hide file tree
Showing 14 changed files with 181 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public CommandFactories(final ServiceContext serviceContext, final MetaStore met
this(
new CreateSourceFactory(serviceContext),
new DropSourceFactory(metaStore),
new RegisterTypeFactory(),
new RegisterTypeFactory(metaStore),
new DropTypeFactory(metaStore)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,14 @@ public DdlCommandResult executeDropSource(final DropSourceCommand dropSource) {
public DdlCommandResult executeRegisterType(final RegisterTypeCommand registerType) {
final String name = registerType.getTypeName();
final SqlType type = registerType.getType();
metaStore.registerType(name, type);
return new DdlCommandResult(
true,
"Registered custom type with name '" + name + "' and SQL type " + type
);
final boolean wasRegistered = metaStore.registerType(name, type);
return wasRegistered
? new DdlCommandResult(
true,
"Registered custom type with name '" + name + "' and SQL type " + type)
: new DdlCommandResult(
true,
name + " is already registered with type " + metaStore.resolveType(name).get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,33 @@

package io.confluent.ksql.ddl.commands;

import static java.util.Objects.requireNonNull;

import io.confluent.ksql.execution.ddl.commands.RegisterTypeCommand;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.tree.RegisterType;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlException;

public final class RegisterTypeFactory {
RegisterTypeFactory() {
private final MetaStore metaStore;

RegisterTypeFactory(final MetaStore metaStore) {
this.metaStore = requireNonNull(metaStore, "metaStore");
}

public RegisterTypeCommand create(final RegisterType statement) {
return new RegisterTypeCommand(
statement.getType().getSqlType(),
statement.getName()
);
final String name = statement.getName();
final boolean ifNotExists = statement.getIfNotExists();
final SqlType type = statement.getType().getSqlType();

if (!ifNotExists && metaStore.resolveType(name).isPresent()) {
throw new KsqlException(
"Cannot register custom type '" + name + "' "
+ "since it is already registered with type: " + metaStore.resolveType(name).get()
);
}

return new RegisterTypeCommand(type, name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ public AstNode visitRegisterType(final RegisterType node, final C context) {
return new RegisterType(
node.getLocation(),
node.getName(),
(Type) processExpression(node.getType(), context)
(Type) processExpression(node.getType(), context),
node.getIfNotExists()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ public void shouldCreateCommandForRegisterType() {
final RegisterType ddlStatement = new RegisterType(
Optional.empty(),
"alias",
new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build())
new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()),
true
);

// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.confluent.ksql.execution.ddl.commands.DropSourceCommand;
import io.confluent.ksql.execution.ddl.commands.DropTypeCommand;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.ddl.commands.RegisterTypeCommand;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.function.InternalFunctionRegistry;
Expand All @@ -22,6 +23,7 @@
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
Expand Down Expand Up @@ -65,6 +67,7 @@ public class DdlCommandExecTest {
private CreateTableCommand createTable;
private DropSourceCommand dropSource;
private DropTypeCommand dropType;
private RegisterTypeCommand registerType;

private final MutableMetaStore metaStore
= MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());
Expand All @@ -75,6 +78,8 @@ public class DdlCommandExecTest {
private KsqlStream source;
@Mock
private WindowInfo windowInfo;
@Mock
private SqlType type;

private DdlCommandExec cmdExec;

Expand All @@ -89,6 +94,7 @@ public void setup() {

cmdExec = new DdlCommandExec(metaStore);
dropType = new DropTypeCommand("type");
registerType = new RegisterTypeCommand(type,"type");
}

@Test
Expand Down Expand Up @@ -243,6 +249,29 @@ public void shouldDropSource() {
);
}

@Test
public void shouldRegisterType() {
// When:
final DdlCommandResult result = cmdExec.execute(SQL_TEXT, registerType, false);

// Then:
assertThat("Expected successful resolution", result.isSuccess());
assertThat(result.getMessage(), is("Registered custom type with name 'type' and SQL type " + type));
}

@Test
public void shouldNotRegisterExistingType() {
// Given:
metaStore.registerType("type", SqlTypes.STRING);

// When:
final DdlCommandResult result = cmdExec.execute(SQL_TEXT, registerType, false);

// Then:
assertThat("Expected successful resolution", result.isSuccess());
assertThat(result.getMessage(), is("type is already registered with type STRING"));
}

@Test
public void shouldDropExistingType() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,117 @@

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;

import io.confluent.ksql.execution.ddl.commands.RegisterTypeCommand;
import io.confluent.ksql.execution.expression.tree.Type;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.tree.RegisterType;
import io.confluent.ksql.schema.ksql.types.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlPrimitiveType;
import io.confluent.ksql.schema.ksql.types.SqlStruct;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class RegisterTypeFactoryTest {
private final RegisterTypeFactory factory = new RegisterTypeFactory();
private static final String EXISTING_TYPE = "existing_type";
private static final String NOT_EXISTING_TYPE = "not_existing_type";
private RegisterTypeFactory factory;

@Mock
private MetaStore metaStore;
@Mock
private SqlType customType;

@Before
public void setUp() {
when(metaStore.resolveType(EXISTING_TYPE)).thenReturn(Optional.of(customType));
factory = new RegisterTypeFactory(metaStore);
}

@Test
public void shouldCreateCommandForRegisterTypeWhenIfNotExitsSet() {
// Given:
final RegisterType ddlStatement = new RegisterType(
Optional.empty(),
NOT_EXISTING_TYPE,
new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()),
true
);

// When:
final RegisterTypeCommand result = factory.create(ddlStatement);

// Then:
assertThat(result.getType(), equalTo(ddlStatement.getType().getSqlType()));
assertThat(result.getTypeName(), equalTo(NOT_EXISTING_TYPE));
}

@Test
public void shouldCreateCommandForRegisterTypeWhenIfNotExitsNotSet() {
// Given:
final RegisterType ddlStatement = new RegisterType(
Optional.empty(),
NOT_EXISTING_TYPE,
new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()),
false
);

// When:
final RegisterTypeCommand result = factory.create(ddlStatement);

// Then:
assertThat(result.getType(), equalTo(ddlStatement.getType().getSqlType()));
assertThat(result.getTypeName(), equalTo(NOT_EXISTING_TYPE));
}

@Test
public void shouldCreateCommandForRegisterType() {
public void shouldNotThrowOnRegisterExistingTypeWhenIfNotExistsSet() {
// Given:
final RegisterType ddlStatement = new RegisterType(
Optional.empty(),
"alias",
new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build())
EXISTING_TYPE,
new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()),
true
);

// When:
final RegisterTypeCommand result = factory.create(ddlStatement);

// Then:
assertThat(result.getType(), equalTo(ddlStatement.getType().getSqlType()));
assertThat(result.getTypeName(), equalTo("alias"));
assertThat(result.getTypeName(), equalTo(EXISTING_TYPE));
}

@Test
public void shouldThrowOnRegisterExistingTypeWhenIfNotExistsNotSet() {
// Given:
final RegisterType ddlStatement = new RegisterType(
Optional.empty(),
EXISTING_TYPE,
new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()),
false
);

// When:
final Exception e = assertThrows(
KsqlException.class,
() -> factory.create(ddlStatement)
);

// Then:
assertThat(
e.getMessage(),
equalTo("Cannot register custom type '"
+ EXISTING_TYPE + "' since it is already registered with type: " + customType)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ private Stream<SourceInfo> streamSources(final Set<SourceName> sourceNames) {
}

@Override
public void registerType(final String name, final SqlType type) {
typeRegistry.registerType(name, type);
public boolean registerType(final String name, final SqlType type) {
return typeRegistry.registerType(name, type);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface TypeRegistry {
* @param name the name, must be unique
* @param type the schema to associate it with
*/
void registerType(String name, SqlType type);
boolean registerType(String name, SqlType type);

/**
* @param name the previously registered name
Expand Down Expand Up @@ -78,7 +78,9 @@ public SqlType getType() {
*/
TypeRegistry EMPTY = new TypeRegistry() {
@Override
public void registerType(final String name, final SqlType type) { }
public boolean registerType(final String name, final SqlType type) {
return false;
}

@Override
public boolean deleteType(final String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.confluent.ksql.metastore;

import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlException;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
Expand All @@ -27,14 +26,8 @@ public class TypeRegistryImpl implements TypeRegistry {
private final Map<String, SqlType> typeRegistry = new ConcurrentHashMap<>();

@Override
public void registerType(final String name, final SqlType type) {
final SqlType oldValue = typeRegistry.putIfAbsent(name.toUpperCase(), type);
if (oldValue != null) {
throw new KsqlException(
"Cannot register custom type '" + name + "' "
+ "since it is already registered with type: " + type
);
}
public boolean registerType(final String name, final SqlType type) {
return typeRegistry.putIfAbsent(name.toUpperCase(), type) == null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ statement
| DROP TABLE (IF EXISTS)? sourceName (DELETE TOPIC)? #dropTable
| DROP CONNECTOR (IF EXISTS)? identifier #dropConnector
| EXPLAIN (statement | identifier) #explain
| CREATE TYPE identifier AS type #registerType
| CREATE TYPE (IF NOT EXISTS)? identifier AS type #registerType
| DROP TYPE (IF EXISTS)? identifier #dropType
;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1239,7 +1239,8 @@ public Node visitRegisterType(final RegisterTypeContext context) {
return new RegisterType(
getLocation(context),
ParserUtil.getIdentifierText(context.identifier()),
typeParser.getType(context.type())
typeParser.getType(context.type()),
context.EXISTS() != null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,9 @@ private void visitExtended() {
@Override
public Void visitRegisterType(final RegisterType node, final Integer context) {
builder.append("CREATE TYPE ");
if (node.getIfNotExists()) {
builder.append("IF NOT EXISTS ");
}
builder.append(FORMAT_OPTIONS.escape(node.getName()));
builder.append(" AS ");
builder.append(formatExpression(node.getType()));
Expand Down
Loading

0 comments on commit a0f381b

Please sign in to comment.