Skip to content

Commit

Permalink
Use FlushTs and ColelctionName in GetFlushState
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper committed Sep 7, 2023
1 parent 2964d41 commit 619d110
Show file tree
Hide file tree
Showing 11 changed files with 2,355 additions and 2,611 deletions.
14 changes: 7 additions & 7 deletions pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1216,8 +1216,8 @@ def get_collection_stats(self, collection_name: str, timeout: Optional[float] =
raise MilvusException(status.error_code, status.reason)

@retry_on_rpc_failure()
def get_flush_state(self, segment_ids: List[int], timeout: Optional[float] = None, **kwargs):
req = Prepare.get_flush_state_request(segment_ids)
def get_flush_state(self, collection_name: str, flush_ts: int, timeout: Optional[float] = None, **kwargs):
req = Prepare.get_flush_state_request(collection_name, flush_ts)
future = self._stub.GetFlushState.future(req, timeout=timeout)
response = future.result()
status = response.status
Expand All @@ -1238,14 +1238,14 @@ def get_persistent_segment_infos(
return response.infos # todo: A wrapper class of PersistentSegmentInfo
raise MilvusException(status.error_code, status.reason)

def _wait_for_flushed(self, segment_ids: List[int], timeout: Optional[float] = None, **kwargs):
def _wait_for_flushed(self, collection_name: str, flush_ts: int, timeout: Optional[float] = None, **kwargs):
flush_ret = False
start = time.time()
while not flush_ret:
flush_ret = self.get_flush_state(segment_ids, timeout, **kwargs)
flush_ret = self.get_flush_state(collection_name, flush_ts, timeout, **kwargs)
end = time.time()
if timeout is not None and end - start > timeout:
raise MilvusException(message=f"wait for flush timeout, segment ids: {segment_ids}")
raise MilvusException(message=f"wait for flush timeout, collection: {collection_name}")

if not flush_ret:
time.sleep(0.5)
Expand All @@ -1266,8 +1266,8 @@ def flush(self, collection_names: list, timeout: Optional[float] = None, **kwarg

def _check():
for collection_name in collection_names:
segment_ids = future.result().coll_segIDs[collection_name].data
self._wait_for_flushed(segment_ids, timeout=timeout)
flush_ts = future.result().coll_flush_ts[collection_name]
self._wait_for_flushed(collection_name, flush_ts, timeout=timeout)

if kwargs.get("_async", False):
flush_future = FlushFuture(future)
Expand Down
4 changes: 2 additions & 2 deletions pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,8 +764,8 @@ def get_persistent_segment_info_request(cls, collection_name: str):
return milvus_types.GetPersistentSegmentInfoRequest(collectionName=collection_name)

@classmethod
def get_flush_state_request(cls, segment_ids: List[int]):
return milvus_types.GetFlushStateRequest(segmentIDs=segment_ids)
def get_flush_state_request(cls, collection_name: str, flush_ts: int):
return milvus_types.GetFlushStateRequest(collection_name=collection_name, flush_ts=flush_ts)

@classmethod
def get_query_segment_info_request(cls, collection_name: str):
Expand Down
123 changes: 61 additions & 62 deletions pymilvus/grpc_gen/common_pb2.py

Large diffs are not rendered by default.

Loading

0 comments on commit 619d110

Please sign in to comment.