fix: DROP stream for persistent query doesn't always drop underlying query #7601

Merged
merged 2 commits into from
Jun 2, 2021

Member

@spena spena commented May 27, 2021

Description

What behavior do you want to change, why, how does your patch achieve the changes?
This PR fixes an issue where terminating an INSERT query caused the underlying CREATE_AS query to be left running when the stream was later dropped.

For example:

ksql> create stream p1 as select * from s1;
ksql> insert into p1 select * from s2;
ksql> show queries;
Query ID       | Query Type | Status    | Sink Name | Sink Kafka Topic | Query String                                                                                            
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 INSERTQUERY_43 | PERSISTENT | RUNNING:1 | P1        | P1               | insert into p1 select * from s2;                                                                        
 CSAS_P1_41     | PERSISTENT | RUNNING:1 | P1        | P1               | CREATE STREAM P1 WITH (KAFKA_TOPIC='P1', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM S1 S1 EMIT CHANGES; 
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
ksql> terminate INSERTQUERY_43;
ksql> drop stream p1;
ksql> show queries;
Query ID       | Query Type | Status    | Sink Name | Sink Kafka Topic | Query String                                                                                            
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
CSAS_P1_41     | PERSISTENT | RUNNING:1 | P1        | P1               | CREATE STREAM P1 WITH (KAFKA_TOPIC='P1', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM S1 S1 EMIT CHANGES; 
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

CSAS_P1_41 should have been terminated automatically by DROP STREAM.

The problem was in QueryRegistryImpl, which removed query references from both the insertQueries and createAsQueries maps when unregistering a query. This PR removes the reference only from the map that matches the query type.
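
To make the failure mode concrete, here is a minimal sketch; it is not the actual QueryRegistryImpl code. It assumes a simplified registry that keeps two maps keyed by sink name. Only the map names createAsQueries and insertQueries come from the description above; the class QueryRegistrySketch, the Kind enum, and every method signature are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified model of a query registry (NOT the real QueryRegistryImpl).
class QueryRegistrySketch {
  enum Kind { CREATE_AS, INSERT }

  // sink name -> id of the CREATE_AS query that created the sink
  private final Map<String, String> createAsQueries = new HashMap<>();
  // sink name -> ids of INSERT queries writing into the sink
  private final Map<String, Set<String>> insertQueries = new HashMap<>();

  void register(final String sink, final String queryId, final Kind kind) {
    if (kind == Kind.CREATE_AS) {
      createAsQueries.put(sink, queryId);
    } else {
      insertQueries.computeIfAbsent(sink, k -> new HashSet<>()).add(queryId);
    }
  }

  // Buggy variant: clears the CREATE_AS entry whatever the query kind is,
  // so terminating an INSERT query also forgets the CREATE_AS query.
  void unregisterBuggy(final String sink, final String queryId, final Kind kind) {
    createAsQueries.remove(sink);
    final Set<String> inserts = insertQueries.get(sink);
    if (inserts != null) {
      inserts.remove(queryId);
    }
  }

  // Fixed variant: touches only the map that matches the query kind.
  void unregisterFixed(final String sink, final String queryId, final Kind kind) {
    if (kind == Kind.CREATE_AS) {
      // Two-argument remove: only removes if the sink still maps to this query id.
      createAsQueries.remove(sink, queryId);
    } else {
      final Set<String> inserts = insertQueries.get(sink);
      if (inserts != null) {
        inserts.remove(queryId);
      }
    }
  }

  // The query that a later DROP would need to find and terminate.
  Optional<String> getCreateAsQuery(final String sink) {
    return Optional.ofNullable(createAsQueries.get(sink));
  }

  public static void main(final String[] args) {
    final QueryRegistrySketch registry = new QueryRegistrySketch();
    registry.register("P1", "CSAS_P1_41", Kind.CREATE_AS);
    registry.register("P1", "INSERTQUERY_43", Kind.INSERT);
    registry.unregisterFixed("P1", "INSERTQUERY_43", Kind.INSERT);
    System.out.println("CREATE_AS query still tracked: "
        + registry.getCreateAsQuery("P1").isPresent());
  }
}
```

In this model, once the buggy variant has run for the INSERT query, a later drop of P1 has no CREATE_AS entry to look up, which matches the leftover CSAS_P1_41 in the output above.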

Testing done

Describe the testing strategy. Unit and integration tests are expected for any behavior changes.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@spena spena requested a review from a team May 27, 2021 15:12
Contributor

@wcarlson5 wcarlson5 left a comment

This makes sense to me. I just have a couple of questions about the testing 👍

@@ -290,16 +294,21 @@ private void unregisterQuery(final QueryMetadata query) {
final QueryId queryId = persistentQuery.getQueryId();
persistentQueries.remove(queryId);

// If query is a INSERT query, then this line should not cause any effect
Contributor

It seems like this comment was not true?

Member Author

Correct. I thought it was true when I implemented this feature. I don't know why it wasn't.

@@ -52,6 +52,7 @@
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
Contributor

Should we have a test for the INSERT type here? I don't think it should have any effect, right?

Member Author

Done

@@ -24,6 +24,7 @@
import io.confluent.ksql.query.QueryRegistryImpl.QueryExecutorFactory;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConstants;
Contributor

Can we test that the query gets removed from only the correct list here? It seems that would cover the case that was causing the problem.

Member Author

Done

Contributor

@wcarlson5 wcarlson5 left a comment

LGTM thanks for the extra tests @spena !

@guozhangwang
Contributor

Thanks @spena . The patch LGTM.

I have an out-of-scope question following this though: back to your example in the description:

create stream p1 as select * from s1;
insert into p1 select * from s2;

if we call drop s1; drop s2; would that cause p1 to be automatically dropped as well?

@spena
Member Author

spena commented Jun 2, 2021

@guozhangwang

create stream p1 as select * from s1;
insert into p1 select * from s2;
if we call drop s1; drop s2; would that cause p1 to be automatically dropped as well?

The DROP commands won't work because another stream/table is reading/writing from S1 or S2. There is a constraint linked from p1 -> s1 as well as p1 -> s2. In order to drop s1 and s2, you must first drop the p1 query and/or terminate the insert query.
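
The constraint described here can be pictured with a small model. This is a rough sketch under stated assumptions, not ksqlDB's metastore code: the class SourceConstraintsSketch and all of its methods are hypothetical, and the only assumption is that each source tracks the ids of running queries that read from it and that a drop is rejected while that set is non-empty.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of drop constraints (NOT the real ksqlDB metastore).
class SourceConstraintsSketch {
  // source name -> ids of running queries that read from that source (assumed model)
  private final Map<String, Set<String>> readers = new HashMap<>();

  void addSource(final String name) {
    readers.putIfAbsent(name, new HashSet<>());
  }

  // Records that a running query reads from the given source.
  void startQuery(final String queryId, final String fromSource) {
    readers.computeIfAbsent(fromSource, k -> new HashSet<>()).add(queryId);
  }

  // Terminating a query releases every constraint it held.
  void terminateQuery(final String queryId) {
    for (final Set<String> ids : readers.values()) {
      ids.remove(queryId);
    }
  }

  // Returns true if the source was dropped, false if it is unknown
  // or still referenced by at least one running query.
  boolean drop(final String source) {
    final Set<String> ids = readers.get(source);
    if (ids == null || !ids.isEmpty()) {
      return false;
    }
    readers.remove(source);
    return true;
  }

  public static void main(final String[] args) {
    final SourceConstraintsSketch metastore = new SourceConstraintsSketch();
    metastore.addSource("S1");
    metastore.addSource("S2");
    metastore.startQuery("CSAS_P1_41", "S1");
    metastore.startQuery("INSERTQUERY_43", "S2");
    System.out.println("drop S2 while INSERT runs: " + metastore.drop("S2"));
    metastore.terminateQuery("INSERTQUERY_43");
    System.out.println("drop S2 after terminate: " + metastore.drop("S2"));
  }
}
```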

@spena spena merged commit b751cad into confluentinc:master Jun 2, 2021
@spena spena deleted the fix_kse-332 branch June 2, 2021 03:12
@guozhangwang
Contributor

Thanks!

In order to drop s1 and s2, you must first drop the p1 query and/or terminate the insert query.

When we drop p1, wouldn't both queries be automatically dropped too? Only s1/s2 would still exist.

@spena
Member Author

spena commented Jun 16, 2021

@guozhangwang

When we drop p1, wouldn't both queries be automatically dropped too? Only s1/s2 would still exist.

No. That was proposed in the KLIP, but it was discussed at length. In the end, we implemented the KLIP partially, so that DROP at least removes a stream and its query when there are no other constraints.
