diff --git a/docs/operate-and-deploy/migrations-tool.md b/docs/operate-and-deploy/migrations-tool.md index 0a79c94347c8..0b5738c4147f 100644 --- a/docs/operate-and-deploy/migrations-tool.md +++ b/docs/operate-and-deploy/migrations-tool.md @@ -48,6 +48,12 @@ types of ksqlDB statements: * `DROP TYPE` * `SET ` * `UNSET ` +* `DEFINE ` +* `UNDEFINE ` + +Any properties or variables set using the `SET`, `UNSET`, `DEFINE` and `UNDEFINE` are applied in the +current migration file only. They do not carry over to the next migration file, even if multiple +migration files are applied as part of the same `ksql-migrations apply` command Requirements and Installation ----------------------------- diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/VariableSubstitutor.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/VariableSubstitutor.java index fbf9edf9826c..35e255d47c08 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/VariableSubstitutor.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/VariableSubstitutor.java @@ -29,6 +29,7 @@ import java.util.Optional; import java.util.Set; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.commons.text.StringSubstitutor; public final class VariableSubstitutor { @@ -57,6 +58,16 @@ public static Set lookup(final String text) { } } + public static String substitute( + final String string, + final Map valueMap + ) { + return StringSubstitutor.replace( + string, valueMap.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey(), e -> sanitize(e.getValue()))) + ); + } + public static String substitute( final KsqlParser.ParsedStatement parsedStatement, final Map valueMap diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/VariableSubstitutorTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/VariableSubstitutorTest.java index 25f702e03a62..53a65f5dca96 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/VariableSubstitutorTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/VariableSubstitutorTest.java @@ -240,6 +240,20 @@ public void shouldThrowOnSQLInjection() { assertThrowOnInvalidVariables(statements, variablesMap); } + @Test + public void shouldSubstituteVariablesInString() { + // Given + final Map variablesMap = new ImmutableMap.Builder() {{ + put("event", "birthday"); + }}.build(); + + // When + final String substituted = VariableSubstitutor.substitute("Happy ${event} to you!", variablesMap); + + // Then + assertThat(substituted, equalTo("Happy birthday to you!")); + } + private void assertReplacedStatements( final List> statements, final Map variablesMap diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java index 1b5db9209a7d..b90d275be40b 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java @@ -35,10 +35,12 @@ import io.confluent.ksql.tools.migrations.util.CommandParser; import io.confluent.ksql.tools.migrations.util.CommandParser.SqlCommand; import io.confluent.ksql.tools.migrations.util.CommandParser.SqlCreateConnectorStatement; +import io.confluent.ksql.tools.migrations.util.CommandParser.SqlDefineVariableCommand; import io.confluent.ksql.tools.migrations.util.CommandParser.SqlDropConnectorStatement; import io.confluent.ksql.tools.migrations.util.CommandParser.SqlInsertValues; import io.confluent.ksql.tools.migrations.util.CommandParser.SqlPropertyCommand; import io.confluent.ksql.tools.migrations.util.CommandParser.SqlStatement; +import io.confluent.ksql.tools.migrations.util.CommandParser.SqlUndefineVariableCommand; import io.confluent.ksql.tools.migrations.util.MetadataUtil; import io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState; import io.confluent.ksql.tools.migrations.util.MigrationFile; @@ -311,14 +313,46 @@ private void executeCommands( final Clock clock, final String previous ) { + final List commands = CommandParser.splitSql(migrationFileContent); + + executeCommands( + commands, ksqlClient, config, executionStart, migration, clock, previous, true); + executeCommands( + commands, ksqlClient, config, executionStart, migration, clock, previous, false); + } + + /** + * If validateOnly is set to true, then this parses each of the commands but only executes + * DEFINE/UNDEFINE commands (variables are needed for parsing INSERT INTO... VALUES, SET/UNSET + * and DEFINE commands). If validateOnly is set to false, then each command will execute after + * parsing. + */ + private void executeCommands( + final List commands, + final Client ksqlClient, + final MigrationConfig config, + final String executionStart, + final MigrationFile migration, + final Clock clock, + final String previous, + final boolean validateOnly + ) { + cleanUpJavaClientVariables(ksqlClient); final Map properties = new HashMap<>(); - final List commands = CommandParser.parse(migrationFileContent); - for (final SqlCommand command : commands) { + for (final String command : commands) { try { - executeCommand(command, ksqlClient, properties); - } catch (InterruptedException | ExecutionException e) { + final Map variables = ksqlClient.getVariables().entrySet() + .stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().toString())); + executeCommand( + CommandParser.transformToSqlCommand(command, variables), + ksqlClient, + properties, + validateOnly + ); + } catch (InterruptedException | ExecutionException | MigrationException e) { + final String action = validateOnly ? "parse" : "execute"; final String errorMsg = String.format( - "Failed to execute sql: %s. Error: %s", command.getCommand(), e.getMessage()); + "Failed to %s sql: %s. Error: %s", action, command, e.getMessage()); updateState(config, ksqlClient, MigrationState.ERROR, executionStart, migration, clock, previous, Optional.of(errorMsg)); throw new MigrationException(errorMsg); @@ -326,7 +360,32 @@ private void executeCommands( } } + private void cleanUpJavaClientVariables(final Client ksqlClient) { + ksqlClient.getVariables().forEach((k, v) -> ksqlClient.undefine(k)); + } + private void executeCommand( + final SqlCommand command, + final Client ksqlClient, + final Map properties, + final boolean defineUndefineOnly + ) throws ExecutionException, InterruptedException { + if (command instanceof SqlDefineVariableCommand) { + ksqlClient.define( + ((SqlDefineVariableCommand) command).getVariable(), + ((SqlDefineVariableCommand) command).getValue() + ); + } else if (command instanceof SqlUndefineVariableCommand) { + ksqlClient.undefine(((SqlUndefineVariableCommand) command).getVariable()); + } else if (!defineUndefineOnly) { + executeNonVariableCommands(command, ksqlClient, properties); + } + } + + /** + * Executes everything besides define/undefine commands + */ + private void executeNonVariableCommands( final SqlCommand command, final Client ksqlClient, final Map properties diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/CommandParser.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/CommandParser.java index fa2d5f77bed6..25768469e240 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/CommandParser.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/CommandParser.java @@ -15,7 +15,6 @@ package io.confluent.ksql.tools.migrations.util; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import io.confluent.ksql.execution.expression.tree.BooleanLiteral; import io.confluent.ksql.execution.expression.tree.CreateArrayExpression; @@ -32,6 +31,7 @@ import io.confluent.ksql.parser.AstBuilder; import io.confluent.ksql.parser.DefaultKsqlParser; import io.confluent.ksql.parser.KsqlParser; +import io.confluent.ksql.parser.VariableSubstitutor; import io.confluent.ksql.parser.exception.ParseFailedException; import io.confluent.ksql.parser.tree.CreateConnector; import io.confluent.ksql.parser.tree.CreateConnector.Type; @@ -48,14 +48,22 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public final class CommandParser { + // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling private static final String QUOTED_STRING_OR_WHITESPACE = "('([^']*|(''))*')|\\s+"; - private static final Pattern SET_PROPERTY = - Pattern.compile("\\s*SET\\s+'((?:[^']*|(?:''))*)'\\s*=\\s*'((?:[^']*|(?:''))*)'\\s*;\\s*"); - private static final Pattern UNSET_PROPERTY = - Pattern.compile("\\s*UNSET\\s+'((?:[^']*|(?:''))*)'\\s*;\\s*"); + private static final Pattern SET_PROPERTY = Pattern.compile( + "\\s*SET\\s+'((?:[^']*|(?:''))*)'\\s*=\\s*'((?:[^']*|(?:''))*)'\\s*;\\s*", + Pattern.CASE_INSENSITIVE); + private static final Pattern UNSET_PROPERTY = Pattern.compile( + "\\s*UNSET\\s+'((?:[^']*|(?:''))*)'\\s*;\\s*", Pattern.CASE_INSENSITIVE); private static final Pattern DROP_CONNECTOR = - Pattern.compile("\\s*DROP\\s+CONNECTOR\\s+(.*?)\\s*;\\s*"); + Pattern.compile("\\s*DROP\\s+CONNECTOR\\s+(.*?)\\s*;\\s*", Pattern.CASE_INSENSITIVE); + private static final Pattern DEFINE_VARIABLE = Pattern.compile( + "\\s*DEFINE\\s+([a-zA-Z_][a-zA-Z0-9_@]*)\\s*=\\s*'((?:[^']*|(?:''))*)'\\s*;\\s*", + Pattern.CASE_INSENSITIVE); + private static final Pattern UNDEFINE_VARIABLE = Pattern.compile( + "\\s*UNDEFINE\\s+([a-zA-Z_][a-zA-Z0-9_@]*)\\s*;\\s*", Pattern.CASE_INSENSITIVE); private static final KsqlParser KSQL_PARSER = new DefaultKsqlParser(); private static final String INSERT = "INSERT"; private static final String INTO = "INTO"; @@ -84,7 +92,7 @@ public final class CommandParser { private static final char SINGLE_QUOTE = '\''; private static final char SEMICOLON = ';'; private static final List UNSUPPORTED_STATEMENTS = ImmutableList.of( - DEFINE, UNDEFINE, DESCRIBE, EXPLAIN, SELECT, PRINT, SHOW, LIST + DESCRIBE, EXPLAIN, SELECT, PRINT, SHOW, LIST ); private enum StatementType { @@ -93,18 +101,14 @@ private enum StatementType { DROP_CONNECTOR, STATEMENT, SET_PROPERTY, - UNSET_PROPERTY + UNSET_PROPERTY, + DEFINE_VARIABLE, + UNDEFINE_VARIABLE } private CommandParser() { } - public static List parse(final String sql) { - return splitSql(sql).stream() - .map(CommandParser::transformToSqlCommand) - .collect(Collectors.toList()); - } - /** * Splits a string of sql commands into a list of commands and filters out the comments. * Note that escaped strings are not handled, because they end up getting split into two adjacent @@ -112,8 +116,7 @@ public static List parse(final String sql) { * * @return list of commands with comments removed */ - @VisibleForTesting - static List splitSql(final String sql) { + public static List splitSql(final String sql) { final List commands = new ArrayList<>(); StringBuffer current = new StringBuffer(); int index = 0; @@ -197,14 +200,17 @@ public static Object toFieldType(final Expression expressionValue) { /** * Determines the type of command a sql string is and returns a SqlCommand. */ - private static SqlCommand transformToSqlCommand(final String sql) { + public static SqlCommand transformToSqlCommand( + final String sql, final Map variables) { + + // Splits the sql string into tokens(uppercased keyowords, identifiers and strings) final List tokens = Arrays .stream(sql.toUpperCase().split(QUOTED_STRING_OR_WHITESPACE)) .filter(s -> !s.isEmpty()) .collect(Collectors.toList()); switch (getStatementType(tokens)) { case INSERT_VALUES: - return getInsertValuesStatement(sql); + return getInsertValuesStatement(sql, variables); case CREATE_CONNECTOR: return getCreateConnectorStatement(sql); case DROP_CONNECTOR: @@ -212,19 +218,28 @@ private static SqlCommand transformToSqlCommand(final String sql) { case STATEMENT: return new SqlStatement(sql); case SET_PROPERTY: - return getSetPropertyCommand(sql); + return getSetPropertyCommand(sql, variables); case UNSET_PROPERTY: - return getUnsetPropertyCommand(sql); + return getUnsetPropertyCommand(sql, variables); + case DEFINE_VARIABLE: + return getDefineVariableCommand(sql, variables); + case UNDEFINE_VARIABLE: + return getUndefineVariableCommand(sql); default: throw new IllegalStateException(); } } - private static SqlInsertValues getInsertValuesStatement(final String sql) { + private static SqlInsertValues getInsertValuesStatement( + final String sql, final Map variables) { final InsertValues parsedStatement; try { + final String substituted = VariableSubstitutor.substitute( + KSQL_PARSER.parse(sql).get(0), + variables + ); parsedStatement = (InsertValues) new AstBuilder(TypeRegistry.EMPTY) - .buildStatement(KSQL_PARSER.parse(sql).get(0).getStatement()); + .buildStatement(KSQL_PARSER.parse(substituted).get(0).getStatement()); } catch (ParseFailedException e) { throw new MigrationException(String.format( "Failed to parse INSERT VALUES statement. Statement: %s. Reason: %s", @@ -265,24 +280,57 @@ private static SqlDropConnectorStatement getDropConnectorStatement(final String return new SqlDropConnectorStatement(sql, dropConnectorMatcher.group(1)); } - private static SqlPropertyCommand getSetPropertyCommand(final String sql) { + private static SqlPropertyCommand getSetPropertyCommand( + final String sql, final Map variables) { final Matcher setPropertyMatcher = SET_PROPERTY.matcher(sql); if (!setPropertyMatcher.matches()) { throw new MigrationException("Invalid SET command: " + sql); } return new SqlPropertyCommand( - sql, true, setPropertyMatcher.group(1), Optional.of(setPropertyMatcher.group(2))); + sql, + true, + VariableSubstitutor.substitute(setPropertyMatcher.group(1), variables), + Optional.of(VariableSubstitutor.substitute(setPropertyMatcher.group(2), variables)) + ); } - private static SqlPropertyCommand getUnsetPropertyCommand(final String sql) { + private static SqlPropertyCommand getUnsetPropertyCommand( + final String sql, final Map variables) { final Matcher unsetPropertyMatcher = UNSET_PROPERTY.matcher(sql); if (!unsetPropertyMatcher.matches()) { throw new MigrationException("Invalid UNSET command: " + sql); } return new SqlPropertyCommand( - sql, false, unsetPropertyMatcher.group(1), Optional.empty()); + sql, + false, + VariableSubstitutor.substitute(unsetPropertyMatcher.group(1), variables), + Optional.empty()); + } + + private static SqlDefineVariableCommand getDefineVariableCommand( + final String sql, final Map variables) { + final Matcher defineVariableMatcher = DEFINE_VARIABLE.matcher(sql); + if (!defineVariableMatcher.matches()) { + throw new MigrationException("Invalid DEFINE command: " + sql); + } + return new SqlDefineVariableCommand( + sql, + defineVariableMatcher.group(1), + VariableSubstitutor.substitute(defineVariableMatcher.group(2), variables) + ); } + private static SqlUndefineVariableCommand getUndefineVariableCommand(final String sql) { + final Matcher undefineVariableMatcher = UNDEFINE_VARIABLE.matcher(sql); + if (!undefineVariableMatcher.matches()) { + throw new MigrationException("Invalid UNDEFINE command: " + sql); + } + return new SqlUndefineVariableCommand(sql, undefineVariableMatcher.group(1)); + } + + /** + * Determines if a statement is supported and the type of command it is based on a list of tokens. + */ private static StatementType getStatementType(final List tokens) { if (isInsertValuesStatement(tokens)) { return StatementType.INSERT_VALUES; @@ -290,10 +338,14 @@ private static StatementType getStatementType(final List tokens) { return StatementType.CREATE_CONNECTOR; } else if (isDropConnectorStatement(tokens)) { return StatementType.DROP_CONNECTOR; - } else if (tokens.size() > 0 && tokens.get(0).equals(SET)) { + } else if (isSetStatement(tokens)) { return StatementType.SET_PROPERTY; - } else if (tokens.size() > 0 && tokens.get(0).equals(UNSET)) { + } else if (isUnsetStatement(tokens)) { return StatementType.UNSET_PROPERTY; + } else if (isDefineStatement(tokens)) { + return StatementType.DEFINE_VARIABLE; + } else if (isUndefineStatement(tokens)) { + return StatementType.UNDEFINE_VARIABLE; } else { validateSupportedStatementType(tokens); return StatementType.STATEMENT; @@ -324,6 +376,22 @@ private static boolean isDropConnectorStatement(final List tokens) { return tokens.size() > 1 && tokens.get(0).equals(DROP) && tokens.get(1).equals(CONNECTOR); } + private static boolean isSetStatement(final List tokens) { + return tokens.size() > 0 && tokens.get(0).equals(SET); + } + + private static boolean isUnsetStatement(final List tokens) { + return tokens.size() > 0 && tokens.get(0).equals(UNSET); + } + + private static boolean isDefineStatement(final List tokens) { + return tokens.size() > 0 && tokens.get(0).equals(DEFINE); + } + + private static boolean isUndefineStatement(final List tokens) { + return tokens.size() > 0 && tokens.get(0).equals(UNDEFINE); + } + /** * Validates that the sql statement represented by the list of input tokens * (keywords separated whitespace, or strings identified by single quotes) @@ -485,4 +553,49 @@ public Optional getValue() { return value; } } + + /** + * Represents ksqlDB DEFINE commands. + */ + public static class SqlDefineVariableCommand extends SqlCommand { + private final String variable; + private final String value; + + SqlDefineVariableCommand( + final String command, + final String variable, + final String value + ) { + super(command); + this.variable = Objects.requireNonNull(variable); + this.value = Objects.requireNonNull(value); + } + + public String getVariable() { + return variable; + } + + public String getValue() { + return value; + } + } + + /** + * Represents ksqlDB UNDEFINE commands. + */ + public static class SqlUndefineVariableCommand extends SqlCommand { + private final String variable; + + SqlUndefineVariableCommand( + final String command, + final String variable + ) { + super(command); + this.variable = Objects.requireNonNull(variable); + } + + public String getVariable() { + return variable; + } + } } diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java index 5e840e63f935..a23308473039 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java @@ -228,7 +228,8 @@ private void shouldApplyMigrations() throws Exception { "CREATE SOURCE CONNECTOR C WITH ('connector.class'='org.apache.kafka.connect.tools.MockSourceConnector');\n" + "CREATE SINK CONNECTOR D WITH ('connector.class'='org.apache.kafka.connect.tools.MockSinkConnector', 'topics'='d');\n" + "CREATE TABLE blue (ID BIGINT PRIMARY KEY, A STRING) WITH (KAFKA_TOPIC='blue', PARTITIONS=1, VALUE_FORMAT='DELIMITED');" + - "DROP TABLE blue;" + "DROP TABLE blue;" + + "DEFINE onlyDefinedInFile1 = 'nope';" ); createMigrationFile( 2, @@ -237,19 +238,24 @@ private void shouldApplyMigrations() throws Exception { "CREATE OR REPLACE STREAM FOO (A STRING, B INT) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');" + "ALTER STREAM FOO ADD COLUMN C BIGINT;" + "/* add some '''data''' to FOO */" + - "INSERT INTO FOO VALUES ('HELLO', 50, -4325);" + + "DEFINE variable = '50';" + + "INSERT INTO FOO VALUES ('HELLO', ${variable}, -4325);" + "INSERT INTO FOO (A) VALUES ('GOOD''BYE');" + - "INSERT INTO FOO (A) VALUES ('mua--ha\nha');" + + "INSERT INTO FOO (A) VALUES ('${onlyDefinedInFile1}--ha\nha');" + "INSERT INTO FOO (A) VALUES ('');" + - "SET 'ksql.output.topic.name.prefix' = 'cool';" + + "DEFINE variable = 'cool';" + + "SET 'ksql.output.topic.name.prefix' = '${variable}';" + "CREATE STREAM `bar` AS SELECT CONCAT(A, 'woo''hoo') AS A FROM FOO;" + "UNSET 'ksql.output.topic.name.prefix';" + "CREATE STREAM CAR AS SELECT * FROM FOO;" + "DROP CONNECTOR D;" + "INSERT INTO `bar` SELECT A FROM CAR;" + "CREATE TYPE ADDRESS AS STRUCT;" + - "CREATE STREAM HOMES (ADDR ADDRESS) WITH (KAFKA_TOPIC='HOMES', PARTITIONS=1, VALUE_FORMAT='JSON');" + - "INSERT INTO HOMES VALUES (STRUCT(number := 123, street := 'sesame st', city := 'New York City'));" + + "DEFINE suffix = 'OMES';" + + "DEFINE variable = 'H${suffix}';" + + "CREATE STREAM ${variable} (ADDR ADDRESS) WITH (KAFKA_TOPIC='${variable}', PARTITIONS=1, VALUE_FORMAT='JSON');" + + "UNDEFINE variable;" + + "INSERT INTO HOMES VALUES (STRUCT(number := 123, street := 'sesame st', city := '${variable}'));" + "DROP TYPE ADDRESS;" ); @@ -342,7 +348,7 @@ private static void verifyMigrationsApplied() { assertThat(foo.get(2).getRow().get().getColumns().get(0), is("GOOD'BYE")); assertNull(foo.get(2).getRow().get().getColumns().get(1)); assertNull(foo.get(2).getRow().get().getColumns().get(2)); - assertThat(foo.get(3).getRow().get().getColumns().get(0), is("mua--ha\nha")); + assertThat(foo.get(3).getRow().get().getColumns().get(0), is("${onlyDefinedInFile1}--ha\nha")); assertThat(foo.get(4).getRow().get().getColumns().get(0), is("")); // verify bar @@ -351,7 +357,7 @@ private static void verifyMigrationsApplied() { hasSize(6)); // first row is a header, last row is a message saying "Limit Reached" assertThat(bar.get(1).getRow().get().getColumns().get(0), is("HELLOwoo'hoo")); assertThat(bar.get(2).getRow().get().getColumns().get(0), is("GOOD'BYEwoo'hoo")); - assertThat(bar.get(3).getRow().get().getColumns().get(0), is("mua--ha\nhawoo'hoo")); + assertThat(bar.get(3).getRow().get().getColumns().get(0), is("${onlyDefinedInFile1}--ha\nhawoo'hoo")); assertThat(bar.get(4).getRow().get().getColumns().get(0), is("woo'hoo")); verifyConnector("C", true); @@ -362,7 +368,7 @@ private static void verifyMigrationsApplied() { () -> makeKsqlQuery("SELECT * FROM HOMES EMIT CHANGES LIMIT 1;"), hasSize(3)); // first row is a header, last row is a message saying "Limit Reached" assertThat(homes.get(1).getRow().get().getColumns().size(), is(1)); - assertThat(homes.get(1).getRow().get().getColumns().get(0).toString(), is("{NUMBER=123, STREET=sesame st, CITY=New York City}")); + assertThat(homes.get(1).getRow().get().getColumns().get(0).toString(), is("{NUMBER=123, STREET=sesame st, CITY=${variable}}")); // verify type was dropped: assertTypeCount(0); diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java index 1d3efa48a05e..3e5760c62aad 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java @@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; import com.github.rvesse.airline.SingleCommand; @@ -86,8 +87,16 @@ public class ApplyMigrationCommandTest { private static final String SET_COMMANDS = COMMAND + "SET 'auto.offset.reset' = 'earliest';" + "CREATE TABLE BAR AS SELECT * FROM FOO GROUP BY A;" - + "UNSET 'auto.offset.reset';" + + "UNSET 'auto.offset.reset';" + "CREATE STREAM MOO (A STRING) WITH (KAFKA_TOPIC='MOO', PARTITIONS=1, VALUE_FORMAT='DELIMITED');"; + private static final String DEFINE_COMMANDS = COMMAND + + "DEFINE pre='a';" + + "DEFINE str='${pre}bc';" + + "SET '${str}'='yay';" + + "CREATE STREAM ${str} AS SELECT * FROM FOO;" + + "INSERT INTO FOO VALUES ('${str}');" + + "UNDEFINE str;" + + "INSERT INTO FOO VALUES ('${str}');"; @Rule public TemporaryFolder folder = new TemporaryFolder(); @@ -200,6 +209,91 @@ public void shouldApplySetUnsetCommands() throws Exception { inOrder.verifyNoMoreInteractions(); } + @Test + public void shouldApplyDefineUndefineCommands() throws Exception { + // Given: + final Map variables = ImmutableMap.of("pre", "a", "str", "abc"); + command = PARSER.parse("-n"); + createMigrationFile(1, NAME, migrationsDir, DEFINE_COMMANDS); + when(versionQueryResult.get()).thenReturn(ImmutableList.of()); + when(ksqlClient.getVariables()).thenReturn( + ImmutableMap.of(), ImmutableMap.of(), variables, variables, variables, variables, variables, + variables, variables, variables, variables, variables, variables, variables, variables, + variables, variables, ImmutableMap.of() + ); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + Instant.ofEpochMilli(1000), ZoneId.systemDefault())); + + // Then: + assertThat(result, is(0)); + final InOrder inOrder = inOrder(ksqlClient); + + verifyMigratedVersion(inOrder, 1, "", MigrationState.MIGRATED, () -> { + inOrder.verify(ksqlClient).executeStatement(COMMAND, new HashMap<>()); + inOrder.verify(ksqlClient).define("pre", "a"); + inOrder.verify(ksqlClient).define("str", "abc"); + inOrder.verify(ksqlClient).executeStatement(eq("CREATE STREAM ${str} AS SELECT * FROM FOO;"), propCaptor.capture()); + assertThat(propCaptor.getValue().size(), is(1)); + assertThat(propCaptor.getValue().get("abc"), is("yay")); + inOrder.verify(ksqlClient).insertInto("`FOO`", new KsqlObject(ImmutableMap.of("`A`", "abc"))); + inOrder.verify(ksqlClient).undefine("str"); + inOrder.verify(ksqlClient).insertInto("`FOO`", new KsqlObject(ImmutableMap.of("`A`", "${str}"))); + }); + inOrder.verify(ksqlClient).close(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void shouldResetVariablesBetweenMigrations() throws Exception { + // Given: + final Map variables = ImmutableMap.of("cat", "pat"); + command = PARSER.parse("-a"); + createMigrationFile(1, NAME, migrationsDir, "DEFINE cat='pat';"); + createMigrationFile(2, NAME, migrationsDir, "INSERT INTO FOO VALUES ('${cat}');"); + when(versionQueryResult.get()).thenReturn(ImmutableList.of()); + when(ksqlClient.getVariables()).thenReturn(ImmutableMap.of(), ImmutableMap.of(), variables, ImmutableMap.of()); + givenAppliedMigration(1, NAME, MigrationState.MIGRATED); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + Instant.ofEpochMilli(1000), ZoneId.systemDefault())); + + // Then: + assertThat(result, is(0)); + final InOrder inOrder = inOrder(ksqlClient); + inOrder.verify(ksqlClient, times(2)).getVariables(); + inOrder.verify(ksqlClient).define("cat", "pat"); + inOrder.verify(ksqlClient).getVariables(); + inOrder.verify(ksqlClient).undefine("cat"); + inOrder.verify(ksqlClient).getVariables(); + inOrder.verify(ksqlClient).insertInto("`FOO`", new KsqlObject(ImmutableMap.of("`A`", "${cat}"))); + inOrder.verify(ksqlClient).close(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void shouldResetPropertiesBetweenMigrations() throws Exception { + // Given: + command = PARSER.parse("-a"); + createMigrationFile(1, NAME, migrationsDir, "SET 'cat'='pat';"); + createMigrationFile(2, NAME, migrationsDir, COMMAND); + when(versionQueryResult.get()).thenReturn(ImmutableList.of()); + givenAppliedMigration(1, NAME, MigrationState.MIGRATED); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + Instant.ofEpochMilli(1000), ZoneId.systemDefault())); + + // Then: + assertThat(result, is(0)); + final InOrder inOrder = inOrder(ksqlClient); + inOrder.verify(ksqlClient).executeStatement(COMMAND, ImmutableMap.of()); + inOrder.verify(ksqlClient).close(); + inOrder.verifyNoMoreInteractions(); + } + @Test public void shouldApplySecondMigration() throws Exception { // Given: @@ -306,7 +400,7 @@ public void shouldNotApplyMigrationIfPreviousNotFinished() throws Exception { final InOrder inOrder = inOrder(ksqlClient); verifyMigratedVersion(inOrder, 1, "", MigrationState.MIGRATED); inOrder.verify(ksqlClient).close(); - Mockito.verify(ksqlClient, Mockito.times(1)).executeStatement(COMMAND, new HashMap<>()); + Mockito.verify(ksqlClient, times(1)).executeStatement(COMMAND, new HashMap<>()); } @Test @@ -346,9 +440,9 @@ public void shouldSkipApplyIfValidateFails() throws Exception { // Then: assertThat(result, is(1)); - Mockito.verify(ksqlClient, Mockito.times(3)).executeQuery(any()); - Mockito.verify(ksqlClient, Mockito.times(0)).executeStatement(any(), any()); - Mockito.verify(ksqlClient, Mockito.times(0)).insertInto(any(), any()); + Mockito.verify(ksqlClient, times(3)).executeQuery(any()); + Mockito.verify(ksqlClient, times(0)).executeStatement(any(), any()); + Mockito.verify(ksqlClient, times(0)).insertInto(any(), any()); } @Test @@ -388,8 +482,27 @@ public void shouldFailIfMetadataNotInitialized() throws Exception { // Then: assertThat(result, is(1)); - Mockito.verify(ksqlClient, Mockito.times(0)).executeStatement(any(), any()); - Mockito.verify(ksqlClient, Mockito.times(0)).insertInto(any(), any()); + Mockito.verify(ksqlClient, times(0)).executeStatement(any(), any()); + Mockito.verify(ksqlClient, times(0)).insertInto(any(), any()); + } + + @Test + public void shouldThrowErrorOnParsingFailure() throws Exception { + // Given: + command = PARSER.parse("-n"); + createMigrationFile(1, NAME, migrationsDir, "SHOW TABLES;"); + when(versionQueryResult.get()).thenReturn(ImmutableList.of()); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + Instant.ofEpochMilli(1000), ZoneId.systemDefault())); + + // Then: + assertThat(result, is(1)); + final InOrder inOrder = inOrder(ksqlClient); + verifyMigratedVersion( + inOrder, 1, "", MigrationState.ERROR, + Optional.of("Failed to parse sql: SHOW TABLES;. Error: 'SHOW' statements are not supported."), () -> {}); } @Test diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/CommandParserTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/CommandParserTest.java index dd186e07669a..1c5d0ab6934b 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/CommandParserTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/CommandParserTest.java @@ -24,16 +24,21 @@ import static org.junit.Assert.assertTrue; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.tools.migrations.MigrationException; import io.confluent.ksql.tools.migrations.util.CommandParser.SqlCreateConnectorStatement; +import io.confluent.ksql.tools.migrations.util.CommandParser.SqlDefineVariableCommand; import io.confluent.ksql.tools.migrations.util.CommandParser.SqlDropConnectorStatement; import io.confluent.ksql.tools.migrations.util.CommandParser.SqlInsertValues; import io.confluent.ksql.tools.migrations.util.CommandParser.SqlCommand; import io.confluent.ksql.tools.migrations.util.CommandParser.SqlPropertyCommand; import io.confluent.ksql.tools.migrations.util.CommandParser.SqlStatement; +import io.confluent.ksql.tools.migrations.util.CommandParser.SqlUndefineVariableCommand; import java.math.BigDecimal; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.junit.Test; public class CommandParserTest { @@ -41,7 +46,7 @@ public class CommandParserTest { @Test public void shouldParseInsertValuesStatement() { // When: - List commands = CommandParser.parse("INSERT INTO FOO VALUES (55);"); + List commands = parse("INSERT INTO FOO VALUES (55);"); // Then: assertThat(commands.size(), is(1)); @@ -57,7 +62,7 @@ public void shouldParseInsertValuesStatement() { @Test public void shouldParseInsertValuesStatementWithExplicitFields() { // When: - List commands = CommandParser.parse("INSERT INTO `foo` (col1, col2) VALUES (55, '40');"); + List commands = parse("INSERT INTO `foo` (col1, col2) VALUES (55, '40');"); // Then: assertThat(commands.size(), is(1)); @@ -74,7 +79,7 @@ public void shouldParseInsertValuesStatementWithExplicitFields() { @Test public void shouldParseInsertValuesStatementWithExplicitQuoting() { // When: - List commands = CommandParser.parse("INSERT INTO `foo` (`col1`) VALUES (55);"); + List commands = parse("INSERT INTO `foo` (`col1`) VALUES (55);"); // Then: assertThat(commands.size(), is(1)); @@ -87,11 +92,33 @@ public void shouldParseInsertValuesStatementWithExplicitQuoting() { assertThat(toFieldType(insertValues.getValues().get(0)), is(55)); } + @Test + public void shouldParseInsertValuesStatementWithVariables() { + // When: + List commands = parse( + "INSERT INTO ${stream} VALUES (${num});", + ImmutableMap.of( + "stream", "FOO", + "num", "55" + ) + ); + + // Then: + assertThat(commands.size(), is(1)); + assertThat(commands.get(0), instanceOf(SqlInsertValues.class)); + final SqlInsertValues insertValues = (SqlInsertValues) commands.get(0); + + assertThat(insertValues.getSourceName(), is("`FOO`")); + assertThat(insertValues.getColumns().size(), is(0)); + assertThat(insertValues.getValues().size(), is(1)); + assertThat(toFieldType(insertValues.getValues().get(0)), is(55)); + } + @Test public void shouldThrowOnInvalidInsertValues() { // When: final MigrationException e = assertThrows(MigrationException.class, - () -> CommandParser.parse("insert into foo values (this_should_not_here) ('val');")); + () -> parse("insert into foo values (this_should_not_here) ('val');")); // Then: assertThat(e.getMessage(), containsString("Failed to parse INSERT VALUES statement")); @@ -100,7 +127,7 @@ public void shouldThrowOnInvalidInsertValues() { @Test public void shouldParseInsertIntoStatement() { // When: - List commands = CommandParser.parse("INSERT INTO FOO SELECT VALUES FROM BAR;"); + List commands = parse("INSERT INTO FOO SELECT VALUES FROM BAR;"); // Then: assertThat(commands.size(), is(1)); @@ -110,24 +137,47 @@ public void shouldParseInsertIntoStatement() { @Test public void shouldParseSetUnsetStatements() { - List commands = CommandParser.parse("SET 'foo.property'='bar';UNSET 'foo.property';"); + List commands = parse("SeT 'foo.property'='bar';UnSET 'foo.property';"); assertThat(commands.size(), is(2)); assertThat(commands.get(0), instanceOf(SqlPropertyCommand.class)); - assertThat(commands.get(0).getCommand(), is("SET 'foo.property'='bar';")); + assertThat(commands.get(0).getCommand(), is("SeT 'foo.property'='bar';")); assertThat(((SqlPropertyCommand) commands.get(0)).isSetCommand(), is(true)); assertThat(((SqlPropertyCommand) commands.get(0)).getProperty(), is("foo.property")); assertThat(((SqlPropertyCommand) commands.get(0)).getValue().get(), is("bar")); assertThat(commands.get(1), instanceOf(SqlPropertyCommand.class)); - assertThat(commands.get(1).getCommand(), is("UNSET 'foo.property';")); + assertThat(commands.get(1).getCommand(), is("UnSET 'foo.property';")); assertThat(((SqlPropertyCommand) commands.get(1)).isSetCommand(), is(false)); assertThat(((SqlPropertyCommand) commands.get(1)).getProperty(), is("foo.property")); assertTrue(!((SqlPropertyCommand) commands.get(1)).getValue().isPresent()); } + @Test + public void shouldParseSetStatementsWithVariables() { + List commands = parse("SET '${name}'='${value}';", + ImmutableMap.of("name", "foo.property", "value", "bar")); + assertThat(commands.size(), is(1)); + assertThat(commands.get(0), instanceOf(SqlPropertyCommand.class)); + assertThat(commands.get(0).getCommand(), is("SET '${name}'='${value}';")); + assertThat(((SqlPropertyCommand) commands.get(0)).isSetCommand(), is(true)); + assertThat(((SqlPropertyCommand) commands.get(0)).getProperty(), is("foo.property")); + assertThat(((SqlPropertyCommand) commands.get(0)).getValue().get(), is("bar")); + } + + @Test + public void shouldParseUnsetStatementsWithVariables() { + List commands = parse("UnSeT '${name}';", + ImmutableMap.of("name", "foo.property")); + assertThat(commands.size(), is(1)); + assertThat(commands.get(0), instanceOf(SqlPropertyCommand.class)); + assertThat(((SqlPropertyCommand) commands.get(0)).isSetCommand(), is(false)); + assertThat(((SqlPropertyCommand) commands.get(0)).getProperty(), is("foo.property")); + assertTrue(!((SqlPropertyCommand) commands.get(0)).getValue().isPresent()); + } + @Test public void shouldParseMultipleStatements() { // When: - List commands = CommandParser.parse("INSERT INTO foo VALUES (32);INSERT INTO FOO_2 VALUES ('wow',3,'hello ''world''!');"); + List commands = parse("INSERT INTO foo VALUES (32);INSERT INTO FOO_2 VALUES ('wow',3,'hello ''world''!');"); // Then: assertThat(commands.size(), is(2)); @@ -146,7 +196,7 @@ public void shouldParseMultipleStatements() { @Test public void shouldParseCreateStatement() { // When: - List commands = CommandParser.parse("CREATE STREAM FOO (A STRING) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='DELIMITED');"); + List commands = parse("CREATE STREAM FOO (A STRING) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='DELIMITED');"); // Then: assertThat(commands.size(), is(1)); @@ -157,7 +207,7 @@ public void shouldParseCreateStatement() { @Test public void shouldParseCreateAsStatement() { // When: - List commands = CommandParser.parse("CREATE STREAM FOO AS SELECT col1, col2 + 2 FROM BAR;"); + List commands = parse("CREATE STREAM FOO AS SELECT col1, col2 + 2 FROM BAR;"); // Then: assertThat(commands.size(), is(1)); @@ -168,7 +218,7 @@ public void shouldParseCreateAsStatement() { @Test public void shouldParseCreateOrReplaceStatement() { // When: - List commands = CommandParser.parse("create or replace stream FOO (A STRING) WITH (KAFKA_TOPIC='FOO', VALUE_FORMAT='DELIMITED');"); + List commands = parse("create or replace stream FOO (A STRING) WITH (KAFKA_TOPIC='FOO', VALUE_FORMAT='DELIMITED');"); // Then: assertThat(commands.size(), is(1)); @@ -179,7 +229,7 @@ public void shouldParseCreateOrReplaceStatement() { @Test public void shouldParseTerminateStatement() { // When: - List commands = CommandParser.parse("terminate some_query_id;"); + List commands = parse("terminate some_query_id;"); // Then: assertThat(commands.size(), is(1)); @@ -190,7 +240,7 @@ public void shouldParseTerminateStatement() { @Test public void shouldParseDropSourceStatement() { // When: - List commands = CommandParser.parse("drop stream foo;"); + List commands = parse("drop stream foo;"); // Then: assertThat(commands.size(), is(1)); @@ -201,7 +251,7 @@ public void shouldParseDropSourceStatement() { @Test public void shouldParseAlterSourceStatement() { // When: - List commands = CommandParser.parse("alter stream foo add column new_col string;"); + List commands = parse("alter stream foo add column new_col string;"); // Then: assertThat(commands.size(), is(1)); @@ -212,7 +262,7 @@ public void shouldParseAlterSourceStatement() { @Test public void shouldParseCreateTypeStatement() { // When: - List commands = CommandParser.parse("create type address as struct;"); + List commands = parse("create type address as struct;"); // Then: assertThat(commands.size(), is(1)); @@ -223,7 +273,7 @@ public void shouldParseCreateTypeStatement() { @Test public void shouldParseDropTypeStatement() { // When: - List commands = CommandParser.parse("drop type address;"); + List commands = parse("drop type address;"); // Then: assertThat(commands.size(), is(1)); @@ -234,7 +284,7 @@ public void shouldParseDropTypeStatement() { @Test public void shouldParseSeveralCommands() { // When: - List commands = CommandParser.parse("CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE) WITH (kafka_topic='locations', value_format='json', partitions=1);\n" + List commands = parse("CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE) WITH (kafka_topic='locations', value_format='json', partitions=1);\n" + "INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);\n" + "INSERT INTO `riderLocations` (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);\n" + "INSERT INTO \"riderLocations\" (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);\n" @@ -276,7 +326,7 @@ public void shouldParseCreateConnectorStatement() { + " \"key\"='username');"; // When: - List commands = CommandParser.parse(createConnector); + List commands = parse(createConnector); // Then: assertThat(commands.size(), is(1)); @@ -296,10 +346,10 @@ public void shouldParseCreateConnectorStatement() { @Test public void shouldParseDropConnectorStatement() { // Given: - final String dropConnector = "DROP CONNECTOR `jdbc-connector` ;"; // The space at the end is to make sure that the regex doesn't capture it as a part of the name + final String dropConnector = "DRoP CONNEcTOR `jdbc-connector` ;"; // The space at the end is to make sure that the regex doesn't capture it as a part of the name // When: - List commands = CommandParser.parse(dropConnector); + List commands = parse(dropConnector); // Then: assertThat(commands.size(), is(1)); @@ -337,60 +387,86 @@ public void shouldSplitCommandsWithComments() { } @Test - public void shouldThrowOnMalformedComment() { + public void shouldParseDefineStatement() { + // Given: + final String defineVar = "DEFINE var = 'foo';"; + // When: - final MigrationException e = assertThrows(MigrationException.class, - () -> CommandParser.splitSql("/* Comment ")); + List commands = parse(defineVar); // Then: - assertThat(e.getMessage(), is("Invalid sql - failed to find closing token '*/'")); + assertThat(commands.size(), is(1)); + assertThat(commands.get(0).getCommand(), is(defineVar)); + assertThat(commands.get(0), instanceOf(SqlDefineVariableCommand.class)); + assertThat(((SqlDefineVariableCommand) commands.get(0)).getVariable(), is("var")); + assertThat(((SqlDefineVariableCommand) commands.get(0)).getValue(), is("foo")); } @Test - public void shouldThrowOnMalformedQuote() { + public void shouldDefineStatementWithVariable() { + // Given: + final String defineVar = "DEFiNe word = 'walk${suffix}';"; + // When: - final MigrationException e = assertThrows(MigrationException.class, - () -> CommandParser.splitSql("select 'unclosed quote;")); + List commands = parse(defineVar, ImmutableMap.of("suffix", "ing")); // Then: - assertThat(e.getMessage(), is("Invalid sql - failed to find closing token '''")); + assertThat(commands.size(), is(1)); + assertThat(commands.get(0), instanceOf(SqlDefineVariableCommand.class)); + assertThat(((SqlDefineVariableCommand) commands.get(0)).getVariable(), is("word")); + assertThat(((SqlDefineVariableCommand) commands.get(0)).getValue(), is("walking")); } @Test - public void shouldThrowOnMissingSemicolon() { + public void shouldParseUndefineStatement() { + // Given: + final String undefineVar = "UNDEFINE var;"; + + // When: + List commands = parse(undefineVar); + + // Then: + assertThat(commands.size(), is(1)); + assertThat(commands.get(0).getCommand(), is(undefineVar)); + assertThat(commands.get(0), instanceOf(SqlUndefineVariableCommand.class)); + assertThat(((SqlUndefineVariableCommand) commands.get(0)).getVariable(), is("var")); + } + + @Test + public void shouldThrowOnMalformedComment() { // When: final MigrationException e = assertThrows(MigrationException.class, - () -> CommandParser.parse("create stream foo as select * from no_semicolon_after_this")); + () -> CommandParser.splitSql("/* Comment ")); // Then: - assertThat(e.getMessage(), containsString("Unmatched command at end of file; missing semicolon")); + assertThat(e.getMessage(), is("Invalid sql - failed to find closing token '*/'")); } @Test - public void shouldThrowOnDefineStatement() { + public void shouldThrowOnMalformedQuote() { // When: final MigrationException e = assertThrows(MigrationException.class, - () -> CommandParser.parse("define var = 'value';")); + () -> CommandParser.splitSql("select 'unclosed quote;")); // Then: - assertThat(e.getMessage(), is("'DEFINE' statements are not supported.")); + assertThat(e.getMessage(), is("Invalid sql - failed to find closing token '''")); } @Test - public void shouldThrowOnUndefineStatement() { + public void shouldThrowOnMissingSemicolon() { // When: final MigrationException e = assertThrows(MigrationException.class, - () -> CommandParser.parse("undefine var;")); + () -> parse("create stream foo as select * from no_semicolon_after_this")); // Then: - assertThat(e.getMessage(), is("'UNDEFINE' statements are not supported.")); + assertThat(e.getMessage(), containsString("Unmatched command at end of file; missing semicolon")); } @Test public void shouldThrowOnDescribeStatement() { // When: final MigrationException e = assertThrows(MigrationException.class, - () -> CommandParser.parse("describe my_stream;")); + () -> parse("describe my_stream;")); // Then: assertThat(e.getMessage(), is("'DESCRIBE' statements are not supported.")); @@ -400,7 +476,7 @@ public void shouldThrowOnDescribeStatement() { public void shouldThrowOnExplainStatement() { // When: final MigrationException e = assertThrows(MigrationException.class, - () -> CommandParser.parse("explain my_query_id;")); + () -> parse("explain my_query_id;")); // Then: assertThat(e.getMessage(), is("'EXPLAIN' statements are not supported.")); @@ -410,7 +486,7 @@ public void shouldThrowOnExplainStatement() { public void shouldThrowOnSelectStatement() { // When: final MigrationException e = assertThrows(MigrationException.class, - () -> CommandParser.parse("select * from my_stream emit changes;")); + () -> parse("select * from my_stream emit changes;")); // Then: assertThat(e.getMessage(), is("'SELECT' statements are not supported.")); @@ -420,7 +496,7 @@ public void shouldThrowOnSelectStatement() { public void shouldThrowOnPrintStatement() { // When: final MigrationException e = assertThrows(MigrationException.class, - () -> CommandParser.parse("print 'my_topic';")); + () -> parse("print 'my_topic';")); // Then: assertThat(e.getMessage(), is("'PRINT' statements are not supported.")); @@ -430,7 +506,7 @@ public void shouldThrowOnPrintStatement() { public void shouldThrowOnShowStatement() { // When: final MigrationException e = assertThrows(MigrationException.class, - () -> CommandParser.parse("show connectors;")); + () -> parse("show connectors;")); // Then: assertThat(e.getMessage(), is("'SHOW' statements are not supported.")); @@ -440,7 +516,7 @@ public void shouldThrowOnShowStatement() { public void shouldThrowOnListStatement() { // When: final MigrationException e = assertThrows(MigrationException.class, - () -> CommandParser.parse("list queries;")); + () -> parse("list queries;")); // Then: assertThat(e.getMessage(), is("'LIST' statements are not supported.")); @@ -450,9 +526,19 @@ public void shouldThrowOnListStatement() { public void shouldThrowOnRunScriptStatement() { // When: final MigrationException e = assertThrows(MigrationException.class, - () -> CommandParser.parse("RUN SCRIPT 'my_script.sql';")); + () -> parse("RUN SCRIPT 'my_script.sql';")); // Then: assertThat(e.getMessage(), is("'RUN SCRIPT' statements are not supported.")); } + + private List parse(final String sql, Map variables) { + return CommandParser.splitSql(sql).stream() + .map(command -> CommandParser.transformToSqlCommand(command, variables)) + .collect(Collectors.toList()); + } + + private List parse(final String sql) { + return parse(sql, ImmutableMap.of()); + } }