Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(migrations): support DEFINE and UNDEFINE statements #7366

Merged
merged 6 commits into from
Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/operate-and-deploy/migrations-tool.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ types of ksqlDB statements:
* `DROP TYPE`
* `SET <property>`
* `UNSET <property>`
* `DEFINE <variable>`
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
* `UNDEFINE <variable>`

Any properties or variables set using the `SET`, `UNSET`, `DEFINE` and `UNDEFINE` are applied in the
current migration file only. They do not carry over to the next migration file, even if multiple
migration files are applied as part of the same `ksql-migrations apply` command

Requirements and Installation
-----------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.text.StringSubstitutor;

public final class VariableSubstitutor {
Expand Down Expand Up @@ -57,6 +58,16 @@ public static Set<String> lookup(final String text) {
}
}

public static String substitute(
final String string,
final Map<String, String> valueMap
) {
return StringSubstitutor.replace(
string, valueMap.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey(), e -> sanitize(e.getValue())))
);
}

public static String substitute(
final KsqlParser.ParsedStatement parsedStatement,
final Map<String, String> valueMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,20 @@ public void shouldThrowOnSQLInjection() {
assertThrowOnInvalidVariables(statements, variablesMap);
}

@Test
public void shouldSubstituteVariablesInString() {
// Given
final Map<String, String> variablesMap = new ImmutableMap.Builder<String, String>() {{
put("event", "birthday");
}}.build();

// When
final String substituted = VariableSubstitutor.substitute("Happy ${event} to you!", variablesMap);

// Then
assertThat(substituted, equalTo("Happy birthday to you!"));
}

private void assertReplacedStatements(
final List<Pair<String, String>> statements,
final Map<String, String> variablesMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
import io.confluent.ksql.tools.migrations.util.CommandParser;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlCommand;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlCreateConnectorStatement;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlDefineVariableCommand;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlDropConnectorStatement;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlInsertValues;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlPropertyCommand;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlStatement;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlUndefineVariableCommand;
import io.confluent.ksql.tools.migrations.util.MetadataUtil;
import io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState;
import io.confluent.ksql.tools.migrations.util.MigrationFile;
Expand Down Expand Up @@ -311,22 +313,79 @@ private void executeCommands(
final Clock clock,
final String previous
) {
final List<String> commands = CommandParser.splitSql(migrationFileContent);

executeCommands(
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
commands, ksqlClient, config, executionStart, migration, clock, previous, true);
executeCommands(
commands, ksqlClient, config, executionStart, migration, clock, previous, false);
}

/**
* If validateOnly is set to true, then this parses each of the commands but only executes
* DEFINE/UNDEFINE commands (variables are needed for parsing INSERT INTO... VALUES, SET/UNSET
* and DEFINE commands). If validateOnly is set to false, then each command will execute after
* parsing.
*/
private void executeCommands(
final List<String> commands,
final Client ksqlClient,
final MigrationConfig config,
final String executionStart,
final MigrationFile migration,
final Clock clock,
final String previous,
final boolean validateOnly
) {
cleanUpJavaClientVariables(ksqlClient);
final Map<String, Object> properties = new HashMap<>();
final List<SqlCommand> commands = CommandParser.parse(migrationFileContent);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we would still parse everything before starting to execute any commands, to avoid situations where a migrations file is partially executed because a later statement fails to parse. As discussed offline, we can achieve this by having a validation loop that is identical to the execution loop except only DEFINE and UNDEFINE statements are executed (i.e., sent to the java client for purposes of tracking variables, without anything actually being sent to the ksqlDB server).

for (final SqlCommand command : commands) {
for (final String command : commands) {
try {
executeCommand(command, ksqlClient, properties);
} catch (InterruptedException | ExecutionException e) {
final Map<String, String> variables = ksqlClient.getVariables().entrySet()
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
.stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().toString()));
executeCommand(
CommandParser.transformToSqlCommand(command, variables),
ksqlClient,
properties,
validateOnly
);
} catch (InterruptedException | ExecutionException | MigrationException e) {
final String action = validateOnly ? "parse" : "execute";
final String errorMsg = String.format(
"Failed to execute sql: %s. Error: %s", command.getCommand(), e.getMessage());
"Failed to %s sql: %s. Error: %s", action, command, e.getMessage());
updateState(config, ksqlClient, MigrationState.ERROR,
executionStart, migration, clock, previous, Optional.of(errorMsg));
throw new MigrationException(errorMsg);
}
}
}

private void cleanUpJavaClientVariables(final Client ksqlClient) {
ksqlClient.getVariables().forEach((k, v) -> ksqlClient.undefine(k));
}

private void executeCommand(
final SqlCommand command,
final Client ksqlClient,
final Map<String, Object> properties,
final boolean defineUndefineOnly
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
) throws ExecutionException, InterruptedException {
if (command instanceof SqlDefineVariableCommand) {
ksqlClient.define(
((SqlDefineVariableCommand) command).getVariable(),
((SqlDefineVariableCommand) command).getValue()
);
} else if (command instanceof SqlUndefineVariableCommand) {
ksqlClient.undefine(((SqlUndefineVariableCommand) command).getVariable());
} else if (!defineUndefineOnly) {
executeNonVariableCommands(command, ksqlClient, properties);
}
}

/**
* Executes everything besides define/undefine commands
*/
private void executeNonVariableCommands(
final SqlCommand command,
final Client ksqlClient,
final Map<String, Object> properties
Expand Down
Loading