From 400c002e1cc9c91ab82bf931035b907134862bc6 Mon Sep 17 00:00:00 2001 From: Navneet Verma Date: Tue, 16 Jul 2024 10:41:16 -0700 Subject: [PATCH] Fixed LeafReaders casting errors to SegmentReaders when segment replication is enabled during search Signed-off-by: Navneet Verma --- CHANGELOG.md | 1 + .../opensearch/knn/index/KNNIndexShard.java | 4 +- .../opensearch/knn/index/query/KNNWeight.java | 6 +- .../knn/index/SegmentReplicationIT.java | 94 +++++++++++++++++++ .../org/opensearch/knn/KNNRestTestCase.java | 35 ++++++- 5 files changed, 134 insertions(+), 6 deletions(-) create mode 100644 src/test/java/org/opensearch/knn/index/SegmentReplicationIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index e03e06ccb..a4e8541b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Enhancements ### Bug Fixes * Fixing the arithmetic to find the number of vectors to stream from java to jni layer.[#1804](https://github.com/opensearch-project/k-NN/pull/1804) +* Fixed LeafReaders casting errors to SegmentReaders when segment replication is enabled during search.[#1808](https://github.com/opensearch-project/k-NN/pull/1808) * Release memory properly for an array type [#1820](https://github.com/opensearch-project/k-NN/pull/1820) * FIX Same Suffix Cause Recall Drop to zero [#1802](https://github.com/opensearch-project/k-NN/pull/1802) ### Infrastructure diff --git a/src/main/java/org/opensearch/knn/index/KNNIndexShard.java b/src/main/java/org/opensearch/knn/index/KNNIndexShard.java index 81e6a1574..e74260801 100644 --- a/src/main/java/org/opensearch/knn/index/KNNIndexShard.java +++ b/src/main/java/org/opensearch/knn/index/KNNIndexShard.java @@ -10,12 +10,12 @@ import lombok.Getter; import lombok.extern.log4j.Log4j2; import org.apache.lucene.index.FieldInfo; -import org.apache.lucene.index.FilterLeafReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SegmentReader; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FilterDirectory; +import org.opensearch.common.lucene.Lucene; import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexShard; import org.opensearch.knn.index.mapper.KNNVectorFieldMapper; @@ -160,7 +160,7 @@ List getEngineFileContexts(IndexReader indexReader, KNNEngine List engineFiles = new ArrayList<>(); for (LeafReaderContext leafReaderContext : indexReader.leaves()) { - SegmentReader reader = (SegmentReader) FilterLeafReader.unwrap(leafReaderContext.reader()); + SegmentReader reader = Lucene.segmentReader(leafReaderContext.reader()); Path shardPath = ((FSDirectory) FilterDirectory.unwrap(reader.directory())).getDirectory(); String fileExtension = reader.getSegmentInfo().info.getUseCompoundFile() ? knnEngine.getCompoundExtension() diff --git a/src/main/java/org/opensearch/knn/index/query/KNNWeight.java b/src/main/java/org/opensearch/knn/index/query/KNNWeight.java index 503bea42d..a061e740e 100644 --- a/src/main/java/org/opensearch/knn/index/query/KNNWeight.java +++ b/src/main/java/org/opensearch/knn/index/query/KNNWeight.java @@ -11,7 +11,6 @@ import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.DocValues; import org.apache.lucene.index.FieldInfo; -import org.apache.lucene.index.FilterLeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SegmentReader; import org.apache.lucene.search.DocIdSetIterator; @@ -29,6 +28,7 @@ import org.apache.lucene.util.DocIdSetBuilder; import org.apache.lucene.util.FixedBitSet; import org.opensearch.common.io.PathUtils; +import org.opensearch.common.lucene.Lucene; import org.opensearch.knn.common.KNNConstants; import org.opensearch.knn.index.KNNSettings; import org.opensearch.knn.index.SpaceType; @@ -203,7 +203,7 @@ private int[] bitSetToIntArray(final BitSet bitSet) { private Map doANNSearch(final LeafReaderContext context, final BitSet filterIdsBitSet, final int cardinality) throws IOException { - SegmentReader reader = (SegmentReader) FilterLeafReader.unwrap(context.reader()); + final SegmentReader reader = Lucene.segmentReader(context.reader()); String directory = ((FSDirectory) FilterDirectory.unwrap(reader.directory())).getDirectory().toString(); FieldInfo fieldInfo = reader.getFieldInfos().fieldInfo(knnQuery.getField()); @@ -394,7 +394,7 @@ private Map doExactSearch(final LeafReaderContext leafReaderCont } private KNNIterator getFilteredKNNIterator(final LeafReaderContext leafReaderContext, final BitSet filterIdsBitSet) throws IOException { - final SegmentReader reader = (SegmentReader) FilterLeafReader.unwrap(leafReaderContext.reader()); + final SegmentReader reader = Lucene.segmentReader(leafReaderContext.reader()); final FieldInfo fieldInfo = reader.getFieldInfos().fieldInfo(knnQuery.getField()); final BinaryDocValues values = DocValues.getBinary(leafReaderContext.reader(), fieldInfo.getName()); final SpaceType spaceType = getSpaceType(fieldInfo); diff --git a/src/test/java/org/opensearch/knn/index/SegmentReplicationIT.java b/src/test/java/org/opensearch/knn/index/SegmentReplicationIT.java new file mode 100644 index 000000000..7c15b00a0 --- /dev/null +++ b/src/test/java/org/opensearch/knn/index/SegmentReplicationIT.java @@ -0,0 +1,94 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.knn.index; + +import lombok.SneakyThrows; +import lombok.extern.log4j.Log4j2; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.junit.Assert; +import org.opensearch.client.Response; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.knn.KNNRestTestCase; +import org.opensearch.knn.KNNResult; + +import java.util.List; + +/** + * This IT class contains will contain special cases of IT for segment replication behavior. + * All the index created in this test will have replication type SEGMENT, number of replicas: 1 and should be run on + * at-least 2 node configuration. + */ +@Log4j2 +public class SegmentReplicationIT extends KNNRestTestCase { + private static final String INDEX_NAME = "segment-replicated-knn-index"; + + @SneakyThrows + public void testSearchOnReplicas_whenIndexHasDeletedDocs_thenSuccess() { + createKnnIndex(INDEX_NAME, getKNNSegmentReplicatedIndexSettings(), createKNNIndexMethodFieldMapping(FIELD_NAME, 2)); + + Float[] vector = { 1.3f, 2.2f }; + int docsInIndex = 10; + + for (int i = 0; i < docsInIndex; i++) { + addKnnDoc(INDEX_NAME, Integer.toString(i), FIELD_NAME, vector); + } + refreshIndex(INDEX_NAME); + int deleteDocs = 5; + for (int i = 0; i < deleteDocs; i++) { + deleteKnnDoc(INDEX_NAME, Integer.toString(i)); + } + refreshIndex(INDEX_NAME); + // sleep for 5sec to ensure data is replicated. I don't have a better way here to know if segments has been + // replicated. + Thread.sleep(5000); + // validate warmup is successful or not. + doKnnWarmup(List.of(INDEX_NAME)); + + XContentBuilder queryBuilder = XContentFactory.jsonBuilder().startObject().startObject("query"); + queryBuilder.startObject("knn"); + queryBuilder.startObject(FIELD_NAME); + queryBuilder.field("vector", vector); + queryBuilder.field("k", docsInIndex); + queryBuilder.endObject().endObject().endObject().endObject(); + + // validate primaries are working + Response searchResponse = performSearch(INDEX_NAME, queryBuilder.toString(), "preference=_primary"); + String responseBody = EntityUtils.toString(searchResponse.getEntity()); + List knnResults = parseSearchResponse(responseBody, FIELD_NAME); + assertEquals(docsInIndex - deleteDocs, knnResults.size()); + + if (ensureMinDataNodesCountForTestingQueriesOnReplica()) { + // validate replicas are working + searchResponse = performSearch(INDEX_NAME, queryBuilder.toString(), "preference=_replica"); + responseBody = EntityUtils.toString(searchResponse.getEntity()); + knnResults = parseSearchResponse(responseBody, FIELD_NAME); + assertEquals(docsInIndex - deleteDocs, knnResults.size()); + } + } + + private boolean ensureMinDataNodesCountForTestingQueriesOnReplica() { + int dataNodeCount = getDataNodeCount(); + if (dataNodeCount <= 1) { + log.warn( + "Not running segment replication tests named: " + + "testSearchOnReplicas_whenIndexHasDeletedDocs_thenSuccess, as data nodes count is not atleast 2. " + + "Actual datanode count : {}", + dataNodeCount + ); + Assert.assertTrue(true); + // making the test successful because we don't want to break already running tests. + return false; + } + return true; + } +} diff --git a/src/testFixtures/java/org/opensearch/knn/KNNRestTestCase.java b/src/testFixtures/java/org/opensearch/knn/KNNRestTestCase.java index 860cd2efa..ebb171c19 100644 --- a/src/testFixtures/java/org/opensearch/knn/KNNRestTestCase.java +++ b/src/testFixtures/java/org/opensearch/knn/KNNRestTestCase.java @@ -235,7 +235,11 @@ protected Response searchExists(String index, ExistsQueryBuilder existsQueryBuil } protected Response performSearch(final String indexName, final String query) throws IOException { - Request request = new Request("POST", "/" + indexName + "/_search"); + return performSearch(indexName, query, ""); + } + + protected Response performSearch(final String indexName, final String query, final String urlParameters) throws IOException { + Request request = new Request("POST", "/" + indexName + "/_search?" + urlParameters); request.setJsonEntity(query); Response response = client().performRequest(request); @@ -667,6 +671,35 @@ protected Settings getKNNDefaultIndexSettings() { return Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0).put("index.knn", true).build(); } + protected Settings getKNNSegmentReplicatedIndexSettings() { + return Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 1) + .put("index.knn", true) + .put("index.replication.type", "SEGMENT") + .build(); + } + + @SneakyThrows + protected int getDataNodeCount() { + Request request = new Request("GET", "_nodes/stats?filter_path=nodes.*.roles"); + + Response response = client().performRequest(request); + assertEquals(RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + String responseBody = EntityUtils.toString(response.getEntity()); + + Map responseMap = createParser(MediaTypeRegistry.getDefaultMediaType().xContent(), responseBody).map(); + Map nodesInfo = (Map) responseMap.get("nodes"); + int dataNodeCount = 0; + for (String key : nodesInfo.keySet()) { + Map> nodeRoles = (Map>) nodesInfo.get(key); + if (nodeRoles.get("roles").contains("data")) { + dataNodeCount++; + } + } + return dataNodeCount; + } + /** * Get Stats from KNN Plugin */