
Add ingest-attachment support for per document indexed_chars limit #31352

Merged 3 commits on Jun 16, 2018
122 changes: 116 additions & 6 deletions docs/plugins/ingest-attachment.asciidoc
@@ -25,6 +25,7 @@ include::install_remove.asciidoc[]
| `field` | yes | - | The field to get the base64 encoded field from
| `target_field` | no | attachment | The field that will hold the attachment information
| `indexed_chars` | no | 100000 | The number of chars being used for extraction to prevent huge fields. Use `-1` for no limit.
| `indexed_chars_field` | no | `null` | Field name from which you can override the number of chars used for extraction. See `indexed_chars`.
| `properties` | no | all properties | Array of properties to select to be stored. Can be `content`, `title`, `name`, `author`, `keywords`, `date`, `content_type`, `content_length`, `language`
| `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
|======
@@ -44,11 +45,11 @@ PUT _ingest/pipeline/attachment
}
]
}
- PUT my_index/my_type/my_id?pipeline=attachment
+ PUT my_index/_doc/my_id?pipeline=attachment
{
"data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0="
}
- GET my_index/my_type/my_id
+ GET my_index/_doc/my_id
--------------------------------------------------
// CONSOLE

@@ -59,7 +60,7 @@ Returns this:
{
"found": true,
"_index": "my_index",
- "_type": "my_type",
+ "_type": "_doc",
"_id": "my_id",
"_version": 1,
"_source": {
@@ -99,6 +100,115 @@ NOTE: Extracting contents from binary data is a resource intensive operation and
consumes a lot of resources. It is highly recommended to run pipelines
using this processor in a dedicated ingest node.
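A node can be dedicated to ingest by disabling its other roles in `elasticsearch.yml` — a minimal sketch using the role settings of the 6.x series (check the setting names against your version's documentation):

```yaml
# Dedicated ingest node (6.x-era settings): keep ingest, drop master and data roles
node.master: false
node.data: false
node.ingest: true
```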

[[ingest-attachment-extracted-chars]]
==== Limit the number of extracted chars

To prevent extracting too many chars and overloading the node's memory, the number of chars used for extraction
is limited by default to `100000`. You can change this value by setting `indexed_chars`. Use `-1` for no limit, but
make sure your node has enough heap memory to extract the content of very large documents.

You can also define this limit per document by reading it from a field of the document itself. If the document
contains that field, its value overrides the `indexed_chars` setting. To enable this behavior, set the
`indexed_chars_field` option to the name of that field.

For example:

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/attachment
{
  "description" : "Extract attachment information",
  "processors" : [
    {
      "attachment" : {
        "field" : "data",
        "indexed_chars" : 11,
        "indexed_chars_field" : "max_size"
      }
    }
  ]
}
PUT my_index/_doc/my_id?pipeline=attachment
{
  "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0="
}
GET my_index/_doc/my_id
--------------------------------------------------
// CONSOLE

Returns this:

[source,js]
--------------------------------------------------
{
  "found": true,
  "_index": "my_index",
  "_type": "_doc",
  "_id": "my_id",
  "_version": 1,
  "_source": {
    "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=",
    "attachment": {
      "content_type": "application/rtf",
      "language": "sl",
      "content": "Lorem ipsum",
      "content_length": 11
    }
  }
}
--------------------------------------------------
// TESTRESPONSE


If the document itself provides a `max_size` field, its value overrides the `indexed_chars` setting, so only the
first five chars are extracted here:

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/attachment
{
  "description" : "Extract attachment information",
  "processors" : [
    {
      "attachment" : {
        "field" : "data",
        "indexed_chars" : 11,
        "indexed_chars_field" : "max_size"
      }
    }
  ]
}
PUT my_index/_doc/my_id_2?pipeline=attachment
{
  "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=",
  "max_size": 5
}
GET my_index/_doc/my_id_2
--------------------------------------------------
// CONSOLE

Returns this:

[source,js]
--------------------------------------------------
{
  "found": true,
  "_index": "my_index",
  "_type": "_doc",
  "_id": "my_id_2",
  "_version": 1,
  "_source": {
    "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=",
    "max_size": 5,
    "attachment": {
      "content_type": "application/rtf",
      "language": "ro",
      "content": "Lorem",
      "content_length": 5
    }
  }
}
--------------------------------------------------
// TESTRESPONSE
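The two responses above differ only because of the per-document override. The precedence rule — use the value from `indexed_chars_field` when the document has it, otherwise fall back to the processor's `indexed_chars` setting — can be sketched in plain Python (an illustrative model, not the plugin's actual code; the function names are ours):

```python
def effective_indexed_chars(document, indexed_chars, indexed_chars_field=None):
    """Resolve the extraction limit the way the attachment processor does:
    a per-document value wins over the processor-level setting."""
    if indexed_chars_field is not None:
        per_doc_limit = document.get(indexed_chars_field)
        if per_doc_limit is not None:
            return per_doc_limit
    return indexed_chars


def truncate_content(text, limit):
    """Apply the limit to extracted text; -1 means no limit."""
    return text if limit == -1 else text[:limit]


doc = {"data": "<base64>", "max_size": 5}
limit = effective_indexed_chars(doc, indexed_chars=11, indexed_chars_field="max_size")
print(truncate_content("Lorem ipsum dolor sit amet", limit))  # prints "Lorem"
```

Without the `max_size` field the same call falls back to 11 chars, matching the first response's `"Lorem ipsum"`.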


[[ingest-attachment-with-arrays]]
==== Using the Attachment Processor with arrays

@@ -150,7 +260,7 @@ PUT _ingest/pipeline/attachment
}
]
}
- PUT my_index/my_type/my_id?pipeline=attachment
+ PUT my_index/_doc/my_id?pipeline=attachment
{
"attachments" : [
{
@@ -163,7 +273,7 @@
}
]
}
- GET my_index/my_type/my_id
+ GET my_index/_doc/my_id
--------------------------------------------------
// CONSOLE

@@ -172,7 +282,7 @@ Returns this:
--------------------------------------------------
{
"_index" : "my_index",
- "_type" : "my_type",
+ "_type" : "_doc",
"_id" : "my_id",
"_version" : 1,
"found" : true,
@@ -29,7 +29,6 @@
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
@@ -42,6 +41,7 @@
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
import static org.elasticsearch.ingest.ConfigurationUtils.readIntProperty;
import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList;
import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalStringProperty;
import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;

public final class AttachmentProcessor extends AbstractProcessor {
@@ -55,15 +55,17 @@ public final class AttachmentProcessor extends AbstractProcessor {
private final Set<Property> properties;
private final int indexedChars;
private final boolean ignoreMissing;
private final String indexedCharsField;

AttachmentProcessor(String tag, String field, String targetField, Set<Property> properties,
- int indexedChars, boolean ignoreMissing) throws IOException {
+ int indexedChars, boolean ignoreMissing, String indexedCharsField) {
super(tag);
this.field = field;
this.targetField = targetField;
this.properties = properties;
this.indexedChars = indexedChars;
this.ignoreMissing = ignoreMissing;
this.indexedCharsField = indexedCharsField;
}

boolean isIgnoreMissing() {
@@ -82,6 +84,17 @@ public void execute(IngestDocument ingestDocument) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot parse.");
}

Integer indexedChars = this.indexedChars;

if (indexedCharsField != null) {
// If the user provided the number of characters to be extracted as part of the document, we use it
indexedChars = ingestDocument.getFieldValue(indexedCharsField, Integer.class, true);
if (indexedChars == null) {
// If the field does not exist we fall back to the global limit
indexedChars = this.indexedChars;
}
}

Metadata metadata = new Metadata();
String parsedContent = "";
try {
@@ -183,14 +196,15 @@ public AttachmentProcessor create(Map<String, Processor.Factory> registry, Strin
Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, processorTag, config, "field");
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "attachment");
- List<String> properyNames = readOptionalList(TYPE, processorTag, config, "properties");
+ List<String> propertyNames = readOptionalList(TYPE, processorTag, config, "properties");
int indexedChars = readIntProperty(TYPE, processorTag, config, "indexed_chars", NUMBER_OF_CHARS_INDEXED);
boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
String indexedCharsField = readOptionalStringProperty(TYPE, processorTag, config, "indexed_chars_field");

final Set<Property> properties;
- if (properyNames != null) {
+ if (propertyNames != null) {
properties = EnumSet.noneOf(Property.class);
- for (String fieldName : properyNames) {
+ for (String fieldName : propertyNames) {
try {
properties.add(Property.parse(fieldName));
} catch (Exception e) {
@@ -202,7 +216,7 @@
properties = DEFAULT_PROPERTIES;
}

- return new AttachmentProcessor(processorTag, field, targetField, properties, indexedChars, ignoreMissing);
+ return new AttachmentProcessor(processorTag, field, targetField, properties, indexedChars, ignoreMissing, indexedCharsField);
}
}

@@ -27,7 +27,6 @@
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Base64;
@@ -54,9 +53,9 @@ public class AttachmentProcessorTests extends ESTestCase {
private AttachmentProcessor processor;

@Before
- public void createStandardProcessor() throws IOException {
+ public void createStandardProcessor() {
processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field",
- "target_field", EnumSet.allOf(AttachmentProcessor.Property.class), 10000, false);
+ "target_field", EnumSet.allOf(AttachmentProcessor.Property.class), 10000, false, null);
}

public void testEnglishTextDocument() throws Exception {
@@ -89,7 +88,7 @@ public void testHtmlDocumentWithRandomFields() throws Exception {
selectedProperties.add(AttachmentProcessor.Property.DATE);
}
processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field",
- "target_field", selectedProperties, 10000, false);
+ "target_field", selectedProperties, 10000, false, null);

Map<String, Object> attachmentData = parseDocument("htmlWithEmptyDateMeta.html", processor);
assertThat(attachmentData.keySet(), hasSize(selectedFieldNames.length));
@@ -242,15 +241,15 @@ public void testNullValueWithIgnoreMissing() throws Exception {
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
Collections.singletonMap("source_field", null));
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
- Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, true);
+ Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, true, null);
processor.execute(ingestDocument);
assertIngestDocument(originalIngestDocument, ingestDocument);
}

public void testNonExistentWithIgnoreMissing() throws Exception {
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
- Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, true);
+ Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, true, null);
processor.execute(ingestDocument);
assertIngestDocument(originalIngestDocument, ingestDocument);
}
@@ -259,22 +258,28 @@ public void testNullWithoutIgnoreMissing() throws Exception {
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
Collections.singletonMap("source_field", null));
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
- Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, false);
+ Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, false, null);
Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field [source_field] is null, cannot parse."));
}

public void testNonExistentWithoutIgnoreMissing() throws Exception {
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
- Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, false);
+ Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, false, null);
Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field [source_field] not present as part of path [source_field]"));
}

private Map<String, Object> parseDocument(String file, AttachmentProcessor processor) throws Exception {
return parseDocument(file, processor, new HashMap<>());
}

private Map<String, Object> parseDocument(String file, AttachmentProcessor processor, Map<String, Object> optionalFields)
throws Exception {
Map<String, Object> document = new HashMap<>();
document.put("source_field", getAsBase64(file));
document.putAll(optionalFields);

IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
processor.execute(ingestDocument);
@@ -284,7 +289,47 @@ private Map<String, Object> parseDocument(String file, AttachmentProcessor proce
return attachmentData;
}

protected String getAsBase64(String filename) throws Exception {
public void testIndexedChars() throws Exception {
processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field",
"target_field", EnumSet.allOf(AttachmentProcessor.Property.class), 19, false, null);

Map<String, Object> attachmentData = parseDocument("text-in-english.txt", processor);

assertThat(attachmentData.keySet(), containsInAnyOrder("language", "content", "content_type", "content_length"));
assertThat(attachmentData.get("language"), is("en"));
assertThat(attachmentData.get("content"), is("\"God Save the Queen"));
assertThat(attachmentData.get("content_type").toString(), containsString("text/plain"));
assertThat(attachmentData.get("content_length"), is(19L));

processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field",
"target_field", EnumSet.allOf(AttachmentProcessor.Property.class), 19, false, "max_length");

attachmentData = parseDocument("text-in-english.txt", processor);

assertThat(attachmentData.keySet(), containsInAnyOrder("language", "content", "content_type", "content_length"));
assertThat(attachmentData.get("language"), is("en"));
assertThat(attachmentData.get("content"), is("\"God Save the Queen"));
assertThat(attachmentData.get("content_type").toString(), containsString("text/plain"));
assertThat(attachmentData.get("content_length"), is(19L));

attachmentData = parseDocument("text-in-english.txt", processor, Collections.singletonMap("max_length", 10));

assertThat(attachmentData.keySet(), containsInAnyOrder("language", "content", "content_type", "content_length"));
assertThat(attachmentData.get("language"), is("sk"));
assertThat(attachmentData.get("content"), is("\"God Save"));
assertThat(attachmentData.get("content_type").toString(), containsString("text/plain"));
assertThat(attachmentData.get("content_length"), is(10L));

attachmentData = parseDocument("text-in-english.txt", processor, Collections.singletonMap("max_length", 100));

assertThat(attachmentData.keySet(), containsInAnyOrder("language", "content", "content_type", "content_length"));
assertThat(attachmentData.get("language"), is("en"));
assertThat(attachmentData.get("content"), is("\"God Save the Queen\" (alternatively \"God Save the King\""));
assertThat(attachmentData.get("content_type").toString(), containsString("text/plain"));
assertThat(attachmentData.get("content_length"), is(56L));
}

private String getAsBase64(String filename) throws Exception {
String path = "/org/elasticsearch/ingest/attachment/test/sample-files/" + filename;
try (InputStream is = AttachmentProcessorTests.class.getResourceAsStream(path)) {
byte bytes[] = IOUtils.toByteArray(is);