Skip to content

Commit

Permalink
Remove idle connections from pool
Browse files Browse the repository at this point in the history
The Memcached server can be configured to drop idle connections (via the
`idle_timeout` option). When this occurs, pymemcache may still try to
use the connection, resulting in `MemcacheUnexpectedCloseError`. Hence
the need for client-side idle timeout logic.

This change was proposed in github.com/#114, but
ultimately dropped. The current PR uses the same approach but differs
slightly in the details.
  • Loading branch information
jwgcarlson committed Mar 4, 2021
1 parent b9e61f6 commit 12119f3
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 7 deletions.
4 changes: 4 additions & 0 deletions pymemcache/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,8 @@ class PooledClient(object):
max_pool_size: maximum pool size to use (going above this amount
triggers a runtime error), by default this is 2147483648L
when not provided (or none).
pool_idle_timeout: pooled connections are dropped if unused for this
amount of time (default None)
lock_generator: a callback/type that takes no arguments that will
be called to create a lock or semaphore that can
protect the pool from concurrent access (for example a
Expand All @@ -1062,6 +1064,7 @@ def __init__(self,
socket_module=socket,
key_prefix=b'',
max_pool_size=None,
pool_idle_timeout=None,
lock_generator=None,
default_noreply=True,
allow_unicode_keys=False,
Expand All @@ -1085,6 +1088,7 @@ def __init__(self,
self._create_client,
after_remove=lambda client: client.close(),
max_size=max_pool_size,
idle_timeout=pool_idle_timeout,
lock_generator=lock_generator)
self.encoding = encoding
self.tls_context = tls_context
Expand Down
2 changes: 2 additions & 0 deletions pymemcache/client/hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(
socket_module=socket,
key_prefix=b'',
max_pool_size=None,
pool_idle_timeout=None,
lock_generator=None,
retry_attempts=2,
retry_timeout=1,
Expand Down Expand Up @@ -104,6 +105,7 @@ def __init__(
if use_pooling is True:
self.default_kwargs.update({
'max_pool_size': max_pool_size,
'pool_idle_timeout': pool_idle_timeout,
'lock_generator': lock_generator
})

Expand Down
26 changes: 19 additions & 7 deletions pymemcache/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import contextlib
import sys
import threading
import time

import six

Expand All @@ -25,6 +26,7 @@ class ObjectPool(object):

def __init__(self, obj_creator,
after_remove=None, max_size=None,
idle_timeout=None,
lock_generator=None):
self._used_objs = collections.deque()
self._free_objs = collections.deque()
Expand All @@ -38,6 +40,8 @@ def __init__(self, obj_creator,
if not isinstance(max_size, six.integer_types) or max_size < 0:
raise ValueError('"max_size" must be a positive integer')
self.max_size = max_size
self.idle_timeout = idle_timeout or 0
self._idle_clock = time.time if idle_timeout else int

@property
def used(self):
Expand All @@ -63,19 +67,26 @@ def get_and_release(self, destroy_on_fail=False):

def get(self):
with self._lock:
if not self._free_objs:
# Find a free object, removing any that have idled for too long.
now = self._idle_clock()
while self._free_objs:
obj = self._free_objs.popleft()
if now - obj._last_used > self.idle_timeout:
self._after_remove(obj)
else:
break
else:
# No free objects, create a new one.
curr_count = len(self._used_objs)
if curr_count >= self.max_size:
raise RuntimeError("Too many objects,"
" %s >= %s" % (curr_count,
self.max_size))
obj = self._obj_creator()
self._used_objs.append(obj)
return obj
else:
obj = self._free_objs.pop()
self._used_objs.append(obj)
return obj

self._used_objs.append(obj)
obj._last_used = now
return obj

def destroy(self, obj, silent=True):
was_dropped = False
Expand All @@ -94,6 +105,7 @@ def release(self, obj, silent=True):
try:
self._used_objs.remove(obj)
self._free_objs.append(obj)
obj._last_used = self._idle_clock()
except ValueError:
if not silent:
raise
Expand Down
29 changes: 29 additions & 0 deletions pymemcache/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,35 @@ class MyClient(Client):
assert isinstance(client.client_pool.get(), MyClient)


class TestPooledClientIdleTimeout(ClientTestMixin, unittest.TestCase):
def make_client(self, mock_socket_values, **kwargs):
mock_client = Client(None, **kwargs)
mock_client.sock = MockSocket(list(mock_socket_values))
client = PooledClient(None, pool_idle_timeout=60, **kwargs)
client.client_pool = pool.ObjectPool(lambda: mock_client)
return client

def test_free_idle(self):
num_removed = 0
def increment_removed(obj):
nonlocal num_removed
num_removed += 1

client = self.make_client([b'VALUE key 0 5\r\nvalue\r\nEND\r\n']*2)
client.client_pool._after_remove = increment_removed
client.client_pool._idle_clock = lambda: 0

client.set(b'key', b'value')
assert num_removed == 0
client.get(b'key')
assert num_removed == 0

# Advance clock to beyond the idle timeout.
client.client_pool._idle_clock = lambda: 61
client.get(b'key')
assert num_removed == 1


class TestMockClient(ClientTestMixin, unittest.TestCase):
def make_client(self, mock_socket_values, **kwargs):
client = MockMemcacheClient(None, **kwargs)
Expand Down

0 comments on commit 12119f3

Please sign in to comment.