Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reindex: Use request flavored methods #30317

Merged
merged 1 commit into from
May 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

package org.elasticsearch.index.reindex.remote;

import org.apache.http.HttpEntity;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -40,33 +40,27 @@
import org.elasticsearch.search.sort.SortBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;

/**
* Builds requests for remote version of Elasticsearch. Note that unlike most of the
* rest of Elasticsearch this file needs to be compatible with very old versions of
* Elasticsearch. Thus is often uses identifiers for versions like {@code 2000099}
* Elasticsearch. Thus it often uses identifiers for versions like {@code 2000099}
* for {@code 2.0.0-alpha1}. Do not drop support for features from this file just
* because the version constants have been removed.
*/
final class RemoteRequestBuilders {
private RemoteRequestBuilders() {}

static String initialSearchPath(SearchRequest searchRequest) {
static Request initialSearch(SearchRequest searchRequest, BytesReference query, Version remoteVersion) {
// It is nasty to build paths with StringBuilder but we'll be careful....
StringBuilder path = new StringBuilder("/");
addIndexesOrTypes(path, "Index", searchRequest.indices());
addIndexesOrTypes(path, "Type", searchRequest.types());
path.append("_search");
return path.toString();
}
Request request = new Request("POST", path.toString());

static Map<String, String> initialSearchParams(SearchRequest searchRequest, Version remoteVersion) {
Map<String, String> params = new HashMap<>();
if (searchRequest.scroll() != null) {
TimeValue keepAlive = searchRequest.scroll().keepAlive();
if (remoteVersion.before(Version.V_5_0_0)) {
Expand All @@ -75,16 +69,16 @@ static Map<String, String> initialSearchParams(SearchRequest searchRequest, Vers
* timeout seems safer than less. */
keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac()));
}
params.put("scroll", keepAlive.getStringRep());
request.addParameter("scroll", keepAlive.getStringRep());
}
params.put("size", Integer.toString(searchRequest.source().size()));
request.addParameter("size", Integer.toString(searchRequest.source().size()));
if (searchRequest.source().version() == null || searchRequest.source().version() == true) {
/*
* Passing `null` here just add the `version` request parameter
* without any value. This way of requesting the version works
* for all supported versions of Elasticsearch.
*/
params.put("version", null);
request.addParameter("version", null);
}
if (searchRequest.source().sorts() != null) {
boolean useScan = false;
Expand All @@ -101,13 +95,13 @@ static Map<String, String> initialSearchParams(SearchRequest searchRequest, Vers
}
}
if (useScan) {
params.put("search_type", "scan");
request.addParameter("search_type", "scan");
} else {
StringBuilder sorts = new StringBuilder(sortToUri(searchRequest.source().sorts().get(0)));
for (int i = 1; i < searchRequest.source().sorts().size(); i++) {
sorts.append(',').append(sortToUri(searchRequest.source().sorts().get(i)));
}
params.put("sort", sorts.toString());
request.addParameter("sort", sorts.toString());
}
}
if (remoteVersion.before(Version.fromId(2000099))) {
Expand All @@ -126,20 +120,18 @@ static Map<String, String> initialSearchParams(SearchRequest searchRequest, Vers
fields.append(',').append(searchRequest.source().storedFields().fieldNames().get(i));
}
String storedFieldsParamName = remoteVersion.before(Version.V_5_0_0_alpha4) ? "fields" : "stored_fields";
params.put(storedFieldsParamName, fields.toString());
request.addParameter(storedFieldsParamName, fields.toString());
}
return params;
}

