diff --git a/pymemcache/client/base.py b/pymemcache/client/base.py index 4db03b81..a0816cf0 100644 --- a/pymemcache/client/base.py +++ b/pymemcache/client/base.py @@ -1039,6 +1039,9 @@ class PooledClient(object): max_pool_size: maximum pool size to use (going above this amount triggers a runtime error), by default this is 2147483648L when not provided (or none). + pool_idle_timeout: pooled connections are discarded if they have been + unused for this many seconds. A value of 0 indicates + that pooled connections are never discarded. lock_generator: a callback/type that takes no arguments that will be called to create a lock or semaphore that can protect the pool from concurrent access (for example a @@ -1065,6 +1068,7 @@ def __init__(self, socket_module=socket, key_prefix=b'', max_pool_size=None, + pool_idle_timeout=0, lock_generator=None, default_noreply=True, allow_unicode_keys=False, @@ -1088,6 +1092,7 @@ def __init__(self, self._create_client, after_remove=lambda client: client.close(), max_size=max_pool_size, + idle_timeout=pool_idle_timeout, lock_generator=lock_generator) self.encoding = encoding self.tls_context = tls_context diff --git a/pymemcache/client/hash.py b/pymemcache/client/hash.py index c2adb893..09b1e575 100644 --- a/pymemcache/client/hash.py +++ b/pymemcache/client/hash.py @@ -36,6 +36,7 @@ def __init__( socket_module=socket, key_prefix=b'', max_pool_size=None, + pool_idle_timeout=0, lock_generator=None, retry_attempts=2, retry_timeout=1, @@ -104,6 +105,7 @@ def __init__( if use_pooling is True: self.default_kwargs.update({ 'max_pool_size': max_pool_size, + 'pool_idle_timeout': pool_idle_timeout, 'lock_generator': lock_generator }) diff --git a/pymemcache/pool.py b/pymemcache/pool.py index f800f90c..ddb8825e 100644 --- a/pymemcache/pool.py +++ b/pymemcache/pool.py @@ -16,6 +16,7 @@ import contextlib import sys import threading +import time import six @@ -25,6 +26,7 @@ class ObjectPool(object): def __init__(self, obj_creator, after_remove=None, max_size=None, + idle_timeout=0, lock_generator=None): self._used_objs = collections.deque() self._free_objs = collections.deque() @@ -38,6 +40,8 @@ def __init__(self, obj_creator, if not isinstance(max_size, six.integer_types) or max_size < 0: raise ValueError('"max_size" must be a positive integer') self.max_size = max_size + self.idle_timeout = idle_timeout + self._idle_clock = time.time if idle_timeout else int @property def used(self): @@ -63,19 +67,27 @@ def get_and_release(self, destroy_on_fail=False): def get(self): with self._lock: - if not self._free_objs: + # Find a free object, removing any that have idled for too long. + now = self._idle_clock() + while self._free_objs: + obj = self._free_objs.popleft() + if now - obj._last_used <= self.idle_timeout: + break + + if self._after_remove is not None: + self._after_remove(obj) + else: + # No free objects, create a new one. curr_count = len(self._used_objs) if curr_count >= self.max_size: raise RuntimeError("Too many objects," " %s >= %s" % (curr_count, self.max_size)) obj = self._obj_creator() - self._used_objs.append(obj) - return obj - else: - obj = self._free_objs.pop() - self._used_objs.append(obj) - return obj + + self._used_objs.append(obj) + obj._last_used = now + return obj def destroy(self, obj, silent=True): was_dropped = False @@ -94,6 +106,7 @@ def release(self, obj, silent=True): try: self._used_objs.remove(obj) self._free_objs.append(obj) + obj._last_used = self._idle_clock() except ValueError: if not silent: raise diff --git a/pymemcache/test/test_client.py b/pymemcache/test/test_client.py index bdaf233a..24ecab21 100644 --- a/pymemcache/test/test_client.py +++ b/pymemcache/test/test_client.py @@ -1255,6 +1255,38 @@ class MyClient(Client): assert isinstance(client.client_pool.get(), MyClient) +class TestPooledClientIdleTimeout(ClientTestMixin, unittest.TestCase): + def make_client(self, mock_socket_values, **kwargs): + mock_client = Client(None, **kwargs) + mock_client.sock = MockSocket(list(mock_socket_values)) + client = PooledClient(None, pool_idle_timeout=60, **kwargs) + client.client_pool = pool.ObjectPool(lambda: mock_client) + return client + + def test_free_idle(self): + class Counter(object): + count = 0 + + def increment(self, obj): + self.count += 1 + + removed = Counter() + + client = self.make_client([b'VALUE key 0 5\r\nvalue\r\nEND\r\n']*2) + client.client_pool._after_remove = removed.increment + client.client_pool._idle_clock = lambda: 0 + + client.set(b'key', b'value') + assert removed.count == 0 + client.get(b'key') + assert removed.count == 0 + + # Advance clock to beyond the idle timeout. + client.client_pool._idle_clock = lambda: 61 + client.get(b'key') + assert removed.count == 1 + + class TestMockClient(ClientTestMixin, unittest.TestCase): def make_client(self, mock_socket_values, **kwargs): client = MockMemcacheClient(None, **kwargs)