diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index 7188452d397a..1a01ad7c9ba5 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -66,6 +66,7 @@ import io.confluent.ksql.util.PushQueryMetadata.ResultType; import io.confluent.ksql.util.QueryApplicationId; import io.confluent.ksql.util.QueryMetadata; +import io.confluent.ksql.util.ReservedInternalTopics; import io.confluent.ksql.util.SandboxedBinPackedPersistentQueryMetadataImpl; import io.confluent.ksql.util.SandboxedSharedKafkaStreamsRuntimeImpl; import io.confluent.ksql.util.SharedKafkaStreamsRuntime; @@ -565,6 +566,13 @@ public static Map buildStreamsProperties( StorageUtilizationMetricsReporter.class.getName() ); + if (config.getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)) { + newStreamsProperties.put(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + + config.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG) + + "query"); + } + // Passing shared state into managed components newStreamsProperties.put(KsqlConfig.KSQL_INTERNAL_METRIC_COLLECTORS_CONFIG, metricCollectors); newStreamsProperties.put( diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java index 66112362f00f..8b88dae428a0 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; @@ -240,6 +241,13 @@ public void shouldSupportDroppingAndRecreatingJoinQuery() throws Exception { final List columns = waitForFirstRow(queryMetadata); + if (sharedRuntimes) { + assertThat(TEST_HARNESS.getKafkaCluster().getTopics(), + hasItem("_confluent-ksql-default_query-CSAS_CART_EVENT_PRODUCT_1-Join-repartition")); + } else { + assertThat(TEST_HARNESS.getKafkaCluster().getTopics(), + hasItem("_confluent-ksql-default_query_CSAS_CART_EVENT_PRODUCT_1-Join-repartition")); + } assertThat(CONSUMED_COUNT.get(), greaterThan(0)); assertThat(PRODUCED_COUNT.get(), greaterThan(0)); assertThat(columns.get(0).toString(), startsWith("USER_"));