Skip to content

Commit

Permalink
fix(ksql): allow migrations tool to run connector commands with IF [N…
Browse files Browse the repository at this point in the history
…OT] EXISTS clauses (#8855)

* fix(ksql): allow migrations tool to run connector commands with IF [NOT] EXISTS clauses

* fix unit tests
  • Loading branch information
Zara Lim authored Mar 8, 2022
1 parent eaf2b1f commit a7c8689
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,14 @@ private void executeNonVariableCommands(
ksqlClient.createConnector(
((SqlCreateConnectorStatement) command).getName(),
((SqlCreateConnectorStatement) command).isSource(),
((SqlCreateConnectorStatement) command).getProperties()
((SqlCreateConnectorStatement) command).getProperties(),
((SqlCreateConnectorStatement) command).getIfNotExists()
).get();
} else if (command instanceof SqlDropConnectorStatement) {
ksqlClient.dropConnector(((SqlDropConnectorStatement) command).getName()).get();
ksqlClient.dropConnector(
((SqlDropConnectorStatement) command).getName(),
((SqlDropConnectorStatement) command).getIfExists()
).get();
} else if (command instanceof SqlPropertyCommand) {
if (((SqlPropertyCommand) command).isSetCommand()
&& ((SqlPropertyCommand) command).getValue().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public final class CommandParser {
Pattern.CASE_INSENSITIVE);
private static final Pattern UNSET_PROPERTY = Pattern.compile(
"\\s*UNSET\\s+'((?:[^']*|(?:''))*)'\\s*;\\s*", Pattern.CASE_INSENSITIVE);
private static final Pattern DROP_CONNECTOR =
Pattern.compile("\\s*DROP\\s+CONNECTOR\\s+(.*?)\\s*;\\s*", Pattern.CASE_INSENSITIVE);
private static final Pattern DROP_CONNECTOR = Pattern.compile(
"\\s*DROP\\s+CONNECTOR\\s+(IF\\s+EXISTS\\s+)?(.*?)\\s*;\\s*", Pattern.CASE_INSENSITIVE);
private static final Pattern DEFINE_VARIABLE = Pattern.compile(
"\\s*DEFINE\\s+([a-zA-Z_][a-zA-Z0-9_@]*)\\s*=\\s*'((?:[^']*|(?:''))*)'\\s*;\\s*",
Pattern.CASE_INSENSITIVE);
Expand Down Expand Up @@ -284,7 +284,8 @@ private static SqlCreateConnectorStatement getCreateConnectorStatement(
preserveCase(createConnector.getName()),
createConnector.getType() == Type.SOURCE,
createConnector.getConfig().entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, e -> toFieldType(e.getValue())))
.collect(Collectors.toMap(Entry::getKey, e -> toFieldType(e.getValue()))),
createConnector.ifNotExists()
);
}

Expand All @@ -293,7 +294,11 @@ private static SqlDropConnectorStatement getDropConnectorStatement(final String
if (!dropConnectorMatcher.matches()) {
throw new MigrationException("Invalid DROP CONNECTOR command: " + sql);
}
return new SqlDropConnectorStatement(sql, dropConnectorMatcher.group(1));
return new SqlDropConnectorStatement(
sql,
dropConnectorMatcher.group(2),
dropConnectorMatcher.group(1) != null
);
}

