Skip to content

Commit

Permalink
Enhance MilvusVectorStore with flexible index management (#15058)
Browse files Browse the repository at this point in the history
  • Loading branch information
chetanchoudhary authored Jul 31, 2024
1 parent a93c1b6 commit 734e657
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 73 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Changelog

## [v0.1.22]

### Added

- Introduced a new `IndexManagement` enum to control index creation behavior:
  - `NO_VALIDATION`: Skips index validation and creation.
  - `CREATE_IF_NOT_EXISTS`: Creates the index only if it doesn't exist (default behavior).
- Added a new `index_management` parameter to the `MilvusVectorStore` constructor, allowing users to specify the desired index management strategy.

### Changed

- Updated the collection creation and index management logic in `_create_hybrid_index` to ensure the proper sequence of operations.
- Refactored index creation logic to respect the new `index_management` setting.

### Fixed

- Resolved an issue where the collection object could be recreated after index creation, which could lead to inconsistencies.
- Ensured that the collection is created before any index operations are attempted in hybrid mode.

### Improved

- Streamlined the process of collection creation and index management for hybrid (dense and sparse) vector operations.
- Provided more control over index creation behavior through the `index_management` parameter.

### Developer Notes

- The `_create_index_if_required` method now checks the `index_management` setting before proceeding with index creation or validation.
- The `_create_index_if_required` method now passes `self.collection_name` to `_create_hybrid_index` when in sparse mode.
- No changes to the existing public API of `MilvusVectorStore` were made; the `index_management` parameter is an addition to the constructor.

### Upgrade Notes

- This change is backwards compatible. Existing code using `MilvusVectorStore` without specifying `index_management` will default to the previous behavior (`CREATE_IF_NOT_EXISTS`).
- Users can now optionally specify an `index_management` strategy when initializing `MilvusVectorStore` to control index creation behavior.
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from llama_index.vector_stores.milvus.base import MilvusVectorStore
from llama_index.vector_stores.milvus.base import MilvusVectorStore, IndexManagement

__all__ = ["MilvusVectorStore"]
__all__ = ["MilvusVectorStore", "IndexManagement"]
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import logging
from typing import Any, Dict, List, Optional, Union
from enum import Enum


import pymilvus # noqa
from llama_index.core.bridge.pydantic import Field, PrivateAttr
Expand Down Expand Up @@ -47,6 +49,13 @@
RRFRanker = None


class IndexManagement(Enum):
    """Enumeration representing the supported index management operations.

    The selected member is read by ``MilvusVectorStore`` (via its
    ``index_management`` field) to decide whether index creation or
    validation should be attempted at all.
    """

    # Skip index validation and creation entirely; the store's
    # `_create_index_if_required` returns immediately for this value.
    NO_VALIDATION = "no_validation"
    # Create the index only when it does not already exist. This is the
    # default value of `MilvusVectorStore.index_management`.
    CREATE_IF_NOT_EXISTS = "create_if_not_exists"


def _to_milvus_filter(
standard_filters: MetadataFilters, scalar_filters: ScalarMetadataFilters = None
) -> str:
Expand Down Expand Up @@ -133,6 +142,7 @@ class MilvusVectorStore(BasePydanticVectorStore):
These weights are used to adjust the importance of the dense and sparse components of the embeddings
in the hybrid retrieval process.
Defaults to an empty dictionary, implying that the ranker will operate with its predefined default settings.
index_management (IndexManagement): Specifies the index management strategy to use. Defaults to `IndexManagement.CREATE_IF_NOT_EXISTS`.
Raises:
ImportError: Unable to import `pymilvus`.
Expand Down Expand Up @@ -181,6 +191,7 @@ class MilvusVectorStore(BasePydanticVectorStore):
sparse_embedding_function: Any
hybrid_ranker: str
hybrid_ranker_params: dict = {}
index_management: IndexManagement = IndexManagement.CREATE_IF_NOT_EXISTS

_milvusclient: MilvusClient = PrivateAttr()
_collection: Any = PrivateAttr()
Expand All @@ -205,6 +216,7 @@ def __init__(
sparse_embedding_function: Optional[BaseSparseEmbeddingFunction] = None,
hybrid_ranker: str = "RRFRanker",
hybrid_ranker_params: dict = {},
index_management: IndexManagement = IndexManagement.CREATE_IF_NOT_EXISTS,
**kwargs: Any,
) -> None:
"""Init params."""
Expand All @@ -224,6 +236,7 @@ def __init__(
sparse_embedding_function=sparse_embedding_function,
hybrid_ranker=hybrid_ranker,
hybrid_ranker_params=hybrid_ranker_params,
index_management=index_management,
)

# Select the similarity metric
Expand Down Expand Up @@ -523,10 +536,11 @@ def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResul
sparse_search_params = {"metric_type": "IP"}

sparse_req = AnnSearchRequest(
[sparse_emb],
self.sparse_embedding_field,
sparse_search_params,
data=[sparse_emb],
anns_field=self.sparse_embedding_field,
param=sparse_search_params,
limit=query.similarity_top_k,
expr=string_expr, # Apply metadata filters to sparse search
)

dense_search_params = {
Expand All @@ -535,10 +549,11 @@ def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResul
}
dense_emb = query.query_embedding
dense_req = AnnSearchRequest(
[dense_emb],
self.embedding_field,
dense_search_params,
data=[dense_emb],
anns_field=self.embedding_field,
param=dense_search_params,
limit=query.similarity_top_k,
expr=string_expr, # Apply metadata filters to dense search
)
ranker = None

Expand Down Expand Up @@ -602,72 +617,112 @@ def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResul

return VectorStoreQueryResult(nodes=nodes, similarities=similarities, ids=ids)

def _create_index_if_required(self, force: bool = False) -> None:
# This helper method is introduced to allow the index to be created
# both in the constructor and in the `add` method. The `force` flag is
# provided to ensure that the index is created in the constructor even
# if self.overwrite is false. In the `add` method, the index is
# recreated only if self.overwrite is true.
def _create_index_if_required(self) -> None:
"""
Create or validate the index based on the index management strategy.
This method decides whether to create or validate the index based on
the specified index management strategy and the current state of the collection.
"""
if self.index_management == IndexManagement.NO_VALIDATION:
return

if self.enable_sparse is False:
if (self._collection.has_index() and self.overwrite) or force:
self._create_dense_index()
else:
self._create_hybrid_index(self.collection_name)

def _create_dense_index(self) -> None:
"""
Create or recreate the dense vector index.
This method handles the creation of the dense vector index based on
the current index management strategy and the state of the collection.
"""
index_exists = self._collection.has_index()

if (
not index_exists
and self.index_management == IndexManagement.CREATE_IF_NOT_EXISTS
) or (index_exists and self.overwrite):
if index_exists:
self._collection.release()
self._collection.drop_index()
base_params: Dict[str, Any] = self.index_config.copy()
index_type: str = base_params.pop("index_type", "FLAT")
index_params: Dict[str, Union[str, Dict[str, Any]]] = {
"params": base_params,
"metric_type": self.similarity_metric,
"index_type": index_type,
}
self._collection.create_index(
self.embedding_field, index_params=index_params
)
self._collection.load()
else:
if (
self._collection.has_index(index_name=self.embedding_field)
and self.overwrite
) or force:
if self._collection.has_index(index_name=self.embedding_field) is True:
self._collection.release()
self._collection.drop_index(index_name=self.embedding_field)
if (
self._collection.has_index(index_name=self.sparse_embedding_field)
is True
):
self._collection.drop_index(index_name=self.sparse_embedding_field)
self._create_hybrid_index(self.collection_name)
self._collection.load()

def _create_hybrid_index(self, collection_name):
schema = MilvusClient.create_schema(auto_id=False, enable_dynamic_field=True)

schema.add_field(
field_name="id",
datatype=DataType.VARCHAR,
max_length=65535,
is_primary=True,
)
schema.add_field(
field_name=self.embedding_field,
datatype=DataType.FLOAT_VECTOR,
dim=self.dim,
)
schema.add_field(
field_name=self.sparse_embedding_field,
datatype=DataType.SPARSE_FLOAT_VECTOR,
)
self._collection = Collection(
collection_name, schema=schema, using=self._milvusclient._using

base_params: Dict[str, Any] = self.index_config.copy()
index_type: str = base_params.pop("index_type", "FLAT")
index_params: Dict[str, Union[str, Dict[str, Any]]] = {
"params": base_params,
"metric_type": self.similarity_metric,
"index_type": index_type,
}
self._collection.create_index(
self.embedding_field, index_params=index_params
)
self._collection.load()

def _create_hybrid_index(self, collection_name: str) -> None:
"""
Create or recreate the hybrid (dense and sparse) vector index.
Args:
collection_name (str): The name of the collection to create the index for.
"""
# Check if the collection exists, if not, create it
if collection_name not in self._milvusclient.list_collections():
schema = MilvusClient.create_schema(
auto_id=False, enable_dynamic_field=True
)
schema.add_field(
field_name="id",
datatype=DataType.VARCHAR,
max_length=65535,
is_primary=True,
)
schema.add_field(
field_name=self.embedding_field,
datatype=DataType.FLOAT_VECTOR,
dim=self.dim,
)
schema.add_field(
field_name=self.sparse_embedding_field,
datatype=DataType.SPARSE_FLOAT_VECTOR,
)
self._milvusclient.create_collection(
collection_name=collection_name, schema=schema
)

# Initialize or get the collection
self._collection = Collection(collection_name, using=self._milvusclient._using)

dense_index_exists = self._collection.has_index(index_name=self.embedding_field)
sparse_index_exists = self._collection.has_index(
index_name=self.sparse_embedding_field
)

sparse_index = {"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP"}
self._collection.create_index(self.sparse_embedding_field, sparse_index)
base_params = self.index_config.copy()
index_type = base_params.pop("index_type", "FLAT")
dense_index = {
"params": base_params,
"metric_type": self.similarity_metric,
"index_type": index_type,
}
self._collection.create_index(self.embedding_field, dense_index)
if (
(not dense_index_exists or not sparse_index_exists)
and self.index_management == IndexManagement.CREATE_IF_NOT_EXISTS
or (dense_index_exists and sparse_index_exists and self.overwrite)
):
if dense_index_exists:
self._collection.release()
self._collection.drop_index(index_name=self.embedding_field)
if sparse_index_exists:
self._collection.drop_index(index_name=self.sparse_embedding_field)

# Create sparse index
sparse_index = {"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP"}
self._collection.create_index(self.sparse_embedding_field, sparse_index)

# Create dense index
base_params = self.index_config.copy()
index_type = base_params.pop("index_type", "FLAT")
dense_index = {
"params": base_params,
"metric_type": self.similarity_metric,
"index_type": index_type,
}
self._collection.create_index(self.embedding_field, dense_index)

self._collection.load()
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ exclude = ["**/BUILD"]
license = "MIT"
name = "llama-index-vector-stores-milvus"
readme = "README.md"
version = "0.1.21"
version = "0.1.22"

[tool.poetry.dependencies]
python = ">=3.8.1,<4.0"
Expand Down

0 comments on commit 734e657

Please sign in to comment.