fix: clean up custom prefixed internal topics (#8640)
* fix: clean up custom prefixed internal topics
wcarlson5 authored Jan 27, 2022
1 parent 76f02b7 commit cd99f26
Showing 11 changed files with 99 additions and 50 deletions.
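
The thrust of this change is that internal-topic cleanup now keys off the full internal topic prefix (reserved internal prefix + service id + configured persistent query prefix) instead of the Streams application id, so queries created with a custom KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG still get their changelog and repartition topics removed. Below is a minimal sketch of that composition, mirroring QueryApplicationId.buildInternalTopicPrefix(serviceId, queryPrefix) from the first hunk; the "_confluent-ksql-" constant and the example values are assumptions, not taken from this diff.

// Minimal sketch only; not the ksqlDB source. The constant value and the
// example service id / query prefix are assumed for illustration.
public final class InternalTopicPrefixSketch {

  private static final String KSQL_INTERNAL_TOPIC_PREFIX = "_confluent-ksql-"; // assumed value

  // Mirrors QueryApplicationId.buildInternalTopicPrefix(serviceId, queryPrefix).
  static String buildInternalTopicPrefix(final String serviceId, final String queryPrefix) {
    return KSQL_INTERNAL_TOPIC_PREFIX + serviceId + queryPrefix;
  }

  public static void main(final String[] args) {
    // With a custom persistent query prefix, cleanup matches on this prefix plus
    // the query id rather than on the full Streams application id.
    final String prefix = buildInternalTopicPrefix("default_", "custom-");
    System.out.println(prefix + "CSAS_FOO_1");
    // prints: _confluent-ksql-default_custom-CSAS_FOO_1
  }
}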
@@ -30,7 +30,7 @@ public static String buildSharedRuntimeId(
final boolean persistent,
final int sharedRuntimeIndex
) {
final String queryAppId = buildPrefix(config, persistent) + "-" + sharedRuntimeIndex;
final String queryAppId = buildInternalTopicPrefix(config, persistent) + sharedRuntimeIndex;
if (persistent) {
return queryAppId;
} else {
@@ -43,15 +43,15 @@ public static String build(
final boolean persistent,
final QueryId queryId
) {
final String queryAppId = buildPrefix(config, persistent) + queryId;
final String queryAppId = buildInternalTopicPrefix(config, persistent) + queryId;
if (persistent) {
return queryAppId;
} else {
return addTimeSuffix(queryAppId);
}
}

private static String buildPrefix(
public static String buildInternalTopicPrefix(
final KsqlConfig config,
final boolean persistent
) {
@@ -63,6 +63,13 @@ private static String buildPrefix(

final String queryPrefix = config.getString(configName);

return buildInternalTopicPrefix(serviceId, queryPrefix);
}

public static String buildInternalTopicPrefix(
final String serviceId,
final String queryPrefix
) {
return ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX
+ serviceId
+ queryPrefix;
@@ -649,8 +649,9 @@ public void onClose(
StreamsConfig.configDef()
.defaultValues()
.get(StreamsConfig.STATE_DIR_CONFIG))
.toString()
));
.toString(),
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG)));
} else {
log.info("Skipping cleanup for query {} since it was never started", applicationId);
}
@@ -78,8 +78,9 @@ public void cleanupOrphanedInternalTopics(
.getOrDefault(
StreamsConfig.STATE_DIR_CONFIG,
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG))
.toString()
));
.toString(),
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG)));
}
}
}
@@ -20,6 +20,7 @@
import com.spun.util.io.FileUtils;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.QueryApplicationId;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -101,20 +102,25 @@ public static class QueryCleanupTask implements Runnable {
public QueryCleanupTask(
final ServiceContext serviceContext,
final String appId,
final Optional<String> topologyName,
final Optional<String> queryId,
final boolean isTransient,
final String stateDir
) {
final String stateDir,
final String serviceId,
final String persistentQueryPrefix) {
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.appId = Objects.requireNonNull(appId, "appId");
this.topologyName = Objects.requireNonNull(topologyName, "topologyName");
queryTopicPrefix = topologyName.map(s -> appId + "-" + s).orElse(appId);
this.topologyName = Objects.requireNonNull(queryId, "queryId");
queryTopicPrefix = queryId
.map(s -> QueryApplicationId.buildInternalTopicPrefix(
serviceId,
persistentQueryPrefix) + s)
.orElse(appId);
//generate the prefix depending on if using named topologies
this.isTransient = isTransient;
pathName = topologyName
pathName = queryId
.map(s -> stateDir + "/" + appId + "/__" + s + "__")
.orElse(stateDir + "/" + appId);
if (isTransient && topologyName.isPresent()) {
if (isTransient && queryId.isPresent()) {
throw new IllegalArgumentException("Transient Queries can not have named topologies");
}
}
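
For readers of the constructor hunk above: when a query id (named topology) is supplied, the cleanup task's topic prefix is now built from the service id and persistent query prefix plus the query id, while transient queries keep matching on the raw application id. A hedged sketch of that selection follows, with an assumed constant value and example arguments; it is not the ksqlDB source.

import java.util.Optional;

// Illustrative only; mirrors the queryTopicPrefix assignment in the constructor above.
final class QueryTopicPrefixSketch {

  static String queryTopicPrefix(
      final Optional<String> queryId,
      final String appId,
      final String serviceId,
      final String persistentQueryPrefix) {
    return queryId
        .map(id -> "_confluent-ksql-" + serviceId + persistentQueryPrefix + id) // assumed constant
        .orElse(appId); // transient queries: no named topology, match on the application id
  }

  public static void main(final String[] args) {
    System.out.println(queryTopicPrefix(Optional.of("CTAS_BAR_2"), "ignored", "default_", "query_"));
    System.out.println(queryTopicPrefix(Optional.empty(), "transient-app-id", "default_", "query_"));
  }
}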
@@ -315,12 +315,12 @@ public void deleteTopics(final Collection<String> topicsToDelete) {
}

@Override
public void deleteInternalTopics(final String applicationId) {
public void deleteInternalTopics(final String internalTopicPrefix) {
try {
final Set<String> topicNames = listTopicNames();
final List<String> internalTopics = Lists.newArrayList();
for (final String topicName : topicNames) {
if (isInternalTopic(topicName, applicationId)) {
if (isInternalTopic(topicName, internalTopicPrefix)) {
internalTopics.add(topicName);
}
}
@@ -330,7 +330,7 @@ public void deleteInternalTopics(final String applicationId) {
}
} catch (final Exception e) {
LOG.error("Exception while trying to clean up internal topics for application id: {}.",
applicationId, e
internalTopicPrefix, e
);
}
}
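
The rename from applicationId to internalTopicPrefix in the hunk above reflects that the topic client now deletes whatever internal topics start with the prefix computed by the caller. A rough sketch of that selection is shown below; the suffix check is an assumption borrowed from the KsqlConstants.STREAMS_CHANGELOG_TOPIC_SUFFIX and STREAMS_REPARTITION_TOPIC_SUFFIX constants used in the tests further down, since isInternalTopic itself is not shown in this diff.

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Rough sketch of prefix-based internal topic selection (illustrative only).
final class InternalTopicFilterSketch {

  static List<String> internalTopics(final Set<String> allTopics, final String internalTopicPrefix) {
    return allTopics.stream()
        .filter(t -> t.startsWith(internalTopicPrefix))
        // assumed: internal topics carry the Streams changelog/repartition suffixes
        .filter(t -> t.endsWith("-changelog") || t.endsWith("-repartition"))
        .collect(Collectors.toList());
  }

  public static void main(final String[] args) {
    System.out.println(internalTopics(
        Set.of("_confluent-ksql-default_query_CSAS_FOO_1-Aggregate-Materialize-changelog", "orders"),
        "_confluent-ksql-default_query_CSAS_FOO_1"));
  }
}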
@@ -57,9 +57,7 @@
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.QueryCleanupService.QueryCleanupTask;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.internal.KsqlEngineMetrics;
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
@@ -70,7 +68,6 @@
import io.confluent.ksql.parser.tree.DropTable;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.SequentialQueryIdGenerator;
import io.confluent.ksql.schema.ksql.SystemColumns;
import io.confluent.ksql.services.FakeKafkaConsumerGroupClient;
import io.confluent.ksql.services.FakeKafkaTopicClient;
@@ -84,6 +81,7 @@
import io.confluent.ksql.util.MetaStoreFixture;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.ReservedInternalTopics;
import io.confluent.ksql.util.SandboxedBinPackedPersistentQueryMetadataImpl;
import io.confluent.ksql.util.SandboxedPersistentQueryMetadataImpl;
import io.confluent.ksql.util.SandboxedTransientQueryMetadata;
@@ -100,7 +98,6 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import it.unimi.dsi.fastutil.Hash;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

@@ -139,6 +136,10 @@ public class KsqlEngineTest {
@Before
public void setUp() {
sharedRuntimeEnabled.put(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED, true);
sharedRuntimeEnabled.put(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,
ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX
+ "default_"
+ "query");
sharedRuntimeDisabled.put(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED, false);
ksqlConfig = KsqlConfigTestUtil.create("what-eva", sharedRuntimeEnabled);

@@ -1411,7 +1412,8 @@ public void shouldCleanUpInternalTopicsOnQueryCloseForPersistentQueriesSharedRun

// Then (there are no transient queries, so no internal topics should be deleted):
awaitCleanupComplete();
verify(topicClient).deleteInternalTopics(query.get(0).getQueryApplicationId() + "-" + query.get(0).getQueryId().toString());
final String topicPrefix = query.get(0).getQueryApplicationId().split("query")[0] + "query_";
verify(topicClient).deleteInternalTopics( topicPrefix + query.get(0).getQueryId().toString());
}

@Test
@@ -1470,15 +1472,15 @@ public void shouldNotHardDeleteSubjectForPersistentQuerySharedRuntimes() throws
ksqlConfig,
Collections.emptyMap()
);
final String applicationId = query.get(0).getQueryApplicationId();
final String topicPrefix = query.get(0).getQueryApplicationId().split("query")[0] + "query_";
final String internalTopic1Val = KsqlConstants.getSRSubject(
applicationId + "-" + query.get(0).getQueryId() + "-subject1" + KsqlConstants.STREAMS_CHANGELOG_TOPIC_SUFFIX, false);
topicPrefix + query.get(0).getQueryId() + "-subject1" + KsqlConstants.STREAMS_CHANGELOG_TOPIC_SUFFIX, false);
final String internalTopic2Val = KsqlConstants.getSRSubject(
applicationId + "-" + query.get(0).getQueryId() + "-subject3" + KsqlConstants.STREAMS_REPARTITION_TOPIC_SUFFIX, false);
topicPrefix + query.get(0).getQueryId() + "-subject3" + KsqlConstants.STREAMS_REPARTITION_TOPIC_SUFFIX, false);
final String internalTopic1Key = KsqlConstants.getSRSubject(
applicationId + "-" + query.get(0).getQueryId() + "-subject1" + KsqlConstants.STREAMS_CHANGELOG_TOPIC_SUFFIX, true);
topicPrefix + query.get(0).getQueryId() + "-subject1" + KsqlConstants.STREAMS_CHANGELOG_TOPIC_SUFFIX, true);
final String internalTopic2Key = KsqlConstants.getSRSubject(
applicationId + "-" + query.get(0).getQueryId() + "-subject3" + KsqlConstants.STREAMS_REPARTITION_TOPIC_SUFFIX, true);
topicPrefix + query.get(0).getQueryId() + "-subject3" + KsqlConstants.STREAMS_REPARTITION_TOPIC_SUFFIX, true);
when(schemaRegistryClient.getAllSubjects()).thenReturn(
Arrays.asList(
internalTopic1Val,
@@ -1493,6 +1495,7 @@ public void shouldNotHardDeleteSubjectForPersistentQuerySharedRuntimes() throws

// Then:
awaitCleanupComplete();
verify(schemaRegistryClient).getAllSubjects();
verify(schemaRegistryClient, times(4)).deleteSubject(any());
verify(schemaRegistryClient, never()).deleteSubject(internalTopic1Val, true);
verify(schemaRegistryClient, never()).deleteSubject(internalTopic1Key, true);
@@ -2345,7 +2348,13 @@ private void awaitCleanupComplete() {
private void awaitCleanupComplete(final KsqlEngine ksqlEngine) {
// add a task to the end of the queue to make sure that
// we've finished processing everything up until this point
ksqlEngine.getCleanupService().addCleanupTask(new QueryCleanupTask(serviceContext, "", Optional.empty(), false, "") {
Optional<String> nameTopology;
if (ksqlEngine.getKsqlConfig().getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)) {
nameTopology = Optional.of("test");
} else {
nameTopology = Optional.empty();
}
ksqlEngine.getCleanupService().addCleanupTask(new QueryCleanupTask(serviceContext, "", nameTopology, false, "", KsqlConfig.KSQL_SERVICE_ID_DEFAULT, KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT) {
@Override
public void run() {
// do nothing
@@ -468,7 +468,8 @@ private void initialize(final KsqlConfig configWithApplicationServer) {
StreamsConfig.STATE_DIR_CONFIG,
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG))
.toString(),
serviceContext)
serviceContext,
configWithApplicationServer)
);

commandRunner.start();
@@ -45,6 +45,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
@@ -394,30 +395,41 @@ private static void maybeCleanUpQuery(final byte[] command, final KsqlConfig ksq
boolean queryIdFound = false;
final Map<String, Object> streamsProperties =
new HashMap<>(ksqlConfig.getKsqlStreamConfigProps());
boolean sharedRuntimeQuery = false;
String queryId = "";
final JSONObject jsonObject = new JSONObject(new String(command, StandardCharsets.UTF_8));
if (hasKey(jsonObject, "plan")) {
final JSONObject plan = jsonObject.getJSONObject("plan");
if (hasKey(plan, "queryPlan")) {
final JSONObject queryPlan = plan.getJSONObject("queryPlan");
final String queryId = queryPlan.getString("queryId");
streamsProperties.put(
StreamsConfig.APPLICATION_ID_CONFIG,
QueryApplicationId.build(ksqlConfig, true, new QueryId(queryId)));

queryId = queryPlan.getString("queryId");
if (hasKey(queryPlan, "runtimeId")
&& ((Optional<String>) queryPlan.get("runtimeId")).isPresent()) {
streamsProperties.put(
StreamsConfig.APPLICATION_ID_CONFIG,
((Optional<String>) queryPlan.get("runtimeId")).get());
sharedRuntimeQuery = true;
} else {
streamsProperties.put(
StreamsConfig.APPLICATION_ID_CONFIG,
QueryApplicationId.build(ksqlConfig, true, new QueryId(queryId)));
}
queryIdFound = true;
}
}

// the command contains a query, clean up it's internal state store and also the internal topics
if (queryIdFound) {
final StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
final String applicationId =
streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG);
final String topicPrefix = sharedRuntimeQuery
? streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)
: QueryApplicationId.buildInternalTopicPrefix(ksqlConfig, sharedRuntimeQuery) + queryId;

try {
final Admin admin = new DefaultKafkaClientSupplier()
.getAdmin(ksqlConfig.getKsqlAdminClientConfigProps());
final KafkaTopicClient topicClient = new KafkaTopicClientImpl(() -> admin);
topicClient.deleteInternalTopics(applicationId);
topicClient.deleteInternalTopics(topicPrefix);

new StateDirectory(
streamsConfig,
@@ -427,9 +439,9 @@ private static void maybeCleanUpQuery(final byte[] command, final KsqlConfig ksq
System.out.println(
String.format(
"Cleaned up internal state store and internal topics for query %s",
applicationId));
topicPrefix));
} catch (final Exception e) {
System.out.println(String.format("Failed to clean up query %s ", applicationId));
System.out.println(String.format("Failed to clean up query %s ", topicPrefix));
}
}
}
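
As a reading aid for the maybeCleanUpQuery hunk above: shared-runtime queries are cleaned up under the runtime's application id taken from the command, while standalone persistent queries are cleaned up under the internal topic prefix plus the query id. The sketch below only illustrates that branch with hypothetical values; it is not the restore-tool code.

// Illustrative only; mirrors the sharedRuntimeQuery branch above.
final class CleanupPrefixSketch {

  static String cleanupTopicPrefix(
      final boolean sharedRuntimeQuery,
      final String runtimeApplicationId, // Streams APPLICATION_ID_CONFIG of the shared runtime
      final String internalTopicPrefix,  // result of QueryApplicationId.buildInternalTopicPrefix(...)
      final String queryId) {
    return sharedRuntimeQuery
        ? runtimeApplicationId            // the runtime id already prefixes its internal topics
        : internalTopicPrefix + queryId;  // standalone persistent query: prefix + query id
  }

  public static void main(final String[] args) {
    System.out.println(cleanupTopicPrefix(false, "unused", "_confluent-ksql-default_query_", "CSAS_FOO_1"));
    System.out.println(cleanupTopicPrefix(true, "_confluent-ksql-default_query-shared-0", "unused", "unused"));
  }
}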
@@ -19,6 +19,7 @@
import io.confluent.ksql.engine.QueryCleanupService;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.BinPackedPersistentQueryMetadataImpl;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.io.File;
import java.util.Arrays;
@@ -36,9 +37,14 @@ public class PersistentQueryCleanupImpl implements PersistentQueryCleanup {
private final String stateDir;
private final ServiceContext serviceContext;
private final QueryCleanupService queryCleanupService;
private final KsqlConfig ksqlConfig;

public PersistentQueryCleanupImpl(final String stateDir, final ServiceContext serviceContext) {
@SuppressFBWarnings(value = "EI_EXPOSE_REP")
public PersistentQueryCleanupImpl(final String stateDir,
final ServiceContext serviceContext,
final KsqlConfig ksqlConfig) {
this.stateDir = stateDir;
this.ksqlConfig = ksqlConfig;
this.serviceContext = serviceContext;
queryCleanupService = new QueryCleanupService();
queryCleanupService.startAsync();
@@ -75,13 +81,15 @@ public void cleanupLeakedQueries(final List<PersistentQueryMetadata> persistentQ
allStateStores.removeAll(stateStoreNames);
allStateStores.forEach((storeName) -> queryCleanupService.addCleanupTask(
new QueryCleanupService.QueryCleanupTask(
serviceContext,
storeName.split("/")[0],
1 < storeName.split("__").length
? Optional.of(storeName.split("__")[1])
: Optional.empty(),
false,
stateDir)));
serviceContext,
storeName.split("/")[0],
1 < storeName.split("__").length
? Optional.of(storeName.split("__")[1])
: Optional.empty(),
false,
stateDir,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG))));
}
}

@@ -272,7 +272,8 @@ void recover() {
StreamsConfig.STATE_DIR_CONFIG,
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG))
.toString(),
serviceContext)
serviceContext,
ksqlConfig)
);
}
