From 69c0b8e10a1f5786ca08ec584f86f2965b865166 Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Tue, 15 Oct 2024 15:28:20 -0400 Subject: [PATCH] WIP --- app.py | 31 +++++++++ app2.py | 16 +++++ app3.py | 62 +++++++++++++++++ pinecone/grpc/base.py | 18 +++-- pinecone/grpc/index_grpc.py | 21 ++++++ pinecone/grpc/index_grpc_asyncio.py | 100 ++++++++++++++++++++++++++++ pinecone/grpc/pinecone.py | 9 ++- pinecone/grpc/utils.py | 6 ++ poetry.lock | 55 ++++++++++++++- pyproject.toml | 1 + 10 files changed, 308 insertions(+), 11 deletions(-) create mode 100644 app.py create mode 100644 app2.py create mode 100644 app3.py create mode 100644 pinecone/grpc/index_grpc_asyncio.py diff --git a/app.py b/app.py new file mode 100644 index 00000000..562241c8 --- /dev/null +++ b/app.py @@ -0,0 +1,31 @@ +from pinecone.grpc import PineconeGRPC, GRPCClientConfig + +# Initialize a client. An API key must be passed, but the +# value does not matter. +pc = PineconeGRPC(api_key="test_api_key") + +# Target the indexes. Use the host and port number along with disabling tls. 
+index1 = pc.Index(host="localhost:5081", grpc_config=GRPCClientConfig(secure=False))
+index2 = pc.Index(host="localhost:5082", grpc_config=GRPCClientConfig(secure=False))
+
+# You can now perform data plane operations with index1 and index2
+
+dimension = 3
+
+
+def upserts():
+    vectors = []
+    for i in range(0, 100):
+        vectors.append((f"vec{i}", [i] * dimension))
+
+    print(len(vectors))
+
+    index1.upsert(vectors=vectors, namespace="ns2")
+    index2.upsert(vectors=vectors, namespace="ns2")
+
+
+upserts()
+print(index1.describe_index_stats())
+
+print(index1.query(id="vec1", top_k=2, namespace="ns2", include_values=True))
+print(index1.query(id="vec1", top_k=10, namespace="", include_values=True))
diff --git a/app2.py b/app2.py
new file mode 100644
index 00000000..c8349e70
--- /dev/null
+++ b/app2.py
@@ -0,0 +1,16 @@
+from pinecone.grpc import PineconeGRPC
+from pinecone import Pinecone
+
+pc = Pinecone(api_key="YOUR_API_KEY")
+pcg = PineconeGRPC(api_key="YOUR_API_KEY")
+
+index = pc.Index("jen2")
+indexg = pcg.Index(name="jen2", use_asyncio=True)
+
+# Rest call fails
+# print(index.upsert(vectors=[("vec1", [1, 2])]))
+
+# GRPC succeeds
+print(indexg.upsert(vectors=[("vec1", [1, 2])]))
+
+# print(index.fetch(ids=['vec1']))
diff --git a/app3.py b/app3.py
new file mode 100644
index 00000000..5e49daff
--- /dev/null
+++ b/app3.py
@@ -0,0 +1,62 @@
+import asyncio
+from pinecone.grpc import PineconeGRPC as Pinecone, Vector
+
+import time
+import random
+import pandas as pd
+
+
+# Enable gRPC tracing and verbosity for more detailed logs
+# os.environ["GRPC_VERBOSITY"] = "DEBUG"
+# os.environ["GRPC_TRACE"] = "all"
+
+
+# Generate a large set of vectors (as an example)
+def generate_vectors(num_vectors, dimension):
+    return [
+        Vector(id=f"vector_{i}", values=[random.random()] * dimension) for i in range(num_vectors)
+    ]
+
+
+def load_vectors():
+    df = pd.read_parquet("test_records_100k_dim1024.parquet")
+    df["values"] =
df["values"].apply(lambda x: [float(v) for v in x])
+
+    vectors = [Vector(id=row.id, values=list(row.values)) for row in df.itertuples()]
+    return vectors
+
+
+async def main():
+    # Create a semaphore to limit concurrency (e.g., max 5 concurrent requests)
+    s = time.time()
+    # all_vectors = load_vectors()
+    all_vectors = generate_vectors(1000, 1024)
+    f = time.time()
+    print(f"Loaded {len(all_vectors)} vectors in {f-s:.2f} seconds")
+
+    start_time = time.time()
+
+    # Same setup as before...
+    pc = Pinecone(api_key="YOUR_API_KEY")
+    index = pc.Index(
+        # index_host="jen2-dojoi3u.svc.aped-4627-b74a.pinecone.io"
+        host="jen1024-dojoi3u.svc.apw5-4e34-81fa.pinecone.io",
+        use_asyncio=True,
+    )
+
+    batch_size = 150
+    namespace = "asyncio-py7"
+    res = await index.upsert(
+        vectors=all_vectors, batch_size=batch_size, namespace=namespace, show_progress=True
+    )
+
+    print(res)
+
+    end_time = time.time()
+
+    total_time = end_time - start_time
+    print(f"All tasks completed in {total_time:.2f} seconds")
+    print(f"Namespace: {namespace}")
+
+
+asyncio.run(main())
diff --git a/pinecone/grpc/base.py b/pinecone/grpc/base.py
index 17580d7e..a7dd816b 100644
--- a/pinecone/grpc/base.py
+++ b/pinecone/grpc/base.py
@@ -9,6 +9,7 @@ from pinecone import Config
 from .config import GRPCClientConfig
 from .grpc_runner import GrpcRunner
