From cf5b6f60d63d51219581569bf9263f095cb45917 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Tue, 18 Jan 2022 16:40:27 -0600 Subject: [PATCH 1/3] feat: uses internal topics prefix for shared runtimes --- .../main/java/io/confluent/ksql/query/QueryBuilder.java | 7 +++++++ .../ksql/integration/EndToEndIntegrationTest.java | 8 ++++++++ 2 files changed, 15 insertions(+) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index 7188452d397a..42cbd82998b7 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -66,6 +66,7 @@ import io.confluent.ksql.util.PushQueryMetadata.ResultType; import io.confluent.ksql.util.QueryApplicationId; import io.confluent.ksql.util.QueryMetadata; +import io.confluent.ksql.util.ReservedInternalTopics; import io.confluent.ksql.util.SandboxedBinPackedPersistentQueryMetadataImpl; import io.confluent.ksql.util.SandboxedSharedKafkaStreamsRuntimeImpl; import io.confluent.ksql.util.SharedKafkaStreamsRuntime; @@ -565,6 +566,12 @@ public static Map buildStreamsProperties( StorageUtilizationMetricsReporter.class.getName() ); + if (config.getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)) { + newStreamsProperties.put(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + + config.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)); + } + // Passing shared state into managed components newStreamsProperties.put(KsqlConfig.KSQL_INTERNAL_METRIC_COLLECTORS_CONFIG, metricCollectors); newStreamsProperties.put( diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java index 66112362f00f..a9d4943881ad 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; @@ -240,6 +241,13 @@ public void shouldSupportDroppingAndRecreatingJoinQuery() throws Exception { final List columns = waitForFirstRow(queryMetadata); + if (sharedRuntimes) { + assertThat(TEST_HARNESS.getKafkaCluster().getTopics(), + hasItem("_confluent-ksql-default_-CSAS_CART_EVENT_PRODUCT_1-Join-repartition")); + } else { + assertThat(TEST_HARNESS.getKafkaCluster().getTopics(), + hasItem("_confluent-ksql-default_query_CSAS_CART_EVENT_PRODUCT_1-Join-repartition")); + } assertThat(CONSUMED_COUNT.get(), greaterThan(0)); assertThat(PRODUCED_COUNT.get(), greaterThan(0)); assertThat(columns.get(0).toString(), startsWith("USER_")); From 4a8d1012b8044424b10358262d3097d343ef3b91 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Tue, 18 Jan 2022 16:54:55 -0600 Subject: [PATCH 2/3] add query for readability --- .../src/main/java/io/confluent/ksql/query/QueryBuilder.java | 3 ++- .../io/confluent/ksql/integration/EndToEndIntegrationTest.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index 42cbd82998b7..4dccafaf2b52 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -569,7 +569,8 @@ public static Map buildStreamsProperties( if (config.getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)) { newStreamsProperties.put(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + - config.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)); + config.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG) + + "query"); } // Passing shared state into managed components diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java index a9d4943881ad..8b88dae428a0 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java @@ -243,7 +243,7 @@ public void shouldSupportDroppingAndRecreatingJoinQuery() throws Exception { if (sharedRuntimes) { assertThat(TEST_HARNESS.getKafkaCluster().getTopics(), - hasItem("_confluent-ksql-default_-CSAS_CART_EVENT_PRODUCT_1-Join-repartition")); + hasItem("_confluent-ksql-default_query-CSAS_CART_EVENT_PRODUCT_1-Join-repartition")); } else { assertThat(TEST_HARNESS.getKafkaCluster().getTopics(), hasItem("_confluent-ksql-default_query_CSAS_CART_EVENT_PRODUCT_1-Join-repartition")); From dd857e8999e5aaa71df34ad1f34348173d751b0b Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Wed, 19 Jan 2022 09:25:23 -0600 Subject: [PATCH 3/3] checkstyle --- .../src/main/java/io/confluent/ksql/query/QueryBuilder.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index 4dccafaf2b52..1a01ad7c9ba5 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -568,9 +568,9 @@ public static Map buildStreamsProperties( if (config.getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)) { newStreamsProperties.put(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, - ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + - config.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG) + - "query"); + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + + config.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG) + + "query"); } // Passing shared state into managed components