Fixed the regression bug for CSAS/CTAS result type check. #419

Merged
Changes from 2 commits
@@ -205,7 +205,8 @@ private KsqlEntity executeStatement(
|| statement instanceof DropStream
|| statement instanceof DropTable
) {
// getStatementExecutionPlan(statement, statementText, streamsProperties);
//Sanity check for the statement before distributing it.
getStatementExecutionPlan(statement, statementText, streamsProperties);
Contributor

So the regression was that a statement was commented out? What was the impact exactly?

Contributor Author

Yes, this line evaluates the query by generating the execution plan without executing it. This detects whether the CREATE TABLE AS SELECT and CREATE STREAM AS SELECT statements are correct. Without this check, invalid queries are sent for execution.

Contributor

@hjafarpour it doesn't use the return value, though? So are you saying that it is validating that the syntax is correct? If so, then maybe add a comment to that effect, or have a method named validateStatementExecutionPlan. As it stands, it looks like this method isn't doing anything.

Contributor Author

@dguy good point. I added a comment to clarify the method call.

Contributor

I think this should be named validateExecutionPlan so its intended use is clear. The comment only helps people reading code in this context, but this method could be used in other contexts.

If this method is doing two things (i.e. generating and validating the plan), then it would be better to split it into two separate methods, each performing one function.

I agree with @apurvam: it should be renamed. Looking at the code, it doesn't make sense. It looks like it is executing various DDL commands inside getStatementExecutionPlan() (i.e. CreateTableAsSelect etc.) and also publishing them onto the command topic in distributeStatement() so that all nodes execute the same command again. Shouldn't we have a validate() that doesn't do any execution, and then call distribute()?

Contributor Author

Good point. I added a new method and updated the PR.
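
The split suggested in this thread (a validation step that executes nothing, followed by a separate distribution step) could look roughly like the sketch below. It is not the code from this PR: `StatementDispatcher`, `ValidationException`, and the toy validation rule are hypothetical stand-ins, and the list stands in for the command topic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; none of these names come from the KSQL code base.
class ValidationException extends RuntimeException {
    ValidationException(String message) {
        super(message);
    }
}

class StatementDispatcher {
    // Stand-in for the command topic: statements queued for every node to run.
    private final List<String> distributed = new ArrayList<>();

    // Checks the statement without executing it and throws if it is invalid.
    // The rule below is a toy placeholder for real plan generation.
    void validate(String statementText) {
        String upper = statementText.trim().toUpperCase();
        if (upper.startsWith("CREATE STREAM") && upper.contains("GROUP BY")) {
            throw new ValidationException(
                "Invalid result type. Your SELECT query produces a TABLE.");
        }
    }

    // Publishes the statement; performs no validation of its own.
    void distribute(String statementText) {
        distributed.add(statementText);
    }

    // Validation runs first, so an invalid statement is never distributed.
    void handle(String statementText) {
        validate(statementText);
        distribute(statementText);
    }

    List<String> distributed() {
        return distributed;
    }
}
```

Keeping `validate` free of side effects means it can be reused in other contexts, which is the concern raised above about the method name.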

return distributeStatement(statementText, statement, streamsProperties);
} else {
if (statement != null) {
@@ -328,6 +329,8 @@ private ExecutionPlan getStatementExecutionPlan(Statement statement, String stat
if (ddlCommandTask != null) {
try {
return new ExecutionPlan(ddlCommandTask.execute(statement, statementText, properties));
} catch (KsqlException ksqlException) {
Contributor

Can we change this method so that it throws KsqlException rather than Exception? Generally we shouldn't have methods with the signature blah() throws Exception unless we have no control over what is being thrown. Even then, we should probably catch and wrap such cases so we have the context of what has gone wrong.

Contributor Author

@hjafarpour, Oct 31, 2017

Agreed, updated it for this PR. When we wrote this we were still finalizing the KsqlException class. I will look into using KsqlException in more places in other PRs.
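
As a rough illustration of the catch-and-wrap pattern requested here, the sketch below lets the domain exception pass through unchanged and wraps everything else with the statement text for context. `PlanException`, `PlanRunner`, and `Step` are hypothetical names, not KSQL classes, and the sketch catches `Exception` where the diff catches `Throwable`.

```java
// Hypothetical stand-in for the project's exception type; only the
// (message, cause) constructor shape is taken from the diff.
class PlanException extends RuntimeException {
    PlanException(String message, Throwable cause) {
        super(message, cause);
    }
}

class PlanRunner {
    // Hypothetical callback standing in for whatever actually runs the plan.
    interface Step {
        void run() throws Exception;
    }

    // Declares no checked exceptions: domain failures pass through unchanged,
    // anything else is wrapped with the statement text for context.
    static void runPlan(String statementText, Step step) {
        try {
            step.run();
        } catch (PlanException e) {
            throw e;
        } catch (Exception e) {
            throw new PlanException(
                "Cannot run execution plan for this statement, " + statementText, e);
        }
    }
}
```

Callers then only need to handle one exception type, and the original cause stays reachable through `getCause()`.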

throw ksqlException;
} catch (Throwable t) {
throw new KsqlException("Cannot RUN execution plan for this statement, " + statement, t);
}
@@ -25,8 +25,28 @@
import io.confluent.ksql.metastore.KsqlTable;
import io.confluent.ksql.metastore.KsqlTopic;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.tree.*;
import io.confluent.ksql.rest.entity.*;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.parser.tree.ListQueries;
import io.confluent.ksql.parser.tree.ListRegisteredTopics;
import io.confluent.ksql.parser.tree.ListStreams;
import io.confluent.ksql.parser.tree.ListTables;
import io.confluent.ksql.parser.tree.QualifiedName;
import io.confluent.ksql.parser.tree.RegisterTopic;
import io.confluent.ksql.parser.tree.ShowColumns;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.StringLiteral;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ErrorMessageEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.KsqlTopicInfo;
import io.confluent.ksql.rest.entity.KsqlTopicsList;
import io.confluent.ksql.rest.entity.Queries;
import io.confluent.ksql.rest.entity.SourceDescription;
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.entity.TablesList;
import io.confluent.ksql.rest.server.mock.MockKafkaTopicClient;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.StatementParser;
@@ -45,10 +65,18 @@
import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.Test;

import java.util.*;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import javax.ws.rs.core.Response;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -363,4 +391,46 @@ public void testListTablesStatement() throws Exception {

assertEquals(expectedTable, testTables.get(0));
}

@Test
public void shouldFailForIncorrectCSASStatementResultType() throws Exception {
KsqlResource testResource = TestKsqlResourceUtil.get();
String ksqlString1 = "CREATE STREAM s1 AS SELECT * FROM test_table;";

Response response1 = testResource.handleKsqlStatements(new KsqlRequest(ksqlString1, Collections
Contributor

Here and elsewhere in this class, we can use Collections.emptyMap() instead; then we will not have the unchecked warnings.

Contributor Author

Done.
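
For reference, a small sketch of the difference, assuming Java 8 or later so that the type arguments of `emptyMap()` are inferred from the parameter type. `countProperties` is a hypothetical method standing in for any API that takes a typed map; the parameter type of the `KsqlRequest` constructor is not shown in this diff.

```java
import java.util.Collections;
import java.util.Map;

class EmptyMapExample {
    // Hypothetical method standing in for any API that takes a typed map.
    static int countProperties(Map<String, Object> properties) {
        return properties.size();
    }

    static int withTypedEmptyMap() {
        // emptyMap() is generic, so the compiler infers Map<String, Object>
        // and no unchecked warning is produced.
        return countProperties(Collections.emptyMap());
    }

    @SuppressWarnings("unchecked")
    static int withRawEmptyMap() {
        // EMPTY_MAP is a raw Map; passing it here is an unchecked conversion,
        // so the compiler warns unless the warning is suppressed as above.
        return countProperties(Collections.EMPTY_MAP);
    }
}
```

Both calls pass an immutable empty map; the only difference is compile-time type checking.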

.EMPTY_MAP));
KsqlEntityList result1 = (KsqlEntityList) response1.getEntity();
assertTrue("Incorrect response size.", result1.size() == 1);
assertThat(result1.get(0), instanceOf(ErrorMessageEntity.class));
ErrorMessageEntity errorMessageEntity1 = (ErrorMessageEntity) result1.get(0);
assertTrue(errorMessageEntity1.getErrorMessage().getMessage().equalsIgnoreCase("Invalid "
Contributor

Does the exception thrown by validateExecutionPlan eventually boil down to this error message? If so, would it make more sense to test for validity at a more fine-grained level, where we can check the typed exception directly?

Contributor

Also, do we have tests for the positive code path, where there are no errors?

Contributor Author

Yes, but the 'getStatementExecutionPlan' method is private, and because of that I have the test in the caller.
The positive code path test needs to be added too, as you pointed out, but it would include much more than the fix for this regression bug and can be part of another PR that improves the test coverage for KsqlResource.

Contributor

assertThat(errorMessage...toLowerCase(), equalTo("whatever is expected"))

Elsewhere too. assertTrue should only be used to check actual boolean conditions. For everything else we should be using assertThat.

Contributor Author

Updated the assertion checks.

Contributor

As I said above, we should use assertThat here too.

Contributor Author

Done.

+ "result type. Your SELECT query produces a TABLE. Please use CREATE TABLE AS SELECT statement instead."));

String ksqlString2 = "CREATE STREAM s2 AS SELECT S2_F1 , count(S2_F1) FROM test_stream group by "
+ "s2_f1;";

Response response2 = testResource.handleKsqlStatements(new KsqlRequest(ksqlString2, Collections
.EMPTY_MAP));
KsqlEntityList result2 = (KsqlEntityList) response2.getEntity();
assertTrue("Incorrect response size.", result2.size() == 1);
assertThat(result2.get(0), instanceOf(ErrorMessageEntity.class));
ErrorMessageEntity errorMessageEntity2 = (ErrorMessageEntity) result2.get(0);
assertTrue(errorMessageEntity2.getErrorMessage().getMessage().equalsIgnoreCase("Invalid "
+ "result type. Your SELECT query produces a TABLE. Please use CREATE TABLE AS SELECT statement instead."));
}

@Test
public void shouldFailForIncorrectCTASStatementResultType() throws Exception {
KsqlResource testResource = TestKsqlResourceUtil.get();
final String ksqlString = "CREATE TABLE s1 AS SELECT * FROM test_stream;";

Response response = testResource.handleKsqlStatements(new KsqlRequest(ksqlString, Collections
.EMPTY_MAP));
KsqlEntityList result = (KsqlEntityList) response.getEntity();
assertTrue("Incorrect response size.", result.size() == 1);
assertThat(result.get(0), instanceOf(ErrorMessageEntity.class));
ErrorMessageEntity errorMessageEntity = (ErrorMessageEntity) result.get(0);
assertTrue(errorMessageEntity.getErrorMessage().getMessage().equalsIgnoreCase("Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead."));
}

}