REST high-level client: add put ingest pipeline API #30793

Merged (8 commits) on May 24, 2018
@@ -25,6 +25,8 @@
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;

import java.io.IOException;

@@ -87,4 +89,26 @@ public void listTasksAsync(ListTasksRequest request, ActionListener<ListTasksRes
restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, ListTasksResponse::fromXContent,
listener, emptySet(), headers);
}

/**
* Add a pipeline or update an existing pipeline in the cluster
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html"> Put Pipeline API on elastic.co</a>
*/
public PutPipelineResponse putPipeline(PutPipelineRequest request, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, RequestConverters::putPipeline,
PutPipelineResponse::fromXContent, emptySet(), headers);
}

/**
* Asynchronously add a pipeline or update an existing pipeline in the cluster
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html"> Put Pipeline API on elastic.co</a>
*/
public void putPipelineAsync(PutPipelineRequest request, ActionListener<PutPipelineResponse> listener, Header... headers) {
Review comment (Member):
Sorry for being late to the party. I just realized that the spec here defines the API as ingest.put_pipeline. Instead of adding this API (and all of the other ingest APIs) to the cluster ones, we should create a new namespace called ingest, similar to what we recently did with snapshot. @sohaibiftikhar, would you mind taking care of this?

Reply (@sohaibiftikhar, contributor and PR author, May 28, 2018):
Sure. Should I combine this change with #30847, or put it in a separate PR?

Reply (Member):
I would do it in a separate PR, thanks!

restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::putPipeline,
PutPipelineResponse::fromXContent, listener, emptySet(), headers);
}
}
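The review thread above proposes moving this API out of ClusterClient into a dedicated ingest namespace, mirroring the snapshot client. Below is a minimal sketch of what such a namespace client could look like; the class name IngestClient and a client.ingest() accessor are assumptions extrapolated from the comment, not code in this PR.

```java
// Hypothetical sketch only — not part of this PR. An ingest-scoped client that
// mirrors ClusterClient, as suggested in the review thread above.
public final class IngestClient {

    private final RestHighLevelClient restHighLevelClient;

    IngestClient(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    /**
     * Add a pipeline or update an existing pipeline in the cluster.
     */
    public PutPipelineResponse putPipeline(PutPipelineRequest request, Header... headers) throws IOException {
        return restHighLevelClient.performRequestAndParseEntity(request, RequestConverters::putPipeline,
                PutPipelineResponse::fromXContent, emptySet(), headers);
    }
}
```

Callers would then write client.ingest().putPipeline(request) instead of client.cluster().putPipeline(request); the separate PR mentioned in the thread would settle the exact shape.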
@@ -58,6 +58,7 @@
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
@@ -609,6 +610,21 @@ static Request clusterPutSettings(ClusterUpdateSettingsRequest clusterUpdateSett
return request;
}

static Request putPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ingest/pipeline")
.addPathPart(putPipelineRequest.getId())
.build();
Request request = new Request(HttpPut.METHOD_NAME, endpoint);

Params parameters = new Params(request);
parameters.withTimeout(putPipelineRequest.timeout());
parameters.withMasterTimeout(putPipelineRequest.masterNodeTimeout());

request.setEntity(createEntity(putPipelineRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request listTasks(ListTasksRequest listTaskRequest) {
if (listTaskRequest.getTaskId() != null && listTaskRequest.getTaskId().isSet()) {
throw new IllegalArgumentException("TaskId cannot be used for list tasks request");
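To make the converter above concrete, here is a rough sketch of the low-level Request it produces; the pipeline id, body, and timeout values below are invented for illustration.

```java
// Illustrative only — the id, body, and timeouts are made up for this example.
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(
    "my-pipeline-id",
    new BytesArray("{\"description\":\"example\",\"processors\":[]}".getBytes(StandardCharsets.UTF_8)),
    XContentType.JSON);
putPipelineRequest.timeout("2m");           // surfaces as the ?timeout=2m parameter
putPipelineRequest.masterNodeTimeout("1m"); // surfaces as the ?master_timeout=1m parameter

Request lowLevelRequest = RequestConverters.putPipeline(putPipelineRequest);
// lowLevelRequest.getMethod()   -> "PUT"
// lowLevelRequest.getEndpoint() -> "/_ingest/pipeline/my-pipeline-id"
// The pipeline definition is sent as the request entity, using the request's content type.
```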
@@ -25,12 +25,17 @@
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskInfo;

@@ -136,4 +141,41 @@ public void testListTasks() throws IOException {
}
assertTrue("List tasks were not found", listTasksFound);
}

public void testPutPipeline() throws IOException {
String id = "some_pipeline_id";
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
pipelineBuilder.startObject();
{
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
{
pipelineBuilder.startObject().startObject("set");
{
pipelineBuilder
.field("field", "foo")
.field("value", "bar");
}
pipelineBuilder.endObject().endObject();
pipelineBuilder.startObject().startObject("convert");
{
pipelineBuilder
.field("field", "rank")
.field("type", "integer");
}
pipelineBuilder.endObject().endObject();
}
pipelineBuilder.endArray();
}
pipelineBuilder.endObject();
PutPipelineRequest request = new PutPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType());

PutPipelineResponse putPipelineResponse =
execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync);
assertTrue(putPipelineResponse.isAcknowledged());
}
}
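For comparison with the XContentBuilder-driven construction in the integration test above, the same pipeline can be expressed as a raw JSON string and handed to the same PutPipelineRequest constructor; this is an equivalent sketch, not code from the PR.

```java
// Equivalent request built from a plain JSON body instead of an XContentBuilder.
String source =
    "{\"description\":\"some random set of processors\"," +
    "\"processors\":[" +
    "{\"set\":{\"field\":\"foo\",\"value\":\"bar\"}}," +
    "{\"convert\":{\"field\":\"rank\",\"type\":\"integer\"}}]}";
PutPipelineRequest request = new PutPipelineRequest(
    "some_pipeline_id",
    new BytesArray(source.getBytes(StandardCharsets.UTF_8)),
    XContentType.JSON);
```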
@@ -61,6 +61,7 @@
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
@@ -91,6 +92,7 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.RandomCreateIndexGenerator;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.TermQueryBuilder;
@@ -119,6 +121,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
@@ -1402,6 +1405,26 @@ public void testClusterPutSettings() throws IOException {
assertEquals(expectedParams, expectedRequest.getParameters());
}

public void testPutPipeline() throws IOException {
String pipelineId = "some_pipeline_id";
PutPipelineRequest request = new PutPipelineRequest(
"some_pipeline_id",
new BytesArray("{}".getBytes(StandardCharsets.UTF_8)),
XContentType.JSON
);
Map<String, String> expectedParams = new HashMap<>();
setRandomMasterTimeout(request, expectedParams);
setRandomTimeout(request::timeout, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, expectedParams);

Request expectedRequest = RequestConverters.putPipeline(request);
StringJoiner endpoint = new StringJoiner("/", "/", "");
endpoint.add("_ingest/pipeline");
endpoint.add(pipelineId);
assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
assertEquals(HttpPut.METHOD_NAME, expectedRequest.getMethod());
assertEquals(expectedParams, expectedRequest.getParameters());
}

public void testRollover() throws IOException {
RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10),
randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10));
@@ -21,17 +21,19 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
@@ -41,6 +43,7 @@
import org.elasticsearch.tasks.TaskInfo;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -80,19 +83,19 @@ public void testClusterPutSettings() throws IOException {
// end::put-settings-request

// tag::put-settings-create-settings
String transientSettingKey =
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey();
int transientSettingValue = 10;
Settings transientSettings =
Settings.builder()
.put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES)
.build(); // <1>

String persistentSettingKey =
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey();
String persistentSettingValue =
EnableAllocationDecider.Allocation.NONE.name();
Settings persistentSettings =
Settings.builder()
.put(persistentSettingKey, persistentSettingValue)
.build(); // <2>
@@ -105,9 +108,9 @@

{
// tag::put-settings-settings-builder
Settings.Builder transientSettingsBuilder =
Settings.builder()
.put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES);
request.transientSettings(transientSettingsBuilder); // <1>
// end::put-settings-settings-builder
}
@@ -164,7 +167,7 @@ public void testClusterUpdateSettingsAsync() throws Exception {
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();

// tag::put-settings-execute-listener
ActionListener<ClusterUpdateSettingsResponse> listener =
new ActionListener<ClusterUpdateSettingsResponse>() {
@Override
public void onResponse(ClusterUpdateSettingsResponse response) {
@@ -272,4 +275,80 @@ public void onFailure(Exception e) {
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

public void testPutPipeline() throws IOException {
RestHighLevelClient client = highLevelClient();

{
// tag::put-pipeline-request
Review comment (Member):
This tag seems to be missing its end.

Reply (Member):
Fixed.

String source =
"{\"description\":\"my set of processors\"," +
"\"processors\":[{\"set\":{\"field\":\"foo\",\"value\":\"bar\"}}]}";
PutPipelineRequest request = new PutPipelineRequest(
"my-pipeline-id", // <1>
new BytesArray(source.getBytes(StandardCharsets.UTF_8)), // <2>
XContentType.JSON // <3>
);
// end::put-pipeline-request

// tag::put-pipeline-request-timeout
request.timeout(TimeValue.timeValueMinutes(2)); // <1>
request.timeout("2m"); // <2>
// end::put-pipeline-request-timeout

// tag::put-pipeline-request-masterTimeout
request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // <1>
request.masterNodeTimeout("1m"); // <2>
// end::put-pipeline-request-masterTimeout

// tag::put-pipeline-execute
PutPipelineResponse response = client.cluster().putPipeline(request); // <1>
// end::put-pipeline-execute

// tag::put-pipeline-response
boolean acknowledged = response.isAcknowledged(); // <1>
// end::put-pipeline-response
assertTrue(acknowledged);
}
}

public void testPutPipelineAsync() throws Exception {
RestHighLevelClient client = highLevelClient();

{
String source =
"{\"description\":\"my set of processors\"," +
"\"processors\":[{\"set\":{\"field\":\"foo\",\"value\":\"bar\"}}]}";
PutPipelineRequest request = new PutPipelineRequest(
"my-pipeline-id",
new BytesArray(source.getBytes(StandardCharsets.UTF_8)),
XContentType.JSON
);

// tag::put-pipeline-execute-listener
ActionListener<PutPipelineResponse> listener =
new ActionListener<PutPipelineResponse>() {
@Override
public void onResponse(PutPipelineResponse response) {
// <1>
}

@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::put-pipeline-execute-listener

// Replace the empty listener with a blocking listener for the test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);

// tag::put-pipeline-execute-async
client.cluster().putPipelineAsync(request, listener); // <1>
// end::put-pipeline-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
}