diff --git a/pymilvus/client/prepare.py b/pymilvus/client/prepare.py index b24fe76dc..f64459935 100644 --- a/pymilvus/client/prepare.py +++ b/pymilvus/client/prepare.py @@ -254,7 +254,7 @@ def partition_name(cls, collection_name, partition_name): def batch_insert_or_upsert_param(cls, collection_name, entities, partition_name, fields_info=None, isInsert=True, **kwargs): # insert_request.hash_keys and upsert_request.hash_keys won't be filled in client. It will be filled in proxy. - tag = partition_name or "_default" # should here? + tag = partition_name if isinstance(partition_name, str) else "_default" # should here? request = milvus_types.InsertRequest(collection_name=collection_name, partition_name=tag) if not isInsert: diff --git a/pymilvus/orm/collection.py b/pymilvus/orm/collection.py index 51ef84a9e..68f2b4b96 100644 --- a/pymilvus/orm/collection.py +++ b/pymilvus/orm/collection.py @@ -128,7 +128,6 @@ def __init__(self, name: str, schema: CollectionSchema=None, using: str="default def __repr__(self): _dict = { 'name': self.name, - 'partitions': self.partitions, 'description': self.description, 'schema': self._schema, } diff --git a/pymilvus/orm/partition.py b/pymilvus/orm/partition.py index 586346b95..efeecee8b 100644 --- a/pymilvus/orm/partition.py +++ b/pymilvus/orm/partition.py @@ -10,8 +10,10 @@ # or implied. See the License for the specific language governing permissions and limitations under # the License. 
-import copy import json +from typing import Union, List + +import pandas from ..exceptions import ( CollectionNotExistException, @@ -19,38 +21,27 @@ ExceptionsMessage, ) -from .prepare import Prepare from .search import SearchResult from .mutation import MutationResult -from .future import SearchFuture, MutationFuture from ..client.types import Replica class Partition: def __init__(self, collection, name, description="", **kwargs): - # TODO: Need a place to store the description from .collection import Collection if not isinstance(collection, Collection): raise CollectionNotExistException(message=ExceptionsMessage.CollectionType) self._collection = collection self._name = name self._description = description - self._schema = collection._schema - self._consistency_level = collection._consistency_level - self._kwargs = kwargs - conn = self._get_connection() if kwargs.get("construct_only", False): return - copy_kwargs = copy.deepcopy(kwargs) - if copy_kwargs.get("partition_name"): - copy_kwargs.pop("partition_name") - has = conn.has_partition(self._collection.name, self._name, **copy_kwargs) - if not has: - conn.create_partition(self._collection.name, self._name, **copy_kwargs) - self._schema_dict = self._schema.to_dict() - self._schema_dict["consistency_level"] = self._consistency_level + if not self._collection.has_partition(self.name, **kwargs): + conn = self._get_connection() + conn.create_partition(self._collection.name, self.name, **kwargs) + def __repr__(self): return json.dumps({ @@ -64,19 +55,12 @@ def _get_connection(self): @property def description(self) -> str: - """ Return the description text. - - :return: Partition description - :rtype: str + """str: description of the partition. - :example: - >>> from pymilvus import connections, Collection, Partition, FieldSchema, CollectionSchema, DataType + Examples: + >>> from pymilvus import connections, Collection, Partition >>> connections.connect() - >>> schema = CollectionSchema([ - ... 
FieldSchema("film_id", DataType.INT64, is_primary=True), - ... FieldSchema("films", dtype=DataType.FLOAT_VECTOR, dim=2) - ... ]) - >>> collection = Collection("test_partition_description", schema) + >>> collection = Collection("test_partition_description") >>> partition = Partition(collection, "comedy", "comedy films") >>> partition.description 'comedy films' @@ -85,18 +69,12 @@ def description(self) -> str: @property def name(self) -> str: - """ - Return the partition name. + """str: name of the partition - :return str: Partition name, return when operation is successful - :example: - >>> from pymilvus import connections, Collection, Partition, FieldSchema, CollectionSchema, DataType + Examples: + >>> from pymilvus import connections, Collection, Partition >>> connections.connect() - >>> schema = CollectionSchema([ - ... FieldSchema("film_id", DataType.INT64, is_primary=True), - ... FieldSchema("films", dtype=DataType.FLOAT_VECTOR, dim=2) - ... ]) - >>> collection = Collection("test_partition_name", schema) + >>> collection = Collection("test_partition_name") >>> partition = Partition(collection, "comedy", "comedy films") >>> partition.name 'comedy' @@ -105,22 +83,12 @@ def name(self) -> str: @property def is_empty(self) -> bool: - """ - Returns whether the partition is empty + """bool: whether the partition is empty - :return bool: Whether the partition is empty - * True: The partition is empty. - * False: The partition is not empty. - - :example: - - >>> from pymilvus import connections, Collection, Partition, FieldSchema, CollectionSchema, DataType + Examples: + >>> from pymilvus import connections, Collection, Partition >>> connections.connect() - >>> schema = CollectionSchema([ - ... FieldSchema("film_id", DataType.INT64, is_primary=True), - ... FieldSchema("films", dtype=DataType.FLOAT_VECTOR, dim=2) - ... 
]) - >>> collection = Collection("test_partition_is_empty", schema) + >>> collection = Collection("test_partition_is_empty") >>> partition = Partition(collection, "comedy", "comedy films") >>> partition.is_empty True @@ -129,12 +97,9 @@ def is_empty(self) -> bool: @property def num_entities(self, **kwargs) -> int: - """ - Return the number of entities. + """int: number of entities in the partition - :return int: Number of entities in this partition. - - :example: + Examples: >>> from pymilvus import connections, Collection, Partition, FieldSchema, CollectionSchema, DataType >>> connections.connect() >>> schema = CollectionSchema([ @@ -152,58 +117,56 @@ def num_entities(self, **kwargs) -> int: 10 """ conn = self._get_connection() - stats = conn.get_partition_stats(collection_name=self._collection.name, partition_name=self._name, **kwargs) + stats = conn.get_partition_stats(collection_name=self._collection.name, partition_name=self.name, **kwargs) result = {stat.key: stat.value for stat in stats} result["row_count"] = int(result["row_count"]) return result["row_count"] def flush(self, timeout=None, **kwargs): - """ Flush """ + """ Seal all segment in the collection of this partition. Inserts after flushing will be written into + new segments. Only sealed segments can be indexed. + + Args: + timeout (float): an optional duration of time in seconds to allow for the RPCs. + If timeout is not set, the client keeps waiting until the server responds or an error occurs. + """ conn = self._get_connection() conn.flush([self._collection.name], timeout=timeout, **kwargs) def drop(self, timeout=None, **kwargs): - """ - Drop the partition, as well as its corresponding index files. + """ Drop the partition, the same as Collection.drop_partition - :param timeout: An optional duration of time in seconds to allow for the RPC. 
When timeout - is set to None, client waits until server response or error occur - :type timeout: float + Args: + timeout (``float``, optional): an optional duration of time in seconds to allow for the RPCs. + If timeout is not set, the client keeps waiting until the server responds or an error occurs. - :raises PartitionNotExistException: - When partitoin does not exist + Raises: + PartitionNotExistException: If the partition doesn't exist - :example: - >>> from pymilvus import connections, Collection, Partition, FieldSchema, CollectionSchema, DataType + Examples: + >>> from pymilvus import connections, Collection, Partition >>> connections.connect() - >>> schema = CollectionSchema([ - ... FieldSchema("film_id", DataType.INT64, is_primary=True), - ... FieldSchema("films", dtype=DataType.FLOAT_VECTOR, dim=2) - ... ]) - >>> collection = Collection("test_partition_drop", schema) + >>> collection = Collection("test_partition_drop") >>> partition = Partition(collection, "comedy", "comedy films") >>> partition.drop() """ conn = self._get_connection() - if conn.has_partition(self._collection.name, self._name, timeout=timeout, **kwargs) is False: + if conn.has_partition(self._collection.name, self.name, timeout=timeout, **kwargs) is False: raise PartitionNotExistException(message=ExceptionsMessage.PartitionNotExist) - return conn.drop_partition(self._collection.name, self._name, timeout=timeout, **kwargs) - - def load(self, replica_number=1, timeout=None, **kwargs): - """ - Load the partition from disk to memory. + return conn.drop_partition(self._collection.name, self.name, timeout=timeout, **kwargs) - :param replica_number: The replication numbers to load. - :type replica_number: int + def load(self, replica_number: int=1, timeout=None, **kwargs): + """ Load the partition data into memory. - :param timeout: An optional duration of time in seconds to allow for the RPC. 
When timeout - is set to None, client waits until server response or error occur - :type timeout: float + Args: + replica_number (``int``, optional): The replica number to load, defaults to 1. + timeout (``float``, optional): an optional duration of time in seconds to allow for the RPCs. + If timeout is not set, the client keeps waiting until the server responds or an error occurs. - :raises ParamError: - If params are invalid + Raises: + MilvusException: If anything goes wrong - :example: + Examples: >>> from pymilvus import connections, Collection, Partition, FieldSchema, CollectionSchema, DataType >>> connections.connect() >>> schema = CollectionSchema([ @@ -214,26 +177,22 @@ def load(self, replica_number=1, timeout=None, **kwargs): >>> partition = Partition(collection, "comedy", "comedy films") >>> partition.load() """ - # TODO(yukun): If field_names is not None and not equal schema.field_names, - # raise Exception Not Supported, - # if index_names is not None, raise Exception Not Supported conn = self._get_connection() - if conn.has_partition(self._collection.name, self._name, **kwargs): - return conn.load_partitions(self._collection.name, [self._name], replica_number, timeout=timeout, **kwargs) + if conn.has_partition(self._collection.name, self.name, **kwargs): + return conn.load_partitions(self._collection.name, [self.name], replica_number, timeout=timeout, **kwargs) raise PartitionNotExistException(message=ExceptionsMessage.PartitionNotExist) def release(self, timeout=None, **kwargs): - """ - Release the partition from memory. + """ Release the partition data from memory. - :param timeout: An optional duration of time in seconds to allow for the RPC. When timeout - is set to None, client waits until server response or error occur - :type timeout: float + Args: + timeout (``float``, optional): an optional duration of time in seconds to allow for the RPCs. + If timeout is not set, the client keeps waiting until the server responds or an error occurs. 
- :raises PartitionNotExistException: - When partitoin does not exist + Raises: + MilvusException: If anything goes wrong - :example: + Examples: >>> from pymilvus import connections, Collection, Partition, FieldSchema, CollectionSchema, DataType >>> connections.connect() >>> schema = CollectionSchema([ @@ -247,35 +206,26 @@ def release(self, timeout=None, **kwargs): """ conn = self._get_connection() if conn.has_partition(self._collection.name, self._name, **kwargs): - return conn.release_partitions(self._collection.name, [self._name], timeout=timeout, **kwargs) + return conn.release_partitions(self._collection.name, [self.name], timeout=timeout, **kwargs) raise PartitionNotExistException(message=ExceptionsMessage.PartitionNotExist) - def insert(self, data, timeout=None, **kwargs): - """ - Insert data into partition. - - :param data: The specified data to insert, the dimension of data needs to align with column - number - :type data: list-like(list, tuple) object or pandas.DataFrame - - :param timeout: An optional duration of time in seconds to allow for the RPC. When timeout - is set to None, client waits until server response or error occur - :type timeout: float - - :param kwargs: - * *timeout* (``float``) -- - An optional duration of time in seconds to allow for the RPC. When timeout - is set to None, client waits until server response or error occur. + def insert(self, data: Union[List, pandas.DataFrame], timeout=None, **kwargs) -> MutationResult: + """ Insert data into the partition, the same as Collection.insert(data, [partition]) - :return: A MutationResult object contains a property named `insert_count` represents how many - entities have been inserted into milvus and a property named `primary_keys` is a list of primary - keys of the inserted entities. 
- :rtype: MutationResult - - :raises PartitionNotExistException: - When partitoin does not exist + Args: + data (``list/tuple/pandas.DataFrame``): The specified data to insert + partition_name (``str``): The partition name which the data will be inserted to, + if partition name is not passed, then the data will be inserted to "_default" partition + timeout (``float``, optional): A duration of time in seconds to allow for the RPC. Defaults to None. + If timeout is set to None, the client keeps waiting until the server responds or an error occurs. + Returns: + MutationResult: contains 2 properties, `insert_count` and `primary_keys` + `insert_count`: how many entities have been inserted into Milvus, + `primary_keys`: list of primary keys of the inserted entities + Raises: + MilvusException: If anything goes wrong. - :example: + Examples: >>> from pymilvus import connections, Collection, Partition, FieldSchema, CollectionSchema, DataType >>> connections.connect() >>> schema = CollectionSchema([ @@ -288,40 +238,32 @@ def insert(self, data, timeout=None, **kwargs): ... [i for i in range(10)], ... [[float(i) for i in range(2)] for _ in range(10)], ... ] - >>> partition.insert(data) - >>> partition.num_entities + >>> res = partition.insert(data) + >>> res.insert_count 10 """ conn = self._get_connection() - if conn.has_partition(self._collection.name, self._name, **kwargs) is False: + if conn.has_partition(self._collection.name, self.name, **kwargs) is False: raise PartitionNotExistException(message=ExceptionsMessage.PartitionNotExist) - # TODO: check insert data schema here? 
- entities = Prepare.prepare_insert_or_upsert_data(data, self._collection.schema) - res = conn.batch_insert(self._collection.name, entities=entities, partition_name=self._name, - timeout=timeout, orm=True, schema=self._schema_dict, **kwargs) - if kwargs.get("_async", False): - return MutationFuture(res) - return MutationResult(res) + + return self._collection.insert(data, self.name, timeout=timeout, **kwargs) def delete(self, expr, timeout=None, **kwargs): """ Delete entities with an expression condition. - :param expr: The expression to specify entities to be deleted - :type expr: str - - :param timeout: An optional duration of time in seconds to allow for the RPC. When timeout - is set to None, client waits until server response or error occur - :type timeout: float + Args: + expr (``str``): The expression to specify entities to be deleted. + partition_names (``List[str]``): Name of partitions to delete entities. + timeout (``float``, optional): A duration of time in seconds to allow for the RPC. Defaults to None. + If timeout is set to None, the client keeps waiting until the server responds or an error occurs. - :return: A MutationResult object contains a property named `delete_count` represents how many - entities will be deleted. - :rtype: MutationResult + Returns: + MutationResult: contains a `delete_count` property representing how many entities might be deleted. - :raises RpcError: If gRPC encounter an error - :raises ParamError: If parameters are invalid - :raises BaseException: If the return result from server is not ok + Raises: + MilvusException: If anything goes wrong. - :example: + Examples: >>> from pymilvus import connections, Collection, Partition, FieldSchema, CollectionSchema, DataType >>> connections.connect() >>> schema = CollectionSchema([ @@ -329,51 +271,35 @@ def delete(self, expr, timeout=None, **kwargs): ... FieldSchema("films", DataType.FLOAT_VECTOR, dim=2) ... 
]) >>> test_collection = Collection("test_partition_delete", schema) - >>> test_partition = test_collection.create_partition("comedy", "comedy films") + >>> test_partition = Partition(test_collection, "comedy", "comedy films") >>> data = [ ... [i for i in range(10)], ... [[float(i) for i in range(2)] for _ in range(10)], ... ] >>> test_partition.insert(data) (insert count: 10, delete count: 0, upsert count: 0, timestamp: 431044482906718212) - >>> test_partition.num_entities - 10 >>> test_partition.delete("film_id in [0, 1]") (insert count: 0, delete count: 2, upsert count: 0, timestamp: 431044582560759811) """ + return self._collection.delete(expr, self.name, timeout=timeout, **kwargs) - conn = self._get_connection() - res = conn.delete(self._collection.name, expr, self.name, timeout=timeout, **kwargs) - if kwargs.get("_async", False): - return MutationFuture(res) - return MutationResult(res) - - def upsert(self, data, timeout=None, **kwargs): - """ - Upsert data into partition. - - :param data: The specified data to upsert, the dimension of data needs to align with column - number - :type data: list-like(list, tuple) object or pandas.DataFrame - - :param timeout: An optional duration of time in seconds to allow for the RPC. When timeout - is set to None, client waits until server response or error occur - :type timeout: float - - :param kwargs: - * *timeout* (``float``) -- - An optional duration of time in seconds to allow for the RPC. When timeout - is set to None, client waits until server response or error occur. - - :return: A MutationResult object contains a property named `upsert_count` represents how many - entities have been upserted at milvus and a property named `primary_keys` is a list of primary - keys of the upserted entities. - :rtype: MutationResult + def upsert(self, data: Union[List, pandas.DataFrame], timeout=None, **kwargs) -> MutationResult: + """ Upsert data into the partition. 
- :raises PartitionNotExistException: - When partitoin does not exist + Args: + data (``list/tuple/pandas.DataFrame``): The specified data to upsert + partition_name (``str``): The partition name which the data will be upserted at, + if partition name is not passed, then the data will be upserted in "_default" partition + timeout (``float``, optional): A duration of time in seconds to allow for the RPC. Defaults to None. + If timeout is set to None, the client keeps waiting until the server responds or an error occurs. + Returns: + MutationResult: contains 2 properties, `upsert_count` and `primary_keys` + `upsert_count`: how many entities have been upserted at Milvus, + `primary_keys`: list of primary keys of the upserted entities + Raises: + MilvusException: If anything goes wrong. - :example: + Examples: >>> from pymilvus import connections, Collection, Partition, FieldSchema, CollectionSchema, DataType >>> connections.connect() >>> schema = CollectionSchema([ @@ -386,23 +312,18 @@ def upsert(self, data, timeout=None, **kwargs): ... [i for i in range(10)], ... [[float(i) for i in range(2)] for _ in range(10)], ... ] - >>> partition.upsert(data) - >>> partition.num_entities + >>> res = partition.upsert(data) + >>> res.upsert_count 10 """ conn = self._get_connection() - if conn.has_partition(self._collection.name, self._name, **kwargs) is False: + if conn.has_partition(self._collection.name, self.name, **kwargs) is False: raise PartitionNotExistException(message=ExceptionsMessage.PartitionNotExist) - # TODO: check upsert data schema here? 
- entities = Prepare.prepare_insert_or_upsert_data(data, self._collection.schema,False) - res = conn.upsert(self._collection.name, entities=entities, partition_name=self._name, - timeout=timeout, orm=True, schema=self._schema_dict, **kwargs) - if kwargs.get("_async", False): - return MutationFuture(res) - return MutationResult(res) + + return self._collection.upsert(data, self.name, timeout=timeout, **kwargs) def search(self, data, anns_field, param, limit, - expr=None, output_fields=None, timeout=None, round_decimal=-1, **kwargs): + expr=None, output_fields=None, timeout=None, round_decimal=-1, **kwargs) -> SearchResult: """ Conducts a vector similarity search with an optional boolean expression as filter. Args: @@ -494,7 +415,7 @@ def search(self, data, anns_field, param, limit, Raises: MilvusException: If anything goes wrong - :example: + Examples: >>> from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType >>> import random >>> connections.connect() @@ -529,62 +450,77 @@ def search(self, data, anns_field, param, limit, >>> print(f"- Top1 hit id: {hits[0].id}, distance: {hits[0].distance}, score: {hits[0].score} ") - Top1 hit id: 8, distance: 0.10143111646175385, score: 0.10143111646175385 """ - conn = self._get_connection() - res = conn.search(self._collection.name, data, anns_field, param, limit, expr, [self._name], output_fields, - round_decimal=round_decimal, timeout=timeout, schema=self._schema_dict, **kwargs) - if kwargs.get("_async", False): - return SearchFuture(res) - return SearchResult(res) + + return self._collection.search( + data=data, + anns_field=anns_field, + param=param, + limit=limit, + expr=expr, + partition_names=[self.name], + output_fields=output_fields, + round_decimal=round_decimal, + timeout=timeout, + **kwargs, + ) def query(self, expr, output_fields=None, timeout=None, **kwargs): - """ - Query with a set of criteria, and results in a list of records that match the query exactly. 
- - :param expr: The query expression - :type expr: str - - :param output_fields: A list of fields to return - :type output_fields: list[str] - - :param timeout: An optional duration of time in seconds to allow for the RPC. When timeout - is set to None, client waits until server response or error occur - :type timeout: float - - :param kwargs: - * *consistency_level* (``str/int``) -- - Which consistency level to use during a query on the collection. For details, see - https://github.com/milvus-io/milvus/blob/master/docs/developer_guides/how-guarantee-ts-works.md. - Note: this parameter will overwrite the same parameter specified when user created the collection, - if no consistency level was specified, query will use the collection consistency level. - * *guarantee_timestamp* (``int``) -- - This function instructs Milvus to see all operations performed before a provided timestamp. If no - such timestamp is specified, Milvus will query all operations performed to date. - Note: only used in Customized consistency level. - * *graceful_time* (``int``) -- - Only used in bounded consistency level. If graceful_time is set, PyMilvus will use current timestamp minus - the graceful_time as the `guarantee_timestamp`. This option is 5s by default if not set. - * *travel_timestamp* (``int``) -- - Users can specify a timestamp in a search to get results based on a data view - at a specified point in time. - - :return: A list that contains all results - :rtype: list - - :raises RpcError: If gRPC encounter an error - :raises ParamError: If parameters are invalid - :raises BaseException: If the return result from server is not ok - - :example: + """ Query with expressions + + Args: + expr (``str``): The query expression. + output_fields(``List[str]``): A list of field names to return. Defaults to None. + timeout (``float``, optional): A duration of time in seconds to allow for the RPC. Defaults to None. 
+ If timeout is set to None, the client keeps waiting until the server responds or an error occurs. + **kwargs (``dict``, optional): + + * *consistency_level* (``str/int``, optional) + Which consistency level to use when searching in the collection. + + Options of consistency level: Strong, Bounded, Eventually, Session, Customized. + + Note: this parameter will overwrite the same parameter specified when user created the collection, + if no consistency level was specified, search will use the consistency level when you create the + collection. + + * *guarantee_timestamp* (``int``, optional) + Instructs Milvus to see all operations performed before this timestamp. + By default Milvus will search all operations performed to date. + + Note: only valid in Customized consistency level. + + * *graceful_time* (``int``, optional) + Search will use the (current_timestamp - the graceful_time) as the + `guarantee_timestamp`. By default with 5s. + + Note: only valid in Bounded consistency level + + * *travel_timestamp* (``int``, optional) + A specific timestamp to get results based on a data view at. + + * *offset* (``int``) + Combined with limit to enable pagination + + * *limit* (``int``) + Combined with offset to enable pagination + + Returns: + List, contains all results + + Raises: + MilvusException: If anything goes wrong + + Examples: >>> from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType >>> import random >>> connections.connect() - >>> schema = CollectionSchema([ ... FieldSchema("film_id", DataType.INT64, is_primary=True), ... FieldSchema("film_date", DataType.INT64), ... FieldSchema("films", dtype=DataType.FLOAT_VECTOR, dim=2) ... 
]) >>> collection = Collection("test_collection_query", schema) + >>> collection.create_index("films", {"index_type": "FLAT", "metric_type": "L2", "params": {}}) >>> partition = Partition(collection, "comedy", "comedy films") >>> # insert >>> data = [ @@ -593,8 +529,6 @@ def query(self, expr, output_fields=None, timeout=None, **kwargs): ... [[random.random() for _ in range(2)] for _ in range(10)], ... ] >>> partition.insert(data) - >>> partition.num_entities - 10 >>> partition.load() >>> # query >>> expr = "film_id in [ 0, 1 ]" @@ -603,21 +537,16 @@ def query(self, expr, output_fields=None, timeout=None, **kwargs): >>> print(f"- Query results: {res}") - Query results: [{'film_id': 0, 'film_date': 2000}, {'film_id': 1, 'film_date': 2001}] """ - conn = self._get_connection() - res = conn.query(self._collection.name, expr, output_fields, [self._name], - timeout=timeout, schema=self._schema_dict, **kwargs) - return res - - def get_replicas(self, timeout=None, **kwargs) -> Replica: - """get_replicas returns the current collection's replica information - :param timeout: An optional duration of time in seconds to allow for the RPC. When timeout - is set to None, client waits until server response or error occur - :type timeout: float + return self._collection.query(expr, output_fields, partition_names=[self.name], timeout=timeout, **kwargs) - :raises BaseException: If the collection does not exist. + def get_replicas(self, timeout=None, **kwargs) -> Replica: + """Get the current loaded replica information - :example: + Args: + timeout (``float``, optional): An optional duration of time in seconds to allow for the RPC. When timeout + is set to None, client waits until server response or error occur. + Returns: + Replica: All the replica information. """ - conn = self._get_connection() - return conn.get_replicas(self._collection.name, timeout=timeout, **kwargs) + return self._collection.get_replicas(timeout=timeout, **kwargs)