From 7798ff182ce732977bca8bf7d90984ab9d3e4b8e Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Fri, 13 Jan 2023 16:59:15 -0500 Subject: [PATCH] Introduce new OpenSearchTransport based on Apache HttpClient 5 (#281) Signed-off-by: Andriy Redko Signed-off-by: Andriy Redko --- CHANGELOG.md | 1 + USER_GUIDE.md | 37 + java-client/build.gradle.kts | 4 + .../client/transport/TransportHeaders.java | 17 + .../httpclient5/ApacheHttpClient5Options.java | 267 +++++ .../ApacheHttpClient5Transport.java | 987 ++++++++++++++++++ .../ApacheHttpClient5TransportBuilder.java | 407 ++++++++ .../transport/httpclient5/DeadHostState.java | 125 +++ .../HttpAsyncResponseConsumerFactory.java | 86 ++ .../transport/httpclient5/Response.java | 214 ++++ .../httpclient5/ResponseException.java | 96 ++ .../httpclient5/WarningFailureException.java | 76 ++ .../httpclient5/WarningsHandler.java | 80 ++ .../HeapBufferedAsyncEntityConsumer.java | 139 +++ .../HeapBufferedAsyncResponseConsumer.java | 123 +++ .../HttpEntityAsyncEntityProducer.java | 182 ++++ .../internal/HttpUriRequestProducer.java | 62 ++ .../transport/httpclient5/internal/Node.java | 289 +++++ .../httpclient5/internal/NodeSelector.java | 105 ++ .../rest_client/RestClientOptions.java | 7 +- ...ntIT.java => AbstractClusterClientIT.java} | 3 +- .../{CrudIT.java => AbstractCrudIT.java} | 2 +- ...ntIT.java => AbstractIndicesClientIT.java} | 3 +- .../{NodesIT.java => AbstractNodesIT.java} | 4 +- ...InfoIT.java => AbstractPingAndInfoIT.java} | 3 +- ...equestTest.java => AbstractRequestIT.java} | 5 +- .../OpenSearchJavaClientTestCase.java | 14 +- .../integTest/OpenSearchTransportSupport.java | 31 + .../httpclient5/ClusterClientIT.java | 25 + .../integTest/httpclient5/CrudIT.java | 14 + .../HttpClient5TransportSupport.java | 123 +++ .../httpclient5/IndicesClientIT.java | 14 + .../integTest/httpclient5/NodesIT.java | 14 + .../integTest/httpclient5/PingAndInfoIT.java | 15 + .../integTest/httpclient5/RequestIT.java | 14 + .../integTest/restclient/ClusterClientIT.java | 25 + .../integTest/restclient/CrudIT.java | 25 + .../integTest/restclient/IndicesClientIT.java | 25 + .../integTest/restclient/NodesIT.java | 25 + .../integTest/restclient/PingAndInfoIT.java | 25 + .../integTest/restclient/RequestIT.java | 25 + 41 files changed, 3709 insertions(+), 29 deletions(-) create mode 100644 java-client/src/main/java/org/opensearch/client/transport/TransportHeaders.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Options.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/DeadHostState.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/HttpAsyncResponseConsumerFactory.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/Response.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/ResponseException.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningFailureException.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningsHandler.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncResponseConsumer.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpEntityAsyncEntityProducer.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpUriRequestProducer.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/Node.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/NodeSelector.java rename java-client/src/test/java/org/opensearch/client/opensearch/integTest/{ClusterClientIT.java => AbstractClusterClientIT.java} (99%) rename java-client/src/test/java/org/opensearch/client/opensearch/integTest/{CrudIT.java => AbstractCrudIT.java} (99%) rename java-client/src/test/java/org/opensearch/client/opensearch/integTest/{IndicesClientIT.java => AbstractIndicesClientIT.java} (97%) rename java-client/src/test/java/org/opensearch/client/opensearch/integTest/{NodesIT.java => AbstractNodesIT.java} (90%) rename java-client/src/test/java/org/opensearch/client/opensearch/integTest/{PingAndInfoIT.java => AbstractPingAndInfoIT.java} (96%) rename java-client/src/test/java/org/opensearch/client/opensearch/integTest/{RequestTest.java => AbstractRequestIT.java} (99%) create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/OpenSearchTransportSupport.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/ClusterClientIT.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/CrudIT.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/IndicesClientIT.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/NodesIT.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/PingAndInfoIT.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/RequestIT.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/ClusterClientIT.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/CrudIT.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/IndicesClientIT.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/NodesIT.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/PingAndInfoIT.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/RequestIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 952be96f46..e4b6df3b4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ### Added - Github workflow for changelog verification ([#239](https://github.com/opensearch-project/opensearch-java/pull/239)) - Github workflow for dependabot PRs ([#247](https://github.com/opensearch-project/opensearch-java/pull/247)) +- Introduce new OpenSearchTransport based on Apache HttpClient 5 ([#328](https://github.com/opensearch-project/opensearch-java/pull/328)) ### Dependencies - Bumps `grgit-gradle` from 4.0.1 to 5.0.0 diff --git a/USER_GUIDE.md b/USER_GUIDE.md index f51063052e..a3403cc8af 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -2,6 +2,7 @@ - [User Guide](#user-guide) - [Sample data](#sample-data) + - [Create a client](#create-a-client) - [Create an index](#create-an-index) - [Index data](#index-data) - [Search for the document](#search-for-the-document) @@ -48,6 +49,42 @@ static class IndexData { } ``` +## Create a client + +There are multiple low level transports which `OpenSearchClient` could be configured with. + +### Create a client using `RestClientTransport` + +```java +Transport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); +OpenSearchClient client = new OpenSearchClient(transport); +``` + +The `JacksonJsonpMapper` class (2.x versions) only supports Java 7 objects by default. [Java 8 modules](https://github.com/FasterXML/jackson-modules-java8) to support JDK8 classes such as the Date and Time API (JSR-310), `Optional`, and more can be used by including [the additional datatype dependency](https://github.com/FasterXML/jackson-modules-java8#usage) and adding the module. For example, to include JSR-310 classes: + +```java +Transport transport = new RestClientTransport(restClient, + new JacksonJsonpMapper(new ObjectMapper().registerModule(new JavaTimeModule()))); +OpenSearchClient client = new OpenSearchClient(transport); +``` + +### Create a client using `ApacheHttpClient5Transport` + +```java +final Transport transport = ApacheHttpClient5TransportBuilder + .builder(hosts) + .setMapper(new JacksonJsonpMapper()) + .build(); +OpenSearchClient client = new OpenSearchClient(transport); +``` + +The Apache HttpClient 5 based transport has dependences on Apache HttpClient 5 and Apache HttpCore 5 which has to be added to the project explicitly. + +```gradle + implementation("org.apache.httpcomponents.client5", "httpclient5", "5.1.4") + implementation("org.apache.httpcomponents.core5", "httpcore5", "5.1.5") +``` + ## Create an index ```java diff --git a/java-client/build.gradle.kts b/java-client/build.gradle.kts index fc50f81adc..b7adb63334 100644 --- a/java-client/build.gradle.kts +++ b/java-client/build.gradle.kts @@ -164,6 +164,10 @@ dependencies { implementation("com.fasterxml.jackson.core", "jackson-databind", jacksonDatabindVersion) testImplementation("com.fasterxml.jackson.datatype", "jackson-datatype-jakarta-jsonp", jacksonVersion) + // ApacheHttpClient5Transport dependencies (optional) + implementation("org.apache.httpcomponents.client5", "httpclient5", "5.1.4") + implementation("org.apache.httpcomponents.core5", "httpcore5", "5.1.5") + // For AwsSdk2Transport "awsSdk2SupportImplementation"("software.amazon.awssdk","sdk-core","[2.15,3.0)") "awsSdk2SupportImplementation"("software.amazon.awssdk","auth","[2.15,3.0)") diff --git a/java-client/src/main/java/org/opensearch/client/transport/TransportHeaders.java b/java-client/src/main/java/org/opensearch/client/transport/TransportHeaders.java new file mode 100644 index 0000000000..b21360a903 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/TransportHeaders.java @@ -0,0 +1,17 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport; + +public final class TransportHeaders { + public static final String ACCEPT = "Accept"; + public static final String USER_AGENT = "User-Agent"; + + private TransportHeaders() { + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Options.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Options.java new file mode 100644 index 0000000000..e10c32a577 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Options.java @@ -0,0 +1,267 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Map.Entry; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.Version; + +import static org.opensearch.client.transport.TransportHeaders.ACCEPT; +import static org.opensearch.client.transport.TransportHeaders.USER_AGENT; + +public class ApacheHttpClient5Options implements TransportOptions { + /** + * Default request options. + */ + public static final ApacheHttpClient5Options DEFAULT = new Builder( + Collections.emptyList(), + HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory.DEFAULT, + null, + null + ).build(); + + private final List
headers; + private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory; + private final WarningsHandler warningsHandler; + private final RequestConfig requestConfig; + + private ApacheHttpClient5Options(Builder builder) { + this.headers = Collections.unmodifiableList(new ArrayList<>(builder.headers)); + this.httpAsyncResponseConsumerFactory = builder.httpAsyncResponseConsumerFactory; + this.warningsHandler = builder.warningsHandler; + this.requestConfig = builder.requestConfig; + } + + public HttpAsyncResponseConsumerFactory getHttpAsyncResponseConsumerFactory() { + return httpAsyncResponseConsumerFactory; + } + + public WarningsHandler getWarningsHandler() { + return warningsHandler; + } + + public RequestConfig getRequestConfig() { + return requestConfig; + } + + @Override + public Collection> headers() { + return headers.stream() + .map(h -> new AbstractMap.SimpleImmutableEntry<>(h.getName(), h.getValue())) + .collect(Collectors.toList()); + } + + @Override + public Map queryParameters() { + return null; + } + + @Override + public Function, Boolean> onWarnings() { + if (warningsHandler == null) { + return null; + } else { + return warnings -> warningsHandler.warningsShouldFailRequest(warnings); + } + } + + @Override + public Builder toBuilder() { + return new Builder(headers, httpAsyncResponseConsumerFactory, warningsHandler, requestConfig); + } + + public static class Builder implements TransportOptions.Builder { + private final List
headers; + private HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory; + private WarningsHandler warningsHandler; + private RequestConfig requestConfig; + + private Builder(Builder builder) { + this(builder.headers, builder.httpAsyncResponseConsumerFactory, + builder.warningsHandler, builder.requestConfig); + } + + private Builder( + List
headers, + HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory, + WarningsHandler warningsHandler, + RequestConfig requestConfig + ) { + this.headers = new ArrayList<>(headers); + this.httpAsyncResponseConsumerFactory = httpAsyncResponseConsumerFactory; + this.warningsHandler = warningsHandler; + this.requestConfig = requestConfig; + } + + /** + * Add the provided header to the request. + * + * @param name the header name + * @param value the header value + * @throws NullPointerException if {@code name} or {@code value} is null. + */ + @Override + public Builder addHeader(String name, String value) { + Objects.requireNonNull(name, "header name cannot be null"); + Objects.requireNonNull(value, "header value cannot be null"); + this.headers.add(new ReqHeader(name, value)); + return this; + } + + @Override + public TransportOptions.Builder setParameter(String name, String value) { + return this; + } + + /** + * Called if there are warnings to determine if those warnings should fail the request. + */ + @Override + public TransportOptions.Builder onWarnings(Function, Boolean> listener) { + if (listener == null) { + setWarningsHandler(null); + } else { + setWarningsHandler(w -> { + if (w != null && !w.isEmpty()) { + return listener.apply(w); + } else { + return false; + } + }); + } + + return this; + } + + /** + * Set the {@link HttpAsyncResponseConsumerFactory} used to create one + * {@link AsyncResponseConsumer} callback per retry. Controls how the + * response body gets streamed from a non-blocking HTTP connection on the + * client side. + * + * @param httpAsyncResponseConsumerFactory factory for creating {@link AsyncResponseConsumer}. + * @throws NullPointerException if {@code httpAsyncResponseConsumerFactory} is null. + */ + public void setHttpAsyncResponseConsumerFactory(HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) { + this.httpAsyncResponseConsumerFactory = Objects.requireNonNull( + httpAsyncResponseConsumerFactory, + "httpAsyncResponseConsumerFactory cannot be null" + ); + } + + /** + * How this request should handle warnings. If null (the default) then + * this request will default to the behavior dictacted by + * `setStrictDeprecationMode`. + *

+ * This can be set to {@link WarningsHandler#PERMISSIVE} if the client + * should ignore all warnings which is the same behavior as setting + * strictDeprecationMode to true. It can be set to + * {@link WarningsHandler#STRICT} if the client should fail if there are + * any warnings which is the same behavior as settings + * strictDeprecationMode to false. + *

+ * It can also be set to a custom implementation of + * {@linkplain WarningsHandler} to permit only certain warnings or to + * fail the request if the warnings returned don't + * exactly match some set. + * + * @param warningsHandler the {@link WarningsHandler} to be used + */ + public void setWarningsHandler(WarningsHandler warningsHandler) { + this.warningsHandler = warningsHandler; + } + + /** + * set RequestConfig, which can set socketTimeout, connectTimeout + * and so on by request + * @param requestConfig http client RequestConfig + * @return Builder + */ + public Builder setRequestConfig(RequestConfig requestConfig) { + this.requestConfig = requestConfig; + return this; + } + + @Override + public ApacheHttpClient5Options build() { + return new ApacheHttpClient5Options(this); + } + } + + static ApacheHttpClient5Options initialOptions() { + String ua = String.format( + Locale.ROOT, + "opensearch-java/%s (Java/%s)", + Version.VERSION == null ? "Unknown" : Version.VERSION.toString(), + System.getProperty("java.version") + ); + + return new ApacheHttpClient5Options( + DEFAULT.toBuilder() + .addHeader(USER_AGENT, ua) + .addHeader(ACCEPT, ApacheHttpClient5Transport.JsonContentType.toString()) + ); + } + + static ApacheHttpClient5Options of(TransportOptions options) { + if (options instanceof ApacheHttpClient5Options) { + return (ApacheHttpClient5Options)options; + + } else { + final Builder builder = new Builder(DEFAULT.toBuilder()); + options.headers().forEach(h -> builder.addHeader(h.getKey(), h.getValue())); + options.queryParameters().forEach(builder::setParameter); + builder.onWarnings(options.onWarnings()); + return builder.build(); + } + } + + /** + * Custom implementation of {@link BasicHeader} that overrides equals and + * hashCode so it is easier to test equality of {@link ApacheHttpClient5Options}. + */ + static final class ReqHeader extends BasicHeader { + ReqHeader(String name, String value) { + super(name, value); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other instanceof ReqHeader) { + Header otherHeader = (Header) other; + return Objects.equals(getName(), otherHeader.getName()) && Objects.equals(getValue(), otherHeader.getValue()); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(getName(), getValue()); + } + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java new file mode 100644 index 0000000000..033cbcaaa7 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java @@ -0,0 +1,987 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.GZIPOutputStream; + +import javax.annotation.Nullable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hc.client5.http.auth.AuthCache; +import org.apache.hc.client5.http.auth.AuthScheme; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.Credentials; +import org.apache.hc.client5.http.auth.CredentialsProvider; +import org.apache.hc.client5.http.classic.methods.HttpHead; +import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase; +import org.apache.hc.client5.http.entity.GzipDecompressingEntity; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.auth.BasicAuthCache; +import org.apache.hc.client5.http.impl.auth.BasicScheme; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.io.entity.BufferedHttpEntity; +import org.apache.hc.core5.http.io.entity.ByteArrayEntity; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.io.entity.HttpEntityWrapper; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.message.RequestLine; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.net.URIBuilder; +import org.apache.hc.core5.util.Args; +import org.opensearch.client.json.JsonpDeserializer; +import org.opensearch.client.json.JsonpMapper; +import org.opensearch.client.json.NdJsonpSerializable; +import org.opensearch.client.opensearch._types.ErrorResponse; +import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.transport.Endpoint; +import org.opensearch.client.transport.JsonEndpoint; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.TransportException; +import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.endpoints.BooleanEndpoint; +import org.opensearch.client.transport.endpoints.BooleanResponse; +import org.opensearch.client.transport.httpclient5.internal.HttpUriRequestProducer; +import org.opensearch.client.transport.httpclient5.internal.Node; +import org.opensearch.client.transport.httpclient5.internal.NodeSelector; +import org.opensearch.client.util.MissingRequiredPropertyException; + +import jakarta.json.stream.JsonGenerator; +import jakarta.json.stream.JsonParser; + +public class ApacheHttpClient5Transport implements OpenSearchTransport { + private static final Log logger = LogFactory.getLog(ApacheHttpClient5Transport.class); + static final ContentType JsonContentType = ContentType.APPLICATION_JSON; + + private final JsonpMapper mapper; + private final CloseableHttpAsyncClient client; + private final ApacheHttpClient5Options transportOptions; + private final ConcurrentMap denylist = new ConcurrentHashMap<>(); + private final AtomicInteger lastNodeIndex = new AtomicInteger(0); + private volatile NodeTuple> nodeTuple; + private final NodeSelector nodeSelector; + private final WarningsHandler warningsHandler; + private final FailureListener failureListener; + private final boolean compressionEnabled; + private final boolean chunkedEnabled; + private final String pathPrefix; + private final List

defaultHeaders; + + public ApacheHttpClient5Transport(final CloseableHttpAsyncClient client, final Header[] defaultHeaders, + final List nodes, final JsonpMapper mapper, @Nullable TransportOptions options, final String pathPrefix, + final FailureListener failureListener, final NodeSelector nodeSelector, final boolean strictDeprecationMode, + final boolean compressionEnabled, final boolean chunkedEnabled) { + this.mapper = mapper; + this.client = client; + this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders)); + this.pathPrefix = pathPrefix; + this.transportOptions = (options == null) ? ApacheHttpClient5Options.initialOptions() : ApacheHttpClient5Options.of(options); + this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE; + this.nodeSelector = (nodeSelector == null) ? NodeSelector.ANY : nodeSelector; + this.failureListener = (failureListener == null) ? new FailureListener() : failureListener; + this.chunkedEnabled = chunkedEnabled; + this.compressionEnabled = compressionEnabled; + setNodes(nodes); + } + + @Override + public ResponseT performRequest(RequestT request, + Endpoint endpoint, TransportOptions options) throws IOException { + try { + return performRequestAsync(request, endpoint, options).join(); + } catch (final CompletionException ex) { + if (ex.getCause() instanceof RuntimeException) { + throw (RuntimeException) ex.getCause(); + } else if (ex.getCause() instanceof IOException) { + throw (IOException) ex.getCause(); + } else { + throw new IOException(ex.getCause()); + } + } + } + + @Override + public CompletableFuture performRequestAsync(RequestT request, + Endpoint endpoint, TransportOptions options) { + + final ApacheHttpClient5Options requestOptions = (options == null) ? transportOptions : ApacheHttpClient5Options.of(options); + final CompletableFuture future = new CompletableFuture<>(); + final HttpUriRequestBase clientReq = prepareLowLevelRequest(request, endpoint, requestOptions); + final WarningsHandler warningsHandler = (requestOptions.getWarningsHandler() == null) ? + this.warningsHandler : requestOptions.getWarningsHandler(); + + try { + performRequestAsync(nextNodes(), requestOptions, clientReq, warningsHandler, future); + } catch(final IOException ex) { + future.completeExceptionally(ex); + } + + return future.thenApply(r -> { + try { + return (ResponseT)prepareResponse(r, endpoint); + } catch (final IOException ex) { + throw new CompletionException(ex); + } + }); + } + + @Override + public JsonpMapper jsonpMapper() { + return mapper; + } + + @Override + public TransportOptions options() { + return transportOptions; + } + + @Override + public void close() throws IOException { + client.close(); + } + + private void performRequestAsync(final NodeTuple> nodeTuple, final ApacheHttpClient5Options options, + final HttpUriRequestBase request, final WarningsHandler warningsHandler, final CompletableFuture listener) { + final RequestContext context = createContextForNextAttempt(options, request, nodeTuple.nodes.next(), nodeTuple.authCache); + Future future = client.execute( + context.requestProducer, + context.asyncResponseConsumer, + context.context, + new FutureCallback() { + @Override + public void completed(ClassicHttpResponse httpResponse) { + try { + ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, + httpResponse, warningsHandler); + if (responseOrResponseException.responseException == null) { + listener.complete(responseOrResponseException.response); + } else { + if (nodeTuple.nodes.hasNext()) { + performRequestAsync(nodeTuple, options, request, warningsHandler, listener); + } else { + listener.completeExceptionally(responseOrResponseException.responseException); + } + } + } catch (Exception e) { + listener.completeExceptionally(e); + } + } + + @Override + public void failed(Exception failure) { + try { + onFailure(context.node); + if (nodeTuple.nodes.hasNext()) { + performRequestAsync(nodeTuple, options, request, warningsHandler, listener); + } else { + listener.completeExceptionally(failure); + } + } catch (Exception e) { + listener.completeExceptionally(e); + } + } + + @Override + public void cancelled() { + listener.completeExceptionally(new CancellationException("request was cancelled")); + } + } + ); + + if (future instanceof org.apache.hc.core5.concurrent.Cancellable) { + request.setDependency((org.apache.hc.core5.concurrent.Cancellable) future); + } + } + + /** + * Replaces the nodes with which the client communicates. + * + * @param nodes the new nodes to communicate with. + */ + private void setNodes(Collection nodes) { + if (nodes == null || nodes.isEmpty()) { + throw new IllegalArgumentException("nodes must not be null or empty"); + } + AuthCache authCache = new BasicAuthCache(); + + Map nodesByHost = new LinkedHashMap<>(); + for (Node node : nodes) { + Objects.requireNonNull(node, "node cannot be null"); + // TODO should we throw an IAE if we have two nodes with the same host? + nodesByHost.put(node.getHost(), node); + authCache.put(node.getHost(), new BasicScheme()); + } + this.nodeTuple = new NodeTuple<>(Collections.unmodifiableList(new ArrayList<>(nodesByHost.values())), authCache); + this.denylist.clear(); + } + + private ResponseOrResponseException convertResponse(final HttpUriRequestBase request, final Node node, + final ClassicHttpResponse httpResponse, final WarningsHandler warningsHandler) throws IOException { + int statusCode = httpResponse.getCode(); + + Optional.ofNullable(httpResponse.getEntity()) + .map(HttpEntity::getContentEncoding) + .filter("gzip"::equalsIgnoreCase) + .map(gzipHeaderValue -> new GzipDecompressingEntity(httpResponse.getEntity())) + .ifPresent(httpResponse::setEntity); + + Response response = new Response(new RequestLine(request), node.getHost(), httpResponse); + Set ignoreErrorCodes = getIgnoreErrorCodes("400,401,403,404,405", request.getMethod()); + if (isSuccessfulResponse(statusCode) || ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) { + onResponse(node); + if (warningsHandler.warningsShouldFailRequest(response.getWarnings())) { + throw new WarningFailureException(response); + } + return new ResponseOrResponseException(response); + } + ResponseException responseException = new ResponseException(response); + if (isRetryStatus(statusCode)) { + // mark host dead and retry against next one + onFailure(node); + return new ResponseOrResponseException(responseException); + } + // mark host alive and don't retry, as the error should be a request problem + onResponse(node); + throw responseException; + } + + private static Set getIgnoreErrorCodes(String ignoreString, String requestMethod) { + Set ignoreErrorCodes; + if (ignoreString == null) { + if (HttpHead.METHOD_NAME.equals(requestMethod)) { + // 404 never causes error if returned for a HEAD request + ignoreErrorCodes = Collections.singleton(404); + } else { + ignoreErrorCodes = Collections.emptySet(); + } + } else { + String[] ignoresArray = ignoreString.split(","); + ignoreErrorCodes = new HashSet<>(); + if (HttpHead.METHOD_NAME.equals(requestMethod)) { + // 404 never causes error if returned for a HEAD request + ignoreErrorCodes.add(404); + } + for (String ignoreCode : ignoresArray) { + try { + ignoreErrorCodes.add(Integer.valueOf(ignoreCode)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e); + } + } + } + return ignoreErrorCodes; + } + + private static boolean isSuccessfulResponse(int statusCode) { + return statusCode < 300; + } + + private static boolean isRetryStatus(int statusCode) { + switch (statusCode) { + case 502: + case 503: + case 504: + return true; + } + return false; + } + + /** + * Returns a non-empty {@link Iterator} of nodes to be used for a request + * that match the {@link NodeSelector}. + *

+ * If there are no living nodes that match the {@link NodeSelector} + * this will return the dead node that matches the {@link NodeSelector} + * that is closest to being revived. + * @throws IOException if no nodes are available + */ + private NodeTuple> nextNodes() throws IOException { + NodeTuple> nodeTuple = this.nodeTuple; + Iterable hosts = selectNodes(nodeTuple, denylist, lastNodeIndex, nodeSelector); + return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache); + } + + /** + * Select nodes to try and sorts them so that the first one will be tried initially, then the following ones + * if the previous attempt failed and so on. Package private for testing. + */ + static Iterable selectNodes( + NodeTuple> nodeTuple, + Map denylist, + AtomicInteger lastNodeIndex, + NodeSelector nodeSelector + ) throws IOException { + /* + * Sort the nodes into living and dead lists. + */ + List livingNodes = new ArrayList<>(Math.max(0, nodeTuple.nodes.size() - denylist.size())); + List deadNodes = new ArrayList<>(denylist.size()); + for (Node node : nodeTuple.nodes) { + DeadHostState deadness = denylist.get(node.getHost()); + if (deadness == null || deadness.shallBeRetried()) { + livingNodes.add(node); + } else { + deadNodes.add(new DeadNode(node, deadness)); + } + } + + if (false == livingNodes.isEmpty()) { + /* + * Normal state: there is at least one living node. If the + * selector is ok with any over the living nodes then use them + * for the request. + */ + List selectedLivingNodes = new ArrayList<>(livingNodes); + nodeSelector.select(selectedLivingNodes); + if (false == selectedLivingNodes.isEmpty()) { + /* + * Rotate the list using a global counter as the distance so subsequent + * requests will try the nodes in a different order. + */ + Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement()); + return selectedLivingNodes; + } + } + + /* + * Last resort: there are no good nodes to use, either because + * the selector rejected all the living nodes or because there aren't + * any living ones. Either way, we want to revive a single dead node + * that the NodeSelectors are OK with. We do this by passing the dead + * nodes through the NodeSelector so it can have its say in which nodes + * are ok. If the selector is ok with any of the nodes then we will take + * the one in the list that has the lowest revival time and try it. + */ + if (false == deadNodes.isEmpty()) { + final List selectedDeadNodes = new ArrayList<>(deadNodes); + /* + * We'd like NodeSelectors to remove items directly from deadNodes + * so we can find the minimum after it is filtered without having + * to compare many things. This saves us a sort on the unfiltered + * list. + */ + nodeSelector.select(() -> new DeadNodeIteratorAdapter(selectedDeadNodes.iterator())); + if (false == selectedDeadNodes.isEmpty()) { + return Collections.singletonList(Collections.min(selectedDeadNodes).node); + } + } + throw new IOException( + "NodeSelector [" + nodeSelector + "] rejected all nodes, " + "living " + livingNodes + " and dead " + deadNodes + ); + } + + /** + * Called after each failed attempt. + * Receives as an argument the host that was used for the failed attempt. + */ + private void onFailure(Node node) { + while (true) { + DeadHostState previousDeadHostState = denylist.putIfAbsent( + node.getHost(), + new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER) + ); + if (previousDeadHostState == null) { + if (logger.isDebugEnabled()) { + logger.debug("added [" + node + "] to denylist"); + } + break; + } + if (denylist.replace(node.getHost(), previousDeadHostState, new DeadHostState(previousDeadHostState))) { + if (logger.isDebugEnabled()) { + logger.debug("updated [" + node + "] already in denylist"); + } + break; + } + } + failureListener.onFailure(node); + } + + private RequestContext createContextForNextAttempt(final ApacheHttpClient5Options options, + final HttpUriRequestBase request, final Node node, final AuthCache authCache) { + request.reset(); + return new RequestContext(options, request, node, authCache); + } + + private ResponseT prepareResponse(Response clientResp, + Endpoint endpoint + ) throws IOException { + + try { + int statusCode = clientResp.getStatusLine().getStatusCode(); + + if (endpoint.isError(statusCode)) { + JsonpDeserializer errorDeserializer = endpoint.errorDeserializer(statusCode); + if (errorDeserializer == null) { + throw new TransportException( + "Request failed with status code '" + statusCode + "'", + new ResponseException(clientResp) + ); + } + + HttpEntity entity = clientResp.getEntity(); + if (entity == null) { + throw new TransportException( + "Expecting a response body, but none was sent", + new ResponseException(clientResp) + ); + } + + // We may have to replay it. + entity = new BufferedHttpEntity(entity); + + try { + InputStream content = entity.getContent(); + try (JsonParser parser = mapper.jsonProvider().createParser(content)) { + ErrorT error = errorDeserializer.deserialize(parser, mapper); + // TODO: have the endpoint provide the exception constructor + throw new OpenSearchException((ErrorResponse) error); + } + } catch(MissingRequiredPropertyException errorEx) { + // Could not decode exception, try the response type + try { + ResponseT response = decodeResponse(statusCode, entity, clientResp, endpoint); + return response; + } catch(Exception respEx) { + // No better luck: throw the original error decoding exception + throw new TransportException("Failed to decode error response", new ResponseException(clientResp)); + } + } + } else { + return decodeResponse(statusCode, clientResp.getEntity(), clientResp, endpoint); + } + } finally { + EntityUtils.consume(clientResp.getEntity()); + } + } + + private HttpUriRequestBase prepareLowLevelRequest( + RequestT request, + Endpoint endpoint, + @Nullable ApacheHttpClient5Options options + ) { + final String method = endpoint.method(request); + final String path = endpoint.requestUrl(request); + final Map params = endpoint.queryParameters(request); + + final URI uri = buildUri(pathPrefix, path, params); + final HttpUriRequestBase clientReq = new HttpUriRequestBase(method, uri); + if (endpoint.hasRequestBody()) { + // Request has a body and must implement JsonpSerializable or NdJsonpSerializable + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + if (request instanceof NdJsonpSerializable) { + writeNdJson((NdJsonpSerializable) request, baos); + } else { + JsonGenerator generator = mapper.jsonProvider().createGenerator(baos); + mapper.serialize(request, generator); + generator.close(); + } + + addRequestBody(clientReq, new ByteArrayEntity(baos.toByteArray(), JsonContentType)); + } + + setHeaders(clientReq, options.headers()); + + if (options.getRequestConfig() != null) { + clientReq.setConfig(options.getRequestConfig()); + } + + return clientReq; + } + + private HttpUriRequestBase addRequestBody(HttpUriRequestBase httpRequest, HttpEntity entity) { + if (entity != null) { + if (compressionEnabled) { + if (chunkedEnabled) { + entity = new ContentCompressingEntity(entity, chunkedEnabled); + } else { + entity = new ContentCompressingEntity(entity); + } + } else if (chunkedEnabled) { + entity = new ContentHttpEntity(entity, chunkedEnabled); + } + httpRequest.setEntity(entity); + } + return httpRequest; + } + + private void setHeaders(HttpRequest httpRequest, Collection> requestHeaders) { + // request headers override default headers, so we don't add default headers if they exist as request headers + final Set requestNames = new HashSet<>(requestHeaders.size()); + for (Map.Entry requestHeader : requestHeaders) { + httpRequest.addHeader(new BasicHeader(requestHeader.getKey(), requestHeader.getValue())); + requestNames.add(requestHeader.getKey()); + } + for (Header defaultHeader : defaultHeaders) { + if (requestNames.contains(defaultHeader.getName()) == false) { + httpRequest.addHeader(defaultHeader); + } + } + if (compressionEnabled) { + httpRequest.addHeader("Accept-Encoding", "gzip"); + } + } + /** + * Called after each successful request call. + * Receives as an argument the host that was used for the successful request. + */ + private void onResponse(Node node) { + DeadHostState removedHost = this.denylist.remove(node.getHost()); + if (logger.isDebugEnabled() && removedHost != null) { + logger.debug("removed [" + node + "] from denylist"); + } + } + + private ResponseT decodeResponse( + int statusCode, @Nullable HttpEntity entity, Response clientResp, Endpoint endpoint + ) throws IOException { + + if (endpoint instanceof BooleanEndpoint) { + BooleanEndpoint bep = (BooleanEndpoint) endpoint; + + @SuppressWarnings("unchecked") + ResponseT response = (ResponseT) new BooleanResponse(bep.getResult(statusCode)); + return response; + + } else if (endpoint instanceof JsonEndpoint){ + JsonEndpoint jsonEndpoint = (JsonEndpoint)endpoint; + // Successful response + ResponseT response = null; + JsonpDeserializer responseParser = jsonEndpoint.responseDeserializer(); + if (responseParser != null) { + // Expecting a body + if (entity == null) { + throw new TransportException( + "Expecting a response body, but none was sent", + new ResponseException(clientResp) + ); + } + InputStream content = entity.getContent(); + try (JsonParser parser = mapper.jsonProvider().createParser(content)) { + response = responseParser.deserialize(parser, mapper); + }; + } + return response; + } else { + throw new TransportException("Unhandled endpoint type: '" + endpoint.getClass().getName() + "'"); + } + } + + /** + * {@link NodeTuple} enables the {@linkplain Node}s and {@linkplain AuthCache} + * to be set together in a thread safe, volatile way. + */ + static class NodeTuple { + final T nodes; + final AuthCache authCache; + + NodeTuple(final T nodes, final AuthCache authCache) { + this.nodes = nodes; + this.authCache = authCache; + } + } + + /** + * Contains a reference to a denylisted node and the time until it is + * revived. We use this so we can do a single pass over the denylist. + */ + private static class DeadNode implements Comparable { + final Node node; + final DeadHostState deadness; + + DeadNode(Node node, DeadHostState deadness) { + this.node = node; + this.deadness = deadness; + } + + @Override + public String toString() { + return node.toString(); + } + + @Override + public int compareTo(DeadNode rhs) { + return deadness.compareTo(rhs.deadness); + } + } + + /** + * Adapts an Iterator<DeadNodeAndRevival> into an + * Iterator<Node>. + */ + private static class DeadNodeIteratorAdapter implements Iterator { + private final Iterator itr; + + private DeadNodeIteratorAdapter(Iterator itr) { + this.itr = itr; + } + + @Override + public boolean hasNext() { + return itr.hasNext(); + } + + @Override + public Node next() { + return itr.next().node; + } + + @Override + public void remove() { + itr.remove(); + } + } + + /** + * Write an nd-json value by serializing each of its items on a separate line, recursing if its items themselves implement + * {@link NdJsonpSerializable} to flattening nested structures. + */ + private void writeNdJson(NdJsonpSerializable value, ByteArrayOutputStream baos) { + Iterator values = value._serializables(); + while(values.hasNext()) { + Object item = values.next(); + if (item instanceof NdJsonpSerializable && item != value) { // do not recurse on the item itself + writeNdJson((NdJsonpSerializable) item, baos); + } else { + JsonGenerator generator = mapper.jsonProvider().createGenerator(baos); + mapper.serialize(item, generator); + generator.close(); + baos.write('\n'); + } + } + } + + private static URI buildUri(String pathPrefix, String path, Map params) { + Objects.requireNonNull(path, "path must not be null"); + try { + String fullPath; + if (pathPrefix != null && pathPrefix.isEmpty() == false) { + if (pathPrefix.endsWith("/") && path.startsWith("/")) { + fullPath = pathPrefix.substring(0, pathPrefix.length() - 1) + path; + } else if (pathPrefix.endsWith("/") || path.startsWith("/")) { + fullPath = pathPrefix + path; + } else { + fullPath = pathPrefix + "/" + path; + } + } else { + fullPath = path; + } + + URIBuilder uriBuilder = new URIBuilder(fullPath); + for (Map.Entry param : params.entrySet()) { + uriBuilder.addParameter(param.getKey(), param.getValue()); + } + + // The Apache HttpClient 5.x **does not** encode URIs but Apache HttpClient 4.x does. It leads + // to the issues with Unicode characters (f.e. document IDs could contain Unicode characters) and + // weird characters are being passed instead. By using `toASCIIString()`, the URI is already created + // with proper encoding. + return new URI(uriBuilder.build().toASCIIString()); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage(), e); + } + } + + private static class RequestContext { + private final Node node; + private final AsyncRequestProducer requestProducer; + private final AsyncResponseConsumer asyncResponseConsumer; + private final HttpClientContext context; + + RequestContext(final ApacheHttpClient5Options options, final HttpUriRequestBase request, + final Node node, final AuthCache authCache) { + this.node = node; + this.requestProducer = HttpUriRequestProducer.create(request, node.getHost()); + this.asyncResponseConsumer = options + .getHttpAsyncResponseConsumerFactory() + .createHttpAsyncResponseConsumer(); + this.context = HttpClientContext.create(); + context.setAuthCache(new WrappingAuthCache(context, authCache)); + } + } + + /** + * The Apache HttpClient 5 adds "Authorization" header even if the credentials for basic authentication are not provided + * (effectively, username and password are 'null'). To workaround that, wrapping the AuthCache around current HttpClientContext + * and ensuring that the credentials are indeed provided for particular HttpHost, otherwise returning no authentication scheme + * even if it is present in the cache. + */ + private static class WrappingAuthCache implements AuthCache { + private final HttpClientContext context; + private final AuthCache delegate; + private final boolean usePersistentCredentials = true; + + WrappingAuthCache(HttpClientContext context, AuthCache delegate) { + this.context = context; + this.delegate = delegate; + } + + @Override + public void put(HttpHost host, AuthScheme authScheme) { + delegate.put(host, authScheme); + } + + @Override + public AuthScheme get(HttpHost host) { + AuthScheme authScheme = delegate.get(host); + + if (authScheme != null) { + final CredentialsProvider credsProvider = context.getCredentialsProvider(); + if (credsProvider != null) { + final String schemeName = authScheme.getName(); + final AuthScope authScope = new AuthScope(host, null, schemeName); + final Credentials creds = credsProvider.getCredentials(authScope, context); + + // See please https://issues.apache.org/jira/browse/HTTPCLIENT-2203 + if (authScheme instanceof BasicScheme) { + ((BasicScheme) authScheme).initPreemptive(creds); + } + + if (creds == null) { + return null; + } + } + } + + return authScheme; + } + + @Override + public void remove(HttpHost host) { + if (!usePersistentCredentials) { + delegate.remove(host); + } + } + + @Override + public void clear() { + delegate.clear(); + } + } + + private static class ResponseOrResponseException { + private final Response response; + private final ResponseException responseException; + + ResponseOrResponseException(Response response) { + this.response = Objects.requireNonNull(response); + this.responseException = null; + } + + ResponseOrResponseException(ResponseException responseException) { + this.responseException = Objects.requireNonNull(responseException); + this.response = null; + } + } + + /** + * Listener that allows to be notified whenever a failure happens. Useful when sniffing is enabled, so that we can sniff on failure. + * The default implementation is a no-op. + */ + public static class FailureListener { + /** + * Create a {@link FailureListener} instance. + */ + public FailureListener() {} + + /** + * Notifies that the node provided as argument has just failed. + * + * @param node The node which has failed. + */ + public void onFailure(Node node) {} + } + + /** + * A gzip compressing entity that also implements {@code getContent()}. + */ + public static class ContentCompressingEntity extends HttpEntityWrapper { + private static final String GZIP_CODEC = "gzip"; + + private Optional chunkedEnabled; + + /** + * Creates a {@link ContentCompressingEntity} instance with the provided HTTP entity. + * + * @param entity the HTTP entity. + */ + public ContentCompressingEntity(HttpEntity entity) { + super(entity); + this.chunkedEnabled = Optional.empty(); + } + + /** + * Returns content encoding of the entity, if known. + */ + @Override + public String getContentEncoding() { + return GZIP_CODEC; + } + + /** + * Creates a {@link ContentCompressingEntity} instance with the provided HTTP entity. + * + * @param entity the HTTP entity. + * @param chunkedEnabled force enable/disable chunked transfer-encoding. + */ + public ContentCompressingEntity(HttpEntity entity, boolean chunkedEnabled) { + super(entity); + this.chunkedEnabled = Optional.of(chunkedEnabled); + } + + /** + * Returns a content stream of the entity. + */ + @Override + public InputStream getContent() throws IOException { + ByteArrayInputOutputStream out = new ByteArrayInputOutputStream(1024); + try (GZIPOutputStream gzipOut = new GZIPOutputStream(out)) { + super.writeTo(gzipOut); + } + return out.asInput(); + } + + /** + * A gzip compressing entity doesn't work with chunked encoding with sigv4 + * + * @return false + */ + @Override + public boolean isChunked() { + return chunkedEnabled.orElseGet(super::isChunked); + } + + /** + * A gzip entity requires content length in http headers. + * + * @return content length of gzip entity + */ + @Override + public long getContentLength() { + if (chunkedEnabled.isPresent()) { + if (chunkedEnabled.get()) { + return -1L; + } else { + long size; + try (InputStream is = getContent()) { + size = is.readAllBytes().length; + } catch (IOException ex) { + size = -1L; + } + + return size; + } + } else { + return -1; + } + } + + /** + * Writes the entity content out to the output stream. + * @param outStream the output stream to write entity content to + * @throws IOException if an I/O error occurs + */ + @Override + public void writeTo(final OutputStream outStream) throws IOException { + Args.notNull(outStream, "Output stream"); + final GZIPOutputStream gzip = new GZIPOutputStream(outStream); + super.writeTo(gzip); + // Only close output stream if the wrapped entity has been + // successfully written out + gzip.close(); + } + } + + /** + * An entity that lets the caller specify the return value of {@code isChunked()}. + */ + public static class ContentHttpEntity extends HttpEntityWrapper { + private Optional chunkedEnabled; + + /** + * Creates a {@link ContentHttpEntity} instance with the provided HTTP entity. + * + * @param entity the HTTP entity. + */ + public ContentHttpEntity(HttpEntity entity) { + super(entity); + this.chunkedEnabled = Optional.empty(); + } + + /** + * Creates a {@link ContentHttpEntity} instance with the provided HTTP entity. + * + * @param entity the HTTP entity. + * @param chunkedEnabled force enable/disable chunked transfer-encoding. + */ + public ContentHttpEntity(HttpEntity entity, boolean chunkedEnabled) { + super(entity); + this.chunkedEnabled = Optional.of(chunkedEnabled); + } + + /** + * A chunked entity requires transfer-encoding:chunked in http headers + * which requires isChunked to be true + * + * @return true + */ + @Override + public boolean isChunked() { + return chunkedEnabled.orElseGet(super::isChunked); + } + } + + /** + * A ByteArrayOutputStream that can be turned into an input stream without copying the underlying buffer. + */ + private static class ByteArrayInputOutputStream extends ByteArrayOutputStream { + ByteArrayInputOutputStream(int size) { + super(size); + } + + public InputStream asInput() { + return new ByteArrayInputStream(this.buf, 0, this.count); + } + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java new file mode 100644 index 0000000000..bf75ba74b4 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java @@ -0,0 +1,407 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.security.AccessController; +import java.security.NoSuchAlgorithmException; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import org.apache.hc.client5.http.auth.CredentialsProvider; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.core5.function.Factory; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.reactor.ssl.TlsDetails; +import org.apache.hc.core5.util.Timeout; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.client.json.JsonpMapper; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.httpclient5.internal.Node; +import org.opensearch.client.transport.httpclient5.internal.NodeSelector; + +public class ApacheHttpClient5TransportBuilder { + /** + * The default connection timeout in milliseconds. + */ + public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000; + + /** + * The default response timeout in milliseconds. + */ + public static final int DEFAULT_RESPONSE_TIMEOUT_MILLIS = 30000; + + /** + * The default maximum of connections per route. + */ + public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10; + + /** + * The default maximum total connections. + */ + public static final int DEFAULT_MAX_CONN_TOTAL = 30; + + private static final Header[] EMPTY_HEADERS = new Header[0]; + + private final List nodes; + private Header[] defaultHeaders = EMPTY_HEADERS; + private ApacheHttpClient5Transport.FailureListener failureListener; + private HttpClientConfigCallback httpClientConfigCallback; + private RequestConfigCallback requestConfigCallback; + private String pathPrefix; + private NodeSelector nodeSelector = NodeSelector.ANY; + private boolean strictDeprecationMode = false; + private boolean compressionEnabled = false; + private Optional chunkedEnabled; + private JsonpMapper mapper; + private TransportOptions options; + + /** + * Creates a new builder instance and sets the hosts that the client will send requests to. + * + * @throws IllegalArgumentException if {@code nodes} is {@code null} or empty. + */ + ApacheHttpClient5TransportBuilder(List nodes) { + if (nodes == null || nodes.isEmpty()) { + throw new IllegalArgumentException("nodes must not be null or empty"); + } + for (Node node : nodes) { + if (node == null) { + throw new IllegalArgumentException("node cannot be null"); + } + } + this.nodes = nodes; + this.chunkedEnabled = Optional.empty(); + } + + /** + * Sets the default request headers, which will be sent along with each request. + *

+ * Request-time headers will always overwrite any default headers. + * + * @param defaultHeaders array of default header + * @throws NullPointerException if {@code defaultHeaders} or any header is {@code null}. + */ + public ApacheHttpClient5TransportBuilder setDefaultHeaders(Header[] defaultHeaders) { + Objects.requireNonNull(defaultHeaders, "defaultHeaders must not be null"); + for (Header defaultHeader : defaultHeaders) { + Objects.requireNonNull(defaultHeader, "default header must not be null"); + } + this.defaultHeaders = defaultHeaders; + return this; + } + + /** + * Sets the {@link RestClient.FailureListener} to be notified for each request failure + * + * @param failureListener the {@link RestClient.FailureListener} for each failure + * @throws NullPointerException if {@code failureListener} is {@code null}. + */ + public ApacheHttpClient5TransportBuilder setFailureListener(ApacheHttpClient5Transport.FailureListener failureListener) { + Objects.requireNonNull(failureListener, "failureListener must not be null"); + this.failureListener = failureListener; + return this; + } + + /** + * Sets the {@link HttpClientConfigCallback} to be used to customize http client configuration + * + * @param httpClientConfigCallback the {@link HttpClientConfigCallback} to be used + * @throws NullPointerException if {@code httpClientConfigCallback} is {@code null}. + */ + public ApacheHttpClient5TransportBuilder setHttpClientConfigCallback(HttpClientConfigCallback httpClientConfigCallback) { + Objects.requireNonNull(httpClientConfigCallback, "httpClientConfigCallback must not be null"); + this.httpClientConfigCallback = httpClientConfigCallback; + return this; + } + + /** + * Sets the {@link RequestConfigCallback} to be used to customize http client configuration + * + * @param requestConfigCallback the {@link RequestConfigCallback} to be used + * @throws NullPointerException if {@code requestConfigCallback} is {@code null}. + */ + public ApacheHttpClient5TransportBuilder setRequestConfigCallback(RequestConfigCallback requestConfigCallback) { + Objects.requireNonNull(requestConfigCallback, "requestConfigCallback must not be null"); + this.requestConfigCallback = requestConfigCallback; + return this; + } + + /** + * Sets the path's prefix for every request used by the http client. + *

+ * For example, if this is set to "/my/path", then any client request will become "/my/path/" + endpoint. + *

+ * In essence, every request's {@code endpoint} is prefixed by this {@code pathPrefix}. The path prefix is useful for when + * OpenSearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; + * it is not intended for other purposes and it should not be supplied in other scenarios. + * + * @param pathPrefix the path prefix for every request. + * @throws NullPointerException if {@code pathPrefix} is {@code null}. + * @throws IllegalArgumentException if {@code pathPrefix} is empty, or ends with more than one '/'. + */ + public ApacheHttpClient5TransportBuilder setPathPrefix(String pathPrefix) { + this.pathPrefix = cleanPathPrefix(pathPrefix); + return this; + } + + /** + * Sets the {@link JsonpMapper} instance to be used for parsing JSON payloads. If not provided + * the {@link JacksonJsonpMapper} is going to be used. + * @param mapper the {@link JsonpMapper} instance + */ + public ApacheHttpClient5TransportBuilder setMapper(JsonpMapper mapper) { + this.mapper = mapper; + return this; + } + + /** + * Sets the default {@link TransportOptions} to be used by the client + * @param options default {@link TransportOptions} + */ + public ApacheHttpClient5TransportBuilder setOptions(TransportOptions options) { + this.options = options; + return this; + } + + /** + * Cleans up the given path prefix to ensure that looks like "/base/path". + * + * @param pathPrefix the path prefix to be cleaned up. + * @return the cleaned up path prefix. + * @throws NullPointerException if {@code pathPrefix} is {@code null}. + * @throws IllegalArgumentException if {@code pathPrefix} is empty, or ends with more than one '/'. + */ + public static String cleanPathPrefix(String pathPrefix) { + Objects.requireNonNull(pathPrefix, "pathPrefix must not be null"); + + if (pathPrefix.isEmpty()) { + throw new IllegalArgumentException("pathPrefix must not be empty"); + } + + String cleanPathPrefix = pathPrefix; + if (cleanPathPrefix.startsWith("/") == false) { + cleanPathPrefix = "/" + cleanPathPrefix; + } + + // best effort to ensure that it looks like "/base/path" rather than "/base/path/" + if (cleanPathPrefix.endsWith("/") && cleanPathPrefix.length() > 1) { + cleanPathPrefix = cleanPathPrefix.substring(0, cleanPathPrefix.length() - 1); + + if (cleanPathPrefix.endsWith("/")) { + throw new IllegalArgumentException("pathPrefix is malformed. too many trailing slashes: [" + pathPrefix + "]"); + } + } + return cleanPathPrefix; + } + + /** + * Sets the {@link NodeSelector} to be used for all requests. + * + * @param nodeSelector the {@link NodeSelector} to be used + * @throws NullPointerException if the provided nodeSelector is null + */ + public ApacheHttpClient5TransportBuilder setNodeSelector(NodeSelector nodeSelector) { + Objects.requireNonNull(nodeSelector, "nodeSelector must not be null"); + this.nodeSelector = nodeSelector; + return this; + } + + /** + * Whether the REST client should return any response containing at least + * one warning header as a failure. + * + * @param strictDeprecationMode flag for enabling strict deprecation mode + */ + public ApacheHttpClient5TransportBuilder setStrictDeprecationMode(boolean strictDeprecationMode) { + this.strictDeprecationMode = strictDeprecationMode; + return this; + } + + /** + * Whether the REST client should compress requests using gzip content encoding and add the "Accept-Encoding: gzip" + * header to receive compressed responses. + * + * @param compressionEnabled flag for enabling compression + */ + public ApacheHttpClient5TransportBuilder setCompressionEnabled(boolean compressionEnabled) { + this.compressionEnabled = compressionEnabled; + return this; + } + + /** + * Whether the REST client should use Transfer-Encoding: chunked for requests or not" + * + * @param chunkedEnabled force enable/disable chunked transfer-encoding. + */ + public ApacheHttpClient5TransportBuilder setChunkedEnabled(boolean chunkedEnabled) { + this.chunkedEnabled = Optional.of(chunkedEnabled); + return this; + } + + /** + * Creates a new {@link RestClient} based on the provided configuration. + */ + public ApacheHttpClient5Transport build() { + if (failureListener == null) { + failureListener = new ApacheHttpClient5Transport.FailureListener(); + } + CloseableHttpAsyncClient httpClient = AccessController.doPrivileged( + (PrivilegedAction) this::createHttpClient + ); + + if (mapper == null) { + mapper = new JacksonJsonpMapper(); + } + + final ApacheHttpClient5Transport transport = new ApacheHttpClient5Transport( + httpClient, + defaultHeaders, + nodes, + mapper, + options, + pathPrefix, + failureListener, + nodeSelector, + strictDeprecationMode, + compressionEnabled, + chunkedEnabled.orElse(false) + ); + + httpClient.start(); + return transport; + } + + /** + * Returns a new {@link ApacheHttpClient5TransportBuilder} to help with {@link ApacheHttpClient5Transport} creation. + * Creates a new builder instance and sets the hosts that the client will send requests to. + *

+ * Prefer this to {@link #builder(HttpHost...)} if you have metadata up front about the nodes. + * If you don't either one is fine. + * + * @param nodes The nodes that the client will send requests to. + */ + public static ApacheHttpClient5TransportBuilder builder(Node... nodes) { + return new ApacheHttpClient5TransportBuilder(nodes == null ? null : Arrays.asList(nodes)); + } + + /** + * Returns a new {@link ApacheHttpClient5TransportBuilder} to help with {@link ApacheHttpClient5Transport} creation. + * Creates a new builder instance and sets the nodes that the client will send requests to. + *

+ * You can use this if you do not have metadata up front about the nodes. If you do, prefer + * {@link #builder(Node...)}. + * @see Node#Node(HttpHost) + * + * @param hosts The hosts that the client will send requests to. + */ + public static ApacheHttpClient5TransportBuilder builder(HttpHost... hosts) { + if (hosts == null || hosts.length == 0) { + throw new IllegalArgumentException("hosts must not be null nor empty"); + } + List nodes = Arrays.stream(hosts).map(Node::new).collect(Collectors.toList()); + return new ApacheHttpClient5TransportBuilder(nodes); + } + + private CloseableHttpAsyncClient createHttpClient() { + // default timeouts are all infinite + RequestConfig.Builder requestConfigBuilder = RequestConfig.custom() + .setConnectTimeout(Timeout.ofMilliseconds(DEFAULT_CONNECT_TIMEOUT_MILLIS)) + .setResponseTimeout(Timeout.ofMilliseconds(DEFAULT_RESPONSE_TIMEOUT_MILLIS)); + + if (requestConfigCallback != null) { + requestConfigBuilder = requestConfigCallback.customizeRequestConfig(requestConfigBuilder); + } + + try { + final TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create() + .setSslContext(SSLContext.getDefault()) + // See https://issues.apache.org/jira/browse/HTTPCLIENT-2219 + .setTlsDetailsFactory(new Factory() { + @Override + public TlsDetails create(final SSLEngine sslEngine) { + return new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol()); + } + }) + .build(); + + final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create() + .setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE) + .setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL) + .setTlsStrategy(tlsStrategy) + .build(); + + HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create() + .setDefaultRequestConfig(requestConfigBuilder.build()) + .setConnectionManager(connectionManager) + .setTargetAuthenticationStrategy(DefaultAuthenticationStrategy.INSTANCE) + .disableAutomaticRetries(); + if (httpClientConfigCallback != null) { + httpClientBuilder = httpClientConfigCallback.customizeHttpClient(httpClientBuilder); + } + + final HttpAsyncClientBuilder finalBuilder = httpClientBuilder; + return AccessController.doPrivileged((PrivilegedAction) finalBuilder::build); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("could not create the default ssl context", e); + } + } + + /** + * Callback used the default {@link RequestConfig} being set to the {@link CloseableHttpClient} + * @see HttpClientBuilder#setDefaultRequestConfig + */ + public interface RequestConfigCallback { + /** + * Allows to customize the {@link RequestConfig} that will be used with each request. + * It is common to customize the different timeout values through this method without losing any other useful default + * value that the {@link RestClientBuilder} internally sets. + * + * @param requestConfigBuilder the {@link RestClientBuilder} for customizing the request configuration. + */ + RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder); + } + + /** + * Callback used to customize the {@link CloseableHttpClient} instance used by a {@link RestClient} instance. + * Allows to customize default {@link RequestConfig} being set to the client and any parameter that + * can be set through {@link HttpClientBuilder} + */ + public interface HttpClientConfigCallback { + /** + * Allows to customize the {@link CloseableHttpAsyncClient} being created and used by the {@link RestClient}. + * Commonly used to customize the default {@link CredentialsProvider} for authentication for communication + * through TLS/SSL without losing any other useful default value that the {@link RestClientBuilder} internally + * sets, like connection pooling. + * + * @param httpClientBuilder the {@link HttpClientBuilder} for customizing the client instance. + */ + HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder); + } + + +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/DeadHostState.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/DeadHostState.java new file mode 100644 index 0000000000..a09afde125 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/DeadHostState.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * Holds the state of a dead connection to a host. Keeps track of how many failed attempts were performed and + * when the host should be retried (based on number of previous failed attempts). + * Class is immutable, a new copy of it should be created each time the state has to be changed. + */ +final class DeadHostState implements Comparable { + + private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1); + static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30); + static final Supplier DEFAULT_TIME_SUPPLIER = System::nanoTime; + + private final int failedAttempts; + private final long deadUntilNanos; + private final Supplier timeSupplier; + + /** + * Build the initial dead state of a host. Useful when a working host stops functioning + * and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so. + * + * @param timeSupplier a way to supply the current time and allow for unit testing + */ + DeadHostState(Supplier timeSupplier) { + this.failedAttempts = 1; + this.deadUntilNanos = timeSupplier.get() + MIN_CONNECTION_TIMEOUT_NANOS; + this.timeSupplier = timeSupplier; + } + + /** + * Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence + * it already failed for one or more consecutive times. The more failed attempts we register the longer we wait + * to retry that same host again. Minimum is 1 minute (for a node the only failed once created + * through {@link #DeadHostState(Supplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times) + * + * @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt + */ + DeadHostState(DeadHostState previousDeadHostState) { + long timeoutNanos = (long) Math.min( + MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1), + MAX_CONNECTION_TIMEOUT_NANOS + ); + this.deadUntilNanos = previousDeadHostState.timeSupplier.get() + timeoutNanos; + this.failedAttempts = previousDeadHostState.failedAttempts + 1; + this.timeSupplier = previousDeadHostState.timeSupplier; + } + + /** + * Indicates whether it's time to retry to failed host or not. + * + * @return true if the host should be retried, false otherwise + */ + boolean shallBeRetried() { + return timeSupplier.get() - deadUntilNanos > 0; + } + + /** + * Returns the timestamp (nanos) till the host is supposed to stay dead without being retried. + * After that the host should be retried. + */ + long getDeadUntilNanos() { + return deadUntilNanos; + } + + int getFailedAttempts() { + return failedAttempts; + } + + @Override + public int compareTo(DeadHostState other) { + if (timeSupplier != other.timeSupplier) { + throw new IllegalArgumentException( + "can't compare DeadHostStates holding different time suppliers as they may be based on different clocks" + ); + } + return Long.compare(deadUntilNanos, other.deadUntilNanos); + } + + @Override + public String toString() { + return "DeadHostState{" + + "failedAttempts=" + + failedAttempts + + ", deadUntilNanos=" + + deadUntilNanos + + ", timeSupplier=" + + timeSupplier + + '}'; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/HttpAsyncResponseConsumerFactory.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/HttpAsyncResponseConsumerFactory.java new file mode 100644 index 0000000000..c9322f473e --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/HttpAsyncResponseConsumerFactory.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.opensearch.client.transport.httpclient5.internal.HeapBufferedAsyncResponseConsumer; + +/** + * Factory used to create instances of {@link AsyncResponseConsumer}. Each request retry needs its own instance of the + * consumer object. Users can implement this interface and pass their own instance to the specialized + * performRequest methods that accept an {@link HttpAsyncResponseConsumerFactory} instance as argument. + */ +public interface HttpAsyncResponseConsumerFactory { + + /** + * Creates the default type of {@link AsyncResponseConsumer}, based on heap buffering with a buffer limit of 100MB. + */ + HttpAsyncResponseConsumerFactory DEFAULT = new HeapBufferedResponseConsumerFactory( + HeapBufferedResponseConsumerFactory.DEFAULT_BUFFER_LIMIT); + + /** + * Creates the {@link AsyncResponseConsumer}, called once per request attempt. + */ + AsyncResponseConsumer createHttpAsyncResponseConsumer(); + + /** + * Default factory used to create instances of {@link AsyncResponseConsumer}. + * Creates one instance of {@link HeapBufferedAsyncResponseConsumer} for each request attempt, with a configurable + * buffer limit which defaults to 100MB. + */ + class HeapBufferedResponseConsumerFactory implements HttpAsyncResponseConsumerFactory { + + // default buffer limit is 100MB + static final int DEFAULT_BUFFER_LIMIT = 100 * 1024 * 1024; + + private final int bufferLimit; + + /** + * Creates a {@link HeapBufferedResponseConsumerFactory} instance with the given buffer limit. + * + * @param bufferLimitBytes the buffer limit to be applied to this instance + */ + public HeapBufferedResponseConsumerFactory(int bufferLimitBytes) { + this.bufferLimit = bufferLimitBytes; + } + + /** + * Creates the {@link AsyncResponseConsumer}, called once per request attempt. + */ + @Override + public AsyncResponseConsumer createHttpAsyncResponseConsumer() { + return new HeapBufferedAsyncResponseConsumer(bufferLimit); + } + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/Response.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/Response.java new file mode 100644 index 0000000000..2f36d517de --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/Response.java @@ -0,0 +1,214 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.message.RequestLine; +import org.apache.hc.core5.http.message.StatusLine; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Holds an opensearch response. It wraps the {@link HttpResponse} returned and associates it with + * its corresponding {@link RequestLine} and {@link HttpHost}. + */ +final class Response { + + private final RequestLine requestLine; + private final HttpHost host; + private final ClassicHttpResponse response; + + Response(RequestLine requestLine, HttpHost host, ClassicHttpResponse response) { + Objects.requireNonNull(requestLine, "requestLine cannot be null"); + Objects.requireNonNull(host, "host cannot be null"); + Objects.requireNonNull(response, "response cannot be null"); + this.requestLine = requestLine; + this.host = host; + this.response = response; + } + + /** + * Returns the request line that generated this response + */ + public RequestLine getRequestLine() { + return requestLine; + } + + /** + * Returns the node that returned this response + */ + public HttpHost getHost() { + return host; + } + + /** + * Returns the status line of the current response + */ + public StatusLine getStatusLine() { + return new StatusLine(response); + } + + /** + * Returns all the response headers + */ + public Header[] getHeaders() { + return response.getHeaders(); + } + + /** + * Returns the value of the first header with a specified name of this message. + * If there is more than one matching header in the message the first element is returned. + * If there is no matching header in the message null is returned. + * + * @param name header name + */ + public String getHeader(String name) { + Header header = response.getFirstHeader(name); + if (header == null) { + return null; + } + return header.getValue(); + } + + /** + * Returns the response body available, null otherwise + * @see HttpEntity + */ + public HttpEntity getEntity() { + return response.getEntity(); + } + + /** + * Optimized regular expression to test if a string matches the RFC 1123 date + * format (with quotes and leading space). Start/end of line characters and + * atomic groups are used to prevent backtracking. + */ + private static final Pattern WARNING_HEADER_DATE_PATTERN = Pattern.compile("^ " + // start of line, leading space + // quoted RFC 1123 date format + "\"" + // opening quote + "(?>Mon|Tue|Wed|Thu|Fri|Sat|Sun), " + // day of week, atomic group to prevent backtracking + "\\d{2} " + // 2-digit day + "(?>Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) " + // month, atomic group to prevent backtracking + "\\d{4} " + // 4-digit year + "\\d{2}:\\d{2}:\\d{2} " + // (two-digit hour):(two-digit minute):(two-digit second) + "GMT" + // GMT + "\"$"); // closing quote (optional, since an older version can still send a warn-date), end of line + + /** + * Length of RFC 1123 format (with quotes and leading space), used in + * matchWarningHeaderPatternByPrefix(String). + */ + private static final int WARNING_HEADER_DATE_LENGTH = 0 + 1 + 1 + 3 + 1 + 1 + 2 + 1 + 3 + 1 + 4 + 1 + 2 + 1 + 2 + 1 + 2 + 1 + 3 + 1; + + /** + * Tests if a string matches the RFC 7234 specification for warning headers. + * This assumes that the warn code is always 299 and the warn agent is always + * OpenSearch. + * + * @param s the value of a warning header formatted according to RFC 7234 + * @return {@code true} if the input string matches the specification + */ + private static boolean matchWarningHeaderPatternByPrefix(final String s) { + return s.startsWith("299 OpenSearch-"); + } + + /** + * Refer to org.opensearch.common.logging.DeprecationLogger + */ + private static String extractWarningValueFromWarningHeader(final String s) { + String warningHeader = s; + + /* + * The following block tests for the existence of a RFC 1123 date in the warning header. If the date exists, it is removed for + * extractWarningValueFromWarningHeader(String) to work properly (as it does not handle dates). + */ + if (s.length() > WARNING_HEADER_DATE_LENGTH) { + final String possibleDateString = s.substring(s.length() - WARNING_HEADER_DATE_LENGTH); + final Matcher matcher = WARNING_HEADER_DATE_PATTERN.matcher(possibleDateString); + + if (matcher.matches()) { + warningHeader = warningHeader.substring(0, s.length() - WARNING_HEADER_DATE_LENGTH); + } + } + + final int firstQuote = warningHeader.indexOf('\"'); + final int lastQuote = warningHeader.length() - 1; + final String warningValue = warningHeader.substring(firstQuote + 1, lastQuote); + return warningValue; + } + + /** + * Returns a list of all warning headers returned in the response. + */ + public List getWarnings() { + List warnings = new ArrayList<>(); + for (Header header : response.getHeaders("Warning")) { + String warning = header.getValue(); + if (matchWarningHeaderPatternByPrefix(warning)) { + warnings.add(extractWarningValueFromWarningHeader(warning)); + } else { + warnings.add(warning); + } + } + return warnings; + } + + /** + * Returns true if there is at least one warning header returned in the + * response. + */ + public boolean hasWarnings() { + Header[] warnings = response.getHeaders("Warning"); + return warnings != null && warnings.length > 0; + } + + ClassicHttpResponse getHttpResponse() { + return response; + } + + /** + * Convert response to string representation + */ + @Override + public String toString() { + return "Response{" + "requestLine=" + requestLine + ", host=" + host + ", response=" + getStatusLine() + '}'; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ResponseException.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ResponseException.java new file mode 100644 index 0000000000..6cd0358a93 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ResponseException.java @@ -0,0 +1,96 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.BufferedHttpEntity; +import org.apache.hc.core5.http.io.entity.EntityUtils; + +import java.io.IOException; +import java.util.Locale; + +/** + * Exception thrown when an opensearch node responds to a request with a status code that indicates an error. + * Holds the response that was returned. + */ +public final class ResponseException extends IOException { + + private final Response response; + + /** + * Creates a ResponseException containing the given {@code Response}. + * + * @param response The error response. + */ + ResponseException(Response response) throws IOException { + super(buildMessage(response)); + this.response = response; + } + + static String buildMessage(Response response) throws IOException { + String message = String.format( + Locale.ROOT, + "method [%s], host [%s], URI [%s], status line [%s]", + response.getRequestLine().getMethod(), + response.getHost(), + response.getRequestLine().getUri(), + response.getStatusLine().toString() + ); + + if (response.hasWarnings()) { + message += "\n" + "Warnings: " + response.getWarnings(); + } + + HttpEntity entity = response.getEntity(); + if (entity != null) { + if (entity.isRepeatable() == false) { + entity = new BufferedHttpEntity(entity); + response.getHttpResponse().setEntity(entity); + } + try { + message += "\n" + EntityUtils.toString(entity); + } catch (final ParseException ex) { + throw new IOException(ex); + } + } + return message; + } + + /** + * Returns the {@link Response} that caused this exception to be thrown. + */ + public Response getResponse() { + return response; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningFailureException.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningFailureException.java new file mode 100644 index 0000000000..8627fd9d0c --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningFailureException.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.io.IOException; + +/** + * This exception is used to indicate that one or more {@link Response#getWarnings()} exist + * and is typically used when the {@link ApacheHttpClient5Transport} is set to fail by passing + * `true` to `strictDeprecationMode`. + */ +// This class extends RuntimeException in order to deal with wrapping that is done in FutureUtils on exception. +// if the exception is not of type OpenSearchException or RuntimeException it will be wrapped in a UncategorizedExecutionException +public final class WarningFailureException extends RuntimeException { + private final Response response; + + /** + * Creates a {@link WarningFailureException} instance. + * + * @param response the response that contains warnings. + * @throws IOException if there is a problem building the exception message. + */ + public WarningFailureException(Response response) throws IOException { + super(ResponseException.buildMessage(response)); + this.response = response; + } + + /** + * Wrap a {@linkplain WarningFailureException} with another one with the current + * stack trace. This is used during synchronous calls so that the caller + * ends up in the stack trace of the exception thrown. + * + * @param e the exception to be wrapped. + */ + WarningFailureException(WarningFailureException e) { + super(e.getMessage(), e); + this.response = e.getResponse(); + } + + /** + * Returns the {@link Response} that caused this exception to be thrown. + */ + public Response getResponse() { + return response; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningsHandler.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningsHandler.java new file mode 100644 index 0000000000..1a0b73d07d --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningsHandler.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.util.List; + +/** + * Called if there are warnings to determine if those warnings should fail the + * request. + */ +public interface WarningsHandler { + + /** + * Determines whether the given list of warnings should fail the request. + * + * @param warnings a list of warnings. + * @return boolean indicating if the request should fail. + */ + boolean warningsShouldFailRequest(List warnings); + + /** + * The permissive warnings handler. Warnings will not fail the request. + */ + WarningsHandler PERMISSIVE = new WarningsHandler() { + @Override + public boolean warningsShouldFailRequest(List warnings) { + return false; + } + + @Override + public String toString() { + return "permissive"; + } + }; + + /** + * The strict warnings handler. Warnings will fail the request. + */ + WarningsHandler STRICT = new WarningsHandler() { + @Override + public boolean warningsShouldFailRequest(List warnings) { + return false == warnings.isEmpty(); + } + + @Override + public String toString() { + return "strict"; + } + }; +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java new file mode 100644 index 0000000000..aa4d73a237 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import org.apache.hc.core5.http.ContentTooLongException; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.nio.AsyncEntityConsumer; +import org.apache.hc.core5.http.nio.entity.AbstractBinAsyncEntityConsumer; +import org.apache.hc.core5.util.ByteArrayBuffer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Default implementation of {@link AsyncEntityConsumer}. Buffers the whole + * response content in heap memory, meaning that the size of the buffer is equal to the content-length of the response. + * Limits the size of responses that can be read based on a configurable argument. Throws an exception in case the entity is longer + * than the configured buffer limit. + */ +public class HeapBufferedAsyncEntityConsumer extends AbstractBinAsyncEntityConsumer { + + private final int bufferLimitBytes; + private AtomicReference bufferRef = new AtomicReference<>(); + + /** + * Creates a new instance of this consumer with the provided buffer limit. + * + * @param bufferLimit the buffer limit. Must be greater than 0. + * @throws IllegalArgumentException if {@code bufferLimit} is less than or equal to 0. + */ + public HeapBufferedAsyncEntityConsumer(int bufferLimit) { + if (bufferLimit <= 0) { + throw new IllegalArgumentException("bufferLimit must be greater than 0"); + } + this.bufferLimitBytes = bufferLimit; + } + + /** + * Get the limit of the buffer. + */ + public int getBufferLimit() { + return bufferLimitBytes; + } + + /** + * Triggered to signal beginning of entity content stream. + * + * @param contentType the entity content type + */ + @Override + protected void streamStart(final ContentType contentType) throws HttpException, IOException {} + + /** + * Triggered to obtain the capacity increment. + * + * @return the number of bytes this consumer is prepared to process. + */ + @Override + protected int capacityIncrement() { + return Integer.MAX_VALUE; + } + + /** + * Triggered to pass incoming data packet to the data consumer. + * + * @param src the data packet. + * @param endOfStream flag indicating whether this data packet is the last in the data stream. + * + */ + @Override + protected void data(final ByteBuffer src, final boolean endOfStream) throws IOException { + if (src == null) { + return; + } + + ByteArrayBuffer buffer = bufferRef.get(); + if (buffer == null) { + buffer = new ByteArrayBuffer(bufferLimitBytes); + if (bufferRef.compareAndSet(null, buffer) == false) { + buffer = bufferRef.get(); + } + } + + int len = src.limit(); + if (buffer.length() + len > bufferLimitBytes) { + throw new ContentTooLongException( + "entity content is too long [" + len + "] for the configured buffer limit [" + bufferLimitBytes + "]" + ); + } + + if (len < 0) { + len = 4096; + } + + if (src.hasArray()) { + buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining()); + } else { + while (src.hasRemaining()) { + buffer.append(src.get()); + } + } + } + + /** + * Triggered to generate entity representation. + * + * @return the entity content + */ + @Override + protected byte[] generateContent() throws IOException { + final ByteArrayBuffer buffer = bufferRef.get(); + return buffer == null ? new byte[0] : buffer.toByteArray(); + } + + /** + * Release resources being held + */ + @Override + public void releaseResources() { + ByteArrayBuffer buffer = bufferRef.getAndSet(null); + if (buffer != null) { + buffer.clear(); + buffer = null; + } + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncResponseConsumer.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncResponseConsumer.java new file mode 100644 index 0000000000..4d0ceb66e7 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncResponseConsumer.java @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.io.entity.ByteArrayEntity; +import org.apache.hc.core5.http.message.BasicClassicHttpResponse; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.http.nio.support.AbstractAsyncResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; + +import java.io.IOException; + +/** + * Default implementation of {@link AsyncResponseConsumer}. Buffers the whole + * response content in heap memory, meaning that the size of the buffer is equal to the content-length of the response. + * Limits the size of responses that can be read based on a configurable argument. Throws an exception in case the entity is longer + * than the configured buffer limit. + */ +public class HeapBufferedAsyncResponseConsumer extends AbstractAsyncResponseConsumer { + private static final Log LOGGER = LogFactory.getLog(HeapBufferedAsyncResponseConsumer.class); + private final int bufferLimit; + + /** + * Creates a new instance of this consumer with the provided buffer limit. + * + * @param bufferLimit the buffer limit. Must be greater than 0. + * @throws IllegalArgumentException if {@code bufferLimit} is less than or equal to 0. + */ + public HeapBufferedAsyncResponseConsumer(int bufferLimit) { + super(new HeapBufferedAsyncEntityConsumer(bufferLimit)); + this.bufferLimit = bufferLimit; + } + + /** + * Get the limit of the buffer. + */ + public int getBufferLimit() { + return bufferLimit; + } + + /** + * Triggered to signal receipt of an intermediate (1xx) HTTP response. + * + * @param response the intermediate (1xx) HTTP response. + * @param context the actual execution context. + */ + @Override + public void informationResponse(final HttpResponse response, final HttpContext context) throws HttpException, IOException {} + + /** + * Triggered to generate object that represents a result of response message processing. + * @param response the response message. + * @param entity the response entity. + * @param contentType the response content type. + * @return the result of response processing. + */ + @Override + protected ClassicHttpResponse buildResult(final HttpResponse response, final byte[] entity, final ContentType contentType) { + final ClassicHttpResponse classicResponse = new BasicClassicHttpResponse(response.getCode()); + classicResponse.setVersion(response.getVersion()); + classicResponse.setHeaders(response.getHeaders()); + classicResponse.setReasonPhrase(response.getReasonPhrase()); + if (response.getLocale() != null) { + classicResponse.setLocale(response.getLocale()); + } + + if (entity != null) { + String encoding = null; + + try { + final Header contentEncoding = response.getHeader(HttpHeaders.CONTENT_ENCODING); + if (contentEncoding != null) { + encoding = contentEncoding.getValue(); + } + } catch (final HttpException ex) { + LOGGER.debug("Unable to detect content encoding", ex); + } + + final ByteArrayEntity httpEntity = new ByteArrayEntity(entity, contentType, encoding); + classicResponse.setEntity(httpEntity); + } + + return classicResponse; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpEntityAsyncEntityProducer.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpEntityAsyncEntityProducer.java new file mode 100644 index 0000000000..1c669a55c7 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpEntityAsyncEntityProducer.java @@ -0,0 +1,182 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.ResourceHolder; +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.Asserts; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * The {@link AsyncEntityProducer} implementation for {@link HttpEntity} + */ +public class HttpEntityAsyncEntityProducer implements AsyncEntityProducer { + + private final HttpEntity entity; + private final ByteBuffer byteBuffer; + private final boolean chunked; + private final AtomicReference exception; + private final AtomicReference channelRef; + private boolean eof; + + /** + * Create new async HTTP entity producer + * @param entity HTTP entity + * @param bufferSize buffer size + */ + public HttpEntityAsyncEntityProducer(final HttpEntity entity, final int bufferSize) { + this.entity = Args.notNull(entity, "Http Entity"); + this.byteBuffer = ByteBuffer.allocate(bufferSize); + this.chunked = entity.isChunked(); + this.exception = new AtomicReference<>(); + this.channelRef = new AtomicReference<>(); + } + + /** + * Create new async HTTP entity producer with default buffer size (8192 bytes) + * @param entity HTTP entity + */ + public HttpEntityAsyncEntityProducer(final HttpEntity entity) { + this(entity, 8192); + } + + /** + * Determines whether the producer can consistently produce the same content + * after invocation of {@link ResourceHolder#releaseResources()}. + */ + @Override + public boolean isRepeatable() { + return entity.isRepeatable(); + } + + /** + * Returns content type of the entity, if known. + */ + @Override + public String getContentType() { + return entity.getContentType(); + } + + /** + * Returns length of the entity, if known. + */ + @Override + public long getContentLength() { + return entity.getContentLength(); + } + + /** + * Returns the number of bytes immediately available for output. + * This method can be used as a hint to control output events + * of the underlying I/O session. + * + * @return the number of bytes immediately available for output + */ + @Override + public int available() { + return Integer.MAX_VALUE; + } + + /** + * Returns content encoding of the entity, if known. + */ + @Override + public String getContentEncoding() { + return entity.getContentEncoding(); + } + + /** + * Returns chunked transfer hint for this entity. + *

+ * The behavior of wrapping entities is implementation dependent, + * but should respect the primary purpose. + *

+ */ + @Override + public boolean isChunked() { + return chunked; + } + + /** + * Preliminary declaration of trailing headers. + */ + @Override + public Set getTrailerNames() { + return entity.getTrailerNames(); + } + + /** + * Triggered to signal the ability of the underlying data channel + * to accept more data. The data producer can choose to write data + * immediately inside the call or asynchronously at some later point. + * + * @param channel the data channel capable to accepting more data. + */ + @Override + public void produce(final DataStreamChannel channel) throws IOException { + ReadableByteChannel stream = channelRef.get(); + if (stream == null) { + stream = Channels.newChannel(entity.getContent()); + Asserts.check(channelRef.getAndSet(stream) == null, "Illegal producer state"); + } + if (!eof) { + final int bytesRead = stream.read(byteBuffer); + if (bytesRead < 0) { + eof = true; + } + } + if (byteBuffer.position() > 0) { + byteBuffer.flip(); + channel.write(byteBuffer); + byteBuffer.compact(); + } + if (eof && byteBuffer.position() == 0) { + channel.endStream(); + releaseResources(); + } + } + + /** + * Triggered to signal a failure in data generation. + * + * @param cause the cause of the failure. + */ + @Override + public void failed(final Exception cause) { + if (exception.compareAndSet(null, cause)) { + releaseResources(); + } + } + + /** + * Release resources being held + */ + @Override + public void releaseResources() { + eof = false; + final ReadableByteChannel stream = channelRef.getAndSet(null); + if (stream != null) { + try { + stream.close(); + } catch (final IOException ex) { + /* Close quietly */ + } + } + } + +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpUriRequestProducer.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpUriRequestProducer.java new file mode 100644 index 0000000000..b9940009c7 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpUriRequestProducer.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.net.URIAuthority; +import org.apache.hc.core5.util.Args; + +/** + * The producer of the {@link HttpUriRequestBase} instances associated with a particular {@link HttpHost} + */ +public class HttpUriRequestProducer extends BasicRequestProducer { + private final HttpUriRequestBase request; + + HttpUriRequestProducer(final HttpUriRequestBase request, final AsyncEntityProducer entityProducer) { + super(request, entityProducer); + this.request = request; + } + + /** + * Get the produced {@link HttpUriRequestBase} instance + * @return produced {@link HttpUriRequestBase} instance + */ + public HttpUriRequestBase getRequest() { + return request; + } + + /** + * Create new request producer for {@link HttpUriRequestBase} instance and {@link HttpHost} + * @param request {@link HttpUriRequestBase} instance + * @param host {@link HttpHost} instance + * @return new request producer + */ + public static HttpUriRequestProducer create(final HttpUriRequestBase request, final HttpHost host) { + Args.notNull(request, "Request"); + Args.notNull(host, "HttpHost"); + + // TODO: Should we copy request here instead of modifying in place? + request.setAuthority(new URIAuthority(host)); + request.setScheme(host.getSchemeName()); + + final HttpEntity entity = request.getEntity(); + AsyncEntityProducer entityProducer = null; + + if (entity != null) { + entityProducer = new HttpEntityAsyncEntityProducer(entity); + } + + return new HttpUriRequestProducer(request, entityProducer); + } + +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/Node.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/Node.java new file mode 100644 index 0000000000..8f55f67cde --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/Node.java @@ -0,0 +1,289 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import org.apache.hc.core5.http.HttpHost; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; + +/** + * Metadata about an {@link HttpHost} running OpenSearch. + */ +public class Node { + /** + * Address that this host claims is its primary contact point. + */ + private final HttpHost host; + /** + * Addresses on which the host is listening. These are useful to have + * around because they allow you to find a host based on any address it + * is listening on. + */ + private final Set boundHosts; + /** + * Name of the node as configured by the {@code node.name} attribute. + */ + private final String name; + /** + * Version of OpenSearch that the node is running or {@code null} + * if we don't know the version. + */ + private final String version; + /** + * Roles that the OpenSearch process on the host has or {@code null} + * if we don't know what roles the node has. + */ + private final Roles roles; + /** + * Attributes declared on the node. + */ + private final Map> attributes; + + /** + * Create a {@linkplain Node} with metadata. All parameters except + * {@code host} are nullable and implementations of {@link NodeSelector} + * need to decide what to do in their absence. + * + * @param host primary host address + * @param boundHosts addresses on which the host is listening + * @param name name of the node + * @param version version of OpenSearch + * @param roles roles that the OpenSearch process has on the host + * @param attributes attributes declared on the node + */ + public Node(HttpHost host, Set boundHosts, String name, String version, Roles roles, Map> attributes) { + if (host == null) { + throw new IllegalArgumentException("host cannot be null"); + } + this.host = host; + this.boundHosts = boundHosts; + this.name = name; + this.version = version; + this.roles = roles; + this.attributes = attributes; + } + + /** + * Create a {@linkplain Node} without any metadata. + * + * @param host primary host address + */ + public Node(HttpHost host) { + this(host, null, null, null, null, null); + } + + /** + * Contact information for the host. + */ + public HttpHost getHost() { + return host; + } + + /** + * Addresses on which the host is listening. These are useful to have + * around because they allow you to find a host based on any address it + * is listening on. + */ + public Set getBoundHosts() { + return boundHosts; + } + + /** + * The {@code node.name} of the node. + */ + public String getName() { + return name; + } + + /** + * Version of OpenSearch that the node is running or {@code null} + * if we don't know the version. + */ + public String getVersion() { + return version; + } + + /** + * Roles that the OpenSearch process on the host has or {@code null} + * if we don't know what roles the node has. + */ + public Roles getRoles() { + return roles; + } + + /** + * Attributes declared on the node. + */ + public Map> getAttributes() { + return attributes; + } + + /** + * Convert node to string representation + */ + @Override + public String toString() { + StringBuilder b = new StringBuilder(); + b.append("[host=").append(host); + if (boundHosts != null) { + b.append(", bound=").append(boundHosts); + } + if (name != null) { + b.append(", name=").append(name); + } + if (version != null) { + b.append(", version=").append(version); + } + if (roles != null) { + b.append(", roles=").append(roles); + } + if (attributes != null) { + b.append(", attributes=").append(attributes); + } + return b.append(']').toString(); + } + + /** + * Compare two nodes for equality + * @param obj node instance to compare with + */ + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + Node other = (Node) obj; + return host.equals(other.host) + && Objects.equals(boundHosts, other.boundHosts) + && Objects.equals(name, other.name) + && Objects.equals(version, other.version) + && Objects.equals(roles, other.roles) + && Objects.equals(attributes, other.attributes); + } + + /** + * Calculate the hash code of the node + */ + @Override + public int hashCode() { + return Objects.hash(host, boundHosts, name, version, roles, attributes); + } + + /** + * Role information about an OpenSearch process. + */ + public static final class Roles { + + private final Set roles; + + /** + * Create a {@link Roles} instance of the given string set. + * + * @param roles set of role names. + */ + public Roles(final Set roles) { + this.roles = new TreeSet<>(roles); + } + + /** + * Returns whether or not the node could be elected cluster-manager. + */ + public boolean isClusterManagerEligible() { + return roles.contains("master") || roles.contains("cluster_manager"); + } + + /** + * Returns whether or not the node could be elected cluster-manager. + * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #isClusterManagerEligible()} + */ + @Deprecated + public boolean isMasterEligible() { + return isClusterManagerEligible(); + } + + /** + * Returns whether or not the node stores data. + */ + public boolean isData() { + return roles.contains("data"); + } + + /** + * Returns whether or not the node runs ingest pipelines. + */ + public boolean isIngest() { + return roles.contains("ingest"); + } + + /** + * Returns whether the node is dedicated to provide search capability. + */ + public boolean isSearch() { + return roles.contains("search"); + } + + /** + * Convert roles to string representation + */ + @Override + public String toString() { + return String.join(",", roles); + } + + /** + * Compare two roles for equality + * @param obj roles instance to compare with + */ + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + Roles other = (Roles) obj; + return roles.equals(other.roles); + } + + /** + * Calculate the hash code of the roles + */ + @Override + public int hashCode() { + return roles.hashCode(); + } + + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/NodeSelector.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/NodeSelector.java new file mode 100644 index 0000000000..eb11d8bb0b --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/NodeSelector.java @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import java.util.Iterator; + +/** + * Selects nodes that can receive requests. Used to keep requests away + * from cluster-manager nodes or to send them to nodes with a particular attribute. + */ +public interface NodeSelector { + /** + * Select the {@link Node}s to which to send requests. This is called with + * a mutable {@link Iterable} of {@linkplain Node}s in the order that the + * rest client would prefer to use them and implementers should remove + * nodes from the that should not receive the request. Implementers may + * iterate the nodes as many times as they need. + *

+ * This may be called twice per request: first for "living" nodes that + * have not been denylisted by previous errors. If the selector removes + * all nodes from the list or if there aren't any living nodes then the + * {@link org.opensearch.client.transport.httpclient5.ApacheHttpClient5Transport} + * will call this method with a list of "dead" nodes. + *

+ * Implementers should not rely on the ordering of the nodes. + * + * @param nodes the {@link Node}s targeted for the sending requests + */ + void select(Iterable nodes); + /* + * We were fairly careful with our choice of Iterable here. The caller has + * a List but reordering the list is likely to break round robin. Luckily + * Iterable doesn't allow any reordering. + */ + + /** + * Selector that matches any node. + */ + NodeSelector ANY = new NodeSelector() { + @Override + public void select(Iterable nodes) { + // Intentionally does nothing + } + + @Override + public String toString() { + return "ANY"; + } + }; + + /** + * Selector that matches any node that has metadata and doesn't + * have the {@code cluster_manager} role OR it has the data {@code data} + * role. + */ + NodeSelector SKIP_DEDICATED_CLUSTER_MANAGERS = new NodeSelector() { + @Override + public void select(Iterable nodes) { + for (Iterator itr = nodes.iterator(); itr.hasNext();) { + Node node = itr.next(); + if (node.getRoles() == null) continue; + if (node.getRoles().isClusterManagerEligible() + && false == node.getRoles().isData() + && false == node.getRoles().isIngest()) { + itr.remove(); + } + } + } + + @Override + public String toString() { + return "SKIP_DEDICATED_CLUSTER_MANAGERS"; + } + }; +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/rest_client/RestClientOptions.java b/java-client/src/main/java/org/opensearch/client/transport/rest_client/RestClientOptions.java index 3015189421..f74a535639 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/rest_client/RestClientOptions.java +++ b/java-client/src/main/java/org/opensearch/client/transport/rest_client/RestClientOptions.java @@ -45,6 +45,9 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.opensearch.client.transport.TransportHeaders.ACCEPT; +import static org.opensearch.client.transport.TransportHeaders.USER_AGENT; + public class RestClientOptions implements TransportOptions { private final RequestOptions options; @@ -178,8 +181,6 @@ public RestClientOptions build() { } } - private static final String USER_AGENT = "User-Agent"; - static RestClientOptions initialOptions() { String ua = String.format( Locale.ROOT, @@ -191,7 +192,7 @@ static RestClientOptions initialOptions() { return new RestClientOptions( RequestOptions.DEFAULT.toBuilder() .addHeader(USER_AGENT, ua) - .addHeader("Accept", RestClientTransport.JsonContentType.toString()) + .addHeader(ACCEPT, RestClientTransport.JsonContentType.toString()) .build() ); } diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/ClusterClientIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractClusterClientIT.java similarity index 99% rename from java-client/src/test/java/org/opensearch/client/opensearch/integTest/ClusterClientIT.java rename to java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractClusterClientIT.java index 8b21eadba8..f709de64ee 100644 --- a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/ClusterClientIT.java +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractClusterClientIT.java @@ -34,8 +34,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -public class ClusterClientIT extends OpenSearchJavaClientTestCase { - +public abstract class AbstractClusterClientIT extends OpenSearchJavaClientTestCase { public void testClusterPutSettings() throws IOException { OpenSearchClient openSearchClient = javaClient(); diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/CrudIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractCrudIT.java similarity index 99% rename from java-client/src/test/java/org/opensearch/client/opensearch/integTest/CrudIT.java rename to java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractCrudIT.java index 5a31ca2e92..380a5bdd8a 100644 --- a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/CrudIT.java +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractCrudIT.java @@ -36,7 +36,7 @@ import java.util.Collections; import java.util.List; -public class CrudIT extends OpenSearchJavaClientTestCase { +public abstract class AbstractCrudIT extends OpenSearchJavaClientTestCase { public void testDelete() throws IOException { { diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/IndicesClientIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractIndicesClientIT.java similarity index 97% rename from java-client/src/test/java/org/opensearch/client/opensearch/integTest/IndicesClientIT.java rename to java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractIndicesClientIT.java index 38726a57a5..c56556770a 100644 --- a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/IndicesClientIT.java +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractIndicesClientIT.java @@ -22,8 +22,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -public class IndicesClientIT extends OpenSearchJavaClientTestCase { - +public abstract class AbstractIndicesClientIT extends OpenSearchJavaClientTestCase { public void testIndicesExists() throws IOException { // Index present diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/NodesIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractNodesIT.java similarity index 90% rename from java-client/src/test/java/org/opensearch/client/opensearch/integTest/NodesIT.java rename to java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractNodesIT.java index 74fc61a3ec..6a00a76ac3 100644 --- a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/NodesIT.java +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractNodesIT.java @@ -16,10 +16,10 @@ import org.opensearch.client.opensearch.nodes.NodesStatsResponse; -public class NodesIT extends OpenSearchJavaClientTestCase { +public abstract class AbstractNodesIT extends OpenSearchJavaClientTestCase { public void testNodesStats() throws IOException { final NodesStatsResponse response = javaClient().nodes().stats(); assertThat(response.clusterName(), not(nullValue())); assertThat(response.nodes(), not(anEmptyMap())); } -} \ No newline at end of file +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/PingAndInfoIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractPingAndInfoIT.java similarity index 96% rename from java-client/src/test/java/org/opensearch/client/opensearch/integTest/PingAndInfoIT.java rename to java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractPingAndInfoIT.java index ef69c05b7b..79f4eb97c9 100644 --- a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/PingAndInfoIT.java +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractPingAndInfoIT.java @@ -17,8 +17,7 @@ import java.io.IOException; import java.util.Map; -public class PingAndInfoIT extends OpenSearchJavaClientTestCase { - +public abstract class AbstractPingAndInfoIT extends OpenSearchJavaClientTestCase { public void testPing() throws IOException { BooleanResponse ping = javaClient().ping(); assertTrue(ping.value()); diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/RequestTest.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractRequestIT.java similarity index 99% rename from java-client/src/test/java/org/opensearch/client/opensearch/integTest/RequestTest.java rename to java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractRequestIT.java index a3c5c8a6ff..374b3d1fc3 100644 --- a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/RequestTest.java +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/AbstractRequestIT.java @@ -32,7 +32,6 @@ package org.opensearch.client.opensearch.integTest; - import org.junit.Test; import org.opensearch.Version; import org.opensearch.client.opensearch.OpenSearchAsyncClient; @@ -72,9 +71,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -public class RequestTest extends OpenSearchJavaClientTestCase { - - +public abstract class AbstractRequestIT extends OpenSearchJavaClientTestCase { @Test public void testCount() throws Exception { diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java index d12aa93bf0..11c748762f 100644 --- a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java @@ -11,7 +11,6 @@ import org.opensearch.Version; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; -import org.opensearch.client.json.jackson.JacksonJsonpMapper; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch._types.ExpandWildcard; import org.opensearch.client.opensearch.cat.IndicesResponse; @@ -28,7 +27,6 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; -import org.opensearch.client.transport.rest_client.RestClientTransport; import org.opensearch.common.settings.Settings; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.test.rest.OpenSearchRestTestCase; @@ -40,7 +38,7 @@ import java.util.Optional; import java.util.TreeSet; -public abstract class OpenSearchJavaClientTestCase extends OpenSearchRestTestCase { +public abstract class OpenSearchJavaClientTestCase extends OpenSearchRestTestCase implements OpenSearchTransportSupport { private static OpenSearchClient javaClient; private static OpenSearchClient adminJavaClient; @@ -76,21 +74,11 @@ public void initJavaClient() throws IOException { } } - private boolean isHttps() { - return Optional.ofNullable(System.getProperty("https")) - .map("true"::equalsIgnoreCase) - .orElse(false); - } - @Override protected String getProtocol() { return isHttps() ? "https" : "http"; } - protected OpenSearchClient buildJavaClient(Settings settings, HttpHost[] hosts) throws IOException { - return new OpenSearchClient(new RestClientTransport(buildClient(settings, hosts), new JacksonJsonpMapper())); - } - @Override protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException { RestClientBuilder builder = RestClient.builder(hosts); diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/OpenSearchTransportSupport.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/OpenSearchTransportSupport.java new file mode 100644 index 0000000000..3a469598d8 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/OpenSearchTransportSupport.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest; + +import java.io.IOException; +import java.util.Optional; + +import org.apache.http.HttpHost; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.common.settings.Settings; + +public interface OpenSearchTransportSupport { + default boolean isHttps() { + return Optional.ofNullable(System.getProperty("https")) + .map("true"::equalsIgnoreCase) + .orElse(false); + } + + default OpenSearchClient buildJavaClient(Settings settings, HttpHost[] hosts) throws IOException { + return new OpenSearchClient(buildTransport(settings, hosts)); + } + + OpenSearchTransport buildTransport(Settings settings, HttpHost[] hosts) throws IOException; +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/ClusterClientIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/ClusterClientIT.java new file mode 100644 index 0000000000..5e13da89a2 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/ClusterClientIT.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.httpclient5; + +import java.io.IOException; + +import org.opensearch.client.opensearch.integTest.AbstractClusterClientIT; +import org.opensearch.client.transport.httpclient5.ResponseException; + +public class ClusterClientIT extends AbstractClusterClientIT implements HttpClient5TransportSupport { + @Override + public void testClusterHealthNotFoundIndex() throws IOException { + try { + super.testClusterHealthNotFoundIndex(); + } catch (ResponseException e) { + assertNotNull(e); + } + } +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/CrudIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/CrudIT.java new file mode 100644 index 0000000000..1e4c995944 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/CrudIT.java @@ -0,0 +1,14 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.httpclient5; + +import org.opensearch.client.opensearch.integTest.AbstractCrudIT; + +public class CrudIT extends AbstractCrudIT implements HttpClient5TransportSupport { +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java new file mode 100644 index 0000000000..6ffe6419a1 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.httpclient5; + +import static org.opensearch.test.rest.OpenSearchRestTestCase.CLIENT_PATH_PREFIX; +import static org.opensearch.test.rest.OpenSearchRestTestCase.CLIENT_SOCKET_TIMEOUT; + +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; +import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.client5.http.ssl.NoopHostnameVerifier; +import org.apache.hc.core5.function.Factory; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.reactor.ssl.TlsDetails; +import org.apache.hc.core5.ssl.SSLContextBuilder; +import org.apache.hc.core5.util.Timeout; +import org.opensearch.client.opensearch.integTest.OpenSearchTransportSupport; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ThreadContext; + +interface HttpClient5TransportSupport extends OpenSearchTransportSupport { + @Override + default OpenSearchTransport buildTransport(Settings settings, org.apache.http.HttpHost[] hosts) throws IOException { + final HttpHost[] converted = Arrays + .stream(hosts) + .map(h -> new HttpHost(h.getSchemeName(), h.getHostName(), h.getPort())) + .toArray(HttpHost[]::new); + + final ApacheHttpClient5TransportBuilder builder = ApacheHttpClient5TransportBuilder.builder(converted); + configure(builder, settings, converted); + return builder.setStrictDeprecationMode(true).build(); + } + + private void configure(ApacheHttpClient5TransportBuilder builder, Settings settings, HttpHost[] hosts) throws IOException { + if (isHttps()) { + try { + final SSLContext sslcontext = SSLContextBuilder + .create() + .loadTrustMaterial(null, (chains, authType) -> true) + .build(); + + builder.setHttpClientConfigCallback(httpClientBuilder -> { + String userName = Optional.ofNullable(System.getProperty("user")).orElse("admin"); + String password = Optional.ofNullable(System.getProperty("password")).orElse("admin"); + + final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + for (final HttpHost host: hosts) { + credentialsProvider.setCredentials(new AuthScope(host), + new UsernamePasswordCredentials(userName, password.toCharArray())); + } + + final TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create() + .setSslContext(sslcontext) + // disable the certificate since our testing cluster just uses the default security configuration + .setHostnameVerifier(NoopHostnameVerifier.INSTANCE) + // See https://issues.apache.org/jira/browse/HTTPCLIENT-2219 + .setTlsDetailsFactory(new Factory() { + @Override + public TlsDetails create(final SSLEngine sslEngine) { + return new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol()); + } + }) + .build(); + + final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create() + .setTlsStrategy(tlsStrategy) + .build(); + + return httpClientBuilder + .setDefaultCredentialsProvider(credentialsProvider) + .setConnectionManager(connectionManager); + }); + } catch (KeyStoreException | NoSuchAlgorithmException | KeyManagementException e) { + throw new RuntimeException("Error setting up ssl", e); + } + } + + Map headers = ThreadContext.buildDefaultHeaders(settings); + Header[] defaultHeaders = new Header[headers.size()]; + int i = 0; + for (Map.Entry entry : headers.entrySet()) { + defaultHeaders[i++] = new BasicHeader(entry.getKey(), entry.getValue()); + } + builder.setDefaultHeaders(defaultHeaders); + final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT); + final TimeValue socketTimeout = TimeValue.parseTimeValue( + socketTimeoutString == null ? "60s" : socketTimeoutString, + CLIENT_SOCKET_TIMEOUT + ); + builder.setRequestConfigCallback( + conf -> conf.setResponseTimeout(Timeout.ofMilliseconds(Math.toIntExact(socketTimeout.getMillis()))) + ); + if (settings.hasValue(CLIENT_PATH_PREFIX)) { + builder.setPathPrefix(settings.get(CLIENT_PATH_PREFIX)); + } + } +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/IndicesClientIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/IndicesClientIT.java new file mode 100644 index 0000000000..ecd1469d3b --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/IndicesClientIT.java @@ -0,0 +1,14 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.httpclient5; + +import org.opensearch.client.opensearch.integTest.AbstractIndicesClientIT; + +public class IndicesClientIT extends AbstractIndicesClientIT implements HttpClient5TransportSupport { +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/NodesIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/NodesIT.java new file mode 100644 index 0000000000..286ef7d15d --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/NodesIT.java @@ -0,0 +1,14 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.httpclient5; + +import org.opensearch.client.opensearch.integTest.AbstractNodesIT; + +public class NodesIT extends AbstractNodesIT implements HttpClient5TransportSupport { +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/PingAndInfoIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/PingAndInfoIT.java new file mode 100644 index 0000000000..9bc6f883b2 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/PingAndInfoIT.java @@ -0,0 +1,15 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.httpclient5; + +import org.opensearch.client.opensearch.integTest.AbstractPingAndInfoIT; + +public class PingAndInfoIT extends AbstractPingAndInfoIT implements HttpClient5TransportSupport { +} + diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/RequestIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/RequestIT.java new file mode 100644 index 0000000000..bfa6d90798 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/RequestIT.java @@ -0,0 +1,14 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.httpclient5; + +import org.opensearch.client.opensearch.integTest.AbstractRequestIT; + +public class RequestIT extends AbstractRequestIT implements HttpClient5TransportSupport { +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/ClusterClientIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/ClusterClientIT.java new file mode 100644 index 0000000000..188eb616f5 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/ClusterClientIT.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.restclient; + +import java.io.IOException; + +import org.apache.http.HttpHost; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.integTest.AbstractClusterClientIT; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.rest_client.RestClientTransport; +import org.opensearch.common.settings.Settings; + +public class ClusterClientIT extends AbstractClusterClientIT { + @Override + public OpenSearchTransport buildTransport(Settings settings, HttpHost[] hosts) throws IOException { + return new RestClientTransport(buildClient(settings, hosts), new JacksonJsonpMapper()); + } +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/CrudIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/CrudIT.java new file mode 100644 index 0000000000..6831f1d95d --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/CrudIT.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.restclient; + +import java.io.IOException; + +import org.apache.http.HttpHost; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.integTest.AbstractCrudIT; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.rest_client.RestClientTransport; +import org.opensearch.common.settings.Settings; + +public class CrudIT extends AbstractCrudIT { + @Override + public OpenSearchTransport buildTransport(Settings settings, HttpHost[] hosts) throws IOException { + return new RestClientTransport(buildClient(settings, hosts), new JacksonJsonpMapper()); + } +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/IndicesClientIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/IndicesClientIT.java new file mode 100644 index 0000000000..190127ea9b --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/IndicesClientIT.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.restclient; + +import java.io.IOException; + +import org.apache.http.HttpHost; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.integTest.AbstractIndicesClientIT; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.rest_client.RestClientTransport; +import org.opensearch.common.settings.Settings; + +public class IndicesClientIT extends AbstractIndicesClientIT { + @Override + public OpenSearchTransport buildTransport(Settings settings, HttpHost[] hosts) throws IOException { + return new RestClientTransport(buildClient(settings, hosts), new JacksonJsonpMapper()); + } +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/NodesIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/NodesIT.java new file mode 100644 index 0000000000..47f07306b8 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/NodesIT.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.restclient; + +import java.io.IOException; + +import org.apache.http.HttpHost; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.integTest.AbstractNodesIT; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.rest_client.RestClientTransport; +import org.opensearch.common.settings.Settings; + +public class NodesIT extends AbstractNodesIT { + @Override + public OpenSearchTransport buildTransport(Settings settings, HttpHost[] hosts) throws IOException { + return new RestClientTransport(buildClient(settings, hosts), new JacksonJsonpMapper()); + } +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/PingAndInfoIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/PingAndInfoIT.java new file mode 100644 index 0000000000..e5ac93649e --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/PingAndInfoIT.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.restclient; + +import java.io.IOException; + +import org.apache.http.HttpHost; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.integTest.AbstractPingAndInfoIT; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.rest_client.RestClientTransport; +import org.opensearch.common.settings.Settings; + +public class PingAndInfoIT extends AbstractPingAndInfoIT { + @Override + public OpenSearchTransport buildTransport(Settings settings, HttpHost[] hosts) throws IOException { + return new RestClientTransport(buildClient(settings, hosts), new JacksonJsonpMapper()); + } +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/RequestIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/RequestIT.java new file mode 100644 index 0000000000..2f001541cf --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/restclient/RequestIT.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.restclient; + +import java.io.IOException; + +import org.apache.http.HttpHost; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.integTest.AbstractRequestIT; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.rest_client.RestClientTransport; +import org.opensearch.common.settings.Settings; + +public class RequestIT extends AbstractRequestIT { + @Override + public OpenSearchTransport buildTransport(Settings settings, HttpHost[] hosts) throws IOException { + return new RestClientTransport(buildClient(settings, hosts), new JacksonJsonpMapper()); + } +}