Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clip embeddings parallel #251

Merged
merged 6 commits into from
Jun 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions libs/infinity_emb/infinity_emb/_optional_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def _raise_error(self) -> None:
CHECK_SENTENCE_TRANSFORMERS = OptionalImports("sentence_transformers", "torch")
CHECK_TRANSFORMERS = OptionalImports("transformers", "torch")
CHECK_TORCH = OptionalImports("torch.nn", "torch")
CHECK_REQUESTS = OptionalImports("requests", "server")
CHECK_PIL = OptionalImports("PIL", "vision")
CHECK_PYDANTIC = OptionalImports("pydantic", "server")
CHECK_TYPER = OptionalImports("typer", "server")
CHECK_UVICORN = OptionalImports("uvicorn", "server")
23 changes: 23 additions & 0 deletions libs/infinity_emb/infinity_emb/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,29 @@ async def classify(

return scores, usage

async def image_embed(
self, *, images: list[str]
) -> tuple[list[EmbeddingReturnType], int]:
"""embed multiple images

Args:
images (list[str]): list of image urls, to be embedded

Raises:
ValueError: raised if engine is not started yet
ModelNotDeployedError: If loaded model does not expose `image_embed`
capabilities

Returns:
list[EmbeddingReturnType]: embeddings
2D list-array of shape( len(sentences),embed_dim )
int: token usage
"""

self._assert_running()
embeddings, usage = await self._batch_handler.image_embed(images=images)
return embeddings, usage

def _assert_running(self):
if not self.running:
raise ValueError(
Expand Down
30 changes: 30 additions & 0 deletions libs/infinity_emb/infinity_emb/inference/batch_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
)
from infinity_emb.transformer.abstract import BaseTransformer
from infinity_emb.transformer.utils import get_lengths_with_tokenize
from infinity_emb.transformer.vision.utils import resolve_images


class ShutdownReadOnly:
Expand Down Expand Up @@ -200,6 +201,35 @@ async def classify(

return classifications, usage

async def image_embed(
self,
*,
images: list[str],
) -> tuple[list[EmbeddingReturnType], int]:
"""Schedule a images and sentences to be embedded. Awaits until embedded.

Args:
images (list[str]): list of pre-signed urls

Raises:
ModelNotDeployedError: If loaded model does not expose `embed`
capabilities

Returns:
list[EmbeddingReturnType]: list of embedding as 1darray
int: token usage
"""

if "image_embed" not in self.model_worker.capabilities:
raise ModelNotDeployedError(
"the loaded moded cannot fullyfill `image_embed`."
f"options are {self.model_worker.capabilities}."
)

items = await asyncio.to_thread(resolve_images, images)
embeddings, usage = await self._schedule(items)
return embeddings, usage

async def _schedule(
self, list_queueitem: Sequence[AbstractSingle]
) -> tuple[list[Any], int]:
Expand Down
3 changes: 3 additions & 0 deletions libs/infinity_emb/infinity_emb/inference/select_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from infinity_emb.log_handler import logger
from infinity_emb.transformer.abstract import BaseCrossEncoder, BaseEmbedder
from infinity_emb.transformer.utils import (
ClipLikeEngine,
EmbedderEngine,
InferenceEngine,
PredictEngine,
Expand Down Expand Up @@ -44,6 +45,8 @@ def get_engine_type_from_config(
return RerankEngine.from_inference_engine(engine_args.engine)
else:
return PredictEngine.from_inference_engine(engine_args.engine)
if config.get("vision_config") and "clip" in config.get("model_type", "").lower():
return ClipLikeEngine.from_inference_engine(engine_args.engine)
else:
return EmbedderEngine.from_inference_engine(engine_args.engine)

Expand Down
57 changes: 53 additions & 4 deletions libs/infinity_emb/infinity_emb/primitives.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,23 @@

# cached_porperty
from functools import lru_cache
from typing import Generic, Literal, Optional, Type, TypedDict, TypeVar, Union
from typing import (
TYPE_CHECKING,
Generic,
Literal,
Optional,
Type,
TypedDict,
TypeVar,
Union,
)

import numpy as np
import numpy.typing as npt

if TYPE_CHECKING:
from PIL.Image import Image as ImageClass

# if python>=3.10 use kw_only

dataclass_args = {"kw_only": True} if sys.version_info >= (3, 10) else {}
Expand Down Expand Up @@ -121,7 +133,7 @@ def str_repr(self) -> str:
pass

@abstractmethod
def to_input(self) -> Union[str, tuple[str, str]]:
def to_input(self) -> Union[str, tuple[str, str], "ImageClass"]:
pass


Expand Down Expand Up @@ -153,6 +165,18 @@ class PredictSingle(EmbeddingSingle):
pass


@dataclass(**dataclass_args)
class ImageSingle(AbstractSingle):
image: "ImageClass"

def str_repr(self) -> str:
"""creates a dummy representation of the image to count tokens relative to shape"""
return f"an image is worth a repeated {'token' * self.image.height}"

def to_input(self) -> "ImageClass":
return self.image


AbstractInnerType = TypeVar("AbstractInnerType")


Expand Down Expand Up @@ -242,12 +266,37 @@ async def get_result(self) -> ClassifyReturnType:
return self.class_encoding


QueueItemInner = Union[EmbeddingInner, ReRankInner, PredictInner]
@dataclass(order=True, **dataclass_args)
class ImageInner(AbstractInner):
content: ImageSingle
embedding: Optional[EmbeddingReturnType] = None

async def complete(self, result: EmbeddingReturnType) -> None:
"""marks the future for completion.
only call from the same thread as created future."""
self.embedding = result

if self.embedding is None:
raise ValueError("embedding is None")
try:
self.future.set_result(self.embedding)
except asyncio.exceptions.InvalidStateError:
pass

async def get_result(self) -> EmbeddingReturnType:
"""waits for future to complete and returns result"""
await self.future
assert self.embedding is not None
return self.embedding


QueueItemInner = Union[EmbeddingInner, ReRankInner, PredictInner, ImageInner]

_type_to_inner_item_map = {
EmbeddingSingle: EmbeddingInner,
ReRankSingle: ReRankInner,
PredictSingle: PredictInner,
ImageSingle: ImageInner,
}


Expand Down Expand Up @@ -275,4 +324,4 @@ class ModelNotDeployedError(Exception):
pass


ModelCapabilites = Literal["embed", "rerank", "classify"]
ModelCapabilites = Literal["embed", "rerank", "classify", "image_embed"]
Empty file.
51 changes: 49 additions & 2 deletions libs/infinity_emb/infinity_emb/transformer/abstract.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import random
from abc import ABC, abstractmethod
from time import perf_counter
from typing import Any, Set
from typing import TYPE_CHECKING, Any, Set, Union

from infinity_emb.primitives import (
EmbeddingDtype,
EmbeddingInner,
EmbeddingReturnType,
EmbeddingSingle,
ImageInner,
ImageSingle,
ModelCapabilites,
PredictInner,
PredictSingle,
Expand All @@ -18,6 +21,9 @@
INPUT_FEATURE = Any
OUT_FEATURES = Any

if TYPE_CHECKING:
from PIL.Image import Image as ImageClass


class BaseTransformer(ABC): # Inherit from ABC(Abstract base class)
capabilities: Set[ModelCapabilites] = set()
Expand Down Expand Up @@ -54,7 +60,7 @@ def embedding_dtype(self) -> EmbeddingDtype:
return self.engine_args.embedding_dtype # type: ignore

@abstractmethod # Decorator to define an abstract method
def encode_pre(self, sentences: list[str]) -> INPUT_FEATURE:
def encode_pre(self, sentences: list[Union[str, Any]]) -> INPUT_FEATURE:
"""takes care of the tokenization and feature preparation"""

@abstractmethod
Expand All @@ -72,6 +78,47 @@ def warmup(self, *, batch_size: int = 64, n_tokens=1) -> tuple[float, float, str
return run_warmup(self, inp)


class BaseClipVisionModel(BaseEmbedder): # Inherit from ABC(Abstract base class)
capabilities = {"embed", "image_embed"}

@property
def embedding_dtype(self) -> EmbeddingDtype:
"""returns the dtype of the embeddings"""
return self.engine_args.embedding_dtype # type: ignore

@abstractmethod # Decorator to define an abstract method
def encode_pre(
self, sentences_or_images: list[Union[str, "ImageClass"]]
) -> INPUT_FEATURE:
"""
takes a list of sentences, or a list of images.
Images could be url or numpy arrays/pil
"""

@abstractmethod
def encode_post(
self, embedding: OUT_FEATURES, skip_quanitzation=True
) -> EmbeddingReturnType:
"""runs post encoding such as normalization"""

def warmup(self, *, batch_size: int = 64, n_tokens=1) -> tuple[float, float, str]:
sample_text = ["warm " * n_tokens] * max(1, batch_size // 2)
sample_image = [] * max(1, batch_size // 2) # type: ignore
inp = [
# TODO: warmup for images
ImageInner(content=ImageSingle(image=img), future=None) # type: ignore
for img in sample_image
] + [
EmbeddingInner(
content=EmbeddingSingle(sentence=s), future=None # type: ignore
)
for s in sample_text
]
random.shuffle(inp)

return run_warmup(self, inp)


class BaseClassifer(BaseTransformer): # Inherit from ABC(Abstract base class)
capabilities = {"classify"}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, *, engine_args: EngineArgs):
super().__init__(
engine_args.model_name_or_path,
revision=engine_args.revision,
device=engine_args.device.resolve(),
device=engine_args.device.resolve(), # type: ignore
trust_remote_code=engine_args.trust_remote_code,
)
self.model.to(self._target_device) # type: ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def encode_core(self, features: Mapping[str, "Tensor"]) -> "Tensor":
"""

with torch.no_grad():
features = util.batch_to_device(features, self.device)
features = util.batch_to_device(features, self.device) # type: ignore
out_features: "Tensor" = self.forward(features)["sentence_embedding"]

return out_features.detach().cpu()
Expand Down
12 changes: 12 additions & 0 deletions libs/infinity_emb/infinity_emb/transformer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from infinity_emb.transformer.embedder.sentence_transformer import (
SentenceTransformerPatched,
)
from infinity_emb.transformer.vision.torch_vision import ClipLikeModel

__all__ = [
"length_tokenizer",
Expand Down Expand Up @@ -58,6 +59,17 @@ def from_inference_engine(engine: InferenceEngine):
raise NotImplementedError(f"RerankEngine for {engine} not implemented")


class ClipLikeEngine(Enum):
torch = ClipLikeModel

@staticmethod
def from_inference_engine(engine: InferenceEngine):
if engine == InferenceEngine.torch:
return ClipLikeEngine.torch
else:
raise NotImplementedError(f"ClipLikeEngine for {engine} not implemented")


class PredictEngine(Enum):
torch = SentenceClassifier

Expand Down
Empty file.
Loading