Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(java client): Add connector functions to java client #7222

Merged
merged 8 commits into from
Mar 19, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.CreateConnectorEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
import io.confluent.ksql.rest.server.ConnectExecutable;
Expand Down Expand Up @@ -114,7 +115,6 @@
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.After;
import org.junit.AfterClass;
Expand Down Expand Up @@ -272,8 +272,7 @@ private static void writeConnectConfigs(final String path, final Map<String, Str

@AfterClass
public static void classTearDown() {
((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;").get(0)).getConnectors()
.forEach(c -> makeKsqlRequest("DROP CONNECTOR " + c.getName() + ";"));
cleanupConnectors();
CONNECT.shutdown();
REST_APP.getPersistentQueries().forEach(str -> makeKsqlRequest("TERMINATE " + str + ";"));
}
Expand Down Expand Up @@ -1073,8 +1072,13 @@ public void shouldDropConnector() throws Exception {
client.dropConnector(TEST_CONNECTOR).get();

// Then:
final List<ConnectorInfo> connectors = client.listConnectors().get();
assertThat(connectors.size(), is(0));
assertThatEventually(() -> {
try {
return client.listConnectors().get().size();
} catch (InterruptedException | ExecutionException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this exception expected (under normal cases)? What would cause this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not. It's just that since this is a part of a lambda, we need to do a try-catch here in order for the code to compile.

return null;
}
}, is(0));
}

@Test
Expand Down Expand Up @@ -1123,11 +1127,29 @@ private void verifyNumQueries(final int numQueries) {
}

private void givenConnectorExists() {
// Make sure we are starting from a clean slate before creating a new connector.
cleanupConnectors();

makeKsqlRequest("CREATE SOURCE CONNECTOR " + TEST_CONNECTOR + " WITH ('connector.class'='" + MOCK_SOURCE_CLASS + "');");

assertThatEventually(
() -> {
try {
return ((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;").get(0)).getConnectors().size();
} catch (AssertionError e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same questions as above. I notice there's no analogous try-catch around the SHOW CONNECTORS; request in the cleanupConnectors() helper method, curious why this situation is different.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It takes a while to provision a connector, even after CREATE CONNECTOR returns a successful response. When ksqlDB calls SHOW CONNECTORS;, it makes a call to '/info' for reach connector, so if the new connector is not done provisioning, it returns an error response which propagates.

return 0;
}},
is(1)
);
}

private static void cleanupConnectors() {
((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;").get(0)).getConnectors()
.forEach(c -> makeKsqlRequest("DROP CONNECTOR " + c.getName() + ";"));
assertThatEventually(
() -> ((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;")
.get(0)).getConnectors().size(),
is(1)
is(0)
);
}

Expand Down