Skip to content

Commit

Permalink
fix: fail validation on CREATE CONNECTOR if connector already exists (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Apr 12, 2022
1 parent 979d4a5 commit 94a74fa
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.services;

import static io.confluent.ksql.util.LimitedProxyBuilder.methodParams;
import static io.confluent.ksql.util.LimitedProxyBuilder.noParams;

import io.confluent.ksql.util.LimitedProxyBuilder;
import java.util.Map;
Expand All @@ -33,6 +34,7 @@ private SandboxConnectClient() {
public static ConnectClient createProxy(final ConnectClient delegate) {
return LimitedProxyBuilder.forClass(ConnectClient.class)
.forward("validate", methodParams(String.class, Map.class), delegate)
.forward("connectors", noParams(), delegate)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.services.ConnectClient.ConnectResponse;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.junit.Before;
Expand All @@ -36,6 +37,8 @@ public class SandboxConnectClientTest {
private ConnectClient delegate;
@Mock
private ConnectResponse<ConfigInfos> mockValidateResponse;
@Mock
private ConnectResponse<List<String>> mockListResponse;

private ConnectClient sandboxClient;

Expand All @@ -57,4 +60,16 @@ public void shouldForwardOnValidate() {
assertThat(validateResponse, is(mockValidateResponse));
}

@Test
public void shouldForwardOnList() {
// Given:
when(delegate.connectors()).thenReturn(mockListResponse);

// When:
final ConnectResponse<List<String>> listResponse = sandboxClient.connectors();

// Then:
assertThat(listResponse, is(mockListResponse));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
Expand Down Expand Up @@ -101,7 +102,17 @@ public static StatementExecutorResponse validate(
final CreateConnector createConnector = statement.getStatement();
final ConnectClient client = serviceContext.getConnectClient();

final List<String> errors = validate(createConnector, client);
if (checkForExistingConnector(statement, createConnector, client)) {
final String errorMsg = String.format(
"Connector %s already exists", createConnector.getName());
throw new KsqlRestException(EndpointResponse.create()
.status(HttpStatus.SC_CONFLICT)
.entity(new KsqlErrorMessage(Errors.toErrorCode(HttpStatus.SC_CONFLICT), errorMsg))
.build()
);
}

final List<String> errors = validateConfigs(createConnector, client);
if (!errors.isEmpty()) {
final String errorMessage = "Validation error: " + String.join("\n", errors);
throw new KsqlException(errorMessage);
Expand All @@ -113,8 +124,8 @@ public static StatementExecutorResponse validate(
)));
}

private static List<String> validate(final CreateConnector createConnector,
final ConnectClient client) {
private static List<String> validateConfigs(
final CreateConnector createConnector, final ConnectClient client) {
final Map<String, String> config = buildConnectorConfig(createConnector);

final String connectorType = config.get("connector.class");
Expand Down Expand Up @@ -156,6 +167,30 @@ private static Map<String, String> buildConnectorConfig(final CreateConnector cr
return config;
}

/**
* @return true if there already exists a connector with this name when none is expected.
* This scenario is checked for as part of validation in order to fail fast, since
* otherwise execution would fail with this same error.
*/
private static boolean checkForExistingConnector(
final ConfiguredStatement<CreateConnector> statement,
final CreateConnector createConnector,
final ConnectClient client
) {
if (createConnector.ifNotExists()) {
// nothing to check since the statement is not meant to fail even if the
// connector already exists
return false;
}

final ConnectResponse<List<String>> connectorsResponse = client.connectors();
if (connectorsResponse.error().isPresent()) {
throw new KsqlServerException("Failed to check for existing connector: "
+ connectorsResponse.error().get());
}
return connectorExists(createConnector, connectorsResponse);
}

private static Optional<KsqlEntity> handleIfNotExists(
final ConfiguredStatement<CreateConnector> statement,
final CreateConnector createConnector,
Expand All @@ -167,15 +202,15 @@ private static Optional<KsqlEntity> handleIfNotExists(
+ connectorsResponse.error().get());
}

if (checkIfConnectorExists(createConnector, connectorsResponse)) {
if (connectorExists(createConnector, connectorsResponse)) {
return Optional.of(new WarningEntity(statement.getStatementText(),
String.format("Connector %s already exists", createConnector.getName())));
}
}
return Optional.empty();
}

private static boolean checkIfConnectorExists(
private static boolean connectorExists(
final CreateConnector createConnector,
final ConnectResponse<List<String>> connectorsResponse
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public void shouldReturnError() {
assertThat("expected error response", response.isErroneous());
final KsqlErrorMessage err = response.getErrorMessage();
assertThat(err.getErrorCode(), is(Errors.toErrorCode(HttpStatus.SC_CONFLICT)));
assertThat(err.getMessage(), containsString("Failed to create connector: {\"error_code\":409,\"message\":\"Connector mock-connector already exists\"}"));
assertThat(err.getMessage(), containsString("Connector mock-connector already exists"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public class ConnectExecutorTest {
@Before
public void setUp() {
when(serviceContext.getConnectClient()).thenReturn(connectClient);

when(connectClient.connectors()).thenReturn(
ConnectResponse.success(ImmutableList.of(), HttpStatus.SC_OK));
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -176,7 +179,7 @@ public void shouldThrowOnValidationError() {
}

@Test
public void shouldReturnWarningWhenIfNotExistsSetConnectorExists() {
public void shouldReturnWarningOnExecuteWhenIfNotExistsSetConnectorExists() {
//Given:
givenConnectorExists();

Expand All @@ -190,22 +193,32 @@ public void shouldReturnWarningWhenIfNotExistsSetConnectorExists() {
}

@Test
public void shouldThrowIfConnectorExists() {
//Given:
when(connectClient.create(anyString(), anyMap()))
.thenReturn(
ConnectResponse.failure("Connector foo already exists", HttpStatus.SC_CONFLICT));
public void shouldThrowOnValidateIfConnectorExists() {
// Given:
givenConnectorExists();

// When:
final KsqlRestException e = assertThrows(
KsqlRestException.class,
() -> ConnectExecutor.execute(CREATE_CONNECTOR_CONFIGURED, mock(SessionProperties.class), null, serviceContext));
() -> ConnectExecutor.validate(CREATE_CONNECTOR_CONFIGURED, mock(SessionProperties.class), null, serviceContext));

// Then:
assertThat(e.getResponse().getStatus(), is(HttpStatus.SC_CONFLICT));
final KsqlErrorMessage err = (KsqlErrorMessage) e.getResponse().getEntity();
assertThat(err.getErrorCode(), is(Errors.toErrorCode(HttpStatus.SC_CONFLICT)));
assertThat(err.getMessage(), containsString("Failed to create connector: Connector foo already exists"));
assertThat(err.getMessage(), containsString("Connector foo already exists"));
}

@Test
public void shouldNotThrowOnValidateWhenIfNotExistsSetConnectorExists() {
// Given:
givenConnectorExists();
givenValidationSuccess();

// When:
ConnectExecutor.validate(CREATE_DUPLICATE_CONNECTOR_CONFIGURED, mock(SessionProperties.class), null, serviceContext);

// Then: did not throw
}

@Test
Expand Down

0 comments on commit 94a74fa

Please sign in to comment.