Introduce FetchContext #62357

Merged: 17 commits, Sep 17, 2020
File: PercolatorHighlightSubFetchPhase.java
@@ -25,15 +25,14 @@
import org.apache.lucene.search.QueryVisitor;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.document.DocumentField;
- import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.SearchHit;
+ import org.elasticsearch.search.fetch.FetchContext;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightPhase;
import org.elasticsearch.search.fetch.subphase.highlight.Highlighter;
import org.elasticsearch.search.fetch.subphase.highlight.SearchHighlightContext;
- import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.lookup.SourceLookup;

@@ -56,11 +55,11 @@ final class PercolatorHighlightSubFetchPhase implements FetchSubPhase {
}

@Override
- public FetchSubPhaseProcessor getProcessor(SearchContext searchContext, SearchLookup lookup) throws IOException {
- if (searchContext.highlight() == null) {
+ public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext, SearchLookup lookup) {
+ if (fetchContext.highlight() == null) {
return null;
}
- List<PercolateQuery> percolateQueries = locatePercolatorQuery(searchContext.query());
+ List<PercolateQuery> percolateQueries = locatePercolatorQuery(fetchContext.query());
if (percolateQueries.isEmpty()) {
return null;
}
@@ -69,7 +68,7 @@ public FetchSubPhaseProcessor getProcessor(SearchContext searchContext, SearchLookup lookup) throws IOException {
LeafReaderContext ctx;

@Override
- public void setNextReader(LeafReaderContext readerContext) throws IOException {
+ public void setNextReader(LeafReaderContext readerContext) {
this.ctx = readerContext;
}

@@ -101,10 +100,8 @@ percolatorLeafReaderContext, slot, new SourceLookup(), new HashMap<>()
);
subContext.sourceLookup().setSource(document);
// force source because MemoryIndex does not store fields
- SearchHighlightContext highlight = new SearchHighlightContext(searchContext.highlight().fields(), true);
- QueryShardContext shardContext = new QueryShardContext(searchContext.getQueryShardContext());
- FetchSubPhaseProcessor processor = highlightPhase.getProcessor(shardContext, searchContext.shardTarget(),
- highlight, query);
+ SearchHighlightContext highlight = new SearchHighlightContext(fetchContext.highlight().fields(), true);
+ FetchSubPhaseProcessor processor = highlightPhase.getProcessor(fetchContext, highlight, query);
processor.process(subContext);
for (Map.Entry<String, HighlightField> entry : subContext.hit().getHighlightFields().entrySet()) {
if (percolateQuery.getDocuments().size() == 1) {
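The change in this file is the pattern the PR applies to every fetch sub phase: getProcessor now receives the new, slimmer FetchContext instead of the full SearchContext. For orientation, a minimal sketch of a sub phase written against the new signature, using only the methods visible in this diff; the class name and the query null-check are hypothetical, not part of the PR:

    import java.io.IOException;
    import org.apache.lucene.index.LeafReaderContext;
    import org.elasticsearch.search.fetch.FetchContext;
    import org.elasticsearch.search.fetch.FetchSubPhase;
    import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
    import org.elasticsearch.search.lookup.SearchLookup;

    final class ExampleSubPhase implements FetchSubPhase {

        @Override
        public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext, SearchLookup lookup) throws IOException {
            if (fetchContext.query() == null) {
                return null; // returning null skips this sub phase for the whole request
            }
            return new FetchSubPhaseProcessor() {
                LeafReaderContext ctx;

                @Override
                public void setNextReader(LeafReaderContext readerContext) {
                    this.ctx = readerContext; // called once per segment, before its hits are processed
                }

                @Override
                public void process(HitContext hitContext) throws IOException {
                    // per-hit work goes here; hitContext exposes hit(), docId() and reader()
                }
            };
        }
    }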
File: PercolatorMatchedSlotSubFetchPhase.java
@@ -33,9 +33,9 @@
import org.apache.lucene.util.BitSetIterator;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.lucene.search.Queries;
+ import org.elasticsearch.search.fetch.FetchContext;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
- import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SearchLookup;

import java.io.IOException;
@@ -57,10 +57,10 @@ final class PercolatorMatchedSlotSubFetchPhase implements FetchSubPhase {
static final String FIELD_NAME_PREFIX = "_percolator_document_slot";

@Override
- public FetchSubPhaseProcessor getProcessor(SearchContext searchContext, SearchLookup lookup) throws IOException {
+ public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext, SearchLookup lookup) throws IOException {

List<PercolateContext> percolateContexts = new ArrayList<>();
- List<PercolateQuery> percolateQueries = locatePercolatorQuery(searchContext.query());
+ List<PercolateQuery> percolateQueries = locatePercolatorQuery(fetchContext.query());
boolean singlePercolateQuery = percolateQueries.size() == 1;
for (PercolateQuery pq : percolateQueries) {
percolateContexts.add(new PercolateContext(pq, singlePercolateQuery));
File: PercolatorHighlightSubFetchPhaseTests.java
@@ -28,33 +28,33 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery;
import org.elasticsearch.common.lucene.search.function.RandomScoreFunction;
+ import org.elasticsearch.search.fetch.FetchContext;
import org.elasticsearch.search.fetch.subphase.highlight.SearchHighlightContext;
- import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.ESTestCase;
import org.mockito.Mockito;

- import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;

import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;
+ import static org.mockito.Mockito.mock;

public class PercolatorHighlightSubFetchPhaseTests extends ESTestCase {

- public void testHitsExecutionNeeded() throws IOException {
+ public void testHitsExecutionNeeded() {
PercolateQuery percolateQuery = new PercolateQuery("_name", ctx -> null, Collections.singletonList(new BytesArray("{}")),
new MatchAllDocsQuery(), Mockito.mock(IndexSearcher.class), null, new MatchAllDocsQuery());
PercolatorHighlightSubFetchPhase subFetchPhase = new PercolatorHighlightSubFetchPhase(emptyMap());
- SearchContext searchContext = Mockito.mock(SearchContext.class);
- Mockito.when(searchContext.highlight()).thenReturn(new SearchHighlightContext(Collections.emptyList()));
- Mockito.when(searchContext.query()).thenReturn(new MatchAllDocsQuery());
+ FetchContext fetchContext = mock(FetchContext.class);
+ Mockito.when(fetchContext.highlight()).thenReturn(new SearchHighlightContext(Collections.emptyList()));
+ Mockito.when(fetchContext.query()).thenReturn(new MatchAllDocsQuery());

- assertNull(subFetchPhase.getProcessor(searchContext, null));
- Mockito.when(searchContext.query()).thenReturn(percolateQuery);
- assertNotNull(subFetchPhase.getProcessor(searchContext, null));
+ assertNull(subFetchPhase.getProcessor(fetchContext, null));
+ Mockito.when(fetchContext.query()).thenReturn(percolateQuery);
+ assertNotNull(subFetchPhase.getProcessor(fetchContext, null));
}

public void testLocatePercolatorQuery() {
File: PercolatorMatchedSlotSubFetchPhaseTests.java
@@ -26,7 +26,6 @@
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.memory.MemoryIndex;
- import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.ScoreDoc;
@@ -37,9 +36,9 @@
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.search.SearchHit;
+ import org.elasticsearch.search.fetch.FetchContext;
import org.elasticsearch.search.fetch.FetchSubPhase.HitContext;
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
- import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SourceLookup;
import org.elasticsearch.test.ESTestCase;

@@ -63,7 +62,6 @@ public void testHitsExecute() throws Exception {
PercolatorMatchedSlotSubFetchPhase phase = new PercolatorMatchedSlotSubFetchPhase();

try (DirectoryReader reader = DirectoryReader.open(directory)) {
- IndexSearcher indexSearcher = new IndexSearcher(reader);
LeafReaderContext context = reader.leaves().get(0);
// A match:
{
@@ -75,7 +73,7 @@
PercolateQuery percolateQuery = new PercolateQuery("_name", queryStore, Collections.emptyList(),
new MatchAllDocsQuery(), memoryIndex.createSearcher(), null, new MatchNoDocsQuery());

- SearchContext sc = mock(SearchContext.class);
+ FetchContext sc = mock(FetchContext.class);
when(sc.query()).thenReturn(percolateQuery);

FetchSubPhaseProcessor processor = phase.getProcessor(sc, null);
@@ -96,7 +94,7 @@
PercolateQuery percolateQuery = new PercolateQuery("_name", queryStore, Collections.emptyList(),
new MatchAllDocsQuery(), memoryIndex.createSearcher(), null, new MatchNoDocsQuery());

- SearchContext sc = mock(SearchContext.class);
+ FetchContext sc = mock(FetchContext.class);
when(sc.query()).thenReturn(percolateQuery);

FetchSubPhaseProcessor processor = phase.getProcessor(sc, null);
@@ -116,7 +114,7 @@
PercolateQuery percolateQuery = new PercolateQuery("_name", queryStore, Collections.emptyList(),
new MatchAllDocsQuery(), memoryIndex.createSearcher(), null, new MatchNoDocsQuery());

- SearchContext sc = mock(SearchContext.class);
+ FetchContext sc = mock(FetchContext.class);
when(sc.query()).thenReturn(percolateQuery);

FetchSubPhaseProcessor processor = phase.getProcessor(sc, null);
File: REST test suite (YAML)
@@ -115,6 +115,10 @@ setup:
body:
keyword: [ "a" ]

+ - do:
+ indices.refresh:
+ index: [ test ]
+
- do:
catch: bad_request
search:
File: FetchSubPhasePluginIT.java
@@ -19,26 +19,22 @@

package org.elasticsearch.search.fetch;

- import org.apache.logging.log4j.LogManager;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum;
+ import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.action.termvectors.TermVectorsRequest;
- import org.elasticsearch.action.termvectors.TermVectorsResponse;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
- import org.elasticsearch.index.termvectors.TermVectorsService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.SearchExtBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
- import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@@ -115,21 +111,21 @@ private static final class TermVectorsFetchSubPhase implements FetchSubPhase {
private static final String NAME = "term_vectors_fetch";

@Override
- public FetchSubPhaseProcessor getProcessor(SearchContext searchContext, SearchLookup lookup) {
+ public FetchSubPhaseProcessor getProcessor(FetchContext searchContext, SearchLookup lookup) {
return new FetchSubPhaseProcessor() {
@Override
public void setNextReader(LeafReaderContext readerContext) {

}

@Override
- public void process(HitContext hitContext) {
+ public void process(HitContext hitContext) throws IOException {
hitExecute(searchContext, hitContext);
}
};
}

- private void hitExecute(SearchContext context, HitContext hitContext) {
+ private void hitExecute(FetchContext context, HitContext hitContext) throws IOException {
TermVectorsFetchBuilder fetchSubPhaseBuilder = (TermVectorsFetchBuilder)context.getSearchExt(NAME);
if (fetchSubPhaseBuilder == null) {
return;
@@ -140,19 +136,18 @@ private void hitExecute(SearchContext context, HitContext hitContext) {
hitField = new DocumentField(NAME, new ArrayList<>(1));
hitContext.hit().setDocumentField(NAME, hitField);
}
- TermVectorsRequest termVectorsRequest = new TermVectorsRequest(context.indexShard().shardId().getIndex().getName(),
- hitContext.hit().getId());
- TermVectorsResponse termVector = TermVectorsService.getTermVectors(context.indexShard(), termVectorsRequest);
- try {
+ Terms terms = hitContext.reader().getTermVector(hitContext.docId(), field);
[inline review comment]
Contributor: Checking I understand, is this just an optional clean-up?

Author: Right, it means that we don't need to expose the whole index shard to the FetchContext.

+ if (terms != null) {
+ TermsEnum te = terms.iterator();
Map<String, Integer> tv = new HashMap<>();
- TermsEnum terms = termVector.getFields().terms(field).iterator();
BytesRef term;
- while ((term = terms.next()) != null) {
- tv.put(term.utf8ToString(), terms.postings(null, PostingsEnum.ALL).freq());
+ PostingsEnum pe = null;
+ while ((term = te.next()) != null) {
+ pe = te.postings(pe, PostingsEnum.FREQS);
+ pe.nextDoc();
+ tv.put(term.utf8ToString(), pe.freq());
}
hitField.getValues().add(tv);
- } catch (IOException e) {
- LogManager.getLogger(FetchSubPhasePluginIT.class).info("Swallowed exception", e);
}
}
}
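As the inline review exchange above explains, this rewrite is part of what keeps FetchContext small: instead of routing through TermVectorsService, which needed the whole IndexShard, the test now reads the term vector straight off the Lucene reader for the hit. The core pattern in isolation, as a sketch (it assumes the field was indexed with term vectors enabled; the class and method names are illustrative, not from the PR):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.PostingsEnum;
    import org.apache.lucene.index.Terms;
    import org.apache.lucene.index.TermsEnum;
    import org.apache.lucene.util.BytesRef;

    final class TermVectorFreqs {
        // Collects per-term frequencies for one document's field from its stored term vector.
        static Map<String, Integer> termFreqs(IndexReader reader, int docId, String field) throws IOException {
            Map<String, Integer> tv = new HashMap<>();
            Terms terms = reader.getTermVector(docId, field); // null if no term vector was stored
            if (terms != null) {
                TermsEnum te = terms.iterator();
                BytesRef term;
                PostingsEnum pe = null;
                while ((term = te.next()) != null) {
                    pe = te.postings(pe, PostingsEnum.FREQS); // reuse the postings object across terms
                    pe.nextDoc(); // a term vector is a one-document index, so advance to its only doc
                    tv.put(term.utf8ToString(), pe.freq());
                }
            }
            return tv;
        }
    }

Compared with the removed TermVectorsService path, this needs nothing beyond the per-hit reader that the fetch phase already hands to each processor.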