From f300364244ccd7293b06f7874accb5c409b4db6d Mon Sep 17 00:00:00 2001 From: Jordan Carlson Date: Wed, 3 Feb 2021 13:00:16 -0800 Subject: [PATCH 1/6] Remove idle connections from pool The Memcached server can be configured to drop idle connections (via the `idle_timeout` option). When this occurs, pymemcache may still try to use the connection, resulting in `MemcacheUnexpectedCloseError`. Hence the need for client-side idle timeout logic. This change was proposed in github.com/pinterest/pymemcache#114, but ultimately dropped. The current PR uses the same approach but differs slightly in the details. --- pymemcache/client/base.py | 4 ++++ pymemcache/client/hash.py | 2 ++ pymemcache/pool.py | 26 +++++++++++++++++++------- pymemcache/test/test_client.py | 29 +++++++++++++++++++++++++++++ 4 files changed, 54 insertions(+), 7 deletions(-) diff --git a/pymemcache/client/base.py b/pymemcache/client/base.py index 4db03b81..6133c7ea 100644 --- a/pymemcache/client/base.py +++ b/pymemcache/client/base.py @@ -1039,6 +1039,8 @@ class PooledClient(object): max_pool_size: maximum pool size to use (going above this amount triggers a runtime error), by default this is 2147483648L when not provided (or none). + pool_idle_timeout: pooled connections are dropped if unused for this + amount of time (default None) lock_generator: a callback/type that takes no arguments that will be called to create a lock or semaphore that can protect the pool from concurrent access (for example a @@ -1065,6 +1067,7 @@ def __init__(self, socket_module=socket, key_prefix=b'', max_pool_size=None, + pool_idle_timeout=None, lock_generator=None, default_noreply=True, allow_unicode_keys=False, @@ -1088,6 +1091,7 @@ def __init__(self, self._create_client, after_remove=lambda client: client.close(), max_size=max_pool_size, + idle_timeout=pool_idle_timeout, lock_generator=lock_generator) self.encoding = encoding self.tls_context = tls_context diff --git a/pymemcache/client/hash.py b/pymemcache/client/hash.py index c2adb893..4174d631 100644 --- a/pymemcache/client/hash.py +++ b/pymemcache/client/hash.py @@ -36,6 +36,7 @@ def __init__( socket_module=socket, key_prefix=b'', max_pool_size=None, + pool_idle_timeout=None, lock_generator=None, retry_attempts=2, retry_timeout=1, @@ -104,6 +105,7 @@ def __init__( if use_pooling is True: self.default_kwargs.update({ 'max_pool_size': max_pool_size, + 'pool_idle_timeout': pool_idle_timeout, 'lock_generator': lock_generator }) diff --git a/pymemcache/pool.py b/pymemcache/pool.py index f800f90c..8b3002e5 100644 --- a/pymemcache/pool.py +++ b/pymemcache/pool.py @@ -16,6 +16,7 @@ import contextlib import sys import threading +import time import six @@ -25,6 +26,7 @@ class ObjectPool(object): def __init__(self, obj_creator, after_remove=None, max_size=None, + idle_timeout=None, lock_generator=None): self._used_objs = collections.deque() self._free_objs = collections.deque() @@ -38,6 +40,8 @@ def __init__(self, obj_creator, if not isinstance(max_size, six.integer_types) or max_size < 0: raise ValueError('"max_size" must be a positive integer') self.max_size = max_size + self.idle_timeout = idle_timeout or 0 + self._idle_clock = time.time if idle_timeout else int @property def used(self): @@ -63,19 +67,26 @@ def get_and_release(self, destroy_on_fail=False): def get(self): with self._lock: - if not self._free_objs: + # Find a free object, removing any that have idled for too long. + now = self._idle_clock() + while self._free_objs: + obj = self._free_objs.popleft() + if now - obj._last_used > self.idle_timeout: + self._after_remove(obj) + else: + break + else: + # No free objects, create a new one. curr_count = len(self._used_objs) if curr_count >= self.max_size: raise RuntimeError("Too many objects," " %s >= %s" % (curr_count, self.max_size)) obj = self._obj_creator() - self._used_objs.append(obj) - return obj - else: - obj = self._free_objs.pop() - self._used_objs.append(obj) - return obj + + self._used_objs.append(obj) + obj._last_used = now + return obj def destroy(self, obj, silent=True): was_dropped = False @@ -94,6 +105,7 @@ def release(self, obj, silent=True): try: self._used_objs.remove(obj) self._free_objs.append(obj) + obj._last_used = self._idle_clock() except ValueError: if not silent: raise diff --git a/pymemcache/test/test_client.py b/pymemcache/test/test_client.py index bdaf233a..a90ecf43 100644 --- a/pymemcache/test/test_client.py +++ b/pymemcache/test/test_client.py @@ -1255,6 +1255,35 @@ class MyClient(Client): assert isinstance(client.client_pool.get(), MyClient) +class TestPooledClientIdleTimeout(ClientTestMixin, unittest.TestCase): + def make_client(self, mock_socket_values, **kwargs): + mock_client = Client(None, **kwargs) + mock_client.sock = MockSocket(list(mock_socket_values)) + client = PooledClient(None, pool_idle_timeout=60, **kwargs) + client.client_pool = pool.ObjectPool(lambda: mock_client) + return client + + def test_free_idle(self): + num_removed = 0 + def increment_removed(obj): + nonlocal num_removed + num_removed += 1 + + client = self.make_client([b'VALUE key 0 5\r\nvalue\r\nEND\r\n']*2) + client.client_pool._after_remove = increment_removed + client.client_pool._idle_clock = lambda: 0 + + client.set(b'key', b'value') + assert num_removed == 0 + client.get(b'key') + assert num_removed == 0 + + # Advance clock to beyond the idle timeout. + client.client_pool._idle_clock = lambda: 61 + client.get(b'key') + assert num_removed == 1 + + class TestMockClient(ClientTestMixin, unittest.TestCase): def make_client(self, mock_socket_values, **kwargs): client = MockMemcacheClient(None, **kwargs) From e02840b8ecea5b754c900b99ec588fca250334a5 Mon Sep 17 00:00:00 2001 From: Jordan Carlson Date: Thu, 27 May 2021 18:22:54 -0700 Subject: [PATCH 2/6] fixup! Remove idle connections from pool --- pymemcache/client/base.py | 5 +++-- pymemcache/pool.py | 7 ++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pymemcache/client/base.py b/pymemcache/client/base.py index 6133c7ea..a68bcff7 100644 --- a/pymemcache/client/base.py +++ b/pymemcache/client/base.py @@ -1039,8 +1039,9 @@ class PooledClient(object): max_pool_size: maximum pool size to use (going above this amount triggers a runtime error), by default this is 2147483648L when not provided (or none). - pool_idle_timeout: pooled connections are dropped if unused for this - amount of time (default None) + pool_idle_timeout: pooled connections are discarded if they have been + unused for this many seconds. A value of 0 or None + indicates that pooled connections are never discarded. lock_generator: a callback/type that takes no arguments that will be called to create a lock or semaphore that can protect the pool from concurrent access (for example a diff --git a/pymemcache/pool.py b/pymemcache/pool.py index 8b3002e5..7f6b2ff3 100644 --- a/pymemcache/pool.py +++ b/pymemcache/pool.py @@ -71,10 +71,11 @@ def get(self): now = self._idle_clock() while self._free_objs: obj = self._free_objs.popleft() - if now - obj._last_used > self.idle_timeout: - self._after_remove(obj) - else: + if now - obj._last_used <= self.idle_timeout: break + + if self._after_remove is not None: + self._after_remove(obj) else: # No free objects, create a new one. curr_count = len(self._used_objs) From 0ff84cfa995535a5c3fb070982e2cdf58acf964e Mon Sep 17 00:00:00 2001 From: Jordan Carlson Date: Fri, 28 May 2021 12:16:53 -0700 Subject: [PATCH 3/6] fixup! Remove idle connections from pool --- pymemcache/client/base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pymemcache/client/base.py b/pymemcache/client/base.py index a68bcff7..a0816cf0 100644 --- a/pymemcache/client/base.py +++ b/pymemcache/client/base.py @@ -1040,8 +1040,8 @@ class PooledClient(object): triggers a runtime error), by default this is 2147483648L when not provided (or none). pool_idle_timeout: pooled connections are discarded if they have been - unused for this many seconds. A value of 0 or None - indicates that pooled connections are never discarded. + unused for this many seconds. A value of 0 indicates + that pooled connections are never discarded. lock_generator: a callback/type that takes no arguments that will be called to create a lock or semaphore that can protect the pool from concurrent access (for example a @@ -1068,7 +1068,7 @@ def __init__(self, socket_module=socket, key_prefix=b'', max_pool_size=None, - pool_idle_timeout=None, + pool_idle_timeout=0, lock_generator=None, default_noreply=True, allow_unicode_keys=False, From aa22bc20977952da919787e195b4837e34a239e3 Mon Sep 17 00:00:00 2001 From: Jordan Carlson Date: Fri, 28 May 2021 12:52:10 -0700 Subject: [PATCH 4/6] fixup! Remove idle connections from pool --- pymemcache/client/hash.py | 2 +- pymemcache/pool.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pymemcache/client/hash.py b/pymemcache/client/hash.py index 4174d631..09b1e575 100644 --- a/pymemcache/client/hash.py +++ b/pymemcache/client/hash.py @@ -36,7 +36,7 @@ def __init__( socket_module=socket, key_prefix=b'', max_pool_size=None, - pool_idle_timeout=None, + pool_idle_timeout=0, lock_generator=None, retry_attempts=2, retry_timeout=1, diff --git a/pymemcache/pool.py b/pymemcache/pool.py index 7f6b2ff3..ddb8825e 100644 --- a/pymemcache/pool.py +++ b/pymemcache/pool.py @@ -26,7 +26,7 @@ class ObjectPool(object): def __init__(self, obj_creator, after_remove=None, max_size=None, - idle_timeout=None, + idle_timeout=0, lock_generator=None): self._used_objs = collections.deque() self._free_objs = collections.deque() @@ -40,7 +40,7 @@ def __init__(self, obj_creator, if not isinstance(max_size, six.integer_types) or max_size < 0: raise ValueError('"max_size" must be a positive integer') self.max_size = max_size - self.idle_timeout = idle_timeout or 0 + self.idle_timeout = idle_timeout self._idle_clock = time.time if idle_timeout else int @property From d37b7eac553176f6eb235ab9e7cea26fa1edd7f1 Mon Sep 17 00:00:00 2001 From: Jordan Carlson Date: Fri, 28 May 2021 13:53:25 -0700 Subject: [PATCH 5/6] fixup! Remove idle connections from pool --- pymemcache/test/test_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pymemcache/test/test_client.py b/pymemcache/test/test_client.py index a90ecf43..821b9aad 100644 --- a/pymemcache/test/test_client.py +++ b/pymemcache/test/test_client.py @@ -1265,6 +1265,7 @@ def make_client(self, mock_socket_values, **kwargs): def test_free_idle(self): num_removed = 0 + def increment_removed(obj): nonlocal num_removed num_removed += 1 From e661362304d291735a1323ac981cc1cab07e8729 Mon Sep 17 00:00:00 2001 From: Jordan Carlson Date: Fri, 28 May 2021 16:32:47 -0700 Subject: [PATCH 6/6] fixup! Remove idle connections from pool --- pymemcache/test/test_client.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pymemcache/test/test_client.py b/pymemcache/test/test_client.py index 821b9aad..24ecab21 100644 --- a/pymemcache/test/test_client.py +++ b/pymemcache/test/test_client.py @@ -1264,25 +1264,27 @@ def make_client(self, mock_socket_values, **kwargs): return client def test_free_idle(self): - num_removed = 0 + class Counter(object): + count = 0 - def increment_removed(obj): - nonlocal num_removed - num_removed += 1 + def increment(self, obj): + self.count += 1 + + removed = Counter() client = self.make_client([b'VALUE key 0 5\r\nvalue\r\nEND\r\n']*2) - client.client_pool._after_remove = increment_removed + client.client_pool._after_remove = removed.increment client.client_pool._idle_clock = lambda: 0 client.set(b'key', b'value') - assert num_removed == 0 + assert removed.count == 0 client.get(b'key') - assert num_removed == 0 + assert removed.count == 0 # Advance clock to beyond the idle timeout. client.client_pool._idle_clock = lambda: 61 client.get(b'key') - assert num_removed == 1 + assert removed.count == 1 class TestMockClient(ClientTestMixin, unittest.TestCase):