Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial implementation for vectordb #3879

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/packages/autogen-ext/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ langchain = ["langchain_core~= 0.3.3"]
azure = ["azure-core", "azure-identity"]
docker = ["docker~=7.0"]
openai = ["openai>=1.3"]
chromadb = ["chromadb~=0.5.15", "sentence-transformers"]

[tool.hatch.build.targets.wheel]
packages = ["src/autogen_ext"]
Expand Down Expand Up @@ -56,4 +57,4 @@ test = "pytest -n auto"
[tool.mypy]
[[tool.mypy.overrides]]
module = "docker.*"
ignore_missing_imports = true
ignore_missing_imports = true
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from ._chromadb import AsyncChromaVectorDB, ChromaVectorDB
from ._factory import VectorDBFactory

__all__ = ["ChromaVectorDB", "AsyncChromaVectorDB", "VectorDBFactory"]
378 changes: 378 additions & 0 deletions python/packages/autogen-ext/src/autogen_ext/storage/_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,378 @@
from typing import (
Any,
Callable,
List,
Mapping,
Optional,
Protocol,
Sequence,
Tuple,
Union,
runtime_checkable,
)

from pydantic import BaseModel

Metadata = Union[Mapping[str, Any], None]
Vector = Union[Sequence[float], Sequence[int]]
ItemID = Union[str, int]


class Document(BaseModel):
"""Define Document according to autogen 0.4 specifications."""

id: ItemID
content: Optional[str] = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why assuming string?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same representation used in 0.2 and I believe it assumes the content has already been preprocessed to a string format. More complex data types can lead to more complex operations in general. Would you suggest something different?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the vector store can have a payload object, which is usually a json. Unless you know the schema mapping returning string is a problem, maybe we can use dictionary or some other model?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is meant to capture the full payload but only the main text content of the associated document and the transformation between autogen doc and vectordb doc is handled in each implementation. The pydantic model is defined with arbitrary_types_allowed which should allow implementations to unpack additional attributes as needed at the document level instead of the content. Would that work for the scenario you have in mind?

metadata: Optional[Metadata] = None
embedding: Optional[Vector] = None

model_config = {"arbitrary_types_allowed": True}


"""QueryResults is the response from the vector database for a query/queries.
A query is a list containing one string while queries is a list containing multiple strings.
The response is a list of query results, each query result is a list of tuples containing the document and the distance.
"""
QueryResults = List[List[Tuple[Document, float]]]


@runtime_checkable
class AsyncVectorDB(Protocol):
"""
Abstract class for async vector database. A vector database is responsible for storing and retrieving documents.

Attributes:
active_collection: Any | The active collection in the vector database. Make get_collection faster. Default is None.
type: str | The type of the vector database, chroma, pgvector, etc. Default is "".

Methods:
create_collection: Callable[[str, bool, bool], Awaitable[Any]] | Create a collection in the vector database.
get_collection: Callable[[str], Awaitable[Any]] | Get the collection from the vector database.
delete_collection: Callable[[str], Awaitable[Any]] | Delete the collection from the vector database.
insert_docs: Callable[[List[Document], str, bool], Awaitable[None]] | Insert documents into the collection of the vector database.
update_docs: Callable[[List[Document], str], Awaitable[None]] | Update documents in the collection of the vector database.
delete_docs: Callable[[List[ItemID], str], Awaitable[None]] | Delete documents from the collection of the vector database.
retrieve_docs: Callable[[List[str], str, int, float], Awaitable[QueryResults]] | Retrieve documents from the collection of the vector database based on the queries.
get_docs_by_ids: Callable[[List[ItemID], str], Awaitable[List[Document]]] | Retrieve documents from the collection of the vector database based on the ids.
"""

active_collection: Any = None
type: str = ""
embedding_function: Optional[Callable[..., Any]] = None # embeddings = embedding_function(sentences)

async def create_collection(
self,
collection_name: str,
overwrite: bool = False,
get_or_create: bool = True,
**kwargs: Any,
) -> Any:
"""
Create a collection in the vector database.
Case 1. if the collection does not exist, create the collection.
Case 2. the collection exists, if overwrite is True, it will overwrite the collection.
Case 3. the collection exists and overwrite is False, if get_or_create is True, it will get the collection,
otherwise it raise a ValueError.

Args:
collection_name: str | The name of the collection.
overwrite: bool | Whether to overwrite the collection if it exists. Default is False.
get_or_create: bool | Whether to get the collection if it exists. Default is True.
kwargs: Dict[str, Any] | Additional keyword arguments for collection creation (e.g. schema).

Returns:
Any | The collection object.
"""
...

