Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster support in progress #604

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 179 additions & 1 deletion redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,87 @@ def parse_slowlog_get(response, **options):
} for item in response]


def parse_cluster_slots(resp, **options):
current_host = options.get('current_host', '')
fix_server = lambda (host, port): (host or current_host, port)

slots = {}
for slot in resp:
start, end, master = slot[:3]
slaves = slot[3:]
slots[start, end] = {
'master': fix_server(master),
'slaves': [fix_server(slave) for slave in slaves],
}

return slots


def parse_cluster_nodes(resp, **options):
"""
@see: http://redis.io/commands/cluster-nodes # string
@see: http://redis.io/commands/cluster-slaves # list of string
"""
current_host = options.get('current_host', '')

def parse_slots(s):
slots, migrations = [], []
for r in s.split(' '):
if '->-' in r:
slot_id, dst_node_id = r[1:-1].split('->-', 1)
migrations.append({
'slot': int(slot_id),
'node_id': dst_node_id,
'state': 'migrating'
})
elif '-<-' in r:
slot_id, src_node_id = r[1:-1].split('-<-', 1)
migrations.append({
'slot': int(slot_id),
'node_id': src_node_id,
'state': 'importing'
})
elif '-' in r:
start, end = r.split('-')
slots.extend(range(int(start), int(end) + 1))
else:
slots.append(int(r))

return slots, migrations

if isinstance(resp, basestring):
resp = resp.splitlines()

nodes = []
for line in resp:
parts = line.split(' ', 8)
self_id, addr, flags, master_id, ping_sent, \
pong_recv, config_epoch, link_state = parts[:8]

host, port = addr.rsplit(':', 1)

node = {
'id': self_id,
'host': host or current_host,
'port': int(port),
'flags': tuple(flags.split(',')),
'master': master_id if master_id != '-' else None,
'ping-sent': int(ping_sent),
'pong-recv': int(pong_recv),
'link-state': link_state,
'slots': [],
'migrations': [],
}

if len(parts) >= 9:
slots, migrations = parse_slots(parts[8])
node['slots'], node['migrations'] = tuple(slots), migrations

nodes.append(node)

return nodes


class StrictRedis(object):
"""
Implementation of the Redis protocol.
Expand Down Expand Up @@ -361,6 +442,30 @@ class StrictRedis(object):
'SSCAN': parse_scan,
'TIME': lambda x: (int(x[0]), int(x[1])),
'ZSCAN': parse_zscan
},
# cluster
{
'CLUSTER ADDSLOTS': bool_ok,
'CLUSTER COUNT-FAILURE-REPORTS': int,
'CLUSTER COUNTKEYSINSLOT': int,
'CLUSTER DELSLOTS': bool_ok,
'CLUSTER FAILOVER': bool_ok,
'CLUSTER FORGET': bool_ok,
'CLUSTER GETKEYSINSLOT': int,
'CLUSTER INFO': parse_info,
'CLUSTER KEYSLOT': int,
'CLUSTER MEET': bool_ok,
'CLUSTER NODES': parse_cluster_nodes,
'CLUSTER REPLICATE': bool_ok,
'CLUSTER RESET': bool_ok,
'CLUSTER SAVECONFIG': bool_ok,
'CLUSTER SET-CONFIG-EPOCH': bool_ok,
'CLUSTER SETSLOT': bool_ok,
'CLUSTER SLAVES': parse_cluster_nodes,
'CLUSTER SLOTS': parse_cluster_slots,
'ASKING': bool_ok,
'READONLY': bool_ok,
'READWRITE': bool_ok,
}
)

Expand Down Expand Up @@ -444,6 +549,7 @@ def __init__(self, host='localhost', port=6379,
'ssl_ca_certs': ssl_ca_certs,
})
connection_pool = ConnectionPool(**kwargs)
self.host = host
self.connection_pool = connection_pool
self._use_lua_lock = None

Expand Down Expand Up @@ -612,8 +718,80 @@ def client_setname(self, name):
"Sets the current connection name"
return self.execute_command('CLIENT SETNAME', name)

def cluster_addslots(self, *slots):
"""Assign new hash slots to receiving node"""
return self.execute_command('CLUSTER ADDSLOTS', *slots)

def cluster_countkeysinslot(self, slot_id):
"""Return the number of local keys in the specified hash slot"""
return self.execute_command('CLUSTER COUNTKEYSINSLOT', slot_id)

def cluster_count_failure_report(self, node_id):
"""Return the number of failure reports active for a given node"""
return self.execute_command('CLUSTER COUNT-FAILURE-REPORTS', node_id)

def cluster_delslots(self, *slots):
"""Set hash slots as unbound in receiving node"""
return self.execute_command('CLUSTER DELSLOTS', *slots)

def cluster_failover(self, option):
"""Forces a slave to perform a manual failover of its master."""
assert option.upper() in ('FORCE', 'TAKEOVER')
return self.execute_command('CLUSTER FAILOVER', Token(option))

def cluster_info(self):
"""Provides info about Redis Cluster node state"""
return self.execute_command('CLUSTER INFO')

def cluster_keyslot(self, name):
"""Returns the hash slot of the specified key"""
return self.execute_command('CLUSTER KEYSLOT', name)

def cluster_meet(self, host, port):
"""Force a node cluster to handshake with another node"""
return self.execute_command('CLUSTER MEET', host, port)

def cluster_nodes(self):
"""Force a node cluster to handshake with another node"""
return self.execute_command('CLUSTER NODES', current_host=self.host)

def cluster_replicate(self, node_id):
"""Reconfigure a node as a slave of the specified master node"""
return self.execute_command('CLUSTER REPLICATE', node_id)

def cluster_reset(self, option='SOFT'):
"""Reset a Redis Cluster node"""
assert option.upper() in ('SOFT', 'HARD')
return self.execute_command('CLUSTER RESET', Token(option))

def cluster_save_config(self):
"""Forces the node to save cluster state on disk"""
return self.execute_command('CLUSTER SAVECONFIG')

def cluster_set_config_epoch(self, epoch):
"""Set the configuration epoch in a new node"""
return self.execute_command('CLUSTER SET-CONFIG-EPOCH', epoch)

def cluster_setslot(self, slot_id, state, node_id=None):
"""Bind an hash slot to a specific node"""
if state.upper() in ('IMPORTING', 'MIGRATING', 'NODE'):
if node_id is not None:
return self.execute_command('CLUSTER SETSLOT', slot_id, Token(state), node_id)
elif state.upper() == 'STABLE':
return self.execute_command('CLUSTER SETSLOT', slot_id, Token('STABLE'))
else:
raise RedisError('Invalid slot state: %s' % state)

def cluster_slaves(self, node_id):
"""Force a node cluster to handshake with another node"""
return self.execute_command('CLUSTER SLAVES', node_id)

def cluster_slots(self):
"""Get array of Cluster slot to node mappings"""
return self.execute_command('CLUSTER SLOTS', current_host=self.host)

def config_get(self, pattern="*"):
"Return a dictionary of configuration based on the ``pattern``"
"""Return a dictionary of configuration based on the ``pattern``"""
return self.execute_command('CONFIG GET', pattern)

def config_set(self, name, value):
Expand Down
Loading