Skip to content

Commit

Permalink
fix: fix issues with multi-statement requests failing to validate (#3952
Browse files Browse the repository at this point in the history
)

fixes: #3363

Issue was that the `prepare` call is throwing if the meta-store doesn't contain the sources in a statement.

Given a request contains a later statement that depends on an earlier statement, e.g.

```sql
CREATE STREAM FOO (...) WITH (...);
CREATE STREAM BAR AS SELECT *
```

The KSQL correctly validates the statements, because it updates the sandboxed copy of the meta-store as it goes.

However, when it comes to executing the statement it once against `prepares` the statement. Only this time its using the true meta-store and that is only updated by the command runner thread.  This can mean that the first statement is correctly written to the command topic, but if the next statement is `prepare`'d before the command runner thread executes the statement and updates the meta-store, then the `prepare` call on the rest-request thead will fail with an error that `FOO` does not exist.

The server has the ability to wait for a previous command to be executed by the command runner thread before executing the next. However, this isn't done for _distributed_ statements, i.e. those written to the command topic as its not meant to be required.  It is done for statements such as `SHOW STREAMS`, which need to have any previous command executed as it may be creating a stream!

This is fixed in master already. The fix was to _not_ have `prepare` validate that the source exists.  This is the correct fix. However, this would be a big change for 5.4.x.  Instead, this commit makes two changes:

1. Move the check to wait for previous commands to _before_ the `prepare` call.
2. All statement types should wait for previous commands to finish. (Because we check _before_ `prepare` we don't _know_ the statement type).

This fixes the race condition at the cost of more synchronisation. However, this synchronisation will only impact multi-statement requests. The CLI generally issues single line requests, so no impact. C3 issues multi-line requests, i.e. the entire script in one go.  So this will slow down responses to C3 slightly, but will stop intermittent fails in C3 if there are interdependent statements.
  • Loading branch information
big-andy-coates authored Nov 23, 2019
1 parent 217c9ec commit 3e7169b
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public KsqlEntityList execute(
final Map<String, Object> scopedPropertyOverrides = new HashMap<>(propertyOverrides);
final KsqlEntityList entities = new KsqlEntityList();
for (ParsedStatement parsed : statements) {
// Note: temp work around for https://github.com/confluentinc/ksql/issues/3363
// We wait for any, none-black-listed, commands to complete before proceeding
// Note: This change should only be in 5.4.x
commandQueueSync.waitFor(new KsqlEntityList(entities), Statement.class);

final PreparedStatement<?> prepared = ksqlEngine.prepare(parsed);
if (prepared.getStatement() instanceof RunScript) {
final KsqlEntityList result = executeRunScript(serviceContext, prepared, propertyOverrides);
Expand All @@ -90,7 +95,7 @@ public KsqlEntityList execute(
} else {
final ConfiguredStatement<?> configured = ConfiguredStatement.of(
prepared, scopedPropertyOverrides, ksqlConfig);
executeStatement(serviceContext, configured, scopedPropertyOverrides, entities)
executeStatement(serviceContext, configured, scopedPropertyOverrides)
.ifPresent(entities::add);
}
}
Expand All @@ -101,11 +106,9 @@ public KsqlEntityList execute(
private <T extends Statement> Optional<KsqlEntity> executeStatement(
final ServiceContext serviceContext,
final ConfiguredStatement<T> configured,
final Map<String, Object> mutableScopedProperties,
final KsqlEntityList entities
final Map<String, Object> mutableScopedProperties
) {
final Class<? extends Statement> statementClass = configured.getStatement().getClass();
commandQueueSync.waitFor(new KsqlEntityList(entities), statementClass);

final StatementExecutor<T> executor = (StatementExecutor<T>)
customExecutors.getOrDefault(statementClass, distributor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class KsqlResource implements KsqlConfigurable {
private static final List<ParsedStatement> TERMINATE_CLUSTER =
new DefaultKsqlParser().parse(TerminateCluster.TERMINATE_CLUSTER_STATEMENT_TEXT);

@SuppressWarnings("unused")
private static final Set<Class<? extends Statement>> SYNC_BLACKLIST =
ImmutableSet.<Class<? extends Statement>>builder()
.add(ListTopics.class)
Expand Down Expand Up @@ -241,9 +242,10 @@ private void throwIfNotConfigured() {
}

private static boolean shouldSynchronize(final Class<? extends Statement> statementClass) {
return !SYNC_BLACKLIST.contains(statementClass)
// we never need to synchronize distributed statements
&& CustomExecutors.EXECUTOR_MAP.containsKey(statementClass);
// Note: temp work around for https://github.com/confluentinc/ksql/issues/3363
// We need to wait for all statements.
// Note: This change should only be in 5.4.x
return true;
}

private static void ensureValidPatterns(final List<String> deleteTopicList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand Down Expand Up @@ -1021,6 +1022,7 @@ public void shouldWaitForLastDistributedStatementBeforeExecutingAnyNonDistribute
assertThat(results.get(2), is(instanceOf(SourceDescriptionEntity.class)));
}

@Ignore // temp work around for https://github.com/confluentinc/ksql/issues/3363
@Test
public void shouldNotWaitOnAnyDistributedStatementsBeforeDistributingAnother() throws Exception {
// When:
Expand All @@ -1034,6 +1036,7 @@ public void shouldNotWaitOnAnyDistributedStatementsBeforeDistributingAnother() t
verify(commandStore, never()).ensureConsumedPast(anyLong(), any());
}

@Ignore // temp work around for https://github.com/confluentinc/ksql/issues/3363
@Test
public void shouldNotWaitForLastDistributedStatementBeforeExecutingSyncBlackListedStatement()
throws Exception {
Expand Down

0 comments on commit 3e7169b

Please sign in to comment.