feat: Support pausing/resuming persistent queries #9203
Addresses: #6403 Implements KLIP-63: PAUSE and RESUME for persistent queries.
NB: I owe the following:
Also, the build will fail until apache/kafka#12161 is merged and synced to CCS Kafka.
Thanks @jnh5y -- LGTM overall.
I agree with the need to enhance command topic compaction to not repeatedly pause/resume queries during restart. Happy to have that be a separate PR, though, if that's your preference.
@@ -62,6 +64,7 @@ public class BinPackedPersistentQueryMetadataImpl implements PersistentQueryMeta
private static final Logger LOG = LoggerFactory
    .getLogger(BinPackedPersistentQueryMetadataImpl.class);

private final AtomicBoolean isPaused = new AtomicBoolean(false);
Are we actually expecting concurrent access to this value? ksql processing statements from the command topic should be single-threaded, so I'd expect at least updates to this value to only be from a single thread. It's possible that there are other threads which might try to read the value concurrently. (Curious what your thinking here was.)
Ah! I didn't know for sure that this class is used in a single-threaded manner. So I assumed that an AtomicBoolean was better/more sensible.
If there's multi-threaded access, I can see that things may be a mess for other things in the class.
I'll switch from an AtomicBoolean to a regular one.
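For readers following the thread, here is a minimal sketch of the trade-off being discussed. It assumes a single writer thread (command-topic processing) and possibly other reader threads; the class name `PausableQueryState` is hypothetical and this is not the code that landed in the PR. Under that assumption, a `volatile boolean` is one middle ground: no compare-and-set is needed, but readers still see the latest value.

```java
// Hypothetical stand-in for a query-metadata class; not the actual ksqlDB implementation.
final class PausableQueryState {
  // Assumption: only one thread writes this flag, other threads may read it.
  // volatile guarantees visibility to readers without the overhead of AtomicBoolean.
  private volatile boolean paused = false;

  void pause() {
    paused = true;
  }

  void resume() {
    paused = false;
  }

  boolean isPaused() {
    return paused;
  }
}
```

If reads are also confined to the single command-processing thread, a plain `boolean` field, as proposed in the reply above, is enough.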
public void shouldPauseAndResumeMultipleQueries() {
  // Given:
  createQuery("1");
  createQuery("2");
What's the purpose of having the second query in this test?
The goal of this test is to show that pausing one query does not impact a separate query. I've updated the test to reflect that a little bit.
List <KsqlEntity> showQueries3 = RestIntegrationTestUtil.makeKsqlRequest(REST_APP, "SHOW "
    + "QUERIES;");
assertThat(((Queries) showQueries3.get(0)).getQueries().get(0).getStatusCount().getStatuses()
    .get(KsqlQueryStatus.RUNNING), equalTo(1));
Can we add a status check into this test for when the query is paused too?
Yes, I'm trying to add a little bit for that.
@Override
public void pause() {
  sharedKafkaStreamsRuntime.getKafkaStreams().pauseNamedTopology(topology.name());
I'd like to understand more about how pausing queries works with named topologies. I saw the note about it in the KIP but am still unclear about the details. I also have some questions about how pausing a KS app affects query status and the metrics emitted from the app. Let me try to catch you offline and then summarize the interesting points here.
Cool. Did our conversations at the KSE on-site answer these questions well enough?
Yes! I meant to post an update here shortly afterwards but now it'll be a good memory exercise 😆 :
- For named topologies, pause/resume works as usual as long as you pass in the specific topology name. You can pause a topology in this way before the topology has been started, so no concerns there.
- Query status is complicated/we're not entirely clear. We think queries may continue to transition from REBALANCING to RUNNING (and potentially among other states as well) as normal even after a query is paused. It's best not to assume any guarantees on streams state for paused queries. We'll adjust our monitoring accordingly.
- Metrics continue to be emitted since that part of the stream thread execution loop is not skipped, but metric values will of course be affected. Again, we'll adjust our monitoring accordingly.
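The first point in the summary above (pausing by topology name, including before the topology has started) can be illustrated with a toy model. This is only a sketch: `ToyNamedTopologyRuntime`, `start`, and `isProcessing` are hypothetical names, and the class does not use or imitate the real Kafka Streams runtime beyond borrowing the `pauseNamedTopology` name seen in the diff.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Toy model only: tracks paused topology names independently of whether they have started.
final class ToyNamedTopologyRuntime {
  private final Set<String> paused = ConcurrentHashMap.newKeySet();
  private final Set<String> started = ConcurrentHashMap.newKeySet();

  // Pausing is keyed by name, so it is valid before the topology is started.
  void pauseNamedTopology(String name) {
    paused.add(name);
  }

  void resumeNamedTopology(String name) {
    paused.remove(name);
  }

  void start(String name) {
    started.add(name);
  }

  // A topology processes records only if it has started and is not paused.
  boolean isProcessing(String name) {
    return started.contains(name) && !paused.contains(name);
  }
}
```

Because the pause flag is tracked per name, pausing one topology leaves the others in the same shared runtime untouched, which is the property the multi-query integration test is meant to show.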
final Optional<QueryId> queryId = pauseQuery.getQueryId();

if (!queryId.isPresent()) {
  context.getPersistentQueries().forEach(PersistentQueryMetadata::pause);
Note for myself (and other readers) since I was confused the first time through: this code here is executed against a sandbox, in order to validate that the query ID exists and is not a source table query. The analogous code in InteractiveStatementExecutor is the one that actually executes this statement from the command topic.
+1. Yeah, some of this is confusing. Shout if there's an obvious way to improve it for this PR!
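The validate-in-a-sandbox, execute-for-real split described above can be sketched with a toy engine. Everything here (`ToyEngine`, `sandbox()`, string query IDs) is a hypothetical simplification for illustration; it omits the source-table check mentioned in the comment and is not how ksqlDB's sandboxed context is implemented.

```java
import java.util.HashSet;
import java.util.Set;

// Toy engine: a sandbox is a throwaway copy, so validating a PAUSE has no real effect.
final class ToyEngine {
  private final Set<String> queries;
  private final Set<String> paused = new HashSet<>();

  ToyEngine(Set<String> queries) {
    this.queries = new HashSet<>(queries);
  }

  // Returns an independent copy of the current state for validation.
  ToyEngine sandbox() {
    ToyEngine copy = new ToyEngine(queries);
    copy.paused.addAll(paused);
    return copy;
  }

  // Throws if the query ID is unknown; otherwise marks the query as paused.
  void pause(String queryId) {
    if (!queries.contains(queryId)) {
      throw new IllegalArgumentException("Unknown query: " + queryId);
    }
    paused.add(queryId);
  }

  boolean isPaused(String queryId) {
    return paused.contains(queryId);
  }
}
```

Running `pause` against the sandbox surfaces validation errors early, while the real engine only changes state when the statement is later executed from the command topic.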
return ((Queries) showQueries2.get(0)).getQueries().get(0).getStatusCount().getStatuses()
    .get(status);
} catch (Exception e) {
  return 0;
nit: return -1 (or some other nonsensical value) to indicate this error, rather than a plausible value (even though this value similarly fails tests).
Adding that to another commit here in a bit.
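A minimal sketch of the sentinel suggestion, assuming the status counts arrive as a map keyed by status name. `StatusCounts`, `LOOKUP_FAILED`, and the `Supplier` parameter are hypothetical; the real test helper reads the counts from a SHOW QUERIES response instead.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper mirroring the shape of the test's getCount method.
final class StatusCounts {
  // An impossible count, so a failed lookup cannot be mistaken for "zero queries".
  static final int LOOKUP_FAILED = -1;

  static int getCount(Supplier<Map<String, Integer>> statuses, String status) {
    try {
      // Unboxing a missing entry throws NullPointerException, handled below.
      return statuses.get().get(status);
    } catch (Exception e) {
      return LOOKUP_FAILED;
    }
  }
}
```

With a sentinel, an assertion such as "paused count equals 0" fails loudly when the lookup itself broke, instead of passing by accident.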
…mputation/ValidatedCommandFactory.java Co-authored-by: Victoria Xia <[email protected]>
…on/PauseResumeIntegrationTest.java Co-authored-by: Victoria Xia <[email protected]>
…Config.STATESTORE_CACHE_MAX_BYTES_CONFIG The configuration org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG was reverted in Apache Kafka.
Thanks @jnh5y -- this is looking really good! We should be ready to merge once the open issues we've already talked about are addressed: removing Pause/ResumeQueryEntity, migrations integration test, monitoring updates (outside the ksql repo). Also docs and command topic compaction, but those can be follow-ups.
@@ -197,8 +202,7 @@ public void pausedQueriesShouldBePausedOnRestart() {
// When:
String queryId = ((Queries) RestIntegrationTestUtil.makeKsqlRequest(REST_APP, "SHOW "
Is this statement always guaranteed to grab the query ID for supplier rather than supplier2? If we're not sure, it'd be safer to grab the query ID before creating the second query to avoid flakiness where we inadvertently pause the wrong query.
I think I've sorted this out with some refactoring.
Sorry, what was the refactoring that was done here? It looks like the test still relies on this line returning the query ID for the query that corresponds to supplier rather than supplier2.
Ok, here's try two. I'm now getting the queryId immediately after creating only the first query, so the test no longer depends on the order in which show queries returns its results.
Thanks, this makes sense. Can you apply the same update to the other test in this file too?
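The ordering concern in this thread can be shown with a small stand-in server. This is a sketch under the assumption that SHOW QUERIES makes no ordering promise, modeled here by shuffling; `ToyQueryServer` and its ID format are hypothetical and unrelated to the real REST test utilities.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy server whose query listing comes back in no particular order.
final class ToyQueryServer {
  private final List<String> ids = new ArrayList<>();
  private int next = 0;

  void createQuery(String sink) {
    ids.add("query_" + sink + "_" + next);
    next++;
  }

  // Models an unordered listing by shuffling a copy of the known IDs.
  List<String> showQueries() {
    List<String> copy = new ArrayList<>(ids);
    Collections.shuffle(copy);
    return copy;
  }
}
```

Reading `showQueries().get(0)` while only one query exists is deterministic; reading it after the second `createQuery` call may return either ID, which is the flakiness the reviewer is pointing at.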
List <KsqlEntity> showQueries3 = RestIntegrationTestUtil.makeKsqlRequest(REST_APP, "SHOW "
    + "QUERIES;");
assertThat(((Queries) showQueries3.get(0)).getQueries().get(0).getStatusCount().getStatuses()
    .get(KsqlQueryStatus.RUNNING), equalTo(1));
assertThat(getRunningCount(), equalTo(2));
This looks like it was meant to replace the line above it?
Good catch.
@@ -260,17 +251,18 @@ private int getRunningCount() {
return getCount(KsqlQueryStatus.RUNNING);
}

// JNH: Fix this.
Out of curiosity, is this referring to the comment below that says "This failing shows that the command topic isn't being read completely"? I'd be curious to understand more about what the issue is.
Fixed it!
@@ -169,10 +169,11 @@ public void shouldScatterGatherAndMergeShowQueries() {
final KsqlEngine engine = mock(KsqlEngine.class);
when(engine.getAllLiveQueries()).thenReturn(ImmutableList.of(localMetadata));
when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(localMetadata));

// new QueryStatusCount(Collections.singletonMap(q.getQueryStatus(), 1))
Looks like this might've been left in unintentionally?
This still needs to be addressed.
Yup. Getting it now.
if (statement.getStatement() instanceof PauseQuery) {
  pauseQuery((PreparedStatement<PauseQuery>) statement);

  final String successMessage = "Query paused.";
@vcrfxia Ah! Here's what is returned when a query is run.
Thoughts on this? It follows what happens with terminate...
Works for me! I'm all for slimmer code. Will take a look at your changes later today.
Yeah... it is maybe wonky if one is using PAUSE ALL. Also, if one repeatedly pauses or resumes a query, then there's no note that the query was already paused, etc.
Hm good point. We could refactor pauseQuery() to return the success message, based on whether the query exists, whether it was "PAUSE ALL" etc? Could also do the same thing for terminate while we're at it.
I think that'd be a nice touch. Could do it in a follow-up instead of this PR if you prefer.
Sounds good. I've added it to my list of things to track.
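One possible shape for the follow-up discussed above is a helper that picks the message from the statement's inputs. This is a sketch only: `PauseMessages`, `describePause`, the `wasAlreadyPaused` flag, and the message wording are all hypothetical, and only the "Query paused." style of message comes from the diff.

```java
import java.util.Optional;

// Hypothetical helper: builds the success message for a PAUSE statement.
final class PauseMessages {
  static String describePause(Optional<String> queryId, boolean wasAlreadyPaused) {
    if (!queryId.isPresent()) {
      // No query ID means the statement was PAUSE ALL.
      return "All persistent queries paused.";
    }
    if (wasAlreadyPaused) {
      return "Query " + queryId.get() + " was already paused.";
    }
    return "Query " + queryId.get() + " paused.";
  }
}
```

The same pattern would apply to RESUME and TERMINATE, which is why the thread suggests handling them together.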
Thanks @jnh5y -- LGTM pending the minor comments/questions inline.
As discussed offline, let's run a quick manual test for the migrations tool (which also exercises the Java client) for now, and then add integration test coverage for both in a follow-up PR. Also docs updates for both (just to mention that PAUSE and RESUME are officially supported).
Command topic compaction can wait, and I see you've got a separate docs PR open already which is great. Let's get this merged!
@@ -255,6 +259,73 @@ public void shouldSupportDroppingAndRecreatingJoinQuery() throws Exception {
assertThat(columns.get(2).toString(), either(is("FEMALE")).or(is("MALE")));
}

@Test
public void shouldSupportPauseAndResume() throws Exception {
What does this test offer beyond the new integration tests in PauseResumeIntegrationTest.java? If nothing, we can remove this one. It'd be nice to keep all the integration testing for the feature in one place.
Removed!
Addresses: #6403
Implements KLIP-63: PAUSE and RESUME for persistent queries.
Description
Adds the ability to pause and resume persistent queries.
Testing done
Unit tests have been added to cover the new syntax and basic usage.
An integration test class has been created.
Reviewer checklist