Skip to content

Commit

Permalink
feat: [cherry-pick] Support clustering compaction (#2220)
Browse files Browse the repository at this point in the history
#2015 #2219

---------

Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored Aug 8, 2024
1 parent a441516 commit afbd420
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 10 deletions.
10 changes: 8 additions & 2 deletions pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1546,13 +1546,19 @@ def load_balance(
check_status(status)

@retry_on_rpc_failure()
def compact(self, collection_name: str, timeout: Optional[float] = None, **kwargs) -> int:
def compact(
self,
collection_name: str,
is_clustering: Optional[bool] = False,
timeout: Optional[float] = None,
**kwargs,
) -> int:
request = Prepare.describe_collection_request(collection_name)
rf = self._stub.DescribeCollection.future(request, timeout=timeout)
response = rf.result()
check_status(response.status)

req = Prepare.manual_compaction(response.collectionID)
req = Prepare.manual_compaction(response.collectionID, is_clustering)
future = self._stub.ManualCompaction.future(req, timeout=timeout)
response = future.result()
check_status(response.status)
Expand Down
6 changes: 5 additions & 1 deletion pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,12 +962,16 @@ def load_balance_request(
)

@classmethod
def manual_compaction(cls, collection_id: int):
def manual_compaction(cls, collection_id: int, is_clustering: bool):
if collection_id is None or not isinstance(collection_id, int):
raise ParamError(message=f"collection_id value {collection_id} is illegal")

if is_clustering is None or not isinstance(is_clustering, bool):
raise ParamError(message=f"is_clustering value {is_clustering} is illegal")

request = milvus_types.ManualCompactionRequest()
request.collectionID = collection_id
request.majorCompaction = is_clustering

return request

Expand Down
20 changes: 16 additions & 4 deletions pymilvus/client/stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,13 +1044,16 @@ def load_balance(
**kwargs,
)

def compact(self, collection_name, timeout=None, **kwargs) -> int:
def compact(self, collection_name, is_clustering=False, timeout=None, **kwargs) -> int:
"""
Do compaction for the collection.
:param collection_name: The collection name to compact
:type collection_name: str
:param is_clustering: whether to trigger a clustering compaction
:type is_clustering: bool
:param timeout: The timeout for this method, unit: second
:type timeout: int
Expand All @@ -1060,15 +1063,22 @@ def compact(self, collection_name, timeout=None, **kwargs) -> int:
:raises MilvusException: If the collection name does not exist.
"""
with self._connection() as handler:
return handler.compact(collection_name, timeout=timeout, **kwargs)
return handler.compact(
collection_name, is_clustering=is_clustering, timeout=timeout, **kwargs
)

def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> CompactionState:
def get_compaction_state(
self, compaction_id: int, is_clustering=False, timeout=None, **kwargs
) -> CompactionState:
"""
Get compaction states of a targeted compaction id
:param compaction_id: the id returned by compact
:type compaction_id: int
:param is_clustering: whether to get the state of a clustering compaction
:type is_clustering: bool
:param timeout: The timeout for this method, unit: second
:type timeout: int
Expand All @@ -1079,7 +1089,9 @@ def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> Co
"""

with self._connection() as handler:
return handler.get_compaction_state(compaction_id, timeout=timeout, **kwargs)
return handler.get_compaction_state(
compaction_id, is_clustering=is_clustering, timeout=timeout, **kwargs
)

def wait_for_compaction_completed(
self, compaction_id: int, timeout=None, **kwargs
Expand Down
32 changes: 29 additions & 3 deletions pymilvus/orm/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1491,37 +1491,57 @@ def drop_index(self, timeout: Optional[float] = None, **kwargs):
**copy_kwargs,
)

def compact(self, timeout: Optional[float] = None, **kwargs):
def compact(
self, is_clustering: Optional[bool] = False, timeout: Optional[float] = None, **kwargs
):
"""Compact (merge) the small segments in a collection
Args:
timeout (``float``, optional): An optional duration of time in seconds to allow
for the RPC. When timeout is set to None, the client waits until the server
responds or an error occurs.
is_clustering (``bool``, optional): Option to trigger clustering compaction.
Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
self.compaction_id = conn.compact(self._name, timeout=timeout, **kwargs)
if is_clustering:
self.clustering_compaction_id = conn.compact(
self._name, is_clustering=is_clustering, timeout=timeout, **kwargs
)
else:
self.compaction_id = conn.compact(
self._name, is_clustering=is_clustering, timeout=timeout, **kwargs
)

def get_compaction_state(self, timeout: Optional[float] = None, **kwargs) -> CompactionState:
def get_compaction_state(
self, timeout: Optional[float] = None, is_clustering: Optional[bool] = False, **kwargs
) -> CompactionState:
"""Get the current compaction state
Args:
timeout (``float``, optional): An optional duration of time in seconds to allow
for the RPC. When timeout is set to None, the client waits until the server
responds or an error occurs.
is_clustering (``bool``, optional): Option to get clustering compaction state.
Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
if is_clustering:
return conn.get_compaction_state(
self.clustering_compaction_id, timeout=timeout, **kwargs
)
return conn.get_compaction_state(self.compaction_id, timeout=timeout, **kwargs)

def wait_for_compaction_completed(
self,
timeout: Optional[float] = None,
is_clustering: Optional[bool] = False,
**kwargs,
) -> CompactionState:
"""Block until the current collection's compaction completed
Expand All @@ -1531,10 +1551,16 @@ def wait_for_compaction_completed(
for the RPC. When timeout is set to None, the client waits until the server
responds or an error occurs.
is_clustering (``bool``, optional): Option to wait for the clustering compaction
instead of the regular compaction.
Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
if is_clustering:
return conn.wait_for_compaction_completed(
self.clustering_compaction_id, timeout=timeout, **kwargs
)
return conn.wait_for_compaction_completed(self.compaction_id, timeout=timeout, **kwargs)

def get_compaction_plans(self, timeout: Optional[float] = None, **kwargs) -> CompactionPlans:
Expand Down

0 comments on commit afbd420

Please sign in to comment.