fix: CREATE OR REPLACE TABLE on an existing query fails while initializing kafka streams #8130

Merged
merged 1 commit into from
Sep 17, 2021
@@ -143,7 +143,9 @@ public ExecuteResult execute(
serviceContext,
ksqlPlan.getConfig()
).execute(ksqlPlan.getPlan());
result.getQuery().ifPresent(query -> query.getKafkaStreams().close());

// Having a streams running in a sandboxed environment is not necessary
result.getQuery().map(QueryMetadata::getKafkaStreams).ifPresent(streams -> streams.close());
return result;
}

@@ -333,8 +333,13 @@ private void registerPersistentQuery(
unregisterQuery(oldQuery);
}

// Initialize the query before it's exposed to other threads via the map/sets.
persistentQuery.initialize();
// If the old query was sandboxed, then the stop() won't stop the streams and will cause
Contributor

For my understanding, can you explain why stop() won't stop streams? Or maybe more broadly, why we were running these KStreams applications in a sandboxed environment? Just linking me to something would be sufficient, I wasn't able to piece it together myself easily.

Member Author

Sandboxed objects were implemented in KSQL to validate statements that need to make calls to Kafka. The validation should not execute any real call (e.g. deleting or creating topics); it is just a validation. Start by looking at the RequestValidator.validate() method.

During a validation, the full execution path is tested too. For instance, in the case of persistent queries, the validation checks that a query is removed from the query registry, that new queries are added to the registry, etc. To do that, the validation process creates a sandbox of the query registry so that real queries are not actually removed, stopped, etc. You can see the QueryRegistryImpl.createSandbox() method.

Before copying the QueryRegistry, the createSandbox() method creates a copy of each query as a sandboxed persistent query, which has dummy methods to start/stop the query. Look at SandboxedPersistentQueryMetadataImpl. So, when registerPersistentQuery is called by a CREATE OR REPLACE statement, the old query to be stopped is not actually stopped. It just mimics the execution path, and nothing is altered in the old streams instance. However, initialize() is not sandboxed, so it attempts to initialize a new streams instance pointing to the state store of the old one.

I don't know the sandboxed objects well, though, so it took me some time to figure out why new streams are started (in the sandbox) but not stopped. I wanted to fix that, but I couldn't figure out an easy way to refactor the code.
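
The behaviour described above can be illustrated with a small, self-contained sketch. It does not use the real ksqlDB classes: `SandboxSketch`, `StateDirs`, `RealQuery`, `SandboxedQuery`, `Registry`, and the query id `CTAS_FOO_1` are all hypothetical stand-ins, and the "state store lock" is a plain in-memory set rather than anything Kafka Streams actually does. It only models the idea that a sandboxed copy of the old query has a no-op `stop()`, so an unconditional `initialize()` on the replacement collides with the state still held by the old query, while the guarded version skips the initialization.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SandboxSketch {

  /** Hypothetical stand-in for state stores that only one running query may hold. */
  static final class StateDirs {
    private final Set<String> locked = new HashSet<>();

    void lock(final String queryId) {
      if (!locked.add(queryId)) {
        throw new IllegalStateException("State store for " + queryId + " is still in use");
      }
    }

    void unlock(final String queryId) {
      locked.remove(queryId);
    }
  }

  interface Query {
    String id();
    void initialize();
    void stop();
  }

  /** A query whose initialize()/stop() really acquire and release the state store. */
  static final class RealQuery implements Query {
    private final String id;
    private final StateDirs dirs;

    RealQuery(final String id, final StateDirs dirs) {
      this.id = id;
      this.dirs = dirs;
    }

    public String id() { return id; }
    public void initialize() { dirs.lock(id); }
    public void stop() { dirs.unlock(id); }
  }

  /** Sandboxed copy of an existing query: start/stop are dummies and release nothing. */
  static final class SandboxedQuery implements Query {
    private final String id;

    SandboxedQuery(final String id) { this.id = id; }

    public String id() { return id; }
    public void initialize() { }
    public void stop() { }
  }

  static final class Registry {
    private final Map<String, Query> queries = new HashMap<>();
    private final boolean sandbox;
    private final boolean guarded;

    Registry(final boolean sandbox, final boolean guarded) {
      this.sandbox = sandbox;
      this.guarded = guarded;
    }

    /** Copies every registered query as a sandboxed query. */
    Registry createSandbox() {
      final Registry copy = new Registry(true, guarded);
      for (final String id : queries.keySet()) {
        copy.queries.put(id, new SandboxedQuery(id));
      }
      return copy;
    }

    void register(final Query query) {
      final Query old = queries.get(query.id());
      if (old != null) {
        old.stop(); // no-op when the old query is a sandboxed copy
      }
      // Guarded variant: initialize only for a new query or outside the sandbox.
      if (!guarded || old == null || !sandbox) {
        query.initialize();
      }
      queries.put(query.id(), query);
    }
  }

  /** Returns true if replacing an existing query inside a sandbox does not throw. */
  public static boolean replaceInSandboxSucceeds(final boolean guarded) {
    final StateDirs dirs = new StateDirs();
    final Registry real = new Registry(false, guarded);
    real.register(new RealQuery("CTAS_FOO_1", dirs));

    final Registry sandbox = real.createSandbox();
    try {
      sandbox.register(new RealQuery("CTAS_FOO_1", dirs));
      return true;
    } catch (final IllegalStateException e) {
      return false;
    }
  }

  public static void main(final String[] args) {
    System.out.println("unguarded: " + replaceInSandboxSucceeds(false));
    System.out.println("guarded:   " + replaceInSandboxSucceeds(true));
  }
}
```

In this model the unguarded registry fails on the sandboxed replace because the real query still holds the state, and the guarded registry succeeds because it never initializes the replacement. The sketch says nothing about how the real `initialize()` detects the conflict.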

Contributor

Ah okay, I've seen these validation code paths plenty of times before, but I didn't know what was going on in this detail. Thanks for the in-depth explanation!

// the initialize() to fail because the stream is still running. Let's initialize the
// query only when it is a new query or the old query is not sandboxed.
if (oldQuery == null || !sandbox) {
// Initialize the query before it's exposed to other threads via the map/sets.
persistentQuery.initialize();
}
persistentQueries.put(queryId, persistentQuery);
switch (persistentQuery.getPersistentQueryType()) {
case CREATE_SOURCE:
@@ -1415,6 +1415,44 @@ public void shouldNotThrowWhenExecutingDuplicateTableWithIfNotExists() {
is(Optional.of("Cannot add table `FOO`: A table with the same name already exists.")));
}

@Test
public void shouldNotThrowWhenExecutingDuplicateTableWithCreateOrReplaceOnSandbox() {
// Given:
final ConfiguredStatement<?> oldQuery =
configuredStatement("CREATE TABLE FOO AS SELECT * FROM TEST2;");
final ConfiguredStatement<?> newQuery =
configuredStatement("CREATE OR REPLACE TABLE FOO AS SELECT * FROM TEST2;");

final QueryId oldQueryId = ksqlEngine.execute(serviceContext, oldQuery).getQuery()
.get().getQueryId();

sandbox = ksqlEngine.createSandbox(serviceContext);

// When:
ExecuteResult executeResult = sandbox.execute(sandboxServiceContext, newQuery);

// Then:
assertThat(executeResult.getQuery().get().getQueryId(), is(oldQueryId));
}

@Test
public void shouldNotThrowWhenExecutingDuplicateTableWithCreateOrReplace() {
// Given:
final ConfiguredStatement<?> oldQuery =
configuredStatement("CREATE TABLE FOO AS SELECT * FROM TEST2;");
final ConfiguredStatement<?> newQuery =
configuredStatement("CREATE OR REPLACE TABLE FOO AS SELECT * FROM TEST2;");

final QueryId oldQueryId = ksqlEngine.execute(serviceContext, oldQuery).getQuery()
.get().getQueryId();

// When:
ExecuteResult executeResult = ksqlEngine.execute(sandboxServiceContext, newQuery);

// Then:
assertThat(executeResult.getQuery().get().getQueryId(), is(oldQueryId));
}

@Test
public void shouldThrowWhenExecutingDuplicateTable() {
// Given:
@@ -2031,4 +2069,9 @@ private void givenSqlAlreadyExecuted(final String sql) {

sandbox = ksqlEngine.createSandbox(serviceContext);
}

private ConfiguredStatement<?> configuredStatement(final String statement) {
return ConfiguredStatement.of(ksqlEngine.prepare(ksqlEngine.parse(statement).get(0)),
SessionConfig.of(KSQL_CONFIG, new HashMap<>()));
}
}
@@ -58,8 +58,6 @@

@RunWith(Parameterized.class)
public class QueryRegistryImplTest {
private static final SourceName SINK = SourceName.of("source");

@Mock
private SessionConfig config;
@Mock