diff --git a/django_elastipymemcache/backend.py b/django_elastipymemcache/backend.py index a27619e..5a0bdef 100644 --- a/django_elastipymemcache/backend.py +++ b/django_elastipymemcache/backend.py @@ -7,10 +7,9 @@ from django.core.cache import InvalidCacheBackendError from django.core.cache.backends.memcached import BaseMemcachedCache - from djpymemcache import client as djpymemcache_client -from .cluster_utils import get_cluster_info +from .client import ConfigurationEndpointClient logger = logging.getLogger(__name__) @@ -59,32 +58,37 @@ def __init__(self, server, params): 'ElastiCache should be configured with only one server ' '(Configuration Endpoint)', ) - - if len(self._servers[0].split(':')) != 2: + try: + host, port = self._servers[0].split(':') + except ValueError: raise InvalidCacheBackendError( 'Server configuration should be in format IP:Port', ) + self.configuration_endpoint_client = ConfigurationEndpointClient( + (host, port), + ignore_cluster_errors=self._ignore_cluster_errors, + **self._options, + ) + def clear_cluster_nodes_cache(self): """Clear internal cache with list of nodes in cluster""" if hasattr(self, '_client'): del self._client def get_cluster_nodes(self): - """Return list with all nodes in cluster""" - server, port = self._servers[0].split(':') try: - return get_cluster_info( - server, - port, - self._ignore_cluster_errors, - self._cluster_timeout - )['nodes'] - except (OSError, socket.gaierror, socket.timeout) as err: - logger.debug( + return self.configuration_endpoint_client \ + .get_cluster_info()['nodes'] + except ( + OSError, + socket.gaierror, + socket.timeout, + ) as e: + logger.warn( 'Cannot connect to cluster %s, err: %s', - self._servers[0], - err, + self.configuration_endpoint_client.server, + e, ) return [] @@ -92,7 +96,9 @@ def get_cluster_nodes(self): def _cache(self): if getattr(self, '_client', None) is None: self._client = self._lib.Client( - self.get_cluster_nodes(), **self._options) + self.get_cluster_nodes(), + **self._options, + ) return self._client @invalidate_cache_after_error diff --git a/django_elastipymemcache/client.py b/django_elastipymemcache/client.py new file mode 100644 index 0000000..8000837 --- /dev/null +++ b/django_elastipymemcache/client.py @@ -0,0 +1,78 @@ +import logging +from distutils.version import StrictVersion + +from django.utils.encoding import smart_text +from pymemcache.client.base import Client, _readline +from pymemcache.exceptions import MemcacheUnknownError + +logger = logging.getLogger(__name__) + + +class ConfigurationEndpointClient(Client): + # https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/AutoDiscovery.AddingToYourClientLibrary.html + + def __init__(self, *args, ignore_cluster_errors=False, **kwargs): + self.ignore_cluster_errors = ignore_cluster_errors + return super().__init__(*args, **kwargs) + + def _get_cluster_info_cmd(self): + if StrictVersion(smart_text(self.version())) < StrictVersion('1.4.14'): + return b'get AmazonElastiCache:cluster\r\n' + return b'config get cluster\r\n' + + def _extract_cluster_info(self, line): + raw_version, raw_nodes, _ = line.split(b'\n') + nodes = [] + for raw_node in raw_nodes.split(b' '): + host, ip, port = raw_node.split(b'|') + nodes.append('{host}:{port}'.format( + host=smart_text(ip or host), + port=int(port) + )) + return { + 'version': int(raw_version), + 'nodes': nodes, + } + + def _fetch_cluster_info_cmd(self, cmd, name): + if self.sock is None: + self._connect() + self.sock.sendall(cmd) + + buf = b'' + result = {} + number_of_line = 0 + + while True: + buf, line = _readline(self.sock, buf) + self._raise_errors(line, name) + if line == b'END': + if number_of_line != 2: + raise MemcacheUnknownError('Wrong response') + return result + if number_of_line == 1: + try: + result = self._extract_cluster_info(line) + except ValueError: + raise MemcacheUnknownError('Wrong format: {line}'.format( + line=line, + )) + number_of_line += 1 + + def get_cluster_info(self): + cmd = self._get_cluster_info_cmd() + try: + return self._fetch_cluster_info_cmd(cmd, 'config cluster') + except Exception as e: + if self.ignore_cluster_errors: + logger.warn('Failed to get cluster: %s', e) + return { + 'version': None, + 'nodes': [ + '{host}:{port:d}'.format( + host=self.server[0], + port=int(self.server[1]), + ), + ] + } + raise diff --git a/django_elastipymemcache/cluster_utils.py b/django_elastipymemcache/cluster_utils.py deleted file mode 100644 index 7d10d02..0000000 --- a/django_elastipymemcache/cluster_utils.py +++ /dev/null @@ -1,90 +0,0 @@ -""" -Utils for discovery cluster -""" -import re -from distutils.version import StrictVersion -import socket -from telnetlib import Telnet - -from django.utils.encoding import smart_text - - -class WrongProtocolData(ValueError): - """ - Exception for raising when we get something unexpected - in telnet protocol - """ - def __init__(self, cmd, response): - super().__init__( - 'Unexpected response {} for command {}'.format(response, cmd), - ) - - -def get_cluster_info( - host, - port, - ignore_cluster_errors=False, - timeout=socket._GLOBAL_DEFAULT_TIMEOUT): - """ - Return dict with info about nodes in cluster and current version - { - 'nodes': [ - 'IP:Port', - 'IP:Port', - ], - 'version': '1.4.4' - } - """ - client = Telnet(host, int(port), timeout=timeout) - client.write(b'version\n') - res = client.read_until(b'\r\n').strip() - version_list = res.split(b' ') - if len(version_list) not in [2, 3] or version_list[0] != b'VERSION': - raise WrongProtocolData('version', res) - version = version_list[1] - if StrictVersion(smart_text(version)) >= StrictVersion('1.4.14'): - cmd = b'config get cluster\n' - else: - cmd = b'get AmazonElastiCache:cluster\n' - client.write(cmd) - regex_index, match_object, res = client.expect([ - re.compile(b'\n\r\nEND\r\n'), - re.compile(b'ERROR\r\n') - ]) - client.close() - - if res == b'ERROR\r\n' and ignore_cluster_errors: - return { - 'version': version, - 'nodes': [ - '{host}:{port:d}'.format( - host=smart_text(host), - port=int(port), - ), - ] - } - - ls = list(filter(None, re.compile(br'\r?\n').split(res))) - if len(ls) != 4: - raise WrongProtocolData(cmd, res) - - try: - version = int(ls[1]) - except ValueError: - raise WrongProtocolData(cmd, res) - nodes = [] - try: - for node in ls[2].split(b' '): - host, ip, port = node.split(b'|') - nodes.append( - '{host}:{port:d}'.format( - host=smart_text(ip or host), - port=int(port), - ), - ) - except ValueError: - raise WrongProtocolData(cmd, res) - return { - 'version': version, - 'nodes': nodes - } diff --git a/tests/test_backend.py b/tests/test_backend.py index 1097a1f..267d31d 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -1,14 +1,9 @@ -import socket -from unittest.mock import ( - patch, - Mock, -) +from unittest.mock import Mock, patch from django.core.cache import InvalidCacheBackendError -from nose.tools import ( - eq_, - raises, -) +from nose.tools import eq_, raises + +from django_elastipymemcache.client import ConfigurationEndpointClient @raises(InvalidCacheBackendError) @@ -23,7 +18,7 @@ def test_wrong_server_format(): ElastiPymemcache('h', {}) -@patch('django_elastipymemcache.backend.get_cluster_info') +@patch.object(ConfigurationEndpointClient, 'get_cluster_info') def test_split_servers(get_cluster_info): from django_elastipymemcache.backend import ElastiPymemcache backend = ElastiPymemcache('h:0', {}) @@ -33,15 +28,14 @@ def test_split_servers(get_cluster_info): } backend._lib.Client = Mock() assert backend._cache - get_cluster_info.assert_called_once_with( - 'h', '0', False, socket._GLOBAL_DEFAULT_TIMEOUT) + get_cluster_info.assert_called() backend._lib.Client.assert_called_once_with( servers, ignore_exc=True, ) -@patch('django_elastipymemcache.backend.get_cluster_info') +@patch.object(ConfigurationEndpointClient, 'get_cluster_info') def test_node_info_cache(get_cluster_info): from django_elastipymemcache.backend import ElastiPymemcache servers = ['h1:0', 'h2:0'] @@ -62,11 +56,10 @@ def test_node_info_cache(get_cluster_info): eq_(backend._cache.get.call_count, 2) eq_(backend._cache.set.call_count, 2) - get_cluster_info.assert_called_once_with( - 'h', '0', False, socket._GLOBAL_DEFAULT_TIMEOUT) + get_cluster_info.assert_called_once() -@patch('django_elastipymemcache.backend.get_cluster_info') +@patch.object(ConfigurationEndpointClient, 'get_cluster_info') def test_failed_to_connect_servers(get_cluster_info): from django_elastipymemcache.backend import ElastiPymemcache backend = ElastiPymemcache('h:0', {}) @@ -74,7 +67,7 @@ def test_failed_to_connect_servers(get_cluster_info): eq_(backend.get_cluster_nodes(), []) -@patch('django_elastipymemcache.backend.get_cluster_info') +@patch.object(ConfigurationEndpointClient, 'get_cluster_info') def test_invalidate_cache(get_cluster_info): from django_elastipymemcache.backend import ElastiPymemcache servers = ['h1:0', 'h2:0'] @@ -102,7 +95,7 @@ def test_invalidate_cache(get_cluster_info): eq_(get_cluster_info.call_count, 3) -@patch('django_elastipymemcache.backend.get_cluster_info') +@patch.object(ConfigurationEndpointClient, 'get_cluster_info') def test_client_add(get_cluster_info): from django_elastipymemcache.backend import ElastiPymemcache @@ -116,7 +109,7 @@ def test_client_add(get_cluster_info): eq_(ret, False) -@patch('django_elastipymemcache.backend.get_cluster_info') +@patch.object(ConfigurationEndpointClient, 'get_cluster_info') def test_client_delete(get_cluster_info): from django_elastipymemcache.backend import ElastiPymemcache @@ -130,7 +123,7 @@ def test_client_delete(get_cluster_info): eq_(ret, None) -@patch('django_elastipymemcache.backend.get_cluster_info') +@patch.object(ConfigurationEndpointClient, 'get_cluster_info') def test_client_get_many(get_cluster_info): from django_elastipymemcache.backend import ElastiPymemcache @@ -188,7 +181,7 @@ def test_client_get_many(get_cluster_info): ) -@patch('django_elastipymemcache.backend.get_cluster_info') +@patch.object(ConfigurationEndpointClient, 'get_cluster_info') def test_client_set_many(get_cluster_info): from django_elastipymemcache.backend import ElastiPymemcache @@ -202,7 +195,7 @@ def test_client_set_many(get_cluster_info): eq_(ret, ['key1', 'key2']) -@patch('django_elastipymemcache.backend.get_cluster_info') +@patch.object(ConfigurationEndpointClient, 'get_cluster_info') def test_client_delete_many(get_cluster_info): from django_elastipymemcache.backend import ElastiPymemcache @@ -216,7 +209,7 @@ def test_client_delete_many(get_cluster_info): eq_(ret, None) -@patch('django_elastipymemcache.backend.get_cluster_info') +@patch.object(ConfigurationEndpointClient, 'get_cluster_info') def test_client_incr(get_cluster_info): from django_elastipymemcache.backend import ElastiPymemcache @@ -230,7 +223,7 @@ def test_client_incr(get_cluster_info): eq_(ret, False) -@patch('django_elastipymemcache.backend.get_cluster_info') +@patch.object(ConfigurationEndpointClient, 'get_cluster_info') def test_client_decr(get_cluster_info): from django_elastipymemcache.backend import ElastiPymemcache diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 0000000..724e0ad --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,110 @@ +import collections +from unittest.mock import call, patch + +from nose.tools import eq_, raises +from pymemcache.exceptions import ( + MemcacheUnknownCommandError, + MemcacheUnknownError, +) + +from django_elastipymemcache.client import ConfigurationEndpointClient + +EXAMPLE_RESPONSE = [ + b'CONFIG cluster 0 147\r\n', + b'12\n' + b'myCluster.pc4ldq.0001.use1.cache.amazonaws.com|10.82.235.120|11211 ' + b'myCluster.pc4ldq.0002.use1.cache.amazonaws.com|10.80.249.27|11211\n\r\n', + b'END\r\n', +] + + +@patch('socket.socket') +def test_get_cluster_info(socket): + recv_bufs = collections.deque([ + b'VERSION 1.4.14\r\n', + ] + EXAMPLE_RESPONSE) + + client = socket.return_value + client.recv.side_effect = lambda *args, **kwargs: recv_bufs.popleft() + cluster_info = ConfigurationEndpointClient(('h', 0)).get_cluster_info() + eq_(cluster_info['nodes'], ['10.82.235.120:11211', '10.80.249.27:11211']) + client.sendall.assert_has_calls([ + call(b'version\r\n'), + call(b'config get cluster\r\n'), + ]) + + +@patch('socket.socket') +def test_get_cluster_info_before_1_4_13(socket): + recv_bufs = collections.deque([ + b'VERSION 1.4.13\r\n', + ] + EXAMPLE_RESPONSE) + + client = socket.return_value + client.recv.side_effect = lambda *args, **kwargs: recv_bufs.popleft() + cluster_info = ConfigurationEndpointClient(('h', 0)).get_cluster_info() + eq_(cluster_info['nodes'], ['10.82.235.120:11211', '10.80.249.27:11211']) + client.sendall.assert_has_calls([ + call(b'version\r\n'), + call(b'get AmazonElastiCache:cluster\r\n'), + ]) + + +@raises(MemcacheUnknownCommandError) +@patch('socket.socket') +def test_no_configuration_protocol_support_with_errors(socket): + recv_bufs = collections.deque([ + b'VERSION 1.4.13\r\n', + b'ERROR\r\n', + ]) + + client = socket.return_value + client.recv.side_effect = lambda *args, **kwargs: recv_bufs.popleft() + ConfigurationEndpointClient(('h', 0)).get_cluster_info() + + +@raises(MemcacheUnknownError) +@patch('socket.socket') +def test_cannot_parse_version(socket): + recv_bufs = collections.deque([ + b'VERSION 1.4.34\r\n', + b'CONFIG cluster 0 147\r\n', + b'fail\nhost|ip|11211 host|ip|11211\n\r\n', + b'END\r\n', + ]) + + client = socket.return_value + client.recv.side_effect = lambda *args, **kwargs: recv_bufs.popleft() + ConfigurationEndpointClient(('h', 0)).get_cluster_info() + + +@raises(MemcacheUnknownError) +@patch('socket.socket') +def test_cannot_parse_nodes(socket): + recv_bufs = collections.deque([ + b'VERSION 1.4.34\r\n', + b'CONFIG cluster 0 147\r\n', + b'1\nfail\n\r\n', + b'END\r\n', + ]) + + client = socket.return_value + client.recv.side_effect = lambda *args, **kwargs: recv_bufs.popleft() + ConfigurationEndpointClient(('h', 0)).get_cluster_info() + + +@patch('socket.socket') +def test_ignore_erros(socket): + recv_bufs = collections.deque([ + b'VERSION 1.4.34\r\n', + b'fail\nfail\n\r\n', + b'END\r\n', + ]) + + client = socket.return_value + client.recv.side_effect = lambda *args, **kwargs: recv_bufs.popleft() + cluster_info = ConfigurationEndpointClient( + ('h', 0), + ignore_cluster_errors=True, + ).get_cluster_info() + eq_(cluster_info['nodes'], ['h:0']) diff --git a/tests/test_protocol.py b/tests/test_protocol.py deleted file mode 100644 index 7f99a91..0000000 --- a/tests/test_protocol.py +++ /dev/null @@ -1,122 +0,0 @@ -import sys - -from django_elastipymemcache.cluster_utils import ( - WrongProtocolData, - get_cluster_info, -) -from nose.tools import ( - eq_, - raises, -) - -if sys.version < '3': - from mock import patch, call, MagicMock -else: - from unittest.mock import patch, call, MagicMock - - -# https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/AutoDiscovery.AddingToYourClientLibrary.html -EXAMPLE_RESPONSE = ( - b'CONFIG cluster 0 147\r\n' - b'12\n' - b'myCluster.pc4ldq.0001.use1.cache.amazonaws.com|10.82.235.120|11211 ' - b'myCluster.pc4ldq.0002.use1.cache.amazonaws.com|10.80.249.27|11211\n\r\n' - b'END\r\n' -) - - -@patch('django_elastipymemcache.cluster_utils.Telnet') -def test_get_cluster_info(Telnet): - client = Telnet.return_value - client.read_until.side_effect = [ - b'VERSION 1.4.14', - ] - client.expect.side_effect = [ - (0, None, EXAMPLE_RESPONSE), # NOQA - ] - info = get_cluster_info('', 0) - eq_(info['version'], 12) - eq_(info['nodes'], ['10.82.235.120:11211', '10.80.249.27:11211']) - client.write.assert_has_calls([ - call(b'version\n'), - call(b'config get cluster\n'), - ]) - - -@patch('django_elastipymemcache.cluster_utils.Telnet') -def test_get_cluster_info_before_1_4_13(Telnet): - client = Telnet.return_value - client.read_until.side_effect = [ - b'VERSION 1.4.13', - ] - client.expect.side_effect = [ - (0, None, EXAMPLE_RESPONSE), # NOQA - ] - info = get_cluster_info('', 0) - eq_(info['version'], 12) - eq_(info['nodes'], ['10.82.235.120:11211', '10.80.249.27:11211']) - client.write.assert_has_calls([ - call(b'version\n'), - call(b'get AmazonElastiCache:cluster\n'), - ]) - - -@raises(WrongProtocolData) -@patch('django_elastipymemcache.cluster_utils.Telnet', MagicMock()) -def test_bad_protocol(): - get_cluster_info('', 0) - - -@patch('django_elastipymemcache.cluster_utils.Telnet') -def test_ubuntu_protocol(Telnet): - client = Telnet.return_value - client.read_until.side_effect = [ - b'VERSION 1.4.14 (Ubuntu)', - ] - client.expect.side_effect = [ - (0, None, EXAMPLE_RESPONSE), # NOQA - ] - get_cluster_info('', 0) - client.write.assert_has_calls([ - call(b'version\n'), - call(b'config get cluster\n'), - ]) - - -@raises(WrongProtocolData) -@patch('django_elastipymemcache.cluster_utils.Telnet') -def test_no_configuration_protocol_support_with_errors(Telnet): - client = Telnet.return_value - client.read_until.side_effect = [ - b'VERSION 1.4.34', - ] - client.expect.side_effect = [ - (0, None, b'ERROR\r\n'), - ] - get_cluster_info('test', 0) - - -@raises(WrongProtocolData) -@patch('django_elastipymemcache.cluster_utils.Telnet') -def test_cannot_parse_version(Telnet): - client = Telnet.return_value - client.read_until.side_effect = [ - b'VERSION 1.4.34', - ] - client.expect.side_effect = [ - (0, None, b'CONFIG cluster 0 138\r\nfail\nhost|ip|11211 host||11211\n\r\nEND\r\n'), # NOQA - ] - get_cluster_info('test', 0) - - -@raises(WrongProtocolData) -@patch('django_elastipymemcache.cluster_utils.Telnet') -def test_cannot_parse_nodes(Telnet): - client = Telnet.return_value - client.read_until.side_effect = [ - b'VERSION 1.4.34', - ] - client.expect.side_effect = [ - (0, None, b'CONFIG cluster 0 138\r\n1\nfail\n\r\nEND\r\n'), # NOQA - ] - get_cluster_info('test', 0)