From cd99f26aebffcbf55b29527effa394b9e1c2ee3a Mon Sep 17 00:00:00 2001 From: Walker Carlson <18128741+wcarlson5@users.noreply.github.com> Date: Thu, 27 Jan 2022 08:53:54 -0600 Subject: [PATCH] fix: clean up custom prefixed internal topics (#8640) * fix: clean up custom prefixed internal topics --- .../ksql/util/QueryApplicationId.java | 13 ++++++-- .../io/confluent/ksql/engine/KsqlEngine.java | 5 +-- .../engine/OrphanedTransientQueryCleaner.java | 5 +-- .../ksql/engine/QueryCleanupService.java | 20 ++++++++---- .../ksql/services/KafkaTopicClientImpl.java | 6 ++-- .../confluent/ksql/engine/KsqlEngineTest.java | 31 +++++++++++------- .../ksql/rest/server/KsqlRestApplication.java | 3 +- .../restore/KsqlRestoreCommandTopic.java | 32 +++++++++++++------ .../rest/util/PersistentQueryCleanupImpl.java | 24 +++++++++----- .../rest/server/computation/RecoveryTest.java | 3 +- .../util/PersistentQueryCleanupImplTest.java | 7 ++-- 11 files changed, 99 insertions(+), 50 deletions(-) diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/QueryApplicationId.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/QueryApplicationId.java index 4332f4086df3..77d9b271e2cf 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/QueryApplicationId.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/QueryApplicationId.java @@ -30,7 +30,7 @@ public static String buildSharedRuntimeId( final boolean persistent, final int sharedRuntimeIndex ) { - final String queryAppId = buildPrefix(config, persistent) + "-" + sharedRuntimeIndex; + final String queryAppId = buildInternalTopicPrefix(config, persistent) + sharedRuntimeIndex; if (persistent) { return queryAppId; } else { @@ -43,7 +43,7 @@ public static String build( final boolean persistent, final QueryId queryId ) { - final String queryAppId = buildPrefix(config, persistent) + queryId; + final String queryAppId = buildInternalTopicPrefix(config, persistent) + queryId; if (persistent) { return queryAppId; } else { @@ -51,7 +51,7 @@ public static String build( } } - private static String buildPrefix( + public static String buildInternalTopicPrefix( final KsqlConfig config, final boolean persistent ) { @@ -63,6 +63,13 @@ private static String buildPrefix( final String queryPrefix = config.getString(configName); + return buildInternalTopicPrefix(serviceId, queryPrefix); + } + + public static String buildInternalTopicPrefix( + final String serviceId, + final String queryPrefix + ) { return ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + serviceId + queryPrefix; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index 9e68dd8272bc..ccd81f15f043 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -649,8 +649,9 @@ public void onClose( StreamsConfig.configDef() .defaultValues() .get(StreamsConfig.STATE_DIR_CONFIG)) - .toString() - )); + .toString(), + ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG), + ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG))); } else { log.info("Skipping cleanup for query {} since it was never started", applicationId); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/OrphanedTransientQueryCleaner.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/OrphanedTransientQueryCleaner.java index 593f8e2cd2b9..47dd56722f9c 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/OrphanedTransientQueryCleaner.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/OrphanedTransientQueryCleaner.java @@ -78,8 +78,9 @@ public void cleanupOrphanedInternalTopics( .getOrDefault( StreamsConfig.STATE_DIR_CONFIG, StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG)) - .toString() - )); + .toString(), + ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG), + ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG))); } } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryCleanupService.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryCleanupService.java index d989e005d0d7..60a63593a6d3 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryCleanupService.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryCleanupService.java @@ -20,6 +20,7 @@ import com.spun.util.io.FileUtils; import io.confluent.ksql.schema.registry.SchemaRegistryUtil; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.util.QueryApplicationId; import java.io.File; import java.nio.file.Path; import java.nio.file.Paths; @@ -101,20 +102,25 @@ public static class QueryCleanupTask implements Runnable { public QueryCleanupTask( final ServiceContext serviceContext, final String appId, - final Optional topologyName, + final Optional queryId, final boolean isTransient, - final String stateDir - ) { + final String stateDir, + final String serviceId, + final String persistentQueryPrefix) { this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); this.appId = Objects.requireNonNull(appId, "appId"); - this.topologyName = Objects.requireNonNull(topologyName, "topologyName"); - queryTopicPrefix = topologyName.map(s -> appId + "-" + s).orElse(appId); + this.topologyName = Objects.requireNonNull(queryId, "queryId"); + queryTopicPrefix = queryId + .map(s -> QueryApplicationId.buildInternalTopicPrefix( + serviceId, + persistentQueryPrefix) + s) + .orElse(appId); //generate the prefix depending on if using named topologies this.isTransient = isTransient; - pathName = topologyName + pathName = queryId .map(s -> stateDir + "/" + appId + "/__" + s + "__") .orElse(stateDir + "/" + appId); - if (isTransient && topologyName.isPresent()) { + if (isTransient && queryId.isPresent()) { throw new IllegalArgumentException("Transient Queries can not have named topologies"); } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java index 9c9c68660901..5d0664db3420 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java @@ -315,12 +315,12 @@ public void deleteTopics(final Collection topicsToDelete) { } @Override - public void deleteInternalTopics(final String applicationId) { + public void deleteInternalTopics(final String internalTopicPrefix) { try { final Set topicNames = listTopicNames(); final List internalTopics = Lists.newArrayList(); for (final String topicName : topicNames) { - if (isInternalTopic(topicName, applicationId)) { + if (isInternalTopic(topicName, internalTopicPrefix)) { internalTopics.add(topicName); } } @@ -330,7 +330,7 @@ public void deleteInternalTopics(final String applicationId) { } } catch (final Exception e) { LOG.error("Exception while trying to clean up internal topics for application id: {}.", - applicationId, e + internalTopicPrefix, e ); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java index e46813dce973..f57d454a2774 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java @@ -57,9 +57,7 @@ import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.QueryCleanupService.QueryCleanupTask; import io.confluent.ksql.function.InternalFunctionRegistry; -import io.confluent.ksql.internal.KsqlEngineMetrics; import io.confluent.ksql.metastore.MutableMetaStore; -import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; @@ -70,7 +68,6 @@ import io.confluent.ksql.parser.tree.DropTable; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.query.QueryId; -import io.confluent.ksql.query.id.SequentialQueryIdGenerator; import io.confluent.ksql.schema.ksql.SystemColumns; import io.confluent.ksql.services.FakeKafkaConsumerGroupClient; import io.confluent.ksql.services.FakeKafkaTopicClient; @@ -84,6 +81,7 @@ import io.confluent.ksql.util.MetaStoreFixture; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; +import io.confluent.ksql.util.ReservedInternalTopics; import io.confluent.ksql.util.SandboxedBinPackedPersistentQueryMetadataImpl; import io.confluent.ksql.util.SandboxedPersistentQueryMetadataImpl; import io.confluent.ksql.util.SandboxedTransientQueryMetadata; @@ -100,7 +98,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import it.unimi.dsi.fastutil.Hash; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; @@ -139,6 +136,10 @@ public class KsqlEngineTest { @Before public void setUp() { sharedRuntimeEnabled.put(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED, true); + sharedRuntimeEnabled.put(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + + "default_" + + "query"); sharedRuntimeDisabled.put(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED, false); ksqlConfig = KsqlConfigTestUtil.create("what-eva", sharedRuntimeEnabled); @@ -1411,7 +1412,8 @@ public void shouldCleanUpInternalTopicsOnQueryCloseForPersistentQueriesSharedRun // Then (there are no transient queries, so no internal topics should be deleted): awaitCleanupComplete(); - verify(topicClient).deleteInternalTopics(query.get(0).getQueryApplicationId() + "-" + query.get(0).getQueryId().toString()); + final String topicPrefix = query.get(0).getQueryApplicationId().split("query")[0] + "query_"; + verify(topicClient).deleteInternalTopics( topicPrefix + query.get(0).getQueryId().toString()); } @Test @@ -1470,15 +1472,15 @@ public void shouldNotHardDeleteSubjectForPersistentQuerySharedRuntimes() throws ksqlConfig, Collections.emptyMap() ); - final String applicationId = query.get(0).getQueryApplicationId(); + final String topicPrefix = query.get(0).getQueryApplicationId().split("query")[0] + "query_"; final String internalTopic1Val = KsqlConstants.getSRSubject( - applicationId + "-" + query.get(0).getQueryId() + "-subject1" + KsqlConstants.STREAMS_CHANGELOG_TOPIC_SUFFIX, false); + topicPrefix + query.get(0).getQueryId() + "-subject1" + KsqlConstants.STREAMS_CHANGELOG_TOPIC_SUFFIX, false); final String internalTopic2Val = KsqlConstants.getSRSubject( - applicationId + "-" + query.get(0).getQueryId() + "-subject3" + KsqlConstants.STREAMS_REPARTITION_TOPIC_SUFFIX, false); + topicPrefix + query.get(0).getQueryId() + "-subject3" + KsqlConstants.STREAMS_REPARTITION_TOPIC_SUFFIX, false); final String internalTopic1Key = KsqlConstants.getSRSubject( - applicationId + "-" + query.get(0).getQueryId() + "-subject1" + KsqlConstants.STREAMS_CHANGELOG_TOPIC_SUFFIX, true); + topicPrefix + query.get(0).getQueryId() + "-subject1" + KsqlConstants.STREAMS_CHANGELOG_TOPIC_SUFFIX, true); final String internalTopic2Key = KsqlConstants.getSRSubject( - applicationId + "-" + query.get(0).getQueryId() + "-subject3" + KsqlConstants.STREAMS_REPARTITION_TOPIC_SUFFIX, true); + topicPrefix + query.get(0).getQueryId() + "-subject3" + KsqlConstants.STREAMS_REPARTITION_TOPIC_SUFFIX, true); when(schemaRegistryClient.getAllSubjects()).thenReturn( Arrays.asList( internalTopic1Val, @@ -1493,6 +1495,7 @@ public void shouldNotHardDeleteSubjectForPersistentQuerySharedRuntimes() throws // Then: awaitCleanupComplete(); + verify(schemaRegistryClient).getAllSubjects(); verify(schemaRegistryClient, times(4)).deleteSubject(any()); verify(schemaRegistryClient, never()).deleteSubject(internalTopic1Val, true); verify(schemaRegistryClient, never()).deleteSubject(internalTopic1Key, true); @@ -2345,7 +2348,13 @@ private void awaitCleanupComplete() { private void awaitCleanupComplete(final KsqlEngine ksqlEngine) { // add a task to the end of the queue to make sure that // we've finished processing everything up until this point - ksqlEngine.getCleanupService().addCleanupTask(new QueryCleanupTask(serviceContext, "", Optional.empty(), false, "") { + Optional nameTopology; + if (ksqlEngine.getKsqlConfig().getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)) { + nameTopology = Optional.of("test"); + } else { + nameTopology = Optional.empty(); + } + ksqlEngine.getCleanupService().addCleanupTask(new QueryCleanupTask(serviceContext, "", nameTopology, false, "", KsqlConfig.KSQL_SERVICE_ID_DEFAULT, KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT) { @Override public void run() { // do nothing diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index d02805edc0b3..f9d89a079642 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -468,7 +468,8 @@ private void initialize(final KsqlConfig configWithApplicationServer) { StreamsConfig.STATE_DIR_CONFIG, StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG)) .toString(), - serviceContext) + serviceContext, + configWithApplicationServer) ); commandRunner.start(); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java index ebdeb9774fac..aa3193f11bb5 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java @@ -45,6 +45,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import org.apache.kafka.clients.admin.Admin; @@ -394,16 +395,25 @@ private static void maybeCleanUpQuery(final byte[] command, final KsqlConfig ksq boolean queryIdFound = false; final Map streamsProperties = new HashMap<>(ksqlConfig.getKsqlStreamConfigProps()); + boolean sharedRuntimeQuery = false; + String queryId = ""; final JSONObject jsonObject = new JSONObject(new String(command, StandardCharsets.UTF_8)); if (hasKey(jsonObject, "plan")) { final JSONObject plan = jsonObject.getJSONObject("plan"); if (hasKey(plan, "queryPlan")) { final JSONObject queryPlan = plan.getJSONObject("queryPlan"); - final String queryId = queryPlan.getString("queryId"); - streamsProperties.put( - StreamsConfig.APPLICATION_ID_CONFIG, - QueryApplicationId.build(ksqlConfig, true, new QueryId(queryId))); - + queryId = queryPlan.getString("queryId"); + if (hasKey(queryPlan, "runtimeId") + && ((Optional) queryPlan.get("runtimeId")).isPresent()) { + streamsProperties.put( + StreamsConfig.APPLICATION_ID_CONFIG, + ((Optional) queryPlan.get("runtimeId")).get()); + sharedRuntimeQuery = true; + } else { + streamsProperties.put( + StreamsConfig.APPLICATION_ID_CONFIG, + QueryApplicationId.build(ksqlConfig, true, new QueryId(queryId))); + } queryIdFound = true; } } @@ -411,13 +421,15 @@ private static void maybeCleanUpQuery(final byte[] command, final KsqlConfig ksq // the command contains a query, clean up it's internal state store and also the internal topics if (queryIdFound) { final StreamsConfig streamsConfig = new StreamsConfig(streamsProperties); - final String applicationId = - streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG); + final String topicPrefix = sharedRuntimeQuery + ? streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG) + : QueryApplicationId.buildInternalTopicPrefix(ksqlConfig, sharedRuntimeQuery) + queryId; + try { final Admin admin = new DefaultKafkaClientSupplier() .getAdmin(ksqlConfig.getKsqlAdminClientConfigProps()); final KafkaTopicClient topicClient = new KafkaTopicClientImpl(() -> admin); - topicClient.deleteInternalTopics(applicationId); + topicClient.deleteInternalTopics(topicPrefix); new StateDirectory( streamsConfig, @@ -427,9 +439,9 @@ private static void maybeCleanUpQuery(final byte[] command, final KsqlConfig ksq System.out.println( String.format( "Cleaned up internal state store and internal topics for query %s", - applicationId)); + topicPrefix)); } catch (final Exception e) { - System.out.println(String.format("Failed to clean up query %s ", applicationId)); + System.out.println(String.format("Failed to clean up query %s ", topicPrefix)); } } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/PersistentQueryCleanupImpl.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/PersistentQueryCleanupImpl.java index dcc44432a249..06a254dd0776 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/PersistentQueryCleanupImpl.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/PersistentQueryCleanupImpl.java @@ -19,6 +19,7 @@ import io.confluent.ksql.engine.QueryCleanupService; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.BinPackedPersistentQueryMetadataImpl; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.PersistentQueryMetadata; import java.io.File; import java.util.Arrays; @@ -36,9 +37,14 @@ public class PersistentQueryCleanupImpl implements PersistentQueryCleanup { private final String stateDir; private final ServiceContext serviceContext; private final QueryCleanupService queryCleanupService; + private final KsqlConfig ksqlConfig; - public PersistentQueryCleanupImpl(final String stateDir, final ServiceContext serviceContext) { + @SuppressFBWarnings(value = "EI_EXPOSE_REP") + public PersistentQueryCleanupImpl(final String stateDir, + final ServiceContext serviceContext, + final KsqlConfig ksqlConfig) { this.stateDir = stateDir; + this.ksqlConfig = ksqlConfig; this.serviceContext = serviceContext; queryCleanupService = new QueryCleanupService(); queryCleanupService.startAsync(); @@ -75,13 +81,15 @@ public void cleanupLeakedQueries(final List persistentQ allStateStores.removeAll(stateStoreNames); allStateStores.forEach((storeName) -> queryCleanupService.addCleanupTask( new QueryCleanupService.QueryCleanupTask( - serviceContext, - storeName.split("/")[0], - 1 < storeName.split("__").length - ? Optional.of(storeName.split("__")[1]) - : Optional.empty(), - false, - stateDir))); + serviceContext, + storeName.split("/")[0], + 1 < storeName.split("__").length + ? Optional.of(storeName.split("__")[1]) + : Optional.empty(), + false, + stateDir, + ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG), + ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG)))); } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index b5ac816b7cda..0b106b682880 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -272,7 +272,8 @@ void recover() { StreamsConfig.STATE_DIR_CONFIG, StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG)) .toString(), - serviceContext) + serviceContext, + ksqlConfig) ); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/PersistentQueryCleanupImplTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/PersistentQueryCleanupImplTest.java index ed000e6c995d..6a1ec7393712 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/PersistentQueryCleanupImplTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/PersistentQueryCleanupImplTest.java @@ -19,6 +19,7 @@ import io.confluent.ksql.engine.QueryCleanupService; import io.confluent.ksql.logging.query.TestAppender; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.PersistentQueryMetadata; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -45,6 +46,8 @@ public class PersistentQueryCleanupImplTest { File tempFile; PersistentQueryCleanupImpl cleanup; + @Mock + KsqlConfig ksqlConfig; @Mock ServiceContext context; @Mock @@ -61,7 +64,7 @@ public void setUp() { } } - cleanup = new PersistentQueryCleanupImpl("/tmp/cat/", context); + cleanup = new PersistentQueryCleanupImpl("/tmp/cat/", context, ksqlConfig); } @Test @@ -113,7 +116,7 @@ public void shouldKeepStateStoresBelongingToRunningQueries() { private void awaitCleanupComplete() { // add a task to the end of the queue to make sure that // we've finished processing everything up until this point - cleanup.getQueryCleanupService().addCleanupTask(new QueryCleanupService.QueryCleanupTask(context, "", Optional.empty(),false, "") { + cleanup.getQueryCleanupService().addCleanupTask(new QueryCleanupService.QueryCleanupTask(context, "", Optional.empty(), false, "", KsqlConfig.KSQL_SERVICE_ID_DEFAULT, "") { @Override public void run() { // do nothing