diff --git a/comps/reranks/langchain-mosec/README.md b/comps/reranks/langchain-mosec/README.md
new file mode 100644
index 000000000..d67cf78b0
--- /dev/null
+++ b/comps/reranks/langchain-mosec/README.md
@@ -0,0 +1,33 @@
+# Build the reranking Mosec endpoint Docker image
+
+```
+docker build --build-arg http_proxy=$http_proxy --build-arg https_proxy=$https_proxy -t reranking-langchain-mosec:latest -f comps/reranks/langchain-mosec/mosec-docker/Dockerfile .
+```
+
+# Build the reranking microservice Docker image
+
+```
+docker build --build-arg http_proxy=$http_proxy --build-arg https_proxy=$https_proxy -t opea/reranking-langchain-mosec:latest -f comps/reranks/langchain-mosec/docker/Dockerfile .
+```
+
+# Launch the Mosec endpoint Docker container
+
+```
+docker run -d --name="reranking-langchain-mosec-endpoint" -p 6001:8000 reranking-langchain-mosec:latest
+```
+
+# Launch the reranking microservice Docker container
+
+```
+export MOSEC_RERANKING_ENDPOINT=http://127.0.0.1:6001
+docker run -d --name="reranking-langchain-mosec-server" -e http_proxy=$http_proxy -e https_proxy=$https_proxy -p 6000:8000 --ipc=host -e MOSEC_RERANKING_ENDPOINT=$MOSEC_RERANKING_ENDPOINT opea/reranking-langchain-mosec:latest
+```
+
+# Run a client test
+
+```
+curl http://localhost:6000/v1/reranking \
+  -X POST \
+  -d '{"initial_query":"What is Deep Learning?", "retrieved_docs": [{"text":"Deep Learning is not..."}, {"text":"Deep learning is..."}]}' \
+  -H 'Content-Type: application/json'
+```
diff --git a/comps/reranks/langchain-mosec/__init__.py b/comps/reranks/langchain-mosec/__init__.py
new file mode 100644
index 000000000..916f3a44b
--- /dev/null
+++ b/comps/reranks/langchain-mosec/__init__.py
@@ -0,0 +1,2 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
diff --git a/comps/reranks/langchain-mosec/docker/Dockerfile b/comps/reranks/langchain-mosec/docker/Dockerfile
new file mode 100644
index 000000000..9a678dc4a
--- /dev/null
+++ b/comps/reranks/langchain-mosec/docker/Dockerfile
@@ -0,0 +1,28 @@
+
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+FROM langchain/langchain:latest
+
+RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \
+    libgl1-mesa-glx \
+    libjemalloc-dev \
+    vim
+
+RUN useradd -m -s /bin/bash user && \
+    mkdir -p /home/user && \
+    chown -R user /home/user/
+
+USER user
+
+COPY comps /home/user/comps
+
+RUN pip install --no-cache-dir --upgrade pip && \
+    pip install --no-cache-dir -r /home/user/comps/reranks/langchain-mosec/requirements.txt
+
+ENV PYTHONPATH=$PYTHONPATH:/home/user
+
+WORKDIR /home/user/comps/reranks/langchain-mosec
+
+ENTRYPOINT ["python", "reranking_mosec_xeon.py"]
+
diff --git a/comps/reranks/langchain-mosec/docker/docker_compose_embedding.yaml b/comps/reranks/langchain-mosec/docker/docker_compose_embedding.yaml
new file mode 100644
index 000000000..581946185
--- /dev/null
+++ b/comps/reranks/langchain-mosec/docker/docker_compose_embedding.yaml
@@ -0,0 +1,22 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+version: "3.8"
+
+services:
+  reranking:
+    image: opea/reranking-langchain-mosec:latest
+    container_name: reranking-langchain-mosec-server
+    ports:
+      - "6000:8000"
+    ipc: host
+    environment:
+      http_proxy: ${http_proxy}
+      https_proxy: ${https_proxy}
+      MOSEC_RERANKING_ENDPOINT: ${MOSEC_RERANKING_ENDPOINT}
+      LANGCHAIN_API_KEY: ${LANGCHAIN_API_KEY}
+    restart: unless-stopped
+
+networks:
+  default:
+    driver: bridge
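Note: the README's curl test can also be driven from Python. A minimal client sketch, assuming the microservice container above is published on `localhost:6000` as the README configures (only `requests` and the documented payload shape are assumed):

```python
# Minimal client sketch for the reranking microservice; assumes the
# container above is mapped to localhost:6000 as in the README.
import requests

payload = {
    "initial_query": "What is Deep Learning?",
    "retrieved_docs": [{"text": "Deep Learning is not..."}, {"text": "Deep learning is..."}],
}
resp = requests.post("http://localhost:6000/v1/reranking", json=payload, timeout=30)
# The service replies with an LLMParamsDoc: the best document folded
# into a prompt under the "query" field (see reranking_mosec_xeon.py below).
print(resp.json())
```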
diff --git a/comps/reranks/langchain-mosec/mosec-docker/Dockerfile b/comps/reranks/langchain-mosec/mosec-docker/Dockerfile
new file mode 100644
index 000000000..0c634fb90
--- /dev/null
+++ b/comps/reranks/langchain-mosec/mosec-docker/Dockerfile
@@ -0,0 +1,23 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+FROM ubuntu:22.04
+ARG DEBIAN_FRONTEND=noninteractive
+
+ENV GLIBC_TUNABLES glibc.cpu.x86_shstk=permissive
+
+COPY comps /root/comps
+
+RUN apt update && apt install -y python3 python3-pip
+RUN pip3 install torch==2.2.2 torchvision --trusted-host download.pytorch.org --index-url https://download.pytorch.org/whl/cpu
+RUN pip3 install intel-extension-for-pytorch==2.2.0
+RUN pip3 install transformers sentence-transformers
+RUN pip3 install llmspec mosec
+
+RUN cd /root/ && export HF_ENDPOINT=https://hf-mirror.com && huggingface-cli download --resume-download BAAI/bge-reranker-large --local-dir /root/bge-reranker-large
+
+# server-ipex.py reads MODEL_NAME (not EMB_MODEL) to locate the model.
+ENV MODEL_NAME="/root/bge-reranker-large"
+
+WORKDIR /root/comps/reranks/langchain-mosec/mosec-docker
+
+CMD ["python3", "server-ipex.py"]
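The image built from this Dockerfile serves the raw Mosec `/inference` route that `reranking_mosec_xeon.py` calls. A quick smoke-test sketch, assuming the endpoint container is published on port 6001 as in the README (the request and response shapes follow `server-ipex.py` below):

```python
# Direct request to the raw Mosec endpoint; scores come back as strings,
# sorted by relevance in descending order.
import requests

resp = requests.post(
    "http://localhost:6001/inference",
    json={"query": "What is Deep Learning?", "texts": ["doc one", "doc two"]},
    timeout=30,
)
print(resp.json())  # e.g. [{"index": 1, "score": "0.9876543210"}, {"index": 0, "score": "..."}]
```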
diff --git a/comps/reranks/langchain-mosec/mosec-docker/server-ipex.py b/comps/reranks/langchain-mosec/mosec-docker/server-ipex.py
new file mode 100644
index 000000000..cd81fbf33
--- /dev/null
+++ b/comps/reranks/langchain-mosec/mosec-docker/server-ipex.py
@@ -0,0 +1,172 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+import os
+from os import environ
+from typing import Any, Dict, List, Optional, Union
+
+import intel_extension_for_pytorch as ipex
+import numpy as np
+import torch
+from mosec import Server, Worker
+from mosec.mixin import TypedMsgPackMixin
+from msgspec import Struct
+from sentence_transformers import CrossEncoder
+from torch.utils.data import DataLoader
+from tqdm.autonotebook import tqdm, trange
+
+DEFAULT_MODEL = "/root/bge-reranker-large"
+
+
+class MyCrossEncoder(CrossEncoder):
+    def __init__(
+        self,
+        model_name: str,
+        num_labels: int = None,
+        max_length: int = None,
+        device: str = None,
+        tokenizer_args: Dict = None,
+        automodel_args: Dict = None,
+        trust_remote_code: bool = False,
+        revision: Optional[str] = None,
+        local_files_only: bool = False,
+        default_activation_function=None,
+        classifier_dropout: float = None,
+    ) -> None:
+        super().__init__(
+            model_name,
+            num_labels,
+            max_length,
+            device,
+            tokenizer_args,
+            automodel_args,
+            trust_remote_code,
+            revision,
+            local_files_only,
+            default_activation_function,
+            classifier_dropout,
+        )
+        # Optimize with IPEX, then JIT-trace and freeze the model for CPU inference.
+        self.model = ipex.optimize(self.model, dtype=torch.float32)
+        vocab_size = self.model.config.vocab_size
+        batch_size = 16
+        seq_length = 512
+        d = torch.randint(vocab_size, size=[batch_size, seq_length])
+        # t = torch.randint(0, 1, size=[batch_size, seq_length])
+        m = torch.randint(1, 2, size=[batch_size, seq_length])
+        self.model = torch.jit.trace(self.model, [d, m], check_trace=False, strict=False)
+        self.model = torch.jit.freeze(self.model)
+
+    def predict(
+        self,
+        sentences: List[List[str]],
+        batch_size: int = 32,
+        show_progress_bar: bool = None,
+        num_workers: int = 0,
+        activation_fct=None,
+        apply_softmax=False,
+        convert_to_numpy: bool = True,
+        convert_to_tensor: bool = False,
+    ) -> Union[List[float], np.ndarray, torch.Tensor]:
+        input_was_string = False
+        if isinstance(sentences[0], str):  # Cast an individual sentence to a list with length 1
+            sentences = [sentences]
+            input_was_string = True
+
+        inp_dataloader = DataLoader(
+            sentences,
+            batch_size=batch_size,
+            collate_fn=self.smart_batching_collate_text_only,
+            num_workers=num_workers,
+            shuffle=False,
+        )
+
+        iterator = inp_dataloader
+        if show_progress_bar:
+            iterator = tqdm(inp_dataloader, desc="Batches")
+
+        if activation_fct is None:
+            activation_fct = self.default_activation_function
+
+        pred_scores = []
+        self.model.eval()
+        self.model.to(self._target_device)
+        with torch.no_grad():
+            for features in iterator:
+                model_predictions = self.model(**features)
+                logits = activation_fct(model_predictions["logits"])
+
+                if apply_softmax and len(logits[0]) > 1:
+                    logits = torch.nn.functional.softmax(logits, dim=1)
+                pred_scores.extend(logits)
+
+        if self.config.num_labels == 1:
+            pred_scores = [score[0] for score in pred_scores]
+
+        if convert_to_tensor:
+            pred_scores = torch.stack(pred_scores)
+        elif convert_to_numpy:
+            pred_scores = np.asarray([score.cpu().detach().numpy() for score in pred_scores])
+
+        if input_was_string:
+            pred_scores = pred_scores[0]
+
+        return pred_scores
+
+
+class Request(Struct, kw_only=True):
+    query: str
+    texts: List[str]
+
+
+class Response(Struct, kw_only=True):
+    scores: List[float]
+
+
+def float_handler(o):
+    if isinstance(o, float):
+        return format(o, ".10f")
+    raise TypeError("Not serializable")
+
+
+class MosecReranker(Worker):
+    def __init__(self):
+        super().__init__()
+        self.model_name = environ.get("MODEL_NAME", DEFAULT_MODEL)
+        self.model = MyCrossEncoder(self.model_name)
+
+    def serialize(self, data: Response) -> bytes:
+        # Sort indices by score, descending. Sorting indices directly avoids
+        # list.index(), which returns the first match and mis-maps duplicate scores.
+        index_sorted = sorted(range(len(data.scores)), key=lambda i: data.scores[i], reverse=True)
+        res = [{"index": i, "score": "{:.10f}".format(data.scores[i])} for i in index_sorted]
+        return json.dumps(res, default=float_handler).encode("utf-8")
+
+    def forward(self, data: List[Request]) -> List[Response]:
+        # Flatten all (query, doc) pairs across the batch, score them in one
+        # pass, then split the scores back out per request.
+        sentence_pairs = []
+        inputs_lens = []
+        for d in data:
+            inputs_lens.append(len(d["texts"]))
+            tmp = [[d["query"], doc] for doc in d["texts"]]
+            sentence_pairs.extend(tmp)
+
+        scores = self.model.predict(sentence_pairs)
+        scores = scores.tolist()
+
+        resp = []
+        cur_idx = 0
+        for lens in inputs_lens:
+            resp.append(Response(scores=scores[cur_idx : cur_idx + lens]))
+            cur_idx += lens
+
+        return resp
+
+
+if __name__ == "__main__":
+    MAX_BATCH_SIZE = int(os.environ.get("MAX_BATCH_SIZE", 128))
+    MAX_WAIT_TIME = int(os.environ.get("MAX_WAIT_TIME", 10))
+    server = Server()
+    server.append_worker(MosecReranker, max_batch_size=MAX_BATCH_SIZE, max_wait_time=MAX_WAIT_TIME)
+    server.run()
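`MyCrossEncoder` can also be exercised outside the server, which is handy when validating the IPEX trace. A standalone sketch (the module name is hypothetical: the file ships as `server-ipex.py`, so in practice it would need renaming or copying before import):

```python
# Standalone scoring sketch; the model path is the one baked into the Dockerfile.
from server_ipex import MyCrossEncoder  # hypothetical module name, see note above

model = MyCrossEncoder("/root/bge-reranker-large")
scores = model.predict(
    [
        ["What is Deep Learning?", "Deep learning is..."],
        ["What is Deep Learning?", "Deep Learning is not..."],
    ]
)
print(scores)  # one relevance score per (query, document) pair
```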
diff --git a/comps/reranks/langchain-mosec/requirements.txt b/comps/reranks/langchain-mosec/requirements.txt
new file mode 100644
index 000000000..65c79959e
--- /dev/null
+++ b/comps/reranks/langchain-mosec/requirements.txt
@@ -0,0 +1,9 @@
+docarray[full]
+fastapi
+langchain
+langchain_community
+openai
+opentelemetry-api
+opentelemetry-exporter-otlp
+opentelemetry-sdk
+shortuuid
diff --git a/comps/reranks/langchain-mosec/reranking_mosec_xeon.py b/comps/reranks/langchain-mosec/reranking_mosec_xeon.py
new file mode 100644
index 000000000..33a3f1207
--- /dev/null
+++ b/comps/reranks/langchain-mosec/reranking_mosec_xeon.py
@@ -0,0 +1,76 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+# Copyright 2024 MOSEC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+import re
+import time
+
+import requests
+from langchain_core.prompts import ChatPromptTemplate
+from langsmith import traceable
+
+from comps import (
+    LLMParamsDoc,
+    SearchedDoc,
+    ServiceType,
+    opea_microservices,
+    register_microservice,
+    register_statistics,
+    statistics_dict,
+)
+
+
+@register_microservice(
+    name="opea_service@reranking_mosec_xeon",
+    service_type=ServiceType.RERANK,
+    endpoint="/v1/reranking",
+    host="0.0.0.0",
+    port=8000,
+    input_datatype=SearchedDoc,
+    output_datatype=LLMParamsDoc,
+)
+@traceable(run_type="llm")
+@register_statistics(names=["opea_service@reranking_mosec_xeon"])
+def reranking(input: SearchedDoc) -> LLMParamsDoc:
+    print("reranking input: ", input)
+    start = time.time()
+    docs = [doc.text for doc in input.retrieved_docs]
+    url = mosec_reranking_endpoint + "/inference"
+    data = {"query": input.initial_query, "texts": docs}
+    headers = {"Content-Type": "application/json"}
+    response = requests.post(url, data=json.dumps(data), headers=headers)
+    response_data = response.json()
+    # Scores arrive as strings from the Mosec endpoint; compare numerically.
+    best_response = max(response_data, key=lambda response: float(response["score"]))
+    doc = input.retrieved_docs[best_response["index"]]
+    if doc.text and len(re.findall("[\u4E00-\u9FFF]", doc.text)) / len(doc.text) >= 0.3:
+        # Chinese context: at least 30% of the characters are CJK.
+        template = "仅基于以下背景回答问题:\n{context}\n问题: {question}"
+    else:
+        template = """Answer the question based only on the following context:
+{context}
+Question: {question}
+"""
+    prompt = ChatPromptTemplate.from_template(template)
+    final_prompt = prompt.format(context=doc.text, question=input.initial_query)
+    statistics_dict["opea_service@reranking_mosec_xeon"].append_latency(time.time() - start, None)
+    return LLMParamsDoc(query=final_prompt.strip())
+
+
+if __name__ == "__main__":
+    mosec_reranking_endpoint = os.getenv("MOSEC_RERANKING_ENDPOINT", "http://localhost:8080")
+    opea_microservices["opea_service@reranking_mosec_xeon"].start()
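For reference, the prompt the microservice emits for an English context looks roughly like this; a sketch that mirrors the template in the code above, not additional service logic:

```python
from langchain_core.prompts import ChatPromptTemplate

template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
# format() renders the single message as a string, which the service
# strips and returns as LLMParamsDoc.query.
print(prompt.format(context="Deep learning is...", question="What is Deep Learning?"))
```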
+}
+
+function start_service() {
+    mosec_endpoint=5006
+    model="BAAI/bge-reranker-large"
+    unset http_proxy
+    docker run -d --name="test-comps-reranking-langchain-mosec-endpoint" -p $mosec_endpoint:8000 reranking-langchain-mosec:comps
+    export MOSEC_RERANKING_ENDPOINT="http://${ip_address}:${mosec_endpoint}"
+    mosec_service_port=5007
+    docker run -d --name="test-comps-reranking-langchain-mosec-server" -e http_proxy=$http_proxy -e https_proxy=$https_proxy -p ${mosec_service_port}:8000 --ipc=host -e MOSEC_RERANKING_ENDPOINT=$MOSEC_RERANKING_ENDPOINT opea/reranking-langchain-mosec:comps
+    sleep 3m
+}
+
+function validate_microservice() {
+    mosec_service_port=5007
+    http_proxy="" curl http://${ip_address}:${mosec_service_port}/v1/reranking \
+        -X POST \
+        -d '{"initial_query":"What is Deep Learning?", "retrieved_docs": [{"text":"Deep Learning is not..."}, {"text":"Deep learning is..."}]}' \
+        -H 'Content-Type: application/json'
+    docker logs test-comps-reranking-langchain-mosec-server
+    docker logs test-comps-reranking-langchain-mosec-endpoint
+}
+
+function stop_docker() {
+    cid=$(docker ps -aq --filter "name=test-comps-reranking-langchain-mosec-*")
+    if [[ ! -z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi
+}
+
+function main() {
+
+    stop_docker
+
+    build_mosec_docker_images
+
+    build_docker_images
+
+    start_service
+
+    validate_microservice
+
+    stop_docker
+    echo y | docker system prune
+
+}
+
+main
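As written, the test script builds both images with `--no-cache`, sleeps three minutes while the endpoint loads the model, exercises `/v1/reranking`, and finishes with `docker system prune`, which removes all stopped containers and dangling images on the host; it is best run on a dedicated CI machine.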