diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index f7dc2c4c..f0c6f515 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -107,13 +107,27 @@ class Cluster extends EventEmitter { this.offlineQueue = new Deque() } + clearNodesRefreshInterval() { + if (this.slotsTimer) { + clearTimeout(this.slotsTimer) + this.slotsTimer = null + } + } + resetNodesRefreshInterval() { if (this.slotsTimer) { return } - this.slotsTimer = setInterval(function () { - this.refreshSlotsCache() - }.bind(this), this.options.slotsRefreshInterval) + const nextRound = () => { + this.slotsTimer = setTimeout(() => { + debug('refreshing slot caches... (triggered by "slotsRefreshInterval" option)') + this.refreshSlotsCache(() => { + nextRound() + }) + }, this.options.slotsRefreshInterval) + } + + nextRound() } /** @@ -245,10 +259,7 @@ class Cluster extends EventEmitter { this.reconnectTimeout = null debug('Canceled reconnecting attempts') } - if (this.slotsTimer) { - clearInterval(this.slotsTimer) - this.slotsTimer = null - } + this.clearNodesRefreshInterval() this.subscriber.stop() if (status === 'wait') { @@ -276,10 +287,7 @@ class Cluster extends EventEmitter { clearTimeout(this.reconnectTimeout) this.reconnectTimeout = null } - if (this.slotsTimer) { - clearInterval(this.slotsTimer) - this.slotsTimer = null - } + this.clearNodesRefreshInterval() this.subscriber.stop() @@ -471,6 +479,7 @@ class Cluster extends EventEmitter { } _this.connectionPool.findOrCreate(_this.natMapper(key)) tryConnection() + debug('refreshing slot caches... (triggered by MOVED error)') _this.refreshSlotsCache() }, ask: function (slot, key) { @@ -608,9 +617,19 @@ class Cluster extends EventEmitter { if (!redis) { return callback(new Error('Node is disconnected')) } - redis.cluster('slots', timeout((err, result) => { + + // Use a duplication of the connection to avoid + // timeouts when the connection is in the blocking + // mode (e.g. waiting for BLPOP). + const duplicatedConnection = redis.duplicate({ + enableOfflineQueue: true, + enableReadyCheck: false, + retryStrategy: null, + connectionName: 'ioredisClusterRefresher' + }) + duplicatedConnection.cluster('slots', timeout((err, result) => { + duplicatedConnection.disconnect() if (err) { - redis.disconnect() return callback(err) } if (this.status === 'disconnecting' || this.status === 'close' || this.status === 'end') { diff --git a/test/functional/cluster/index.js b/test/functional/cluster/index.js index c66a3618..8c9084d1 100644 --- a/test/functional/cluster/index.js +++ b/test/functional/cluster/index.js @@ -104,7 +104,12 @@ describe('cluster', function () { return 0; } }); + let hasDone = false new MockServer(30002, function () { + if (hasDone) { + return + } + hasDone = true client.disconnect(); done(); }); @@ -250,8 +255,8 @@ describe('cluster', function () { describe('#nodes()', function () { it('should return the corrent nodes', function (done) { var slotTable = [ - [0, 5460, ['127.0.0.1', 30001], ['127.0.0.1', 30003]], - [5461, 10922, ['127.0.0.1', 30002]] + [0, 16381, ['127.0.0.1', 30001], ['127.0.0.1', 30003]], + [16382, 16383, ['127.0.0.1', 30002]] ]; var node = new MockServer(30001, function (argv) { if (argv[0] === 'cluster' && argv[1] === 'slots') { @@ -271,7 +276,8 @@ describe('cluster', function () { }); var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]); - cluster.on('ready', function () { + // Make sure 30001 has been connected + cluster.get('foo', function () { expect(cluster.nodes()).to.have.lengthOf(3); expect(cluster.nodes('all')).to.have.lengthOf(3); expect(cluster.nodes('master')).to.have.lengthOf(2); diff --git a/test/functional/cluster/moved.js b/test/functional/cluster/moved.js index 3046e419..1d5e1bb9 100644 --- a/test/functional/cluster/moved.js +++ b/test/functional/cluster/moved.js @@ -30,6 +30,8 @@ describe('cluster:MOVED', function () { if (argv[0] === 'get' && argv[1] === 'foo') { expect(moved).to.eql(false); moved = true; + slotTable[0][1] = 16381 + slotTable[1][0] = 16382 return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001'); } }); @@ -103,6 +105,8 @@ describe('cluster:MOVED', function () { if (argv[0] === 'get' && argv[1] === 'foo') { expect(moved).to.eql(false); moved = true; + slotTable[0][1] = 16381 + slotTable[1][0] = 16382 return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001'); } }); diff --git a/test/functional/cluster/quit.js b/test/functional/cluster/quit.js index fdc6d98e..983ca284 100644 --- a/test/functional/cluster/quit.js +++ b/test/functional/cluster/quit.js @@ -32,8 +32,8 @@ describe('cluster:quit', () => { it('failed when quit returns error', function (done) { const ERROR_MESSAGE = 'quit random error' const slotTable = [ - [0, 1000, ['127.0.0.1', 30001]], - [1001, 16383, ['127.0.0.1', 30002]] + [0, 16381, ['127.0.0.1', 30001]], + [16382, 16383, ['127.0.0.1', 30002]] ] new MockServer(30001, function (argv, c) { if (argv[0] === 'quit') { @@ -49,7 +49,7 @@ describe('cluster:quit', () => { const cluster = new Redis.Cluster([ { host: '127.0.0.1', port: '30001' } ]) - cluster.on('ready', () => { + cluster.get('foo', () => { cluster.quit((err) => { expect(err.message).to.eql(ERROR_MESSAGE) cluster.disconnect() diff --git a/test/helpers/mock_server.js b/test/helpers/mock_server.js index 7653496a..a237e87f 100644 --- a/test/helpers/mock_server.js +++ b/test/helpers/mock_server.js @@ -130,7 +130,7 @@ MockServer.prototype.write = function (c, data) { MockServer.prototype.findClientByName = function (name) { for (const client of this.clients) { - if (client._connectionName === name) { + if (client && client._connectionName === name) { return client } }