-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
REST high-level client: add put ingest pipeline API #30793
Changes from 6 commits
2c5d70e
a5c389a
42942b6
4c3c61a
a11026b
aa0d270
e77799d
977c259
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,12 +25,17 @@ | |
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; | ||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; | ||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; | ||
import org.elasticsearch.action.ingest.PutPipelineRequest; | ||
import org.elasticsearch.action.ingest.PutPipelineResponse; | ||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; | ||
import org.elasticsearch.common.bytes.BytesReference; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.ByteSizeUnit; | ||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.common.xcontent.XContentType; | ||
import org.elasticsearch.common.xcontent.support.XContentMapValues; | ||
import org.elasticsearch.indices.recovery.RecoverySettings; | ||
import org.elasticsearch.ingest.Pipeline; | ||
import org.elasticsearch.rest.RestStatus; | ||
import org.elasticsearch.tasks.TaskInfo; | ||
|
||
|
@@ -136,4 +141,38 @@ public void testListTasks() throws IOException { | |
} | ||
assertTrue("List tasks were not found", listTasksFound); | ||
} | ||
|
||
public void testPutPipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentType xContentType = randomFrom(XContentType.values()); | ||
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent()); | ||
pipelineBuilder.startObject().field(Pipeline.DESCRIPTION_KEY, "some random set of processors"); | ||
pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY); | ||
//Start first processor | ||
pipelineBuilder.startObject(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tend to like doing something like this:
The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the review. I only saw one thing to do (you mentioned two). Made the change for that. |
||
pipelineBuilder.startObject("set"); | ||
pipelineBuilder.field("field", "foo"); | ||
pipelineBuilder.field("value", "bar"); | ||
pipelineBuilder.endObject(); | ||
pipelineBuilder.endObject(); | ||
//End first processor | ||
//Start second processor | ||
pipelineBuilder.startObject(); | ||
pipelineBuilder.startObject("convert"); | ||
pipelineBuilder.field("field", "rank"); | ||
pipelineBuilder.field("type", "integer"); | ||
pipelineBuilder.endObject(); | ||
pipelineBuilder.endObject(); | ||
//End second processor | ||
pipelineBuilder.endArray(); | ||
pipelineBuilder.endObject(); | ||
PutPipelineRequest request = new PutPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType()); | ||
|
||
PutPipelineResponse putPipelineResponse = | ||
execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync); | ||
assertTrue(putPipelineResponse.isAcknowledged()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,17 +21,19 @@ | |
|
||
import org.elasticsearch.ElasticsearchException; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.FailedNodeException; | ||
import org.elasticsearch.action.LatchedActionListener; | ||
import org.elasticsearch.action.TaskOperationFailure; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; | ||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; | ||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; | ||
import org.elasticsearch.action.ingest.PutPipelineRequest; | ||
import org.elasticsearch.action.ingest.PutPipelineResponse; | ||
import org.elasticsearch.client.ESRestHighLevelClientTestCase; | ||
import org.elasticsearch.client.RestHighLevelClient; | ||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; | ||
import org.elasticsearch.common.bytes.BytesArray; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.ByteSizeUnit; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
|
@@ -41,6 +43,7 @@ | |
import org.elasticsearch.tasks.TaskInfo; | ||
|
||
import java.io.IOException; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
@@ -80,19 +83,19 @@ public void testClusterPutSettings() throws IOException { | |
// end::put-settings-request | ||
|
||
// tag::put-settings-create-settings | ||
String transientSettingKey = | ||
String transientSettingKey = | ||
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(); | ||
int transientSettingValue = 10; | ||
Settings transientSettings = | ||
Settings transientSettings = | ||
Settings.builder() | ||
.put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES) | ||
.build(); // <1> | ||
|
||
String persistentSettingKey = | ||
String persistentSettingKey = | ||
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(); | ||
String persistentSettingValue = | ||
String persistentSettingValue = | ||
EnableAllocationDecider.Allocation.NONE.name(); | ||
Settings persistentSettings = | ||
Settings persistentSettings = | ||
Settings.builder() | ||
.put(persistentSettingKey, persistentSettingValue) | ||
.build(); // <2> | ||
|
@@ -105,9 +108,9 @@ public void testClusterPutSettings() throws IOException { | |
|
||
{ | ||
// tag::put-settings-settings-builder | ||
Settings.Builder transientSettingsBuilder = | ||
Settings.Builder transientSettingsBuilder = | ||
Settings.builder() | ||
.put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES); | ||
.put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES); | ||
request.transientSettings(transientSettingsBuilder); // <1> | ||
// end::put-settings-settings-builder | ||
} | ||
|
@@ -164,7 +167,7 @@ public void testClusterUpdateSettingsAsync() throws Exception { | |
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); | ||
|
||
// tag::put-settings-execute-listener | ||
ActionListener<ClusterUpdateSettingsResponse> listener = | ||
ActionListener<ClusterUpdateSettingsResponse> listener = | ||
new ActionListener<ClusterUpdateSettingsResponse>() { | ||
@Override | ||
public void onResponse(ClusterUpdateSettingsResponse response) { | ||
|
@@ -272,4 +275,80 @@ public void onFailure(Exception e) { | |
assertTrue(latch.await(30L, TimeUnit.SECONDS)); | ||
} | ||
} | ||
|
||
public void testPutPipeline() throws IOException { | ||
RestHighLevelClient client = highLevelClient(); | ||
|
||
{ | ||
// tag::put-pipeline-request | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This tag seems to be missing its end There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
String source = | ||
"{\"description\":\"my set of processors\"," + | ||
"\"processors\":[{\"set\":{\"field\":\"foo\",\"value\":\"bar\"}}]}"; | ||
PutPipelineRequest request = new PutPipelineRequest( | ||
"my-pipeline-id", // <1> | ||
new BytesArray(source.getBytes(StandardCharsets.UTF_8)), // <2> | ||
XContentType.JSON // <3> | ||
); | ||
// end::put-pipeline-request | ||
|
||
// tag::put-pipeline-request-timeout | ||
request.timeout(TimeValue.timeValueMinutes(2)); // <1> | ||
request.timeout("2m"); // <2> | ||
// end::put-pipeline-request-timeout | ||
|
||
// tag::put-pipeline-request-masterTimeout | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This tag seems to be in the wron place and missing its end. |
||
request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // <1> | ||
request.masterNodeTimeout("1m"); // <2> | ||
// end::put-pipeline-request-masterTimeout | ||
|
||
// tag::put-pipeline-execute | ||
PutPipelineResponse response = client.cluster().putPipeline(request); // <1> | ||
// end::put-pipeline-execute | ||
|
||
// tag::put-pipeline-response | ||
boolean acknowledged = response.isAcknowledged(); // <1> | ||
// end::put-pipeline-response | ||
assertTrue(acknowledged); | ||
} | ||
} | ||
|
||
public void testPutPipelineAsync() throws Exception { | ||
RestHighLevelClient client = highLevelClient(); | ||
|
||
{ | ||
String source = | ||
"{\"description\":\"my set of processors\"," + | ||
"\"processors\":[{\"set\":{\"field\":\"foo\",\"value\":\"bar\"}}]}"; | ||
PutPipelineRequest request = new PutPipelineRequest( | ||
"my-pipeline-id", | ||
new BytesArray(source.getBytes(StandardCharsets.UTF_8)), | ||
XContentType.JSON | ||
); | ||
|
||
// tag::put-pipeline-execute-listener | ||
ActionListener<PutPipelineResponse> listener = | ||
new ActionListener<PutPipelineResponse>() { | ||
@Override | ||
public void onResponse(PutPipelineResponse response) { | ||
// <1> | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
// <2> | ||
} | ||
}; | ||
// end::put-pipeline-execute-listener | ||
|
||
// Replace the empty listener by a blocking listener in test | ||
final CountDownLatch latch = new CountDownLatch(1); | ||
listener = new LatchedActionListener<>(listener, latch); | ||
|
||
// tag::put-pipeline-execute-async | ||
client.cluster().putPipelineAsync(request, listener); // <1> | ||
// end::put-pipeline-execute-async | ||
|
||
assertTrue(latch.await(30L, TimeUnit.SECONDS)); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry for being late to the party. I just realized that the spec here defines the API as
ingest.put_pipeline
. Instead of adding this API (and all of the other ingest API) to the cluster ones, we should create a new namespace called ingest, similar to what we recently did with snapshot. @sohaibiftikhar would you mind taking care of this ?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Should I combine this change with #30847 or put in a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would do it in a separate PR, thanks!