From 8ae1a4faed5d6ac6bd0ff7d9f643e150196ea2e0 Mon Sep 17 00:00:00 2001
From: Ismail Pelaseyed
Date: Sat, 13 Jan 2024 16:41:13 -0800
Subject: [PATCH 1/7] Add dependencies

---
 requirements.txt           | 39 ++++
 service/vector_database.py | 94 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 133 insertions(+)
 create mode 100644 service/vector_database.py

diff --git a/requirements.txt b/requirements.txt
index c12ad9cb..de18bf7b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,27 +1,66 @@
+aiohttp==3.9.1
+aiosignal==1.3.1
 annotated-types==0.6.0
 anyio==4.2.0
+attrs==23.2.0
+Authlib==1.3.0
+backoff==2.2.1
 black==23.12.1
+certifi==2023.11.17
+cffi==1.16.0
+charset-normalizer==3.3.2
 click==8.1.7
+cohere==4.42
+cryptography==41.0.7
+dnspython==2.4.2
 fastapi==0.109.0
+fastavro==1.9.3
+frozenlist==1.4.1
+grpcio==1.60.0
+grpcio-tools==1.60.0
 h11==0.14.0
+h2==4.1.0
+hpack==4.0.0
+httpcore==1.0.2
 httptools==0.6.1
+httpx==0.26.0
+hyperframe==6.0.1
 idna==3.6
+importlib-metadata==6.11.0
+loguru==0.7.2
+multidict==6.0.4
 mypy-extensions==1.0.0
+numpy==1.26.3
 packaging==23.2
 pathspec==0.12.1
+pinecone-client==2.2.4
 platformdirs==4.1.0
+portalocker==2.8.2
+protobuf==4.25.2
+pycparser==2.21
 pydantic==2.5.3
 pydantic_core==2.14.6
+python-dateutil==2.8.2
 python-decouple==3.8
 python-dotenv==1.0.0
 PyYAML==6.0.1
+qdrant-client==1.7.0
+requests==2.31.0
 ruff==0.1.13
+setuptools==69.0.3
+six==1.16.0
 sniffio==1.3.0
 starlette==0.35.1
 toml==0.10.2
+tqdm==4.66.1
 typing_extensions==4.9.0
+urllib3==1.26.18
 uvicorn==0.25.0
 uvloop==0.19.0
+validators==0.22.0
 vulture==2.10
 watchfiles==0.21.0
+weaviate-client==3.26.0
 websockets==12.0
+yarl==1.9.4
+zipp==3.17.0
diff --git a/service/vector_database.py b/service/vector_database.py
new file mode 100644
index 00000000..e134d43a
--- /dev/null
+++ b/service/vector_database.py
@@ -0,0 +1,94 @@
+from abc import ABC, abstractmethod
+from typing import Any, List
+
+import pinecone
+from decouple import config
+from numpy import ndarray
+
+
+class VectorDBService(ABC):
+    def __init__(self, index_name: str, dimension: int, filter_id: str = None):
+        self.index_name = index_name
+        self.filter_id = filter_id
+        self.dimension = dimension
+
+    @abstractmethod
+    def upsert():
+        pass
+
+    @abstractmethod
+    def query():
+        pass
+
+    @abstractmethod
+    def rerank(self, query: str, documents: list, top_n: int = 3):
+        pass
+
+
+class PineconeVectorService(VectorDBService):
+    def __init__(self, index_name: str, dimension: int, filter_id: str = None):
+        super().__init__(
+            index_name=index_name, dimension=dimension, filter_id=filter_id
+        )
+        pinecone.init(
+            api_key=config("PINECONE_API_KEY"),
+            environment=config("PINECONE_ENVIRONMENT"),
+        )
+        # Create a new vector index if it doesn't
+        # exist dimensions should be passed in the arguments
+        if index_name not in pinecone.list_indexes():
+            pinecone.create_index(
+                name=index_name, metric="cosine", shards=1, dimension=dimension
+            )
+        self.index = pinecone.Index(index_name=self.index_name)
+
+    def upsert(self, vectors: ndarray):
+        self.index.upsert(vectors=vectors, namespace=self.filter_id)
+
+    def query(self, queries: List[ndarray], top_k: int, include_metadata: bool = True):
+        results = self.index.query(
+            queries=queries,
+            top_k=top_k,
+            include_metadata=include_metadata,
+            namespace=self.filter_id,
+        )
+        return results["results"][0]["matches"]
+
+    def rerank(self, query: str, documents: Any, top_n: int = 3):
+        from cohere import Client
+
+        api_key = config("COHERE_API_KEY")
+        if not api_key:
+            raise ValueError("API key for Cohere is not present.")
+        cohere_client = Client(api_key=api_key)
+        docs = [
+            (
+                f"{doc['metadata']['content']}\n\n"
+                f"page number: {doc['metadata']['page_label']}"
+            )
+            for doc in documents
+        ]
+        re_ranked = cohere_client.rerank(
+            model="rerank-multilingual-v2.0",
+            query=query,
+            documents=docs,
+            top_n=top_n,
+        ).results
+        results = []
+        for obj in re_ranked:
+            results.append(obj.document["text"])
+        return results
+
+
+def get_vector_service(
+    provider: str, index_name: str, filter_id: str = None, dimension: int = 384
+):
+    services = {
+        "PINECONE": PineconeVectorService,
+        # Add other providers here
+        # e.g "weaviate": WeaviateVectorService,
+    }
+    service = services.get(provider)
+    if service is None:
+        raise ValueError(f"Unsupported provider: {provider}")
+    return service(index_name=index_name, filter_id=filter_id, dimension=dimension)
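
[Reviewer note, PATCH 1/7] The new factory keys services by an upper-case
provider string, defaults the dimension to 384, and threads filter_id through
as the Pinecone namespace. A minimal usage sketch, assuming PINECONE_API_KEY
and PINECONE_ENVIRONMENT are set in the environment; the index name, tenant id
and query values are hypothetical:

    from service.vector_database import get_vector_service

    # Resolve the provider-specific service; "PINECONE" must match the dict key.
    service = get_vector_service(
        provider="PINECONE", index_name="my-index", filter_id="tenant-123"
    )
    query_vector = [0.0] * 384  # placeholder embedding of the default dimension
    matches = service.query(queries=[query_vector], top_k=5)
    reranked = service.rerank(query="what is RAG?", documents=matches)
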
present.") + cohere_client = Client(api_key=api_key) + docs = [ + ( + f"{doc['metadata']['content']}\n\n" + f"page number: {doc['metadata']['page_label']}" + ) + for doc in documents + ] + re_ranked = cohere_client.rerank( + model="rerank-multilingual-v2.0", + query=query, + documents=docs, + top_n=top_n, + ).results + results = [] + for obj in re_ranked: + results.append(obj.document["text"]) + return results + + +def get_vector_service( + provider: str, index_name: str, filter_id: str = None, dimension: int = 384 +): + services = { + "PINECONE": PineconeVectorService, + # Add other providers here + # e.g "weaviate": WeaviateVectorService, + } + service = services.get(provider) + if service is None: + raise ValueError(f"Unsupported provider: {provider}") + return service(index_name=index_name, filter_id=filter_id, dimension=dimension) From 98104a1dedad9f01fd67c49b29f4fc0709542779 Mon Sep 17 00:00:00 2001 From: Ismail Pelaseyed Date: Sun, 14 Jan 2024 10:02:11 -0800 Subject: [PATCH 2/7] Add ingest router and vectorstore class --- .env.example | 2 ++ api/__init__.py | 0 api/ingest.py | 29 +++++++++++++++++++++++++++++ main.py | 27 +++------------------------ requirements.txt | 23 +++++++++++++++++++++++ router.py | 8 ++++++++ service/__init__.py | 0 service/embedding.py | 24 ++++++++++++++++++++++++ service/vector_database.py | 4 ++-- 9 files changed, 91 insertions(+), 26 deletions(-) create mode 100644 .env.example create mode 100644 api/__init__.py create mode 100644 api/ingest.py create mode 100644 router.py create mode 100644 service/__init__.py create mode 100644 service/embedding.py diff --git a/.env.example b/.env.example new file mode 100644 index 00000000..b58da339 --- /dev/null +++ b/.env.example @@ -0,0 +1,2 @@ +API_BASE_URL=https://rag.superagent.sh +COHERE_API_KEY= \ No newline at end of file diff --git a/api/__init__.py b/api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/ingest.py b/api/ingest.py new file mode 100644 index 00000000..0a55f278 --- /dev/null +++ b/api/ingest.py @@ -0,0 +1,29 @@ +from typing import Dict, List +from enum import Enum +from pydantic import BaseModel +from fastapi import APIRouter + + +router = APIRouter() + + +class DatabaseType(Enum): + qdrant = "qdrant" + pinecone = "pinecone" + weaviate = "weaviate" + astra = "astra" + + +class VectorDatabase(BaseModel): + type: DatabaseType + config: Dict + + +class RequestPayload(BaseModel): + files: List[str] + vector_database: VectorDatabase + + +@router.post("/ingest") +async def ingest(payload: RequestPayload) -> Dict: + return payload.model_dump() diff --git a/main.py b/main.py index a070672d..115a2991 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,8 @@ -from typing import Dict, List -from enum import Enum -from pydantic import BaseModel from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from decouple import config +from router import router + app = FastAPI( title="SuperRag", @@ -21,24 +20,4 @@ allow_headers=["*"], ) - -class DatabaseType(Enum): - qdrant = "qdrant" - pinecone = "pinecone" - weaviate = "weaviate" - astra = "astra" - - -class VectorDatabase(BaseModel): - type: DatabaseType - config: Dict - - -class RequestPayload(BaseModel): - files: List - vector_database: VectorDatabase - - -@app.post("/ingest") -async def ingest(payload: RequestPayload) -> Dict: - return payload.model_dump() +app.include_router(router) diff --git a/requirements.txt b/requirements.txt index de18bf7b..a859e1de 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 
From a3999edf1c8996a90524c39b211929ba2ce37fd6 Mon Sep 17 00:00:00 2001
From: Ismail Pelaseyed
Date: Mon, 15 Jan 2024 00:17:21 -0800
Subject: [PATCH 3/7] Convert text into document

---
 api/ingest.py             | 28 ++++++----------------------
 models/file.py            | 18 ++++++++++++++++++
 models/ingest.py          | 10 ++++++++++
 models/vector_database.py | 15 +++++++++++++++
 requirements.txt          |  1 +
 service/embedding.py      | 25 ++++++++++++++++++-------
 6 files changed, 68 insertions(+), 29 deletions(-)
 create mode 100644 models/file.py
 create mode 100644 models/ingest.py
 create mode 100644 models/vector_database.py

diff --git a/api/ingest.py b/api/ingest.py
index 0a55f278..cabfbfda 100644
--- a/api/ingest.py
+++ b/api/ingest.py
@@ -1,29 +1,13 @@
-from typing import Dict, List
-from enum import Enum
-from pydantic import BaseModel
+from typing import Dict
 from fastapi import APIRouter
-
+from models.ingest import RequestPayload
+from service.embedding import EmbeddingService
 
 router = APIRouter()
 
 
-class DatabaseType(Enum):
-    qdrant = "qdrant"
-    pinecone = "pinecone"
-    weaviate = "weaviate"
-    astra = "astra"
-
-
-class VectorDatabase(BaseModel):
-    type: DatabaseType
-    config: Dict
-
-
-class RequestPayload(BaseModel):
-    files: List[str]
-    vector_database: VectorDatabase
-
-
 @router.post("/ingest")
 async def ingest(payload: RequestPayload) -> Dict:
-    return payload.model_dump()
+    embeddings = EmbeddingService(files=payload.files, index_name=payload.index_name)
+    documents = await embeddings.generate_documents()
+    return {"success": True, "data": documents}
diff --git a/models/file.py b/models/file.py
new file mode 100644
index 00000000..83cf2d8f
--- /dev/null
+++ b/models/file.py
@@ -0,0 +1,18 @@
+from typing import Dict, List
+from enum import Enum
+from pydantic import BaseModel
+
+
+class FileType(Enum):
+    pdf = "PDF"
+    docx = "DOCX"
+    txt = "TXT"
+    pptx = "PPTX"
+    csv = "CSV"
+    xlsx = "XLSX"
+    md = "MARKDOWN"
+
+
+class File(BaseModel):
+    type: FileType
+    url: str
diff --git a/models/ingest.py b/models/ingest.py
new file mode 100644
index 00000000..a02c8037
--- /dev/null
+++ b/models/ingest.py
@@ -0,0 +1,10 @@
+from typing import List
+from pydantic import BaseModel
+from models.file import File
+from models.vector_database import VectorDatabase
+
+
+class RequestPayload(BaseModel):
+    files: List[File]
+    vector_database: VectorDatabase
+    index_name: str
diff --git a/models/vector_database.py b/models/vector_database.py
new file mode 100644
index 00000000..ff4c0cc2
--- /dev/null
+++ b/models/vector_database.py
@@ -0,0 +1,15 @@
+from typing import Dict
+from enum import Enum
+from pydantic import BaseModel
+
+
+class DatabaseType(Enum):
+    qdrant = "qdrant"
+    pinecone = "pinecone"
+    weaviate = "weaviate"
+    astra = "astra"
+
+
+class VectorDatabase(BaseModel):
+    type: DatabaseType
+    config: Dict
diff --git a/requirements.txt b/requirements.txt
index a859e1de..9354c1a1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -54,6 +54,7 @@ protobuf==4.25.2
 pycparser==2.21
 pydantic==2.5.3
 pydantic_core==2.14.6
+pypdf==3.17.4
 python-dateutil==2.8.2
 python-decouple==3.8
 python-dotenv==1.0.0
diff --git a/service/embedding.py b/service/embedding.py
index 193d74a8..ddd81b93 100644
--- a/service/embedding.py
+++ b/service/embedding.py
@@ -1,22 +1,33 @@
+import requests
+
 from typing import List
 from fastapi import UploadFile
 from tempfile import NamedTemporaryFile
 from llama_index import Document, SimpleDirectoryReader
 from llama_index.node_parser import SimpleNodeParser
+from models.file import File
 
 
 class EmbeddingService:
-    def __init__(self, files: List[UploadFile]):
+    def __init__(self, files: List[File], index_name: str):
         self.files = files
+        self.index_name = index_name
+
+    def _get_datasource_suffix(self, type: str) -> str:
+        suffixes = {"TXT": ".txt", "PDF": ".pdf", "MARKDOWN": ".md"}
+        try:
+            return suffixes[type]
+        except KeyError:
+            raise ValueError("Unsupported datasource type")
 
-    async def generate_chunks(self):
+    async def generate_documents(self):
         documents = []
         for file in self.files:
-            with NamedTemporaryFile(
-                suffix=file.filename.split(".")[-1], delete=True
-            ) as temp_file:
-                content = await file.read()
-                temp_file.write(content)
+            print(file.type.value)
+            suffix = self._get_datasource_suffix(file.type.value)
+            with NamedTemporaryFile(suffix=suffix, delete=True) as temp_file:
+                response = requests.get(url=file.url)
+                temp_file.write(response.content)
                 temp_file.flush()
                 reader = SimpleDirectoryReader(input_files=[temp_file.name])
                 docs = reader.load_data()
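
[Reviewer note, PATCH 3/7] files is now a list of typed File objects fetched
over HTTP, and the payload carries an index_name. Note that
_get_datasource_suffix only maps TXT, PDF and MARKDOWN, so the other FileType
members currently raise ValueError. The payload shape after this patch, with
hypothetical values:

    payload = {
        "files": [{"type": "PDF", "url": "https://example.com/sample.pdf"}],
        "vector_database": {"type": "pinecone", "config": {}},
        "index_name": "my-index",
    }
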
From 51a911f08c9eb007e548e89f5f6ff8731ffe9452 Mon Sep 17 00:00:00 2001
From: Ismail Pelaseyed
Date: Tue, 16 Jan 2024 09:42:36 -0800
Subject: [PATCH 4/7] Finalize qdrant retrieval

---
 .env.example               |   3 +-
 api/ingest.py              |  12 ++-
 api/query.py               |  17 ++++
 models/query.py            |  14 +++
 router.py                  |   3 +-
 service/embedding.py       |  58 +++++++++++--
 service/vector_database.py | 172 +++++++++++++++++++++++++++----------
 7 files changed, 225 insertions(+), 54 deletions(-)
 create mode 100644 api/query.py
 create mode 100644 models/query.py

diff --git a/.env.example b/.env.example
index b58da339..c5e00580 100644
--- a/.env.example
+++ b/.env.example
@@ -1,2 +1,3 @@
 API_BASE_URL=https://rag.superagent.sh
-COHERE_API_KEY=
\ No newline at end of file
+COHERE_API_KEY=
+HUGGINGFACE_API_KEY=
\ No newline at end of file
diff --git a/api/ingest.py b/api/ingest.py
index cabfbfda..43ade6b3 100644
--- a/api/ingest.py
+++ b/api/ingest.py
@@ -8,6 +8,12 @@
 
 @router.post("/ingest")
 async def ingest(payload: RequestPayload) -> Dict:
-    embeddings = EmbeddingService(files=payload.files, index_name=payload.index_name)
-    documents = await embeddings.generate_documents()
-    return {"success": True, "data": documents}
+    embedding_service = EmbeddingService(
+        files=payload.files,
+        index_name=payload.index_name,
+        vector_credentials=payload.vector_database,
+    )
+    documents = await embedding_service.generate_documents()
+    chunks = await embedding_service.generate_chunks(documents=documents)
+    await embedding_service.generate_embeddings(nodes=chunks)
+    return {"success": True}
diff --git a/api/query.py b/api/query.py
new file mode 100644
index 00000000..69d4ebbd
--- /dev/null
+++ b/api/query.py
@@ -0,0 +1,17 @@
+from typing import Dict
+from fastapi import APIRouter
+from models.query import RequestPayload, ResponsePayload
+from service.vector_database import get_vector_service, VectorService
+
+router = APIRouter()
+
+
+@router.post("/query", response_model=ResponsePayload)
+async def query(payload: RequestPayload):
+    vector_service: VectorService = get_vector_service(
+        index_name=payload.index_name, credentials=payload.vector_database
+    )
+    chunks = await vector_service.query(input=payload.input, top_k=4)
+    documents = await vector_service.convert_to_dict(points=chunks)
+    results = await vector_service.rerank(query=payload.input, documents=documents)
+    return {"success": True, "data": results}
diff --git a/models/query.py b/models/query.py
new file mode 100644
index 00000000..ed88c932
--- /dev/null
+++ b/models/query.py
@@ -0,0 +1,14 @@
+from pydantic import BaseModel
+from typing import List
+from models.vector_database import VectorDatabase
+
+
+class RequestPayload(BaseModel):
+    input: str
+    vector_database: VectorDatabase
+    index_name: str
+
+
+class ResponsePayload(BaseModel):
+    success: bool
+    data: List
diff --git a/router.py b/router.py
index 436fa0f5..f26259ea 100644
--- a/router.py
+++ b/router.py
@@ -1,8 +1,9 @@
 from fastapi import APIRouter
 
-from api import ingest
+from api import ingest, query
 
 router = APIRouter()
 api_prefix = "/api/v1"
 
 router.include_router(ingest.router, tags=["Ingest"], prefix=api_prefix)
+router.include_router(query.router, tags=["Query"], prefix=api_prefix)
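
[Reviewer note, PATCH 4/7, part 1] The new query route embeds the input, pulls
the top_k=4 chunks from the vector store, and re-ranks them with Cohere before
responding. A request sketch, assuming a local uvicorn on port 8000 and
hypothetical Qdrant credentials:

    import requests

    payload = {
        "input": "What does the document say about pricing?",
        "vector_database": {
            "type": "qdrant",
            "config": {"host": "https://my-cluster.qdrant.io", "api_key": "qdrant-key"},
        },
        "index_name": "my-index",
    }
    response = requests.post("http://localhost:8000/api/v1/query", json=payload)
    # -> {"success": true, "data": [...]} where data holds the re-ranked chunks
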
diff --git a/service/embedding.py b/service/embedding.py
index ddd81b93..781b3d5b 100644
--- a/service/embedding.py
+++ b/service/embedding.py
@@ -1,17 +1,21 @@
 import requests
+import asyncio
 
-from typing import List
-from fastapi import UploadFile
+from typing import Any, List, Union
 from tempfile import NamedTemporaryFile
 from llama_index import Document, SimpleDirectoryReader
 from llama_index.node_parser import SimpleNodeParser
+from litellm import aembedding
 from models.file import File
+from decouple import config
+from service.vector_database import get_vector_service
 
 
 class EmbeddingService:
-    def __init__(self, files: List[File], index_name: str):
+    def __init__(self, files: List[File], index_name: str, vector_credentials: dict):
         self.files = files
         self.index_name = index_name
+        self.vector_credentials = vector_credentials
 
     def _get_datasource_suffix(self, type: str) -> str:
         suffixes = {"TXT": ".txt", "PDF": ".pdf", "MARKDOWN": ".md"}
@@ -20,10 +24,9 @@ def _get_datasource_suffix(self, type: str) -> str:
         except KeyError:
             raise ValueError("Unsupported datasource type")
 
-    async def generate_documents(self):
+    async def generate_documents(self) -> List[Document]:
         documents = []
         for file in self.files:
-            print(file.type.value)
             suffix = self._get_datasource_suffix(file.type.value)
             with NamedTemporaryFile(suffix=suffix, delete=True) as temp_file:
                 response = requests.get(url=file.url)
@@ -31,5 +34,48 @@ async def generate_documents(self):
                 temp_file.flush()
                 reader = SimpleDirectoryReader(input_files=[temp_file.name])
                 docs = reader.load_data()
-                documents.append(docs)
+                for doc in docs:
+                    doc.metadata["file_url"] = file.url
+                documents.extend(docs)
         return documents
+
+    async def generate_chunks(
+        self, documents: List[Document]
+    ) -> List[Union[Document, None]]:
+        parser = SimpleNodeParser.from_defaults(chunk_size=350, chunk_overlap=20)
+        nodes = parser.get_nodes_from_documents(documents, show_progress=False)
+        return nodes
+
+    async def generate_embeddings(
+        self,
+        nodes: List[Union[Document, None]],
+    ) -> List[tuple[str, list, dict[str, Any]]]:
+        async def generate_embedding(node):
+            if node is not None:
+                vectors = []
+                embedding_object = await aembedding(
+                    model="huggingface/intfloat/multilingual-e5-large",
+                    input=node.text,
+                    api_key=config("HUGGINGFACE_API_KEY"),
+                )
+                for vector in embedding_object.data:
+                    if vector["object"] == "embedding":
+                        vectors.append(vector["embedding"])
+                embedding = (
+                    node.id_,
+                    vectors,
+                    {
+                        **node.metadata,
+                        "content": node.text,
+                    },
+                )
+                return embedding
+
+        tasks = [generate_embedding(node) for node in nodes]
+        embeddings = await asyncio.gather(*tasks)
+        vector_service = get_vector_service(
+            index_name=self.index_name, credentials=self.vector_credentials
+        )
+        await vector_service.upsert(embeddings=[e for e in embeddings if e is not None])
+
+        return [e for e in embeddings if e is not None]
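
[Reviewer note, PATCH 4/7, part 2] Chunking is fixed at chunk_size=350 with
chunk_overlap=20, and each node is embedded concurrently via asyncio.gather.
The record handed to VectorService.upsert is a (node_id, vectors, metadata)
tuple; a sketch of that shape with illustrative, truncated values:

    embedding_record = (
        "9f1c0a52-node-id",                    # node.id_ assigned by llama-index
        [[0.012, -0.034, 0.101]],              # 1024-dim output of multilingual-e5-large
        {"file_url": "https://example.com/sample.pdf", "content": "chunk text"},
    )
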
diff --git a/service/vector_database.py b/service/vector_database.py
index bded8a1d..a0d040a1 100644
--- a/service/vector_database.py
+++ b/service/vector_database.py
@@ -1,38 +1,62 @@
-from abc import ABC, abstractmethod
-from typing import Any, List
-
 import pinecone
+
+from abc import ABC, abstractmethod
+from typing import Any, List, Type
 from decouple import config
 from numpy import ndarray
+from litellm import embedding
+from qdrant_client import QdrantClient
+from qdrant_client.http import models as rest
+from models.vector_database import VectorDatabase
 
 
 class VectorService(ABC):
-    def __init__(self, index_name: str, dimension: int, filter_id: str = None):
+    def __init__(self, index_name: str, dimension: int, credentials: dict):
         self.index_name = index_name
-        self.filter_id = filter_id
         self.dimension = dimension
+        self.credentials = credentials
 
     @abstractmethod
-    def upsert():
+    async def upsert():
         pass
 
     @abstractmethod
-    def query():
+    async def query():
         pass
 
     @abstractmethod
-    def rerank(self, query: str, documents: list, top_n: int = 3):
+    async def convert_to_dict():
         pass
 
+    async def rerank(self, query: str, documents: list, top_n: int = 4):
+        from cohere import Client
+
+        api_key = config("COHERE_API_KEY")
+        if not api_key:
+            raise ValueError("API key for Cohere is not present.")
+        cohere_client = Client(api_key=api_key)
+        docs = [doc["content"] for doc in documents]
+        re_ranked = cohere_client.rerank(
+            model="rerank-multilingual-v2.0",
+            query=query,
+            documents=docs,
+            top_n=top_n,
+        ).results
+        results = []
+        for r in re_ranked:
+            doc = documents[r.index]
+            results.append(doc)
+        return results
+
 
 class PineconeVectorService(VectorService):
-    def __init__(self, index_name: str, dimension: int, filter_id: str = None):
+    def __init__(self, index_name: str, dimension: int, credentials: dict):
         super().__init__(
-            index_name=index_name, dimension=dimension, filter_id=filter_id
+            index_name=index_name, dimension=dimension, credentials=credentials
         )
         pinecone.init(
-            api_key=config("PINECONE_API_KEY"),
-            environment=config("PINECONE_ENVIRONMENT"),
+            api_key=credentials["PINECONE_API_KEY"],
+            environment=credentials["PINECONE_ENVIRONMENT"],
        )
         # Create a new vector index if it doesn't
         # exist dimensions should be passed in the arguments
@@ -42,53 +66,115 @@ def __init__(self, index_name: str, dimension: int, filter_id: str = None):
             )
         self.index = pinecone.Index(index_name=self.index_name)
 
-    def upsert(self, vectors: ndarray):
-        self.index.upsert(vectors=vectors, namespace=self.filter_id)
+    async def convert_to_dict(self, documents: list):
+        pass
+
+    async def upsert(self, embeddings: List[tuple[str, list, dict[str, Any]]]):
+        self.index.upsert(vectors=embeddings)
 
-    def query(self, queries: List[ndarray], top_k: int, include_metadata: bool = True):
+    async def query(
+        self, queries: List[ndarray], top_k: int, include_metadata: bool = True
+    ):
         results = self.index.query(
             queries=queries,
             top_k=top_k,
             include_metadata=include_metadata,
-            namespace=self.filter_id,
         )
         return results["results"][0]["matches"]
 
-    def rerank(self, query: str, documents: Any, top_n: int = 3):
-        from cohere import Client
 
-        api_key = config("COHERE_API_KEY")
-        if not api_key:
-            raise ValueError("API key for Cohere is not present.")
-        cohere_client = Client(api_key=api_key)
-        docs = [
-            (
-                f"{doc['metadata']['content']}\n\n"
-                f"page number: {doc['metadata']['page_label']}"
-            )
-            for doc in documents
-        ]
-        re_ranked = cohere_client.rerank(
-            model="rerank-multilingual-v2.0",
-            query=query,
-            documents=docs,
-            top_n=top_n,
-        ).results
-        results = []
-        for obj in re_ranked:
-            results.append(obj.document["text"])
-        return results
+class QdrantService(VectorService):
+    def __init__(self, index_name: str, dimension: int, credentials: dict):
+        super().__init__(
+            index_name=index_name, dimension=dimension, credentials=credentials
+        )
+        self.client = QdrantClient(
+            url=credentials["host"], api_key=credentials["api_key"], https=True
+        )
+        collections = self.client.get_collections()
+        if index_name not in [c.name for c in collections.collections]:
+            self.client.create_collection(
+                collection_name=self.index_name,
+                vectors_config={
+                    "content": rest.VectorParams(
+                        size=1024, distance=rest.Distance.COSINE
+                    )
+                },
+                optimizers_config=rest.OptimizersConfigDiff(
+                    indexing_threshold=0,
+                ),
+            )
+
+    async def convert_to_dict(self, points: List[rest.PointStruct]):
+        docs = [
+            {
+                "content": point.payload.get("content"),
+                "page_label": point.payload.get("page_label"),
+                "file_url": point.payload.get("file_url"),
+            }
+            for point in points
+        ]
+        return docs
+
+    async def upsert(self, embeddings: List[tuple[str, list, dict[str, Any]]]):
+        points = []
+
+        for embedding in embeddings:
+            points.append(
+                rest.PointStruct(
+                    id=embedding[0],
+                    vector={"content": embedding[1]},
+                    payload={**embedding[2]},
+                )
+            )
+
+        self.client.upsert(collection_name=self.index_name, wait=True, points=points)
+        collection_vector_count = self.client.get_collection(
+            collection_name=self.index_name
+        ).vectors_count
+        print(f"Vector count in collection: {collection_vector_count}")
+
+    async def query(self, input: str, top_k: int):
+        vectors = []
+        embedding_object = embedding(
+            model="huggingface/intfloat/multilingual-e5-large",
+            input=input,
+            api_key=config("HUGGINGFACE_API_KEY"),
+        )
+        for vector in embedding_object.data:
+            if vector["object"] == "embedding":
+                vectors.append(vector["embedding"])
+        search_result = self.client.search(
+            collection_name=self.index_name,
+            query_vector=("content", vectors),
+            limit=top_k,
+            # query_filter=rest.Filter(
+            #     must=[
+            #         rest.FieldCondition(
+            #             key="datasource_id",
+            #             match=rest.MatchValue(value=datasource_id),
+            #         ),
+            #     ]
+            # ),
+            with_payload=True,
+        )
+        return search_result
 
 
 def get_vector_service(
-    provider: str, index_name: str, filter_id: str = None, dimension: int = 384
-):
+    index_name: str, credentials: VectorDatabase, dimension: int = 1024
+) -> Type[VectorService]:
     services = {
-        "PINECONE": PineconeVectorService,
+        "pinecone": PineconeVectorService,
+        "qdrant": QdrantService,
         # Add other providers here
         # e.g "weaviate": WeaviateVectorService,
     }
-    service = services.get(provider)
+    service = services.get(credentials.type.value)
     if service is None:
-        raise ValueError(f"Unsupported provider: {provider}")
-    return service(index_name=index_name, filter_id=filter_id, dimension=dimension)
+        raise ValueError(f"Unsupported provider: {credentials.type.value}")
+    return service(
+        index_name=index_name,
+        dimension=dimension,
+        credentials=dict(credentials.config),
+    )
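
[Reviewer note, PATCH 4/7, part 3] The Qdrant collection is created with a
named vector "content" of size 1024, which matches both the new
get_vector_service default (dimension=1024) and the output width of
intfloat/multilingual-e5-large; queries therefore pass the ("content", vectors)
pair. A construction sketch using the VectorDatabase model, with hypothetical
credentials:

    from models.vector_database import VectorDatabase
    from service.vector_database import get_vector_service

    credentials = VectorDatabase(
        type="qdrant",
        config={"host": "https://my-cluster.qdrant.io", "api_key": "qdrant-key"},
    )
    service = get_vector_service(index_name="my-index", credentials=credentials)
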
From 67adf00a98b5402869fd4a110951ff7ef48367ae Mon Sep 17 00:00:00 2001
From: Ismail Pelaseyed
Date: Tue, 16 Jan 2024 09:46:48 -0800
Subject: [PATCH 5/7] Fix linting

---
 api/query.py               | 1 -
 models/file.py             | 1 -
 service/vector_database.py | 8 ++++----
 3 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/api/query.py b/api/query.py
index 69d4ebbd..249c82ca 100644
--- a/api/query.py
+++ b/api/query.py
@@ -1,4 +1,3 @@
-from typing import Dict
 from fastapi import APIRouter
 from models.query import RequestPayload, ResponsePayload
 from service.vector_database import get_vector_service, VectorService
diff --git a/models/file.py b/models/file.py
index 83cf2d8f..74f20db1 100644
--- a/models/file.py
+++ b/models/file.py
@@ -1,4 +1,3 @@
-from typing import Dict, List
 from enum import Enum
 from pydantic import BaseModel
 
diff --git a/service/vector_database.py b/service/vector_database.py
index a0d040a1..133cbfb1 100644
--- a/service/vector_database.py
+++ b/service/vector_database.py
@@ -119,12 +119,12 @@ async def convert_to_dict(self, points: List[rest.PointStruct]):
     async def upsert(self, embeddings: List[tuple[str, list, dict[str, Any]]]):
         points = []
 
-        for embedding in embeddings:
+        for _embedding in embeddings:
             points.append(
                 rest.PointStruct(
-                    id=embedding[0],
-                    vector={"content": embedding[1]},
-                    payload={**embedding[2]},
+                    id=_embedding[0],
+                    vector={"content": _embedding[1]},
+                    payload={**_embedding[2]},
                 )
             )
 
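
[Reviewer note, PATCH 5/7] Renaming the loop variable to _embedding stops it
from shadowing the embedding function imported from litellm at the top of the
module, which is presumably what the linter flagged. In miniature:

    from litellm import embedding

    def upsert(embeddings):
        for embedding in embeddings:  # flagged: the name shadows the import
            ...
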
From 9ce12edd266f7f7bea58cfcfaee81cf426b6a953 Mon Sep 17 00:00:00 2001
From: Ismail Pelaseyed
Date: Tue, 16 Jan 2024 09:48:38 -0800
Subject: [PATCH 6/7] Fix query response model

---
 models/query.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/models/query.py b/models/query.py
index ed88c932..aaab3776 100644
--- a/models/query.py
+++ b/models/query.py
@@ -9,6 +9,12 @@ class RequestPayload(BaseModel):
     index_name: str
 
 
+class ResponseData(BaseModel):
+    content: str
+    file_url: str
+    page_label: str
+
+
 class ResponsePayload(BaseModel):
     success: bool
-    data: List
+    data: List[ResponseData]
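
[Reviewer note, PATCH 6/7] ResponseData mirrors the keys produced by
QdrantService.convert_to_dict, so /api/v1/query responses are now validated
field by field. An illustrative response body:

    {
        "success": True,
        "data": [
            {
                "content": "chunk text",
                "file_url": "https://example.com/sample.pdf",
                "page_label": "3",
            }
        ],
    }
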
From 8893c74730c310ee94b69da4c90c0cf4b9ed4641 Mon Sep 17 00:00:00 2001
From: Ismail Pelaseyed
Date: Tue, 16 Jan 2024 09:50:58 -0800
Subject: [PATCH 7/7] Remove linting for python 3.8

---
 .github/workflows/lint.yml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml
index c7fe3405..44bab06a 100644
--- a/.github/workflows/lint.yml
+++ b/.github/workflows/lint.yml
@@ -11,7 +11,6 @@ jobs:
     strategy:
       matrix:
         python-version:
-          - "3.8"
           - "3.9"
           - "3.10"
           - "3.11"
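
[Reviewer note, PATCH 7/7] A plausible motivation: the annotations introduced
in PATCH 4 subscript builtin types (e.g. tuple[str, list, dict[str, Any]]),
which PEP 585 only allows from Python 3.9 onward, so the 3.8 job would fail as
soon as the module is imported:

    # On Python 3.8, evaluating this annotation at definition time raises:
    # TypeError: 'type' object is not subscriptable
    async def upsert(self, embeddings: list[tuple[str, list, dict]]): ...
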