diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java b/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java index 17fae2806da3..6e020ad39d5e 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java @@ -29,14 +29,18 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.function.Function; +import javax.ws.rs.core.HttpHeaders; +import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.client.ResponseHandler; import org.apache.http.client.fluent.Request; import org.apache.http.entity.ContentType; +import org.apache.http.message.BasicHeader; import org.apache.http.util.EntityUtils; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; @@ -59,9 +63,14 @@ public class DefaultConnectClient implements ConnectClient { private static final int MAX_ATTEMPTS = 3; private final URI connectUri; + private final Optional authHeader; - public DefaultConnectClient(final String connectUri) { + public DefaultConnectClient( + final String connectUri, + final Optional authHeader + ) { Objects.requireNonNull(connectUri, "connectUri"); + this.authHeader = Objects.requireNonNull(authHeader, "authHeader"); try { this.connectUri = new URI(connectUri); @@ -84,6 +93,7 @@ public ConnectResponse create( final ConnectResponse connectResponse = withRetries(() -> Request .Post(connectUri.resolve(CONNECTORS)) + .setHeaders(headers()) .socketTimeout(DEFAULT_TIMEOUT_MS) .connectTimeout(DEFAULT_TIMEOUT_MS) .bodyString( @@ -114,6 +124,7 @@ public ConnectResponse> connectors() { final ConnectResponse> connectResponse = withRetries(() -> Request .Get(connectUri.resolve(CONNECTORS)) + .setHeaders(headers()) .socketTimeout(DEFAULT_TIMEOUT_MS) .connectTimeout(DEFAULT_TIMEOUT_MS) .execute() @@ -138,6 +149,7 @@ public ConnectResponse status(final String connector) { final ConnectResponse connectResponse = withRetries(() -> Request .Get(connectUri.resolve(CONNECTORS + "/" + connector + STATUS)) + .setHeaders(headers()) .socketTimeout(DEFAULT_TIMEOUT_MS) .connectTimeout(DEFAULT_TIMEOUT_MS) .execute() @@ -162,6 +174,7 @@ public ConnectResponse describe(final String connector) { final ConnectResponse connectResponse = withRetries(() -> Request .Get(connectUri.resolve(String.format("%s/%s", CONNECTORS, connector))) + .setHeaders(headers()) .socketTimeout(DEFAULT_TIMEOUT_MS) .connectTimeout(DEFAULT_TIMEOUT_MS) .execute() @@ -185,6 +198,7 @@ public ConnectResponse delete(final String connector) { final ConnectResponse connectResponse = withRetries(() -> Request .Delete(connectUri.resolve(String.format("%s/%s", CONNECTORS, connector))) + .setHeaders(headers()) .socketTimeout(DEFAULT_TIMEOUT_MS) .connectTimeout(DEFAULT_TIMEOUT_MS) .execute() @@ -200,6 +214,12 @@ public ConnectResponse delete(final String connector) { } } + private Header[] headers() { + return authHeader.isPresent() + ? new Header[]{new BasicHeader(HttpHeaders.AUTHORIZATION, authHeader.get())} + : new Header[]{}; + } + @SuppressWarnings("unchecked") private static ConnectResponse withRetries(final Callable> action) { try { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/ServiceContextFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/services/ServiceContextFactory.java index bd1c867109be..338c6ea7b1fe 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/services/ServiceContextFactory.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/services/ServiceContextFactory.java @@ -19,6 +19,7 @@ import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory; import io.confluent.ksql.util.KsqlConfig; import java.util.Collections; +import java.util.Optional; import java.util.function.Supplier; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.streams.KafkaClientSupplier; @@ -35,6 +36,9 @@ public static ServiceContext create( ksqlConfig, new DefaultKafkaClientSupplier(), new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap())::get, + new DefaultConnectClient( + ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY), + Optional.empty()), ksqlClient ); } @@ -43,6 +47,7 @@ public static ServiceContext create( final KsqlConfig ksqlConfig, final KafkaClientSupplier kafkaClientSupplier, final Supplier srClientFactory, + final ConnectClient connectClient, final SimpleKsqlClient ksqlClient ) { final Admin adminClient = kafkaClientSupplier.getAdmin( @@ -54,7 +59,7 @@ public static ServiceContext create( adminClient, new KafkaTopicClientImpl(adminClient), srClientFactory, - new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY)), + connectClient, ksqlClient ); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTestUtil.java index 1df0ffeb6557..0a56b6293048 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTestUtil.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTestUtil.java @@ -28,6 +28,7 @@ import io.confluent.ksql.services.TestServiceContext; import io.confluent.ksql.statement.Injectors; import io.confluent.ksql.util.KsqlConfig; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.streams.KafkaClientSupplier; @@ -57,7 +58,9 @@ public static KsqlContext create( adminClient, kafkaTopicClient, () -> schemaRegistryClient, - new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY)) + new DefaultConnectClient( + ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY), + Optional.empty()) ); final String metricsPrefix = "instance-" + COUNTER.getAndIncrement() + "-"; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java index 1dd53ee884fc..765621a6dc8e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java @@ -23,12 +23,15 @@ import com.github.tomakehurst.wiremock.client.WireMock; import com.github.tomakehurst.wiremock.core.WireMockConfiguration; import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.github.tomakehurst.wiremock.matching.EqualToPattern; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.json.JsonMapper; import io.confluent.ksql.metastore.model.MetaStoreMatchers.OptionalMatchers; import io.confluent.ksql.services.ConnectClient.ConnectResponse; import java.util.List; +import java.util.Optional; +import javax.ws.rs.core.HttpHeaders; import org.apache.http.HttpStatus; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; @@ -57,16 +60,19 @@ public class DefaultConnectClientTest { ), ConnectorType.SOURCE ); + private static final String AUTH_HEADER = "Basic FOOBAR"; @Rule public WireMockRule wireMockRule = new WireMockRule( - WireMockConfiguration.wireMockConfig().dynamicPort()); + WireMockConfiguration.wireMockConfig() + .dynamicPort() + ); private ConnectClient client; @Before public void setup() { - client = new DefaultConnectClient("http://localhost:" + wireMockRule.port()); + client = new DefaultConnectClient("http://localhost:" + wireMockRule.port(), Optional.of(AUTH_HEADER)); } @Test @@ -74,6 +80,7 @@ public void testCreate() throws JsonProcessingException { // Given: WireMock.stubFor( WireMock.post(WireMock.urlEqualTo("/connectors")) + .withHeader(HttpHeaders.AUTHORIZATION, new EqualToPattern(AUTH_HEADER)) .willReturn(WireMock.aResponse() .withStatus(HttpStatus.SC_CREATED) .withBody(MAPPER.writeValueAsString(SAMPLE_INFO))) @@ -93,6 +100,7 @@ public void testCreateWithError() throws JsonProcessingException { // Given: WireMock.stubFor( WireMock.post(WireMock.urlEqualTo("/connectors")) + .withHeader(HttpHeaders.AUTHORIZATION, new EqualToPattern(AUTH_HEADER)) .willReturn(WireMock.aResponse() .withStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR) .withBody("Oh no!")) @@ -112,6 +120,7 @@ public void testList() throws JsonProcessingException { // Given: WireMock.stubFor( WireMock.get(WireMock.urlEqualTo("/connectors")) + .withHeader(HttpHeaders.AUTHORIZATION, new EqualToPattern(AUTH_HEADER)) .willReturn(WireMock.aResponse() .withStatus(HttpStatus.SC_OK) .withBody(MAPPER.writeValueAsString(ImmutableList.of("one", "two")))) @@ -130,6 +139,7 @@ public void testDescribe() throws JsonProcessingException { // Given: WireMock.stubFor( WireMock.get(WireMock.urlEqualTo("/connectors/foo")) + .withHeader(HttpHeaders.AUTHORIZATION, new EqualToPattern(AUTH_HEADER)) .willReturn(WireMock.aResponse() .withStatus(HttpStatus.SC_OK) .withBody(MAPPER.writeValueAsString(SAMPLE_INFO))) @@ -148,6 +158,7 @@ public void testStatus() throws JsonProcessingException { // Given: WireMock.stubFor( WireMock.get(WireMock.urlEqualTo("/connectors/foo/status")) + .withHeader(HttpHeaders.AUTHORIZATION, new EqualToPattern(AUTH_HEADER)) .willReturn(WireMock.aResponse() .withStatus(HttpStatus.SC_OK) .withBody(MAPPER.writeValueAsString(SAMPLE_STATUS))) @@ -174,6 +185,7 @@ public void testDelete() throws JsonProcessingException { // Given: WireMock.stubFor( WireMock.delete(WireMock.urlEqualTo("/connectors/foo")) + .withHeader(HttpHeaders.AUTHORIZATION, new EqualToPattern(AUTH_HEADER)) .willReturn(WireMock.aResponse() .withStatus(HttpStatus.SC_NO_CONTENT)) ); @@ -191,6 +203,7 @@ public void testListShouldRetryOnFailure() throws JsonProcessingException { // Given: WireMock.stubFor( WireMock.get(WireMock.urlEqualTo("/connectors")) + .withHeader(HttpHeaders.AUTHORIZATION, new EqualToPattern(AUTH_HEADER)) .willReturn(WireMock.aResponse() .withStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR) .withBody("Encountered an error!")) diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java b/ksql-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java index 3962d89c2748..e140f354fb9a 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java @@ -20,6 +20,7 @@ import io.confluent.ksql.util.FakeKafkaClientSupplier; import io.confluent.ksql.util.KsqlConfig; import java.util.Collections; +import java.util.Optional; import java.util.function.Supplier; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.streams.KafkaClientSupplier; @@ -63,7 +64,7 @@ public static ServiceContext create( new FakeKafkaClientSupplier().getAdmin(Collections.emptyMap()), topicClient, srClientFactory, - new DefaultConnectClient("http://localhost:8083") + new DefaultConnectClient("http://localhost:8083", Optional.empty()) ); } @@ -80,7 +81,9 @@ public static ServiceContext create( adminClient, new KafkaTopicClientImpl(adminClient), srClientFactory, - new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY)) + new DefaultConnectClient( + ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY), + Optional.empty()) ); } diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java index 83a772924076..5174ecfe09d0 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java @@ -417,7 +417,7 @@ static ServiceContext getServiceContext() { new StubKafkaClientSupplier().getAdmin(Collections.emptyMap()), new StubKafkaTopicClient(), () -> schemaRegistryClient, - new DefaultConnectClient("http://localhost:8083"), + new DefaultConnectClient("http://localhost:8083", Optional.empty()), DisabledKsqlClient.instance() ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/RestServiceContextFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/RestServiceContextFactory.java index 2971261e4270..87d1d2dc84e4 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/RestServiceContextFactory.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/RestServiceContextFactory.java @@ -17,6 +17,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory; +import io.confluent.ksql.services.DefaultConnectClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.ServiceContextFactory; import io.confluent.ksql.util.KsqlConfig; @@ -71,6 +72,7 @@ public static ServiceContext create( ksqlConfig, kafkaClientSupplier, srClientFactory, + new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY), authHeader), new DefaultKsqlClient(authHeader) ); }