Support get/delete file in Dataprep Microservice (#254)
* support get_file for dataprep

Signed-off-by: letonghan <[email protected]>

* add e2e test for get_file api

Signed-off-by: letonghan <[email protected]>

* support get_file in ray & llama_index

Signed-off-by: letonghan <[email protected]>

* update README

Signed-off-by: letonghan <[email protected]>

* update e2e tests for ray & llama_index

Signed-off-by: letonghan <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* modify port for langchain e2e test

Signed-off-by: letonghan <[email protected]>

* modify ports of ray test

Signed-off-by: letonghan <[email protected]>

* modify container name and ports

Signed-off-by: letonghan <[email protected]>

* update

Signed-off-by: letonghan <[email protected]>

* fix dependency error of ray & unify container names

Signed-off-by: letonghan <[email protected]>

* modify port of 8001

Signed-off-by: letonghan <[email protected]>

* modify conflict port

Signed-off-by: letonghan <[email protected]>

* Add container log print

Signed-off-by: Chendi Xue <[email protected]>

* Add LOG_PATH define

Signed-off-by: Chendi Xue <[email protected]>

* support get link & delete file/link

Signed-off-by: letonghan <[email protected]>

* update readme and test scripts

Signed-off-by: letonghan <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix test issue

Signed-off-by: letonghan <[email protected]>

* update

Signed-off-by: letonghan <[email protected]>

* update

Signed-off-by: letonghan <[email protected]>

---------

Signed-off-by: letonghan <[email protected]>
Signed-off-by: Chendi Xue <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: chen, suyue <[email protected]>
Co-authored-by: Chendi Xue <[email protected]>
4 people authored Jul 4, 2024
1 parent d7cdab9 commit 5d08426
Showing 12 changed files with 504 additions and 85 deletions.
61 changes: 59 additions & 2 deletions comps/dataprep/redis/README.md
@@ -106,13 +106,13 @@ docker build -t opea/dataprep-on-ray-redis:latest --build-arg https_proxy=$https
- option 1: Start single-process version (for 1-10 files processing)

```bash
docker run -d --name="dataprep-redis-server" -p 6007:6007 --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e REDIS_URL=$REDIS_URL -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT opea/dataprep-redis:latest
docker run -d --name="dataprep-redis-server" -p 6007:6007 -p 6008:6008 -p 6009:6009 --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e REDIS_URL=$REDIS_URL -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT opea/dataprep-redis:latest
```

- option 2: Start multi-process version (for >10 files processing)

```bash
docker run -d --name="dataprep-redis-server" -p 6007:6007 --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e REDIS_URL=$REDIS_URL -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT -e TIMEOUT_SECONDS=600 opea/dataprep-on-ray-redis:latest
docker run -d --name="dataprep-redis-server" -p 6007:6007 -p 6008:6008 -p 6009:6009 --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e REDIS_URL=$REDIS_URL -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT -e TIMEOUT_SECONDS=600 opea/dataprep-on-ray-redis:latest
```

## 2.5 Run with Docker Compose (Option B - deprecated, will move to genAIExample in future)
@@ -133,6 +133,8 @@ docker container logs -f dataprep-redis-server

# 🚀4. Consume Microservice

## 4.1 Consume Upload API

Once the document preparation microservice for Redis is started, use the command below to invoke the microservice, which converts the document to embeddings and saves them to the database.

Make sure the file path after `files=@` is correct.
@@ -210,3 +212,58 @@ try:
except requests.exceptions.RequestException as e:
    print("An error occurred:", e)
```

## 4.2 Consume get_file API

To get the structure of the uploaded files, use the following command:

```bash
curl -X POST \
-H "Content-Type: application/json" \
http://localhost:6008/v1/dataprep/get_file
```

The response is a JSON list like this:

```json
[
  {
    "name": "uploaded_file_1.txt",
    "id": "uploaded_file_1.txt",
    "type": "File",
    "parent": ""
  },
  {
    "name": "uploaded_file_2.txt",
    "id": "uploaded_file_2.txt",
    "type": "File",
    "parent": ""
  }
]
```
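
For readers who prefer Python to curl, the same call can be sketched with the standard library. This is a minimal sketch, not part of the commit: the helper names `build_get_file_request`, `parse_file_ids`, and `list_uploaded_ids` are hypothetical, and the URL assumes the service is reachable on `localhost:6008` as in the curl example.

```python
import json
import urllib.request
from typing import List

# Assumption: same host and port as the curl example (localhost:6008).
GET_FILE_URL = "http://localhost:6008/v1/dataprep/get_file"


def build_get_file_request(url: str = GET_FILE_URL) -> urllib.request.Request:
    """Build (but do not send) the POST request for the get_file endpoint."""
    return urllib.request.Request(
        url,
        data=b"",  # empty body, as in the curl example
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_file_ids(response_body: str) -> List[str]:
    """Extract the `id` field of every entry in a get_file JSON response."""
    return [entry["id"] for entry in json.loads(response_body)]


def list_uploaded_ids(url: str = GET_FILE_URL) -> List[str]:
    """Send the request and return the ids; needs a running dataprep service."""
    with urllib.request.urlopen(build_get_file_request(url), timeout=10) as resp:
        return parse_file_ids(resp.read().decode("utf-8"))
```

Calling `list_uploaded_ids()` against a running service should return the `id` values shown in the JSON response above; the parsing step alone can be exercised offline with `parse_file_ids`.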

## 4.3 Consume delete_file API

To delete an uploaded file or link, use the following commands.

The `file_path` here should be the `id` returned by the `/v1/dataprep/get_file` API.

```bash
# delete link
curl -X POST \
-H "Content-Type: application/json" \
-d '{"file_path": "https://www.ces.tech/.txt"}' \
http://localhost:6009/v1/dataprep/delete_file

# delete file
curl -X POST \
-H "Content-Type: application/json" \
-d '{"file_path": "uploaded_file_1.txt"}' \
http://localhost:6009/v1/dataprep/delete_file

# delete all files and links
curl -X POST \
-H "Content-Type: application/json" \
-d '{"file_path": "all"}' \
http://localhost:6009/v1/dataprep/delete_file
```
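
The delete calls can also be composed in Python. The following is a rough sketch under stated assumptions: `build_delete_request` and `build_delete_requests` are hypothetical helper names, and the URL assumes the delete endpoint is served on `localhost:6009`; substitute the address where the service actually runs. It only builds the requests, so nothing is deleted until a caller passes them to `urllib.request.urlopen`.

```python
import json
import urllib.request
from typing import Dict, List

# Assumption: the delete endpoint listens on localhost:6009.
DELETE_FILE_URL = "http://localhost:6009/v1/dataprep/delete_file"


def build_delete_request(file_path: str, url: str = DELETE_FILE_URL) -> urllib.request.Request:
    """Build the POST request that asks the service to delete one file or link.

    `file_path` is an `id` from get_file, or the literal "all".
    """
    body = json.dumps({"file_path": file_path}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_delete_requests(entries: List[Dict[str, str]], url: str = DELETE_FILE_URL) -> List[urllib.request.Request]:
    """Build one delete request per entry of a get_file response."""
    return [build_delete_request(entry["id"], url) for entry in entries]
```

Passing `"all"` as `file_path` corresponds to the last curl command; passing the entries returned by `get_file` produces one request per uploaded file or link.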
@@ -14,6 +14,8 @@ services:
    container_name: dataprep-redis-server
    ports:
      - "6007:6007"
      - "6008:6008"
      - "6009:6009"
    ipc: host
    environment:
      no_proxy: ${no_proxy}
109 changes: 87 additions & 22 deletions comps/dataprep/redis/langchain/prepare_doc_redis.py
@@ -3,12 +3,13 @@

import json
import os
import shutil
import uuid
from pathlib import Path
from typing import List, Optional, Union

from config import EMBED_MODEL, INDEX_NAME, INDEX_SCHEMA, REDIS_URL
from fastapi import File, Form, HTTPException, UploadFile
from fastapi import Body, File, Form, HTTPException, UploadFile
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings
from langchain_community.vectorstores import Redis
@@ -17,20 +18,19 @@
from pyspark import SparkConf, SparkContext

from comps import DocPath, opea_microservices, register_microservice
from comps.dataprep.utils import document_loader, get_tables_result, parse_html
from comps.dataprep.utils import (
    create_upload_folder,
    document_loader,
    encode_filename,
    get_file_structure,
    get_tables_result,
    parse_html,
    remove_folder_with_ignore,
    save_content_to_local_disk,
)

tei_embedding_endpoint = os.getenv("TEI_ENDPOINT")


async def save_file_to_local_disk(save_path: str, file):
    save_path = Path(save_path)
    with save_path.open("wb") as fout:
        try:
            content = await file.read()
            fout.write(content)
        except Exception as e:
            print(f"Write file failed. Exception: {e}")
            raise HTTPException(status_code=500, detail=f"Write file {save_path} failed. Exception: {e}")
upload_folder = "./uploaded_files/"


def ingest_data_to_redis(doc_path: DocPath):
@@ -84,9 +84,16 @@ def ingest_data_to_redis(doc_path: DocPath):
    return True


def ingest_link_to_redis(link_list: List[str]):
async def ingest_link_to_redis(link_list: List[str]):
    data_collection = parse_html(link_list)

    for content, link in data_collection:
        print(f"[ ingest link ] link: {link} content: {content}")
        encoded_link = encode_filename(link)
        save_path = upload_folder + encoded_link + ".txt"
        print(f"[ ingest link ] save_path: {save_path}")
        await save_content_to_local_disk(save_path, content)

    texts = []
    metadatas = []
    for data, meta in data_collection:
@@ -125,19 +132,15 @@ async def ingest_documents(
):
    print(f"files:{files}")
    print(f"link_list:{link_list}")
    if files and link_list:
        raise HTTPException(status_code=400, detail="Provide either a file or a string list, not both.")

    if files:
        if not isinstance(files, list):
            files = [files]
        upload_folder = "./uploaded_files/"
        if not os.path.exists(upload_folder):
            Path(upload_folder).mkdir(parents=True, exist_ok=True)
        uploaded_files = []
        for file in files:
            save_path = upload_folder + file.filename
            await save_file_to_local_disk(save_path, file)
            encode_file = encode_filename(file.filename)
            save_path = upload_folder + encode_file
            await save_content_to_local_disk(save_path, file)
            ingest_data_to_redis(
                DocPath(
                    path=save_path,
@@ -178,7 +181,7 @@ def process_files_wrapper(files):
            link_list = json.loads(link_list)  # Parse JSON string to list
            if not isinstance(link_list, list):
                raise HTTPException(status_code=400, detail="link_list should be a list.")
            ingest_link_to_redis(link_list)
            await ingest_link_to_redis(link_list)
            print(f"Successfully saved link list {link_list}")
            return {"status": 200, "message": "Data preparation succeeded"}
        except json.JSONDecodeError:
@@ -187,5 +190,67 @@ def process_files_wrapper(files):
    raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")


@register_microservice(
    name="opea_service@prepare_doc_redis_file", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6008
)
@traceable(run_type="tool")
async def rag_get_file_structure():
    print("[ get_file_structure] ")

    if not Path(upload_folder).exists():
        print("No file uploaded, return empty list.")
        return []

    file_content = get_file_structure(upload_folder)
    return file_content


@register_microservice(
    name="opea_service@prepare_doc_redis_del", endpoint="/v1/dataprep/delete_file", host="0.0.0.0", port=6009
)
@traceable(run_type="tool")
async def delete_single_file(file_path: str = Body(..., embed=True)):
    """Delete file according to `file_path`.

    `file_path`:
        - specific file path (e.g. /path/to/file.txt)
        - folder path (e.g. /path/to/folder)
        - "all": delete all files uploaded
    """
    # delete all uploaded files
    if file_path == "all":
        print("[dataprep - del] delete all files")
        remove_folder_with_ignore(upload_folder)
        print("[dataprep - del] successfully delete all files.")
        create_upload_folder(upload_folder)
        return {"status": True}

    delete_path = Path(upload_folder + "/" + encode_filename(file_path))
    print(f"[dataprep - del] delete_path: {delete_path}")

    # partially delete files/folders
    if delete_path.exists():
        # delete file
        if delete_path.is_file():
            try:
                delete_path.unlink()
            except Exception as e:
                print(f"[dataprep - del] fail to delete file {delete_path}: {e}")
                return {"status": False}
        # delete folder
        else:
            try:
                shutil.rmtree(delete_path)
            except Exception as e:
                print(f"[dataprep - del] fail to delete folder {delete_path}: {e}")
                return {"status": False}
        return {"status": True}
    else:
        raise HTTPException(status_code=404, detail="File/folder not found. Please check del_path.")

if __name__ == "__main__":
    create_upload_folder(upload_folder)
    opea_microservices["opea_service@prepare_doc_redis"].start()
    opea_microservices["opea_service@prepare_doc_redis_file"].start()
    opea_microservices["opea_service@prepare_doc_redis_del"].start()
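
The branching in `delete_single_file` can be tried outside the service with a small standalone sketch. It is illustrative only and rests on two assumptions: the local `encode_filename` below is a stand-in that percent-encodes the whole name with `urllib.parse.quote` (the real helper lives in `comps.dataprep.utils` and its body is not shown in this diff), and `shutil.rmtree` stands in for `remove_folder_with_ignore`. The function name `delete_uploaded` is hypothetical, and it raises `FileNotFoundError` where the endpoint raises an HTTP 404.

```python
import shutil
import tempfile
import urllib.parse
from pathlib import Path


def encode_filename(filename: str) -> str:
    # ASSUMPTION: stand-in for comps.dataprep.utils.encode_filename.
    return urllib.parse.quote(filename, safe="")


def delete_uploaded(upload_dir: Path, file_path: str) -> bool:
    """Mirror the endpoint's branches: "all", single file, folder, or missing."""
    if file_path == "all":
        # Stand-in for remove_folder_with_ignore + create_upload_folder.
        shutil.rmtree(upload_dir)
        upload_dir.mkdir(parents=True, exist_ok=True)
        return True

    target = upload_dir / encode_filename(file_path)
    if not target.exists():
        raise FileNotFoundError(f"File/folder not found: {target}")
    if target.is_file():
        target.unlink()
    else:
        shutil.rmtree(target)
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp) / "uploaded_files"
        root.mkdir()
        # A link is stored under its encoded name plus ".txt", as in ingest_link_to_redis.
        (root / (encode_filename("https://www.ces.tech/") + ".txt")).write_text("page text")
        print(delete_uploaded(root, "https://www.ces.tech/.txt"))
```

Under the assumed encoding, the stored name for a link and the encoded form of the `id` passed to the delete call coincide, which is consistent with the README example that deletes `https://www.ces.tech/.txt`.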
3 changes: 2 additions & 1 deletion comps/dataprep/redis/langchain_ray/docker/Dockerfile
@@ -12,7 +12,8 @@ RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missin
    build-essential \
    libgl1-mesa-glx \
    libjemalloc-dev \
    vim
    vim \
    libcairo2

RUN useradd -m -s /bin/bash user && \
    mkdir -p /home/user && \