Skip to content

Commit

Permalink
Merge pull request #105 from harikitech/topic/migrate_to_pymemcache_c…
Browse files Browse the repository at this point in the history
…lient

Migrate to pymemcache client
  • Loading branch information
heavenshell authored Jun 29, 2020
2 parents 1546a86 + 11c764c commit eaacc5e
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 253 deletions.
40 changes: 23 additions & 17 deletions django_elastipymemcache/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@

from django.core.cache import InvalidCacheBackendError
from django.core.cache.backends.memcached import BaseMemcachedCache

from djpymemcache import client as djpymemcache_client
from .cluster_utils import get_cluster_info

from .client import ConfigurationEndpointClient

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -59,40 +58,47 @@ def __init__(self, server, params):
'ElastiCache should be configured with only one server '
'(Configuration Endpoint)',
)

if len(self._servers[0].split(':')) != 2:
try:
host, port = self._servers[0].split(':')
except ValueError:
raise InvalidCacheBackendError(
'Server configuration should be in format IP:Port',
)

self.configuration_endpoint_client = ConfigurationEndpointClient(
(host, port),
ignore_cluster_errors=self._ignore_cluster_errors,
**self._options,
)

def clear_cluster_nodes_cache(self):
"""Clear internal cache with list of nodes in cluster"""
if hasattr(self, '_client'):
del self._client

def get_cluster_nodes(self):
"""Return list with all nodes in cluster"""
server, port = self._servers[0].split(':')
try:
return get_cluster_info(
server,
port,
self._ignore_cluster_errors,
self._cluster_timeout
)['nodes']
except (OSError, socket.gaierror, socket.timeout) as err:
logger.debug(
return self.configuration_endpoint_client \
.get_cluster_info()['nodes']
except (
OSError,
socket.gaierror,
socket.timeout,
) as e:
logger.warn(
'Cannot connect to cluster %s, err: %s',
self._servers[0],
err,
self.configuration_endpoint_client.server,
e,
)
return []

@property
def _cache(self):
if getattr(self, '_client', None) is None:
self._client = self._lib.Client(
self.get_cluster_nodes(), **self._options)
self.get_cluster_nodes(),
**self._options,
)
return self._client

@invalidate_cache_after_error
Expand Down
78 changes: 78 additions & 0 deletions django_elastipymemcache/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import logging
from distutils.version import StrictVersion

from django.utils.encoding import smart_text
from pymemcache.client.base import Client, _readline
from pymemcache.exceptions import MemcacheUnknownError

logger = logging.getLogger(__name__)


class ConfigurationEndpointClient(Client):
# https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/AutoDiscovery.AddingToYourClientLibrary.html

def __init__(self, *args, ignore_cluster_errors=False, **kwargs):
self.ignore_cluster_errors = ignore_cluster_errors
return super().__init__(*args, **kwargs)

def _get_cluster_info_cmd(self):
if StrictVersion(smart_text(self.version())) < StrictVersion('1.4.14'):
return b'get AmazonElastiCache:cluster\r\n'
return b'config get cluster\r\n'

def _extract_cluster_info(self, line):
raw_version, raw_nodes, _ = line.split(b'\n')
nodes = []
for raw_node in raw_nodes.split(b' '):
host, ip, port = raw_node.split(b'|')
nodes.append('{host}:{port}'.format(
host=smart_text(ip or host),
port=int(port)
))
return {
'version': int(raw_version),
'nodes': nodes,
}

def _fetch_cluster_info_cmd(self, cmd, name):
if self.sock is None:
self._connect()
self.sock.sendall(cmd)

buf = b''
result = {}
number_of_line = 0

while True:
buf, line = _readline(self.sock, buf)
self._raise_errors(line, name)
if line == b'END':
if number_of_line != 2:
raise MemcacheUnknownError('Wrong response')
return result
if number_of_line == 1:
try:
result = self._extract_cluster_info(line)
except ValueError:
raise MemcacheUnknownError('Wrong format: {line}'.format(
line=line,
))
number_of_line += 1

def get_cluster_info(self):
cmd = self._get_cluster_info_cmd()
try:
return self._fetch_cluster_info_cmd(cmd, 'config cluster')
except Exception as e:
if self.ignore_cluster_errors:
logger.warn('Failed to get cluster: %s', e)
return {
'version': None,
'nodes': [
'{host}:{port:d}'.format(
host=self.server[0],
port=int(self.server[1]),
),
]
}
raise
90 changes: 0 additions & 90 deletions django_elastipymemcache/cluster_utils.py

This file was deleted.

41 changes: 17 additions & 24 deletions tests/test_backend.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import socket
from unittest.mock import (
patch,
Mock,
)
from unittest.mock import Mock, patch

from django.core.cache import InvalidCacheBackendError
from nose.tools import (
eq_,
raises,
)
from nose.tools import eq_, raises

from django_elastipymemcache.client import ConfigurationEndpointClient


