Skip to content

Commit

Permalink
feat(cluster): add the option for a custom node selector in scaleReads
Browse files Browse the repository at this point in the history
  • Loading branch information
Ramon Snir committed Feb 10, 2016
1 parent 95c6857 commit 6795b1e
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 14 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -671,10 +671,11 @@ but a few so that if one is unreachable the client will try the next one, and th

A typical redis cluster contains three or more masters and several slaves for each master. It's possible to scale out redis cluster by sending read queries to slaves and write queries to masters by setting the `scaleReads` option.

`scaleReads` is "master" by default, which means ioredis will never send any queries to slaves. There are other two available options:
`scaleReads` is "master" by default, which means ioredis will never send any queries to slaves. There are other three available options:

1. "all": Send write queries to masters and read queries to masters or slaves randomly.
2. "slave": Send write queries to masters and read queries to slaves.
3. a custom `function(nodes, command): node`: Will choose the custom function to select to which node to send read queries (write queries keep being sent to master). The first node in `nodes` is always the master serving the relevant slots. If the function returns an array of nodes, a random node of that list will be selected.

For example:

Expand Down
34 changes: 25 additions & 9 deletions lib/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ function Cluster(startupNodes, options) {
this.options = _.defaults(this.options, options, Cluster.defaultOptions);

// validate options
if (['all', 'master', 'slave'].indexOf(this.options.scaleReads) === -1) {
if (typeof this.options.scaleReads !== 'function' &&
['all', 'master', 'slave'].indexOf(this.options.scaleReads) === -1) {
throw new Error('Invalid option scaleReads "' + this.options.scaleReads +
'". Expected "all", "master" or "slave"');
'". Expected "all", "master", "slave" or a custom function');
}

if (!Array.isArray(startupNodes) || startupNodes.length === 0) {
Expand Down Expand Up @@ -453,15 +454,30 @@ Cluster.prototype.sendCommand = function (command, stream, node) {
if (!random) {
if (typeof targetSlot === 'number' && _this.slots[targetSlot]) {
var nodeKeys = _this.slots[targetSlot];
var key;
if (to === 'all') {
key = utils.sample(nodeKeys);
} else if (to === 'slave' && nodeKeys.length > 1) {
key = utils.sample(nodeKeys, 1);
if (typeof to === 'function') {
var nodes =
nodeKeys
.map(function(key) {
return _this.connectionPool.nodes.all[key];
});
redis = to(nodes, command);
if (Array.isArray(redis)) {
redis = utils.sample(redis);
}
if (!redis) {
redis = nodes[0];
}
} else {
key = nodeKeys[0];
var key;
if (to === 'all') {
key = utils.sample(nodeKeys);
} else if (to === 'slave' && nodeKeys.length > 1) {
key = utils.sample(nodeKeys, 1);
} else {
key = nodeKeys[0];
}
redis = _this.connectionPool.nodes.all[key];
}
redis = _this.connectionPool.nodes.all[key];
}
if (asking) {
redis = _this.connectionPool.nodes.all[asking];
Expand Down
57 changes: 53 additions & 4 deletions test/functional/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ describe('cluster', function () {
function handler(port, argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
return [
[0, 16381, ['127.0.0.1', 30001], ['127.0.0.1', 30003]],
[0, 16381, ['127.0.0.1', 30001], ['127.0.0.1', 30003], ['127.0.0.1', 30004]],
[16382, 16383, ['127.0.0.1', 30002]]
];
}
Expand All @@ -949,10 +949,11 @@ describe('cluster', function () {
this.node1 = new MockServer(30001, handler.bind(null, 30001));
this.node2 = new MockServer(30002, handler.bind(null, 30002));
this.node3 = new MockServer(30003, handler.bind(null, 30003));
this.node4 = new MockServer(30004, handler.bind(null, 30004));
});

afterEach(function (done) {
disconnect([this.node1, this.node2, this.node3], done);
disconnect([this.node1, this.node2, this.node3, this.node4], done);
});

context('master', function () {
Expand All @@ -977,7 +978,7 @@ describe('cluster', function () {
});
cluster.on('ready', function () {
stub(utils, 'sample', function (array, from) {
expect(array).to.eql(['127.0.0.1:30001', '127.0.0.1:30003']);
expect(array).to.eql(['127.0.0.1:30001', '127.0.0.1:30003', '127.0.0.1:30004']);
expect(from).to.eql(1);
return '127.0.0.1:30003';
});
Expand Down Expand Up @@ -1006,14 +1007,62 @@ describe('cluster', function () {
});
});

context('custom', function () {
it('should send to selected slave', function (done) {
var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], {
scaleReads: function(node, command) {
if (command.name === 'get') {
return node[1];
} else {
return node[2];
}
}
});
cluster.on('ready', function () {
stub(utils, 'sample', function (array, from) {
expect(array).to.eql(['127.0.0.1:30001', '127.0.0.1:30003', '127.0.0.1:30004']);
expect(from).to.eql(1);
return '127.0.0.1:30003';
});
cluster.hgetall('foo', function (err, res) {
utils.sample.restore();
expect(res).to.eql(30004);
cluster.disconnect();
done();
});
});
});

it('should send writes to masters', function (done) {
var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], {
scaleReads: function(node, command) {
if (command.name === 'get') {
return node[1];
} else {
return node[2];
}
}
});
cluster.on('ready', function () {
stub(utils, 'sample').throws('sample is called');
cluster.set('foo', 'bar', function (err, res) {
utils.sample.restore();
expect(res).to.eql(30001);
cluster.disconnect();
done();
});
});
});
});

context('all', function () {
it('should send reads to all nodes randomly', function (done) {
var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], {
scaleReads: 'all'
});
cluster.on('ready', function () {
stub(utils, 'sample', function (array, from) {
expect(array).to.eql(['127.0.0.1:30001', '127.0.0.1:30003']);
expect(array).to.eql(['127.0.0.1:30001', '127.0.0.1:30003', '127.0.0.1:30004']);
expect(from).to.eql(undefined);
return '127.0.0.1:30003';
});
Expand Down

0 comments on commit 6795b1e

Please sign in to comment.