Skip to content

Commit

Permalink
Support Describe log dirs (dpkp#145)
Browse files Browse the repository at this point in the history
I implemented API KEY 35 from the official Apache Kafka documentation. This functionality is requested in issue # 2163 and this is an implementation proposal.

Co-authored-by: chopatate <[email protected]>
  • Loading branch information
wbarnha and Courouge committed Mar 19, 2024
1 parent a856dc4 commit 2f2ccb1
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 1 deletion.
18 changes: 17 additions & 1 deletion kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
DeleteGroupsRequest
DeleteGroupsRequest, DescribeLogDirsRequest
)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
Expand Down Expand Up @@ -1342,3 +1342,19 @@ def _wait_for_futures(self, futures):

if future.failed():
raise future.exception # pylint: disable-msg=raising-bad-type

def describe_log_dirs(self):
"""Send a DescribeLogDirsRequest request to a broker.
:return: A message future
"""
version = self._matching_api_version(DescribeLogDirsRequest)
if version <= 1:
request = DescribeLogDirsRequest[version]()
future = self._send_request_to_node(self._client.least_loaded_node(), request)
self._wait_for_futures([future])
else:
raise NotImplementedError(
"Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient."
.format(version))
return future.value
42 changes: 42 additions & 0 deletions kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,48 @@ class DescribeConfigsRequest_v2(Request):
]


class DescribeLogDirsResponse_v0(Response):
API_KEY = 35
API_VERSION = 0
FLEXIBLE_VERSION = True
SCHEMA = Schema(
('throttle_time_ms', Int32),
('log_dirs', Array(
('error_code', Int16),
('log_dir', String('utf-8')),
('topics', Array(
('name', String('utf-8')),
('partitions', Array(
('partition_index', Int32),
('partition_size', Int64),
('offset_lag', Int64),
('is_future_key', Boolean)
))
))
))
)


class DescribeLogDirsRequest_v0(Request):
API_KEY = 35
API_VERSION = 0
RESPONSE_TYPE = DescribeLogDirsResponse_v0
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
('partitions', Int32)
))
)


DescribeLogDirsResponse = [
DescribeLogDirsResponse_v0,
]
DescribeLogDirsRequest = [
DescribeLogDirsRequest_v0,
]


class SaslAuthenticateResponse_v0(Response):
API_KEY = 36
API_VERSION = 0
Expand Down

0 comments on commit 2f2ccb1

Please sign in to comment.