async def get_collection(self, collection_name: Optional[str] = None) -> Any:
"""
Get the collection from the vector database.

Args:
collection_name: Optional[str] | The name of the collection. Default is None.
If None, return the current active collection.

Returns:
Any | The collection object.
"""
...

async def delete_collection(self, collection_name: str) -> Any:
"""
Delete the collection from the vector database.

Args:
collection_name: str | The name of the collection.

Returns:
Any
"""
...

async def insert_docs(
self,
docs: List[Document],
collection_name: Optional[str] = None,
upsert: bool = False,
**kwargs: Any,
) -> None:
"""
Insert documents into the collection of the vector database.

Args:
docs: List[Document] | A list of documents. Each document is a Pydantic Document model.
collection_name: Optional[str] | The name of the collection. Default is None.
upsert: bool | Whether to update the document if it exists. Default is False.
kwargs: Dict[str, Any] | Additional keyword arguments.

Returns:
None
"""
...

async def update_docs(self, docs: List[Document], collection_name: Optional[str] = None, **kwargs: Any) -> None:
"""
Update documents in the collection of the vector database.

Args:
docs: List[Document] | A list of documents.
collection_name: Optional[str] | The name of the collection. Default is None.
kwargs: Dict[str, Any] | Additional keyword arguments.

Returns:
None
"""
...

async def delete_docs(self, ids: List[ItemID], collection_name: Optional[str] = None, **kwargs: Any) -> None:
"""
Delete documents from the collection of the vector database.

Args:
ids: List[ItemID] | A list of document ids. Each id is a typed `ItemID`.
collection_name: Optional[str] | The name of the collection. Default is None.
kwargs: Dict[str, Any] | Additional keyword arguments.

Returns:
None
"""
...

async def retrieve_docs(
self,
queries: List[str],
collection_name: Optional[str] = None,
n_results: int = 10,
distance_threshold: float = -1,
**kwargs: Any,
) -> QueryResults:
"""
Retrieve documents from the collection of the vector database based on the queries.

Args:
queries: List[str] | A list of queries. Each query is a string.
collection_name: Optional[str] | The name of the collection. Default is None.
n_results: int | The number of relevant documents to return. Default is 10.
distance_threshold: float | The threshold for the distance score, only distance smaller than it will be
returned. Don't filter with it if < 0. Default is -1.
kwargs: Dict[str, Any] | Additional keyword arguments.

Returns:
QueryResults | The query results. Each query result is a list of list of tuples containing the document and
the distance.
"""
...

async def get_docs_by_ids(
self,
ids: Optional[List[ItemID]] = None,
collection_name: Optional[str] = None,
include: Optional[List[str]] = None,
**kwargs: Any,
) -> List[Document]:
"""
Retrieve documents from the collection of the vector database based on the ids.

Args:
ids: Optional[List[ItemID]] | A list of document ids. If None, will return all the documents. Default is None.
collection_name: Optional[str] | The name of the collection. Default is None.
include: Optional[List[str]] | The fields to include. Default is None.
If None, will include ["metadatas", "documents"], ids will always be included. This may differ
depending on the implementation.
kwargs: Dict[str, Any] | Additional keyword arguments.

Returns:
List[Document] | The results.
"""
...


@runtime_checkable
class VectorDB(Protocol):
"""
Abstract class for synchronous vector database. A vector database is responsible for storing and retrieving documents.
For async support, use AsyncVectorDB instead.

Attributes:
active_collection: Any | The active collection in the vector database. Make get_collection faster. Default is None.
type: str | The type of the vector database, chroma, pgvector, etc. Default is "".

Methods:
create_collection: Callable[[str, bool, bool], Any] | Create a collection in the vector database.
get_collection: Callable[[str], Any] | Get the collection from the vector database.
delete_collection: Callable[[str], Any] | Delete the collection from the vector database.
insert_docs: Callable[[List[Document], str, bool], None] | Insert documents into the collection of the vector database.
update_docs: Callable[[List[Document], str], None] | Update documents in the collection of the vector database.
delete_docs: Callable[[List[ItemID], str], None] | Delete documents from the collection of the vector database.
retrieve_docs: Callable[[List[str], str, int, float], QueryResults] | Retrieve documents from the collection of the vector database based on the queries.
get_docs_by_ids: Callable[[List[ItemID], str], List[Document]] | Retrieve documents from the collection of the vector database based on the ids.
"""

