Skip to content

Commit

Permalink
Add SortResponseProcessor to Search Pipelines (opensearch-project#14785
Browse files Browse the repository at this point in the history
…) (opensearch-project#14868)

* Add SortResponseProcessor for search pipelines

* Add stupid and unnecessary javadocs to satisfy overly strict CI

* Split casting and sorting methods for readability

* Register the sort processor factory

* Address code review comments

* Cast individual list elements to avoid creating two lists

* Add yamlRestTests

* Clarify why there's unusual sorting

* Use instanceof instead of isAssignableFrom

---------

Signed-off-by: Daniel Widdis <[email protected]>
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
dbwiddis authored and kkewwei committed Jul 24, 2024
1 parent fa3dcfe commit 1eb928c
Show file tree
Hide file tree
Showing 7 changed files with 597 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Print reason why parent task was cancelled ([#14604](https://github.com/opensearch-project/OpenSearch/issues/14604))
- Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750))
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
- Add SortResponseProcessor to Search Pipelines (([#14785](https://github.com/opensearch-project/OpenSearch/issues/14785)))
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800)))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProces
new TruncateHitsResponseProcessor.Factory(),
CollapseResponseProcessor.TYPE,
new CollapseResponseProcessor.Factory(),
SplitResponseProcessor.TYPE,
new SplitResponseProcessor.Factory()
SortResponseProcessor.TYPE,
new SortResponseProcessor.Factory()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Processor that sorts an array of items.
* Throws exception is the specified field is not an array.
*/
public class SortResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {
/** Key to reference this processor type from a search pipeline. */
public static final String TYPE = "sort";
/** Key defining the array field to be sorted. */
public static final String SORT_FIELD = "field";
/** Optional key defining the sort order. */
public static final String SORT_ORDER = "order";
/** Optional key to put the sorted values in a different field. */
public static final String TARGET_FIELD = "target_field";
/** Default sort order if not specified */
public static final String DEFAULT_ORDER = "asc";

/** Enum defining how elements will be sorted */
public enum SortOrder {
/** Sort in ascending (natural) order */
ASCENDING("asc"),
/** Sort in descending (reverse) order */
DESCENDING("desc");

private final String direction;

SortOrder(String direction) {
this.direction = direction;
}

@Override
public String toString() {
return this.direction;
}

/**
* Converts the string representation of the enum value to the enum.
* @param value A string ("asc" or "desc")
* @return the corresponding enum value
*/
public static SortOrder fromString(String value) {
if (value == null) {
throw new IllegalArgumentException("Sort direction cannot be null");
}

if (value.equals(ASCENDING.toString())) {
return ASCENDING;
} else if (value.equals(DESCENDING.toString())) {
return DESCENDING;
}
throw new IllegalArgumentException("Sort direction [" + value + "] not recognized." + " Valid values are: [asc, desc]");
}
}

private final String sortField;
private final SortOrder sortOrder;
private final String targetField;

SortResponseProcessor(
String tag,
String description,
boolean ignoreFailure,
String sortField,
SortOrder sortOrder,
String targetField
) {
super(tag, description, ignoreFailure);
this.sortField = Objects.requireNonNull(sortField);
this.sortOrder = Objects.requireNonNull(sortOrder);
this.targetField = targetField == null ? sortField : targetField;
}

/**
* Getter function for sortField
* @return sortField
*/
public String getSortField() {
return sortField;
}

/**
* Getter function for targetField
* @return targetField
*/
public String getTargetField() {
return targetField;
}

/**
* Getter function for sortOrder
* @return sortOrder
*/
public SortOrder getSortOrder() {
return sortOrder;
}

@Override
public String getType() {
return TYPE;
}

@Override
public SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception {
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
Map<String, DocumentField> fields = hit.getFields();
if (fields.containsKey(sortField)) {
DocumentField docField = hit.getFields().get(sortField);
if (docField == null) {
throw new IllegalArgumentException("field [" + sortField + "] is null, cannot sort.");
}
hit.setDocumentField(targetField, new DocumentField(targetField, getSortedValues(docField.getValues())));
}
if (hit.hasSource()) {
BytesReference sourceRef = hit.getSourceRef();
Tuple<? extends MediaType, Map<String, Object>> typeAndSourceMap = XContentHelper.convertToMap(
sourceRef,
false,
(MediaType) null
);

Map<String, Object> sourceAsMap = typeAndSourceMap.v2();
if (sourceAsMap.containsKey(sortField)) {
Object val = sourceAsMap.get(sortField);
if (val instanceof List) {
@SuppressWarnings("unchecked")
List<Object> listVal = (List<Object>) val;
sourceAsMap.put(targetField, getSortedValues(listVal));
}
XContentBuilder builder = XContentBuilder.builder(typeAndSourceMap.v1().xContent());
builder.map(sourceAsMap);
hit.sourceRef(BytesReference.bytes(builder));
}
}
}
return response;
}

private List<Object> getSortedValues(List<Object> values) {
return values.stream()
.map(this::downcastToComparable)
.sorted(sortOrder.equals(SortOrder.ASCENDING) ? Comparator.naturalOrder() : Comparator.reverseOrder())
.collect(Collectors.toList());
}

@SuppressWarnings("unchecked")
private Comparable<Object> downcastToComparable(Object obj) {
if (obj instanceof Comparable) {
return (Comparable<Object>) obj;
} else if (obj == null) {
throw new IllegalArgumentException("field [" + sortField + "] contains a null value.]");
} else {
throw new IllegalArgumentException("field [" + sortField + "] of type [" + obj.getClass().getName() + "] is not comparable.]");
}
}

static class Factory implements Processor.Factory<SearchResponseProcessor> {

@Override
public SortResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) {
String sortField = ConfigurationUtils.readStringProperty(TYPE, tag, config, SORT_FIELD);
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, TARGET_FIELD, sortField);
try {
SortOrder sortOrder = SortOrder.fromString(
ConfigurationUtils.readStringProperty(TYPE, tag, config, SORT_ORDER, DEFAULT_ORDER)
);
return new SortResponseProcessor(tag, description, ignoreFailure, sortField, sortOrder, targetField);
} catch (IllegalArgumentException e) {
throw ConfigurationUtils.newConfigurationException(TYPE, tag, SORT_ORDER, e.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp
throw new IllegalArgumentException("field [" + splitField + "] is null, cannot split.");
}
Object val = docField.getValue();
if (val == null || !String.class.isAssignableFrom(val.getClass())) {
if (!(val instanceof String)) {
throw new IllegalArgumentException("field [" + splitField + "] is not a string, cannot split");
}
Object[] strings = ((String) val).split(separator, preserveTrailing ? -1 : 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testAllowlistNotSpecified() throws IOException {
try (SearchPipelineCommonModulePlugin plugin = new SearchPipelineCommonModulePlugin()) {
assertEquals(Set.of("oversample", "filter_query", "script"), plugin.getRequestProcessors(createParameters(settings)).keySet());
assertEquals(
Set.of("rename_field", "truncate_hits", "collapse", "split"),
Set.of("rename_field", "truncate_hits", "collapse", "sort"),
plugin.getResponseProcessors(createParameters(settings)).keySet()
);
assertEquals(Set.of(), plugin.getSearchPhaseResultsProcessors(createParameters(settings)).keySet());
Expand Down
Loading

0 comments on commit 1eb928c

Please sign in to comment.