Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ksql): add ifExists/ifNotExist parameters to java client connector functions #8851

Merged
merged 3 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/developer-guide/ksqldb-clients/java-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -731,12 +731,12 @@ Map<String, String> connectorProperties = ImmutableMap.of(
"table.whitelist", "users",
"key", "username"
);
client.createConnector("jdbc-connector", true, connectorProperties).get();
client.createConnector("jdbc-connector", true, connectorProperties, false).get();
```

Drop a connector:
```java
client.dropConnector("jdbc-connector").get();
client.dropConnector("jdbc-connector", true).get();
```

List connectors:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,22 @@ public interface Client extends Closeable {
CompletableFuture<Void> createConnector(
String connectorName, boolean isSource, Map<String, Object> properties);

/**
* Creates a connector.
*
* <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
* failed.
*
* @param connectorName name of the connector
* @param isSource true if the connector is a source connector, false if it is a sink connector
* @param properties connector properties
* @param ifNotExists is ifNotExists is set to true, then the command won't fail if a connector
* with the same name already exists
* @return result of connector creation
*/
CompletableFuture<Void> createConnector(
String connectorName, boolean isSource, Map<String, Object> properties, boolean ifNotExists);

/**
* Drops a connector.
*
Expand All @@ -261,6 +277,19 @@ CompletableFuture<Void> createConnector(
*/
CompletableFuture<Void> dropConnector(String connectorName);

/**
* Drops a connector.
*
* <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
* failed.
*
* @param connectorName name of the connector to drop
* @param ifExists ifExists is set to true, then the statement won't fail if the connector
* does not exist
* @return a future that completes once the server response is received
*/
CompletableFuture<Void> dropConnector(String connectorName, boolean ifExists);

/**
* Returns a list of connectors.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,15 @@ static boolean isDescribeConnectorResponse(final JsonObject ksqlEntity) {
}

static boolean isCreateConnectorResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getJsonObject("info") != null;
return ksqlEntity.getJsonObject("info") != null
|| (ksqlEntity.getString("message") != null
&& ksqlEntity.getString("message").contains("already exists"));
}

static boolean isDropConnectorResponse(final JsonObject ksqlEntity) {
return ksqlEntity.getString("connectorName") != null;
return ksqlEntity.getString("connectorName") != null
|| (ksqlEntity.getString("message") != null
&& ksqlEntity.getString("message").contains("not exist"));
}

static boolean isConnectErrorResponse(final JsonObject ksqlEntity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,36 @@ public CompletableFuture<Void> createConnector(
return cf;
}

@Override
public CompletableFuture<Void> createConnector(
final String name,
final boolean isSource,
final Map<String, Object> properties,
final boolean ifNotExists
) {
final CompletableFuture<Void> cf = new CompletableFuture<>();
final String connectorConfigs = properties.entrySet()
.stream()
.map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
.collect(Collectors.joining(","));
final String type = isSource ? "SOURCE" : "SINK";
final String ifNotExistsClause = ifNotExists ? "IF NOT EXISTS" : "";

makePostRequest(
KSQL_ENDPOINT,
new JsonObject()
.put("ksql",
String.format("CREATE %s CONNECTOR %s %s WITH (%s);",
type, ifNotExistsClause, name, connectorConfigs))
.put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
response, cf, ConnectorCommandResponseHandler::handleCreateConnectorResponse)
);

return cf;
}

@Override
public CompletableFuture<Void> dropConnector(final String name) {
final CompletableFuture<Void> cf = new CompletableFuture<>();
Expand All @@ -392,6 +422,24 @@ public CompletableFuture<Void> dropConnector(final String name) {
return cf;
}

@Override
public CompletableFuture<Void> dropConnector(final String name, final boolean ifExists) {
final CompletableFuture<Void> cf = new CompletableFuture<>();
final String ifExistsClause = ifExists ? "if exists " : "";

makePostRequest(
KSQL_ENDPOINT,
new JsonObject()
.put("ksql", "drop connector " + ifExistsClause + name + ";")
.put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
response, cf, ConnectorCommandResponseHandler::handleDropConnectorResponse)
);

return cf;
}

@Override
public CompletableFuture<List<ConnectorInfo>> listConnectors() {
final CompletableFuture<List<ConnectorInfo>> cf = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,20 @@ public void shouldCreateConnector() throws Exception {
assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR name WITH ();"));
}

@Test
public void shouldCreateConnectorIfNotExist() throws Exception {
// Given
final CreateConnectorEntity entity = new CreateConnectorEntity("create connector;",
new ConnectorInfo("name", Collections.emptyMap(), Collections.emptyList(), SOURCE_TYPE));
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));

// When:
javaClient.createConnector("name", true, Collections.emptyMap(), true).get();

// Then:
assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR IF NOT EXISTS name WITH ();"));
}

@Test
public void shouldDropConnector() throws Exception {
// Given
Expand All @@ -1585,6 +1599,19 @@ public void shouldDropConnector() throws Exception {
assertThat(testEndpoints.getLastSql(), is("drop connector name;"));
}

@Test
public void shouldDropConnectorIfExists() throws Exception {
// Given
final DropConnectorEntity entity = new DropConnectorEntity("drop connector;", "name");
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));

// When:
javaClient.dropConnector("name", true).get();

// Then:
assertThat(testEndpoints.getLastSql(), is("drop connector if exists name;"));
}

@Test
public void shouldStoreVariables() {
// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,13 @@ public void shouldDropConnector() throws Exception {
}, is(0));
}

@Test
public void shouldNotFailToDropNonExistantConnector() throws Exception {
// When/Then:
client.dropConnector("nonExistentConnector", true).get();
}


@Test
public void shouldCreateConnector() throws Exception {
// When:
Expand All @@ -1091,6 +1098,15 @@ public void shouldCreateConnector() throws Exception {
);
}

@Test
public void shouldNotFailToCreateConnectorThatExists() throws Exception {
// Given:
givenConnectorExists();

// When/Then:
client.createConnector(TEST_CONNECTOR, true, ImmutableMap.of("connector.class", MOCK_SOURCE_CLASS), true).get();
}

@Test
public void shouldCreateConnectorWithVariables() throws Exception {
// When:
Expand Down