-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed the regression bug for CSAS/CTAS result type check. #419
Fixed the regression bug for CSAS/CTAS result type check. #419
Conversation
@@ -205,7 +205,7 @@ private KsqlEntity executeStatement( | |||
|| statement instanceof DropStream | |||
|| statement instanceof DropTable | |||
) { | |||
// getStatementExecutionPlan(statement, statementText, streamsProperties); | |||
getStatementExecutionPlan(statement, statementText, streamsProperties); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the regression was that a statement was commented out? What was the impact exactly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this line evaluates the query by generating the execution plan without executing it. This will detect if the CREATE TABLE AS SELECT and CREATE STREAM AS SELECT were correct or not. Without this check the wrong queries will be sent for execution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hjafarpour it doesn't use the return type, though? So are you saying that it is validating that they syntax is correct? If so, then maybe add a comment as such or have a method named validateStatementExecutionPlan
- as it stands it looks like this method isn't doing anything
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dguy good point. I added comment to clarify the method call.
retest this please |
@@ -205,7 +205,8 @@ private KsqlEntity executeStatement( | |||
|| statement instanceof DropStream | |||
|| statement instanceof DropTable | |||
) { | |||
// getStatementExecutionPlan(statement, statementText, streamsProperties); | |||
//Sanity check for the statement before distributing it. | |||
getStatementExecutionPlan(statement, statementText, streamsProperties); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be named validateExecutionPlan
so its intended use is clear. The comment only helps people reading code in this context, but this method could be used in other contexts.
If this method is doing two things (ie. generating and validating the plan), then it would be better to split it up into two separate methods, each performing a particular function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree w @apurvam - it should be renamed. Looking the code it doesn't make - sense. It looks like it is executing various ddlcommands inside getStatementExecutionPlan() (i.e. CreateTableAsSelect etc) - and also publishes them onto the command topic to allow all nodes to execute the same command again "distributeStatement()". Shouldnt we have a validate() - that doesn't do any execution and then call distribute()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I added a new method and updated the PR.
*/ | ||
private void validateStatement(Statement statement, String statementText, | ||
Map<String, Object> streamsProperties) throws Exception { | ||
getStatementExecutionPlan(statement, statementText, streamsProperties); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am still confused about this. The getStatementExecutionPlan
seems to just generate the full plan and returns an ExecutionPlan
. If it fails, it throws an exception. What we are doing here is just running the whole process and then saying it's valid if no exception is thrown. I presume that the execution plan would have to be eventually generated again. This seems wasteful.
Why can't we retain the returned execution plan and use it directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we will generate the execution plan again but not at the same place but in the engine in every instance of the server. This validation is just executed in the server that has received the query from user. Also this validation does not alter the metastore while the other ones that we perform in the engines does.
assertTrue("Incorrect response size.", result1.size() == 1); | ||
assertThat(result1.get(0), instanceOf(ErrorMessageEntity.class)); | ||
ErrorMessageEntity errorMessageEntity1 = (ErrorMessageEntity) result1.get(0); | ||
assertTrue(errorMessageEntity1.getErrorMessage().getMessage().equalsIgnoreCase("Invalid " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the exception thrown by validateExecutionPlan
eventually boil down to this error message? If so, would it make more sense to test for validity at a more fine grained level, where we can check the typed exception directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, do we have tests for the positive code path, where there are no errors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but the 'getStatementExecutionPlan' method is private and because of that I have the test in the caller.
The positive code path test needs to be added too as you pointed out but it will include much more that the fix for this regression bug and can be part of another PR that improves the test coverage for this KsqlResource.
…ype-Enforcement-Regression-Fix
@@ -328,6 +342,8 @@ private ExecutionPlan getStatementExecutionPlan(Statement statement, String stat | |||
if (ddlCommandTask != null) { | |||
try { | |||
return new ExecutionPlan(ddlCommandTask.execute(statement, statementText, properties)); | |||
} catch (KsqlException ksqlException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we change this method so that it throw KsqlException
rather than Exception
- generally we shouldn't have methods with the signature blah() throws Exception
unless we have no control over what is being thrown. Even then, we should probably catch and wrap such cases so we have the context of what has gone wrong etc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, updated it for this PR. When we did this we were still finalizing the KsqlException class. Updated it. Will have to look into other places to use KsqlException as much as possible in other PRs.
assertTrue("Incorrect response size.", result1.size() == 1); | ||
assertThat(result1.get(0), instanceOf(ErrorMessageEntity.class)); | ||
ErrorMessageEntity errorMessageEntity1 = (ErrorMessageEntity) result1.get(0); | ||
assertTrue(errorMessageEntity1.getErrorMessage().getMessage().equalsIgnoreCase("Invalid " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assertThat(errorMessage...toLowerCase(), equalTo("whatever is expected"))
Elsewhere to. assertTrue
should only be used to check actual boolean conditions. For everything else we should be using assertThat
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the assertion checks.
@dguy made your suggested changes. |
retest this please |
try { | ||
commandProducer.send(new ProducerRecord<>(commandTopic, commandId, command)).get(); | ||
} catch (Exception e) { | ||
throw new KsqlException("Could not write the statement into the command topic.", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add the statementString in the exception method so we have a bit more context?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, added the statement string to the message.
"Unable to execute statement '%s'", | ||
statementText | ||
)); | ||
} else { | ||
throw new Exception("Unable to execute statement"); | ||
throw new KsqlException("Unable to execute statement"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add the statementText into the exception so we know what weren't able to execute?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -239,6 +259,8 @@ private CommandStatusEntity distributeStatement( | |||
log.warn("Timeout to get commandStatus, waited {} milliseconds:, statementText:" + statementText, | |||
distributedCommandResponseTimeout, exception); | |||
commandStatus = statementExecutor.getStatus(commandId).get(); | |||
} catch (Exception e) { | |||
throw new KsqlException("Could not write the statement into the command topic.", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
assertTrue("Incorrect response size.", result1.size() == 1); | ||
assertThat(result1.get(0), instanceOf(ErrorMessageEntity.class)); | ||
ErrorMessageEntity errorMessageEntity1 = (ErrorMessageEntity) result1.get(0); | ||
assertTrue(errorMessageEntity1.getErrorMessage().getMessage().equalsIgnoreCase("Invalid " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As i said above, we should use assertThat
here too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
KsqlResource testResource = TestKsqlResourceUtil.get(); | ||
String ksqlString1 = "CREATE STREAM s1 AS SELECT * FROM test_table;"; | ||
|
||
Response response1 = testResource.handleKsqlStatements(new KsqlRequest(ksqlString1, Collections |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here and elsewhere in this class, we can use Collections.emptyMap()
instead and then we will not have the unchecked warnings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@dguy added your suggestions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @hjafarpour, LGTM
No description provided.