Depends django-pymemcache #103

Merged (3 commits) on Jun 29, 2020
1 change: 1 addition & 0 deletions README.rst
Expand Up @@ -22,6 +22,7 @@ Requirements

* pymemcache
* Django>=2.2
* django-pymemcache>=1.0

Installation
------------
Expand Down
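For context (this is not part of the diff), a CACHES entry pointing at this backend might look roughly like the sketch below. The backend path follows from django_elastipymemcache/memcached.py and the ElastiPyMemCache class changed in this PR; the endpoint hostname is a placeholder, and the two OPTIONS keys are the ones handled in memcached.py further down.

    # Hypothetical Django settings sketch; the Configuration Endpoint address is made up.
    CACHES = {
        'default': {
            'BACKEND': 'django_elastipymemcache.memcached.ElastiPyMemCache',
            'LOCATION': 'mycluster.xxxxxx.cfg.use1.cache.amazonaws.com:11211',
            'OPTIONS': {
                'cluster_timeout': 2,            # socket timeout (seconds) for the auto-discovery query
                'ignore_cluster_errors': False,  # passed through to get_cluster_info
            },
        },
    }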
11 changes: 0 additions & 11 deletions django_elastipymemcache/client.py

This file was deleted.

20 changes: 14 additions & 6 deletions django_elastipymemcache/cluster_utils.py
@@ -1,5 +1,5 @@
"""
utils for discovery cluster
Utils for discovery cluster
"""
import re
from distutils.version import StrictVersion
Expand All @@ -26,11 +26,11 @@ def get_cluster_info(
ignore_cluster_errors=False,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
"""
return dict with info about nodes in cluster and current version
Return dict with info about nodes in cluster and current version
{
'nodes': [
'IP:port',
'IP:port',
'IP:Port',
'IP:Port',
],
'version': '1.4.4'
}
Expand All @@ -57,7 +57,10 @@ def get_cluster_info(
return {
'version': version,
'nodes': [
(smart_text(host), int(port))
'{host}:{port:d}'.format(
host=smart_text(host),
port=int(port),
),
]
}

Expand All @@ -73,7 +76,12 @@ def get_cluster_info(
try:
for node in ls[2].split(b' '):
host, ip, port = node.split(b'|')
nodes.append((smart_text(ip or host), int(port)))
nodes.append(
'{host}:{port:d}'.format(
host=smart_text(ip or host),
port=int(port),
),
)
except ValueError:
raise WrongProtocolData(cmd, res)
return {
Expand Down
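The net effect of this change to cluster_utils.py is that get_cluster_info now describes each node as a single 'host:port' string instead of a (host, port) tuple, presumably because that is the form handed to the django-pymemcache client in memcached.py below. An illustrative sketch of the new formatting, with made-up addresses:

    # Illustrative only; mirrors the formatting added above, with fake node data.
    from django.utils.encoding import smart_text  # assumed to be the smart_text used in cluster_utils.py

    raw_nodes = [(b'node-1.mycluster.cache.amazonaws.com', b'10.0.0.1', b'11211')]
    nodes = [
        '{host}:{port:d}'.format(host=smart_text(ip or host), port=int(port))
        for host, ip, port in raw_nodes
    ]
    assert nodes == ['10.0.0.1:11211']  # previously this would have been [('10.0.0.1', 11211)]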
104 changes: 49 additions & 55 deletions django_elastipymemcache/memcached.py
Expand Up @@ -5,37 +5,19 @@
import socket
from functools import wraps

try:
import cPickle as pickle
except ImportError:
import pickle

from django.core.cache import InvalidCacheBackendError
from django.core.cache.backends.memcached import BaseMemcachedCache

from . import client as pymemcache_client
from djpymemcache import client as djpymemcache_client
from .cluster_utils import get_cluster_info


logger = logging.getLogger(__name__)


def serialize_pickle(key, value):
if isinstance(value, str):
return value, 1
return pickle.dumps(value), 2


def deserialize_pickle(key, value, flags):
if flags == 1:
return value
if flags == 2:
return pickle.loads(value)


def invalidate_cache_after_error(f):
"""
catch any exception and invalidate internal cache with list of nodes
Catch any exception and invalidate internal cache with list of nodes
"""
@wraps(f)
def wrapper(self, *args, **kwds):
Expand All @@ -49,37 +31,47 @@ def wrapper(self, *args, **kwds):
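The wrapper body is collapsed in this view. Judging from the docstring and from clear_cluster_nodes_cache below, a decorator of this kind usually amounts to something like the following sketch (an assumption, not the literal hidden code):

    # Hedged sketch of what the collapsed wrapper body likely does.
    from functools import wraps

    def invalidate_cache_after_error(f):
        @wraps(f)
        def wrapper(self, *args, **kwds):
            try:
                return f(self, *args, **kwds)
            except Exception:
                # The node list may be stale (e.g. after the cluster was rescaled),
                # so drop the cached client and re-raise.
                self.clear_cluster_nodes_cache()
                raise
        return wrapper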

class ElastiPyMemCache(BaseMemcachedCache):
"""
backend for Amazon ElastiCache (memcached) with auto discovery mode
it used pyMemcache
Backend for Amazon ElastiCache (memcached) with auto discovery mode
it used pymemcache
"""
def __init__(self, server, params):
params['OPTIONS'] = params.get('OPTIONS', {})
params['OPTIONS'].setdefault('ignore_exc', True)

self._cluster_timeout = params['OPTIONS'].pop(
'cluster_timeout',
socket._GLOBAL_DEFAULT_TIMEOUT,
)
self._ignore_cluster_errors = params['OPTIONS'].pop(
'ignore_cluster_errors',
False,
)

super().__init__(
server,
params,
library=pymemcache_client,
value_not_found_exception=ValueError)
library=djpymemcache_client,
value_not_found_exception=ValueError,
)

if len(self._servers) > 1:
raise InvalidCacheBackendError(
'ElastiCache should be configured with only one server '
'(Configuration Endpoint)')
'(Configuration Endpoint)',
)

if len(self._servers[0].split(':')) != 2:
raise InvalidCacheBackendError(
'Server configuration should be in format IP:port')

self._cluster_timeout = self._options.get(
'cluster_timeout', socket._GLOBAL_DEFAULT_TIMEOUT)
self._ignore_cluster_errors = self._options.get(
'ignore_cluster_errors', False)
'Server configuration should be in format IP:Port',
)

def clear_cluster_nodes_cache(self):
"""clear internal cache with list of nodes in cluster"""
"""Clear internal cache with list of nodes in cluster"""
if hasattr(self, '_client'):
del self._client

def get_cluster_nodes(self):
"""
return list with all nodes in cluster
"""
"""Return list with all nodes in cluster"""
server, port = self._servers[0].split(':')
try:
return get_cluster_info(
Expand All @@ -92,47 +84,49 @@ def get_cluster_nodes(self):
logger.debug(
'Cannot connect to cluster %s, err: %s',
self._servers[0],
err
err,
)
return []

@property
def _cache(self):

if getattr(self, '_client', None) is None:

options = self._options
options['serializer'] = serialize_pickle
options['deserializer'] = deserialize_pickle
options.setdefault('ignore_exc', True)
options.pop('cluster_timeout', None)
options.pop('ignore_cluster_errors', None)

self._client = self._lib.Client(
self.get_cluster_nodes(), **options)

self.get_cluster_nodes(), **self._options)
return self._client

@invalidate_cache_after_error
def get(self, *args, **kwargs):
return super().get(*args, **kwargs)
def add(self, *args, **kwargs):
return super().add(*args, **kwargs)

@invalidate_cache_after_error
def get_many(self, *args, **kwargs):
return super().get_many(*args, **kwargs)
def get(self, *args, **kwargs):
return super().get(*args, **kwargs)

@invalidate_cache_after_error
def set(self, *args, **kwargs):
return super().set(*args, **kwargs)

@invalidate_cache_after_error
def set_many(self, *args, **kwargs):
return super().set_many(*args, **kwargs)

@invalidate_cache_after_error
def delete(self, *args, **kwargs):
return super().delete(*args, **kwargs)

@invalidate_cache_after_error
Author: Support all methods
def get_many(self, *args, **kwargs):
return super().get_many(*args, **kwargs)

@invalidate_cache_after_error
def set_many(self, *args, **kwargs):
return super().set_many(*args, **kwargs)

@invalidate_cache_after_error
def delete_many(self, *args, **kwargs):
return super().delete_many(*args, **kwargs)

@invalidate_cache_after_error
def incr(self, *args, **kwargs):
return super().incr(*args, **kwargs)

@invalidate_cache_after_error
def decr(self, *args, **kwargs):
return super().decr(*args, **kwargs)
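With every public cache method wrapped, the cached node list is dropped after any failed call made through Django's cache API, so the next call re-runs discovery. A usage sketch (the keys are made up, and it assumes a CACHES entry like the one sketched near the top):

    # Hypothetical usage through Django's cache framework.
    from django.core.cache import cache

    cache.set('greeting', 'hello', timeout=60)
    cache.add('greeting', 'ignored')       # no-op, key already exists
    cache.set_many({'a': 1, 'b': 2}, timeout=60)

    cache.get('greeting')                  # 'hello'
    cache.get_many(['a', 'b'])             # {'a': 1, 'b': 2}

    cache.delete('greeting')
    cache.delete_many(['a', 'b'])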