Skip to content

Commit

Permalink
fix: make sure timers is cleared on exit (#1502)
Browse files Browse the repository at this point in the history
  • Loading branch information
luin authored Feb 2, 2022
1 parent bf3bec7 commit cfb04a0
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 23 deletions.
41 changes: 25 additions & 16 deletions lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Cluster extends EventEmitter {
private isRefreshing = false;
public isCluster = true;
private _autoPipelines: Map<string, typeof Pipeline> = new Map();
private _groupsIds: {[key: string]: number} = {};
private _groupsIds: { [key: string]: number } = {};
private _groupsBySlot: number[] = Array(16384);
private _runningAutoPipelines: Set<string> = new Set();
private _readyDelayedCallbacks: CallbackFunction[] = [];
Expand Down Expand Up @@ -154,6 +154,13 @@ class Cluster extends EventEmitter {
}
}

clearAddedScriptHashesCleanInterval() {
if (this._addedScriptHashesCleanInterval) {
clearInterval(this._addedScriptHashesCleanInterval);
this._addedScriptHashesCleanInterval = null;
}
}

resetNodesRefreshInterval() {
if (this.slotsTimer) {
return;
Expand Down Expand Up @@ -191,7 +198,7 @@ class Cluster extends EventEmitter {
}

// Make sure only one timer is active at a time
clearInterval(this._addedScriptHashesCleanInterval);
this.clearAddedScriptHashesCleanInterval();

// Start the script cache cleaning
this._addedScriptHashesCleanInterval = setInterval(() => {
Expand Down Expand Up @@ -272,12 +279,12 @@ class Cluster extends EventEmitter {
this.once("close", this.handleCloseEvent.bind(this));

this.refreshSlotsCache(
function (err) {
if (err && err.message === "Failed to refresh slots cache.") {
Redis.prototype.silentEmit.call(this, "error", err);
this.connectionPool.reset([]);
}
}.bind(this)
function (err) {
if (err && err.message === "Failed to refresh slots cache.") {
Redis.prototype.silentEmit.call(this, "error", err);
this.connectionPool.reset([]);
}
}.bind(this)
);
this.subscriber.start();
})
Expand All @@ -300,6 +307,9 @@ class Cluster extends EventEmitter {
if (reason) {
debug("closed because %s", reason);
}

this.clearAddedScriptHashesCleanInterval();

let retryDelay;
if (
!this.manuallyClosing &&
Expand Down Expand Up @@ -339,8 +349,7 @@ class Cluster extends EventEmitter {
const status = this.status;
this.setStatus("disconnecting");

clearInterval(this._addedScriptHashesCleanInterval);
this._addedScriptHashesCleanInterval = null;
this.clearAddedScriptHashesCleanInterval();

if (!reconnect) {
this.manuallyClosing = true;
Expand Down Expand Up @@ -372,8 +381,7 @@ class Cluster extends EventEmitter {
const status = this.status;
this.setStatus("disconnecting");

clearInterval(this._addedScriptHashesCleanInterval);
this._addedScriptHashesCleanInterval = null;
this.clearAddedScriptHashesCleanInterval();

this.manuallyClosing = true;

Expand Down Expand Up @@ -632,7 +640,8 @@ class Cluster extends EventEmitter {
} else {
_this.slots[slot] = [key];
}
_this._groupsBySlot[slot] = _this._groupsIds[_this.slots[slot].join(';')];
_this._groupsBySlot[slot] =
_this._groupsIds[_this.slots[slot].join(";")];
_this.connectionPool.findOrCreate(_this.natMapper(key));
tryConnection();
debug("refreshing slot caches... (triggered by MOVED error)");
Expand Down Expand Up @@ -867,14 +876,14 @@ class Cluster extends EventEmitter {
}

// Assign to each node keys a numeric value to make autopipeline comparison faster.
this._groupsIds = Object.create(null);
this._groupsIds = Object.create(null);
let j = 0;
for (let i = 0; i < 16384; i++) {
const target = (this.slots[i] || []).join(';');
const target = (this.slots[i] || []).join(";");

if (!target.length) {
this._groupsBySlot[i] = undefined;
continue;
continue;
}

if (!this._groupsIds[target]) {
Expand Down
2 changes: 2 additions & 0 deletions lib/redis/event_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ export function closeHandler(self) {
abortTransactionFragments(self.offlineQueue);
}

self.clearAddedScriptHashesCleanInterval();

if (self.manuallyClosing) {
self.manuallyClosing = false;
debug("skip reconnecting since the connection is manually closed.");
Expand Down
15 changes: 10 additions & 5 deletions lib/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,13 @@ Redis.prototype.setStatus = function (status, arg) {
process.nextTick(this.emit.bind(this, status, arg));
};

Redis.prototype.clearAddedScriptHashesCleanInterval = function () {
if (this._addedScriptHashesCleanInterval) {
clearInterval(this._addedScriptHashesCleanInterval);
this._addedScriptHashesCleanInterval = null;
}
};

/**
* Create a connection to Redis.
* This method will be invoked automatically when creating a new Redis instance
Expand All @@ -314,7 +321,7 @@ Redis.prototype.connect = function (callback) {
}

// Make sure only one timer is active at a time
clearInterval(this._addedScriptHashesCleanInterval);
this.clearAddedScriptHashesCleanInterval();

// Start the script cache cleaning
this._addedScriptHashesCleanInterval = setInterval(() => {
Expand Down Expand Up @@ -436,8 +443,7 @@ Redis.prototype.connect = function (callback) {
* @public
*/
Redis.prototype.disconnect = function (reconnect) {
clearInterval(this._addedScriptHashesCleanInterval);
this._addedScriptHashesCleanInterval = null;
this.clearAddedScriptHashesCleanInterval();

if (!reconnect) {
this.manuallyClosing = true;
Expand Down Expand Up @@ -742,8 +748,7 @@ Redis.prototype.sendCommand = function (command: Command, stream: NetStream) {
}

if (command.name === "quit") {
clearInterval(this._addedScriptHashesCleanInterval);
this._addedScriptHashesCleanInterval = null;
this.clearAddedScriptHashesCleanInterval();
}

let writable =
Expand Down
2 changes: 1 addition & 1 deletion test/functional/autopipelining.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ describe("autoPipelining for single node", function () {
expect(redis.autoPipelineQueueSize).to.eql(1);

redis.set("foo2", (err) => {
expect(err.message).to.eql(
expect(err.message).to.include(
"ERR wrong number of arguments for 'set' command"
);
done();
Expand Down
2 changes: 1 addition & 1 deletion test/functional/cluster/autopipelining.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ describe("autoPipelining for cluster", function () {
expect(cluster.autoPipelineQueueSize).to.eql(1);

cluster.set("foo5", (err) => {
expect(err.message).to.eql(
expect(err.message).to.include(
"ERR wrong number of arguments for 'set' command"
);

Expand Down
51 changes: 51 additions & 0 deletions test/functional/cluster/disconnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import Redis from "../../../lib/redis";
import * as sinon from "sinon";
import { expect } from "chai";
import { Cluster } from "../../../lib";
import MockServer from "../../helpers/mock_server";

describe("disconnection", function () {
afterEach(() => {
sinon.restore();
});

it("should clear all timers on disconnect", function (done) {
const server = new MockServer(30000);

const setIntervalCalls = sinon.spy(global, "setInterval");
const clearIntervalCalls = sinon.spy(global, "clearInterval");

const cluster = new Cluster([{ host: "127.0.0.1", port: "30000" }]);
cluster.on("connect", function () {
cluster.disconnect();
});

cluster.on("end", function () {
setTimeout(() => {
// wait for disconnect with refresher.
expect(setIntervalCalls.callCount).to.equal(
clearIntervalCalls.callCount
);
server.disconnect();
done();
}, 500);
});
});

it("should clear all timers on server exits", function (done) {
const server = new MockServer(30000);

const setIntervalCalls = sinon.spy(global, "setInterval");
const clearIntervalCalls = sinon.spy(global, "clearInterval");

const cluster = new Cluster([{ host: "127.0.0.1", port: "30000" }], {
clusterRetryStrategy: null,
});
cluster.on("end", function () {
expect(setIntervalCalls.callCount).to.equal(clearIntervalCalls.callCount);
done();
});

server.disconnect();
});
});
46 changes: 46 additions & 0 deletions test/functional/disconnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import Redis from "../../lib/redis";
import * as sinon from "sinon";
import { expect } from "chai";
import MockServer from "../helpers/mock_server";

describe("disconnection", function () {
afterEach(() => {
sinon.restore();
});

it("should clear all timers on disconnect", function (done) {
const server = new MockServer(30000);

const setIntervalCalls = sinon.spy(global, "setInterval");
const clearIntervalCalls = sinon.spy(global, "clearInterval");

const redis = new Redis({});
redis.on("connect", function () {
redis.disconnect();
});

redis.on("end", function () {
expect(setIntervalCalls.callCount).to.equal(clearIntervalCalls.callCount);
server.disconnect();
done();
});
});

it("should clear all timers on server exits", function (done) {
const server = new MockServer(30000);

const setIntervalCalls = sinon.spy(global, "setInterval");
const clearIntervalCalls = sinon.spy(global, "clearInterval");

const redis = new Redis({
port: 30000,
retryStrategy: null,
});
redis.on("end", function () {
expect(setIntervalCalls.callCount).to.equal(clearIntervalCalls.callCount);
done();
});

server.disconnect();
});
});

0 comments on commit cfb04a0

Please sign in to comment.