feat: shared runtimes #7721
Conversation
I am opening up a new PR as we have settled on a design and are starting to adapt tests.
@wcarlson5 is this updated one ready for a quick look?
@guozhangwang I am just adding tests now, so I wouldn't expect the code to change too much barring finding something majorly wrong. So yes, please take a look if you can.
I'm less familiar with pt.3 (https://github.com/apache/kafka/pull/10788/files) and would need to review that again after pt.2 is merged, so maybe I would miss some parts. I will take another thorough pass after I complete reviewing pt.3.
    this.metrics.addMetric(stateMetricName, (Gauge<String>) (config, now) -> state);
    this.metrics.addMetric(errorMetricName, (Gauge<String>) (config, now) -> error);
} catch (IllegalArgumentException e) {
    // skip if the metric is already there
Not sure why this could happen, could you elaborate a bit? Ditto elsewhere.
Walker's mentioned this to me; I believe this is handling the metrics that are at the KS runtime level and thus no longer unique to each query.
I think we also discussed that silently swallowing any/every IllegalArgumentException is not very safe, so I'm guessing this is just a temporary workaround to unblock the tests? In the end we should have separate handling for separate metrics types, ie query-level vs runtime-level, and avoid registering the same metric twice instead of catching and swallowing the exception.
If you want to do that in a followup PR that's ok with me, but can you (a) file a ticket for this, and (b) make sure we aren't accidentally swallowing a real IllegalArgumentException. It probably makes sense to tackle (b) by swallowing or rethrowing it based on the feature flag, so this doesn't affect the existing cases. Thoughts?
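A guarded registration along those lines, gated on the feature flag so existing paths still surface real duplicates, might look like this (a minimal sketch; `MetricRegistry` and the flag field are hypothetical stand-ins, not the actual ksqlDB classes):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: register a metric once, rethrowing on duplicates unless
// the shared-runtime feature flag is enabled.
class MetricRegistry {
    private final Set<String> registered = new HashSet<>();
    private final boolean sharedRuntimesEnabled;

    MetricRegistry(final boolean sharedRuntimesEnabled) {
        this.sharedRuntimesEnabled = sharedRuntimesEnabled;
    }

    void addMetric(final String name) {
        if (!registered.add(name)) {
            throw new IllegalArgumentException("A metric named '" + name + "' already exists");
        }
    }

    // Swallow the duplicate only when the feature flag is on, so existing
    // (non-shared) code paths keep surfacing real registration bugs.
    void addMetricIfAbsent(final String name) {
        try {
            addMetric(name);
        } catch (final IllegalArgumentException e) {
            if (!sharedRuntimesEnabled) {
                throw e;
            }
        }
    }
}
```

With the flag off, a duplicate registration still throws, which keeps the legacy behavior intact.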
Note that this is the constructor for the "Per Query Listener", it probably shouldn't be called at all for metrics that aren't per query 🙂. I think we probably need to break this up into a PerQueryListener, which is keyed by the queryId (as in the current map perQuery), and a PerRuntimeListener, which is keyed by the queryApplicationId (in a new map, e.g. perRuntime or `perRuntimeMetrics`).
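The split being proposed could be sketched like this (`ListenerRegistry` and the listener classes are illustrative stand-ins for the ksqlDB code, not the real types):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of per-query vs per-runtime listeners, keyed differently.
class ListenerRegistry {
    static class PerQueryListener {
        final String queryId;
        PerQueryListener(final String queryId) { this.queryId = queryId; }
    }

    static class PerRuntimeListener {
        final String appId;
        PerRuntimeListener(final String appId) { this.appId = appId; }
    }

    private final Map<String, PerQueryListener> perQuery = new ConcurrentHashMap<>();
    private final Map<String, PerRuntimeListener> perRuntime = new ConcurrentHashMap<>();

    // Each query registers its own listener, keyed by queryId.
    PerQueryListener queryListener(final String queryId) {
        return perQuery.computeIfAbsent(queryId, PerQueryListener::new);
    }

    // Runtime-level metrics are shared: the first query on a runtime creates the
    // listener, later queries reuse it instead of re-registering (and so never
    // hit the duplicate-metric IllegalArgumentException).
    PerRuntimeListener runtimeListener(final String queryApplicationId) {
        return perRuntime.computeIfAbsent(queryApplicationId, PerRuntimeListener::new);
    }
}
```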
Yeah, this is a temporary thing to unblock a lot of the tests. I think we can come back and rethink this when we take care of the observability stuff. We shouldn't really need to fix it for the POC.
I'm just a bit concerned about merging this unless we can be sure to get the followup done ASAP. Seems dangerous to silently swallow an exception like this -- maybe we could at least put it behind a feature flag for the time being? That way we can file a ticket for the V1 milestone
Just FYI, I filed https://confluentinc.atlassian.net/browse/KCI-812
I can put it behind a feature flag for sure
With the new corruption detection these are actually still per-query metrics. So I changed it to use the queryId instead
private SharedKafkaStreamsRuntime getStream(
    final Set<SourceName> sources,
    final QueryId queryID) {
  for (final SharedKafkaStreamsRuntime sharedKafkaStreamsRuntimeImpl : streams) {
nit: seems mis-aligned?
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ValidationSharedKafkaStreamsRuntimeImpl implements SharedKafkaStreamsRuntime {
I'm wondering if we really need another impl here: for sandboxing, maybe we should just pass the parameter (btw, if it is only for sandboxing, then maybe we can just inherit its name rather than using a new name like `real`?) all the way down into the runtime, and then skip starting the KS accordingly?
This is a bit from the part 3 stuff. But basically, for the validation we are adding the topology to a runtime that is not started. This way we can be sure it is going to work. I went through the other way and it got complicated pretty quickly. IMHO it is much cleaner to keep it separate.
Let me think about this a bit more after I get my hands dirty on the pt.3 PR on AK. What kind of validations are we actually checking for the shared runtime in sandboxing here?
That's a good question. @rodesai might be able to fill in more details, but for sure we want to avoid duplicate topics. Previously we used to make a query completely and then close the runtime. That obviously won't work now. I am not sure how much work it would take to revamp our validation flow, so I would rather just keep it as close as we can and not block on it yet.
I think this is related to some of the rbac/authorization stuff we were discussing earlier, ie we try to start up a query as the user and if it's all good, then allow the "real" query to be started...is that right @rodesai ?
If the main goal is around rbac/authX then I feel we do not need a full fledged runtime impl to be used in the sandbox -- I am not 100% sure though..
I still feel that we can follow the same create-query -> close-runtime pattern here: since the goals for sandboxing are 1) making sure the query is well-formed and hence would not fail compilation before writing to the cmd topic, and 2) the rbac stuff (not sure if this is really done at sandboxing, cc @rodesai @agavra could you clarify?), I think we can still do the sandboxing by creating a runtime, adding that query to the runtime, and then closing the runtime; i.e. we only do a single-query shared runtime at sandboxing, which would not influence the validations we'd want.
The RBAC checks don't rely on sandboxing of the runtime. They are done here (line 126 in 70565f2):

checkAuthorization(injected, securityContext, executionContext);
Today we sandbox to support (1) - we want to make sure that it's possible to execute our physical plan by building a Kafka Streams topology and applying any meta-store/query registry changes. IIUC we want to now also:
- select the runtime that the query is going to run in
- add validation that the topology can be added to that runtime (by adding it to a non-running copy)
I think to do this we need to be able to create a sandbox runtime that doesn't actually start any threads (and probably should throw if anything does try to start it).
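A sandbox runtime of that shape could look roughly like this (a minimal sketch; `QueryRuntime` and `SandboxQueryRuntime` are hypothetical stand-ins, not ksqlDB or Kafka Streams types):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the runtime abstraction discussed above.
interface QueryRuntime {
    void addTopology(String queryId);
    void start();
}

// Accepts topologies so validation can run, but refuses to launch any threads.
class SandboxQueryRuntime implements QueryRuntime {
    private final Set<String> topologies = new HashSet<>();

    @Override
    public void addTopology(final String queryId) {
        // Validation only: reject duplicates, never start processing.
        if (!topologies.add(queryId)) {
            throw new IllegalArgumentException("Topology already registered: " + queryId);
        }
    }

    @Override
    public void start() {
        throw new IllegalStateException("Sandbox runtimes must never be started");
    }
}
```

Calling start() on the sandbox fails fast, which makes accidental starts visible during validation instead of silently spinning up threads.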
try {
    metrics.addMetric(metricName, gauge);
} catch (IllegalArgumentException e) {
This one I am not sure why we do it. It seems to be a duplicate of the other metric. They both report the state. It looks like some legacy stuff from 2018. It might be a good time to remove it?
What other metric? cc @rodesai
@ableegoldman these, in ksql/ksqldb-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java (line 360 in 81bfc1f):

configureGaugeForState(
Both the legacy and the new metrics registration call this configureGaugeForState, but with different group-name / tags. I think the legacy metrics were not removed since they may still be used by some monitoring? Worth double-checking with @stevenpyzhang.
@guozhangwang @ableegoldman @rodesai I cleaned up the hacks that I needed to get this working while the streams changes were being developed, and it should be ready for another review.
    throw new IllegalStateException("Streams is not running but is in state "
        + kafkaStreams.state());
}
kafkaStreams.cleanUpNamedTopology(queryId.toString());
This function would check getTopologyByName(name).isPresent(), which would return nothing since we already called metadata.remove(queryId.toString()) beforehand, right?
The metadata.remove(queryId.toString()) only affects the ksql metadata I have stored in this runtime class.
this.streamsProperties = ImmutableMap.copyOf(streamsProperties);
metadata = new ConcurrentHashMap<>();
sources = new ConcurrentHashMap<>();
kafkaStreams.setUncaughtExceptionHandler(this::uncaughtHandler);
Should we have a different exception handler which can tell which query inside the KS runtime was throwing the error, since I think it would be needed for the per-query healthcheck?
We should, but for now we cannot determine that. That is in the V2 plan though.
I think it's still targeted for V1 to at least have the correct information about which query is affected. But not in this PR, we're trying not to tackle every single thing at the same time 😜
    final QueryId queryId) {
this.errorClassifier = errorClassifier;
if (metadata.containsKey(queryId.toString())) {
    kafkaStreams.removeNamedTopology(queryId.toString());
When can this scenario actually happen? And should we really just remove the old query silently?
There is a possible race where, if the query was added and then replaced, stop() might not be able to remove the topology. Once removeNamedTopology returns a future, we can remove this check. We swallow the possible error in stop() to clean up the state and let the other queries continue processing, so we might need to remove it again.
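To illustrate why a future-returning removal closes this race: the caller can wait for the removal to complete instead of racing a concurrent replace. A toy sketch (names are stand-ins, not the real Kafka Streams API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a topology registry whose removal is asynchronous.
class TopologyRemoval {
    private final ConcurrentHashMap<String, Boolean> topologies = new ConcurrentHashMap<>();

    void add(final String queryId) {
        topologies.put(queryId, Boolean.TRUE);
    }

    // Returning a future lets stop() block until the topology is really gone
    // before a replacement is added, instead of hoping the removal won.
    CompletableFuture<Void> removeNamedTopology(final String queryId) {
        return CompletableFuture.runAsync(() -> topologies.remove(queryId));
    }

    boolean contains(final String queryId) {
        return topologies.containsKey(queryId);
    }
}
```

A caller replacing a query would do `removeNamedTopology(id).join()` before re-adding, so the containsKey guard above becomes unnecessary.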
Okay, that makes sense. For the second part ("we swallow the possible error is stop to clean up the state and let the other queries continue processing so we might need to remove it again"), though, not sure I fully follow (maybe it's just due to typos). Could you re-explain it to me?
wow I butchered that sentence.... I just removed the swallowing of the error so it shouldn't be an issue anymore
Resolved (outdated) review threads on:
- ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java
- ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntime.java
- ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java
- ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueriesInSharedRuntimesImpl.java
    errorType
);
for (PersistentQueriesInSharedRuntimesImpl query : metadata.values()) {
    query.getListener().onError(metadata.get(query.getQueryId().toString()), queryError);
Related to my other comment, this is the part I was concerned about: if it throws, we just mark all runtimes / queries as failed, even if some queries are on different runtimes. Is that okay?
We decided that it was okay for a first round. It will require some more changes to make it possible to distinguish which NamedTopology the error comes from. But it shouldn't stop the query; it will just let the metrics know there has been an error.
I think it's okay for this PR; but for V1 I'm concerned about staying as-is: if I'm not reading it wrong, every query's status would become ERROR when this happens, even though they can still proceed, right? This would be very confusing to users and is also a big regression from old versions.
100% agree. There will need to be streams changes before we can fix that though
 this.metrics = Objects.requireNonNull(metrics, "metrics cannot be null.");
 this.ticker = Objects.requireNonNull(ticker, "ticker");

 this.stateMetricName = metrics.metricName(
     "query-status",
     groupPrefix + "ksql-queries",
     "The current status of the given query.",
-    Collections.singletonMap("status", queryApplicationId));
+    Collections.singletonMap("status", queryId));

 errorMetricName = metrics.metricName(
     "error-status",
     groupPrefix + "ksql-queries",
     "The current error status of the given query, if the state is in ERROR state",
@rodesai what is this metric exactly? I don't think I see anything on the dashboard resembling it
it tells you whether an error is a user error or system error. we use it in our monitors but don't plot it anywhere
Thanks @wcarlson5! Since I'm now done with the AK-side pt.3 code, I made another pass on the new commits.
@@ -122,7 +137,9 @@
     final FunctionRegistry functionRegistry,
     final KafkaStreamsBuilder kafkaStreamsBuilder,
     final StreamsBuilder streamsBuilder,
-    final MaterializationProviderBuilderFactory materializationProviderBuilderFactory
+    final MaterializationProviderBuilderFactory materializationProviderBuilderFactory,
It seems the only reason we are passing in this streamsBuilder in the line above is for testing (the only non-testing caller always passes in a new TopologyStreamsBuilder). And now with the new call path this object is not always used, so it is a bit awkward to still maintain it as a class field.
How about not maintaining it at all: pass one in at buildPersistentQuery and let the caller QueryRegistryImpl determine which StreamsBuilder to pass in based on the ksqlConfig. We could even make the two projected build(Not)SharedPersistentQuery methods public and let QueryRegistryImpl determine which one to call.
I think that is a good idea
Seems not addressed yet?
Ah, I misunderstood your suggestion. I pretty much did the second half but did not actually remove the global var. However, I just updated it, so it should be ready for another pass.
StreamsBuilder is also used in transient query creation, but you are right that it is only supplied in testing; otherwise it is just the default. As far as I can tell it was only used in one way too.
Eventually I would like the named wrapper to take a normal topology as well. Then we only need one type, and we can make it a global var again and it will be simple again. But that can wait.
 * Sandboxed {@link PersistentQueryMetadata} that prevents modifying the state of the internal
 * {@link org.apache.kafka.streams.KafkaStreams}.
 */
public final class SandboxedPersistentQueriesInSharedRuntimesImpl
Please see my other comment: checking the sandbox call path here, I still do not know why we need an extended class: since start/stop are not triggered anyway in sandboxing, can't we just create the PersistentQueriesInSharedRuntimesImpl even for sandboxing?
I'd prefer to have a sandbox class personally so that if we call start by mistake we either no-op or raise an error
Thanks @wcarlson5, took a first pass and left some feedback inline.
One improvement I think we should make is to rearrange this so that the KafkaStreams instance is selected when building the physical plan, rather than when executing it. Doing the latter is brittle: it will be very easy for us to write bugs that cause queries to wind up on different runtimes across versions by mistake. It would be better to make the runtime for a query explicit by putting this in QueryPlan (which is then written to the command topic).
To do this you'll probably want to refactor what's here a bit by pulling runtime management out into its own class rather than relying on QueryRegistryImpl and QueryExecutor. Instead, we could have an interface like:
interface RuntimeInstanceManager {
RuntimeId selectRuntimeForPhysicalPlan(final PhysicalPlan plan);
KafkaStreams getRuntimeById(final RuntimeId id);
}
That you then call from planQuery to figure out the runtime for a query. Then, QueryExecutor can just get the runtime it needs from this object.
It's ok to do this as a follow-up. That said, since we're biasing towards fixing issues as follow-ups, can you file issues for them and add them to this PR?
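A minimal in-memory sketch of that interface, with PhysicalPlan/RuntimeId/KafkaStreams replaced by plain String/Object stand-ins and a purely illustrative round-robin placement policy (none of this is the actual ksqlDB code):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a runtime manager whose selection result would be
// recorded in the QueryPlan and replayed deterministically from the command topic.
class SimpleRuntimeInstanceManager {
    private final Map<String, Object> runtimesById = new HashMap<>();
    private final int maxRuntimes;
    private int nextId = 0;

    SimpleRuntimeInstanceManager(final int maxRuntimes) {
        this.maxRuntimes = maxRuntimes;
    }

    // planSummary would inform placement in a real implementation; here the
    // policy is plain round-robin over a fixed pool of runtime ids.
    String selectRuntimeForPhysicalPlan(final String planSummary) {
        final String id = "runtime-" + (nextId++ % maxRuntimes);
        runtimesById.putIfAbsent(id, new Object()); // stand-in for a KafkaStreams instance
        return id;
    }

    Object getRuntimeById(final String id) {
        return runtimesById.get(id);
    }
}
```

The key property is that planQuery records the returned id, so QueryExecutor only ever looks runtimes up by id instead of re-deriving placement.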
@@ -393,6 +393,14 @@
     + "if false, new lambda queries won't be processed but any existing lambda "
     + "queries are unaffected.";

+public static final String KSQL_SHARED_RUNTIME_ENABLED = "ksql.SharedRuntime.enabled";
nit: make the name more hierarchical, e.g. ksql.runtime.feature.shared.enabled
Also, how do we ensure that older queries don't use shared runtimes?
This is something @guozhangwang, @ableegoldman, and I talked about. We figured that we could get it from the cmd topic to populate the ksqlConfig.
We were not confident with how that worked, so for now we thought it would be okay to just have this for new queries.
@@ -232,6 +256,47 @@ PersistentQueryMetadata buildPersistentQuery(
 ) {
     final KsqlConfig ksqlConfig = config.getConfig(true);

+    if (ksqlConfig.getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)) {
+        return buildSharedPersistentQuery(
nit: buildPersistentQueryInSharedRuntime, buildPersistentQueryInDedicatedRuntime
fancy names
Resolved (outdated) review threads on:
- ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java
 this.metrics = Objects.requireNonNull(metrics, "metrics cannot be null.");
 this.ticker = Objects.requireNonNull(ticker, "ticker");

 this.stateMetricName = metrics.metricName(
     "query-status",
     groupPrefix + "ksql-queries",
     "The current status of the given query.",
-    Collections.singletonMap("status", queryApplicationId));
+    Collections.singletonMap("status", queryId));
It's probably ok to change the tag here - we might see some weirdness with the monitor around the upgrade, but it should resolve itself once we move past a window. That said, we probably still want to report these metrics at the runtime level. When we have 100s of queries, we don't want to pay for a bunch of datadog metrics that are all telling us the same thing. If we want to defer implementing this metric as a per-runtime metric why don't we use a different tag key for the query ID in this PR and then make that change in a follow-up?
I will file a follow-up ticket to move the metrics that should be per-runtime over to per-runtime.
Resolved (outdated) review thread on:
- ksqldb-engine/src/main/java/io/confluent/ksql/query/KafkaStreamsBuilderImpl.java
}
stream.markSources(queryID, sources);
streams.add(stream);
return stream;
where do we enforce limits on the number of runtimes?
Currently we don't. We still haven't figured out the optimal number of runtimes, and we also want to see how many times we can read the same source. 20 might not be the right number.
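For illustration, runtime selection with a hard cap and the no-shared-sources rule from getStream above might look like this (all names and the cap are hypothetical; the real selection logic is still being designed):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: pick an existing runtime whose queries do not already
// read any of the new query's sources; create a new one only while under a cap.
class RuntimePool {
    static class SharedRuntime {
        final Set<String> sources = new HashSet<>();
    }

    private final List<SharedRuntime> streams = new ArrayList<>();
    private final int maxRuntimes;

    RuntimePool(final int maxRuntimes) {
        this.maxRuntimes = maxRuntimes;
    }

    SharedRuntime getStream(final Set<String> sources) {
        for (final SharedRuntime runtime : streams) {
            // Two queries on one runtime must not share a source, e.g. to
            // avoid duplicate internal topics.
            if (runtime.sources.stream().noneMatch(sources::contains)) {
                runtime.sources.addAll(sources);
                return runtime;
            }
        }
        if (streams.size() >= maxRuntimes) {
            throw new IllegalStateException("No runtime available for sources " + sources);
        }
        final SharedRuntime fresh = new SharedRuntime();
        fresh.sources.addAll(sources);
        streams.add(fresh);
        return fresh;
    }
}
```

With a cap in place, hitting the limit fails loudly rather than growing the number of KafkaStreams instances without bound.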
@@ -60,6 +64,9 @@
 private final Map<SourceName, Set<QueryId>> insertQueries;
 private final Collection<QueryEventListener> eventListeners;
 private final QueryExecutorFactory executorFactory;
+private final ArrayList<SharedKafkaStreamsRuntime> streams;
+private final ArrayList<SharedKafkaStreamsRuntime> validationSet;
The way we're maintaining runtimes and sandbox runtimes here isn't going to work.
First, it assumes that every query that is ever validated will then be enqueued onto the command topic and executed. If/when that doesn't happen (e.g. because of multiple conflicting requests), we'll wind up with validationSet getting out of sync with streams, because you'll have a runtime in validationSet that never actually makes it into streams.
We also try not to share data structures between sandboxes (and the original) because then the access needs to be synchronized. In the current implementation there's only ever one writer to each of these: either the command runner thread (in the case of the main engine's instance), or a server request thread (in the case of a sandbox).
I'd suggest restructuring this like so:
- Only maintain one
List<SharedKafkaStreamsRuntime>
per instance ofQueryRegistryImpl
. - When sandboxing, create a copy of the above with sandboxed instances of
SharedKafkaStreamsRuntime
. To sandbox this class, you can create newKafkaStreams
instances, add the named topologies, but not start the threads. Alternatively, we can punt on implementing the sandbox shared runtime and make all the calls into it no-ops. We wouldn't be doing any validation, but that's probably ok for the first go-round.
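The copy-on-sandbox idea above can be illustrated with a small sketch. All names here are illustrative stand-ins, not the real ksqlDB classes: the registry owns one runtime list, and the sandbox copy constructor builds its own list, so mutations in the sandbox never leak back into the real registry.

```java
import java.util.ArrayList;
import java.util.List;

public class RegistrySketch {
    private final List<String> runtimes; // stand-in for SharedKafkaStreamsRuntime
    private final boolean sandbox;

    public RegistrySketch() {
        this.runtimes = new ArrayList<>();
        this.sandbox = false;
    }

    // Copy constructor used when sandboxing: each runtime is copied into a
    // validation-only instance; the copy's threads would never be started.
    public RegistrySketch(final RegistrySketch original) {
        this.runtimes = new ArrayList<>();
        for (final String runtime : original.runtimes) {
            this.runtimes.add("validation-" + runtime);
        }
        this.sandbox = true;
    }

    public void register(final String runtime) {
        runtimes.add(runtime);
    }

    public List<String> runtimes() {
        return runtimes;
    }

    public static void main(final String[] args) {
        final RegistrySketch real = new RegistrySketch();
        real.register("runtime-0");
        final RegistrySketch sandbox = new RegistrySketch(real);
        sandbox.register("runtime-1");
        // The sandbox's additions do not leak back into the real registry.
        System.out.println(real.runtimes());    // [runtime-0]
        System.out.println(sandbox.runtimes()); // [validation-runtime-0, runtime-1]
    }
}
```

Because only one writer ever touches each list (command runner thread for the real registry, request thread for the sandbox), no extra synchronization is needed in this arrangement.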
Ah I think I misunderstood when the validation was taking place. I added a copy ctor for the validation that is used after a real runtime is chosen
}

@Test
public void shouldMakePersistentQueriesWithDifferentSources() {
should this assert that they choose the same runtime?
I was worried about throwing exceptions but some checks would be a good idea
@rodesai @guozhangwang I think I got to all your comments. Let me know if something was not addressed. Sorry for the huge diff so late in the PR process. It is mostly renaming things and moving them around :)
@@ -393,6 +393,14 @@
      + "if false, new lambda queries won't be processed but any existing lambda "
      + "queries are unaffected.";

  public static final String KSQL_SHARED_RUNTIME_ENABLED = "ksql.SharedRuntime.enabled";
This is something @guozhangwang @ableegoldman and I talked about. We figured that we could get it from the cmd topic to populate the ksqlConfig.
We were not confident with how that worked, so for now we thought it would be okay to just have this for new queries.
  this.metrics = Objects.requireNonNull(metrics, "metrics cannot be null.");
  this.ticker = Objects.requireNonNull(ticker, "ticker");

  this.stateMetricName = metrics.metricName(
      "query-status",
      groupPrefix + "ksql-queries",
      "The current status of the given query.",
      Collections.singletonMap("status", queryApplicationId));
      Collections.singletonMap("status", queryId));
I will file a follow-up ticket to move the metrics that should be per-runtime over to being per-runtime.
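Until that follow-up lands, one way to avoid catching and swallowing IllegalArgumentException on double registration would be a register-if-absent guard. This is a hedged sketch with illustrative names, not the real Kafka Metrics API: runtime-level gauges get registered at most once, and later attempts from other queries in the same runtime become no-ops.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RuntimeMetricsSketch {
    private final Map<String, String> registered = new ConcurrentHashMap<>();

    // Returns true only on the first registration of a metric name; later
    // attempts for the same runtime-level metric are no-ops instead of
    // throwing on duplicate registration.
    public boolean registerGaugeOnce(final String name, final String value) {
        return registered.putIfAbsent(name, value) == null;
    }

    public static void main(final String[] args) {
        final RuntimeMetricsSketch metrics = new RuntimeMetricsSketch();
        System.out.println(metrics.registerGaugeOnce("query-status", "RUNNING")); // true
        System.out.println(metrics.registerGaugeOnce("query-status", "ERROR"));   // false
    }
}
```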
@@ -122,7 +137,9 @@
      final FunctionRegistry functionRegistry,
      final KafkaStreamsBuilder kafkaStreamsBuilder,
      final StreamsBuilder streamsBuilder,
      final MaterializationProviderBuilderFactory materializationProviderBuilderFactory
      final MaterializationProviderBuilderFactory materializationProviderBuilderFactory,
I think that is a good idea
@@ -232,6 +256,47 @@ PersistentQueryMetadata buildPersistentQuery(
  ) {
      final KsqlConfig ksqlConfig = config.getConfig(true);

      if (ksqlConfig.getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)) {
          return buildSharedPersistentQuery(
fancy names
  }
  stream.markSources(queryID, sources);
  streams.add(stream);
  return stream;
      errorType
  );
  for (PersistentQueriesInSharedRuntimesImpl query: metadata.values()) {
      query.getListener().onError(metadata.get(query.getQueryId().toString()), queryError);
100% agree. There will need to be streams changes before we can fix that though
Hey @wcarlson5 is this PR ready for another look now? Seems the sandboxing logic has not been updated yet, or do you plan to do it in a later PR?
Seems not addressed yet?
@@ -111,6 +130,14 @@ private QueryRegistryImpl(final QueryRegistryImpl original) {
      .filter(Optional::isPresent)
      .map(Optional::get)
      .collect(Collectors.toList());
  if (!original.sandbox) {
@rodesai When fixing an issue with the parametrization of a test, it surfaced an issue where the registry is creating a sandbox from a sandbox. This was causing an exception because of the copy ctor. This happens only during the replacement of a persistent query. It doesn't make sense to me why this is happening; it makes me think there is something I am missing about the validation flow. Maybe how a replace changes its behavior? I will look into it more tomorrow, but perhaps you have some idea?
This fix does not work, it just prevents the exception but no data is produced
I adapted it to be able to copy a validation runtime, but still no data is coming through
Regarding @rodesai's meta comment "One improvement I think we should make is to rearrange this so that the KafkaStreams instance is selected when building the physical plan, rather than executing the physical plan": it seems this is still not addressed, since we still select the runtime via getKafkaStreamsInstance after the physical plan is generated, and right now our logic is simply "select the first one that either does not contain any of the requested source topics, or that already has this queryID". In later versions we may make this logic more complicated and hence may assign queries to different runtimes.
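The selection rule quoted above can be sketched as a small standalone function. Method and field names here are assumptions, not the real ksqlDB implementation: each runtime is modeled as a map from query id to its source topics, and the function returns the index of the first runtime that already owns the query, or whose sources don't overlap; -1 means the caller would create a new runtime.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RuntimeSelectorSketch {
    public static int selectRuntime(
        final List<Map<String, Set<String>>> runtimes, // per runtime: queryId -> sources
        final String queryId,
        final Set<String> sources
    ) {
        for (int i = 0; i < runtimes.size(); i++) {
            final Map<String, Set<String>> queries = runtimes.get(i);
            if (queries.containsKey(queryId)) {
                return i; // the query is already assigned to this runtime
            }
            boolean overlaps = false;
            for (final Set<String> existing : queries.values()) {
                if (!Collections.disjoint(existing, sources)) {
                    overlaps = true;
                    break;
                }
            }
            if (!overlaps) {
                return i; // no shared source topics, safe to co-locate here
            }
        }
        return -1; // every runtime conflicts; caller creates a new runtime
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> runtime0 = Map.of("q1", Set.of("topicA"));
        // q2 reads a different topic, so it can share runtime 0.
        System.out.println(selectRuntime(List.of(runtime0), "q2", Set.of("topicB"))); // 0
        // q2 reading the same topic conflicts, so a new runtime is needed.
        System.out.println(selectRuntime(List.of(runtime0), "q2", Set.of("topicA"))); // -1
    }
}
```

Moving this decision into physical-plan building, as suggested, would mean the chosen runtime is part of the plan rather than a side effect of executing it.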
  metadata = new ConcurrentHashMap<>(sharedKafkaStreamsRuntime.getMetadata());
  sources = new ConcurrentHashMap<>(sharedKafkaStreamsRuntime.getSourcesMap());
  for (PersistentQueriesInSharedRuntimesImpl queryMetadata : metadata.values()) {
      //kafkaStreams.addNamedTopology(queryMetadata.getTopology());
Is this intentional?
yes, there is an issue with the build leaking topics so for now we will leave this disabled until that is fixed
  private final Map<QueryId, Set<SourceName>> sources;
  private final KafkaStreamsBuilder kafkaStreamsBuilder;

  public ValidationSharedKafkaStreamsRuntimeImpl(
I think this constructor is used somewhere in the tests, but I cannot find it?
It isn't actually. It is used in the sandbox creation for the QueryRegistryImpl
Ah now I see it, sorry!
@guozhangwang yes, moving the runtime selection into the physical plan is planned; we have tickets filed for it. But this PR is already pretty complicated, so I don't want to add any more complications that can be done in follow-ups.
LGTM modulo all tests are fixed.
An improved version of #7482
This will cause ksql to use shared runtimes