From 94a74fa5b2e337a9a7d2381797812248a1cfb643 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Tue, 12 Apr 2022 08:40:46 -0700 Subject: [PATCH] fix: fail validation on CREATE CONNECTOR if connector already exists (#9014) --- .../ksql/services/SandboxConnectClient.java | 2 + .../services/SandboxConnectClientTest.java | 15 +++++++ .../server/execution/ConnectExecutor.java | 45 ++++++++++++++++--- .../rest/server/ConnectIntegrationTest.java | 2 +- .../server/execution/ConnectExecutorTest.java | 29 ++++++++---- 5 files changed, 79 insertions(+), 14 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java index 6cf5814073b6..7aa45c50a7be 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java @@ -16,6 +16,7 @@ package io.confluent.ksql.services; import static io.confluent.ksql.util.LimitedProxyBuilder.methodParams; +import static io.confluent.ksql.util.LimitedProxyBuilder.noParams; import io.confluent.ksql.util.LimitedProxyBuilder; import java.util.Map; @@ -33,6 +34,7 @@ private SandboxConnectClient() { public static ConnectClient createProxy(final ConnectClient delegate) { return LimitedProxyBuilder.forClass(ConnectClient.class) .forward("validate", methodParams(String.class, Map.class), delegate) + .forward("connectors", noParams(), delegate) .build(); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxConnectClientTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxConnectClientTest.java index 1d48ca85bc3a..c35afca37764 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxConnectClientTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxConnectClientTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.services.ConnectClient.ConnectResponse; +import java.util.List; import java.util.Map; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.junit.Before; @@ -36,6 +37,8 @@ public class SandboxConnectClientTest { private ConnectClient delegate; @Mock private ConnectResponse mockValidateResponse; + @Mock + private ConnectResponse> mockListResponse; private ConnectClient sandboxClient; @@ -57,4 +60,16 @@ public void shouldForwardOnValidate() { assertThat(validateResponse, is(mockValidateResponse)); } + @Test + public void shouldForwardOnList() { + // Given: + when(delegate.connectors()).thenReturn(mockListResponse); + + // When: + final ConnectResponse> listResponse = sandboxClient.connectors(); + + // Then: + assertThat(listResponse, is(mockListResponse)); + } + } \ No newline at end of file diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ConnectExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ConnectExecutor.java index e1ae8c12ef1d..571b3c41e0e2 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ConnectExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ConnectExecutor.java @@ -40,6 +40,7 @@ import java.util.Optional; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import org.apache.hc.core5.http.HttpStatus; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; @@ -101,7 +102,17 @@ public static StatementExecutorResponse validate( final CreateConnector createConnector = statement.getStatement(); final ConnectClient client = serviceContext.getConnectClient(); - final List errors = validate(createConnector, client); + if (checkForExistingConnector(statement, createConnector, client)) { + final String errorMsg = String.format( + "Connector %s already exists", createConnector.getName()); + throw new KsqlRestException(EndpointResponse.create() + .status(HttpStatus.SC_CONFLICT) + .entity(new KsqlErrorMessage(Errors.toErrorCode(HttpStatus.SC_CONFLICT), errorMsg)) + .build() + ); + } + + final List errors = validateConfigs(createConnector, client); if (!errors.isEmpty()) { final String errorMessage = "Validation error: " + String.join("\n", errors); throw new KsqlException(errorMessage); @@ -113,8 +124,8 @@ public static StatementExecutorResponse validate( ))); } - private static List validate(final CreateConnector createConnector, - final ConnectClient client) { + private static List validateConfigs( + final CreateConnector createConnector, final ConnectClient client) { final Map config = buildConnectorConfig(createConnector); final String connectorType = config.get("connector.class"); @@ -156,6 +167,30 @@ private static Map buildConnectorConfig(final CreateConnector cr return config; } + /** + * @return true if there already exists a connector with this name when none is expected. + * This scenario is checked for as part of validation in order to fail fast, since + * otherwise execution would fail with this same error. + */ + private static boolean checkForExistingConnector( + final ConfiguredStatement statement, + final CreateConnector createConnector, + final ConnectClient client + ) { + if (createConnector.ifNotExists()) { + // nothing to check since the statement is not meant to fail even if the + // connector already exists + return false; + } + + final ConnectResponse> connectorsResponse = client.connectors(); + if (connectorsResponse.error().isPresent()) { + throw new KsqlServerException("Failed to check for existing connector: " + + connectorsResponse.error().get()); + } + return connectorExists(createConnector, connectorsResponse); + } + private static Optional handleIfNotExists( final ConfiguredStatement statement, final CreateConnector createConnector, @@ -167,7 +202,7 @@ private static Optional handleIfNotExists( + connectorsResponse.error().get()); } - if (checkIfConnectorExists(createConnector, connectorsResponse)) { + if (connectorExists(createConnector, connectorsResponse)) { return Optional.of(new WarningEntity(statement.getStatementText(), String.format("Connector %s already exists", createConnector.getName()))); } @@ -175,7 +210,7 @@ private static Optional handleIfNotExists( return Optional.empty(); } - private static boolean checkIfConnectorExists( + private static boolean connectorExists( final CreateConnector createConnector, final ConnectResponse> connectorsResponse ) { diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/ConnectIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/ConnectIntegrationTest.java index 1befb6814582..1669e54e7009 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/ConnectIntegrationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/ConnectIntegrationTest.java @@ -258,7 +258,7 @@ public void shouldReturnError() { assertThat("expected error response", response.isErroneous()); final KsqlErrorMessage err = response.getErrorMessage(); assertThat(err.getErrorCode(), is(Errors.toErrorCode(HttpStatus.SC_CONFLICT))); - assertThat(err.getMessage(), containsString("Failed to create connector: {\"error_code\":409,\"message\":\"Connector mock-connector already exists\"}")); + assertThat(err.getMessage(), containsString("Connector mock-connector already exists")); } @Test diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java index 53e3fee82484..ad03e8f56044 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java @@ -101,6 +101,9 @@ public class ConnectExecutorTest { @Before public void setUp() { when(serviceContext.getConnectClient()).thenReturn(connectClient); + + when(connectClient.connectors()).thenReturn( + ConnectResponse.success(ImmutableList.of(), HttpStatus.SC_OK)); } @SuppressWarnings("unchecked") @@ -176,7 +179,7 @@ public void shouldThrowOnValidationError() { } @Test - public void shouldReturnWarningWhenIfNotExistsSetConnectorExists() { + public void shouldReturnWarningOnExecuteWhenIfNotExistsSetConnectorExists() { //Given: givenConnectorExists(); @@ -190,22 +193,32 @@ public void shouldReturnWarningWhenIfNotExistsSetConnectorExists() { } @Test - public void shouldThrowIfConnectorExists() { - //Given: - when(connectClient.create(anyString(), anyMap())) - .thenReturn( - ConnectResponse.failure("Connector foo already exists", HttpStatus.SC_CONFLICT)); + public void shouldThrowOnValidateIfConnectorExists() { + // Given: + givenConnectorExists(); // When: final KsqlRestException e = assertThrows( KsqlRestException.class, - () -> ConnectExecutor.execute(CREATE_CONNECTOR_CONFIGURED, mock(SessionProperties.class), null, serviceContext)); + () -> ConnectExecutor.validate(CREATE_CONNECTOR_CONFIGURED, mock(SessionProperties.class), null, serviceContext)); // Then: assertThat(e.getResponse().getStatus(), is(HttpStatus.SC_CONFLICT)); final KsqlErrorMessage err = (KsqlErrorMessage) e.getResponse().getEntity(); assertThat(err.getErrorCode(), is(Errors.toErrorCode(HttpStatus.SC_CONFLICT))); - assertThat(err.getMessage(), containsString("Failed to create connector: Connector foo already exists")); + assertThat(err.getMessage(), containsString("Connector foo already exists")); + } + + @Test + public void shouldNotThrowOnValidateWhenIfNotExistsSetConnectorExists() { + // Given: + givenConnectorExists(); + givenValidationSuccess(); + + // When: + ConnectExecutor.validate(CREATE_DUPLICATE_CONNECTOR_CONFIGURED, mock(SessionProperties.class), null, serviceContext); + + // Then: did not throw } @Test