static HttpEntity initialSearchEntity(SearchRequest searchRequest, BytesReference query, Version remoteVersion) {
// EMPTY is safe here because we're not calling namedObject
try (XContentBuilder entity = JsonXContent.contentBuilder();
XContentParser queryParser = XContentHelper
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, query)) {
entity.startObject();

entity.field("query"); {
/* We're intentionally a bit paranoid here - copying the query as xcontent rather than writing a raw field. We don't want
/* We're intentionally a bit paranoid here - copying the query
* as xcontent rather than writing a raw field. We don't want
* poorly written queries to escape. Ever. */
entity.copyCurrentStructure(queryParser);
XContentParser.Token shouldBeEof = queryParser.nextToken();
Expand All @@ -160,10 +152,11 @@ static HttpEntity initialSearchEntity(SearchRequest searchRequest, BytesReferenc

entity.endObject();
BytesRef bytes = BytesReference.bytes(entity).toBytesRef();
return new ByteArrayEntity(bytes.bytes, bytes.offset, bytes.length, ContentType.APPLICATION_JSON);
request.setEntity(new ByteArrayEntity(bytes.bytes, bytes.offset, bytes.length, ContentType.APPLICATION_JSON));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: should we use NByteArrayEntity instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dunno! I've never really into the difference.

} catch (IOException e) {
throw new ElasticsearchException("unexpected error building entity", e);
}
return request;
}

private static void addIndexesOrTypes(StringBuilder path, String name, String[] indicesOrTypes) {
Expand Down Expand Up @@ -193,45 +186,50 @@ private static String sortToUri(SortBuilder<?> sort) {
throw new IllegalArgumentException("Unsupported sort [" + sort + "]");
}

static String scrollPath() {
return "/_search/scroll";
}
static Request scroll(String scroll, TimeValue keepAlive, Version remoteVersion) {
Request request = new Request("POST", "/_search/scroll");

static Map<String, String> scrollParams(TimeValue keepAlive, Version remoteVersion) {
if (remoteVersion.before(Version.V_5_0_0)) {
/* Versions of Elasticsearch before 5.0 couldn't parse nanos or micros
* so we toss out that resolution, rounding up so we shouldn't end up
* with 0s. */
keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac()));
}
return singletonMap("scroll", keepAlive.getStringRep());
}
request.addParameter("scroll", keepAlive.getStringRep());

static HttpEntity scrollEntity(String scroll, Version remoteVersion) {
if (remoteVersion.before(Version.fromId(2000099))) {
// Versions before 2.0.0 extract the plain scroll_id from the body
return new StringEntity(scroll, ContentType.TEXT_PLAIN);
request.setEntity(new StringEntity(scroll, ContentType.TEXT_PLAIN));
return request;
}

try (XContentBuilder entity = JsonXContent.contentBuilder()) {
return new StringEntity(Strings.toString(entity.startObject()
.field("scroll_id", scroll)
.endObject()), ContentType.APPLICATION_JSON);
entity.startObject()
.field("scroll_id", scroll)
.endObject();
request.setEntity(new StringEntity(Strings.toString(entity), ContentType.APPLICATION_JSON));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use NStringEntity instead? Which makes me wonder, should we make this easier for users, and expose setEntityAsString and setEntityAsByteArray methods additionally to what we expose now??

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm. I think I'll merge as is and make a followup with those so we can talk about them!

} catch (IOException e) {
throw new ElasticsearchException("failed to build scroll entity", e);
}
return request;
}

static HttpEntity clearScrollEntity(String scroll, Version remoteVersion) {
static Request clearScroll(String scroll, Version remoteVersion) {
Request request = new Request("DELETE", "/_search/scroll");

if (remoteVersion.before(Version.fromId(2000099))) {
// Versions before 2.0.0 extract the plain scroll_id from the body
return new StringEntity(scroll, ContentType.TEXT_PLAIN);
request.setEntity(new StringEntity(scroll, ContentType.TEXT_PLAIN));
return request;
}
try (XContentBuilder entity = JsonXContent.contentBuilder()) {
return new StringEntity(Strings.toString(entity.startObject()
.array("scroll_id", scroll)
.endObject()), ContentType.APPLICATION_JSON);
entity.startObject()
.array("scroll_id", scroll)
.endObject();
request.setEntity(new StringEntity(Strings.toString(entity), ContentType.APPLICATION_JSON));
} catch (IOException e) {
throw new ElasticsearchException("failed to build clear scroll entity", e);
}
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,34 @@
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.clearScrollEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchParams;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchPath;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollParams;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollPath;
import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.MAIN_ACTION_PARSER;
import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.RESPONSE_PARSER;

Expand All @@ -88,13 +79,13 @@ public RemoteScrollableHitSource(Logger logger, BackoffPolicy backoffPolicy, Thr
protected void doStart(Consumer<? super Response> onResponse) {
lookupRemoteVersion(version -> {
remoteVersion = version;
execute("POST", initialSearchPath(searchRequest), initialSearchParams(searchRequest, version),
initialSearchEntity(searchRequest, query, remoteVersion), RESPONSE_PARSER, r -> onStartResponse(onResponse, r));
execute(RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion),
RESPONSE_PARSER, r -> onStartResponse(onResponse, r));
});
}

void lookupRemoteVersion(Consumer<Version> onVersion) {
execute("GET", "", emptyMap(), null, MAIN_ACTION_PARSER, onVersion);
execute(new Request("GET", ""), MAIN_ACTION_PARSER, onVersion);
}

private void onStartResponse(Consumer<? super Response> onResponse, Response response) {
Expand All @@ -108,15 +99,13 @@ private void onStartResponse(Consumer<? super Response> onResponse, Response res

@Override
protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer<? super Response> onResponse) {
Map<String, String> scrollParams = scrollParams(
timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos()),
remoteVersion);
execute("POST", scrollPath(), scrollParams, scrollEntity(scrollId, remoteVersion), RESPONSE_PARSER, onResponse);
TimeValue keepAlive = timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos());
execute(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, onResponse);
}

@Override
protected void clearScroll(String scrollId, Runnable onCompletion) {
client.performRequestAsync("DELETE", scrollPath(), emptyMap(), clearScrollEntity(scrollId, remoteVersion), new ResponseListener() {
client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() {
@Override
public void onSuccess(org.elasticsearch.client.Response response) {
logger.debug("Successfully cleared [{}]", scrollId);
Expand Down Expand Up @@ -162,7 +151,7 @@ protected void cleanup(Runnable onCompletion) {
});
}

private <T> void execute(String method, String uri, Map<String, String> params, HttpEntity entity,
private <T> void execute(Request request,
BiFunction<XContentParser, XContentType, T> parser, Consumer<? super T> listener) {
// Preserve the thread context so headers survive after the call
java.util.function.Supplier<ThreadContext.StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(true);
Expand All @@ -171,7 +160,7 @@ class RetryHelper extends AbstractRunnable {

@Override
protected void doRun() throws Exception {
client.performRequestAsync(method, uri, params, entity, new ResponseListener() {
client.performRequestAsync(request, new ResponseListener() {
@Override
public void onSuccess(org.elasticsearch.client.Response response) {
// Restore the thread context to get the precious headers
Expand Down
Loading