Skip to content

Commit

Permalink
Support database API (milvus-io#1401)
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 authored May 5, 2023
1 parent 6cada4d commit 6bb23e3
Show file tree
Hide file tree
Showing 14 changed files with 595 additions and 353 deletions.
136 changes: 136 additions & 0 deletions examples/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import random

from pymilvus import (
connections,
FieldSchema, CollectionSchema, DataType,
Collection,
db,
)
from pymilvus.orm import utility

_HOST = '127.0.0.1'
_PORT = '19530'
_ROOT = "root"
_ROOT_PASSWORD = "Milvus"
_METRIC_TYPE = 'IP'
_INDEX_TYPE = 'IVF_FLAT'
_NLIST = 1024
_NPROBE = 16
_TOPK = 3

# Vector parameters
_DIM = 128
_INDEX_FILE_SIZE = 32 # max file size of stored index


def connect_to_milvus(db_name="default"):
print(f"connect to milvus\n")
connections.connect(host=_HOST,
port=_PORT,
user=_ROOT,
password=_ROOT_PASSWORD,
db_name=db_name,
)


def create_collection(collection_name, db_name):
default_fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
FieldSchema(name="double", dtype=DataType.DOUBLE),
FieldSchema(name="fv", dtype=DataType.FLOAT_VECTOR, dim=128)
]
default_schema = CollectionSchema(fields=default_fields)
print(f"Create collection:{collection_name} within db:{db_name}")
return Collection(name=collection_name, schema=default_schema)


def insert(collection, num, dim):
data = [
[i for i in range(num)],
[float(i) for i in range(num)],
[[random.random() for _ in range(dim)] for _ in range(num)],
]
collection.insert(data)
return data[2]


def drop_index(collection):
collection.drop_index()
print("\nDrop index sucessfully")


def search(collection, vector_field, id_field, search_vectors):
search_param = {
"data": search_vectors,
"anns_field": vector_field,
"param": {"metric_type": _METRIC_TYPE, "params": {"nprobe": _NPROBE}},
"limit": _TOPK,
"expr": "id >= 0"}
results = collection.search(**search_param)
for i, result in enumerate(results):
print("\nSearch result for {}th vector: ".format(i))
for j, res in enumerate(result):
print("Top {}: {}".format(j, res))


def collection_read_write(collection, db_name):
col_name = "{}:{}".format(db_name, collection.name)
vectors = insert(collection, 10000, _DIM)
collection.flush()
print("\nInsert {} rows data into collection:{}".format(collection.num_entities, col_name))

# create index
index_param = {
"index_type": _INDEX_TYPE,
"params": {"nlist": _NLIST},
"metric_type": _METRIC_TYPE}
collection.create_index("fv", index_param)
print("\nCreated index:{} for collection:{}".format(collection.index().params, col_name))

# load data to memory
print("\nLoad collection:{}".format(col_name))
collection.load()
# search
print("\nSearch collection:{}".format(col_name))
search(collection, "fv", "id", vectors[:3])

# release memory
collection.release()
# drop collection index
collection.drop_index()
print("\nDrop collection:{}".format(col_name))


if __name__ == '__main__':
# connect to milvus and using database db1
# there will not check db1 already exists during connect
connect_to_milvus(db_name="default")

# create collection within default
col1_db1 = create_collection("col1_db1", "default")

# create db1
if "db1" not in db.list_database():
print("\ncreate database: db1")
db.create_database(db_name="db1")

# use database db1
db.using_database(db_name="db1")
# create collection within default
col2_db1 = create_collection("col1_db1", "db1")

# verify read and write
collection_read_write(col2_db1, "db1")

# list collections within db1
print("\nlist collections of database db1:")
print(utility.list_collections())

print("\ndrop collection: col1_db2 from db1")
col2_db1.drop()
print("\ndrop database: db1")
db.drop_database(db_name="db1")

# list database
print("\nlist databases:")
print(db.list_database())
4 changes: 2 additions & 2 deletions pymilvus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
list_resource_groups, transfer_node, transfer_replica
)

from .orm import utility
from .orm import utility, db

