Skip to content

Commit

Permalink
fix: 404 for /topics connect endpoint logs a warn instead of showing …
Browse files Browse the repository at this point in the history
…up in CLI
  • Loading branch information
Gerrrr committed Dec 9, 2021
1 parent 061fc22 commit d6b91ae
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class DescribeConnectorExecutor {

private static final Logger LOG = LoggerFactory.getLogger(DescribeConnectorExecutor.class);

@VisibleForTesting
static final String TOPICS_KEY = "topics";

Expand Down Expand Up @@ -99,7 +104,15 @@ public StatementExecutorResponse execute(
final ConnectResponse<Map<String, Map<String, List<String>>>> topicsResponse = serviceContext
.getConnectClient()
.topics(connectorName);
if (topicsResponse.error().isPresent()) {
// topics endpoint is relatively new (KAFKA-9422), so 404 here is expected behavior for older
// Connect versions. Rather than showing a scary warning to the user, we just log it to the
// server logs.
if (topicsResponse.error().isPresent()
&& topicsResponse.httpCode() == HttpStatus.SC_NOT_FOUND) {
topics = ImmutableList.of();
warnings = ImmutableList.of();
LOG.warn("Could not list related topics due to error: " + topicsResponse.error().get());
} else if (topicsResponse.error().isPresent()) {
topics = ImmutableList.of();
warnings = ImmutableList.of(
new KsqlWarning("Could not list related topics due to error: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,31 @@ public void shouldErrorIfConnectClientFailsDescribe() {
assertThat(((ErrorEntity) entity.get()).getErrorMessage(), is("error"));
}

@Test
public void shouldNotWarnClientOnMissingTopicsEndpoint() {
// Given:
when(connectClient.topics(any())).thenReturn(ConnectResponse.failure("not found",
HttpStatus.SC_NOT_FOUND));

// When:
final Optional<KsqlEntity> entity = executor
.execute(describeStatement, mock(SessionProperties.class), engine, serviceContext)
.getEntity();

// Then:
verify(engine).getMetaStore();
verify(metaStore).getAllDataSources();
verify(connectClient).status("connector");
verify(connectClient).describe("connector");
verify(connectClient).topics("connector");
assertThat("Expected a response", entity.isPresent());
assertThat(entity.get(), instanceOf(ConnectorDescription.class));

final ConnectorDescription description = (ConnectorDescription) entity.get();
assertThat(description.getTopics(), empty());
assertThat(description.getWarnings(), empty());
}

@Test
public void shouldWorkIfUnknownConnector() {
// Given:
Expand Down

0 comments on commit d6b91ae

Please sign in to comment.