Skip to content

Commit

Permalink
feat: hard delete schemas for push queries (#6061)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang authored Aug 20, 2020
1 parent 8f00a5c commit a036b8b
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,11 @@ private void unregisterQuery(final ServiceContext serviceContext, final QueryMet
}

if (query.hasEverBeenStarted()) {
SchemaRegistryUtil
.cleanupInternalTopicSchemas(applicationId, serviceContext.getSchemaRegistryClient());
SchemaRegistryUtil.cleanupInternalTopicSchemas(
applicationId,
serviceContext.getSchemaRegistryClient(),
query instanceof TransientQueryMetadata);

serviceContext.getTopicClient().deleteInternalTopics(applicationId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public List<Integer> deleteSubject(final String s) {
return ImmutableList.of();
}

@Override
public List<Integer> deleteSubject(final String s, final boolean b) {
return ImmutableList.of();
}

@Override
public List<Integer> deleteSubject(final Map<String, String> map, final String s) {
return ImmutableList.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,26 @@ public final class SchemaRegistryUtil {

private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryUtil.class);

private static final String CHANGE_LOG_SUFFIX = KsqlConstants.STREAMS_CHANGELOG_TOPIC_SUFFIX
public static final String CHANGE_LOG_SUFFIX = KsqlConstants.STREAMS_CHANGELOG_TOPIC_SUFFIX
+ KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX;

private static final String REPARTITION_SUFFIX = KsqlConstants.STREAMS_REPARTITION_TOPIC_SUFFIX
public static final String REPARTITION_SUFFIX = KsqlConstants.STREAMS_REPARTITION_TOPIC_SUFFIX
+ KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX;

private SchemaRegistryUtil() {
}

public static void cleanupInternalTopicSchemas(
final String applicationId,
final SchemaRegistryClient schemaRegistryClient
final SchemaRegistryClient schemaRegistryClient,
final boolean isPermanent
) {
getInternalSubjectNames(applicationId, schemaRegistryClient)
.forEach(subject -> tryDeleteInternalSubject(applicationId, schemaRegistryClient, subject));
.forEach(subject -> tryDeleteInternalSubject(
applicationId,
schemaRegistryClient,
subject,
isPermanent));
}

public static Stream<String> getSubjectNames(final SchemaRegistryClient schemaRegistryClient) {
Expand All @@ -67,6 +72,13 @@ public static void deleteSubjectWithRetries(
ExecutorUtil.executeWithRetries(() -> schemaRegistryClient.deleteSubject(subject), ALWAYS);
}

private static void hardDeleteSubjectWithRetries(
final SchemaRegistryClient schemaRegistryClient,
final String subject) throws Exception {
ExecutorUtil.executeWithRetries(
() -> schemaRegistryClient.deleteSubject(subject, true), ALWAYS);
}

private static Stream<String> getInternalSubjectNames(
final String applicationId,
final SchemaRegistryClient schemaRegistryClient
Expand All @@ -83,10 +95,14 @@ private static Stream<String> getInternalSubjectNames(
private static void tryDeleteInternalSubject(
final String applicationId,
final SchemaRegistryClient schemaRegistryClient,
final String subjectName
final String subjectName,
final boolean isPermanent
) {
try {
deleteSubjectWithRetries(schemaRegistryClient, subjectName);
if (isPermanent) {
hardDeleteSubjectWithRetries(schemaRegistryClient, subjectName);
}
} catch (final Exception e) {
LOG.warn("Could not clean up the schema registry for"
+ " query: " + applicationId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,17 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.KsqlConfigTestUtil;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.KsqlExecutionContext.ExecuteResult;
Expand All @@ -58,6 +62,7 @@
import io.confluent.ksql.parser.tree.DropTable;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.SystemColumns;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.services.FakeKafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.TestServiceContext;
Expand All @@ -69,6 +74,8 @@
import io.confluent.ksql.util.MetaStoreFixture;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -726,6 +733,35 @@ public void shouldCleanUpInternalTopicsOnEngineCloseForTransientQueries() {
verify(topicClient).deleteInternalTopics(query.getQueryApplicationId());
}

@Test
public void shouldHardDeleteSchemaOnEngineCloseForTransientQueries() throws IOException, RestClientException {
// Given:
final QueryMetadata query = KsqlEngineTestUtil.executeQuery(
serviceContext,
ksqlEngine,
"select * from test1 EMIT CHANGES;",
KSQL_CONFIG, Collections.emptyMap()
);
final String internalTopic1 = query.getQueryApplicationId() + "-subject1" + SchemaRegistryUtil.CHANGE_LOG_SUFFIX;
final String internalTopic2 = query.getQueryApplicationId() + "-subject3" + SchemaRegistryUtil.REPARTITION_SUFFIX;
when(schemaRegistryClient.getAllSubjects()).thenReturn(
Arrays.asList(
internalTopic1,
"subject2",
internalTopic2));

query.start();

// When:
query.close();

// Then:
verify(schemaRegistryClient, times(2)).deleteSubject(any());
verify(schemaRegistryClient).deleteSubject(internalTopic1, true);
verify(schemaRegistryClient).deleteSubject(internalTopic2, true);
verify(schemaRegistryClient, never()).deleteSubject("subject2");
}

@Test
public void shouldNotCleanUpInternalTopicsOnEngineCloseForPersistentQueries() {
// Given:
Expand Down Expand Up @@ -764,6 +800,34 @@ public void shouldCleanUpInternalTopicsOnQueryCloseForPersistentQueries() {
verify(topicClient).deleteInternalTopics(query.get(0).getQueryApplicationId());
}

@Test
public void shouldNotHardDeleteSubjectForPersistentQuery() throws IOException, RestClientException {
// Given:
final List<QueryMetadata> query = KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create stream persistent as select * from test1 EMIT CHANGES;",
KSQL_CONFIG, Collections.emptyMap()
);
final String applicationId = query.get(0).getQueryApplicationId();
final String internalTopic1 = applicationId + "-subject1" + SchemaRegistryUtil.CHANGE_LOG_SUFFIX;
final String internalTopic2 = applicationId + "-subject3" + SchemaRegistryUtil.REPARTITION_SUFFIX;
when(schemaRegistryClient.getAllSubjects()).thenReturn(
Arrays.asList(
internalTopic1,
"subject2",
internalTopic2));
query.get(0).start();

// When:
query.get(0).close();

// Then:
verify(schemaRegistryClient, times(2)).deleteSubject(any());
verify(schemaRegistryClient, never()).deleteSubject(internalTopic1, true);
verify(schemaRegistryClient, never()).deleteSubject(internalTopic2, true);
}

@Test
public void shouldNotCleanUpInternalTopicsOnCloseIfQueryNeverStarted() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.schema.registry;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -25,6 +26,7 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

Expand All @@ -44,7 +46,7 @@ public void shouldDeleteChangeLogTopicSchema() throws Exception {
));

// When:
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient, false);

// Then not exception:
verify(schemaRegistryClient).deleteSubject(APP_ID + "SOME-changelog-value");
Expand All @@ -58,12 +60,28 @@ public void shouldDeleteRepartitionTopicSchema() throws Exception {
));

// When:
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient, false);

// Then not exception:
verify(schemaRegistryClient).deleteSubject(APP_ID + "SOME-repartition-value");
}

@Test
public void shouldHardDeleteIfFlagSet() throws Exception {
// Given:
when(schemaRegistryClient.getAllSubjects()).thenReturn(ImmutableList.of(
APP_ID + "SOME-repartition-value"
));

// When:
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient, true);

// Then not exception:
final InOrder inOrder = inOrder(schemaRegistryClient);
inOrder.verify(schemaRegistryClient).deleteSubject(APP_ID + "SOME-repartition-value");
inOrder.verify(schemaRegistryClient).deleteSubject(APP_ID + "SOME-repartition-value", true);
}

@Test
public void shouldNotDeleteOtherSchemasForThisApplicationId() throws Exception {
// Given:
Expand All @@ -72,7 +90,7 @@ public void shouldNotDeleteOtherSchemasForThisApplicationId() throws Exception {
));

// When:
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient, false);

// Then not exception:
verify(schemaRegistryClient, never()).deleteSubject(any());
Expand All @@ -86,7 +104,7 @@ public void shouldNotDeleteOtherSchemas() throws Exception {
));

// When:
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient, false);

// Then not exception:
verify(schemaRegistryClient, never()).deleteSubject(any());
Expand All @@ -98,7 +116,7 @@ public void shouldNotThrowIfAllSubjectsThrows() throws Exception {
when(schemaRegistryClient.getAllSubjects()).thenThrow(new RuntimeException("Boom!"));

// When:
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient, false);

// Then not exception:
verify(schemaRegistryClient).getAllSubjects();
Expand All @@ -114,7 +132,7 @@ public void shouldNotThrowIfDeleteSubjectThrows() throws Exception {
when(schemaRegistryClient.deleteSubject(any())).thenThrow(new RuntimeException("Boom!"));

// When:
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient, false);

// Then not exception:
verify(schemaRegistryClient, times(5)).deleteSubject(any());
Expand Down

0 comments on commit a036b8b

Please sign in to comment.