+from .utils import normalize_endpoint
 
 
 class GRPCIndexBase(ABC):
@@ -16,8 +17,6 @@ class GRPCIndexBase(ABC):
     Base class for grpc-based interaction with Pinecone indexes
     """
 
-    _pool = None
-
     def __init__(
         self,
         index_name: str,
@@ -25,17 +24,18 @@ def __init__(
         channel: Optional[Channel] = None,
         grpc_config: Optional[GRPCClientConfig] = None,
         _endpoint_override: Optional[str] = None,
+        use_asyncio: Optional[bool] = False,
     ):
         self.config = config
         self.grpc_client_config = grpc_config or GRPCClientConfig()
         self._endpoint_override = _endpoint_override
         self.runner = GrpcRunner(
             index_name=index_name, config=config,
grpc_config=self.grpc_client_config ) self.channel_factory = GrpcChannelFactory( - config=self.config, grpc_client_config=self.grpc_client_config, use_asyncio=False + config=self.config, grpc_client_config=self.grpc_client_config, use_asyncio=use_asyncio ) self._channel = channel or self._gen_channel() self.stub = self.stub_class(self._channel) @@ -46,9 +45,7 @@ def stub_class(self): pass def _endpoint(self): - grpc_host = self.config.host.replace("https://", "") - if ":" not in grpc_host: - grpc_host = f"{grpc_host}:443" + grpc_host = normalize_endpoint(self.config.host) return self._endpoint_override if self._endpoint_override else grpc_host def _gen_channel(self): @@ -83,3 +80,10 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, traceback): self.close() + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + self.close() + return True diff --git a/pinecone/grpc/index_grpc.py b/pinecone/grpc/index_grpc.py index 6269c23d..b7081987 100644 --- a/pinecone/grpc/index_grpc.py +++ b/pinecone/grpc/index_grpc.py @@ -39,6 +39,10 @@ from .base import GRPCIndexBase from .future import PineconeGrpcFuture +from .config import GRPCClientConfig +from pinecone.config import Config +from grpc._channel import Channel + __all__ = ["GRPCIndex", "GRPCVector", "GRPCQueryVector", "GRPCSparseValues"] @@ -53,6 +57,23 @@ class SparseVectorTypedDict(TypedDict): class GRPCIndex(GRPCIndexBase): """A client for interacting with a Pinecone index via GRPC API.""" + def __init__( + self, + index_name: str, + config: Config, + channel: Optional[Channel] = None, + grpc_config: Optional[GRPCClientConfig] = None, + _endpoint_override: Optional[str] = None, + ): + super().__init__( + index_name=index_name, + config=config, + channel=channel, + grpc_config=grpc_config, + _endpoint_override=_endpoint_override, + use_asyncio=False, + ) + @property def stub_class(self): return VectorServiceStub diff --git 
a/pinecone/grpc/index_grpc_asyncio.py b/pinecone/grpc/index_grpc_asyncio.py new file mode 100644 index 00000000..2d141803 --- /dev/null +++ b/pinecone/grpc/index_grpc_asyncio.py @@ -0,0 +1,100 @@ +from typing import Optional, Union, List, Awaitable + +from tqdm.asyncio import tqdm +from asyncio import Semaphore + +from .vector_factory_grpc import VectorFactoryGRPC + +from pinecone.core.grpc.protos.vector_service_pb2 import ( + Vector as GRPCVector, + QueryVector as GRPCQueryVector, + UpsertRequest, + UpsertResponse, + SparseValues as GRPCSparseValues, +) +from .base import GRPCIndexBase +from pinecone import Vector as NonGRPCVector +from pinecone.core.grpc.protos.vector_service_pb2_grpc import VectorServiceStub +from pinecone.utils import parse_non_empty_args + +from .config import GRPCClientConfig +from pinecone.config import Config +from grpc._channel import Channel + +__all__ = ["GRPCIndexAsyncio", "GRPCVector", "GRPCQueryVector", "GRPCSparseValues"] + + +class GRPCIndexAsyncio(GRPCIndexBase): + """A client for interacting with a Pinecone index over GRPC with asyncio.""" + + def __init__( + self, + index_name: str, + config: Config, + channel: Optional[Channel] = None, + grpc_config: Optional[GRPCClientConfig] = None, + _endpoint_override: Optional[str] = None, + ): + super().__init__( + index_name=index_name, + config=config, + channel=channel, + grpc_config=grpc_config, + _endpoint_override=_endpoint_override, + use_asyncio=True, + ) + + @property + def stub_class(self): + return VectorServiceStub + + async def upsert( + self, + vectors: Union[List[GRPCVector], List[NonGRPCVector], List[tuple], List[dict]], + namespace: Optional[str] = None, + batch_size: Optional[int] = None, + show_progress: bool = True, + **kwargs, + ) -> Awaitable[UpsertResponse]: + timeout = kwargs.pop("timeout", None) + vectors = list(map(VectorFactoryGRPC.build, vectors)) + + if batch_size is None: + return await self._upsert_batch(vectors, namespace, timeout=timeout, **kwargs) + + 
else:
+            if not isinstance(batch_size, int) or batch_size <= 0:
+                raise ValueError("batch_size must be a positive integer")
+
+            semaphore = Semaphore(25)
+            vector_batches = [
+                vectors[i : i + batch_size] for i in range(0, len(vectors), batch_size)
+            ]
+            tasks = [
+                self._upsert_batch(
+                    vectors=batch, namespace=namespace, timeout=timeout, semaphore=semaphore
+                )
+                for batch in vector_batches
+            ]
+
+            return await tqdm.gather(*tasks, disable=not show_progress, desc="Upserted batches")
+
+    async def _upsert_batch(
+        self,
+        vectors: List[GRPCVector],
+        namespace: Optional[str],
+        timeout: Optional[int] = None,
+        semaphore: Optional[Semaphore] = None,
+        **kwargs,
+    ) -> Awaitable[UpsertResponse]:
+        args_dict = parse_non_empty_args([("namespace", namespace)])
+        request = UpsertRequest(vectors=vectors, **args_dict)
+        if semaphore is not None:
+            async with semaphore:
+                return await self.runner.run_asyncio(
+                    self.stub.Upsert, request, timeout=timeout, **kwargs
+                )
+        else:
+            return await self.runner.run_asyncio(
+                self.stub.Upsert, request, timeout=timeout, **kwargs
+            )
diff --git a/pinecone/grpc/pinecone.py b/pinecone/grpc/pinecone.py
index af6a8baa..1878a940 100644
--- a/pinecone/grpc/pinecone.py
+++ b/pinecone/grpc/pinecone.py
@@ -1,6 +1,7 @@
 from ..control.pinecone import Pinecone
 from ..config.config import ConfigBuilder
 from .index_grpc import GRPCIndex
+from .index_grpc_asyncio import GRPCIndexAsyncio
 
 
 class PineconeGRPC(Pinecone):
@@ -47,7 +48,7 @@ class PineconeGRPC(Pinecone):
 
     """
 
