From 14d0797b230271f2cb4ca705163e56f487670efd Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Wed, 3 Mar 2021 11:32:56 -0800 Subject: [PATCH] feat(migrations): implement clean command --- .../tools/migrations/MigrationConfig.java | 12 +- .../commands/CleanMigrationsCommand.java | 182 ++++++++++++- .../commands/InitializeMigrationCommand.java | 38 +-- .../migrations/util/ServerVersionUtil.java | 23 ++ .../ksql/tools/migrations/MigrationsTest.java | 86 ++++++- .../commands/CleanMigrationsCommandTest.java | 240 ++++++++++++++++++ .../InitializeMigrationCommandTest.java | 2 +- 7 files changed, 537 insertions(+), 46 deletions(-) create mode 100644 ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/CleanMigrationsCommandTest.java diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/MigrationConfig.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/MigrationConfig.java index 5a442a78879f..2e96cd9307f3 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/MigrationConfig.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/MigrationConfig.java @@ -29,9 +29,9 @@ public final class MigrationConfig extends AbstractConfig { public static final String KSQL_SERVER_URL = "ksql.server.url"; public static final String KSQL_MIGRATIONS_STREAM_NAME = "ksql.migrations.stream.name"; - public static final String KSQL_MIGRATIONS_STREAM_NAME_DEFAULT = "migration_events"; + public static final String KSQL_MIGRATIONS_STREAM_NAME_DEFAULT = "MIGRATION_EVENTS"; public static final String KSQL_MIGRATIONS_TABLE_NAME = "ksql.migrations.table.name"; - public static final String KSQL_MIGRATIONS_TABLE_NAME_DEFAULT = "migration_schema_versions"; + public static final String KSQL_MIGRATIONS_TABLE_NAME_DEFAULT = "MIGRATION_SCHEMA_VERSIONS"; public static final String KSQL_MIGRATIONS_STREAM_TOPIC_NAME = "ksql.migrations.stream.topic.name"; public static final String KSQL_MIGRATIONS_TABLE_TOPIC_NAME = "ksql.migrations.table.topic.name"; @@ -73,16 +73,16 @@ private MigrationConfig(final Map configs, final String id) { id + "ksql_" + configs .getOrDefault(KSQL_MIGRATIONS_STREAM_NAME, KSQL_MIGRATIONS_STREAM_NAME_DEFAULT), Importance.MEDIUM, - "The name of the migration stream topic. It defaults to " + id + "ksql_" + configs - .getOrDefault(KSQL_MIGRATIONS_STREAM_NAME, KSQL_MIGRATIONS_STREAM_NAME_DEFAULT) + "The name of the migration stream topic. It defaults to " + + "'ksql_'" ).define( KSQL_MIGRATIONS_TABLE_TOPIC_NAME, Type.STRING, id + "ksql_" + configs .getOrDefault(KSQL_MIGRATIONS_TABLE_NAME, KSQL_MIGRATIONS_TABLE_NAME_DEFAULT), Importance.MEDIUM, - "The name of the migration table topic. It defaults to" + id + "ksql_" + configs - .getOrDefault(KSQL_MIGRATIONS_TABLE_NAME, KSQL_MIGRATIONS_TABLE_NAME_DEFAULT) + "The name of the migration table topic. It defaults to " + + "'ksql_'" ).define( KSQL_MIGRATIONS_TOPIC_REPLICAS, Type.INT, diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/CleanMigrationsCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/CleanMigrationsCommand.java index 7094f7933bff..2b24dd815cba 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/CleanMigrationsCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/CleanMigrationsCommand.java @@ -16,11 +16,26 @@ package io.confluent.ksql.tools.migrations.commands; import com.github.rvesse.airline.annotations.Command; +import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.api.client.Client; +import io.confluent.ksql.api.client.QueryInfo; +import io.confluent.ksql.api.client.SourceDescription; +import io.confluent.ksql.api.client.StreamInfo; +import io.confluent.ksql.api.client.TableInfo; +import io.confluent.ksql.tools.migrations.MigrationConfig; +import io.confluent.ksql.tools.migrations.MigrationException; +import io.confluent.ksql.tools.migrations.util.MigrationsUtil; +import io.confluent.ksql.tools.migrations.util.ServerVersionUtil; +import io.confluent.ksql.util.KsqlException; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Command( - name = "new", + name = "clean", description = "Cleans all resources related to migrations. WARNING: this is not reversible!" ) public class CleanMigrationsCommand extends BaseCommand { @@ -29,7 +44,47 @@ public class CleanMigrationsCommand extends BaseCommand { @Override protected int command() { - throw new UnsupportedOperationException(); + if (!validateConfigFilePresent()) { + return 1; + } + + final MigrationConfig config; + try { + config = MigrationConfig.load(configFile); + } catch (KsqlException | MigrationException e) { + LOGGER.error(e.getMessage()); + return 1; + } + + return command(config, MigrationsUtil::getKsqlClient); + } + + @VisibleForTesting + int command( + final MigrationConfig config, + final Function clientSupplier + ) { + final String streamName = config.getString(MigrationConfig.KSQL_MIGRATIONS_STREAM_NAME); + final String tableName = config.getString(MigrationConfig.KSQL_MIGRATIONS_TABLE_NAME); + + final Client ksqlClient; + try { + ksqlClient = clientSupplier.apply(config); + } catch (MigrationException e) { + LOGGER.error(e.getMessage()); + return 1; + } + + if (ServerVersionUtil.serverVersionCompatible(ksqlClient, config) + && deleteMigrationsTable(ksqlClient, tableName) + && deleteMigrationsStream(ksqlClient, streamName)) { + LOGGER.info("Migrations metadata cleaned successfully"); + ksqlClient.close(); + return 0; + } else { + ksqlClient.close(); + return 1; + } } @Override @@ -37,4 +92,127 @@ protected Logger getLogger() { return LOGGER; } + private boolean deleteMigrationsTable(final Client ksqlClient, final String tableName) { + try { + if (!sourceExists(ksqlClient, tableName, true)) { + // nothing to delete + return true; + } + + final SourceDescription tableInfo = getSourceInfo(ksqlClient, tableName, true); + terminateQueryForTable(ksqlClient, tableInfo); + dropSource(ksqlClient, tableName, true); + + return true; + } catch (MigrationException e) { + LOGGER.error(e.getMessage()); + return false; + } + } + + private boolean deleteMigrationsStream(final Client ksqlClient, final String streamName) { + try { + if (!sourceExists(ksqlClient, streamName, false)) { + // nothing to delete + return true; + } + + dropSource(ksqlClient, streamName, false); + return true; + } catch (MigrationException e) { + LOGGER.error(e.getMessage()); + return false; + } + } + + private static boolean sourceExists( + final Client ksqlClient, + final String sourceName, + final boolean isTable + ) { + try { + if (isTable) { + final List tables = ksqlClient.listTables().get(); + return tables.stream() + .anyMatch(tableInfo -> tableInfo.getName().equalsIgnoreCase(sourceName)); + } else { + final List streams = ksqlClient.listStreams().get(); + return streams.stream() + .anyMatch(streamInfo -> streamInfo.getName().equalsIgnoreCase(sourceName)); + } + } catch (InterruptedException | ExecutionException e) { + throw new MigrationException(String.format( + "Failed to check for presence of metadata %s '%s': %s", + isTable ? "table" : "stream", + sourceName, + e.getMessage())); + } + } + + /** + * Returns the source description, assuming the source exists. + */ + private static SourceDescription getSourceInfo( + final Client ksqlClient, + final String sourceName, + final boolean isTable + ) { + try { + return ksqlClient.describeSource(sourceName).get(); + } catch (InterruptedException | ExecutionException e) { + throw new MigrationException(String.format("Failed to describe metadata %s '%s': %s", + isTable ? "table" : "stream", + sourceName, + e.getMessage())); + } + } + + private static void terminateQueryForTable( + final Client ksqlClient, + final SourceDescription tableDesc + ) { + final List queries = tableDesc.writeQueries(); + if (queries.size() == 0) { + LOGGER.info("Found 0 queries writing to metadata table"); + return; + } + + if (queries.size() > 1) { + throw new MigrationException("Found multiple queries writing to metadata table. Query IDs: " + + queries.stream() + .map(QueryInfo::getId) + .collect(Collectors.joining("', '", "'", "'."))); + } + + final String queryId = queries.get(0).getId(); + LOGGER.info("Found 1 query writing to metadata table. Query ID: {}", queryId); + + LOGGER.info("Terminating query with ID: {}", queryId); + try { + ksqlClient.executeStatement("TERMINATE " + queryId + ";").get(); + } catch (InterruptedException | ExecutionException e) { + throw new MigrationException(String.format( + "Failed to terminate query populating metadata table. Query ID: %s. Error: %s", + queryId, e.getMessage())); + } + } + + private static void dropSource( + final Client ksqlClient, + final String sourceName, + final boolean isTable + ) { + final String sourceType = isTable ? "table" : "stream"; + LOGGER.info("Dropping migrations metadata {}: {}", sourceType, sourceName); + try { + final String sql = String.format("DROP %s %s DELETE TOPIC;", + sourceType.toUpperCase(), sourceName); + ksqlClient.executeStatement(sql).get(); + } catch (InterruptedException | ExecutionException e) { + throw new MigrationException(String.format("Failed to drop metadata %s '%s': %s", + sourceType, + sourceName, + e.getMessage())); + } + } } diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommand.java index 8ee061db1a83..37c427a5928e 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommand.java @@ -18,7 +18,6 @@ import com.github.rvesse.airline.annotations.Command; import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.api.client.Client; -import io.confluent.ksql.api.client.ServerInfo; import io.confluent.ksql.tools.migrations.MigrationConfig; import io.confluent.ksql.tools.migrations.MigrationException; import io.confluent.ksql.tools.migrations.util.MigrationsUtil; @@ -57,8 +56,12 @@ private String createEventStream(final String name, final String topic, final in + ");\n"; } - private String createVersionTable(final String name, final String topic) { - return "CREATE TABLE " + name + "\n" + private String createVersionTable( + final String tableName, + final String streamName, + final String topic + ) { + return "CREATE TABLE " + tableName + "\n" + " WITH (\n" + " KAFKA_TOPIC='" + topic + "'\n" + " )\n" @@ -71,7 +74,7 @@ private String createVersionTable(final String name, final String topic) { + " latest_by_offset(started_on) AS started_on, \n" + " latest_by_offset(completed_on) AS completed_on, \n" + " latest_by_offset(previous) AS previous\n" - + " FROM migration_events \n" + + " FROM " + streamName + " \n" + " GROUP BY version_key;\n"; } @@ -106,6 +109,7 @@ int command( ); final String versionTableCommand = createVersionTable( tableName, + streamName, config.getString(MigrationConfig.KSQL_MIGRATIONS_TABLE_TOPIC_NAME) ); @@ -117,10 +121,10 @@ int command( return 1; } - if (serverVersionCompatible(ksqlClient, config) + if (ServerVersionUtil.serverVersionCompatible(ksqlClient, config) && tryCreate(ksqlClient, eventStreamCommand, streamName, true) && tryCreate(ksqlClient, versionTableCommand, tableName, false)) { - LOGGER.info("Schema metadata initialized successfully"); + LOGGER.info("Migrations metadata initialized successfully"); ksqlClient.close(); } else { ksqlClient.close(); @@ -130,28 +134,6 @@ && tryCreate(ksqlClient, versionTableCommand, tableName, false)) { return 0; } - private static boolean serverVersionCompatible( - final Client ksqlClient, - final MigrationConfig config - ) { - final String ksqlServerUrl = config.getString(MigrationConfig.KSQL_SERVER_URL); - final ServerInfo serverInfo; - try { - serverInfo = ServerVersionUtil.getServerInfo(ksqlClient, ksqlServerUrl); - } catch (MigrationException e) { - LOGGER.error("Failed to get server info to verify version compatibility: {}", e.getMessage()); - return false; - } - - final String serverVersion = serverInfo.getServerVersion(); - try { - return ServerVersionUtil.isSupportedVersion(serverVersion); - } catch (MigrationException e) { - LOGGER.warn(e.getMessage() + ". Proceeding anyway."); - return true; - } - } - private static boolean tryCreate( final Client client, final String command, diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/ServerVersionUtil.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/ServerVersionUtil.java index 32293528001e..290a546b5556 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/ServerVersionUtil.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/ServerVersionUtil.java @@ -17,6 +17,7 @@ import io.confluent.ksql.api.client.Client; import io.confluent.ksql.api.client.ServerInfo; +import io.confluent.ksql.tools.migrations.MigrationConfig; import io.confluent.ksql.tools.migrations.MigrationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -84,6 +85,28 @@ public static boolean versionSupportsMultiKeyPullQuery(final String ksqlServerVe return version.isAtLeast(6, 1, 0, 14); } + public static boolean serverVersionCompatible( + final Client ksqlClient, + final MigrationConfig config + ) { + final String ksqlServerUrl = config.getString(MigrationConfig.KSQL_SERVER_URL); + final ServerInfo serverInfo; + try { + serverInfo = getServerInfo(ksqlClient, ksqlServerUrl); + } catch (MigrationException e) { + LOGGER.error("Failed to get server info to verify version compatibility: {}", e.getMessage()); + return false; + } + + final String serverVersion = serverInfo.getServerVersion(); + try { + return isSupportedVersion(serverVersion); + } catch (MigrationException e) { + LOGGER.warn(e.getMessage() + ". Proceeding anyway."); + return true; + } + } + private static class KsqlServerVersion { private static final Pattern VERSION_PATTERN = Pattern.compile("v?([0-9]+)\\.([0-9]+)\\..*"); diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java index cf524274afb0..666835887ee6 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.is; import com.github.rvesse.airline.Cli; +import com.google.common.collect.ImmutableMap; import io.confluent.common.utils.IntegrationTest; import io.confluent.ksql.integration.IntegrationTestHarness; import io.confluent.ksql.integration.Retry; @@ -31,27 +32,37 @@ import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.entity.SourceDescription; import io.confluent.ksql.rest.entity.SourceDescriptionEntity; +import io.confluent.ksql.rest.entity.SourceInfo; import io.confluent.ksql.rest.entity.StreamedRow; +import io.confluent.ksql.rest.entity.StreamsList; +import io.confluent.ksql.rest.entity.TablesList; import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; import io.confluent.ksql.rest.server.TestKsqlRestApp; import io.confluent.ksql.tools.migrations.commands.BaseCommand; import io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import kafka.zookeeper.ZooKeeperClientException; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.test.TestUtils; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -76,6 +87,9 @@ public class MigrationsTest { private static final Cli MIGRATIONS_CLI = new Cli<>(Migrations.class); + private static final String MIGRATIONS_STREAM = "custom_migration_stream_name"; + private static final String MIGRATIONS_TABLE = "custom_migration_table_name"; + private static String configFilePath; @BeforeClass @@ -84,7 +98,11 @@ public static void setUpClass() throws Exception { createAndVerifyDirectoryStructure(testDir); configFilePath = Paths.get(testDir, MigrationsDirectoryUtil.MIGRATIONS_CONFIG_FILE).toString(); - initializeAndVerifyMetadataStreamAndTable(configFilePath); + + writeAdditionalConfigs(ImmutableMap.of( + MigrationConfig.KSQL_MIGRATIONS_STREAM_NAME, MIGRATIONS_STREAM, + MigrationConfig.KSQL_MIGRATIONS_TABLE_NAME, MIGRATIONS_TABLE + )); } @AfterClass @@ -92,6 +110,16 @@ public static void classTearDown() { REST_APP.getPersistentQueries().forEach(str -> makeKsqlRequest("TERMINATE " + str + ";")); } + @Before + public void setUp() { + initializeAndVerifyMetadataStreamAndTable(configFilePath); + } + + @After + public void tearDown() { + cleanAndVerify(configFilePath); + } + @Test public void testApply() throws IOException { // Migration file @@ -111,7 +139,7 @@ public void testApply() throws IOException { // This is needed to make sure that the table is fully done being created. // It's a similar situation to https://github.com/confluentinc/ksql/issues/6249 assertThatEventually( - () -> makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='CURRENT';").size(), + () -> makeKsqlQuery("SELECT * FROM " + MIGRATIONS_TABLE + " WHERE VERSION_KEY='CURRENT';").size(), is(1) ); final int status = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply", "-a").run(); @@ -122,7 +150,7 @@ public void testApply() throws IOException { describeSource("BAR"); // verify current - final List current = makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='CURRENT';"); + final List current = makeKsqlQuery("SELECT * FROM " + MIGRATIONS_TABLE + " WHERE VERSION_KEY='CURRENT';"); assertThatEventually(() -> current.size(), is(2)); assertThatEventually(() -> current.get(1).getRow().get().getColumns().get(1), is("2")); assertThatEventually(() -> current.get(1).getRow().get().getColumns().get(2), is("bar bar BAR")); @@ -130,7 +158,7 @@ public void testApply() throws IOException { assertThatEventually(() -> current.get(1).getRow().get().getColumns().get(7), is("1")); // verify version 1 - final List version1 = makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='1';"); + final List version1 = makeKsqlQuery("SELECT * FROM " + MIGRATIONS_TABLE + " WHERE VERSION_KEY='1';"); assertThatEventually(() -> version1.size(), is(2)); assertThatEventually(() -> version1.get(1).getRow().get().getColumns().get(1), is("1")); assertThatEventually(() -> version1.get(1).getRow().get().getColumns().get(2), is("foo FOO fO0")); @@ -138,7 +166,7 @@ public void testApply() throws IOException { assertThatEventually(() -> version1.get(1).getRow().get().getColumns().get(7), is("")); // verify version 2 - final List version2 = makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='CURRENT';"); + final List version2 = makeKsqlQuery("SELECT * FROM " + MIGRATIONS_TABLE + " WHERE VERSION_KEY='CURRENT';"); assertThatEventually(() -> version2.size(), is(2)); assertThatEventually(() -> version2.get(1).getRow().get().getColumns().get(1), is("2")); assertThatEventually(() -> version2.get(1).getRow().get().getColumns().get(2), is("bar bar BAR")); @@ -172,15 +200,24 @@ private static void createAndVerifyDirectoryStructure(final String testDir) thro assertThat(lines.get(0), is(MigrationConfig.KSQL_SERVER_URL + "=" + REST_APP.getHttpListener().toString())); } + private static void writeAdditionalConfigs(final Map additionalConfigs) throws Exception { + try (PrintWriter out = new PrintWriter(new OutputStreamWriter( + new FileOutputStream(configFilePath, true), StandardCharsets.UTF_8))) { + for (Map.Entry entry : additionalConfigs.entrySet()) { + out.println(entry.getKey() + "=" + entry.getValue()); + } + } + } + private static void initializeAndVerifyMetadataStreamAndTable(final String configFile) { // use `initialize` to create metadata stream and table final int status = MIGRATIONS_CLI.parse("--config-file", configFile, "initialize").run(); assertThat(status, is(0)); // verify metadata stream - final SourceDescription streamDesc = describeSource("migration_events"); + final SourceDescription streamDesc = describeSource(MIGRATIONS_STREAM); assertThatEventually(() -> streamDesc.getType(), is("STREAM")); - assertThatEventually(() -> streamDesc.getTopic(), is("default_ksql_migration_events")); + assertThatEventually(() -> streamDesc.getTopic(), is("default_ksql_" + MIGRATIONS_STREAM)); assertThatEventually(() -> streamDesc.getKeyFormat(), is("KAFKA")); assertThatEventually(() -> streamDesc.getValueFormat(), is("JSON")); assertThatEventually(() -> streamDesc.getPartitions(), is(1)); @@ -197,9 +234,9 @@ private static void initializeAndVerifyMetadataStreamAndTable(final String confi )); // verify metadata table - final SourceDescription tableDesc = describeSource("migration_schema_versions"); + final SourceDescription tableDesc = describeSource(MIGRATIONS_TABLE); assertThatEventually(() -> tableDesc.getType(), is("TABLE")); - assertThatEventually(() -> tableDesc.getTopic(), is("default_ksql_migration_schema_versions")); + assertThatEventually(() -> tableDesc.getTopic(), is("default_ksql_" + MIGRATIONS_TABLE)); assertThatEventually(() -> tableDesc.getKeyFormat(), is("KAFKA")); assertThatEventually(() -> tableDesc.getValueFormat(), is("JSON")); assertThatEventually(() -> tableDesc.getPartitions(), is(1)); @@ -216,6 +253,37 @@ private static void initializeAndVerifyMetadataStreamAndTable(final String confi )); } + private static void cleanAndVerify(final String configFile) { + // Given: + assertThat(sourceExists(MIGRATIONS_STREAM, false), is(true)); + assertThat(sourceExists(MIGRATIONS_TABLE, true), is(true)); + + // When: use `clean` to clean up metadata stream and table + final int status = MIGRATIONS_CLI.parse("--config-file", configFile, "clean").run(); + assertThat(status, is(0)); + + // Then: + assertThatEventually(() -> sourceExists(MIGRATIONS_STREAM, false), is(false)); + assertThat(sourceExists(MIGRATIONS_TABLE, true), is(false)); + } + + private static boolean sourceExists(final String sourceName, final boolean isTable) { + final String sourceType = isTable ? "TABLE" : "STREAM"; + final List entities = makeKsqlRequest("LIST " + sourceType + "S;"); + assertThat(entities, hasSize(1)); + + final Stream names; + if (isTable) { + assertThat(entities.get(0), instanceOf(TablesList.class)); + names = ((TablesList) entities.get(0)).getTables().stream().map(SourceInfo.Table::getName); + } else { + assertThat(entities.get(0), instanceOf(StreamsList.class)); + names = ((StreamsList) entities.get(0)).getStreams().stream().map(SourceInfo.Stream::getName); + } + + return names.anyMatch(n -> n.equalsIgnoreCase(sourceName)); + } + private static SourceDescription describeSource(final String name) { final List entities = makeKsqlRequest("DESCRIBE " + name + ";"); diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/CleanMigrationsCommandTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/CleanMigrationsCommandTest.java new file mode 100644 index 000000000000..f6dfd4357aef --- /dev/null +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/CleanMigrationsCommandTest.java @@ -0,0 +1,240 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.tools.migrations.commands; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.github.rvesse.airline.SingleCommand; +import com.google.common.collect.ImmutableList; +import io.confluent.ksql.api.client.Client; +import io.confluent.ksql.api.client.ExecuteStatementResult; +import io.confluent.ksql.api.client.QueryInfo; +import io.confluent.ksql.api.client.ServerInfo; +import io.confluent.ksql.api.client.SourceDescription; +import io.confluent.ksql.api.client.StreamInfo; +import io.confluent.ksql.api.client.TableInfo; +import io.confluent.ksql.tools.migrations.MigrationConfig; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class CleanMigrationsCommandTest { + + private static final SingleCommand PARSER = + SingleCommand.singleCommand(CleanMigrationsCommand.class); + + private static final String MIGRATIONS_STREAM = "migrations_stream"; + private static final String MIGRATIONS_TABLE = "migrations_table"; + private static final String CTAS_QUERY_ID = "ctas_migration_table_0"; + + @Mock + private MigrationConfig config; + @Mock + private Client client; + @Mock + private CompletableFuture serverInfoCf; + @Mock + private CompletableFuture executeStatementCf; + @Mock + private CompletableFuture> listStreamsCf; + @Mock + private CompletableFuture> listTablesCf; + @Mock + private CompletableFuture describeSourceCf; + @Mock + private ServerInfo serverInfo; + @Mock + private ExecuteStatementResult executeStatementResult; + @Mock + private StreamInfo streamInfo; + @Mock + private TableInfo tableInfo; + @Mock + private SourceDescription sourceDescription; + @Mock + private QueryInfo ctasQueryInfo; + @Mock + private QueryInfo otherQueryInfo; + + private CleanMigrationsCommand command; + + @Before + public void setUp() throws Exception { + when(config.getString(MigrationConfig.KSQL_MIGRATIONS_STREAM_NAME)).thenReturn(MIGRATIONS_STREAM); + when(config.getString(MigrationConfig.KSQL_MIGRATIONS_TABLE_NAME)).thenReturn(MIGRATIONS_TABLE); + when(client.serverInfo()).thenReturn(serverInfoCf); + when(client.executeStatement(anyString())).thenReturn(executeStatementCf); + when(client.listStreams()).thenReturn(listStreamsCf); + when(client.listTables()).thenReturn(listTablesCf); + when(client.describeSource(MIGRATIONS_TABLE)).thenReturn(describeSourceCf); + + when(serverInfoCf.get()).thenReturn(serverInfo); + when(serverInfo.getServerVersion()).thenReturn("v0.14.0"); + when(executeStatementCf.get()).thenReturn(executeStatementResult); + when(listStreamsCf.get()).thenReturn(ImmutableList.of(streamInfo)); + when(listTablesCf.get()).thenReturn(ImmutableList.of(tableInfo)); + when(describeSourceCf.get()).thenReturn(sourceDescription); + + when(streamInfo.getName()).thenReturn(MIGRATIONS_STREAM); + when(tableInfo.getName()).thenReturn(MIGRATIONS_TABLE); + when(sourceDescription.writeQueries()).thenReturn(ImmutableList.of(ctasQueryInfo)); + when(ctasQueryInfo.getId()).thenReturn(CTAS_QUERY_ID); + when(otherQueryInfo.getId()).thenReturn("other_query_id"); + + command = PARSER.parse(); + } + + @Test + public void shouldCleanMigrationsStreamAndTable() { + // When: + final int status = command.command(config, cfg -> client); + + // Then: + assertThat(status, is(0)); + + verify(client).executeStatement("TERMINATE " + CTAS_QUERY_ID + ";"); + verify(client).executeStatement("DROP TABLE " + MIGRATIONS_TABLE + " DELETE TOPIC;"); + verify(client).executeStatement("DROP STREAM " + MIGRATIONS_STREAM + " DELETE TOPIC;"); + } + + @Test + public void shouldCleanMigrationsStreamEvenIfTableDoesntExist() throws Exception { + // Given: + givenMigrationsTableDoesNotExist(); + + // When: + final int status = command.command(config, cfg -> client); + + // Then: + assertThat(status, is(0)); + + verify(client).executeStatement("DROP STREAM " + MIGRATIONS_STREAM + " DELETE TOPIC;"); + verify(client, never()).executeStatement("DROP TABLE " + MIGRATIONS_TABLE + " DELETE TOPIC;"); + verify(client, never()).executeStatement("TERMINATE " + CTAS_QUERY_ID + ";"); + } + + @Test + public void shouldCleanMigrationsTableEvenIfStreamDoesntExist() throws Exception { + // Given: + givenMigrationsStreamDoesNotExist(); + + // When: + final int status = command.command(config, cfg -> client); + + // Then: + assertThat(status, is(0)); + + verify(client).executeStatement("DROP TABLE " + MIGRATIONS_TABLE + " DELETE TOPIC;"); + verify(client, never()).executeStatement("DROP STREAM " + MIGRATIONS_STREAM + " DELETE TOPIC;"); + + // a single query writing to the table will still be dropped, even if the stream + // doesn't exist. we could change this in the future but it's an unlikely (and + // unexpected) edge case that doesn't seem too important. + verify(client).executeStatement("TERMINATE " + CTAS_QUERY_ID + ";"); + } + + @Test + public void shouldCleanMigrationsTableEvenIfQueryDoesntExist() { + // Given: + when(sourceDescription.writeQueries()).thenReturn(ImmutableList.of()); + + // When: + final int status = command.command(config, cfg -> client); + + // Then: + assertThat(status, is(0)); + + verify(client).executeStatement("DROP TABLE " + MIGRATIONS_TABLE + " DELETE TOPIC;"); + verify(client).executeStatement("DROP STREAM " + MIGRATIONS_STREAM + " DELETE TOPIC;"); + } + + @Test + public void shouldFailIfMultipleQueriesWritingToTable() { + // Given: + when(sourceDescription.writeQueries()).thenReturn(ImmutableList.of(ctasQueryInfo, otherQueryInfo)); + + // When: + final int status = command.command(config, cfg -> client); + + // Then: + assertThat(status, is(1)); + + verify(client, never()).executeStatement("TERMINATE " + CTAS_QUERY_ID + ";"); + verify(client, never()).executeStatement("DROP TABLE " + MIGRATIONS_TABLE + " DELETE TOPIC;"); + verify(client, never()).executeStatement("DROP STREAM " + MIGRATIONS_STREAM + " DELETE TOPIC;"); + } + + @Test + public void shouldSucceedIfNeverInitialized() throws Exception { + // Given: + givenMigrationsStreamDoesNotExist(); + givenMigrationsTableDoesNotExist(); + + // When: + final int status = command.command(config, cfg -> client); + + // Then: + assertThat(status, is(0)); + } + + @Test + public void shouldNotCleanIfServerVersionIncompatible() { + // Given: + when(serverInfo.getServerVersion()).thenReturn("v0.9.0"); + + // When: + final int status = command.command(config, cfg -> client); + + // Then: + assertThat(status, is(1)); + + verify(client, never()).executeStatement(anyString()); + } + + @Test + public void shouldCleanEvenIfCantParseServerVersion() { + // Given: + when(serverInfo.getServerVersion()).thenReturn("not_a_valid_version"); + + // When: + final int status = command.command(config, cfg -> client); + + // Then: + assertThat(status, is(0)); + + verify(client).executeStatement("TERMINATE " + CTAS_QUERY_ID + ";"); + verify(client).executeStatement("DROP TABLE " + MIGRATIONS_TABLE + " DELETE TOPIC;"); + verify(client).executeStatement("DROP STREAM " + MIGRATIONS_STREAM + " DELETE TOPIC;"); + } + + private void givenMigrationsStreamDoesNotExist() throws Exception { + when(listStreamsCf.get()).thenReturn(ImmutableList.of()); + } + + private void givenMigrationsTableDoesNotExist() throws Exception { + when(listTablesCf.get()).thenReturn(ImmutableList.of()); + } +} \ No newline at end of file diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommandTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommandTest.java index aaa9a8ddd82b..2dab257f3fcd 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommandTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommandTest.java @@ -77,7 +77,7 @@ public class InitializeMigrationCommandTest { + " latest_by_offset(started_on) AS started_on, \n" + " latest_by_offset(completed_on) AS completed_on, \n" + " latest_by_offset(previous) AS previous\n" - + " FROM migration_events \n" + + " FROM " + MIGRATIONS_STREAM + " \n" + " GROUP BY version_key;\n"; @Mock