Skip to content

Commit

Permalink
Fix memory leak. Subscribe on key changes only when we need it (sonic…
Browse files Browse the repository at this point in the history
…-net#6)

* Subsritbe to keyspace events when required. Otherwise memory leak
  • Loading branch information
pavel-shirshov authored Apr 28, 2017
1 parent ebb5ec6 commit 4cf7a59
Showing 1 changed file with 29 additions and 8 deletions.
37 changes: 29 additions & 8 deletions src/swsssdk/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,20 @@ def wrapped(inst, db_name, *args, **kwargs):
attempts = 0
while True:
try:
return f(inst, db_name, *args, **kwargs)
ret_data = f(inst, db_name, *args, **kwargs)
inst._unsubscribe_keyspace_notification(db_name)
return ret_data
except UnavailableDataError as e:
if blocking:
inst._unavailable_data_handler(db_name, e.data)
if db_name in inst.keyspace_notification_channels:
result = inst._unavailable_data_handler(db_name, e.data)
if result:
continue # received updates, try to read data again
else:
inst._unsubscribe_keyspace_notification(db_name)
raise # No updates was received. Raise exception
else: # Subscribe to updates and try it again (avoiding race condition)
inst._subscribe_keyspace_notification(db_name)
else:
return None
except redis.exceptions.ResponseError:
Expand Down Expand Up @@ -193,7 +203,6 @@ def _onetime_connect(self, db_name):
# Enable the notification mechanism for keyspace events in Redis
client.config_set('notify-keyspace-events', self.KEYSPACE_EVENTS)
self.redis_clients[db_name] = client
self._subscribe_keyspace_notification(db_name, client)

def _persistent_connect(self, db_name):
"""
Expand All @@ -219,14 +228,25 @@ def close(self, db_name):
if db_name in self.keyspace_notification_channels:
self.keyspace_notification_channels[db_name].close()

def _subscribe_keyspace_notification(self, db_name, client):
def _subscribe_keyspace_notification(self, db_name):
"""
Subscribe the chosent client to keyspace event notifications
"""
logger.debug("Subscribe to keyspace notification")
client = self.redis_clients[db_name]
pubsub = client.pubsub()
pubsub.psubscribe(self.KEYSPACE_PATTERN)
self.keyspace_notification_channels[db_name] = pubsub

def _unsubscribe_keyspace_notification(self, db_name):
"""
Unsubscribe the chosent client from keyspace event notifications
"""
if db_name in self.keyspace_notification_channels:
logger.debug("Unsubscribe from keyspace notification")
self.keyspace_notification_channels[db_name].close()
del self.keyspace_notification_channels[db_name]

def get_redis_client(self, db_name):
"""
:param db_name: Name of the DB to query
Expand Down Expand Up @@ -260,9 +280,9 @@ def get(self, db_name, _hash, key):
client = self.redis_clients[db_name]
val = client.hget(_hash, key)
if not val:
message = "Key '{}' unavailable in database '{}' - table '{}'".format(key, _hash, db_name)
message = "Key '{}' field '{}' unavailable in database '{}'".format(_hash, key, db_name)
logger.warning(message)
raise UnavailableDataError(message, key)
raise UnavailableDataError(message, _hash)
else:
# redis only supports strings. if any item is set to string 'None', cast it back to the appropriate type.
return None if val == b'None' else val
Expand All @@ -278,7 +298,7 @@ def get_all(self, db_name, _hash):
client = self.redis_clients[db_name]
table = client.hgetall(_hash)
if not table:
message = "Table '{}' does not exist in database '{}'".format(_hash, db_name)
message = "Key '{}' unavailable in database '{}'".format(_hash, db_name)
logger.warning(message)
raise UnavailableDataError(message, _hash)
else:
Expand Down Expand Up @@ -309,9 +329,10 @@ def _unavailable_data_handler(self, db_name, data):
logger.info("'{}' acquired via pub-sub. Unblocking...".format(data, db_name))
# Wait for a "settling" period before releasing the wait.
time.sleep(self.DATA_RETRIEVAL_WAIT_TIME)
return
return True

logger.warning("No notification for '{}' from '{}' received before timeout.".format(data, db_name))
return False

def _connection_error_handler(self, db_name):
"""
Expand Down

0 comments on commit 4cf7a59

Please sign in to comment.