Skip to content

Commit

Permalink
Merge pull request #103 from harikitech/topic/depends_django_pymemcache
Browse files Browse the repository at this point in the history
Depends django-pymemcache
  • Loading branch information
heavenshell authored Jun 29, 2020
2 parents 29e28ec + 9a8e6e6 commit 7d5360c
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 123 deletions.
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Requirements

* pymemcache
* Django>=2.2
* django-pymemcache>=1.0

Installation
------------
Expand Down
11 changes: 0 additions & 11 deletions django_elastipymemcache/client.py

This file was deleted.

20 changes: 14 additions & 6 deletions django_elastipymemcache/cluster_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
utils for discovery cluster
Utils for discovery cluster
"""
import re
from distutils.version import StrictVersion
Expand All @@ -26,11 +26,11 @@ def get_cluster_info(
ignore_cluster_errors=False,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
"""
return dict with info about nodes in cluster and current version
Return dict with info about nodes in cluster and current version
{
'nodes': [
'IP:port',
'IP:port',
'IP:Port',
'IP:Port',
],
'version': '1.4.4'
}
Expand All @@ -57,7 +57,10 @@ def get_cluster_info(
return {
'version': version,
'nodes': [
(smart_text(host), int(port))
'{host}:{port:d}'.format(
host=smart_text(host),
port=int(port),
),
]
}

Expand All @@ -73,7 +76,12 @@ def get_cluster_info(
try:
for node in ls[2].split(b' '):
host, ip, port = node.split(b'|')
nodes.append((smart_text(ip or host), int(port)))
nodes.append(
'{host}:{port:d}'.format(
host=smart_text(ip or host),
port=int(port),
),
)
except ValueError:
raise WrongProtocolData(cmd, res)
return {
Expand Down
104 changes: 49 additions & 55 deletions django_elastipymemcache/memcached.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,19 @@
import socket
from functools import wraps

try:
import cPickle as pickle
except ImportError:
import pickle

from django.core.cache import InvalidCacheBackendError
from django.core.cache.backends.memcached import BaseMemcachedCache

from . import client as pymemcache_client
from djpymemcache import client as djpymemcache_client
from .cluster_utils import get_cluster_info


logger = logging.getLogger(__name__)


def serialize_pickle(key, value):
if isinstance(value, str):
return value, 1
return pickle.dumps(value), 2


def deserialize_pickle(key, value, flags):
if flags == 1:
return value
if flags == 2:
return pickle.loads(value)


def invalidate_cache_after_error(f):
"""
catch any exception and invalidate internal cache with list of nodes
Catch any exception and invalidate internal cache with list of nodes
"""
@wraps(f)
def wrapper(self, *args, **kwds):
Expand All @@ -49,37 +31,47 @@ def wrapper(self, *args, **kwds):

class ElastiPyMemCache(BaseMemcachedCache):
"""
backend for Amazon ElastiCache (memcached) with auto discovery mode
it used pyMemcache
Backend for Amazon ElastiCache (memcached) with auto discovery mode
it used pymemcache
"""
def __init__(self, server, params):
params['OPTIONS'] = params.get('OPTIONS', {})
params['OPTIONS'].setdefault('ignore_exc', True)

self._cluster_timeout = params['OPTIONS'].pop(
'cluster_timeout',
socket._GLOBAL_DEFAULT_TIMEOUT,
)
self._ignore_cluster_errors = params['OPTIONS'].pop(
'ignore_cluster_errors',
False,
)

super().__init__(
server,
params,
library=pymemcache_client,
value_not_found_exception=ValueError)
library=djpymemcache_client,
value_not_found_exception=ValueError,
)

if len(self._servers) > 1:
raise InvalidCacheBackendError(
'ElastiCache should be configured with only one server '
'(Configuration Endpoint)')
'(Configuration Endpoint)',
)

if len(self._servers[0].split(':')) != 2:
raise InvalidCacheBackendError(
'Server configuration should be in format IP:port')

self._cluster_timeout = self._options.get(
'cluster_timeout', socket._GLOBAL_DEFAULT_TIMEOUT)
self._ignore_cluster_errors = self._options.get(
'ignore_cluster_errors', False)
'Server configuration should be in format IP:Port',
)

def clear_cluster_nodes_cache(self):
"""clear internal cache with list of nodes in cluster"""
"""Clear internal cache with list of nodes in cluster"""
if hasattr(self, '_client'):
del self._client

def get_cluster_nodes(self):
"""
return list with all nodes in cluster
"""
"""Return list with all nodes in cluster"""
server, port = self._servers[0].split(':')
try:
return get_cluster_info(
Expand All @@ -92,47 +84,49 @@ def get_cluster_nodes(self):
logger.debug(
'Cannot connect to cluster %s, err: %s',
self._servers[0],
err
err,
)
return []

@property
def _cache(self):

if getattr(self, '_client', None) is None:

options = self._options
options['serializer'] = serialize_pickle
options['deserializer'] = deserialize_pickle
options.setdefault('ignore_exc', True)
options.pop('cluster_timeout', None)
options.pop('ignore_cluster_errors', None)

self._client = self._lib.Client(
self.get_cluster_nodes(), **options)

self.get_cluster_nodes(), **self._options)
return self._client

@invalidate_cache_after_error
def get(self, *args, **kwargs):
return super().get(*args, **kwargs)
def add(self, *args, **kwargs):
return super().add(*args, **kwargs)

@invalidate_cache_after_error
def get_many(self, *args, **kwargs):
return super().get_many(*args, **kwargs)
def get(self, *args, **kwargs):
return super().get(*args, **kwargs)

@invalidate_cache_after_error
def set(self, *args, **kwargs):
return super().set(*args, **kwargs)

@invalidate_cache_after_error
def set_many(self, *args, **kwargs):
return super().set_many(*args, **kwargs)

@invalidate_cache_after_error
def delete(self, *args, **kwargs):
return super().delete(*args, **kwargs)

@invalidate_cache_after_error
def get_many(self, *args, **kwargs):
return super().get_many(*args, **kwargs)

@invalidate_cache_after_error
def set_many(self, *args, **kwargs):
return super().set_many(*args, **kwargs)

@invalidate_cache_after_error
def delete_many(self, *args, **kwargs):
return super().delete_many(*args, **kwargs)

@invalidate_cache_after_error
def incr(self, *args, **kwargs):
return super().incr(*args, **kwargs)

@invalidate_cache_after_error
def decr(self, *args, **kwargs):
return super().decr(*args, **kwargs)
Loading

0 comments on commit 7d5360c

Please sign in to comment.