diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java index 008a8c73f403..c0e4654a2cc1 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java @@ -78,6 +78,11 @@ public KsqlEntityList execute( final Map scopedPropertyOverrides = new HashMap<>(propertyOverrides); final KsqlEntityList entities = new KsqlEntityList(); for (ParsedStatement parsed : statements) { + // Note: temp work around for https://github.com/confluentinc/ksql/issues/3363 + // We wait for any, none-black-listed, commands to complete before proceeding + // Note: This change should only be in 5.4.x + commandQueueSync.waitFor(new KsqlEntityList(entities), Statement.class); + final PreparedStatement prepared = ksqlEngine.prepare(parsed); if (prepared.getStatement() instanceof RunScript) { final KsqlEntityList result = executeRunScript(serviceContext, prepared, propertyOverrides); @@ -90,7 +95,7 @@ public KsqlEntityList execute( } else { final ConfiguredStatement configured = ConfiguredStatement.of( prepared, scopedPropertyOverrides, ksqlConfig); - executeStatement(serviceContext, configured, scopedPropertyOverrides, entities) + executeStatement(serviceContext, configured, scopedPropertyOverrides) .ifPresent(entities::add); } } @@ -101,11 +106,9 @@ public KsqlEntityList execute( private Optional executeStatement( final ServiceContext serviceContext, final ConfiguredStatement configured, - final Map mutableScopedProperties, - final KsqlEntityList entities + final Map mutableScopedProperties ) { final Class statementClass = configured.getStatement().getClass(); - commandQueueSync.waitFor(new KsqlEntityList(entities), statementClass); final StatementExecutor executor = (StatementExecutor) customExecutors.getOrDefault(statementClass, distributor); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index 780d44e24de2..a2000ac21b86 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -82,6 +82,7 @@ public class KsqlResource implements KsqlConfigurable { private static final List TERMINATE_CLUSTER = new DefaultKsqlParser().parse(TerminateCluster.TERMINATE_CLUSTER_STATEMENT_TEXT); + @SuppressWarnings("unused") private static final Set> SYNC_BLACKLIST = ImmutableSet.>builder() .add(ListTopics.class) @@ -241,9 +242,10 @@ private void throwIfNotConfigured() { } private static boolean shouldSynchronize(final Class statementClass) { - return !SYNC_BLACKLIST.contains(statementClass) - // we never need to synchronize distributed statements - && CustomExecutors.EXECUTOR_MAP.containsKey(statementClass); + // Note: temp work around for https://github.com/confluentinc/ksql/issues/3363 + // We need to wait for all statements. + // Note: This change should only be in 5.4.x + return true; } private static void ensureValidPatterns(final List deleteTopicList) { diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 3496682db378..030109760683 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -170,6 +170,7 @@ import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -1021,6 +1022,7 @@ public void shouldWaitForLastDistributedStatementBeforeExecutingAnyNonDistribute assertThat(results.get(2), is(instanceOf(SourceDescriptionEntity.class))); } + @Ignore // temp work around for https://github.com/confluentinc/ksql/issues/3363 @Test public void shouldNotWaitOnAnyDistributedStatementsBeforeDistributingAnother() throws Exception { // When: @@ -1034,6 +1036,7 @@ public void shouldNotWaitOnAnyDistributedStatementsBeforeDistributingAnother() t verify(commandStore, never()).ensureConsumedPast(anyLong(), any()); } + @Ignore // temp work around for https://github.com/confluentinc/ksql/issues/3363 @Test public void shouldNotWaitForLastDistributedStatementBeforeExecutingSyncBlackListedStatement() throws Exception {