from .orm.search import SearchResult, Hits, Hit
from .orm.schema import FieldSchema, CollectionSchema
Expand All @@ -84,7 +84,7 @@
'SearchResult', 'Hits', 'Hit', 'Replica', 'Group', 'Shard',
'FieldSchema', 'CollectionSchema',
'SearchFuture', 'MutationFuture',
'utility', 'DefaultConfig', 'ExceptionsMessage', 'MilvusUnavailableException', 'BulkInsertState',
'utility', 'db', 'DefaultConfig', 'ExceptionsMessage', 'MilvusUnavailableException', 'BulkInsertState',
'Role',
'create_resource_group', 'drop_resource_group', 'describe_resource_group',
'list_resource_groups', 'transfer_node', 'transfer_replica',
Expand Down
1 change: 1 addition & 0 deletions pymilvus/client/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ def is_legal_operate_privilege_type(operate_privilege_type: Any) -> bool:
class ParamChecker(metaclass=Singleton):
def __init__(self) -> None:
self.check_dict = {
"db_name": is_legal_table_name,
"collection_name": is_legal_table_name,
"field_name": is_legal_field_name,
"dimension": is_legal_dimension,
Expand Down
26 changes: 26 additions & 0 deletions pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ def _wait_for_channel_ready(self, timeout=10):
def close(self):
self._channel.close()

def reset_db_name(self, db_name):
self._setup_db_interceptor(db_name)
self._setup_grpc_channel()

def _setup_authorization_interceptor(self, user, password):
if user and password:
authorization = base64.b64encode(f"{user}:{password}".encode('utf-8'))
Expand Down Expand Up @@ -772,6 +776,28 @@ def get_loading_progress(self, collection_name, partition_names=None, timeout=No
raise MilvusException(response.status.error_code, response.status.reason)
return response.progress

@retry_on_rpc_failure()
def create_database(self, db_name, timeout=None):
request = Prepare.create_database_req(db_name)
status = self._stub.CreateDatabase(request, timeout=timeout)
if status.error_code != 0:
raise MilvusException(status.error_code, status.reason)

@retry_on_rpc_failure()
def drop_database(self, db_name, timeout=None):
request = Prepare.drop_database_req(db_name)
status = self._stub.DropDatabase(request, timeout=timeout)
if status.error_code != 0:
raise MilvusException(status.error_code, status.reason)

@retry_on_rpc_failure()
def list_database(self, timeout=None):
request = Prepare.list_database_req()
response = self._stub.ListDatabases(request, timeout=timeout)
if response.status.error_code != 0:
raise MilvusException(response.status.error_code, response.status.reason)
return list(response.db_names)

@retry_on_rpc_failure()
def get_load_state(self, collection_name, partition_names=None, timeout=None):
request = Prepare.get_load_state(collection_name, partition_names)
Expand Down
19 changes: 19 additions & 0 deletions pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,3 +875,22 @@ def flush_all_request(cls):
@classmethod
def get_flush_all_state_request(cls, flush_all_ts):
return milvus_types.GetFlushAllStateRequest(flush_all_ts=flush_all_ts)


@classmethod
def create_database_req(cls, db_name):
check_pass_param(db_name=db_name)
req = milvus_types.CreateDatabaseRequest(db_name=db_name)
return req

@classmethod
def drop_database_req(cls, db_name):
check_pass_param(db_name=db_name)
req = milvus_types.DropDatabaseRequest(db_name=db_name)
return req

@classmethod
def list_database_req(cls):
req = milvus_types.ListDatabasesRequest()
return req

8 changes: 0 additions & 8 deletions pymilvus/grpc_gen/__init__.py

This file was deleted.

36 changes: 18 additions & 18 deletions pymilvus/grpc_gen/common_pb2.py

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pymilvus/grpc_gen/common_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ CreateAlias: MsgType
CreateCollection: MsgType
CreateCredential: MsgType
CreateCredentialFailure: ErrorCode
CreateDatabase: MsgType
CreateIndex: MsgType
CreatePartition: MsgType
CreateResourceGroup: MsgType
Expand All @@ -49,6 +50,7 @@ DescribeSegments: MsgType
DiskQuotaExhausted: ErrorCode
DropAlias: MsgType
DropCollection: MsgType
DropDatabase: MsgType
DropIndex: MsgType
DropPartition: MsgType
DropResourceGroup: MsgType
Expand Down Expand Up @@ -111,6 +113,7 @@ Insert: MsgType
InsufficientMemoryToLoad: ErrorCode
ListCredUsernames: MsgType
ListCredUsersFailure: ErrorCode
ListDatabases: MsgType
ListPolicy: MsgType
ListPolicyFailure: ErrorCode
ListResourceGroups: MsgType
Expand Down
2 changes: 1 addition & 1 deletion pymilvus/grpc_gen/milvus-proto
576 changes: 288 additions & 288 deletions pymilvus/grpc_gen/milvus_pb2.py

Large diffs are not rendered by default.

Loading

0 comments on commit 6bb23e3

Please sign in to comment.