From b7a4d0ef7c26484ad35c56fed4c683ed088500db Mon Sep 17 00:00:00 2001
From: michaelfeil
Date: Mon, 30 Oct 2023 00:00:00 +0000
Subject: [PATCH] latest updates

---
 .../infinity_emb/fastapi_schemas/convert.py   |  9 ++++---
 .../infinity_emb/inference/batch_handler.py   | 14 +++++-----
 .../infinity_emb/inference/primitives.py      | 19 +------------
 .../infinity_emb/inference/select_model.py    |  5 ++--
 .../transformer/sentence_transformer.py       | 22 +++++----------
 libs/infinity_emb/poetry.lock                 | 14 +++++-----
 .../end_to_end/test_sentence_transformers.py  |  2 +-
 libs/infinity_emb/tests/script_live.py        | 27 ++++++++++++-------
 8 files changed, 49 insertions(+), 63 deletions(-)

diff --git a/libs/infinity_emb/infinity_emb/fastapi_schemas/convert.py b/libs/infinity_emb/infinity_emb/fastapi_schemas/convert.py
index 878e0ba9..e57dab5b 100644
--- a/libs/infinity_emb/infinity_emb/fastapi_schemas/convert.py
+++ b/libs/infinity_emb/infinity_emb/fastapi_schemas/convert.py
@@ -1,10 +1,13 @@
-from infinity_emb.fastapi_schemas.pymodels import OpenAIEmbeddingResult
+from typing import Any, Dict, Iterable, Union
+
 from infinity_emb.inference.primitives import NpEmbeddingType
 
 
 def list_embeddings_to_response(
-    embeddings: NpEmbeddingType, model: str, usage: int
-) -> OpenAIEmbeddingResult:
+    embeddings: Union[NpEmbeddingType, Iterable[NpEmbeddingType]],
+    model: str,
+    usage: int,
+) -> Dict[str, Any]:
     return dict(
         model=model,
         data=[
diff --git a/libs/infinity_emb/infinity_emb/inference/batch_handler.py b/libs/infinity_emb/infinity_emb/inference/batch_handler.py
index e135add6..26c56d64 100644
--- a/libs/infinity_emb/infinity_emb/inference/batch_handler.py
+++ b/libs/infinity_emb/infinity_emb/inference/batch_handler.py
@@ -39,7 +39,7 @@ async def extend(self, items: List[PrioritizedQueueItem]):
 
         self._sync_event.set()
 
-    def pop_optimal_batch(
+    def pop_optimal_batches(
        self, size: int, timeout=0.2, latest_first=False
     ) -> Union[List[List[EmbeddingResult]], None]:
         """
@@ -98,7 +98,7 @@ async def extend(self, items: List[PrioritizedQueueItem]):
         with self._lock_queue_event:
             self._queue.extend(items)
 
-    def pop_optimal_batch(
+    def pop_optimal_batches(
         self, size: int, timeout=0.2, **kwargs
     ) -> Union[List[List[EmbeddingResult]], None]:
         """
@@ -120,7 +120,7 @@ def pop_optimal_batch(
             return None
 
         # slice as many batches as possible
-        n_batches = max(1, len(self._queue) // size)
+        n_batches = min(32, max(1, len(self._queue) // size))
         size_batches = size * n_batches
 
         with self._lock_queue_event:
@@ -134,11 +134,11 @@ def pop_optimal_batch(
 
         # optimal padding per batch
         new_items_l.sort()
-        new_items = []
+        new_items: List[List[EmbeddingResult]] = []
         for i in range(n_batches):
             mini_batch = new_items_l[size * i : size * (i + 1)]
-            mini_batch = [mi.item for mi in mini_batch]
-            new_items.append(mini_batch)
+            mini_batch_e: List[EmbeddingResult] = [mi.item for mi in mini_batch]
+            new_items.append(mini_batch_e)
             # runtime checks
             # assert 1 <= len(mini_batch) <= size
             # if i > 0:
@@ -274,7 +274,7 @@ def _preprocess_batch(self):
 
             # decision to attemp to pop a batch
             # -> will happen if a single datapoint is available
-            batches = self._queue_prio.pop_optimal_batch(
+            batches = self._queue_prio.pop_optimal_batches(
                 self.max_batch_size, latest_first=False
             )
             if not batches:
diff --git a/libs/infinity_emb/infinity_emb/inference/primitives.py b/libs/infinity_emb/infinity_emb/inference/primitives.py
index 8b161ca3..67311b2f 100644
--- a/libs/infinity_emb/infinity_emb/inference/primitives.py
+++ b/libs/infinity_emb/infinity_emb/inference/primitives.py
@@ -6,18 +6,15 @@
 
 import numpy as np
 
-# from infinity_emb.inference.threading_asyncio import EventTS
-
 NpEmbeddingType = np.ndarray
 
 
 @dataclass(order=True)
 class EmbeddingResult:
     sentence: str = field(compare=False)
+    future: asyncio.Future = field(compare=False)
     uuid: str = field(default_factory=lambda: str(uuid4()), compare=False)
     embedding: Optional[NpEmbeddingType] = field(default=None, compare=False)
-    future: Optional[asyncio.Future] = field(default=None, compare=False)
-    # event: Optional[EventTS] = field(default=None, compare=False)
 
 
 @dataclass(order=True)
@@ -32,17 +29,3 @@ class OverloadStatus:
     queue_fraction: float
     queue_absolute: int
     results_absolute: int
-
-
-if __name__ == "__main__":
-    import bisect
-    from concurrent.futures import ThreadPoolExecutor
-
-    tp = ThreadPoolExecutor()
-    r1 = EmbeddingResult(5, "hello")
-    r2 = EmbeddingResult(6, "hello_")
-    r3 = EmbeddingResult(6, "hello_")
-    r1 < r2
-    l1 = []
-    bisect.insort(l1, r1)
-    bisect.insort(l1, r2)
diff --git a/libs/infinity_emb/infinity_emb/inference/select_model.py b/libs/infinity_emb/infinity_emb/inference/select_model.py
index 3a192a40..cf1f3711 100644
--- a/libs/infinity_emb/infinity_emb/inference/select_model.py
+++ b/libs/infinity_emb/infinity_emb/inference/select_model.py
@@ -1,4 +1,5 @@
 from time import perf_counter
+from typing import Tuple
 
 from infinity_emb.inference.primitives import EmbeddingResult, NpEmbeddingType
 from infinity_emb.log_handler import logger
@@ -38,8 +39,8 @@ def select_model_to_functional(
 
 def runtime_check_callable(
     model: BaseTransformer, sample=["warmup"], log=True
-) -> float:
-    inp = [EmbeddingResult(sentence=s) for s in sample]  # type: ignore
+) -> Tuple[float, float]:
+    inp = [EmbeddingResult(sentence=s, future=None) for s in sample]  # type: ignore
     start = perf_counter()
     sentences = [item.sentence for item in inp]
     feat = model.encode_pre(sentences)
diff --git a/libs/infinity_emb/infinity_emb/transformer/sentence_transformer.py b/libs/infinity_emb/infinity_emb/transformer/sentence_transformer.py
index 52a0f277..6d9ae883 100644
--- a/libs/infinity_emb/infinity_emb/transformer/sentence_transformer.py
+++ b/libs/infinity_emb/infinity_emb/transformer/sentence_transformer.py
@@ -30,7 +30,6 @@ class SentenceTransformerPatched(SentenceTransformer, BaseTransformer):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         device = self._target_device
-        self.eval()
         self.to(device)
         # make a copy of the tokenizer,
         # to be able to could the tokens in another thread
@@ -50,16 +49,15 @@ def __init__(self, *args, **kwargs):
         else:
             logger.info("No optimizations via Huggingface optimum.")
 
-        if self._target_device.type == "cuda" and not os.environ.get(
-            "INFINITY_DISABLE_FLASH", False
+        self.eval()
+        if self._target_device.type == "cuda" and os.environ.get(
+            "INFINITY_TORCH_ENABLE_HALF", False
         ):
             logger.info(
-                "Adding flash_attention."
-                "Disable by setting the env var `INFINITY_DISABLE_FLASH`"
+                "Switching to half() precision (fp16)."
+                "Enabled by setting the env var `INFINITY_TORCH_ENABLE_HALF`"
             )
-            self._use_flash_attn = True
-        else:
-            self._use_flash_attn = False
+            self.half()
 
     def encode_pre(self, sentences) -> Dict[str, Tensor]:
         features = self.tokenize(sentences)
@@ -74,13 +72,7 @@ def encode_core(self, features: Dict[str, Tensor]) -> Tensor:
         with torch.inference_mode():
             device = self._target_device
             features = util.batch_to_device(features, device)
-            if self._use_flash_attn:
-                with torch.backends.cuda.sdp_kernel(
-                    enable_flash=True, enable_math=True, enable_mem_efficient=True
-                ):
-                    out_features = self.forward(features)["sentence_embedding"]
-            else:
-                out_features = self.forward(features)["sentence_embedding"]
+            out_features = self.forward(features)["sentence_embedding"]
 
         return out_features
 
diff --git a/libs/infinity_emb/poetry.lock b/libs/infinity_emb/poetry.lock
index 01dead94..36e4609b 100644
--- a/libs/infinity_emb/poetry.lock
+++ b/libs/infinity_emb/poetry.lock
@@ -3324,24 +3324,24 @@ files = [
 
 [[package]]
 name = "types-protobuf"
-version = "4.24.0.3"
+version = "4.24.0.4"
 description = "Typing stubs for protobuf"
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "types-protobuf-4.24.0.3.tar.gz", hash = "sha256:048ca006a08ac0563ff04a86ddcdda6f8877b024f5ff89ed6180510b017c3a91"},
-    {file = "types_protobuf-4.24.0.3-py3-none-any.whl", hash = "sha256:6652be32c647a855cd9c01e6c556ecdc94988188c4de89942ad13e906537aaee"},
+    {file = "types-protobuf-4.24.0.4.tar.gz", hash = "sha256:57ab42cb171dfdba2c74bb5b50c250478538cc3c5ed95b8b368929ad0c9f90a5"},
+    {file = "types_protobuf-4.24.0.4-py3-none-any.whl", hash = "sha256:131ab7d0cbc9e444bc89c994141327dcce7bcaeded72b1acb72a94827eb9c7af"},
 ]
 
 [[package]]
 name = "types-pyopenssl"
-version = "23.2.0.2"
+version = "23.3.0.0"
 description = "Typing stubs for pyOpenSSL"
 optional = false
-python-versions = "*"
+python-versions = ">=3.7"
 files = [
-    {file = "types-pyOpenSSL-23.2.0.2.tar.gz", hash = "sha256:6a010dac9ecd42b582d7dd2cc3e9e40486b79b3b64bb2fffba1474ff96af906d"},
-    {file = "types_pyOpenSSL-23.2.0.2-py3-none-any.whl", hash = "sha256:19536aa3debfbe25a918cf0d898e9f5fbbe6f3594a429da7914bf331deb1b342"},
+    {file = "types-pyOpenSSL-23.3.0.0.tar.gz", hash = "sha256:5ffb077fe70b699c88d5caab999ae80e192fe28bf6cda7989b7e79b1e4e2dcd3"},
+    {file = "types_pyOpenSSL-23.3.0.0-py3-none-any.whl", hash = "sha256:00171433653265843b7469ddb9f3c86d698668064cc33ef10537822156130ebf"},
 ]
 
 [package.dependencies]
diff --git a/libs/infinity_emb/tests/end_to_end/test_sentence_transformers.py b/libs/infinity_emb/tests/end_to_end/test_sentence_transformers.py
index e22fb0b9..07c7735c 100644
--- a/libs/infinity_emb/tests/end_to_end/test_sentence_transformers.py
+++ b/libs/infinity_emb/tests/end_to_end/test_sentence_transformers.py
@@ -13,7 +13,7 @@
 from infinity_emb.transformer.utils import InferenceEngine
 
 PREFIX = "/v1_ct2"
-model = pytest.DEFAULT_BERT_MODEL
+model: str = pytest.DEFAULT_BERT_MODEL
 batch_size = 64 if torch.cuda.is_available() else 8
 
 app = create_server(
diff --git a/libs/infinity_emb/tests/script_live.py b/libs/infinity_emb/tests/script_live.py
index 39cbfebc..33475f80 100644
--- a/libs/infinity_emb/tests/script_live.py
+++ b/libs/infinity_emb/tests/script_live.py
@@ -1,5 +1,7 @@
+import concurrent.futures
 import json
 import timeit
+from functools import partial
 
 import numpy as np
 import requests
@@ -9,6 +11,7 @@
 
 
 def embedding_live_performance():
+    tp = concurrent.futures.ThreadPoolExecutor()
     sample = [f"Test count {i} {(list(range(i % (384))))} " for i in range(2048)]
     json_d = json.dumps({"input": sample, "model": "model"})
 
@@ -21,15 +24,17 @@ def embedding_live_performance():
     print(f"batch_size is {batch_size}, model={model_name}")
     model = SentenceTransformer(model_name_or_path=model_name)
 
-    def local(data: str):
-        enc = model.encode(data, batch_size=batch_size)
-        assert len(enc) == len(data)
-        return enc
+    def local(data: list[str], iters=1):
+        data_in = data * iters
+        enc = model.encode(data_in, batch_size=batch_size)
+        assert len(enc) == len(data_in)
+        return enc[: len(data)]
 
-    def remote(json_data: bytes):
-        req = session.post(f"{LIVE_URL}/embeddings", data=json_data)
-        assert req.status_code == 200
-        return req
+    def remote(json_data: bytes, iters=1):
+        fn = partial(session.post, data=json_data)
+        req = list(tp.map(fn, [f"{LIVE_URL}/embeddings"] * iters))
+        assert req[0].status_code == 200
+        return req[0]
 
     local_resp = local(sample)
     remote_resp = [d["embedding"] for d in remote(json_d).json()["data"]]
@@ -37,12 +42,14 @@ def remote(json_data: bytes):
     print("Both methods provide the identical output.")
 
     print("Measuring latency via SentenceTransformers")
-    latency_st = timeit.timeit("local(sample)", number=2, globals=locals())
+    latency_st = timeit.timeit("local(sample, iters=5)", number=2, globals=locals())
     print("SentenceTransformers latency: ", latency_st)
    model = None
 
     print("Measuring latency via requests")
-    latency_request = timeit.timeit("remote(json_d)", number=2, globals=locals())
+    latency_request = timeit.timeit(
+        "remote(json_d, iters=5)", number=2, globals=locals()
+    )
     print(f"Request latency: {latency_request}")
 
     assert latency_st * 1.1 > latency_request
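Note on the batching change: the rename of pop_optimal_batch to pop_optimal_batches means a single pop now returns a list of up to 32 fixed-size batches, and the popped window is sorted so that each mini-batch holds inputs of similar length, which keeps padding overhead low. The snippet below is a minimal, self-contained sketch of that slicing strategy, not the library's API; the function name slice_batches and the use of string length as the sort key are illustrative assumptions.

from typing import List, Sequence


def slice_batches(queue: Sequence[str], size: int) -> List[List[str]]:
    """Pop up to 32 size-`size` mini-batches from a pending queue (sketch only)."""
    if not queue:
        return []
    # same cap as the patched batch_handler: min(32, max(1, len(queue) // size))
    n_batches = min(32, max(1, len(queue) // size))
    window = list(queue[: size * n_batches])
    # sorting the popped window approximates the "optimal padding per batch" sort
    window.sort(key=len)
    return [window[size * i : size * (i + 1)] for i in range(n_batches)]


if __name__ == "__main__":
    sentences = [f"sentence {'x' * (i % 17)}" for i in range(100)]
    for batch in slice_batches(sentences, size=8):
        print(len(batch), min(len(s) for s in batch), max(len(s) for s in batch))

Two related changes ride along: EmbeddingResult now takes its asyncio.Future as a required field (hence future=None in the runtime_check_callable warmup), and the flash-attention toggle is replaced by an opt-in fp16 path, where setting the INFINITY_TORCH_ENABLE_HALF environment variable on a CUDA device calls self.half() after self.eval().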