enhance: Enable bulkwriter to support import v2 (#2295) #2296

Merged
3 commits merged on Oct 12, 2024
67 changes: 43 additions & 24 deletions examples/example_bulkwriter.py
@@ -20,6 +20,9 @@
import tensorflow as tf

import logging

from typing import List

logging.basicConfig(level=logging.INFO)

from pymilvus import (
@@ -273,7 +276,7 @@ def _append_row(writer: LocalBulkWriter, begin: int, end: int):
print("Data is correct")


def all_types_writer(schema: CollectionSchema, file_type: BulkFileType)->list:
def all_types_writer(schema: CollectionSchema, file_type: BulkFileType)-> List[List[str]]:
print(f"\n===================== all field types ({file_type.name}) ====================")
with RemoteBulkWriter(
schema=schema,
@@ -347,31 +350,47 @@ def all_types_writer(schema: CollectionSchema, file_type: BulkFileType)->list:
return remote_writer.batch_files


def call_bulkinsert(schema: CollectionSchema, batch_files: list):
print(f"\n===================== call bulkinsert ====================")
def call_bulkinsert(schema: CollectionSchema, batch_files: List[List[str]]):
if utility.has_collection(ALL_TYPES_COLLECTION_NAME):
utility.drop_collection(ALL_TYPES_COLLECTION_NAME)

collection = Collection(name=ALL_TYPES_COLLECTION_NAME, schema=schema)
print(f"Collection '{collection.name}' created")

task_ids = []
for files in batch_files:
task_id = utility.do_bulk_insert(collection_name=ALL_TYPES_COLLECTION_NAME, files=files)
task_ids.append(task_id)
print(f"Create a bulkinert task, task id: {task_id}")
url = f"http://{HOST}:{PORT}"

while len(task_ids) > 0:
print("Wait 1 second to check bulkinsert tasks state...")
print(f"\n===================== import files to milvus ====================")
resp = bulk_import(
url=url,
collection_name=ALL_TYPES_COLLECTION_NAME,
files=batch_files,
)
print(resp.json())
job_id = resp.json()['data']['jobId']
print(f"Create a bulkinsert job, job id: {job_id}")

while True:
print("Wait 1 second to check bulkinsert job state...")
time.sleep(1)
for id in task_ids:
state = utility.get_bulk_insert_state(task_id=id)
if state.state == BulkInsertState.ImportFailed or state.state == BulkInsertState.ImportFailedAndCleaned:
print(f"The task {state.task_id} failed, reason: {state.failed_reason}")
task_ids.remove(id)
elif state.state == BulkInsertState.ImportCompleted:
print(f"The task {state.task_id} completed")
task_ids.remove(id)

print(f"\n===================== get import job progress ====================")
resp = get_import_progress(
url=url,
job_id=job_id,
)

state = resp.json()['data']['state']
progress = resp.json()['data']['progress']
if state == "Importing":
print(f"The job {job_id} is importing... {progress}%")
continue
if state == "Failed":
reason = resp.json()['data']['reason']
print(f"The job {job_id} failed, reason: {reason}")
break
if state == "Completed" and progress == 100:
print(f"The job {job_id} completed")
break

print(f"Collection row number: {collection.num_entities}")

@@ -427,31 +446,31 @@ def cloud_bulkinsert():
object_url_secret_key = "_your_object_storage_service_secret_key_"
resp = bulk_import(
url=url,
api_key=api_key,
collection_name=collection_name,
partition_name=partition_name,
object_url=object_url,
cluster_id=cluster_id,
api_key=api_key,
access_key=object_url_access_key,
secret_key=object_url_secret_key,
cluster_id=cluster_id,
collection_name=collection_name,
partition_name=partition_name,
)
print(resp.json())

print(f"\n===================== get import job progress ====================")
job_id = resp.json()['data']['jobId']
resp = get_import_progress(
url=url,
api_key=api_key,
job_id=job_id,
cluster_id=cluster_id,
api_key=api_key,
)
print(resp.json())

print(f"\n===================== list import jobs ====================")
resp = list_import_jobs(
url=url,
api_key=api_key,
cluster_id=cluster_id,
api_key=api_key,
page_size=10,
current_page=1,
)
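The updated example replaces the old `utility.do_bulk_insert` task-polling loop with the import-v2 RESTful helpers. A minimal sketch of that flow is below, assuming a local Milvus deployment and that `batch_files` comes from `RemoteBulkWriter.batch_files`; the import path of the helpers and the collection name are assumptions, not taken verbatim from this diff.

```python
import time

# Import path assumed; the example file imports these helpers near its top.
from pymilvus.bulk_writer import bulk_import, get_import_progress

HOST, PORT = "localhost", "19530"            # assumed local deployment
COLLECTION_NAME = "all_types_collection"     # hypothetical collection name
# e.g. the value returned by all_types_writer() / RemoteBulkWriter.batch_files
batch_files = [["folder/1.parquet"], ["folder/2.parquet"]]

url = f"http://{HOST}:{PORT}"

# Create one import job covering all generated batches.
resp = bulk_import(url=url, collection_name=COLLECTION_NAME, files=batch_files)
job_id = resp.json()["data"]["jobId"]

# Poll the job until it completes or fails, mirroring the loop in the example.
while True:
    time.sleep(1)
    data = get_import_progress(url=url, job_id=job_id).json()["data"]
    state, progress = data["state"], data["progress"]
    if state == "Importing":
        print(f"Job {job_id} is importing... {progress}%")
    elif state == "Failed":
        print(f"Job {job_id} failed, reason: {data['reason']}")
        break
    elif state == "Completed" and progress == 100:
        print(f"Job {job_id} completed")
        break
```

For a managed (cloud) instance, the same `bulk_import` call takes `api_key`, `object_url`, `access_key`, `secret_key`, and `cluster_id` instead of `files`, as shown in the `cloud_bulkinsert` example above.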
48 changes: 33 additions & 15 deletions pymilvus/bulk_writer/bulk_import.py
@@ -12,6 +12,7 @@

import json
import logging
from typing import List, Optional

import requests

@@ -77,36 +78,43 @@ def _get_request(
## bulkinsert RESTful api wrapper
def bulk_import(
url: str,
api_key: str,
object_url: str,
access_key: str,
secret_key: str,
cluster_id: str,
collection_name: str,
files: Optional[List[List[str]]] = None,
object_url: str = "",
cluster_id: str = "",
api_key: str = "",
access_key: str = "",
secret_key: str = "",
**kwargs,
) -> requests.Response:
"""call bulkinsert restful interface to import files

Args:
url (str): url of the server
object_url (str): data files url
access_key (str): access key to access the object storage
secret_key (str): secret key to access the object storage
cluster_id (str): id of a milvus instance(for cloud)
collection_name (str): name of the target collection
partition_name (str): name of the target partition
files (list of list of str): The files that contain the data to import.
A sub-list contains a single JSON or Parquet file, or a set of Numpy files.
object_url (str): The URL of the object to import.
This URL should be accessible to the S3-compatible
object storage service, such as AWS S3, GCS, Azure blob storage.
cluster_id (str): id of a milvus instance(for cloud)
api_key (str): API key to authenticate your requests.
access_key (str): access key to access the object storage
secret_key (str): secret key to access the object storage

Returns:
json: response of the restful interface
response of the restful interface
"""
request_url = url + "/v2/vectordb/jobs/import/create"

partition_name = kwargs.pop("partition_name", "")
params = {
"clusterId": cluster_id,
"collectionName": collection_name,
"partitionName": partition_name,
"files": files,
"objectUrl": object_url,
"clusterId": cluster_id,
"accessKey": access_key,
"secretKey": secret_key,
}
@@ -117,17 +125,18 @@ def bulk_import(


def get_import_progress(
url: str, api_key: str, job_id: str, cluster_id: str, **kwargs
url: str, job_id: str, cluster_id: str = "", api_key: str = "", **kwargs
) -> requests.Response:
"""get job progress

Args:
url (str): url of the server
job_id (str): a job id
cluster_id (str): id of a milvus instance(for cloud)
api_key (str): API key to authenticate your requests.

Returns:
json: response of the restful interface
response of the restful interface
"""
request_url = url + "/v2/vectordb/jobs/import/describe"

@@ -142,22 +151,31 @@ def get_import_progress(


def list_import_jobs(
url: str, api_key: str, cluster_id: str, page_size: int, current_page: int, **kwargs
url: str,
collection_name: str = "",
cluster_id: str = "",
api_key: str = "",
page_size: int = 10,
current_page: int = 1,
**kwargs,
) -> requests.Response:
"""list jobs in a cluster

Args:
url (str): url of the server
collection_name (str): name of the target collection
cluster_id (str): id of a milvus instance(for cloud)
api_key (str): API key to authenticate your requests.
page_size (int): pagination size
current_page (int): pagination

Returns:
json: response of the restful interface
response of the restful interface
"""
request_url = url + "/v2/vectordb/jobs/import/list"

params = {
"collectionName": collection_name,
"clusterId": cluster_id,
"pageSize": page_size,
"currentPage": current_page,
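With `cluster_id` and `api_key` now defaulting to empty strings, open-source deployments can call `list_import_jobs` (and `get_import_progress`) with just a server URL. A hedged usage sketch follows; the import path, URL, and collection name are placeholders rather than values from this diff.

```python
# Import path assumed to match the helpers defined in pymilvus/bulk_writer/bulk_import.py.
from pymilvus.bulk_writer import list_import_jobs

resp = list_import_jobs(
    url="http://localhost:19530",             # assumed local server
    collection_name="all_types_collection",   # optional filter, hypothetical name
    page_size=10,
    current_page=1,
)
print(resp.json())
```

Cloud users would additionally pass `cluster_id` and `api_key`, as in the `cloud_bulkinsert` example earlier in this PR.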