-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed the regression bug for CSAS/CTAS result type check. #419
Changes from 1 commit
5305c4e
df94028
35e8660
8cd30d0
65783e0
205a09f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -205,7 +205,7 @@ private KsqlEntity executeStatement( | |
|| statement instanceof DropStream | ||
|| statement instanceof DropTable | ||
) { | ||
// getStatementExecutionPlan(statement, statementText, streamsProperties); | ||
getStatementExecutionPlan(statement, statementText, streamsProperties); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be named If this method is doing two things (ie. generating and validating the plan), then it would be better to split it up into two separate methods, each performing a particular function. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree w @apurvam - it should be renamed. Looking the code it doesn't make - sense. It looks like it is executing various ddlcommands inside getStatementExecutionPlan() (i.e. CreateTableAsSelect etc) - and also publishes them onto the command topic to allow all nodes to execute the same command again "distributeStatement()". Shouldnt we have a validate() - that doesn't do any execution and then call distribute() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. I added a new method and updated the PR. |
||
return distributeStatement(statementText, statement, streamsProperties); | ||
} else { | ||
if (statement != null) { | ||
|
@@ -328,6 +328,8 @@ private ExecutionPlan getStatementExecutionPlan(Statement statement, String stat | |
if (ddlCommandTask != null) { | ||
try { | ||
return new ExecutionPlan(ddlCommandTask.execute(statement, statementText, properties)); | ||
} catch (KsqlException ksqlException) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we change this method so that it throw There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed, updated it for this PR. When we did this we were still finalizing the KsqlException class. Updated it. Will have to look into other places to use KsqlException as much as possible in other PRs. |
||
throw ksqlException; | ||
} catch (Throwable t) { | ||
throw new KsqlException("Cannot RUN execution plan for this statement, " + statement, t); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,8 +25,28 @@ | |
import io.confluent.ksql.metastore.KsqlTable; | ||
import io.confluent.ksql.metastore.KsqlTopic; | ||
import io.confluent.ksql.metastore.MetaStore; | ||
import io.confluent.ksql.parser.tree.*; | ||
import io.confluent.ksql.rest.entity.*; | ||
import io.confluent.ksql.parser.tree.Expression; | ||
import io.confluent.ksql.parser.tree.ListQueries; | ||
import io.confluent.ksql.parser.tree.ListRegisteredTopics; | ||
import io.confluent.ksql.parser.tree.ListStreams; | ||
import io.confluent.ksql.parser.tree.ListTables; | ||
import io.confluent.ksql.parser.tree.QualifiedName; | ||
import io.confluent.ksql.parser.tree.RegisterTopic; | ||
import io.confluent.ksql.parser.tree.ShowColumns; | ||
import io.confluent.ksql.parser.tree.Statement; | ||
import io.confluent.ksql.parser.tree.StringLiteral; | ||
import io.confluent.ksql.rest.entity.CommandStatus; | ||
import io.confluent.ksql.rest.entity.CommandStatusEntity; | ||
import io.confluent.ksql.rest.entity.ErrorMessageEntity; | ||
import io.confluent.ksql.rest.entity.KsqlEntity; | ||
import io.confluent.ksql.rest.entity.KsqlEntityList; | ||
import io.confluent.ksql.rest.entity.KsqlRequest; | ||
import io.confluent.ksql.rest.entity.KsqlTopicInfo; | ||
import io.confluent.ksql.rest.entity.KsqlTopicsList; | ||
import io.confluent.ksql.rest.entity.Queries; | ||
import io.confluent.ksql.rest.entity.SourceDescription; | ||
import io.confluent.ksql.rest.entity.StreamsList; | ||
import io.confluent.ksql.rest.entity.TablesList; | ||
import io.confluent.ksql.rest.server.mock.MockKafkaTopicClient; | ||
import io.confluent.ksql.rest.server.KsqlRestConfig; | ||
import io.confluent.ksql.rest.server.StatementParser; | ||
|
@@ -45,10 +65,18 @@ | |
import org.apache.kafka.connect.data.SchemaBuilder; | ||
import org.junit.Test; | ||
|
||
import java.util.*; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.Properties; | ||
import java.util.concurrent.Future; | ||
import java.util.stream.Collectors; | ||
|
||
import javax.ws.rs.core.Response; | ||
|
||
import static org.hamcrest.CoreMatchers.instanceOf; | ||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertThat; | ||
|
@@ -363,4 +391,46 @@ public void testListTablesStatement() throws Exception { | |
|
||
assertEquals(expectedTable, testTables.get(0)); | ||
} | ||
|
||
@Test | ||
public void shouldFailForIncorrectCSASStatementResultType() throws Exception { | ||
KsqlResource testResource = TestKsqlResourceUtil.get(); | ||
String ksqlString1 = "CREATE STREAM s1 AS SELECT * FROM test_table;"; | ||
|
||
Response response1 = testResource.handleKsqlStatements(new KsqlRequest(ksqlString1, Collections | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here and elsewhere in this class, we can use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
.EMPTY_MAP)); | ||
KsqlEntityList result1 = (KsqlEntityList) response1.getEntity(); | ||
assertTrue("Incorrect response size.", result1.size() == 1); | ||
assertThat(result1.get(0), instanceOf(ErrorMessageEntity.class)); | ||
ErrorMessageEntity errorMessageEntity1 = (ErrorMessageEntity) result1.get(0); | ||
assertTrue(errorMessageEntity1.getErrorMessage().getMessage().equalsIgnoreCase("Invalid " | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does the exception thrown by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, do we have tests for the positive code path, where there are no errors? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but the 'getStatementExecutionPlan' method is private and because of that I have the test in the caller. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Elsewhere to. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated the assertion checks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As i said above, we should use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
+ "result type. Your SELECT query produces a TABLE. Please use CREATE TABLE AS SELECT statement instead.")); | ||
|
||
String ksqlString2 = "CREATE STREAM s2 AS SELECT S2_F1 , count(S2_F1) FROM test_stream group by " | ||
+ "s2_f1;"; | ||
|
||
Response response2 = testResource.handleKsqlStatements(new KsqlRequest(ksqlString2, Collections | ||
.EMPTY_MAP)); | ||
KsqlEntityList result2 = (KsqlEntityList) response2.getEntity(); | ||
assertTrue("Incorrect response size.", result2.size() == 1); | ||
assertThat(result2.get(0), instanceOf(ErrorMessageEntity.class)); | ||
ErrorMessageEntity errorMessageEntity2 = (ErrorMessageEntity) result2.get(0); | ||
assertTrue(errorMessageEntity2.getErrorMessage().getMessage().equalsIgnoreCase("Invalid " | ||
+ "result type. Your SELECT query produces a TABLE. Please use CREATE TABLE AS SELECT statement instead.")); | ||
} | ||
|
||
@Test | ||
public void shouldFailForIncorrectCTASStatementResultType() throws Exception { | ||
KsqlResource testResource = TestKsqlResourceUtil.get(); | ||
final String ksqlString = "CREATE TABLE s1 AS SELECT * FROM test_stream;"; | ||
|
||
Response response = testResource.handleKsqlStatements(new KsqlRequest(ksqlString, Collections | ||
.EMPTY_MAP)); | ||
KsqlEntityList result = (KsqlEntityList) response.getEntity(); | ||
assertTrue("Incorrect response size.", result.size() == 1); | ||
assertThat(result.get(0), instanceOf(ErrorMessageEntity.class)); | ||
ErrorMessageEntity errorMessageEntity = (ErrorMessageEntity) result.get(0); | ||
assertTrue(errorMessageEntity.getErrorMessage().getMessage().equalsIgnoreCase("Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.")); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the regression was that a statement was commented out? What was the impact exactly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this line evaluates the query by generating the execution plan without executing it. This will detect if the CREATE TABLE AS SELECT and CREATE STREAM AS SELECT were correct or not. Without this check the wrong queries will be sent for execution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hjafarpour it doesn't use the return type, though? So are you saying that it is validating that they syntax is correct? If so, then maybe add a comment as such or have a method named
validateStatementExecutionPlan
- as it stands it looks like this method isn't doing anythingThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dguy good point. I added comment to clarify the method call.