feat: terminate persistent query on DROP command #6143
Conversation
This feature is 🔥 but I'm worried that it's breaking some abstractions.

While your comment in the PR description makes sense from an implementation perspective, I don't think the `MetaStoreImpl` should be changing the state of the engine - that makes the coupling pretty tight, even if it's just in the form of a callback (which, conversely, makes the implementation coupling tight but the abstraction coupling very loose - the `QueryMetadata#closeCallback` has been a super pain to refactor around, for example). If we wanted to refactor things in the future (for example, removing `TerminateQuery` altogether), this change would make those changes a little more difficult to reason about.

Instead, I want to suggest an alternative approach - what if we added a flag to `TerminateQuery` that indicated whether failure was acceptable? Then we could always expand a `DROP X` by first running a `TerminateQuery` and then running the drop command.
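To make the suggestion concrete, here is a minimal sketch of that expansion. The names and the "failure is acceptable" encoding are hypothetical, not ksqlDB's real classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical mini-model of the suggested rewrite: a DROP X statement is
// expanded into a failure-tolerant TERMINATE followed by the original drop.
public class DropExpansion {

  // Expand "DROP <source>" into ["TERMINATE <queryId> IF RUNNING", "DROP <source>"].
  // "IF RUNNING" models the suggested flag that makes terminate failure acceptable.
  public static List<String> expandDrop(String sourceName, Optional<String> writerQueryId) {
    List<String> plan = new ArrayList<>();
    writerQueryId.ifPresent(id -> plan.add("TERMINATE " + id + " IF RUNNING"));
    plan.add("DROP " + sourceName);
    return plan;
  }
}
```

If no query writes to the source, the expansion degenerates to just the drop, which matches today's behaviour.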
Also just writing out the compatibility story here for anyone who references this PR as documentation, and as thinking-out-loud (tl;dr: I think we're good).

Backwards compatibility is fine, because a `DROP X` would never be enqueued unless there were no queries writing to it. That means that if we restore from any old command topic, there would be no change in behavior.

Forwards compatibility is limited, but in the same way new features usually are. If you roll back, then the `DROP X` will fail - but that's OK because nothing after it will be executed.
also I think this would be good for @big-andy-coates to get some eyes on as well
@@ -218,7 +218,7 @@ public TransientQueryMetadata executeQuery(
         statement.getConfigOverrides())
         .executeQuery(statement);
     registerQuery(query);
-    primaryContext.registerQuery(query);
+    primaryContext.registerQuery(Optional.empty(), query);
what happens if you drop a stream that a transient query is reading from?
The `Optional.empty()` refers to the stream/table that owns this query. Transient queries have no such stream/table, hence the empty parameter. However, I found that a stream/table with no persistent queries running can be dropped even if it has transient queries running. That seems to be a bug in all versions of ksql, because I don't see transient queries registered in the referential integrity entries. I will fix this bug in a separate GitHub issue so it can be reviewed separately, and so we understand the following behavior once it is implemented:

ksql> drop stream s1;
Cannot drop S1.
The following queries read from this source: [_confluent-ksql-default_transient_2601039483758810470_1599154165566].
The following queries write into this source: [].
You need to terminate them before dropping S1.

^ What if there are several push queries running? Does it make sense to show the query ID, or to add another line for the number of transient queries reading?
ack

> What if there are several push queries running? Does it make sense to show the query ID, or to add another line for the number of transient queries reading?

I think your suggestion of adding another line for the number of transient queries reading is OK. This will become extra important to handle properly when we have scalable push queries (cc @AlanConfluent)
Thanks for looking @agavra. I also don't like the idea of using the metastore to terminate the query, but it was the only place where I could terminate the query and drop the stream inside the same synchronized block, thus preventing new streams from being created on that stream while it is being dropped. What I'm thinking now is to take the `ReferentialIntegrityTableEntity` out of the metastore and do the terminate/drop in the `EngineExecutor` instead. That would also keep the metastore simple. I'll see how easy/complex that refactor will be.
The metastore is definitely not the place to put this logic - the metastore should only store the metadata about the sources.

Regarding @agavra's comments about backwards compatibility... I don't think we need to worry about `DROP` statements in the command topic that would previously have failed because of a still-executing query: only validated statements are written to the command topic, so it should be impossible for such a `DROP` statement to exist in the topic.

I'd therefore suggest that we only need to modify the behaviour of the `DROP` statement and don't need to enqueue a `TERMINATE`. The engine should be terminating things, not the metastore.

Let me have a look at the PR in more depth and I'll add more info / thoughts.
OK, so I had a quick look at this. As I see it, in terms of who should be responsible for terminating the query on a drop source command, that should be the engine. The challenge we need to overcome is this: at the moment, the metastore tracks both the set of sources and the referential integrity data that indicates which queries read from or write to sources, but it doesn't know which query was created as part of the C*AS statement.

Long term, the metastore probably shouldn't know about query ids, but it should continue to stop you deleting a source that some other view depends on. Query ids are the responsibility of the engine. So rather than adding the id of the query created as part of a statement to the metastore, a more strategic fix would be to change the metastore so that it tracked the relationships between sources, not source -> query -> source.

With the metastore only tracking sources, the engine can then track which queries it created for each source, and can use both these bits of information to know whether it can drop a source (only if no other sources depend on it) and whether it needs to terminate any queries (i.e. the query started as part of the source creation).
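A minimal sketch of a metastore that tracks only source-to-source dependencies, as suggested above (names here are hypothetical, not the real `MetaStoreImpl` API):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch: the metastore records SRC -> DST relationships (DST was created
// from SRC), with no knowledge of query ids.
public class SourceDependencies {
  private final Map<String, Set<String>> dependents = new HashMap<>();

  // Record that 'sink' was created from 'source' (e.g. via CREATE ... AS).
  public synchronized void addDependency(String source, String sink) {
    dependents.computeIfAbsent(source, k -> new HashSet<>()).add(sink);
  }

  // A source may only be dropped once nothing downstream depends on it.
  public synchronized boolean canDrop(String source) {
    return dependents.getOrDefault(source, Set.of()).isEmpty();
  }

  public synchronized void dropSource(String source) {
    if (!canDrop(source)) {
      throw new IllegalStateException("Cannot drop " + source + ": other sources depend on it");
    }
    dependents.remove(source);
    // Dropping a sink releases its upstream sources.
    dependents.values().forEach(s -> s.remove(source));
  }
}
```

With this shape, deleting SRC while DST exists fails in the metastore, while query termination stays entirely in the engine.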
@big-andy-coates That makes sense. If the metastore knows only about sources, then when dropping a source created from a C*AS, the engine can terminate the query it created for that source. But there's still the question about the synchronization between the terminate and the drop.

Btw, there's another thing about using sources only. What should happen if there's a push query running on a stream? Should we allow dropping the source stream? A push (or transient) query doesn't have a source name we can use to reference it in the metastore. We would probably need to check that in the engine.
Multiple threads read the metastore, hence the synchronisation. There's no need to move the synchronisation; it's just there to ensure reads see internally consistent state.

IMHO, transient queries shouldn't stop you dropping a source. Not being able to drop your source just because another user is currently selecting from it in the CLI seems like a right PITA for users.

I think that makes sense given the way we handle push queries today, but if people are depending on push queries to power their applications (à la the scalable push queries work), it might be catastrophic to drop one that is in use.

Fair point. As long as we allow admins to kill transient queries, then I see no issue with them blocking DDL commands. However, for the scope of this change... transient queries don't currently block DDL commands, so changing this would be out of scope and would require more design and thought and a KLIP. When we come to do it, this would just be a check in the engine layer to ensure no transient queries are using the source before attempting to update the metastore. This wouldn't change the design outlined above.
@agavra @big-andy-coates I updated the PR with your comments. I amended the previous commits to start fresh. There are 2 commits: the 1st commit is just a refactor to prepare for the 2nd commit. This time I am terminating the query in the engine.

@big-andy-coates I initially worked on the long-term idea of having the whole source -> queries map in the engine.

For this iteration, I'm thinking of keeping it less complex and tracking queries in the metastore, except for the CREATE_AS query. What do you think?

Regarding the displayed message: I didn't add the stream/table references there. Because we still have the TERMINATE command, users can continue working as before (terminate the query, then drop). I was going to add the stream/table references, but if the queries of those streams/tables are not running anymore, then why block the DROP command with only references to streams/tables? I wasn't sure about the UX here. Any opinion?
Hey @spena, sorry for not having had time to look at this yet. I have scanned it, and I think the PR in its current form doesn't leave the code base better than we found it - something all PRs should strive for. I think we need to think more about the solution, and that's the cause of the delay in reviewing. I need to find time to properly get my head into what's happening / needed. @agavra any thoughts?

@spena - I'll assign this to me and make it a priority to take a deeper dive into it sometime this week to address my/Andy's comments.
Let's say we've run the following:

CREATE STREAM SRC (... stuff ...) WITH (... stuff...);
CREATE STREAM DST AS SELECT ... stuff ... FROM SRC;

The second statement creates a persistent query; let's say it has query ID CSAS_DST_1. Currently the metastore will have:
The relationship here can be seen as SRC -> CSAS_DST_1 -> DST. And the engine will have:
If the user doesn't terminate CSAS_DST_1, then:
If the user terminates CSAS_DST_1, then:
Note: users can delete SRC even if DST still exists. This is weird, as DST uses SRC. It's this way historically because we expose the implementation detail of the persistent query that builds DST from SRC.

**Proposal**

I propose that the metastore should not know about queries; only the engine should know about them. The referential checks in the metastore should only check whether there are any downstream streams or tables that need the source.

As well as tracking source and sink sources per query, the engine should also track, per source, the queries that write to it. This can be split into the query created during a C*AS and all other queries.

When a user tries to drop a source, the engine will first check whether any other queries write to the source, i.e. INSERT INTOs. If any exist, then the statement is rejected. If not, the engine attempts to drop the source from the metastore, which will fail if any sources depend on the source being dropped. If the metastore is updated without error, then the engine stops the query created as part of the source creation, if any.

With this design, after running the above two statements the metastore will have:
The relationship here can be seen as SRC -> DST, i.e. no knowledge of the query. The metastore won't allow SRC to be deleted while DST exists. And the engine will have:
If the user doesn't terminate CSAS_DST_1 manually, then:
If the user terminates CSAS_DST_1, then:
Note: the differences here are twofold:
In the fullness of time we should remove INSERT INTO from the language and use UNION instead. Then we can completely hide the concept of 'persistent queries'. See KLIP-17 and KLIP-20 for more info.

**INSERT INTO**

For completeness, let's look at an example with an INSERT INTO running:

CREATE STREAM SRC (... stuff ...) WITH (... stuff...);
CREATE STREAM DST AS SELECT ... stuff ... FROM SRC;
INSERT INTO DST SELECT ...stuff... FROM SRC;

Now, as well as query CSAS_DST_1, the INSERT INTO creates persistent query INSERT_DST_1. After running the above statements the metastore will have (no change):
And the engine will have:
Note: if the user doesn't terminate INSERT_DST_1 manually, then:
If the user terminates INSERT_DST_1, then:
Basically, the presence of the INSERT_DST_1 query means the user must terminate it manually before the source can be dropped.

**TRANSIENT QUERIES**

Stopping a drop due to a transient query is... tricky! Transient queries don't go through the command topic and only exist on one node in the cluster. A naive implementation is going to be full of race conditions. We'd also need to support TERMINATE on transient queries before we did this, as otherwise admins could be stuck unable to drop a source just because of one transient query, which they then have to track down and stop... by which point someone else has started another transient query.... arrrggghhhh! Given all of this... it would need a lot of thought and planning - hence it's best to keep it out of scope for this work.

**Steps to implement**
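The engine-side drop flow in the proposal above could be sketched roughly like this. All class and method names here are hypothetical stand-ins for ksqlDB's actual types:

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Sketch: reject the drop if any INSERT INTO writes to the source, ask the
// metastore to drop it (which fails if downstream sources depend on it), and
// only then stop the query created by the C*AS, if one exists.
public class EngineDropFlow {

  // Stand-in for the metastore's delete call; may throw on referential failure.
  public interface Metastore { void deleteSource(String name); }

  private final Map<String, Set<String>> insertQueriesBySink; // INSERT INTOs per source
  private final Map<String, String> createAsQueryBySink;      // C*AS query id per source

  public EngineDropFlow(Map<String, Set<String>> inserts, Map<String, String> createAs) {
    this.insertQueriesBySink = inserts;
    this.createAsQueryBySink = createAs;
  }

  // Returns the id of the C*AS query that should now be stopped, if any.
  public Optional<String> dropSource(String name, Metastore metastore) {
    Set<String> inserts = insertQueriesBySink.getOrDefault(name, Set.of());
    if (!inserts.isEmpty()) {
      throw new IllegalStateException(
          "Cannot drop " + name + "; queries write into it: " + inserts);
    }
    metastore.deleteSource(name);  // fails if other sources depend on 'name'
    return Optional.ofNullable(createAsQueryBySink.remove(name));
  }
}
```

Note the ordering: the metastore update happens before the query is stopped, so a referential failure leaves the query untouched.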
@big-andy-coates your code-architecture-vision never fails to impress me 🙇

Ah, shucks.
@big-andy-coates @agavra I updated the PR with the feedback.
This is almost ready to be shipped; the new approach is much cleaner IMO! The only thing that I think is missing is recovery testing. We have a class, `RestoreCommandsCompactor`, which removes queries that are paired with a terminate, to make sure that we don't run them. After this change, it's possible that we'll have queries that were terminated via a `DROP` command. We should add logic (and a corresponding test in `RecoveryTest`) to make sure that that is properly handled.
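The compaction idea being described can be sketched as follows. The string-based command encoding is purely illustrative; it is not how `RestoreCommandsCompactor` actually models commands:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: when replaying the command topic, a create paired with a later
// TERMINATE -- or, after this change, with a DROP that implicitly terminated
// it -- can be skipped entirely.
public class CommandCompactor {

  // Commands are modeled as "TYPE key", e.g. "CREATE_AS Q1", "DROP Q1".
  public static List<String> compact(List<String> log) {
    List<String> out = new ArrayList<>(log);
    for (String command : log) {
      String[] parts = command.split(" ", 2);
      if (parts[0].equals("TERMINATE") || parts[0].equals("DROP")) {
        // Remove the terminating command and everything it cancels.
        out.removeIf(other -> other.endsWith(" " + parts[1]));
      }
    }
    return out;
  }
}
```

A recovery test would then assert that a `CREATE ... AS` followed by a `DROP` of its sink leaves no query to restart.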
-      final boolean withQuery) {
-    return new Executor(sql, withQuery).execute(ddlCommand);
+      final boolean withQuery,
+      final Set<SourceName> withQuerySources
What do you think about making this part of the `DdlCommand` that's passed in, instead of passing it in separately? That makes sense to me because (1) it's only used for some of the DdlCommand types, and (2) it would be persisted into the command topic so we can make sure that things get properly tracked in recovery. It might be a bigger change, so it can wait if we want to get this in before the 0.14 cut.
Right - it was a big change when I looked into this. I'll follow up on it for the next release.
@@ -134,6 +146,17 @@ EngineContext createSandbox(final ServiceContext serviceContext) {
    return Collections.unmodifiableMap(persistentQueries);
  }

+  Set<String> getQueriesWithSink(final SourceName sourceName) {
Any reason not to make this a `Set<QueryId>`? I feel that keeping this strongly typed is a benefit, especially if we want to leverage `QueryId` more heavily in the future.
No reason - your suggestion is actually better. Done.
if (command instanceof DropSourceCommand) {
  throwIfOtherQueriesExist(((DropSourceCommand) command).getSourceName());
}
Is there any way to avoid special-casing `DropSourceCommand` and to just pass what we need into the executor? (applies below as well)
This casing also bothers me, but the only way I found is to pass the `EngineContext` to the `DdlCommandExec` and do the checks when dropping the source inside it - even stopping the query inside it too. That requires `DdlCommandExec` to know about queries, so I wasn't sure if we wanted to do that there. I left it here because queries live in the engine context.

If you like the idea of passing the EngineContext to the DdlCommandExec, then I can do it in a follow-up PR for 0.15. What do you think?
makes sense, let's keep it as is for now and we can explore a refactor another time if this bites us
private void terminateCreateAsQuery(final SourceName sourceName) {
  createAsQueries.computeIfPresent(sourceName, (ignore, queryId) -> {
    final PersistentQueryMetadata query = persistentQueries.get(queryId);
    if (query != null) {
when could this ever be null? maybe we should throw some error if it is null?
I removed it. It seems it can never be null.
@@ -93,116 +94,97 @@ public void putSource(final DataSource dataSource, final boolean allowReplace) {
  });
}

-    dataSources.put(dataSource.getName(), new SourceInfo(dataSource));
+    dataSources.put(dataSource.getName(),
+        (existing != null) ? existing.copyWith(dataSource) : new SourceInfo(dataSource));
I'm a little confused here - shouldn't `existing` already have the same `SourceName` as `dataSource`? Why are we calling `copyWith(dataSource)`?
Removed. We no longer need to add the dataSource when calling `putSource` with replace if the dataSource already exists.
I reverted this change. I need to replace the existing dataSource because it may contain changes to the schema (caused by ALTER STREAM), but I need a copy of the internal linked/referential sources.
if (!result.isSuccess()) {
  throw new KsqlStatementException(result.getMessage(), sqlExpression);
}

if (command instanceof DropSourceCommand) {
  terminateCreateAsQuery(((DropSourceCommand) command).getSourceName());
Can this fail for any reason? We might be left in a state where we drop the source but still have a query running into it. That would be very bad 😢 Is there any way to make the drop & terminate an atomic operation?
It's hard to make this atomic. I would need to change the code to mark the metastore source as deleted without really deleting it, then attempt to terminate the query; if successful, do the real delete, otherwise undo the deletion mark.

Another solution I considered was to terminate the query before deleting the source, but there is another referential check inside the metastore that should be done before terminating the query, so I couldn't do that either.

Anyway, it seems the only reason `query.close()` would fail is if the `closeTimeout` is invalid. I made a change in `QueryMetadata` to build the `Duration.ofMillis` in the constructor; that way we are sure the close won't fail because of that. The other reasons that could make this fail are thrown after the query has been marked `PENDING_SHUTDOWN`. After that is set, the stream cannot be closed again because it is still pending shutdown. Not sure what else to do there.
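The "mark as deleted, then terminate, then commit or undo" idea mentioned above could look roughly like this sketch. All names are hypothetical; this is not the real `MetaStoreImpl` API:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of a two-phase drop: the source is hidden while its query is being
// terminated, and the hide is undone if termination throws.
public class TwoPhaseDrop {
  private final Set<String> sources = new HashSet<>();
  private final Set<String> markedDeleted = new HashSet<>();
  private final Map<String, Runnable> queryStoppers = new HashMap<>();

  public TwoPhaseDrop(Set<String> initial, Map<String, Runnable> stoppers) {
    sources.addAll(initial);
    queryStoppers.putAll(stoppers);
  }

  // A source marked for deletion is invisible to readers during the drop.
  public synchronized boolean exists(String name) {
    return sources.contains(name) && !markedDeleted.contains(name);
  }

  public synchronized void dropSource(String name) {
    if (!exists(name)) {
      throw new IllegalStateException("Unknown source: " + name);
    }
    markedDeleted.add(name);          // phase 1: tentative delete
    try {
      Runnable stop = queryStoppers.get(name);
      if (stop != null) {
        stop.run();                   // phase 2: terminate the query (may throw)
      }
      sources.remove(name);           // commit the delete
    } finally {
      markedDeleted.remove(name);     // on failure, this undoes the mark
    }
  }
}
```

If `stop.run()` throws, the `finally` block clears the mark and the source becomes visible again, so no half-dropped state leaks out.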
comment is out-of-date, Sergio applied the changes :)
I wanted to get to all of this today but it's getting late and I'm losing steam 😢 I'll get to this tomorrow
"CREATE STREAM B AS SELECT * FROM A;", | ||
"DROP STREAM B;", |
Can we also test something that depends on `B` before the drop? e.g.
"CREATE STREAM B AS SELECT * FROM A;", | |
"DROP STREAM B;", | |
"CREATE STREAM B AS SELECT * FROM A;", | |
"CREATE STREAM C AS SELECT * FROM B;", | |
"DROP STREAM C;", | |
"DROP STREAM B;", |
LGTM! Thanks for your patience @spena - took me a while to do a few rounds. The remaining comments are just suggestions
@@ -72,6 +86,8 @@
  private final Map<QueryId, PersistentQueryMetadata> persistentQueries;
  private final Set<QueryMetadata> allLiveQueries = ConcurrentHashMap.newKeySet();
  private final QueryCleanupService cleanupService;
+  private final Map<SourceName, QueryId> createAsQueries = new ConcurrentHashMap<>();
+  private final Map<SourceName, Set<QueryId>> otherQueries = new ConcurrentHashMap<>();
nit: reading this code, I'm always confused about what `otherQueries` actually is, and it makes me revisit the code often. Can we come up with a better name here?

Alternatively, maybe it makes sense to have a data structure:

class SourceQueryTracker {
  SourceName target;
  QueryId createTargetAsQuery;
  Set<QueryId> nonCreateAsQueriesThatWriteIntoTarget;
  Set<QueryId> queriesThatReadFromTarget;
}

private final Map<SourceName, SourceQueryTracker> queryTrackers;

That would also make it much easier to reason about thread safety - just make everything in here totally synchronized.
    persistentQuery.getSourceNames(),
    ImmutableSet.of(persistentQuery.getSinkName()));
if (createAsQuery) {
  createAsQueries.put(persistentQuery.getSinkName(), queryId);
Why don't we add this query to `otherQueries` as well? It can still read from another source (or more), right? If we don't do that, our error message that says "queries that read from this source: [foo, bar]" will be wrong. (Can we add a test for that too?)
private void throwIfOtherQueriesExist(final SourceName sourceName) {
  final Set<QueryId> sinkQueries = getOtherQueries(sourceName, FILTER_QUERIES_WITH_SINK);
  final Set<QueryId> sourceQueries = getOtherQueries(sourceName, FILTER_QUERIES_WITH_SOURCE);
I know that this part is enforced by the metastore, which is why we don't use `FILTER_QUERIES_WITH_SOURCE` anywhere in this class other than here. It seems a little fragile to rely on this loose coupling. I think it's OK for now, but at a minimum add a comment so the next person debugging knows where to look.
final Iterable<SourceName> allSourceNames = Iterables.concat(
    Collections.singleton(persistentQuery.getSinkName()),
    persistentQuery.getSourceNames()
);
nit: we use this below as well - can we make a helper method out of it?
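The extracted helper could look something like this sketch, using plain streams instead of Guava's `Iterables.concat` (the method name is hypothetical):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch of a helper that yields a query's sink name followed by its source
// names, so both call sites can share it.
public class QueryNames {
  public static List<String> sinkAndSources(String sinkName, Set<String> sourceNames) {
    return Stream.concat(Stream.of(sinkName), sourceNames.stream())
        .collect(Collectors.toList());
  }
}
```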
- Return QueryId from getQueriesWithSink() instead of String
- Add a synchronized HashSet to otherQueries
- Add more tests to validate deep copy on Metastore
- Remove unnecessary null check
- Add comment explaining why terminate is done after deleting the source
- Other minor fixes
- Rename otherQueries to insertQueries
- Add a helper method to concat query sink and sources
Description
Fixes #2177
This PR automatically terminates the persistent query of a stream/table when executing the DROP command. If the stream/table to be dropped has another query writing to or reading from it, then the DROP will fail. This prevents leaking zombie queries on dropped streams.

The query termination is done by `MetaStoreImpl.deleteSource` in an atomic operation. This code is the one that checks referential integrity, so it is the best place to do the job. To check whether the query is safe to terminate, the metastore stores the stream/table source with a reference to the running query ID. This way, when attempting to delete the source, it checks whether the referenced query matches the query in the `ReferentialIntegrityTableEntity`. The query is terminated only if it is a sink query and no other queries exist, either source or sink.

I added a few tests in `KsqlEngine` to verify the drop command works (both negatively and positively).

Testing done
Added unit tests
Verified manually
Reviewer checklist