Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support major compaction in ManualCompaction #2015

Merged
merged 1 commit into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1525,13 +1525,19 @@ def load_balance(
check_status(status)

@retry_on_rpc_failure()
def compact(self, collection_name: str, timeout: Optional[float] = None, **kwargs) -> int:
def compact(
self,
collection_name: str,
timeout: Optional[float] = None,
is_major: Optional[bool] = False,
**kwargs,
) -> int:
request = Prepare.describe_collection_request(collection_name)
rf = self._stub.DescribeCollection.future(request, timeout=timeout)
response = rf.result()
check_status(response.status)

req = Prepare.manual_compaction(response.collectionID)
req = Prepare.manual_compaction(response.collectionID, is_major)
future = self._stub.ManualCompaction.future(req, timeout=timeout)
response = future.result()
check_status(response.status)
Expand Down
3 changes: 2 additions & 1 deletion pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -961,12 +961,13 @@ def load_balance_request(
)

@classmethod
def manual_compaction(cls, collection_id: int):
def manual_compaction(cls, collection_id: int, is_major: bool):
if collection_id is None or not isinstance(collection_id, int):
raise ParamError(message=f"collection_id value {collection_id} is illegal")

request = milvus_types.ManualCompactionRequest()
request.collectionID = collection_id
request.majorCompaction = is_major

return request

Expand Down
18 changes: 14 additions & 4 deletions pymilvus/client/stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ def load_balance(
**kwargs,
)

def compact(self, collection_name, timeout=None, **kwargs) -> int:
def compact(self, collection_name, timeout=None, is_major=False, **kwargs) -> int:
"""
Do compaction for the collection.

Expand All @@ -1054,15 +1054,20 @@ def compact(self, collection_name, timeout=None, **kwargs) -> int:
:param timeout: The timeout for this method, unit: second
:type timeout: int

:param is_major: trigger major compaction
:type is_major: bool

:return: the compaction ID
:rtype: int

:raises MilvusException: If collection name not exist.
"""
with self._connection() as handler:
return handler.compact(collection_name, timeout=timeout, **kwargs)
return handler.compact(collection_name, timeout=timeout, is_major=is_major, **kwargs)

def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> CompactionState:
def get_compaction_state(
self, compaction_id: int, timeout=None, is_major=False, **kwargs
) -> CompactionState:
"""
Get compaction states of a targeted compaction id

Expand All @@ -1072,14 +1077,19 @@ def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> Co
:param timeout: The timeout for this method, unit: second
:type timeout: int

:param is_major: get major compaction
:type is_major: bool

:return: the state of the compaction
:rtype: CompactionState

:raises MilvusException: If compaction_id doesn't exist.
"""

with self._connection() as handler:
return handler.get_compaction_state(compaction_id, timeout=timeout, **kwargs)
return handler.get_compaction_state(
compaction_id, timeout=timeout, is_major=is_major, **kwargs
)

def wait_for_compaction_completed(
self, compaction_id: int, timeout=None, **kwargs
Expand Down
28 changes: 25 additions & 3 deletions pymilvus/orm/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1489,37 +1489,53 @@ def drop_index(self, timeout: Optional[float] = None, **kwargs):
)
index.drop(timeout=timeout, **kwargs)

def compact(self, timeout: Optional[float] = None, **kwargs):
def compact(self, timeout: Optional[float] = None, is_major: Optional[bool] = False, **kwargs):
"""Compact merge the small segments in a collection

Args:
timeout (``float``, optional): An optional duration of time in seconds to allow
for the RPC. When timeout is set to None, client waits until server response
or error occur.

is_major (``bool``, optional): An optional setting to trigger major compaction.

Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
self.compaction_id = conn.compact(self._name, timeout=timeout, **kwargs)
if is_major:
self.major_compaction_id = conn.compact(
self._name, timeout=timeout, is_major=is_major, **kwargs
)
else:
self.compaction_id = conn.compact(
self._name, timeout=timeout, is_major=is_major, **kwargs
)

def get_compaction_state(self, timeout: Optional[float] = None, **kwargs) -> CompactionState:
def get_compaction_state(
self, timeout: Optional[float] = None, is_major: Optional[bool] = False, **kwargs
) -> CompactionState:
"""Get the current compaction state

Args:
timeout (``float``, optional): An optional duration of time in seconds to allow
for the RPC. When timeout is set to None, client waits until server response
or error occur.

is_major (``bool``, optional): An optional setting to get major compaction state.

Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
if is_major:
return conn.get_compaction_state(self.major_compaction_id, timeout=timeout, **kwargs)
return conn.get_compaction_state(self.compaction_id, timeout=timeout, **kwargs)

def wait_for_compaction_completed(
self,
timeout: Optional[float] = None,
is_major: Optional[bool] = False,
**kwargs,
) -> CompactionState:
"""Block until the current collection's compaction completed
Expand All @@ -1529,10 +1545,16 @@ def wait_for_compaction_completed(
for the RPC. When timeout is set to None, client waits until server response
or error occur.

is_major (``bool``, optional): An optional setting to get major compaction state.

Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
if is_major:
return conn.wait_for_compaction_completed(
self.major_compaction_id, timeout=timeout, **kwargs
)
return conn.wait_for_compaction_completed(self.compaction_id, timeout=timeout, **kwargs)

def get_compaction_plans(self, timeout: Optional[float] = None, **kwargs) -> CompactionPlans:
Expand Down