active_collection: Any = None
type: str = ""
embedding_function: Optional[Callable[[List[str]], List[List[float]]]] = (
None # embeddings = embedding_function(sentences)
)

def create_collection(
self, collection_name: str, overwrite: bool = False, get_or_create: bool = True, **kwargs: Any
) -> Any:
"""
Create a collection in the vector database.
Case 1. if the collection does not exist, create the collection.
Case 2. the collection exists, if overwrite is True, it will overwrite the collection.
Case 3. the collection exists and overwrite is False, if get_or_create is True, it will get the collection,
otherwise it raise a ValueError.

Args:
collection_name: str | The name of the collection.
overwrite: bool | Whether to overwrite the collection if it exists. Default is False.
get_or_create: bool | Whether to get the collection if it exists. Default is True.

Returns:
Any | The collection object.
"""
...

def get_collection(self, collection_name: Optional[str] = None) -> Any:
"""
Get the collection from the vector database.

Args:
collection_name: Optional[str] | The name of the collection. Default is None.
If None, return the current active collection.

Returns:
Any | The collection object.
"""
...

def delete_collection(self, collection_name: str) -> Any:
"""
Delete the collection from the vector database.

Args:
collection_name: str | The name of the collection.

Returns:
Any
"""
...

def insert_docs(
self,
docs: List[Document],
collection_name: Optional[str] = None,
upsert: bool = False,
**kwargs: Any,
) -> None:
"""
Insert documents into the collection of the vector database.

Args:
docs: List[Document] | A list of documents. Each document is a Pydantic Document model.
collection_name: Optional[str] | The name of the collection. Default is None.
upsert: bool | Whether to update the document if it exists. Default is False.
kwargs: Dict[str, Any] | Additional keyword arguments.

Returns:
None
"""
...

def update_docs(self, docs: List[Document], collection_name: Optional[str] = None, **kwargs: Any) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the most used method is upsert

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the protocol, my understanding is that the idea is to keep it closer in design the DB theory and enforce CRUD like operations. Implementations can add upsert but they probably shouldn't be forced to

"""
Update documents in the collection of the vector database.

Args:
docs: List[Document] | A list of documents.
collection_name: Optional[str] | The name of the collection. Default is None.
kwargs: Dict[str, Any] | Additional keyword arguments.

Returns:
None
"""
...

def delete_docs(self, ids: List[ItemID], collection_name: Optional[str] = None, **kwargs: Any) -> None:
"""
Delete documents from the collection of the vector database.

Args:
ids: List[ItemID] | A list of document ids. Each id is a typed `ItemID`.
collection_name: Optional[str] | The name of the collection. Default is None.
kwargs: Dict[str, Any] | Additional keyword arguments.

Returns:
None
"""
...

def retrieve_docs(
self,
queries: List[str],
collection_name: Optional[str] = None,
n_results: int = 10,
distance_threshold: float = -1,
**kwargs: Any,
) -> QueryResults:
"""
Retrieve documents from the collection of the vector database based on the queries.

Args:
queries: List[str] | A list of queries. Each query is a string.
collection_name: Optional[str] | The name of the collection. Default is None.
n_results: int | The number of relevant documents to return. Default is 10.
distance_threshold: float | The threshold for the distance score, only distance smaller than it will be
returned. Don't filter with it if < 0. Default is -1.
kwargs: Dict[str, Any] | Additional keyword arguments.

Returns:
QueryResults | The query results. Each query result is a list of list of tuples containing the document and
the distance.
"""
...

def get_docs_by_ids(
self,
ids: Optional[List[ItemID]] = None,
collection_name: Optional[str] = None,
include: Optional[List[str]] = None,
**kwargs: Any,
) -> List[Document]:
"""
Retrieve documents from the collection of the vector database based on the ids.

Args:
ids: Optional[List[ItemID]] | A list of document ids. If None, will return all the documents. Default is None.
collection_name: Optional[str] | The name of the collection. Default is None.
include: Optional[List[str]] | The fields to include. Default is None.
If None, will include ["metadatas", "documents"], ids will always be included. This may differ
depending on the implementation.
kwargs: Dict[str, Any] | Additional keyword arguments.

Returns:
List[Document] | The results.
"""
...
Loading
Loading