Skip to content

Commit

Permalink
client_async: Allow throwing an exception upon socket error during (#134
Browse files Browse the repository at this point in the history
)

wakeup

When wakeup() is called, we sometime notice that we get
an endless prints:
"Unable to send to wakeup socket!".

Those prints are spamming the logs.
This commit aims to address it by allowing restating the
application via an intentional exception raise.
This behavior is configurable and its default is backward compatible.

Signed-off-by: shimon-armis <[email protected]>
Co-authored-by: shimon-armis <[email protected]>
  • Loading branch information
wbarnha and shimonturjeman committed Mar 10, 2024
1 parent 18eaa2d commit 54cbd63
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ class KafkaClient(object):
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
raise_upon_socket_err_during_wakeup (bool): If set to True, raise an exception
upon socket error during wakeup(). Default: False
"""

DEFAULT_CONFIG = {
Expand Down Expand Up @@ -192,7 +194,8 @@ class KafkaClient(object):
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None
'sasl_oauth_token_provider': None,
'raise_upon_socket_err_during_wakeup': False
}

def __init__(self, **configs):
Expand Down Expand Up @@ -243,6 +246,8 @@ def __init__(self, **configs):
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
self.config['api_version'] = self.check_version(timeout=check_timeout)

self._raise_upon_socket_err_during_wakeup = self.config['raise_upon_socket_err_during_wakeup']

def _can_bootstrap(self):
effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts
backoff_factor = 2 ** effective_failures
Expand Down Expand Up @@ -936,8 +941,10 @@ def wakeup(self):
except socket.timeout:
log.warning('Timeout to send to wakeup socket!')
raise Errors.KafkaTimeoutError()
except socket.error:
except socket.error as e:
log.warning('Unable to send to wakeup socket!')
if self._raise_upon_socket_err_during_wakeup:
raise e

def _clear_wake_fd(self):
# reading from wake socket should only happen in a single thread
Expand Down

0 comments on commit 54cbd63

Please sign in to comment.