Skip to content

Commit

Permalink
REST high-level client: add get ingest pipeline API (elastic#30847)
Browse files Browse the repository at this point in the history
Relates to elastic#27205
  • Loading branch information
sohaibiftikhar authored and javanna committed Jun 1, 2018
1 parent f76b225 commit 7d955f8
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;

Expand Down Expand Up @@ -87,4 +89,26 @@ public void putPipelineAsync(PutPipelineRequest request, ActionListener<PutPipel
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::putPipeline,
PutPipelineResponse::fromXContent, listener, emptySet(), headers);
}

/**
* Get an existing pipeline
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/get-pipeline-api.html"> Get Pipeline API on elastic.co</a>
*/
public GetPipelineResponse getPipeline(GetPipelineRequest request, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::getPipeline,
GetPipelineResponse::fromXContent, emptySet(), headers);
}

/**
* Asynchronously get an existing pipeline
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/get-pipeline-api.html"> Get Pipeline API on elastic.co</a>
*/
public void getPipelineAsync(GetPipelineRequest request, ActionListener<GetPipelineResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::getPipeline,
GetPipelineResponse::fromXContent, listener, emptySet(), headers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -631,6 +632,18 @@ static Request clusterPutSettings(ClusterUpdateSettingsRequest clusterUpdateSett
return request;
}

static Request getPipeline(GetPipelineRequest getPipelineRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ingest/pipeline")
.addCommaSeparatedPathParts(getPipelineRequest.getIds())
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);

Params parameters = new Params(request);
parameters.withMasterTimeout(getPipelineRequest.masterNodeTimeout());
return request;
}

static Request putPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ingest/pipeline")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
Expand All @@ -32,7 +34,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
Expand Down Expand Up @@ -113,31 +115,7 @@ public void testClusterUpdateSettingNonExistent() {

public void testPutPipeline() throws IOException {
String id = "some_pipeline_id";
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
pipelineBuilder.startObject();
{
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
{
pipelineBuilder.startObject().startObject("set");
{
pipelineBuilder
.field("field", "foo")
.field("value", "bar");
}
pipelineBuilder.endObject().endObject();
pipelineBuilder.startObject().startObject("convert");
{
pipelineBuilder
.field("field", "rank")
.field("type", "integer");
}
pipelineBuilder.endObject().endObject();
}
pipelineBuilder.endArray();
}
pipelineBuilder.endObject();
XContentBuilder pipelineBuilder = buildRandomXContentPipeline();
PutPipelineRequest request = new PutPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
Expand All @@ -147,4 +125,27 @@ public void testPutPipeline() throws IOException {
execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync);
assertTrue(putPipelineResponse.isAcknowledged());
}

public void testGetPipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildRandomXContentPipeline();
{
PutPipelineRequest request = new PutPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);
}

GetPipelineRequest request = new GetPipelineRequest(id);

GetPipelineResponse response =
execute(request, highLevelClient().cluster()::getPipeline, highLevelClient().cluster()::getPipelineAsync);
assertTrue(response.isFound());
assertEquals(response.pipelines().get(0).getId(), id);
PipelineConfiguration expectedConfig =
new PipelineConfiguration(id, BytesReference.bytes(pipelineBuilder), pipelineBuilder.contentType());
assertEquals(expectedConfig.getConfigAsMap(), response.pipelines().get(0).getConfigAsMap());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@

import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.AfterClass;
import org.junit.Before;
Expand Down Expand Up @@ -80,4 +85,42 @@ private HighLevelClient(RestClient restClient) {
super(restClient, (client) -> {}, Collections.emptyList());
}
}

protected static XContentBuilder buildRandomXContentPipeline() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
pipelineBuilder.startObject();
{
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
{
pipelineBuilder.startObject().startObject("set");
{
pipelineBuilder
.field("field", "foo")
.field("value", "bar");
}
pipelineBuilder.endObject().endObject();
pipelineBuilder.startObject().startObject("convert");
{
pipelineBuilder
.field("field", "rank")
.field("type", "integer");
}
pipelineBuilder.endObject().endObject();
}
pipelineBuilder.endArray();
}
pipelineBuilder.endObject();
return pipelineBuilder;
}

protected static void createPipeline(String pipelineId) throws IOException {
XContentBuilder builder = buildRandomXContentPipeline();
createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType()));
}

protected static void createPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
assertOK(client().performRequest(RequestConverters.putPipeline(putPipelineRequest)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
Expand Down Expand Up @@ -1482,6 +1483,20 @@ public void testPutPipeline() throws IOException {
assertEquals(expectedParams, expectedRequest.getParameters());
}

public void testGetPipeline() {
String pipelineId = "some_pipeline_id";
Map<String, String> expectedParams = new HashMap<>();
GetPipelineRequest request = new GetPipelineRequest("some_pipeline_id");
setRandomMasterTimeout(request, expectedParams);
Request expectedRequest = RequestConverters.getPipeline(request);
StringJoiner endpoint = new StringJoiner("/", "/", "");
endpoint.add("_ingest/pipeline");
endpoint.add(pipelineId);
assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
assertEquals(HttpGet.METHOD_NAME, expectedRequest.getMethod());
assertEquals(expectedParams, expectedRequest.getParameters());
}

public void testRollover() throws IOException {
RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10),
randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
Expand All @@ -34,11 +36,13 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.ingest.PipelineConfiguration;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -257,4 +261,74 @@ public void onFailure(Exception e) {
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

public void testGetPipeline() throws IOException {
RestHighLevelClient client = highLevelClient();

{
createPipeline("my-pipeline-id");
}

{
// tag::get-pipeline-request
GetPipelineRequest request = new GetPipelineRequest("my-pipeline-id"); // <1>
// end::get-pipeline-request

// tag::get-pipeline-request-masterTimeout
request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // <1>
request.masterNodeTimeout("1m"); // <2>
// end::get-pipeline-request-masterTimeout

// tag::get-pipeline-execute
GetPipelineResponse response = client.cluster().getPipeline(request); // <1>
// end::get-pipeline-execute

// tag::get-pipeline-response
boolean successful = response.isFound(); // <1>
List<PipelineConfiguration> pipelines = response.pipelines(); // <2>
for(PipelineConfiguration pipeline: pipelines) {
Map<String, Object> config = pipeline.getConfigAsMap(); // <3>
}
// end::get-pipeline-response

assertTrue(successful);
}
}

public void testGetPipelineAsync() throws Exception {
RestHighLevelClient client = highLevelClient();

{
createPipeline("my-pipeline-id");
}

{
GetPipelineRequest request = new GetPipelineRequest("my-pipeline-id");

// tag::get-pipeline-execute-listener
ActionListener<GetPipelineResponse> listener =
new ActionListener<GetPipelineResponse>() {
@Override
public void onResponse(GetPipelineResponse response) {
// <1>
}

@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::get-pipeline-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);

// tag::get-pipeline-execute-async
client.cluster().getPipelineAsync(request, listener); // <1>
// end::get-pipeline-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
}
Loading

0 comments on commit 7d955f8

Please sign in to comment.