-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(migrations): support DEFINE and UNDEFINE statements #7366
Conversation
"CREATE STREAM HOMES (ADDR ADDRESS) WITH (KAFKA_TOPIC='HOMES', PARTITIONS=1, VALUE_FORMAT='JSON');" + | ||
"INSERT INTO HOMES VALUES (STRUCT(number := 123, street := 'sesame st', city := 'New York City'));" + | ||
"DEFINE variable = 'HOMES';" + | ||
"CREATE STREAM ${variable} (ADDR ADDRESS) WITH (KAFKA_TOPIC='${variable}', PARTITIONS=1, VALUE_FORMAT='JSON');" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line passing depends on #7365
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jzaralim -- this is exciting! Comments inline. Also I updated the PR title to reflect that this PR adds support for DEFINE and UNDEFINE statements in the migrations tool, but doesn't add the full variable substitution support (i.e., the --define
option) specified in the KLIP.
@@ -312,13 +314,17 @@ private void executeCommands( | |||
final String previous | |||
) { | |||
final Map<String, Object> properties = new HashMap<>(); | |||
final List<SqlCommand> commands = CommandParser.parse(migrationFileContent); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we would still parse everything before starting to execute any commands, to avoid situations where a migrations file is partially executed because a later statement fails to parse. As discussed offline, we can achieve this by having a validation loop that is identical to the execution loop except only DEFINE and UNDEFINE statements are executed (i.e., sent to the java client for purposes of tracking variables, without anything actually being sent to the ksqlDB server).
...b-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java
Show resolved
Hide resolved
@@ -56,6 +57,10 @@ | |||
Pattern.compile("\\s*UNSET\\s+'((?:[^']*|(?:''))*)'\\s*;\\s*"); | |||
private static final Pattern DROP_CONNECTOR = | |||
Pattern.compile("\\s*DROP\\s+CONNECTOR\\s+(.*?)\\s*;\\s*"); | |||
private static final Pattern DEFINE_VARIABLE = Pattern.compile( | |||
"\\s*DEFINE\\s+([a-zA-Z_][a-zA-Z0-9_@]*)\\s*=\\s*'((?:[^']*|(?:''))*)'\\s*;\\s*"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just checking: DEFINE statements require that the values be strings, even though the java client and HTTP endpoints allow non-string values, right? It looks like the values are converted into strings by SessionProperties
anyway. Is there a reason we didn't have the HTTP endpoints require string values to keep things simpler/more consistent? It's worth considering whether the java client should only accept string values as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It follows the same pattern as SET
, which also requires strings in the statement but can be passed as object parameters. I can see two reasons this could be convenient:
- Users using the endpoints don't need to treat
sessionProperties
andsessionVariables
differently - Client users can make requests like
client.define("name", 5);
orclient.define("name", false);
without having to convert them to strings.
These are pretty minor reasons though. Swapping everything to strings is a pretty easy change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense. Let's leave this as is!
private static final Pattern DEFINE_VARIABLE = Pattern.compile( | ||
"\\s*DEFINE\\s+([a-zA-Z_][a-zA-Z0-9_@]*)\\s*=\\s*'((?:[^']*|(?:''))*)'\\s*;\\s*"); | ||
private static final Pattern UNDEFINE_VARIABLE = | ||
Pattern.compile("\\s*UNDEFINE\\s+([a-zA-Z_][a-zA-Z0-9_@]*)\\s*;\\s*"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where did this set of valid characters in the variable name (i.e., [a-zA-Z_][a-zA-Z0-9_@]*
) come from?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The variable name follows the pattern for identifiers in the parser:
: (LETTER | '_') (LETTER | DIGIT | '_' | '@' )* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. It's not great that we've got it in two places now (the parser and also the migrations tool) but there are disadvantages to having them be shared as well. How about we add a comment to this line in the migrations tool code to reference the SqlBase.g4 line (with a git permalink) so if the parser is changed in the future and something breaks, we'll at least know where to look to update this.
return getUnsetPropertyCommand(sql); | ||
return getUnsetPropertyCommand(sql, variables); | ||
case DEFINE_VARIABLE: | ||
return getDefineVariableCommand(sql); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also needs to support variable substitution, right? In case users want to define a variable based on other variables.
I'm not sure whether it makes sense to support variable substitution for the variable name in addition to for the variable value. It'd be good to check what the ksqlDB CLI's stance on this is, and add support if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated. Only the variable values can be replaced.
} | ||
|
||
private static SqlDefineVariableCommand getDefineVariableCommand(final String sql) { | ||
final Matcher defineVariableMatcher = DEFINE_VARIABLE.matcher(sql); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haven't tried this so I might be mistaken but I think we've got a bug around casing here: transformToSqlCommand()
uppercases the sql before applying the regex matchers but we don't do that here (we can't because we don't want to strings or quoted identifiers), which means this matcher might fail to find a match (e.g., if the user lowercases DEFINE
in their sql command). I think this bug also affects the other statement types.
If this is true, we definitely need to fix this, and we should add test coverage for it too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Do you know whether this is a problem for 0.17 (e.g., for INSERT VALUES)? If so, we should backport a fix to 6.2.x (requires going through the CP blocker process). We should also consider getting a fix into 0.17 but I think that's unlikely at this point, in which case we'd want to document the limitation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It affects SET and UNSET. Filed a blocker, but IMO because there is a clear workaround (tell users to capitalize SET and UNSET), it's not terrible if it doesn't get included.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed it's not terrible if it's not included but I think it's still better to fix it before the 6.2 release. 0.17 probably isn't worth it, though.
ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/CommandParser.java
Outdated
Show resolved
Hide resolved
ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/CommandParserTest.java
Outdated
Show resolved
Hide resolved
ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/CommandParserTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jzaralim -- looking good! A couple important questions but nothing major in terms of changes. Looking forward to getting this merged :)
...b-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java
Outdated
Show resolved
Hide resolved
...b-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java
Show resolved
Hide resolved
...b-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java
Show resolved
Hide resolved
...b-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
private static SqlDefineVariableCommand getDefineVariableCommand(final String sql) { | ||
final Matcher defineVariableMatcher = DEFINE_VARIABLE.matcher(sql); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Do you know whether this is a problem for 0.17 (e.g., for INSERT VALUES)? If so, we should backport a fix to 6.2.x (requires going through the CP blocker process). We should also consider getting a fix into 0.17 but I think that's unlikely at this point, in which case we'd want to document the limitation.
"\\s*DEFINE\\s+([a-zA-Z_][a-zA-Z0-9_@]*)\\s*=\\s*'((?:[^']*|(?:''))*)'\\s*;\\s*", | ||
Pattern.CASE_INSENSITIVE); | ||
private static final Pattern UNDEFINE_VARIABLE = Pattern.compile( | ||
"\\s*UNDEFINE\\s+([a-zA-Z_][a-zA-Z0-9_@]*)\\s*;\\s*", Pattern.CASE_INSENSITIVE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need the toUpperCase()
call in transformToSqlCommand()
now that we have case-insensitivity?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, it's used to determine the type of command
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But why do we need to uppercase the command if the regex is case-insensitive now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The regexes are used to determine things like the name of a connector being dropped. There is some logic before that for determining the type of command or if the command is supported, which works by finding specific keywords in the sql string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, got it. Would it be equivalent if we removed the toUpperCase()
and instead replaced the .equals()
for token comparison with .equalsIgnoreCase()
instead? It's mildly less efficient but I think it improves readability by not separating the logic for case-insensitive comparison across two different places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are some places where it uses contains
instead of equals
, for example tokens.contains(VALUES)
, so removing the toUpperCase
would actually make it more confusing. I've added some comments to clarify what tokens
is supposed to be.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. Thanks!
...ols/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks @jzaralim .
...b-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java
Outdated
Show resolved
Hide resolved
"\\s*DEFINE\\s+([a-zA-Z_][a-zA-Z0-9_@]*)\\s*=\\s*'((?:[^']*|(?:''))*)'\\s*;\\s*", | ||
Pattern.CASE_INSENSITIVE); | ||
private static final Pattern UNDEFINE_VARIABLE = Pattern.compile( | ||
"\\s*UNDEFINE\\s+([a-zA-Z_][a-zA-Z0-9_@]*)\\s*;\\s*", Pattern.CASE_INSENSITIVE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, got it. Would it be equivalent if we removed the toUpperCase()
and instead replaced the .equals()
for token comparison with .equalsIgnoreCase()
instead? It's mildly less efficient but I think it improves readability by not separating the logic for case-insensitive comparison across two different places.
Description
^^^
Previously, the tool parsed each command, collected them and then executed them. Because the variables are tracked in the java client, but SET/UNSET/INSERT VALUES don't do variable substitution through the client, the flow was refactored to parse commands and then execute them one by one.
Testing done
unit + integration
Reviewer checklist