-    def Index(self, name: str = "", host: str = "", **kwargs):
+    def Index(self, name: str = "", host: str = "", use_asyncio=False, **kwargs):
         """
         Target an index for data operations.
 
@@ -131,4 +132,8 @@ def Index(self, name: str = "", host: str = "", **kwargs):
             proxy_url=self.config.proxy_url,
             ssl_ca_certs=self.config.ssl_ca_certs,
         )
-        return GRPCIndex(index_name=name, config=config, **kwargs)
+
+        if use_asyncio:
+            return GRPCIndexAsyncio(index_name=name, config=config, **kwargs)
+        else:
+            return GRPCIndex(index_name=name, config=config, **kwargs)
diff --git a/pinecone/grpc/utils.py b/pinecone/grpc/utils.py
index 99f45460..f1843428 100644
--- a/pinecone/grpc/utils.py
+++ b/pinecone/grpc/utils.py
@@ -19,6 +19,13 @@ def _generate_request_id() -> str:
     return str(uuid.uuid4())
 
 
+def normalize_endpoint(endpoint: str) -> str:
+    grpc_host = endpoint.replace("https://", "")
+    if ":" not in grpc_host:
+        grpc_host = f"{grpc_host}:443"
+    return grpc_host
+
+
 def dict_to_proto_struct(d: Optional[dict]) -> "Struct":
     if not d:
         d = {}
diff --git a/poetry.lock b/poetry.lock
index 758f00d4..e5bcdd9d 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
[[package]] name = "astunparse" @@ -906,6 +906,57 @@ files = [ googleapis-common-protos = "*" protobuf = ">=4.21.0" +[[package]] +name = "pyarrow" +version = "17.0.0" +description = "Python library for Apache Arrow" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pyarrow-17.0.0-cp310-cp310-macosx_10_15_x86_64.whl", hash = "sha256:a5c8b238d47e48812ee577ee20c9a2779e6a5904f1708ae240f53ecbee7c9f07"}, + {file = "pyarrow-17.0.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:db023dc4c6cae1015de9e198d41250688383c3f9af8f565370ab2b4cb5f62655"}, + {file = "pyarrow-17.0.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:da1e060b3876faa11cee287839f9cc7cdc00649f475714b8680a05fd9071d545"}, + {file = "pyarrow-17.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:75c06d4624c0ad6674364bb46ef38c3132768139ddec1c56582dbac54f2663e2"}, + {file = "pyarrow-17.0.0-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:fa3c246cc58cb5a4a5cb407a18f193354ea47dd0648194e6265bd24177982fe8"}, + {file = "pyarrow-17.0.0-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:f7ae2de664e0b158d1607699a16a488de3d008ba99b3a7aa5de1cbc13574d047"}, + {file = "pyarrow-17.0.0-cp310-cp310-win_amd64.whl", hash = "sha256:5984f416552eea15fd9cee03da53542bf4cddaef5afecefb9aa8d1010c335087"}, + {file = "pyarrow-17.0.0-cp311-cp311-macosx_10_15_x86_64.whl", hash = "sha256:1c8856e2ef09eb87ecf937104aacfa0708f22dfeb039c363ec99735190ffb977"}, + {file = "pyarrow-17.0.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2e19f569567efcbbd42084e87f948778eb371d308e137a0f97afe19bb860ccb3"}, + {file = "pyarrow-17.0.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6b244dc8e08a23b3e352899a006a26ae7b4d0da7bb636872fa8f5884e70acf15"}, + {file = "pyarrow-17.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0b72e87fe3e1db343995562f7fff8aee354b55ee83d13afba65400c178ab2597"}, + {file = 
"pyarrow-17.0.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:dc5c31c37409dfbc5d014047817cb4ccd8c1ea25d19576acf1a001fe07f5b420"}, + {file = "pyarrow-17.0.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:e3343cb1e88bc2ea605986d4b94948716edc7a8d14afd4e2c097232f729758b4"}, + {file = "pyarrow-17.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:a27532c38f3de9eb3e90ecab63dfda948a8ca859a66e3a47f5f42d1e403c4d03"}, + {file = "pyarrow-17.0.0-cp312-cp312-macosx_10_15_x86_64.whl", hash = "sha256:9b8a823cea605221e61f34859dcc03207e52e409ccf6354634143e23af7c8d22"}, + {file = "pyarrow-17.0.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:f1e70de6cb5790a50b01d2b686d54aaf73da01266850b05e3af2a1bc89e16053"}, + {file = "pyarrow-17.0.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0071ce35788c6f9077ff9ecba4858108eebe2ea5a3f7cf2cf55ebc1dbc6ee24a"}, + {file = "pyarrow-17.0.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:757074882f844411fcca735e39aae74248a1531367a7c80799b4266390ae51cc"}, + {file = "pyarrow-17.0.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:9ba11c4f16976e89146781a83833df7f82077cdab7dc6232c897789343f7891a"}, + {file = "pyarrow-17.0.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:b0c6ac301093b42d34410b187bba560b17c0330f64907bfa4f7f7f2444b0cf9b"}, + {file = "pyarrow-17.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:392bc9feabc647338e6c89267635e111d71edad5fcffba204425a7c8d13610d7"}, + {file = "pyarrow-17.0.0-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:af5ff82a04b2171415f1410cff7ebb79861afc5dae50be73ce06d6e870615204"}, + {file = "pyarrow-17.0.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:edca18eaca89cd6382dfbcff3dd2d87633433043650c07375d095cd3517561d8"}, + {file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7c7916bff914ac5d4a8fe25b7a25e432ff921e72f6f2b7547d1e325c1ad9d155"}, + {file = 
"pyarrow-17.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f553ca691b9e94b202ff741bdd40f6ccb70cdd5fbf65c187af132f1317de6145"}, + {file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_28_aarch64.whl", hash = "sha256:0cdb0e627c86c373205a2f94a510ac4376fdc523f8bb36beab2e7f204416163c"}, + {file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_28_x86_64.whl", hash = "sha256:d7d192305d9d8bc9082d10f361fc70a73590a4c65cf31c3e6926cd72b76bc35c"}, + {file = "pyarrow-17.0.0-cp38-cp38-win_amd64.whl", hash = "sha256:02dae06ce212d8b3244dd3e7d12d9c4d3046945a5933d28026598e9dbbda1fca"}, + {file = "pyarrow-17.0.0-cp39-cp39-macosx_10_15_x86_64.whl", hash = "sha256:13d7a460b412f31e4c0efa1148e1d29bdf18ad1411eb6757d38f8fbdcc8645fb"}, + {file = "pyarrow-17.0.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:9b564a51fbccfab5a04a80453e5ac6c9954a9c5ef2890d1bcf63741909c3f8df"}, + {file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:32503827abbc5aadedfa235f5ece8c4f8f8b0a3cf01066bc8d29de7539532687"}, + {file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a155acc7f154b9ffcc85497509bcd0d43efb80d6f733b0dc3bb14e281f131c8b"}, + {file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:dec8d129254d0188a49f8a1fc99e0560dc1b85f60af729f47de4046015f9b0a5"}, + {file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:a48ddf5c3c6a6c505904545c25a4ae13646ae1f8ba703c4df4a1bfe4f4006bda"}, + {file = "pyarrow-17.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:42bf93249a083aca230ba7e2786c5f673507fa97bbd9725a1e2754715151a204"}, + {file = "pyarrow-17.0.0.tar.gz", hash = "sha256:4beca9521ed2c0921c1023e68d097d0299b62c362639ea315572a58f3f50fd28"}, +] + +[package.dependencies] +numpy = ">=1.16.6" + +[package.extras] +test = ["cffi", "hypothesis", "pandas", "pytest", "pytz"] + [[package]] name = "pygments" version = "2.16.1" @@ -1312,4 +1363,4 @@ grpc = ["googleapis-common-protos", 
"grpcio", "grpcio", "lz4", "protobuf", "prot [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "a82474ddea48c5918581d11b25eb9398e8b6241770245a79101d1a8be9803cd5" +content-hash = "533d8e24e1b5c89f0e5c15a4c4178917de664421a0b0fc09cba8bc7d18e8719c" diff --git a/pyproject.toml b/pyproject.toml index 7b571682..f132343d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -84,6 +84,7 @@ pytest-mock = "3.6.1" pytest-timeout = "2.2.0" urllib3_mock = "0.3.3" responses = ">=0.8.1" +pyarrow = "^17.0.0" [tool.poetry.extras] grpc = ["grpcio", "googleapis-common-protos", "lz4", "protobuf", "protoc-gen-openapiv2"]