From dd5d1efecab844150fc3a93ab7c90255ce13cb24 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 21 Oct 2021 14:38:07 -0500 Subject: [PATCH 1/2] fix: fix_application_id_to_work_with_acls --- .../io/confluent/ksql/query/QueryBuilder.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index f111bf7c28d9..01c424cba467 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -63,6 +63,7 @@ import io.confluent.ksql.util.PushQueryMetadata.ResultType; import io.confluent.ksql.util.QueryApplicationId; import io.confluent.ksql.util.QueryMetadata; +import io.confluent.ksql.util.ReservedInternalTopics; import io.confluent.ksql.util.SharedKafkaStreamsRuntime; import io.confluent.ksql.util.SharedKafkaStreamsRuntimeImpl; import io.confluent.ksql.util.TransientQueryMetadata; @@ -484,12 +485,20 @@ private SharedKafkaStreamsRuntime getKafkaStreamsInstance( } } final SharedKafkaStreamsRuntime stream; + final KsqlConfig ksqlConfig = config.getConfig(true); + final String queryPrefix = ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG); + final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG); if (real) { stream = new SharedKafkaStreamsRuntimeImpl( kafkaStreamsBuilder, config.getConfig(true).getInt(KsqlConfig.KSQL_QUERY_ERROR_MAX_QUEUE_SIZE), buildStreamsProperties( - "_confluent-ksql-" + streams.size() + "-" + UUID.randomUUID(), + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + + serviceId + + queryPrefix + + streams.size() + + "-" + + UUID.randomUUID(), queryID) ); } else { @@ -497,7 +506,13 @@ private SharedKafkaStreamsRuntime getKafkaStreamsInstance( kafkaStreamsBuilder, config.getConfig(true).getInt(KsqlConfig.KSQL_QUERY_ERROR_MAX_QUEUE_SIZE), buildStreamsProperties( - "_confluent-ksql-" + streams.size() + "-" + UUID.randomUUID() + "-validation", + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + + serviceId + + queryPrefix + + streams.size() + + "-" + + UUID.randomUUID() + + "-validation", queryID) ); } From ff0affd74a46f57f646c979822c9c43e11695a64 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 21 Oct 2021 15:46:47 -0500 Subject: [PATCH 2/2] checkstyle --- .../src/main/java/io/confluent/ksql/query/QueryBuilder.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index 01c424cba467..aa40bdc300df 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -486,7 +486,8 @@ private SharedKafkaStreamsRuntime getKafkaStreamsInstance( } final SharedKafkaStreamsRuntime stream; final KsqlConfig ksqlConfig = config.getConfig(true); - final String queryPrefix = ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG); + final String queryPrefix = ksqlConfig + .getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG); final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG); if (real) { stream = new SharedKafkaStreamsRuntimeImpl(