-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bring back the commented assertions in StatementExecutorTest and aCom… #406
Bring back the commented assertions in StatementExecutorTest and aCom… #406
Conversation
@hjafarpour are you planning on fixing the failures caused by these assertions? |
@apurvam yes, PR is ready for review. |
Thanks @hjafarpour. Could you add a note about problems which caused the failures and how you fixed them in the description of the PR? |
@@ -51,6 +52,38 @@ | |||
<artifactId>junit</artifactId> | |||
<scope>test</scope> | |||
</dependency> | |||
<dependency> | |||
<groupId>org.apache.curator</groupId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this dependency on curator? As mentioned before, a description of the problem and the fix would help in understanding this decision.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apurvam added more info on the problem in the description of the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still don't get why we need curator? I don't believe that EmbeddedSingleNodeKafkaCluster
uses it. It would need a zookeeper dependency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EmbeddedSingleNodeKafkaCluster uses ZooKeeperEmbedded which depends on curator.
Here I'm using t he same implementation for EmbeddedSingleNodeKafkaCluster as the one in streams-example. Do we have any other implementation?
@@ -51,6 +52,38 @@ | |||
<artifactId>junit</artifactId> | |||
<scope>test</scope> | |||
</dependency> | |||
<dependency> | |||
<groupId>org.apache.curator</groupId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still don't get why we need curator? I don't believe that EmbeddedSingleNodeKafkaCluster
uses it. It would need a zookeeper dependency
|
||
@AfterClass | ||
public static void cleanUp() { | ||
CLUSTER.stop(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't need this as it happens automatically when you use @ClassRule
|
||
public class CommandRunnerTest { | ||
|
||
@ClassRule | ||
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this need an embedded broker? Could we not just use the MockKafkaConsumer
etc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored this test to mock requires objects instead of using EmbeddedSingleNodeKafkaCluster.
} | ||
|
||
@Test | ||
public void testThread() throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this testing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed the method for better description.
CommandRunner commandRunner = getCommanRunner(statementExecutor); | ||
new Thread(commandRunner).start(); | ||
Thread.sleep(50000); | ||
Thread.sleep(2000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be avoiding Thread.sleep(...)
in tests. With the code the way it currently is, you could pass a mock/stub version of the StatementExectuor
to the CommandRunner
and use a latch or counter to determine when it has received the expected commands.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored this test along with the CommandRunner test so we won't need to test the thread.
Assert.assertEquals(CommandStatus.Status.SUCCESS, statusStore.get(topicCommandId).getStatus()); | ||
Assert.assertEquals(CommandStatus.Status.SUCCESS, statusStore.get(csCommandId).getStatus()); | ||
// Assert.assertEquals(CommandStatus.Status.SUCCESS, statusStore.get(csasCommandId).getStatus()); | ||
// Assert.assertEquals(CommandStatus.Status.ERROR, statusStore.get(ctasCommandId).getStatus()); | ||
if (statusStore.containsKey(csasCommandId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems weird. The test has no control over the commands that are processed. Tests need to be deterministic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using mocks refactored this so it will be deterministic.
…e thread to a new method for better testablility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates Hojjat. I think this is starting to look good.
These changes have made me curious though: from what you wrote in the description, these tests should always have failed, independently of #391 . Is this true?
|
||
public class CommandRunnerTest { | ||
|
||
/** | ||
* Get a command runner instance using mock values | ||
*/ | ||
private CommandRunner getCommanRunner(StatementExecutor statementExecutor) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we have a typo here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, fixed it.
import io.confluent.ksql.util.Pair; | ||
|
||
public class StatementExecutorTest extends EasyMockSupport { | ||
|
||
@ClassRule | ||
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So to be clear, this line is the only reason we have included the dependency on Curator? Can't you use mocks here just like you did in your updates to the other test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed I tried that first by trying to mock KsqlEngine class but this class in addition to its internal QueryEngine and Metastore instances are being used in several places and mocking them would be much more complex for multiple statements. Instead I opted to use the embedded broker.
Also when we add more integration tests for this module we will need Embedded broker there too.
Map<String, Object> props = new HashMap<>(); | ||
props.put("application.id", "ksqlStatementExecutorTest"); | ||
props.put("bootstrap.servers", CLUSTER.bootstrapServers()); | ||
|
||
MockKsqkEngine mockKsqkEngine = new MockKsqkEngine( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a typo on the class and variable names. MockKsqkEngine
should be MockKsqlEngine
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class does not add any functionality, I deleted it and used KsqlEngine instead.
import io.confluent.ksql.rest.server.utils.TestUtils; | ||
import io.confluent.ksql.util.Pair; | ||
|
||
import static org.easymock.EasyMock.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably need to change your import settings. We don't really want to be importing *
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed it. Need to see why my IntelliJ didn't catch this!
@@ -91,6 +81,20 @@ public void close() { | |||
commandStore.close(); | |||
} | |||
|
|||
public void fetchAndRunCommands() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is only visible due to testing. At the very least it should be made package-private, i.e, it def shouldn't be public.
An alternative would be to not make this class Runnable
and instead use an ExecutorService
in KsqlRestApplication
to submit a Runnable
with a fixed delay. You would then keep this public method on CommandRunner
and remove the run
method with the while
loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made it package-private.
@@ -91,6 +81,20 @@ public void close() { | |||
commandStore.close(); | |||
} | |||
|
|||
public void fetchAndRunCommands() { | |||
ConsumerRecords<CommandId, Command> records = commandStore.getNewCommands(); | |||
log.debug("Found {} new writes to command topic", records.count()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this message should be trace
level? It has potential to be quite spammy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, changed it to trace.
if (command.getStatement() != null) { | ||
executeStatement(command, commandId); | ||
} else { | ||
log.debug("Skipping null statement for ID {}", commandId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In what circumstance does this happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically we should not have any command with null value since KSQL should be the only one who writes the commands to the command topic. I guess this remained here when our intern was refactoring. I'll remove it.
List<Pair<CommandId, Command>> commandList = new TestUtils().getAllPriorCommandRecords(); | ||
List<ConsumerRecord<CommandId, Command>> recordList = new ArrayList<>(); | ||
for (Pair commandPair: commandList) { | ||
recordList.add(new ConsumerRecord<CommandId, Command>("T", 1, 1, (CommandId) commandPair |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: redundant '<CommandId, Command>`
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed it.
import io.confluent.ksql.rest.server.utils.TestUtils; | ||
import io.confluent.ksql.util.Pair; | ||
|
||
import static org.easymock.EasyMock.*; | ||
|
||
public class CommandRunnerTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs a test for when a statement is null
, i.e., StatementExecutor#handleStatements
should not be called
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the suggested test.
TestUtils.getMockKsqlConfig(), new MockKafkaTopicClient()); | ||
StatementParser statementParser = new StatementParser(mockKsqkEngine); | ||
StatementExecutor statementExecutor = new StatementExecutor(mockKsqkEngine, statementParser); | ||
public void testNewCommandRun() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does the test do? It is easier to understand what should happen if we have good test names, i.e, "shouldExecuteNewCommandsFromCommandStore"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed the tests.
// Assert.assertEquals(CommandStatus.Status.ERROR, statusStore.get(ctasCommandId).getStatus()); | ||
Assert.assertEquals(CommandStatus.Status.SUCCESS, statusStore.get(csasCommandId).getStatus()); | ||
Assert.assertEquals(CommandStatus.Status.ERROR, statusStore.get(ctasCommandId).getStatus()); | ||
verify(statementExecutor); | ||
} | ||
|
||
@Test | ||
public void testPriorCommandsRun() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here: what "should" it do?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed the name.
@@ -114,11 +122,12 @@ public void handleCSAS_CTASStatement() throws Exception { | |||
|
|||
Map<CommandId, CommandStatus> statusStore = statementExecutor.getStatuses(); | |||
Assert.assertNotNull(statusStore); | |||
Assert.assertEquals(6, statusStore.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can create a Map<CommandId, CommandStatus>
call it expected and then do:
assertThat(statusStore, equalTo(expected))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is that CommandStatus has two fields, status and message. We only case about the status in this case so if creating CommandStatus with specific message, especially exception message for ERROR case may be much more complex.
@@ -140,8 +149,8 @@ public void handlePriorStatement() throws Exception { | |||
Assert.assertEquals(4, statusStore.size()); | |||
Assert.assertEquals(CommandStatus.Status.SUCCESS, statusStore.get(topicCommandId).getStatus()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
…-assertion-in-CommandRunnerTest-StatementExecutorTest
@apurvam yes, the tests were not correct previously. |
… store. I am reducing it back to 5 seconds and also adding a message in KSQL console to let user know that the service is being initialized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple of comments otherwise LGTM
@@ -136,14 +136,14 @@ public CommandId distributeStatement( | |||
log.debug("Reading prior command records"); | |||
|
|||
List<ConsumerRecord<CommandId, Command>> result = new ArrayList<>(); | |||
ConsumerRecords<CommandId, Command> records = commandConsumer.poll(30000); | |||
ConsumerRecords<CommandId, Command> records = commandConsumer.poll(5000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we make this configurable rather than hard coding
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or at least extract it to a constant as it is used below, too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dguy extracted it to a constant.
…-assertion-in-CommandRunnerTest-StatementExecutorTest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks!
…mandRunnerTest
The problem with the previous test was that it was trying to connect to a kafka broker and since there was no brokers available it would fail for both cases, correct and incorrect queries. The assertions were also set incorrectly and for the correct query the expected result was ERROR.
In order to have the test behave correctly for both correct and incorrect queries we now use an embedded test broker similar to the tests in the engine. This need dependencies such as kafka and curator.