Skip to content

Commit

Permalink
fix: drop succeeds even if missing topic or schema (#3131)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang authored Jul 29, 2019
1 parent b15c6d0 commit ba03d6f
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.exception;

import java.util.List;
import javafx.util.Pair;

/**
 * Thrown when one or more Kafka topics could not be deleted, carrying the
 * per-topic root causes so callers can distinguish recoverable failures
 * (e.g. a missing schema subject) from hard errors.
 */
public class KafkaDeleteTopicsException extends KafkaTopicClientException {
  // One (topic name, root cause) pair per failed deletion.
  // NOTE(review): javafx.util.Pair is not on the default classpath from
  // Java 11 onward — consider Map.Entry or a small value class if the
  // project moves past Java 8.
  private final List<Pair<String, Throwable>> exceptionList;

  /**
   * @param message summary message listing the topics that failed to delete
   * @param failList one (topic name, root cause) pair per failed topic
   */
  public KafkaDeleteTopicsException(
      final String message,
      final List<Pair<String, Throwable>> failList) {
    super(message);
    // Defensive copy, wrapped unmodifiable: neither later mutation of the
    // caller's list nor mutation through the getter can alter the recorded
    // failures after construction.
    exceptionList = Collections.unmodifiableList(new ArrayList<>(failList));
  }

  /** @return unmodifiable list of (topic name, root cause) deletion failures. */
  public final List<Pair<String, Throwable>> getExceptionList() {
    return exceptionList;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.services;

import com.google.common.collect.Lists;
import io.confluent.ksql.exception.KafkaDeleteTopicsException;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.topic.TopicProperties;
import io.confluent.ksql.util.ExecutorUtil;
Expand All @@ -31,7 +32,9 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javafx.util.Pair;
import javax.annotation.concurrent.ThreadSafe;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
Expand All @@ -46,6 +49,7 @@
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -244,7 +248,7 @@ public void deleteTopics(final Collection<String> topicsToDelete) {
final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
final List<String> failList = Lists.newArrayList();

final List<Pair<String, Throwable>> exceptionList = Lists.newArrayList();
for (final Map.Entry<String, KafkaFuture<Void>> entry : results.entrySet()) {
try {
entry.getValue().get(30, TimeUnit.SECONDS);
Expand All @@ -255,14 +259,17 @@ public void deleteTopics(final Collection<String> topicsToDelete) {
throw new TopicDeletionDisabledException("Topic deletion is disabled. "
+ "To delete the topic, you must set '" + DELETE_TOPIC_ENABLE + "' to true in "
+ "the Kafka broker configuration.");
} else if (!(rootCause instanceof UnknownTopicOrPartitionException)) {
LOG.error(String.format("Could not delete topic '%s'", entry.getKey()), e);
failList.add(entry.getKey());
exceptionList.add(new Pair<>(entry.getKey(), rootCause));
}

LOG.error(String.format("Could not delete topic '%s'", entry.getKey()), e);
failList.add(entry.getKey());
}
}

if (!failList.isEmpty()) {
throw new KsqlException("Failed to clean up topics: " + String.join(",", failList));
throw new KafkaDeleteTopicsException("Failed to clean up topics: "
+ String.join(",", failList), exceptionList);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
Expand All @@ -33,6 +35,7 @@
import io.confluent.ksql.util.ExecutorUtil;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;

import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
Expand All @@ -53,6 +56,8 @@ public class TopicDeleteInjector implements Injector {
private final KafkaTopicClient topicClient;
private final SchemaRegistryClient schemaRegistryClient;

private static final int SUBJECT_NOT_FOUND_ERROR_CODE = 40401;

public TopicDeleteInjector(
final KsqlExecutionContext executionContext,
final ServiceContext serviceContext
Expand Down Expand Up @@ -99,19 +104,18 @@ public <T extends Statement> ConfiguredStatement<T> inject(
() -> topicClient.deleteTopics(ImmutableList.of(source.getKafkaTopicName())),
ExecutorUtil.RetryBehaviour.ALWAYS);
} catch (Exception e) {
throw new KsqlException("Could not delete the corresponding kafka topic: "
+ source.getKafkaTopicName(), e);
throw new RuntimeException("Could not delete the corresponding kafka topic: "
+ sourceName, e);
}

if (source.getValueSerdeFactory().getFormat() == Format.AVRO) {
try {
try {
if (source.getValueSerdeFactory().getFormat() == Format.AVRO) {
SchemaRegistryUtil.deleteSubjectWithRetries(
schemaRegistryClient,
source.getKafkaTopicName() + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
} catch (final Exception e) {
throw new KsqlException("Could not clean up the schema registry for topic: "
+ source.getKafkaTopicName(), e);
schemaRegistryClient,
source.getKafkaTopicName() + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
}
} catch (final Exception e) {
checkSchemaError(e, source.getKafkaTopicName());
}
} else if (dropStatement.getIfExists()) {
throw new KsqlException("Could not find source to delete topic for: " + statement);
Expand All @@ -123,6 +127,14 @@ public <T extends Statement> ConfiguredStatement<T> inject(
return statement.withStatement(withoutDeleteText, withoutDelete);
}

private void checkSchemaError(final Exception error, final String sourceName) {
  // A "subject not found" (40401) response from the schema registry is
  // expected when the schema was never registered or is already gone, so it
  // is swallowed; any other failure aborts the drop.
  final boolean subjectMissing = error instanceof RestClientException
      && ((RestClientException) error).getErrorCode() == SUBJECT_NOT_FOUND_ERROR_CODE;
  if (!subjectMissing) {
    throw new RuntimeException("Could not clean up the schema registry for topic: "
        + sourceName, error);
  }
}

private void checkTopicRefs(final DataSource<?> source) {
final String topicName = source.getKafkaTopicName();
final String sourceName = source.getName();
Expand All @@ -133,7 +145,7 @@ private void checkTopicRefs(final DataSource<?> source) {
.filter(name -> !sourceName.equals(name))
.collect(Collectors.joining(", "));
if (!using.isEmpty()) {
throw new KsqlException(
throw new RuntimeException(
String.format(
"Refusing to delete topic. Found other data sources (%s) using topic %s",
using,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -30,6 +29,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.exception.KafkaDeleteTopicsException;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.exception.KafkaTopicExistsException;
import io.confluent.ksql.util.KsqlConstants;
Expand All @@ -43,6 +43,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
Expand Down Expand Up @@ -274,15 +276,46 @@ public void shouldDeleteInternalTopics() {
verify(adminClient);
}

// Verifies that a delete failure whose root cause is
// TopicDeletionDisabledException is rethrown as-is (not wrapped in
// KafkaDeleteTopicsException), so users get the broker-config hint.
// Then: the expected exception is declared on the @Test annotation.
@Test(expected = TopicDeletionDisabledException.class)
public void shouldThrowTopicDeletionDisabledException()
throws InterruptedException, ExecutionException, TimeoutException {
// Given: the admin client reports the deletion failed because topic
// deletion is disabled on the broker.
expect(adminClient.deleteTopics(anyObject())).andReturn(
deleteTopicException(new TopicDeletionDisabledException("error")));
replay(adminClient);

final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);

// When:
kafkaTopicClient.deleteTopics(Collections.singletonList(topicName1));
}

// Verifies that a generic delete failure surfaces as
// KafkaDeleteTopicsException carrying the per-topic causes.
// Then: the expected exception is declared on the @Test annotation.
@Test(expected = KafkaDeleteTopicsException.class)
public void shouldThrowKafkaDeleteTopicsException()
throws InterruptedException, ExecutionException, TimeoutException {
// Given: the admin client reports an arbitrary (non-disabled,
// non-missing-topic) failure for the deletion.
expect(adminClient.deleteTopics(anyObject())).andReturn(
(deleteTopicException(new Exception("error"))));
replay(adminClient);

final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);

// When:
kafkaTopicClient.deleteTopics(Collections.singletonList(topicName1));
}

@Test
public void shouldDeleteTopicThrowOnTopicDeletionDisabledException() {
public void shouldNotThrowKafkaDeleteTopicsExceptionWhenMissingTopic()
throws InterruptedException, ExecutionException, TimeoutException {
// Given:
expect(adminClient.deleteTopics(anyObject())).andReturn(getTopicDeletionDisableException());
expect(adminClient.deleteTopics(anyObject())).andReturn(
(deleteTopicException(new UnknownTopicOrPartitionException("error"))));
replay(adminClient);

final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);

// When:
kafkaTopicClient.deleteTopics(Collections.singletonList(topicName2));
kafkaTopicClient.deleteTopics(Collections.singletonList(topicName1));
}

@Test
Expand Down Expand Up @@ -559,20 +592,18 @@ private Collection<ConfigResource> describeBrokerRequest() {
return Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, node.idString()));
}

private DeleteTopicsResult getTopicDeletionDisableException() {
private static DeleteTopicsResult deleteTopicException(Exception e)
throws InterruptedException, ExecutionException, TimeoutException {
final DeleteTopicsResult deleteTopicsResult = mock(DeleteTopicsResult.class);
final KafkaFuture<Void> kafkaFuture = mock(KafkaFuture.class);

try {
expect(kafkaFuture.get()).andThrow(
new TopicDeletionDisabledException("Topic deletion is disabled")
);
} catch (final Exception e) {
// this should not happen in the test
}
expect(kafkaFuture.get(30, TimeUnit.SECONDS)).andThrow(
new ExecutionException(e)
);
replay(kafkaFuture);

expect(deleteTopicsResult.values())
.andReturn(Collections.singletonMap(topicName1, kafkaFuture));
.andReturn(Collections.singletonMap(topicName1, kafkaFuture));

replay(deleteTopicsResult);
return deleteTopicsResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doThrow;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.metastore.MutableMetaStore;
Expand All @@ -42,11 +44,11 @@
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -161,7 +163,7 @@ public void shouldThrowExceptionIfSourceDoesNotExist() {
"DROP SOMETHING", new DropStream(QualifiedName.of("SOMETHING_ELSE"), true, true));

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("Could not find source to delete topic for");

// When:
Expand Down Expand Up @@ -206,7 +208,7 @@ public void shouldThrowExceptionIfOtherSourcesUsingTopic() {
when(metaStore.getAllDataSources()).thenReturn(sources);

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expect(RuntimeException.class);
expectedException.expectMessage(
"Refusing to delete topic. "
+ "Found other data sources (OTHER1, OTHER2) using topic something");
Expand All @@ -215,6 +217,17 @@ public void shouldThrowExceptionIfOtherSourcesUsingTopic() {
deleteInjector.inject(dropStatement);
}

// Verifies the commit's headline behavior: DROP ... DELETE TOPIC must not
// fail when the schema registry subject is already absent.
@Test
public void shouldNotThrowIfSchemaIsMissing() throws IOException, RestClientException {
// Given: the source is AVRO, and deleting its value subject fails with the
// registry's "subject not found" error (HTTP 404, error code 40401).
when(source.getValueSerdeFactory()).thenReturn(new KsqlAvroSerdeFactory("foo"));
doThrow(new RestClientException("Subject not found.", 404, 40401))
.when(registryClient).deleteSubject("something" + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);

// When: injecting must complete without throwing.
deleteInjector.inject(DROP_WITH_DELETE_TOPIC);
}

private DataSource<?> givenSource(final String name, final String topicName) {
final DataSource source = mock(DataSource.class);
when(source.getName()).thenReturn(name);
Expand All @@ -232,5 +245,4 @@ private static <T extends Statement> ConfiguredStatement<T> givenStatement(
new KsqlConfig(ImmutableMap.of())
);
}

}

0 comments on commit ba03d6f

Please sign in to comment.