private static SqlPropertyCommand getSetPropertyCommand(
Expand Down Expand Up @@ -494,17 +499,20 @@ public static class SqlCreateConnectorStatement extends SqlCommand {
final String name;
final boolean isSource;
final ImmutableMap<String, Object> properties;
final boolean ifNotExist;

SqlCreateConnectorStatement(
final String command,
final String name,
final boolean isSource,
final Map<String, Object> properties
final Map<String, Object> properties,
final boolean ifNotExist
) {
super(command);
this.name = Objects.requireNonNull(name);
this.isSource = isSource;
this.properties = ImmutableMap.copyOf(Objects.requireNonNull(properties));
this.ifNotExist = ifNotExist;
}

public String getName() {
Expand All @@ -515,6 +523,10 @@ public boolean isSource() {
return isSource;
}

public boolean getIfNotExists() {
return ifNotExist;
}

@SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "properties is ImmutableMap")
public Map<String, Object> getProperties() {
return properties;
Expand All @@ -526,18 +538,25 @@ public Map<String, Object> getProperties() {
*/
public static class SqlDropConnectorStatement extends SqlCommand {
final String name;
final boolean ifExists;

SqlDropConnectorStatement(
final String command,
final String name
final String name,
final boolean ifExists
) {
super(command);
this.name = Objects.requireNonNull(name);
this.ifExists = ifExists;
}

public String getName() {
return name;
}

public boolean getIfExists() {
return ifExists;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ private void shouldApplyMigrations() throws Exception {
configFilePath,
migrationsDir,
"CREATE STREAM ${streamName} (A STRING) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');\n" +
"DROP CONNECTOR IF EXISTS nonExistant;\n" +
"-- let's create some connectors!!!\n" +
"CREATE SOURCE CONNECTOR C WITH ('connector.class'='org.apache.kafka.connect.tools.MockSourceConnector');\n" +
"DEFINE connectorName = 'D';" +
Expand Down Expand Up @@ -302,6 +303,7 @@ private void shouldApplyMigrations() throws Exception {
"CREATE STREAM ${variable} (ADDR ADDRESS) WITH (KAFKA_TOPIC='${variable}', PARTITIONS=1, VALUE_FORMAT='JSON');" +
"UNDEFINE variable;" +
"INSERT INTO HOMES VALUES (STRUCT(number := 123, street := 'sesame st', city := '${variable}'));" +
"CREATE SOURCE CONNECTOR IF NOT EXISTS C WITH ('connector.class'='org.apache.kafka.connect.tools.MockSourceConnector');\n" +
"DROP TYPE ADDRESS;"
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
Expand Down Expand Up @@ -83,7 +84,9 @@ public class ApplyMigrationCommandTest {
+ "insert into foo ( a ) values ( 'efgh' );"
+ "INSERT INTO `FOO` ( `A` ) values ( 'ijkl' );";
private static final String CREATE_CONNECTOR = "CREATE SINK CONNECTOR woof WITH ('meow'='woof');";
private static final String CREATE_CONNECTOR_IF_NOT_EXISTS = "CREATE SINK CONNECTOR IF NOT EXISTS woof WITH ('meow'='woof');";
private static final String DROP_CONNECTOR = "DROP CONNECTOR WOOF;";
private static final String DROP_CONNECTOR_IF_EXISTS = "DROP CONNECTOR IF EXISTS WOOF;";
private static final Map<String, Object> CONNECTOR_PROPERTIES = ImmutableMap.of("meow", "woof");
private static final String SET_COMMANDS = COMMAND
+ "SET 'auto.offset.reset' = 'earliest';"
Expand Down Expand Up @@ -150,8 +153,10 @@ public void setUp() throws ExecutionException, InterruptedException {
when(ksqlClient.describeSource(MIGRATIONS_STREAM)).thenReturn(sourceDescriptionCf);
when(ksqlClient.describeSource(MIGRATIONS_TABLE)).thenReturn(sourceDescriptionCf);
when(ksqlClient.describeSource("`FOO`")).thenReturn(fooDescriptionCf);
when(ksqlClient.createConnector("`WOOF`", false, CONNECTOR_PROPERTIES)).thenReturn(voidCf);
when(ksqlClient.dropConnector("WOOF")).thenReturn(voidCf);
when(ksqlClient.createConnector("`WOOF`", false, CONNECTOR_PROPERTIES, false)).thenReturn(voidCf);
when(ksqlClient.createConnector("`WOOF`", false, CONNECTOR_PROPERTIES, true)).thenReturn(voidCf);
when(ksqlClient.dropConnector("WOOF", false)).thenReturn(voidCf);
when(ksqlClient.dropConnector("WOOF", true)).thenReturn(voidCf);
when(sourceDescriptionCf.get()).thenReturn(sourceDescription);
when(statementResultCf.get()).thenReturn(statementResult);
when(fooDescriptionCf.get()).thenReturn(fooDescription);
Expand Down Expand Up @@ -615,7 +620,29 @@ public void shouldApplyCreateConnectorStatement() throws Exception {
assertThat(result, is(0));
final InOrder inOrder = inOrder(ksqlClient);
verifyMigratedVersion(inOrder, 3, "1", MigrationState.MIGRATED,
() -> inOrder.verify(ksqlClient).createConnector("`WOOF`", false, CONNECTOR_PROPERTIES));
() -> inOrder.verify(ksqlClient).createConnector("`WOOF`", false, CONNECTOR_PROPERTIES, false));
inOrder.verify(ksqlClient).close();
inOrder.verifyNoMoreInteractions();
}

@Test
public void shouldApplyCreateConnectorIfNotExistsStatement() throws Exception {
// Given:
command = PARSER.parse("-v", "3");
createMigrationFile(1, NAME, migrationsDir, COMMAND);
createMigrationFile(3, NAME, migrationsDir,CREATE_CONNECTOR_IF_NOT_EXISTS );
givenCurrentMigrationVersion("1");
givenAppliedMigration(1, NAME, MigrationState.MIGRATED);

// When:
final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed(
Instant.ofEpochMilli(1000), ZoneId.systemDefault()));

// Then:
assertThat(result, is(0));
final InOrder inOrder = inOrder(ksqlClient);
verifyMigratedVersion(inOrder, 3, "1", MigrationState.MIGRATED,
() -> inOrder.verify(ksqlClient).createConnector("`WOOF`", false, CONNECTOR_PROPERTIES, true));
inOrder.verify(ksqlClient).close();
inOrder.verifyNoMoreInteractions();
}
Expand All @@ -637,7 +664,29 @@ public void shouldApplyDropConnectorStatement() throws Exception {
assertThat(result, is(0));
final InOrder inOrder = inOrder(ksqlClient);
verifyMigratedVersion(inOrder, 3, "1", MigrationState.MIGRATED,
() -> inOrder.verify(ksqlClient).dropConnector("WOOF"));
() -> inOrder.verify(ksqlClient).dropConnector("WOOF", false));
inOrder.verify(ksqlClient).close();
inOrder.verifyNoMoreInteractions();
}

@Test
public void shouldApplyDropConnectorIfExistsStatement() throws Exception {
// Given:
command = PARSER.parse("-v", "3");
createMigrationFile(1, NAME, migrationsDir, COMMAND);
createMigrationFile(3, NAME, migrationsDir, DROP_CONNECTOR_IF_EXISTS);
givenCurrentMigrationVersion("1");
givenAppliedMigration(1, NAME, MigrationState.MIGRATED);

// When:
final int result = command.command(config, (cfg, headers) -> ksqlClient, migrationsDir, Clock.fixed(
Instant.ofEpochMilli(1000), ZoneId.systemDefault()));

// Then:
assertThat(result, is(0));
final InOrder inOrder = inOrder(ksqlClient);
verifyMigratedVersion(inOrder, 3, "1", MigrationState.MIGRATED,
() -> inOrder.verify(ksqlClient).dropConnector("WOOF", true));
inOrder.verify(ksqlClient).close();
inOrder.verifyNoMoreInteractions();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,37 @@ public void shouldParseCreateConnectorStatementWithVariables() {
assertThat(((SqlCreateConnectorStatement) commands.get(0)).getProperties().get("topic.prefix"), is("jdbc-"));
assertThat(((SqlCreateConnectorStatement) commands.get(0)).getProperties().get("table.whitelist"), is("users"));
assertThat(((SqlCreateConnectorStatement) commands.get(0)).getProperties().get("key"), is("username"));
assertThat(((SqlCreateConnectorStatement) commands.get(0)).getIfNotExists(), is(false));
}

@Test
public void shouldParseCreateConnectorIfNotExistsStatement() {
// Given:
final String createConnector = "CREATE SOURCE CONNECTOR IF NOT EXISTS ${name} WITH(\n"
+ " \"connector.class\"='io.confluent.connect.jdbc.JdbcSourceConnector',\n"
+ " \"connection.url\"=${url},\n"
+ " \"mode\"='bulk',\n"
+ " \"topic.prefix\"='jdbc-',\n"
+ " \"table.whitelist\"='users',\n"
+ " \"key\"='username');";

// When:
List<SqlCommand> commands = parse(createConnector, ImmutableMap.of("url", "'jdbc:postgresql://localhost:5432/my.db'", "name", "`jdbc_connector`"));

// Then:
assertThat(commands.size(), is(1));
assertThat(commands.get(0), instanceOf(SqlCreateConnectorStatement.class));
assertThat(commands.get(0).getCommand(), is(createConnector));
assertThat(((SqlCreateConnectorStatement) commands.get(0)).getName(), is("`jdbc_connector`"));
assertThat(((SqlCreateConnectorStatement) commands.get(0)).isSource(), is(true));
assertThat(((SqlCreateConnectorStatement) commands.get(0)).getProperties().size(), is(6));
assertThat(((SqlCreateConnectorStatement) commands.get(0)).getProperties().get("connector.class"), is("io.confluent.connect.jdbc.JdbcSourceConnector"));
assertThat(((SqlCreateConnectorStatement) commands.get(0)).getProperties().get("connection.url"), is("jdbc:postgresql://localhost:5432/my.db"));
assertThat(((SqlCreateConnectorStatement) commands.get(0)).getProperties().get("mode"), is("bulk"));
assertThat(((SqlCreateConnectorStatement) commands.get(0)).getProperties().get("topic.prefix"), is("jdbc-"));
assertThat(((SqlCreateConnectorStatement) commands.get(0)).getProperties().get("table.whitelist"), is("users"));
assertThat(((SqlCreateConnectorStatement) commands.get(0)).getProperties().get("key"), is("username"));
assertThat(((SqlCreateConnectorStatement) commands.get(0)).getIfNotExists(), is(true));
}

@Test
Expand All @@ -401,8 +432,24 @@ public void shouldParseDropConnectorStatement() {
assertThat(commands.size(), is(1));
assertThat(commands.get(0).getCommand(), is(dropConnector));
assertThat(commands.get(0), instanceOf(SqlDropConnectorStatement.class));
assertThat(((SqlDropConnectorStatement) commands.get(0)).getName(), is("`jdbc-connector`"));
assertThat(((SqlDropConnectorStatement) commands.get(0)).getIfExists(), is(false));
}

@Test
public void shouldParseDropConnectorIfExistsStatement() {
// Given:
final String dropConnector = "DRoP CONNEcTOR IF EXISTS `jdbc-connector` ;";

// When:
List<SqlCommand> commands = parse(dropConnector);

// Then:
assertThat(commands.size(), is(1));
assertThat(commands.get(0).getCommand(), is(dropConnector));
assertThat(commands.get(0), instanceOf(SqlDropConnectorStatement.class));
assertThat(((SqlDropConnectorStatement) commands.get(0)).getName(), is("`jdbc-connector`"));
assertThat(((SqlDropConnectorStatement) commands.get(0)).getIfExists(), is(true));
}

@Test
Expand Down

0 comments on commit a7c8689

Please sign in to comment.