From 0334b1debe6d138386306b3fddc598926d37341a Mon Sep 17 00:00:00 2001 From: Enwei Jiao Date: Mon, 25 Sep 2023 15:45:20 +0800 Subject: [PATCH] Change error_code to code (#1696) Signed-off-by: Enwei Jiao --- examples/iterator.py | 92 ++++++++++++++++++++++++++-------- pymilvus/client/constants.py | 2 +- pymilvus/client/prepare.py | 6 ++- pymilvus/grpc_gen/milvus-proto | 2 +- pymilvus/orm/constants.py | 2 +- pymilvus/orm/iterator.py | 2 - 6 files changed, 79 insertions(+), 27 deletions(-) diff --git a/examples/iterator.py b/examples/iterator.py index 9edd82a97..ed7d2bf54 100644 --- a/examples/iterator.py +++ b/examples/iterator.py @@ -21,31 +21,47 @@ DIM = 8 CLEAR_EXIST = False -def re_create_collection(): - if utility.has_collection(COLLECTION_NAME) and CLEAR_EXIST: - utility.drop_collection(COLLECTION_NAME) - print(f"dropped existed collection{COLLECTION_NAME}") - - fields = [ - FieldSchema(name=USER_ID, dtype=DataType.VARCHAR, is_primary=True, - auto_id=False, max_length=MAX_LENGTH), - FieldSchema(name=AGE, dtype=DataType.INT64), - FieldSchema(name=DEPOSIT, dtype=DataType.DOUBLE), - FieldSchema(name=PICTURE, dtype=DataType.FLOAT_VECTOR, dim=DIM) - ] - - schema = CollectionSchema(fields) - print(f"Create collection {COLLECTION_NAME}") - collection = Collection(COLLECTION_NAME, schema, consistency_level=CONSISTENCY_LEVEL) + +def re_create_collection(skip_data_period: bool): + if not skip_data_period: + if utility.has_collection(COLLECTION_NAME) and CLEAR_EXIST: + utility.drop_collection(COLLECTION_NAME) + print(f"dropped existed collection{COLLECTION_NAME}") + + fields = [ + FieldSchema(name=USER_ID, dtype=DataType.VARCHAR, is_primary=True, + auto_id=False, max_length=MAX_LENGTH), + FieldSchema(name=AGE, dtype=DataType.INT64), + FieldSchema(name=DEPOSIT, dtype=DataType.DOUBLE), + FieldSchema(name=PICTURE, dtype=DataType.FLOAT_VECTOR, dim=DIM) + ] + + schema = CollectionSchema(fields) + print(f"Create collection {COLLECTION_NAME}") + collection = Collection(COLLECTION_NAME, schema, consistency_level=CONSISTENCY_LEVEL, num_shards=2) + else: + collection = Collection(COLLECTION_NAME) return collection +def random_pk(filter_set: set, lower_bound: int, upper_bound: int) -> str: + ret: str = "" + while True: + candidate = str(random.randint(lower_bound, upper_bound)) + if candidate in filter_set: + continue + ret = candidate + break + return ret + + def insert_data(collection): rng = np.random.default_rng(seed=19530) batch_count = 5 + filter_set: set = {} for i in range(batch_count): entities = [ - [str(random.randint(NUM_ENTITIES * i, NUM_ENTITIES * (i + 1))) for ni in range(NUM_ENTITIES)], + [random_pk(filter_set, 0, batch_count * NUM_ENTITIES) for _ in range(NUM_ENTITIES)], [int(ni % 100) for ni in range(NUM_ENTITIES)], [float(ni) for ni in range(NUM_ENTITIES)], rng.random((NUM_ENTITIES, DIM)), @@ -54,6 +70,7 @@ def insert_data(collection): collection.flush() print(f"Finish insert batch{i}, number of entities in Milvus: {collection.num_entities}") + def prepare_index(collection): index = { "index_type": "IVF_FLAT", @@ -74,9 +91,30 @@ def prepare_data(collection): def query_iterate_collection_no_offset(collection): - expr = f"10 <= {AGE} <= 14" + expr = f"10 <= {AGE} <= 25" + + query_iterator = collection.query_iterator(expr=expr, output_fields=[USER_ID, AGE], + offset=0, batch_size=5, consistency_level=CONSISTENCY_LEVEL, + reduce_stop_for_best="false") + no_best_ids: set = set({}) + page_idx = 0 + while True: + res = query_iterator.next() + if len(res) == 0: + print("query iteration finished, close") + query_iterator.close() + break + for i in range(len(res)): + print(res[i]) + no_best_ids.add(res[i]['id']) + page_idx += 1 + print(f"page{page_idx}-------------------------") + + print("best---------------------------") query_iterator = collection.query_iterator(expr=expr, output_fields=[USER_ID, AGE], - offset=0, batch_size=5, consistency_level=CONSISTENCY_LEVEL) + offset=0, batch_size=5, consistency_level=CONSISTENCY_LEVEL, + reduce_stop_for_best="true") + best_ids: set = set({}) page_idx = 0 while True: res = query_iterator.next() @@ -86,9 +124,15 @@ def query_iterate_collection_no_offset(collection): break for i in range(len(res)): print(res[i]) + best_ids.add(res[i]['id']) page_idx += 1 print(f"page{page_idx}-------------------------") + diff = best_ids.difference(no_best_ids) + for id in diff: + print(f"diff id:{id}") + + def query_iterate_collection_with_offset(collection): expr = f"10 <= {AGE} <= 14" query_iterator = collection.query_iterator(expr=expr, output_fields=[USER_ID, AGE], @@ -105,6 +149,7 @@ def query_iterate_collection_with_offset(collection): page_idx += 1 print(f"page{page_idx}-------------------------") + def query_iterate_collection_with_limit(collection): expr = f"10 <= {AGE} <= 44" query_iterator = collection.query_iterator(expr=expr, output_fields=[USER_ID, AGE], @@ -121,6 +166,7 @@ def query_iterate_collection_with_limit(collection): page_idx += 1 print(f"page{page_idx}-------------------------") + def search_iterator_collection(collection): SEARCH_NQ = 1 DIM = 8 @@ -144,6 +190,7 @@ def search_iterator_collection(collection): page_idx += 1 print(f"page{page_idx}-------------------------") + def search_iterator_collection_with_limit(collection): SEARCH_NQ = 1 DIM = 8 @@ -167,10 +214,13 @@ def search_iterator_collection_with_limit(collection): page_idx += 1 print(f"page{page_idx}-------------------------") + def main(): + skip_data_period = False connections.connect("default", host=HOST, port=PORT) - collection = re_create_collection() - collection = prepare_data(collection) + collection = re_create_collection(skip_data_period) + if not skip_data_period: + collection = prepare_data(collection) query_iterate_collection_no_offset(collection) query_iterate_collection_with_offset(collection) query_iterate_collection_with_limit(collection) diff --git a/pymilvus/client/constants.py b/pymilvus/client/constants.py index 1ac430170..b9c659dcc 100644 --- a/pymilvus/client/constants.py +++ b/pymilvus/client/constants.py @@ -8,4 +8,4 @@ BOUNDED_TS = 2 DEFAULT_CONSISTENCY_LEVEL = ConsistencyLevel.Bounded DEFAULT_RESOURCE_GROUP = "__default_resource_group" -ITERATION_EXTENSION_REDUCE_RATE = "iteration_extension_reduce_rate" +REDUCE_STOP_FOR_BEST = "reduce_stop_for_best" diff --git a/pymilvus/client/prepare.py b/pymilvus/client/prepare.py index 3a946f9ea..31d319b5b 100644 --- a/pymilvus/client/prepare.py +++ b/pymilvus/client/prepare.py @@ -13,7 +13,7 @@ from . import blob, entity_helper, ts_utils from .check import check_pass_param, is_legal_collection_properties -from .constants import DEFAULT_CONSISTENCY_LEVEL +from .constants import DEFAULT_CONSISTENCY_LEVEL, REDUCE_STOP_FOR_BEST from .types import DataType, PlaceholderType, get_consistency_level from .utils import traverse_info, traverse_rows_info @@ -844,9 +844,13 @@ def query_request( req.query_params.append(common_types.KeyValuePair(key="offset", value=str(offset))) ignore_growing = kwargs.get("ignore_growing", False) + stop_reduce_for_best = kwargs.get(REDUCE_STOP_FOR_BEST, False) req.query_params.append( common_types.KeyValuePair(key="ignore_growing", value=str(ignore_growing)) ) + req.query_params.append( + common_types.KeyValuePair(key=REDUCE_STOP_FOR_BEST, value=str(stop_reduce_for_best)) + ) return req @classmethod diff --git a/pymilvus/grpc_gen/milvus-proto b/pymilvus/grpc_gen/milvus-proto index 23756009c..aa8a66130 160000 --- a/pymilvus/grpc_gen/milvus-proto +++ b/pymilvus/grpc_gen/milvus-proto @@ -1 +1 @@ -Subproject commit 23756009c643a2017b5bfb73e190d15dc850f4b6 +Subproject commit aa8a661302177b80869d7c84b9d3a3679b452043 diff --git a/pymilvus/orm/constants.py b/pymilvus/orm/constants.py index 57a5d1648..ddfe8aecb 100644 --- a/pymilvus/orm/constants.py +++ b/pymilvus/orm/constants.py @@ -35,8 +35,8 @@ RADIUS = "radius" RANGE_FILTER = "range_filter" FIELDS = "fields" -ITERATION_EXTENSION_REDUCE_RATE = "iteration_extension_reduce_rate" EF = "ef" +STOP_REDUCE_FOR_BEST = "stop_reduce_for_best" DEFAULT_MAX_L2_DISTANCE = 99999999.0 DEFAULT_MIN_IP_DISTANCE = -99999999.0 DEFAULT_MAX_HAMMING_DISTANCE = 99999999.0 diff --git a/pymilvus/orm/iterator.py b/pymilvus/orm/iterator.py index 4c83424e7..417fa7121 100644 --- a/pymilvus/orm/iterator.py +++ b/pymilvus/orm/iterator.py @@ -21,7 +21,6 @@ EF, FIELDS, INT64_MAX, - ITERATION_EXTENSION_REDUCE_RATE, MAX_BATCH_SIZE, MAX_FILTERED_IDS_COUNT_ITERATION, MAX_TRY_TIME, @@ -104,7 +103,6 @@ def __seek(self): first_cursor_kwargs[OFFSET] = 0 # offset may be too large, needed to seek in multiple times first_cursor_kwargs[MILVUS_LIMIT] = self._kwargs[OFFSET] - first_cursor_kwargs[ITERATION_EXTENSION_REDUCE_RATE] = 0 res = self._conn.query( collection_name=self._collection_name,