From 2d1697a9014c397ab8924cd12102cf374ebb166f Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Thu, 10 Sep 2020 15:49:32 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20delete=20zombie=20consumer=20groups=20?= =?UTF-8?q?=F0=9F=A7=9F=20(#6160)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../confluent/ksql/engine/EngineContext.java | 32 +++-- .../io/confluent/ksql/engine/KsqlEngine.java | 23 +++- .../ksql/engine/QueryCleanupService.java | 130 ++++++++++++++++++ .../KafkaConsumerGroupClientImpl.java | 24 ++++ .../SandboxedKafkaConsumerGroupClient.java | 36 +---- .../io/confluent/ksql/util/ExecutorUtil.java | 2 +- .../confluent/ksql/engine/KsqlEngineTest.java | 81 ++++++++++- .../FakeKafkaConsumerGroupClient.java | 12 ++ .../services/KafkaConsumerGroupClient.java | 2 + .../stubs/StubKafkaConsumerGroupClient.java | 5 + 10 files changed, 302 insertions(+), 45 deletions(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryCleanupService.java diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java index c07ce18db4d9..c4298fe81670 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java @@ -35,7 +35,6 @@ import io.confluent.ksql.query.QueryExecutor; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.QueryIdGenerator; -import io.confluent.ksql.schema.registry.SchemaRegistryUtil; import io.confluent.ksql.services.SandboxedServiceContext; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; @@ -72,19 +71,22 @@ final class EngineContext { private final KsqlParser parser; private final Map persistentQueries; private final Set allLiveQueries = ConcurrentHashMap.newKeySet(); + private final QueryCleanupService cleanupService; static EngineContext create( final ServiceContext serviceContext, final ProcessingLogContext processingLogContext, final MutableMetaStore metaStore, - final QueryIdGenerator queryIdGenerator + final QueryIdGenerator queryIdGenerator, + final QueryCleanupService cleanupService ) { return new EngineContext( serviceContext, processingLogContext, metaStore, queryIdGenerator, - new DefaultKsqlParser() + new DefaultKsqlParser(), + cleanupService ); } @@ -93,7 +95,8 @@ private EngineContext( final ProcessingLogContext processingLogContext, final MutableMetaStore metaStore, final QueryIdGenerator queryIdGenerator, - final KsqlParser parser + final KsqlParser parser, + final QueryCleanupService cleanupService ) { this.serviceContext = requireNonNull(serviceContext, "serviceContext"); this.metaStore = requireNonNull(metaStore, "metaStore"); @@ -103,6 +106,7 @@ private EngineContext( this.persistentQueries = new ConcurrentHashMap<>(); this.processingLogContext = requireNonNull(processingLogContext, "processingLogContext"); this.parser = requireNonNull(parser, "parser"); + this.cleanupService = requireNonNull(cleanupService, "cleanupService"); } EngineContext createSandbox(final ServiceContext serviceContext) { @@ -110,7 +114,8 @@ EngineContext createSandbox(final ServiceContext serviceContext) { SandboxedServiceContext.create(serviceContext), processingLogContext, metaStore.copy(), - queryIdGenerator.createSandbox() + queryIdGenerator.createSandbox(), + cleanupService ); persistentQueries.forEach((queryId, query) -> @@ -270,14 +275,19 @@ private void cleanupExternalQueryResources( ) { final String applicationId = query.getQueryApplicationId(); if (query.hasEverBeenStarted()) { - SchemaRegistryUtil.cleanupInternalTopicSchemas( - applicationId, - serviceContext.getSchemaRegistryClient(), - query instanceof TransientQueryMetadata); - - serviceContext.getTopicClient().deleteInternalTopics(applicationId); + cleanupService.addCleanupTask( + new QueryCleanupService.QueryCleanupTask( + serviceContext, + applicationId, + query instanceof TransientQueryMetadata + )); } StreamsErrorCollector.notifyApplicationClose(applicationId); } + + public void close(final boolean closeQueries) { + getAllLiveQueries().forEach(closeQueries ? QueryMetadata::close : QueryMetadata::stop); + + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index 09055520c65c..10541f4f348b 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -15,6 +15,7 @@ package io.confluent.ksql.engine; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.ServiceInfo; @@ -47,6 +48,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +61,7 @@ public class KsqlEngine implements KsqlExecutionContext, Closeable { private final ScheduledExecutorService aggregateMetricsCollector; private final String serviceId; private final EngineContext primaryContext; + private final QueryCleanupService cleanupService; public KsqlEngine( final ServiceContext serviceContext, @@ -89,11 +92,13 @@ public KsqlEngine( final Function engineMetricsFactory, final QueryIdGenerator queryIdGenerator ) { + this.cleanupService = new QueryCleanupService(); this.primaryContext = EngineContext.create( serviceContext, processingLogContext, metaStore, - queryIdGenerator + queryIdGenerator, + cleanupService ); this.serviceId = Objects.requireNonNull(serviceId, "serviceId"); this.engineMetrics = engineMetricsFactory.apply(this); @@ -110,6 +115,8 @@ public KsqlEngine( 1000, TimeUnit.MILLISECONDS ); + + cleanupService.startAsync(); } public int numberOfLiveQueries() { @@ -154,6 +161,11 @@ public String getServiceId() { return serviceId; } + @VisibleForTesting + QueryCleanupService getCleanupService() { + return cleanupService; + } + @Override public KsqlExecutionContext createSandbox(final ServiceContext serviceContext) { return new SandboxedExecutionContext(primaryContext, serviceContext); @@ -235,6 +247,15 @@ public void close(final boolean closeQueries) { primaryContext.getAllLiveQueries() .forEach(closeQueries ? QueryMetadata::close : QueryMetadata::stop); + try { + cleanupService.stopAsync().awaitTerminated(30, TimeUnit.SECONDS); + } catch (TimeoutException e) { + log.warn("Timed out while closing cleanup service. " + + "External resources for the following applications may be orphaned: {}", + cleanupService.pendingApplicationIds() + ); + } + engineMetrics.close(); aggregateMetricsCollector.shutdown(); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryCleanupService.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryCleanupService.java new file mode 100644 index 000000000000..5b0605611047 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryCleanupService.java @@ -0,0 +1,130 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.engine; + +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.AbstractExecutionThreadService; +import io.confluent.ksql.schema.registry.SchemaRegistryUtil; +import io.confluent.ksql.services.ServiceContext; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@code QueryCleanupService} helps cleanup external resources from queries + * out of the main line of query execution. This ensures that tasks that might + * take a long time don't happen on the CLI feedback path (such as cleaning up + * consumer groups). + * + *

NOTE: this cleanup service is intended to be used across threads and across + * real/sandboxed engines.

+ */ +@SuppressWarnings("UnstableApiUsage") +class QueryCleanupService extends AbstractExecutionThreadService { + + private static final Logger LOG = LoggerFactory.getLogger(QueryCleanupService.class); + private static final Runnable SHUTDOWN_SENTINEL = () -> { }; + + private final BlockingQueue cleanupTasks; + + QueryCleanupService() { + cleanupTasks = new LinkedBlockingDeque<>(); + } + + @Override + protected void run() { + try { + while (true) { + final Runnable task = cleanupTasks.take(); + if (task == SHUTDOWN_SENTINEL) { + return; + } + + task.run(); + } + } catch (final InterruptedException e) { + // gracefully exit if this method was interrupted and reset + // the interrupt flag + Thread.currentThread().interrupt(); + } + } + + @Override + protected void triggerShutdown() { + cleanupTasks.add(SHUTDOWN_SENTINEL); + } + + public Set pendingApplicationIds() { + return cleanupTasks.stream() + .filter(QueryCleanupTask.class::isInstance) + .map(QueryCleanupTask.class::cast) + .map(t -> t.appId).collect(ImmutableSet.toImmutableSet()); + } + + public boolean isEmpty() { + return cleanupTasks.isEmpty(); + } + + public void addCleanupTask(final QueryCleanupTask task) { + cleanupTasks.add(task); + } + + static class QueryCleanupTask implements Runnable { + private final String appId; + private final boolean isTransient; + private final ServiceContext serviceContext; + + QueryCleanupTask( + final ServiceContext serviceContext, + final String appId, + final boolean isTransient + ) { + this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); + this.appId = Objects.requireNonNull(appId, "appId"); + this.isTransient = isTransient; + } + + @Override + public void run() { + tryRun( + () -> SchemaRegistryUtil.cleanupInternalTopicSchemas( + appId, + serviceContext.getSchemaRegistryClient(), + isTransient), + "internal topic schemas" + ); + + tryRun(() -> serviceContext.getTopicClient().deleteInternalTopics(appId), "internal topics"); + tryRun( + () -> serviceContext + .getConsumerGroupClient() + .deleteConsumerGroups(ImmutableSet.of(appId)), + "internal consumer groups"); + } + + private void tryRun(final Runnable runnable, final String resource) { + try { + runnable.run(); + } catch (final Exception e) { + LOG.warn("Failed to cleanup {} for {}", resource, appId, e); + } + } + } + +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaConsumerGroupClientImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaConsumerGroupClientImpl.java index 76df012d3c9d..f727fa01bade 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaConsumerGroupClientImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaConsumerGroupClientImpl.java @@ -19,10 +19,13 @@ import io.confluent.ksql.exception.KsqlGroupAuthorizationException; import io.confluent.ksql.util.ExecutorUtil; import io.confluent.ksql.util.ExecutorUtil.RetryBehaviour; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.kafka.clients.admin.Admin; @@ -32,6 +35,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.RetriableException; public class KafkaConsumerGroupClientImpl implements KafkaConsumerGroupClient { @@ -100,4 +105,23 @@ public Map listConsumerGroupOffsets(final Str throw new KafkaResponseGetFailedException("Failed to list Kafka consumer groups offsets", e); } } + + @Override + public void deleteConsumerGroups(final Set groups) { + final AtomicInteger retryCount = new AtomicInteger(0); + try { + // it takes heartbeat.interval.ms after a consumer is closed for the broker + // to recognize that there are no more consumers in the consumer group - for + // that reason, we retry after 3 seconds (the default heartbeat.interval.ms) + // in the case that we get a GroupNotEmptyException + ExecutorUtil.executeWithRetries( + () -> adminClient.get().deleteConsumerGroups(groups).all().get(), + e -> (e instanceof RetriableException) + || (e instanceof GroupNotEmptyException && retryCount.getAndIncrement() < 5), + () -> Duration.of(3, ChronoUnit.SECONDS) + ); + } catch (Exception e) { + throw new KafkaResponseGetFailedException("Failed to delete consumer groups: " + groups, e); + } + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaConsumerGroupClient.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaConsumerGroupClient.java index f018353a579c..5fed358e18f8 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaConsumerGroupClient.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaConsumerGroupClient.java @@ -18,46 +18,22 @@ import static io.confluent.ksql.util.LimitedProxyBuilder.methodParams; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import io.confluent.ksql.services.KafkaConsumerGroupClient.ConsumerGroupSummary; import io.confluent.ksql.util.LimitedProxyBuilder; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; +import java.util.Set; @SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Methods invoked via reflection. @SuppressWarnings("unused") // Methods invoked via reflection. public final class SandboxedKafkaConsumerGroupClient { static KafkaConsumerGroupClient createProxy(final KafkaConsumerGroupClient delegate) { - final SandboxedKafkaConsumerGroupClient sandbox = - new SandboxedKafkaConsumerGroupClient(delegate); - return LimitedProxyBuilder.forClass(KafkaConsumerGroupClient.class) - .forward("describeConsumerGroup", methodParams(String.class), sandbox) - .forward("listGroups", methodParams(), sandbox) - .forward("listConsumerGroupOffsets", methodParams(String.class), sandbox) + .forward("describeConsumerGroup", methodParams(String.class), delegate) + .forward("listGroups", methodParams(), delegate) + .forward("listConsumerGroupOffsets", methodParams(String.class), delegate) + .swallow("deleteConsumerGroups", methodParams(Set.class)) .build(); } - private final KafkaConsumerGroupClient delegate; - - private SandboxedKafkaConsumerGroupClient(final KafkaConsumerGroupClient delegate) { - this.delegate = Objects.requireNonNull(delegate, "delegate"); - } - - - - public ConsumerGroupSummary describeConsumerGroup(final String groupId) { - return delegate.describeConsumerGroup(groupId); - } - - public List listGroups() { - return delegate.listGroups(); - } - - public Map listConsumerGroupOffsets(final String groupId) { - return delegate.listConsumerGroupOffsets(groupId); + private SandboxedKafkaConsumerGroupClient() { } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/ExecutorUtil.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/ExecutorUtil.java index ea7e90679009..535ca45cca12 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/ExecutorUtil.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/ExecutorUtil.java @@ -92,7 +92,7 @@ public static T executeWithRetries( } catch (final Exception e) { final Throwable cause = e instanceof ExecutionException ? e.getCause() : e; if (shouldRetry.test(cause)) { - log.info("Retrying request. Retry no: " + retries, e); + log.info("Retrying request. Retry no: {} Cause: '{}'", retries, e.getMessage()); lastException = e; } else if (cause instanceof Exception) { throw (Exception) cause; diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java index 3f34092103f2..74d524300dc0 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java @@ -36,13 +36,13 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import com.google.common.collect.Iterables; import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; @@ -50,6 +50,7 @@ import io.confluent.ksql.KsqlConfigTestUtil; import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.KsqlExecutionContext.ExecuteResult; +import io.confluent.ksql.engine.QueryCleanupService.QueryCleanupTask; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.metastore.MutableMetaStore; import io.confluent.ksql.name.SourceName; @@ -63,6 +64,7 @@ import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.SystemColumns; import io.confluent.ksql.schema.registry.SchemaRegistryUtil; +import io.confluent.ksql.services.FakeKafkaConsumerGroupClient; import io.confluent.ksql.services.FakeKafkaTopicClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.TestServiceContext; @@ -74,7 +76,6 @@ import io.confluent.ksql.util.MetaStoreFixture; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -82,6 +83,7 @@ import java.util.HashMap; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.avro.Schema; @@ -711,6 +713,7 @@ public void shouldCleanUpInternalTopicsOnClose() { query.close(); // Then: + awaitCleanupComplete(); verify(topicClient).deleteInternalTopics(query.getQueryApplicationId()); } @@ -756,12 +759,65 @@ public void shouldHardDeleteSchemaOnEngineCloseForTransientQueries() throws IOEx query.close(); // Then: + awaitCleanupComplete(); verify(schemaRegistryClient, times(2)).deleteSubject(any()); verify(schemaRegistryClient).deleteSubject(internalTopic1, true); verify(schemaRegistryClient).deleteSubject(internalTopic2, true); verify(schemaRegistryClient, never()).deleteSubject("subject2"); } + @Test + public void shouldCleanUpConsumerGroupsOnClose() { + // Given: + final QueryMetadata query = KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "create table bar as select * from test2;", + KSQL_CONFIG, Collections.emptyMap() + ).get(0); + + query.start(); + + // When: + query.close(); + + // Then: + awaitCleanupComplete(); + final Set deletedConsumerGroups = ( + (FakeKafkaConsumerGroupClient) serviceContext.getConsumerGroupClient() + ).getDeletedConsumerGroups(); + + assertThat( + Iterables.getOnlyElement(deletedConsumerGroups), + containsString("_confluent-ksql-default_query_CTAS_BAR_0")); + } + + @Test + public void shouldCleanUpTransientConsumerGroupsOnClose() { + // Given: + final QueryMetadata query = KsqlEngineTestUtil.executeQuery( + serviceContext, + ksqlEngine, + "select * from test1 EMIT CHANGES;", + KSQL_CONFIG, Collections.emptyMap() + ); + + query.start(); + + // When: + query.close(); + + // Then: + awaitCleanupComplete(); + final Set deletedConsumerGroups = ( + (FakeKafkaConsumerGroupClient) serviceContext.getConsumerGroupClient() + ).getDeletedConsumerGroups(); + + assertThat( + Iterables.getOnlyElement(deletedConsumerGroups), + containsString("_confluent-ksql-default_transient_")); + } + @Test public void shouldNotCleanUpInternalTopicsOnEngineCloseForPersistentQueries() { // Given: @@ -797,6 +853,7 @@ public void shouldCleanUpInternalTopicsOnQueryCloseForPersistentQueries() { query.get(0).close(); // Then (there are no transient queries, so no internal topics should be deleted): + awaitCleanupComplete(); verify(topicClient).deleteInternalTopics(query.get(0).getQueryApplicationId()); } @@ -823,6 +880,7 @@ public void shouldNotHardDeleteSubjectForPersistentQuery() throws IOException, R query.get(0).close(); // Then: + awaitCleanupComplete(); verify(schemaRegistryClient, times(2)).deleteSubject(any()); verify(schemaRegistryClient, never()).deleteSubject(internalTopic1, true); verify(schemaRegistryClient, never()).deleteSubject(internalTopic2, true); @@ -1499,6 +1557,25 @@ private PreparedStatement prepare(final ParsedStatement stmt) { return ksqlEngine.prepare(stmt); } + private void awaitCleanupComplete() { + // add a task to the end of the queue to make sure that + // we've finished processing everything up until this point + ksqlEngine.getCleanupService().addCleanupTask(new QueryCleanupTask(serviceContext, "", false) { + @Override + public void run() { + // do nothing + } + }); + + // busy wait is fine here because this should only be + // used in tests - if we ever have the need to make this + // production ready, then we should properly implement this + // with a condition variable wait/notify pattern + while (!ksqlEngine.getCleanupService().isEmpty()) { + Thread.yield(); + } + } + private void givenTopicWithSchema(final String topicName, final Schema schema) { try { givenTopicsExist(1, topicName); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/FakeKafkaConsumerGroupClient.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/FakeKafkaConsumerGroupClient.java index ef8693f12c3f..6df8b68c5334 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/FakeKafkaConsumerGroupClient.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/FakeKafkaConsumerGroupClient.java @@ -3,6 +3,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.exception.KafkaResponseGetFailedException; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -14,6 +15,8 @@ public class FakeKafkaConsumerGroupClient implements KafkaConsumerGroupClient { private static final List groups = ImmutableList.of("cg1", "cg2"); + private Set deletedConsumerGroups = new HashSet<>(); + @Override public List listGroups() { return groups; @@ -49,4 +52,13 @@ public Map listConsumerGroupOffsets(String gr ); } } + + @Override + public void deleteConsumerGroups(final Set groups) { + deletedConsumerGroups.addAll(groups); + } + + public Set getDeletedConsumerGroups() { + return ImmutableSet.copyOf(deletedConsumerGroups); + } } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/services/KafkaConsumerGroupClient.java b/ksqldb-execution/src/main/java/io/confluent/ksql/services/KafkaConsumerGroupClient.java index 201dcaf9b6c6..7b041fbd1ba1 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/services/KafkaConsumerGroupClient.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/services/KafkaConsumerGroupClient.java @@ -33,6 +33,8 @@ public interface KafkaConsumerGroupClient { Map listConsumerGroupOffsets(String group); + void deleteConsumerGroups(Set groups); + /** * API POJOs */ diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/stubs/StubKafkaConsumerGroupClient.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/stubs/StubKafkaConsumerGroupClient.java index 31598c60758c..7d13d279571e 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/stubs/StubKafkaConsumerGroupClient.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/stubs/StubKafkaConsumerGroupClient.java @@ -65,4 +65,9 @@ public Map listConsumerGroupOffsets(final Str ); } } + + @Override + public void deleteConsumerGroups(final Set groups) { + // do nothing + } }