Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support IF NOT EXISTS on CREATE TYPE #6173

Merged
merged 3 commits into from
Sep 11, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public CommandFactories(final ServiceContext serviceContext, final MetaStore met
this(
new CreateSourceFactory(serviceContext),
new DropSourceFactory(metaStore),
new RegisterTypeFactory(),
new RegisterTypeFactory(metaStore),
new DropTypeFactory(metaStore)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,14 @@ public DdlCommandResult executeDropSource(final DropSourceCommand dropSource) {
public DdlCommandResult executeRegisterType(final RegisterTypeCommand registerType) {
final String name = registerType.getTypeName();
final SqlType type = registerType.getType();
metaStore.registerType(name, type);
return new DdlCommandResult(
true,
"Registered custom type with name '" + name + "' and SQL type " + type
);
final boolean wasRegistered = metaStore.registerType(name, type);
return wasRegistered
? new DdlCommandResult(
true,
"Registered custom type with name '" + name + "' and SQL type " + type)
: new DdlCommandResult(
true,
name + " is already registered with type " + metaStore.resolveType(name).get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,30 @@
package io.confluent.ksql.ddl.commands;

import io.confluent.ksql.execution.ddl.commands.RegisterTypeCommand;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.tree.RegisterType;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlException;

public final class RegisterTypeFactory {
RegisterTypeFactory() {
private final MetaStore metaStore;

RegisterTypeFactory(final MetaStore metaStore) {
this.metaStore = metaStore;
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
}

public RegisterTypeCommand create(final RegisterType statement) {
return new RegisterTypeCommand(
statement.getType().getSqlType(),
statement.getName()
);
final String name = statement.getName();
final boolean ifNotExists = statement.getIfNotExists();
final SqlType type = statement.getType().getSqlType();

if (!ifNotExists && metaStore.resolveType(name).isPresent()) {
throw new KsqlException(
"Cannot register custom type '" + name + "' "
+ "since it is already registered with type: " + metaStore.resolveType(name).get()
);
}

return new RegisterTypeCommand(type, name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ public AstNode visitRegisterType(final RegisterType node, final C context) {
return new RegisterType(
node.getLocation(),
node.getName(),
(Type) processExpression(node.getType(), context)
(Type) processExpression(node.getType(), context),
node.getIfNotExists()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ public void shouldCreateCommandForRegisterType() {
final RegisterType ddlStatement = new RegisterType(
Optional.empty(),
"alias",
new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build())
new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()),
true
);

// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.confluent.ksql.execution.ddl.commands.DropSourceCommand;
import io.confluent.ksql.execution.ddl.commands.DropTypeCommand;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.ddl.commands.RegisterTypeCommand;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.function.InternalFunctionRegistry;
Expand All @@ -22,6 +23,7 @@
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
Expand Down Expand Up @@ -65,6 +67,7 @@ public class DdlCommandExecTest {
private CreateTableCommand createTable;
private DropSourceCommand dropSource;
private DropTypeCommand dropType;
private RegisterTypeCommand registerType;

private final MutableMetaStore metaStore
= MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());
Expand All @@ -75,6 +78,8 @@ public class DdlCommandExecTest {
private KsqlStream source;
@Mock
private WindowInfo windowInfo;
@Mock
private SqlType type;

private DdlCommandExec cmdExec;

Expand All @@ -89,6 +94,7 @@ public void setup() {

cmdExec = new DdlCommandExec(metaStore);
dropType = new DropTypeCommand("type");
registerType = new RegisterTypeCommand(type,"type");
}

@Test
Expand Down Expand Up @@ -243,6 +249,29 @@ public void shouldDropSource() {
);
}

@Test
public void shouldRegisterType() {
// When:
final DdlCommandResult result = cmdExec.execute(SQL_TEXT, registerType, false);

// Then:
assertThat("Expected successful resolution", result.isSuccess());
assertThat(result.getMessage(), is("Registered custom type with name 'type' and SQL type " + type));
}

@Test
public void shouldNotRegisterExistingType() {
// Given:
metaStore.registerType("type", SqlTypes.STRING);

// When:
final DdlCommandResult result = cmdExec.execute(SQL_TEXT, registerType, false);

// Then:
assertThat("Expected successful resolution", result.isSuccess());
assertThat(result.getMessage(), is("type is already registered with type STRING"));
}

@Test
public void shouldDropExistingType() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,99 @@

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;

import io.confluent.ksql.execution.ddl.commands.RegisterTypeCommand;
import io.confluent.ksql.execution.expression.tree.Type;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.tree.RegisterType;
import io.confluent.ksql.schema.ksql.types.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlPrimitiveType;
import io.confluent.ksql.schema.ksql.types.SqlStruct;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class RegisterTypeFactoryTest {
private final RegisterTypeFactory factory = new RegisterTypeFactory();
private static final String EXISTING_TYPE = "existing_type";
private static final String NOT_EXISTING_TYPE = "not_existing_type";
private RegisterTypeFactory factory;

@Test
@Mock
private MetaStore metaStore;
@Mock
private SqlType customType;

@Before
public void setUp() {
when(metaStore.resolveType(EXISTING_TYPE)).thenReturn(Optional.of(customType));
factory = new RegisterTypeFactory(metaStore);
}

@Test
public void shouldCreateCommandForRegisterType() {
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
// Given:
final RegisterType ddlStatement = new RegisterType(
Optional.empty(),
"alias",
new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build())
NOT_EXISTING_TYPE,
new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()),
true
);

// When:
final RegisterTypeCommand result = factory.create(ddlStatement);

// Then:
assertThat(result.getType(), equalTo(ddlStatement.getType().getSqlType()));
assertThat(result.getTypeName(), equalTo(NOT_EXISTING_TYPE));
}

@Test
public void shouldNotThrowError() {
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
// Given:
final RegisterType ddlStatement = new RegisterType(
Optional.empty(),
EXISTING_TYPE,
new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()),
true
);

// When:
final RegisterTypeCommand result = factory.create(ddlStatement);

// Then:
assertThat(result.getType(), equalTo(ddlStatement.getType().getSqlType()));
assertThat(result.getTypeName(), equalTo("alias"));
assertThat(result.getTypeName(), equalTo(EXISTING_TYPE));
}

@Test
public void shouldThrowError() {
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
// Given:
final RegisterType ddlStatement = new RegisterType(
Optional.empty(),
EXISTING_TYPE,
new Type(SqlStruct.builder().field("foo", SqlPrimitiveType.of(SqlBaseType.STRING)).build()),
false
);

// When:
final Exception e = assertThrows(
KsqlException.class,
() -> factory.create(ddlStatement)
);

// Then:
assertThat(
e.getMessage(),
equalTo("Cannot register custom type '"
+ EXISTING_TYPE + "' since it is already registered with type: " + customType)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ private Stream<SourceInfo> streamSources(final Set<SourceName> sourceNames) {
}

@Override
public void registerType(final String name, final SqlType type) {
typeRegistry.registerType(name, type);
public boolean registerType(final String name, final SqlType type) {
return typeRegistry.registerType(name, type);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface TypeRegistry {
* @param name the name, must be unique
* @param type the schema to associate it with
*/
void registerType(String name, SqlType type);
boolean registerType(String name, SqlType type);

/**
* @param name the previously registered name
Expand Down Expand Up @@ -78,7 +78,9 @@ public SqlType getType() {
*/
TypeRegistry EMPTY = new TypeRegistry() {
@Override
public void registerType(final String name, final SqlType type) { }
public boolean registerType(final String name, final SqlType type) {
return false;
}

@Override
public boolean deleteType(final String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.confluent.ksql.metastore;

import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlException;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
Expand All @@ -27,14 +26,8 @@ public class TypeRegistryImpl implements TypeRegistry {
private final Map<String, SqlType> typeRegistry = new ConcurrentHashMap<>();

@Override
public void registerType(final String name, final SqlType type) {
final SqlType oldValue = typeRegistry.putIfAbsent(name.toUpperCase(), type);
if (oldValue != null) {
throw new KsqlException(
"Cannot register custom type '" + name + "' "
+ "since it is already registered with type: " + type
);
}
public boolean registerType(final String name, final SqlType type) {
return typeRegistry.putIfAbsent(name.toUpperCase(), type) == null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ statement
| DROP TABLE (IF EXISTS)? sourceName (DELETE TOPIC)? #dropTable
| DROP CONNECTOR (IF EXISTS)? identifier #dropConnector
| EXPLAIN (statement | identifier) #explain
| CREATE TYPE identifier AS type #registerType
| CREATE TYPE (IF NOT EXISTS)? identifier AS type #registerType
| DROP TYPE (IF EXISTS)? identifier #dropType
;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,8 @@ public Node visitRegisterType(final RegisterTypeContext context) {
return new RegisterType(
getLocation(context),
ParserUtil.getIdentifierText(context.identifier()),
typeParser.getType(context.type())
typeParser.getType(context.type()),
context.EXISTS() != null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,9 @@ private void visitExtended() {
@Override
public Void visitRegisterType(final RegisterType node, final Integer context) {
builder.append("CREATE TYPE ");
if (node.getIfNotExists()) {
builder.append("IF NOT EXISTS ");
}
builder.append(FORMAT_OPTIONS.escape(node.getName()));
builder.append(" AS ");
builder.append(formatExpression(node.getType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,17 @@ public class RegisterType extends Statement implements ExecutableDdlStatement {

private final Type type;
private final String name;
private final boolean ifNotExists;

public RegisterType(final Optional<NodeLocation> location, final String name, final Type type) {
public RegisterType(
final Optional<NodeLocation> location,
final String name, final Type type,
final boolean ifNotExists
) {
super(location);
this.name = Objects.requireNonNull(name, "name");
this.type = Objects.requireNonNull(type, "type");
this.ifNotExists = Objects.requireNonNull(ifNotExists, "ifNotExists");
}

public Type getType() {
Expand All @@ -39,6 +45,10 @@ public String getName() {
return name;
}

public boolean getIfNotExists() {
return ifNotExists;
}

@Override
public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
return visitor.visitRegisterType(this, context);
Expand All @@ -54,19 +64,21 @@ public boolean equals(final Object o) {
}
final RegisterType that = (RegisterType) o;
return Objects.equals(type, that.type)
&& Objects.equals(name, that.name);
&& Objects.equals(name, that.name)
&& ifNotExists == that.ifNotExists;
}

@Override
public int hashCode() {
return Objects.hash(type, name);
return Objects.hash(type, name, ifNotExists);
}

@Override
public String toString() {
return "RegisterType{"
+ "type=" + type
+ ", name='" + name + '\''
+ ", ifNotExists=" + ifNotExists
+ '}';
}
}