Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SortResponseProcessor to Search Pipelines #14785

Merged
merged 9 commits into from
Jul 22, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750))
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
- Add SortResponseProcessor to Search Pipelines (([#14785](https://github.com/opensearch-project/OpenSearch/issues/14785)))
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800)))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProces
new TruncateHitsResponseProcessor.Factory(),
CollapseResponseProcessor.TYPE,
new CollapseResponseProcessor.Factory(),
SplitResponseProcessor.TYPE,
new SplitResponseProcessor.Factory()
SortResponseProcessor.TYPE,
new SortResponseProcessor.Factory()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Processor that sorts an array of items.
* Throws exception is the specified field is not an array.
*/
public class SortResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {
/** Key to reference this processor type from a search pipeline. */
public static final String TYPE = "sort";
/** Key defining the array field to be sorted. */
public static final String SORT_FIELD = "field";
/** Optional key defining the sort order. */
public static final String SORT_ORDER = "order";
/** Optional key to put the sorted values in a different field. */
public static final String TARGET_FIELD = "target_field";
/** Default sort order if not specified */
public static final String DEFAULT_ORDER = "asc";

/** Enum defining how elements will be sorted */
public enum SortOrder {
/** Sort in ascending (natural) order */
ASCENDING("asc"),
/** Sort in descending (reverse) order */
DESCENDING("desc");

private final String direction;

SortOrder(String direction) {
this.direction = direction;
}

@Override
public String toString() {
return this.direction;
}

/**
* Converts the string representation of the enum value to the enum.
* @param value A string ("asc" or "desc")
* @return the corresponding enum value
*/
public static SortOrder fromString(String value) {
if (value == null) {
throw new IllegalArgumentException("Sort direction cannot be null");

Check warning on line 72 in modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SortResponseProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SortResponseProcessor.java#L72

Added line #L72 was not covered by tests
}

if (value.equals(ASCENDING.toString())) {
return ASCENDING;

Check warning on line 76 in modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SortResponseProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SortResponseProcessor.java#L76

Added line #L76 was not covered by tests
} else if (value.equals(DESCENDING.toString())) {
return DESCENDING;
}
throw new IllegalArgumentException("Sort direction [" + value + "] not recognized." + " Valid values are: [asc, desc]");

Check warning on line 80 in modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SortResponseProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SortResponseProcessor.java#L80

Added line #L80 was not covered by tests
}
}

private final String sortField;
private final SortOrder sortOrder;
private final String targetField;

SortResponseProcessor(
String tag,
String description,
boolean ignoreFailure,
String sortField,
SortOrder sortOrder,
String targetField
) {
super(tag, description, ignoreFailure);
this.sortField = Objects.requireNonNull(sortField);
this.sortOrder = Objects.requireNonNull(sortOrder);
this.targetField = targetField == null ? sortField : targetField;
}

/**
* Getter function for sortField
* @return sortField
*/
public String getSortField() {
return sortField;
}

/**
* Getter function for targetField
* @return targetField
*/
public String getTargetField() {
return targetField;
}

/**
* Getter function for sortOrder
* @return sortOrder
*/
public SortOrder getSortOrder() {
return sortOrder;
}

@Override
public String getType() {
return TYPE;
}

@Override
public SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception {
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
Map<String, DocumentField> fields = hit.getFields();
if (fields.containsKey(sortField)) {
DocumentField docField = hit.getFields().get(sortField);
if (docField == null) {
throw new IllegalArgumentException("field [" + sortField + "] is null, cannot sort.");
}
hit.setDocumentField(targetField, new DocumentField(targetField, getSortedValues(docField.getValues())));
}
if (hit.hasSource()) {
BytesReference sourceRef = hit.getSourceRef();
Tuple<? extends MediaType, Map<String, Object>> typeAndSourceMap = XContentHelper.convertToMap(
sourceRef,
false,
(MediaType) null
);

Map<String, Object> sourceAsMap = typeAndSourceMap.v2();
if (sourceAsMap.containsKey(sortField)) {
Object val = sourceAsMap.get(sortField);
if (val instanceof List) {
@SuppressWarnings("unchecked")
List<Object> listVal = (List<Object>) val;
sourceAsMap.put(targetField, getSortedValues(listVal));
}
XContentBuilder builder = XContentBuilder.builder(typeAndSourceMap.v1().xContent());
builder.map(sourceAsMap);
hit.sourceRef(BytesReference.bytes(builder));
}
}
}
return response;
}

private List<Object> getSortedValues(List<Object> values) {
return values.stream()
.map(this::downcastToComparable)
.sorted(sortOrder.equals(SortOrder.ASCENDING) ? Comparator.naturalOrder() : Comparator.reverseOrder())
.collect(Collectors.toList());
}

@SuppressWarnings("unchecked")
private Comparable<Object> downcastToComparable(Object obj) {
if (obj instanceof Comparable) {
return (Comparable<Object>) obj;
} else if (obj == null) {
throw new IllegalArgumentException("field [" + sortField + "] contains a null value.]");
} else {
throw new IllegalArgumentException("field [" + sortField + "] of type [" + obj.getClass().getName() + "] is not comparable.]");
}
}

static class Factory implements Processor.Factory<SearchResponseProcessor> {

@Override
public SortResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) {
String sortField = ConfigurationUtils.readStringProperty(TYPE, tag, config, SORT_FIELD);
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, TARGET_FIELD, sortField);
try {
SortOrder sortOrder = SortOrder.fromString(
ConfigurationUtils.readStringProperty(TYPE, tag, config, SORT_ORDER, DEFAULT_ORDER)
);
return new SortResponseProcessor(tag, description, ignoreFailure, sortField, sortOrder, targetField);
} catch (IllegalArgumentException e) {
throw ConfigurationUtils.newConfigurationException(TYPE, tag, SORT_ORDER, e.getMessage());

Check warning on line 205 in modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SortResponseProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SortResponseProcessor.java#L204-L205

Added lines #L204 - L205 were not covered by tests
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp
throw new IllegalArgumentException("field [" + splitField + "] is null, cannot split.");
}
Object val = docField.getValue();
if (val == null || !String.class.isAssignableFrom(val.getClass())) {
if (!(val instanceof String)) {
throw new IllegalArgumentException("field [" + splitField + "] is not a string, cannot split");
}
Object[] strings = ((String) val).split(separator, preserveTrailing ? -1 : 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testAllowlistNotSpecified() throws IOException {
try (SearchPipelineCommonModulePlugin plugin = new SearchPipelineCommonModulePlugin()) {
assertEquals(Set.of("oversample", "filter_query", "script"), plugin.getRequestProcessors(createParameters(settings)).keySet());
assertEquals(
Set.of("rename_field", "truncate_hits", "collapse", "split"),
Set.of("rename_field", "truncate_hits", "collapse", "sort"),
plugin.getResponseProcessors(createParameters(settings)).keySet()
);
assertEquals(Set.of(), plugin.getSearchPhaseResultsProcessors(createParameters(settings)).keySet());
Expand Down
Loading
Loading