fix: delete zombie consumer groups 🧟 (#6160)
agavra authored Sep 10, 2020
1 parent 044b28a commit 2d1697a
Showing 10 changed files with 302 additions and 45 deletions.
@@ -35,7 +35,6 @@
import io.confluent.ksql.query.QueryExecutor;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.QueryIdGenerator;
- import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.services.SandboxedServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
@@ -72,19 +71,22 @@ final class EngineContext {
private final KsqlParser parser;
private final Map<QueryId, PersistentQueryMetadata> persistentQueries;
private final Set<QueryMetadata> allLiveQueries = ConcurrentHashMap.newKeySet();
+ private final QueryCleanupService cleanupService;

static EngineContext create(
final ServiceContext serviceContext,
final ProcessingLogContext processingLogContext,
final MutableMetaStore metaStore,
- final QueryIdGenerator queryIdGenerator
+ final QueryIdGenerator queryIdGenerator,
+ final QueryCleanupService cleanupService
) {
return new EngineContext(
serviceContext,
processingLogContext,
metaStore,
queryIdGenerator,
- new DefaultKsqlParser()
+ new DefaultKsqlParser(),
+ cleanupService
);
}

@@ -93,7 +95,8 @@ private EngineContext(
final ProcessingLogContext processingLogContext,
final MutableMetaStore metaStore,
final QueryIdGenerator queryIdGenerator,
- final KsqlParser parser
+ final KsqlParser parser,
+ final QueryCleanupService cleanupService
) {
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
this.metaStore = requireNonNull(metaStore, "metaStore");
@@ -103,14 +106,16 @@
this.persistentQueries = new ConcurrentHashMap<>();
this.processingLogContext = requireNonNull(processingLogContext, "processingLogContext");
this.parser = requireNonNull(parser, "parser");
+ this.cleanupService = requireNonNull(cleanupService, "cleanupService");
}

EngineContext createSandbox(final ServiceContext serviceContext) {
final EngineContext sandBox = EngineContext.create(
SandboxedServiceContext.create(serviceContext),
processingLogContext,
metaStore.copy(),
- queryIdGenerator.createSandbox()
+ queryIdGenerator.createSandbox(),
+ cleanupService
);

persistentQueries.forEach((queryId, query) ->
@@ -270,14 +275,19 @@ private void cleanupExternalQueryResources(
) {
final String applicationId = query.getQueryApplicationId();
if (query.hasEverBeenStarted()) {
- SchemaRegistryUtil.cleanupInternalTopicSchemas(
- applicationId,
- serviceContext.getSchemaRegistryClient(),
- query instanceof TransientQueryMetadata);
-
- serviceContext.getTopicClient().deleteInternalTopics(applicationId);
+ cleanupService.addCleanupTask(
+ new QueryCleanupService.QueryCleanupTask(
+ serviceContext,
+ applicationId,
+ query instanceof TransientQueryMetadata
+ ));
}

StreamsErrorCollector.notifyApplicationClose(applicationId);
}
+
+ public void close(final boolean closeQueries) {
+ getAllLiveQueries().forEach(closeQueries ? QueryMetadata::close : QueryMetadata::stop);
+
+ }
}
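
The heart of the change is visible above: terminating a query no longer deletes Schema Registry schemas and internal topics inline on the caller's thread; it enqueues a QueryCleanupTask onto the shared QueryCleanupService instead. A minimal sketch of that handoff pattern (illustrative names only, not the actual ksqlDB types):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

// Illustrative sketch: the terminating thread only enqueues; a background
// worker (see QueryCleanupService below) performs the slow deletions.
class CleanupHandoff {
  private final BlockingQueue<Runnable> cleanupTasks = new LinkedBlockingDeque<>();

  void onQueryTerminated(final String applicationId) {
    // Before this commit the deletions ran right here, blocking the CLI
    // response; now the caller returns immediately.
    cleanupTasks.add(() -> System.out.println("cleaning up " + applicationId));
  }
}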
@@ -15,6 +15,7 @@

package io.confluent.ksql.engine;

+ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.ServiceInfo;
@@ -47,6 +48,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +61,7 @@ public class KsqlEngine implements KsqlExecutionContext, Closeable {
private final ScheduledExecutorService aggregateMetricsCollector;
private final String serviceId;
private final EngineContext primaryContext;
+ private final QueryCleanupService cleanupService;

public KsqlEngine(
final ServiceContext serviceContext,
@@ -89,11 +92,13 @@ public KsqlEngine(
final Function<KsqlEngine, KsqlEngineMetrics> engineMetricsFactory,
final QueryIdGenerator queryIdGenerator
) {
+ this.cleanupService = new QueryCleanupService();
this.primaryContext = EngineContext.create(
serviceContext,
processingLogContext,
metaStore,
- queryIdGenerator
+ queryIdGenerator,
+ cleanupService
);
this.serviceId = Objects.requireNonNull(serviceId, "serviceId");
this.engineMetrics = engineMetricsFactory.apply(this);
@@ -110,6 +115,8 @@ public KsqlEngine(
1000,
TimeUnit.MILLISECONDS
);
+
+ cleanupService.startAsync();
}

public int numberOfLiveQueries() {
@@ -154,6 +161,11 @@ public String getServiceId() {
return serviceId;
}

+ @VisibleForTesting
+ QueryCleanupService getCleanupService() {
+ return cleanupService;
+ }
+
@Override
public KsqlExecutionContext createSandbox(final ServiceContext serviceContext) {
return new SandboxedExecutionContext(primaryContext, serviceContext);
@@ -235,6 +247,15 @@ public void close(final boolean closeQueries) {
primaryContext.getAllLiveQueries()
.forEach(closeQueries ? QueryMetadata::close : QueryMetadata::stop);

+ try {
+ cleanupService.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ log.warn("Timed out while closing cleanup service. "
+ + "External resources for the following applications may be orphaned: {}",
+ cleanupService.pendingApplicationIds()
+ );
+ }
+
engineMetrics.close();
aggregateMetricsCollector.shutdown();
}
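
The engine now owns the cleanup service's lifecycle: startAsync() in the constructor, and a bounded stopAsync().awaitTerminated(30, SECONDS) in close() that logs which application IDs may be left with orphaned resources if the timeout fires. A self-contained sketch of that Guava Service lifecycle (the service body here is a stand-in, not the real cleanup loop):

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class LifecycleDemo {
  public static void main(final String[] args) throws TimeoutException {
    final AbstractExecutionThreadService service = new AbstractExecutionThreadService() {
      @Override
      protected void run() throws InterruptedException {
        while (isRunning()) {
          Thread.sleep(100); // stand-in for draining a task queue
        }
      }
    };

    service.startAsync().awaitRunning();   // spawns the worker thread
    // ... engine serves queries ...
    service.stopAsync().awaitTerminated(30, TimeUnit.SECONDS); // bounded shutdown
  }
}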
@@ -0,0 +1,130 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.engine;

import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.services.ServiceContext;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@code QueryCleanupService} cleans up external resources for queries
* outside the main line of query execution. This ensures that potentially
* slow tasks (such as deleting consumer groups) don't run on the CLI
* feedback path.
*
* <p>NOTE: this cleanup service is intended to be used across threads and across
* real/sandboxed engines.</p>
*/
@SuppressWarnings("UnstableApiUsage")
class QueryCleanupService extends AbstractExecutionThreadService {

private static final Logger LOG = LoggerFactory.getLogger(QueryCleanupService.class);
private static final Runnable SHUTDOWN_SENTINEL = () -> { };

private final BlockingQueue<Runnable> cleanupTasks;

QueryCleanupService() {
cleanupTasks = new LinkedBlockingDeque<>();
}

@Override
protected void run() {
try {
while (true) {
final Runnable task = cleanupTasks.take();
if (task == SHUTDOWN_SENTINEL) {
return;
}

task.run();
}
} catch (final InterruptedException e) {
// gracefully exit if this method was interrupted and reset
// the interrupt flag
Thread.currentThread().interrupt();
}
}

@Override
protected void triggerShutdown() {
cleanupTasks.add(SHUTDOWN_SENTINEL);
}

public Set<String> pendingApplicationIds() {
return cleanupTasks.stream()
.filter(QueryCleanupTask.class::isInstance)
.map(QueryCleanupTask.class::cast)
.map(t -> t.appId).collect(ImmutableSet.toImmutableSet());
}

public boolean isEmpty() {
return cleanupTasks.isEmpty();
}

public void addCleanupTask(final QueryCleanupTask task) {
cleanupTasks.add(task);
}

static class QueryCleanupTask implements Runnable {
private final String appId;
private final boolean isTransient;
private final ServiceContext serviceContext;

QueryCleanupTask(
final ServiceContext serviceContext,
final String appId,
final boolean isTransient
) {
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.appId = Objects.requireNonNull(appId, "appId");
this.isTransient = isTransient;
}

@Override
public void run() {
tryRun(
() -> SchemaRegistryUtil.cleanupInternalTopicSchemas(
appId,
serviceContext.getSchemaRegistryClient(),
isTransient),
"internal topic schemas"
);

tryRun(() -> serviceContext.getTopicClient().deleteInternalTopics(appId), "internal topics");
tryRun(
() -> serviceContext
.getConsumerGroupClient()
.deleteConsumerGroups(ImmutableSet.of(appId)),
"internal consumer groups");
}

private void tryRun(final Runnable runnable, final String resource) {
try {
runnable.run();
} catch (final Exception e) {
LOG.warn("Failed to cleanup {} for {}", resource, appId, e);
}
}
}

}
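
Worth noting in the run()/triggerShutdown() pair above: shutdown is signaled by enqueuing a sentinel rather than interrupting the worker thread, so every task queued before shutdown still executes before the loop exits. A self-contained sketch of that drain-then-exit pattern (simplified; not the actual service):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public final class SentinelQueueDemo {
  private static final Runnable SHUTDOWN_SENTINEL = () -> { };

  public static void main(final String[] args) throws InterruptedException {
    final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>();

    final Thread worker = new Thread(() -> {
      try {
        while (true) {
          final Runnable task = tasks.take(); // blocks until work arrives
          if (task == SHUTDOWN_SENTINEL) {
            return; // everything enqueued before the sentinel has already run
          }
          task.run();
        }
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    worker.start();

    tasks.add(() -> System.out.println("cleaning up app-1"));
    tasks.add(SHUTDOWN_SENTINEL); // what triggerShutdown() does above
    worker.join();
  }
}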
@@ -19,10 +19,13 @@
import io.confluent.ksql.exception.KsqlGroupAuthorizationException;
import io.confluent.ksql.util.ExecutorUtil;
import io.confluent.ksql.util.ExecutorUtil.RetryBehaviour;
+ import java.time.Duration;
+ import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
@@ -32,6 +35,8 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.GroupAuthorizationException;
+ import org.apache.kafka.common.errors.GroupNotEmptyException;
+ import org.apache.kafka.common.errors.RetriableException;

public class KafkaConsumerGroupClientImpl implements KafkaConsumerGroupClient {

@@ -100,4 +105,23 @@ public Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(final Str
throw new KafkaResponseGetFailedException("Failed to list Kafka consumer groups offsets", e);
}
}
+
+ @Override
+ public void deleteConsumerGroups(final Set<String> groups) {
+ final AtomicInteger retryCount = new AtomicInteger(0);
+ try {
+ // it takes heartbeat.interval.ms after a consumer is closed for the broker
+ // to recognize that there are no more consumers in the consumer group; for
+ // that reason, we retry after 3 seconds (the default heartbeat.interval.ms)
+ // if we get a GroupNotEmptyException
+ ExecutorUtil.executeWithRetries(
+ () -> adminClient.get().deleteConsumerGroups(groups).all().get(),
+ e -> (e instanceof RetriableException)
+ || (e instanceof GroupNotEmptyException && retryCount.getAndIncrement() < 5),
+ () -> Duration.of(3, ChronoUnit.SECONDS)
+ );
+ } catch (Exception e) {
+ throw new KafkaResponseGetFailedException("Failed to delete consumer groups: " + groups, e);
+ }
+ }
}
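
The inline comment above captures the timing subtlety: the broker only notices that a group's consumers are gone after heartbeat.interval.ms, so an immediate delete can fail with GroupNotEmptyException even though the query is already stopped. A sketch of the retry policy in isolation (the withRetries helper here is hypothetical, standing in for ksqlDB's ExecutorUtil):

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public final class RetryDemo {
  // Hypothetical helper mirroring the shape of ExecutorUtil.executeWithRetries.
  static <T> T withRetries(
      final Callable<T> action,
      final Predicate<Exception> shouldRetry,
      final Duration backoff
  ) throws Exception {
    while (true) {
      try {
        return action.call();
      } catch (final Exception e) {
        if (!shouldRetry.test(e)) {
          throw e;
        }
        Thread.sleep(backoff.toMillis()); // give the broker time to see the group empty
      }
    }
  }

  public static void main(final String[] args) throws Exception {
    final AtomicInteger attempts = new AtomicInteger(0);
    final String result = withRetries(
        () -> {
          if (attempts.getAndIncrement() < 2) {
            throw new IllegalStateException("group not empty yet"); // stand-in for GroupNotEmptyException
          }
          return "deleted";
        },
        e -> attempts.get() <= 5, // cap the retries, like retryCount < 5 above
        Duration.ofSeconds(3)
    );
    System.out.println(result); // prints "deleted" after two retries
  }
}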
@@ -18,46 +18,22 @@
import static io.confluent.ksql.util.LimitedProxyBuilder.methodParams;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
- import io.confluent.ksql.services.KafkaConsumerGroupClient.ConsumerGroupSummary;
import io.confluent.ksql.util.LimitedProxyBuilder;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import org.apache.kafka.clients.consumer.OffsetAndMetadata;
- import org.apache.kafka.common.TopicPartition;
+ import java.util.Set;

@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Methods invoked via reflection.
@SuppressWarnings("unused") // Methods invoked via reflection.
public final class SandboxedKafkaConsumerGroupClient {

static KafkaConsumerGroupClient createProxy(final KafkaConsumerGroupClient delegate) {
- final SandboxedKafkaConsumerGroupClient sandbox =
- new SandboxedKafkaConsumerGroupClient(delegate);
-
return LimitedProxyBuilder.forClass(KafkaConsumerGroupClient.class)
- .forward("describeConsumerGroup", methodParams(String.class), sandbox)
- .forward("listGroups", methodParams(), sandbox)
- .forward("listConsumerGroupOffsets", methodParams(String.class), sandbox)
+ .forward("describeConsumerGroup", methodParams(String.class), delegate)
+ .forward("listGroups", methodParams(), delegate)
+ .forward("listConsumerGroupOffsets", methodParams(String.class), delegate)
+ .swallow("deleteConsumerGroups", methodParams(Set.class))
.build();
}

- private final KafkaConsumerGroupClient delegate;
-
- private SandboxedKafkaConsumerGroupClient(final KafkaConsumerGroupClient delegate) {
- this.delegate = Objects.requireNonNull(delegate, "delegate");
- }
-
-
-
- public ConsumerGroupSummary describeConsumerGroup(final String groupId) {
- return delegate.describeConsumerGroup(groupId);
- }
-
- public List<String> listGroups() {
- return delegate.listGroups();
- }
-
- public Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(final String groupId) {
- return delegate.listConsumerGroupOffsets(groupId);
+ private SandboxedKafkaConsumerGroupClient() {
}
}
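
The sandbox client is now a pure pass-through: read-only methods forward to the real client, while deleteConsumerGroups is swallowed so that validating a statement against a sandboxed engine can never delete real consumer groups. The same forward-or-swallow idea, sketched with a plain JDK dynamic proxy (LimitedProxyBuilder is ksqlDB's own utility; this only illustrates the concept):

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Set;

public final class SwallowingProxyDemo {
  interface GroupClient {
    Set<String> listGroups();
    void deleteConsumerGroups(Set<String> groups);
  }

  static GroupClient sandboxed(final GroupClient delegate) {
    final InvocationHandler handler = (proxy, method, args) -> {
      if (method.getName().equals("deleteConsumerGroups")) {
        return null; // swallow: the sandbox must not mutate the real cluster
      }
      return method.invoke(delegate, args); // forward everything else
    };
    return (GroupClient) Proxy.newProxyInstance(
        GroupClient.class.getClassLoader(),
        new Class<?>[] {GroupClient.class},
        handler);
  }

  public static void main(final String[] args) {
    final GroupClient real = new GroupClient() {
      @Override
      public Set<String> listGroups() {
        return Set.of("some-query-app-id");
      }

      @Override
      public void deleteConsumerGroups(final Set<String> groups) {
        throw new AssertionError("never called through the sandbox");
      }
    };

    final GroupClient sandbox = sandboxed(real);
    System.out.println(sandbox.listGroups());          // forwarded to the real client
    sandbox.deleteConsumerGroups(Set.of("any-group")); // silently swallowed
  }
}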
(5 more changed files not shown)
