diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchClient.java new file mode 100644 index 0000000000000..ab47b1c5e792a --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchClient.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.asyncsearch.AsyncSearchResponse; +import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest; + +import java.io.IOException; + +import static java.util.Collections.emptySet; + +public class AsyncSearchClient { + private final RestHighLevelClient restHighLevelClient; + + AsyncSearchClient(RestHighLevelClient restHighLevelClient) { + this.restHighLevelClient = restHighLevelClient; + } + + /** + * Submit a new async search request. + * See the docs for more. + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(request, AsyncSearchRequestConverters::submitAsyncSearch, options, + AsyncSearchResponse::fromXContent, emptySet()); + } + + /** + * Asynchronously submit a new async search request. + * See the docs for more. + * + * the docs for more. + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + * @return cancellable that may be used to cancel the request + */ + public Cancellable submitAsyncSearchAsync(SubmitAsyncSearchRequest request, RequestOptions options, + ActionListener listener) { + return restHighLevelClient.performRequestAsyncAndParseEntity(request, AsyncSearchRequestConverters::submitAsyncSearch, options, + AsyncSearchResponse::fromXContent, listener, emptySet()); + } + +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchRequestConverters.java new file mode 100644 index 0000000000000..2d91bd926ef4d --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchRequestConverters.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import org.apache.http.client.methods.HttpPost; +import org.elasticsearch.client.RequestConverters.Params; +import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest; +import org.elasticsearch.rest.action.search.RestSearchAction; + +import java.io.IOException; +import java.util.Locale; + +import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE; + +final class AsyncSearchRequestConverters { + + static Request submitAsyncSearch(SubmitAsyncSearchRequest asyncSearchRequest) throws IOException { + String endpoint = new RequestConverters.EndpointBuilder().addCommaSeparatedPathParts( + asyncSearchRequest.getIndices()) + .addPathPartAsIs("_async_search").build(); + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + Params params = new RequestConverters.Params(); + // add all typical search params and search request source as body + addSearchRequestParams(params, asyncSearchRequest); + if (asyncSearchRequest.getSearchSource() != null) { + request.setEntity(RequestConverters.createEntity(asyncSearchRequest.getSearchSource(), REQUEST_BODY_CONTENT_TYPE)); + } + // set async search submit specific parameters + if (asyncSearchRequest.isCleanOnCompletion() != null) { + params.putParam("clean_on_completion", asyncSearchRequest.isCleanOnCompletion().toString()); + } + if (asyncSearchRequest.getKeepAlive() != null) { + params.putParam("keep_alive", asyncSearchRequest.getKeepAlive().getStringRep()); + } + if (asyncSearchRequest.getWaitForCompletion() != null) { + params.putParam("wait_for_completion", asyncSearchRequest.getWaitForCompletion().getStringRep()); + } + request.addParameters(params.asMap()); + return request; + } + + static void addSearchRequestParams(Params params, SubmitAsyncSearchRequest request) { + params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true"); + params.withRouting(request.getRouting()); + params.withPreference(request.getPreference()); + params.withIndicesOptions(request.getIndicesOptions()); + params.withSearchType(request.getSearchType().name().toLowerCase(Locale.ROOT)); + params.withMaxConcurrentShardRequests(request.getMaxConcurrentShardRequests()); + if (request.getRequestCache() != null) { + params.withRequestCache(request.getRequestCache()); + } + if (request.getAllowPartialSearchResults() != null) { + params.withAllowPartialResults(request.getAllowPartialSearchResults()); + } + params.withBatchedReduceSize(request.getBatchedReduceSize()); + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index a8532f6d16339..0bb0eca83a389 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -403,22 +403,22 @@ static Request search(SearchRequest searchRequest, String searchEndpoint) throws return request; } - private static void addSearchRequestParams(Params params, SearchRequest searchRequest) { + static void addSearchRequestParams(Params params, SearchRequest searchRequest) { params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true"); params.withRouting(searchRequest.routing()); params.withPreference(searchRequest.preference()); params.withIndicesOptions(searchRequest.indicesOptions()); - params.putParam("search_type", searchRequest.searchType().name().toLowerCase(Locale.ROOT)); + params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT)); params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips())); params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize())); - params.putParam("max_concurrent_shard_requests", Integer.toString(searchRequest.getMaxConcurrentShardRequests())); + params.withMaxConcurrentShardRequests(searchRequest.getMaxConcurrentShardRequests()); if (searchRequest.requestCache() != null) { - params.putParam("request_cache", Boolean.toString(searchRequest.requestCache())); + params.withRequestCache(searchRequest.requestCache()); } if (searchRequest.allowPartialSearchResults() != null) { - params.putParam("allow_partial_search_results", Boolean.toString(searchRequest.allowPartialSearchResults())); + params.withAllowPartialResults(searchRequest.allowPartialSearchResults()); } - params.putParam("batched_reduce_size", Integer.toString(searchRequest.getBatchedReduceSize())); + params.withBatchedReduceSize(searchRequest.getBatchedReduceSize()); if (searchRequest.scroll() != null) { params.putParam("scroll", searchRequest.scroll().keepAlive()); } @@ -860,6 +860,26 @@ Params withPreference(String preference) { return putParam("preference", preference); } + Params withSearchType(String searchType) { + return putParam("search_type", searchType); + } + + Params withMaxConcurrentShardRequests(int maxConcurrentShardRequests) { + return putParam("max_concurrent_shard_requests", Integer.toString(maxConcurrentShardRequests)); + } + + Params withBatchedReduceSize(int batchedReduceSize) { + return putParam("batched_reduce_size", Integer.toString(batchedReduceSize)); + } + + Params withRequestCache(boolean requestCache) { + return putParam("request_cache", Boolean.toString(requestCache)); + } + + Params withAllowPartialResults(boolean allowPartialSearchResults) { + return putParam("allow_partial_search_results", Boolean.toString(allowPartialSearchResults)); + } + Params withRealtime(boolean realtime) { if (realtime == false) { return putParam("realtime", Boolean.FALSE.toString()); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index e1ca36f552819..8b5262d6aada6 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -265,6 +265,7 @@ public class RestHighLevelClient implements Closeable { private final TransformClient transformClient = new TransformClient(this); private final EnrichClient enrichClient = new EnrichClient(this); private final EqlClient eqlClient = new EqlClient(this); + private final AsyncSearchClient asyncSearchClient = new AsyncSearchClient(this); /** * Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the @@ -428,13 +429,23 @@ public final XPackClient xpack() { * A wrapper for the {@link RestHighLevelClient} that provides methods for * accessing the Elastic Index Lifecycle APIs. *

- * See the X-Pack APIs + * See the X-Pack APIs * on elastic.co for more information. */ public IndexLifecycleClient indexLifecycle() { return ilmClient; } + /** + * A wrapper for the {@link RestHighLevelClient} that provides methods for accessing the Elastic Index Async Search APIs. + *

+ * See the X-Pack APIs on elastic.co + * for more information. + */ + public AsyncSearchClient asyncSearch() { + return asyncSearchClient; + } + /** * Provides methods for accessing the Elastic Licensed Migration APIs that * are shipped with the default distribution of Elasticsearch. All of diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/AsyncSearchResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/AsyncSearchResponse.java new file mode 100644 index 0000000000000..47dd444ea5463 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/AsyncSearchResponse.java @@ -0,0 +1,214 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.asyncsearch; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; + +import java.io.IOException; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +/** + * A response of an async search request. + */ +public class AsyncSearchResponse implements ToXContentObject { + @Nullable + private final String id; + private final int version; + @Nullable + private final SearchResponse searchResponse; + @Nullable + private final ElasticsearchException error; + private final boolean isRunning; + private final boolean isPartial; + + private final long startTimeMillis; + private final long expirationTimeMillis; + + /** + * Creates an {@link AsyncSearchResponse} with the arguments that are always present in the server response + */ + AsyncSearchResponse(int version, + boolean isPartial, + boolean isRunning, + long startTimeMillis, + long expirationTimeMillis, + @Nullable String id, + @Nullable SearchResponse searchResponse, + @Nullable ElasticsearchException error) { + this.version = version; + this.isPartial = isPartial; + this.isRunning = isRunning; + this.startTimeMillis = startTimeMillis; + this.expirationTimeMillis = expirationTimeMillis; + this.id = id; + this.searchResponse = searchResponse; + this.error = error; + } + + /** + * Returns the id of the async search request or null if the response is not stored in the cluster. + */ + @Nullable + public String getId() { + return id; + } + + /** + * Returns the version of this response. + */ + public int getVersion() { + return version; + } + + /** + * Returns the current {@link SearchResponse} or null if not available. + * + * See {@link #isPartial()} to determine whether the response contains partial or complete + * results. + */ + public SearchResponse getSearchResponse() { + return searchResponse; + } + + /** + * Returns the failure reason or null if the query is running or has completed normally. + */ + public ElasticsearchException getFailure() { + return error; + } + + /** + * Returns true if the {@link SearchResponse} contains partial + * results computed from a subset of the total shards. + */ + public boolean isPartial() { + return isPartial; + } + + /** + * Whether the search is still running in the cluster. + * + * A value of false indicates that the response is final + * even if {@link #isPartial()} returns true. In such case, + * the partial response represents the status of the search before a + * non-recoverable failure. + */ + public boolean isRunning() { + return isRunning; + } + + /** + * When this response was created as a timestamp in milliseconds since epoch. + */ + public long getStartTime() { + return startTimeMillis; + } + + /** + * When this response will expired as a timestamp in milliseconds since epoch. + */ + public long getExpirationTime() { + return expirationTimeMillis; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (id != null) { + builder.field("id", id); + } + builder.field("version", version); + builder.field("is_partial", isPartial); + builder.field("is_running", isRunning); + builder.field("start_time_in_millis", startTimeMillis); + builder.field("expiration_time_in_millis", expirationTimeMillis); + + if (searchResponse != null) { + builder.field("response"); + searchResponse.toXContent(builder, params); + } + if (error != null) { + builder.startObject("error"); + error.toXContent(builder, params); + builder.endObject(); + } + builder.endObject(); + return builder; + } + + public static final ParseField ID_FIELD = new ParseField("id"); + public static final ParseField VERSION_FIELD = new ParseField("version"); + public static final ParseField IS_PARTIAL_FIELD = new ParseField("is_partial"); + public static final ParseField IS_RUNNING_FIELD = new ParseField("is_running"); + public static final ParseField START_TIME_FIELD = new ParseField("start_time_in_millis"); + public static final ParseField EXPIRATION_FIELD = new ParseField("expiration_time_in_millis"); + public static final ParseField RESPONSE_FIELD = new ParseField("response"); + public static final ParseField ERROR_FIELD = new ParseField("error"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "submit_async_search_response", true, + args -> new AsyncSearchResponse( + (int) args[0], + (boolean) args[1], + (boolean) args[2], + (long) args[3], + (long) args[4], + (String) args[5], + (SearchResponse) args[6], + (ElasticsearchException) args[7])); + static { + PARSER.declareInt(constructorArg(), VERSION_FIELD); + PARSER.declareBoolean(constructorArg(), IS_PARTIAL_FIELD); + PARSER.declareBoolean(constructorArg(), IS_RUNNING_FIELD); + PARSER.declareLong(constructorArg(), START_TIME_FIELD); + PARSER.declareLong(constructorArg(), EXPIRATION_FIELD); + PARSER.declareString(optionalConstructorArg(), ID_FIELD); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> AsyncSearchResponse.parseSearchResponse(p), + RESPONSE_FIELD); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), ERROR_FIELD); + } + + private static SearchResponse parseSearchResponse(XContentParser p) throws IOException { + // we should be before the opening START_OBJECT of the response + ensureExpectedToken(Token.START_OBJECT, p.currentToken(), p::getTokenLocation); + p.nextToken(); + return SearchResponse.innerFromXContent(p); + } + + public static AsyncSearchResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.apply(parser, null); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequest.java new file mode 100644 index 0000000000000..1b0a07c4dea41 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequest.java @@ -0,0 +1,284 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.elasticsearch.client.asyncsearch; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Validatable; +import org.elasticsearch.client.ValidationException; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +import java.util.Objects; +import java.util.Optional; + +/** + * A request to track asynchronously the progress of a search against one or more indices. + */ +public class SubmitAsyncSearchRequest implements Validatable { + + public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 1; + public static final int DEFAULT_BATCHED_REDUCE_SIZE = 5; + private static final boolean DEFAULT_CCS_MINIMIZE_ROUNDTRIPS = false; + private static final boolean DEFAULT_REQUEST_CACHE_VALUE = true; + + public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis(); + + private TimeValue waitForCompletion; + private Boolean cleanOnCompletion; + private TimeValue keepAlive; + private final SearchRequest searchRequest; + + /** + * Creates a new request + */ + public SubmitAsyncSearchRequest(SearchSourceBuilder source, String... indices) { + this.searchRequest = new SearchRequest(indices, source); + searchRequest.setCcsMinimizeRoundtrips(DEFAULT_CCS_MINIMIZE_ROUNDTRIPS); + searchRequest.setPreFilterShardSize(DEFAULT_PRE_FILTER_SHARD_SIZE); + searchRequest.setBatchedReduceSize(DEFAULT_BATCHED_REDUCE_SIZE); + searchRequest.requestCache(DEFAULT_REQUEST_CACHE_VALUE); + } + + /** + * Get the target indices + */ + public String[] getIndices() { + return this.searchRequest.indices(); + } + + + /** + * Get the minimum time that the request should wait before returning a partial result (defaults to 1 second). + */ + public TimeValue getWaitForCompletion() { + return waitForCompletion; + } + + /** + * Sets the minimum time that the request should wait before returning a partial result (defaults to 1 second). + */ + public void setWaitForCompletion(TimeValue waitForCompletion) { + this.waitForCompletion = waitForCompletion; + } + + /** + * Returns whether the resource resource should be removed on completion or failure (defaults to true). + */ + public Boolean isCleanOnCompletion() { + return cleanOnCompletion; + } + + /** + * Determines if the resource should be removed on completion or failure (defaults to true). + */ + public void setCleanOnCompletion(boolean cleanOnCompletion) { + this.cleanOnCompletion = cleanOnCompletion; + } + + /** + * Get the amount of time after which the result will expire (defaults to 5 days). + */ + public TimeValue getKeepAlive() { + return keepAlive; + } + + /** + * Sets the amount of time after which the result will expire (defaults to 5 days). + */ + public void setKeepAlive(TimeValue keepAlive) { + this.keepAlive = keepAlive; + } + + // setters for request parameters of the wrapped SearchRequest + /** + * Set the routing value to control the shards that the search will be executed on. + * A comma separated list of routing values to control the shards the search will be executed on. + */ + public void setRouting(String routing) { + this.searchRequest.routing(routing); + } + + /** + * Set the routing values to control the shards that the search will be executed on. + */ + public void setRoutings(String... routings) { + this.searchRequest.routing(routings); + } + + /** + * Get the routing value to control the shards that the search will be executed on. + */ + public String getRouting() { + return this.searchRequest.routing(); + } + + /** + * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to + * {@code _local} to prefer local shards or a custom value, which guarantees that the same order + * will be used across different requests. + */ + public void setPreference(String preference) { + this.searchRequest.preference(preference); + } + + /** + * Get the preference to execute the search. + */ + public String getPreference() { + return this.searchRequest.preference(); + } + + /** + * Specifies what type of requested indices to ignore and how to deal with indices wildcard expressions. + */ + public void setIndicesOptions(IndicesOptions indicesOptions) { + this.searchRequest.indicesOptions(indicesOptions); + } + + /** + * Get the indices Options. + */ + public IndicesOptions getIndicesOptions() { + return this.searchRequest.indicesOptions(); + } + + /** + * The search type to execute, defaults to {@link SearchType#DEFAULT}. + */ + public void setSearchType(SearchType searchType) { + this.searchRequest.searchType(searchType); + } + + /** + * Get the search type to execute, defaults to {@link SearchType#DEFAULT}. + */ + public SearchType getSearchType() { + return this.searchRequest.searchType(); + } + + /** + * Sets if this request should allow partial results. (If method is not called, + * will default to the cluster level setting). + */ + public void setAllowPartialSearchResults(boolean allowPartialSearchResults) { + this.searchRequest.allowPartialSearchResults(allowPartialSearchResults); + } + + /** + * Gets if this request should allow partial results. + */ + public Boolean getAllowPartialSearchResults() { + return this.searchRequest.allowPartialSearchResults(); + } + + /** + * Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection + * mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large. + */ + public void setBatchedReduceSize(int batchedReduceSize) { + this.searchRequest.setBatchedReduceSize(batchedReduceSize); + } + + /** + * Gets the number of shard results that should be reduced at once on the coordinating node. + * This defaults to 5 for {@link SubmitAsyncSearchRequest}. + */ + public int getBatchedReduceSize() { + return this.searchRequest.getBatchedReduceSize(); + } + + /** + * Sets if this request should use the request cache or not, assuming that it can (for + * example, if "now" is used, it will never be cached). By default (not set, or null, + * will default to the index level setting if request cache is enabled or not). + */ + public void setRequestCache(Boolean requestCache) { + this.searchRequest.requestCache(requestCache); + } + + /** + * Gets if this request should use the request cache or not. + * Defaults to `true` for {@link SubmitAsyncSearchRequest}. + */ + public Boolean getRequestCache() { + return this.searchRequest.requestCache(); + } + + /** + * Returns the number of shard requests that should be executed concurrently on a single node. + * The default is {@code 5}. + */ + public int getMaxConcurrentShardRequests() { + return this.searchRequest.getMaxConcurrentShardRequests(); + } + + /** + * Sets the number of shard requests that should be executed concurrently on a single node. + * The default is {@code 5}. + */ + public void setMaxConcurrentShardRequests(int maxConcurrentShardRequests) { + this.searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests); + } + + /** + * Gets if the source of the {@link SearchSourceBuilder} initially used on this request. + */ + public SearchSourceBuilder getSearchSource() { + return this.searchRequest.source(); + } + + @Override + public Optional validate() { + final ValidationException validationException = new ValidationException(); + if (searchRequest.isSuggestOnly()) { + validationException.addValidationError("suggest-only queries are not supported"); + } + if (keepAlive != null && keepAlive.getMillis() < MIN_KEEP_ALIVE) { + validationException.addValidationError("[keep_alive] must be greater than 1 minute, got: " + keepAlive.toString()); + } + if (validationException.validationErrors().isEmpty()) { + return Optional.empty(); + } + return Optional.of(validationException); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SubmitAsyncSearchRequest request = (SubmitAsyncSearchRequest) o; + return Objects.equals(searchRequest, request.searchRequest) + && Objects.equals(getKeepAlive(), request.getKeepAlive()) + && Objects.equals(getWaitForCompletion(), request.getWaitForCompletion()) + && Objects.equals(isCleanOnCompletion(), request.isCleanOnCompletion()); + } + + @Override + public int hashCode() { + return Objects.hash(searchRequest, getKeepAlive(), getWaitForCompletion(), isCleanOnCompletion()); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/AsyncSearchRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/AsyncSearchRequestConvertersTests.java new file mode 100644 index 0000000000000..df8fc65c751e6 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/AsyncSearchRequestConvertersTests.java @@ -0,0 +1,115 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import org.apache.http.client.methods.HttpPost; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.rest.action.search.RestSearchAction; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.test.ESTestCase; + +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.StringJoiner; + +import static org.elasticsearch.client.RequestConvertersTests.createTestSearchSourceBuilder; +import static org.elasticsearch.client.RequestConvertersTests.setRandomIndicesOptions; + +public class AsyncSearchRequestConvertersTests extends ESTestCase { + + public void testSubmitAsyncSearch() throws Exception { + String[] indices = RequestConvertersTests.randomIndicesNames(0, 5); + Map expectedParams = new HashMap<>(); + SearchSourceBuilder searchSourceBuilder = createTestSearchSourceBuilder(); + SubmitAsyncSearchRequest submitRequest = new SubmitAsyncSearchRequest(searchSourceBuilder, indices); + + // the following parameters might be overwritten by random ones later, + // but we need to set these since they are the default we send over http + expectedParams.put("request_cache", "true"); + expectedParams.put("batched_reduce_size", "5"); + setRandomSearchParams(submitRequest, expectedParams); + setRandomIndicesOptions(submitRequest::setIndicesOptions, submitRequest::getIndicesOptions, expectedParams); + + if (randomBoolean()) { + boolean cleanOnCompletion = randomBoolean(); + submitRequest.setCleanOnCompletion(cleanOnCompletion); + expectedParams.put("clean_on_completion", Boolean.toString(cleanOnCompletion)); + } + if (randomBoolean()) { + TimeValue keepAlive = TimeValue.parseTimeValue(randomTimeValue(), "test"); + submitRequest.setKeepAlive(keepAlive); + expectedParams.put("keep_alive", keepAlive.getStringRep()); + } + if (randomBoolean()) { + TimeValue waitForCompletion = TimeValue.parseTimeValue(randomTimeValue(), "test"); + submitRequest.setWaitForCompletion(waitForCompletion); + expectedParams.put("wait_for_completion", waitForCompletion.getStringRep()); + } + + Request request = AsyncSearchRequestConverters.submitAsyncSearch(submitRequest); + StringJoiner endpoint = new StringJoiner("/", "/", ""); + String index = String.join(",", indices); + if (Strings.hasLength(index)) { + endpoint.add(index); + } + endpoint.add("_async_search"); + assertEquals(HttpPost.METHOD_NAME, request.getMethod()); + assertEquals(endpoint.toString(), request.getEndpoint()); + assertEquals(expectedParams, request.getParameters()); + RequestConvertersTests.assertToXContentBody(searchSourceBuilder, request.getEntity()); + } + + private static void setRandomSearchParams(SubmitAsyncSearchRequest request, Map expectedParams) { + expectedParams.put(RestSearchAction.TYPED_KEYS_PARAM, "true"); + if (randomBoolean()) { + request.setRouting(randomAlphaOfLengthBetween(3, 10)); + expectedParams.put("routing", request.getRouting()); + } + if (randomBoolean()) { + request.setPreference(randomAlphaOfLengthBetween(3, 10)); + expectedParams.put("preference", request.getPreference()); + } + if (randomBoolean()) { + request.setSearchType(randomFrom(SearchType.CURRENTLY_SUPPORTED)); + } + expectedParams.put("search_type", request.getSearchType().name().toLowerCase(Locale.ROOT)); + if (randomBoolean()) { + request.setAllowPartialSearchResults(randomBoolean()); + expectedParams.put("allow_partial_search_results", Boolean.toString(request.getAllowPartialSearchResults())); + } + if (randomBoolean()) { + request.setRequestCache(randomBoolean()); + expectedParams.put("request_cache", Boolean.toString(request.getRequestCache())); + } + if (randomBoolean()) { + request.setBatchedReduceSize(randomIntBetween(2, Integer.MAX_VALUE)); + } + expectedParams.put("batched_reduce_size", Integer.toString(request.getBatchedReduceSize())); + if (randomBoolean()) { + request.setMaxConcurrentShardRequests(randomIntBetween(1, Integer.MAX_VALUE)); + } + expectedParams.put("max_concurrent_shard_requests", Integer.toString(request.getMaxConcurrentShardRequests())); + } + +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 3e4ecda385388..a7291b3dbbcf3 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -1021,12 +1021,34 @@ public void testSearchNullSource() throws IOException { public void testSearch() throws Exception { String searchEndpoint = randomFrom("_" + randomAlphaOfLength(5)); String[] indices = randomIndicesNames(0, 5); + Map expectedParams = new HashMap<>(); + SearchRequest searchRequest = createTestSearchRequest(indices, expectedParams); + + Request request = RequestConverters.search(searchRequest, searchEndpoint); + StringJoiner endpoint = new StringJoiner("/", "/", ""); + String index = String.join(",", indices); + if (Strings.hasLength(index)) { + endpoint.add(index); + } + endpoint.add(searchEndpoint); + assertEquals(HttpPost.METHOD_NAME, request.getMethod()); + assertEquals(endpoint.toString(), request.getEndpoint()); + assertEquals(expectedParams, request.getParameters()); + assertToXContentBody(searchRequest.source(), request.getEntity()); + } + + public static SearchRequest createTestSearchRequest(String[] indices, Map expectedParams) { SearchRequest searchRequest = new SearchRequest(indices); - Map expectedParams = new HashMap<>(); setRandomSearchParams(searchRequest, expectedParams); setRandomIndicesOptions(searchRequest::indicesOptions, searchRequest::indicesOptions, expectedParams); + SearchSourceBuilder searchSourceBuilder = createTestSearchSourceBuilder(); + searchRequest.source(searchSourceBuilder); + return searchRequest; + } + + public static SearchSourceBuilder createTestSearchSourceBuilder() { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); // rarely skip setting the search source completely if (frequently()) { @@ -1070,22 +1092,11 @@ public void testSearch() throws Exception { searchSourceBuilder.collapse(new CollapseBuilder(randomAlphaOfLengthBetween(3, 10))); } } - searchRequest.source(searchSourceBuilder); - } - - Request request = RequestConverters.search(searchRequest, searchEndpoint); - StringJoiner endpoint = new StringJoiner("/", "/", ""); - String index = String.join(",", indices); - if (Strings.hasLength(index)) { - endpoint.add(index); } - endpoint.add(searchEndpoint); - assertEquals(HttpPost.METHOD_NAME, request.getMethod()); - assertEquals(endpoint.toString(), request.getEndpoint()); - assertEquals(expectedParams, request.getParameters()); - assertToXContentBody(searchSourceBuilder, request.getEntity()); + return searchSourceBuilder; } + public void testSearchNullIndicesAndTypes() { expectThrows(NullPointerException.class, () -> new SearchRequest((String[]) null)); expectThrows(NullPointerException.class, () -> new SearchRequest().indices((String[]) null)); @@ -1867,7 +1878,7 @@ private static void setRandomSearchParams(SearchRequest searchRequest, expectedParams.put("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize())); } - static void setRandomIndicesOptions(Consumer setter, Supplier getter, + public static void setRandomIndicesOptions(Consumer setter, Supplier getter, Map expectedParams) { if (randomBoolean()) { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 870db8a63a970..468dbbd5d3ef7 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -890,6 +890,7 @@ public void testApiNamingConventions() throws Exception { apiName.startsWith("eql.") == false && apiName.endsWith("freeze") == false && apiName.endsWith("reload_analyzers") == false && + apiName.startsWith("async_search") == false && // IndicesClientIT.getIndexTemplate should be renamed "getTemplate" in version 8.0 when we // can get rid of 7.0's deprecated "getTemplate" apiName.equals("indices.get_index_template") == false) { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/AsyncSearchIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/AsyncSearchIT.java new file mode 100644 index 0000000000000..a1c608b3c6ef4 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/AsyncSearchIT.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.asyncsearch; + +import org.elasticsearch.client.ESRestHighLevelClientTestCase; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public class AsyncSearchIT extends ESRestHighLevelClientTestCase { + + public void testSubmitAsyncSearchRequest() throws IOException { + String index = "test-index"; + createIndex(index, Settings.EMPTY); + + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()); + SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(sourceBuilder, index); + // 15 sec should be enough to make sure we always complete right away + request.setWaitForCompletion(new TimeValue(15, TimeUnit.SECONDS)); + AsyncSearchResponse response = highLevelClient().asyncSearch().submitAsyncSearch(request, RequestOptions.DEFAULT); + assertTrue(response.getVersion() >= 0); + assertFalse(response.isPartial()); + assertTrue(response.getStartTime() > 0); + assertTrue(response.getExpirationTime() > 0); + assertNotNull(response.getSearchResponse()); + if (response.isRunning() == false) { + assertNull(response.getId()); + assertFalse(response.isPartial()); + } else { + assertTrue(response.isPartial()); + assertNotNull(response.getId()); + } + } + +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/AsyncSearchResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/AsyncSearchResponseTests.java new file mode 100644 index 0000000000000..6caf69db9ac6c --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/AsyncSearchResponseTests.java @@ -0,0 +1,85 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.asyncsearch; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchResponse.Clusters; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.client.AbstractResponseTestCase; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.search.internal.InternalSearchResponse; + +import java.io.IOException; + +import static org.hamcrest.Matchers.containsString; + +public class AsyncSearchResponseTests + extends AbstractResponseTestCase { + + @Override + protected org.elasticsearch.xpack.core.search.action.AsyncSearchResponse createServerTestInstance(XContentType xContentType) { + int version = randomIntBetween(0, Integer.MAX_VALUE); + boolean isPartial = randomBoolean(); + boolean isRunning = randomBoolean(); + long startTimeMillis = randomLongBetween(0, Long.MAX_VALUE); + long expirationTimeMillis = randomLongBetween(0, Long.MAX_VALUE); + String id = randomBoolean() ? null : randomAlphaOfLength(10); + ElasticsearchException error = randomBoolean() ? null : new ElasticsearchException(randomAlphaOfLength(10)); + // add search response, minimal object is okay since the full randomization of parsing is tested in SearchResponseTests + SearchResponse searchResponse = randomBoolean() ? null + : new SearchResponse(InternalSearchResponse.empty(), randomAlphaOfLength(10), 1, 1, 0, randomIntBetween(0, 10000), + ShardSearchFailure.EMPTY_ARRAY, Clusters.EMPTY); + org.elasticsearch.xpack.core.search.action.AsyncSearchResponse testResponse = + new org.elasticsearch.xpack.core.search.action.AsyncSearchResponse(id, version, searchResponse, error, isPartial, isRunning, + startTimeMillis, expirationTimeMillis); + return testResponse; + } + + @Override + protected AsyncSearchResponse doParseToClientInstance(XContentParser parser) throws IOException { + return AsyncSearchResponse.fromXContent(parser); + } + + @Override + protected void assertInstances(org.elasticsearch.xpack.core.search.action.AsyncSearchResponse expected, AsyncSearchResponse parsed) { + assertNotSame(parsed, expected); + assertEquals(expected.getId(), parsed.getId()); + assertEquals(expected.getVersion(), parsed.getVersion()); + assertEquals(expected.isRunning(), parsed.isRunning()); + assertEquals(expected.isPartial(), parsed.isPartial()); + assertEquals(expected.getStartTime(), parsed.getStartTime()); + assertEquals(expected.getExpirationTime(), parsed.getExpirationTime()); + // we cannot directly compare error since Exceptions are wrapped differently on parsing, but we can check original message + if (expected.getFailure() != null) { + assertThat(parsed.getFailure().getMessage(), containsString(expected.getFailure().getMessage())); + } else { + assertNull(parsed.getFailure()); + } + // we don't need to check the complete parsed search response since this is done elsewhere + // only spot-check some randomized properties for equality here + if (expected.getSearchResponse() != null) { + assertEquals(expected.getSearchResponse().getTook(), parsed.getSearchResponse().getTook()); + assertEquals(expected.getSearchResponse().getScrollId(), parsed.getSearchResponse().getScrollId()); + } else { + assertNull(parsed.getSearchResponse()); + } + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequestTests.java new file mode 100644 index 0000000000000..f7075052cab2b --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequestTests.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.asyncsearch; + +import org.elasticsearch.client.ValidationException; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.suggest.SuggestBuilder; +import org.elasticsearch.test.ESTestCase; + +import java.util.Optional; + +public class SubmitAsyncSearchRequestTests extends ESTestCase { + + public void testValidation() { + { + SearchSourceBuilder source = new SearchSourceBuilder(); + SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(source, "test"); + Optional validation = request.validate(); + assertFalse(validation.isPresent()); + } + { + SearchSourceBuilder source = new SearchSourceBuilder().suggest(new SuggestBuilder()); + SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(source, "test"); + Optional validation = request.validate(); + assertTrue(validation.isPresent()); + assertEquals(1, validation.get().validationErrors().size()); + assertEquals("suggest-only queries are not supported", validation.get().validationErrors().get(0)); + } + { + SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(new SearchSourceBuilder(), "test"); + request.setKeepAlive(new TimeValue(1)); + Optional validation = request.validate(); + assertTrue(validation.isPresent()); + assertEquals(1, validation.get().validationErrors().size()); + assertEquals("[keep_alive] must be greater than 1 minute, got: 1ms", validation.get().validationErrors().get(0)); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java index 81d61f2996ef4..9cabe14e9eed0 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java @@ -260,7 +260,7 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept return innerFromXContent(parser); } - static SearchResponse innerFromXContent(XContentParser parser) throws IOException { + public static SearchResponse innerFromXContent(XContentParser parser) throws IOException { ensureExpectedToken(Token.FIELD_NAME, parser.currentToken(), parser::getTokenLocation); String currentFieldName = parser.currentName(); SearchHits hits = null; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java index 9828635ae64fe..80d1439f80abc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java @@ -26,7 +26,9 @@ public class AsyncSearchResponse extends ActionResponse implements StatusToXCont @Nullable private final String id; private final int version; + @Nullable private final SearchResponse searchResponse; + @Nullable private final ElasticsearchException error; private final boolean isRunning; private final boolean isPartial;