@raises(InvalidCacheBackendError)
Expand All @@ -23,7 +18,7 @@ def test_wrong_server_format():
ElastiPymemcache('h', {})


@patch('django_elastipymemcache.backend.get_cluster_info')
@patch.object(ConfigurationEndpointClient, 'get_cluster_info')
def test_split_servers(get_cluster_info):
from django_elastipymemcache.backend import ElastiPymemcache
backend = ElastiPymemcache('h:0', {})
Expand All @@ -33,15 +28,14 @@ def test_split_servers(get_cluster_info):
}
backend._lib.Client = Mock()
assert backend._cache
get_cluster_info.assert_called_once_with(
'h', '0', False, socket._GLOBAL_DEFAULT_TIMEOUT)
get_cluster_info.assert_called()
backend._lib.Client.assert_called_once_with(
servers,
ignore_exc=True,
)


@patch('django_elastipymemcache.backend.get_cluster_info')
@patch.object(ConfigurationEndpointClient, 'get_cluster_info')
def test_node_info_cache(get_cluster_info):
from django_elastipymemcache.backend import ElastiPymemcache
servers = ['h1:0', 'h2:0']
Expand All @@ -62,19 +56,18 @@ def test_node_info_cache(get_cluster_info):
eq_(backend._cache.get.call_count, 2)
eq_(backend._cache.set.call_count, 2)

get_cluster_info.assert_called_once_with(
'h', '0', False, socket._GLOBAL_DEFAULT_TIMEOUT)
get_cluster_info.assert_called_once()


@patch('django_elastipymemcache.backend.get_cluster_info')
@patch.object(ConfigurationEndpointClient, 'get_cluster_info')
def test_failed_to_connect_servers(get_cluster_info):
from django_elastipymemcache.backend import ElastiPymemcache
backend = ElastiPymemcache('h:0', {})
get_cluster_info.side_effect = OSError()
eq_(backend.get_cluster_nodes(), [])


@patch('django_elastipymemcache.backend.get_cluster_info')
@patch.object(ConfigurationEndpointClient, 'get_cluster_info')
def test_invalidate_cache(get_cluster_info):
from django_elastipymemcache.backend import ElastiPymemcache
servers = ['h1:0', 'h2:0']
Expand Down Expand Up @@ -102,7 +95,7 @@ def test_invalidate_cache(get_cluster_info):
eq_(get_cluster_info.call_count, 3)


@patch('django_elastipymemcache.backend.get_cluster_info')
@patch.object(ConfigurationEndpointClient, 'get_cluster_info')
def test_client_add(get_cluster_info):
from django_elastipymemcache.backend import ElastiPymemcache

Expand All @@ -116,7 +109,7 @@ def test_client_add(get_cluster_info):
eq_(ret, False)


@patch('django_elastipymemcache.backend.get_cluster_info')
@patch.object(ConfigurationEndpointClient, 'get_cluster_info')
def test_client_delete(get_cluster_info):
from django_elastipymemcache.backend import ElastiPymemcache

Expand All @@ -130,7 +123,7 @@ def test_client_delete(get_cluster_info):
eq_(ret, None)


@patch('django_elastipymemcache.backend.get_cluster_info')
@patch.object(ConfigurationEndpointClient, 'get_cluster_info')
def test_client_get_many(get_cluster_info):
from django_elastipymemcache.backend import ElastiPymemcache

Expand Down Expand Up @@ -188,7 +181,7 @@ def test_client_get_many(get_cluster_info):
)


@patch('django_elastipymemcache.backend.get_cluster_info')
@patch.object(ConfigurationEndpointClient, 'get_cluster_info')
def test_client_set_many(get_cluster_info):
from django_elastipymemcache.backend import ElastiPymemcache

Expand All @@ -202,7 +195,7 @@ def test_client_set_many(get_cluster_info):
eq_(ret, ['key1', 'key2'])


@patch('django_elastipymemcache.backend.get_cluster_info')
@patch.object(ConfigurationEndpointClient, 'get_cluster_info')
def test_client_delete_many(get_cluster_info):
from django_elastipymemcache.backend import ElastiPymemcache

Expand All @@ -216,7 +209,7 @@ def test_client_delete_many(get_cluster_info):
eq_(ret, None)


@patch('django_elastipymemcache.backend.get_cluster_info')
@patch.object(ConfigurationEndpointClient, 'get_cluster_info')
def test_client_incr(get_cluster_info):
from django_elastipymemcache.backend import ElastiPymemcache

Expand All @@ -230,7 +223,7 @@ def test_client_incr(get_cluster_info):
eq_(ret, False)


@patch('django_elastipymemcache.backend.get_cluster_info')
@patch.object(ConfigurationEndpointClient, 'get_cluster_info')
def test_client_decr(get_cluster_info):
from django_elastipymemcache.backend import ElastiPymemcache

Expand Down
Loading

0 comments on commit eaacc5e

Please sign in to comment.