From 54cbd63a275e781c9db98b57d6daa36a15eaa0ff Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 13:05:12 -0400 Subject: [PATCH] client_async: Allow throwing an exception upon socket error during (#134) wakeup When wakeup() is called, we sometime notice that we get an endless prints: "Unable to send to wakeup socket!". Those prints are spamming the logs. This commit aims to address it by allowing restating the application via an intentional exception raise. This behavior is configurable and its default is backward compatible. Signed-off-by: shimon-armis Co-authored-by: shimon-armis --- kafka/client_async.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 530a1f441..3076c4ba0 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -154,6 +154,8 @@ class KafkaClient(object): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + raise_upon_socket_err_during_wakeup (bool): If set to True, raise an exception + upon socket error during wakeup(). Default: False """ DEFAULT_CONFIG = { @@ -192,7 +194,8 @@ class KafkaClient(object): 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, - 'sasl_oauth_token_provider': None + 'sasl_oauth_token_provider': None, + 'raise_upon_socket_err_during_wakeup': False } def __init__(self, **configs): @@ -243,6 +246,8 @@ def __init__(self, **configs): check_timeout = self.config['api_version_auto_timeout_ms'] / 1000 self.config['api_version'] = self.check_version(timeout=check_timeout) + self._raise_upon_socket_err_during_wakeup = self.config['raise_upon_socket_err_during_wakeup'] + def _can_bootstrap(self): effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts backoff_factor = 2 ** effective_failures @@ -936,8 +941,10 @@ def wakeup(self): except socket.timeout: log.warning('Timeout to send to wakeup socket!') raise Errors.KafkaTimeoutError() - except socket.error: + except socket.error as e: log.warning('Unable to send to wakeup socket!') + if self._raise_upon_socket_err_during_wakeup: + raise e def _clear_wake_fd(self): # reading from wake socket should only happen in a single thread