From fe8820b964dac7c25b9785c32ee3242e14c1cd17 Mon Sep 17 00:00:00 2001 From: ohad-israeli Date: Mon, 7 Dec 2020 20:31:23 +0200 Subject: [PATCH] feat(cluster): support retrying MOVED with a delay Add delay to MOVED response. In case a MOVED response is recieved from the cluster use a delayed queue to retry commands --- README.md | 7 ++++-- lib/cluster/ClusterOptions.ts | 12 ++++++++++ lib/cluster/index.ts | 15 ++++++++++-- test/functional/cluster/moved.ts | 40 ++++++++++++++++++++++++++++++++ 4 files changed, 70 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index b8947131..dea4dee0 100644 --- a/README.md +++ b/README.md @@ -889,9 +889,12 @@ cluster.get("foo", (err, res) => { will resend the commands after the specified time (in ms). - `retryDelayOnTryAgain`: If this option is a number (by default, it is `100`), the client will resend the commands rejected with `TRYAGAIN` error after the specified time (in ms). + - `retryDelayOnMoved`: By default, this value is `0` (in ms), which means when a `MOVED` error is received, the client will resend + the command instantly to the node returned together with the `MOVED` error. However, sometimes it takes time for a cluster to become + state stabilized after a failover, so adding a delay before resending can prevent a ping pong effect. - `redisOptions`: Default options passed to the constructor of `Redis` when connecting to a node. - - `slotsRefreshTimeout`: Milliseconds before a timeout occurs while refreshing slots from the cluster (default `1000`) - - `slotsRefreshInterval`: Milliseconds between every automatic slots refresh (default `5000`) + - `slotsRefreshTimeout`: Milliseconds before a timeout occurs while refreshing slots from the cluster (default `1000`). + - `slotsRefreshInterval`: Milliseconds between every automatic slots refresh (default `5000`). ### Read-write splitting diff --git a/lib/cluster/ClusterOptions.ts b/lib/cluster/ClusterOptions.ts index 6b76b2bc..bcc14b40 100644 --- a/lib/cluster/ClusterOptions.ts +++ b/lib/cluster/ClusterOptions.ts @@ -94,6 +94,17 @@ export interface IClusterOptions { */ retryDelayOnTryAgain?: number; + /** + * By default, this value is 0, which means when a `MOVED` error is received, + * the client will resend the command instantly to the node returned together with + * the `MOVED` error. However, sometimes it takes time for a cluster to become + * state stabilized after a failover, so adding a delay before resending can + * prevent a ping pong effect. + * + * @default 0 + */ + retryDelayOnMoved?: number; + /** * The milliseconds before a timeout occurs while refreshing * slots from the cluster. @@ -184,6 +195,7 @@ export const DEFAULT_CLUSTER_OPTIONS: IClusterOptions = { enableReadyCheck: true, scaleReads: "master", maxRedirections: 16, + retryDelayOnMoved: 0, retryDelayOnFailover: 100, retryDelayOnClusterDown: 100, retryDelayOnTryAgain: 100, diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index f6f17d5c..ea16cb2c 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -745,8 +745,19 @@ class Cluster extends EventEmitter { return; } const errv = error.message.split(" "); - if (errv[0] === "MOVED" || errv[0] === "ASK") { - handlers[errv[0] === "MOVED" ? "moved" : "ask"](errv[1], errv[2]); + if (errv[0] === "MOVED") { + const timeout = this.options.retryDelayOnMoved; + if (timeout && typeof timeout === "number") { + this.delayQueue.push( + "moved", + handlers.moved.bind(null, errv[1], errv[2]), + { timeout } + ); + } else { + handlers.moved(errv[1], errv[2]); + } + } else if (errv[0] === "ASK") { + handlers.ask(errv[1], errv[2]); } else if (errv[0] === "TRYAGAIN") { this.delayQueue.push("tryagain", handlers.tryagain, { timeout: this.options.retryDelayOnTryAgain, diff --git a/test/functional/cluster/moved.ts b/test/functional/cluster/moved.ts index 276bd8b2..1b2527da 100644 --- a/test/functional/cluster/moved.ts +++ b/test/functional/cluster/moved.ts @@ -2,6 +2,7 @@ import * as calculateSlot from "cluster-key-slot"; import MockServer from "../../helpers/mock_server"; import { expect } from "chai"; import { Cluster } from "../../../lib"; +import * as sinon from "sinon"; describe("cluster:MOVED", function () { it("should auto redirect the command to the correct nodes", function (done) { @@ -117,4 +118,43 @@ describe("cluster:MOVED", function () { cluster.get("foo"); }); }); + + it("should supports retryDelayOnMoved", (done) => { + let cluster = undefined; + const slotTable = [[0, 16383, ["127.0.0.1", 30001]]]; + new MockServer(30001, function (argv) { + if (argv[0] === "cluster" && argv[1] === "slots") { + return slotTable; + } + if (argv[0] === "get" && argv[1] === "foo") { + return new Error("MOVED " + calculateSlot("foo") + " 127.0.0.1:30002"); + } + }); + + new MockServer(30002, function (argv) { + if (argv[0] === "cluster" && argv[1] === "slots") { + return slotTable; + } + if (argv[0] === "get" && argv[1] === "foo") { + cluster.disconnect(); + done(); + } + }); + + const retryDelayOnMoved = 789; + cluster = new Cluster([{ host: "127.0.0.1", port: "30001" }], { + retryDelayOnMoved, + }); + cluster.on("ready", function () { + sinon.stub(global, "setTimeout").callsFake((body, ms) => { + if (ms === retryDelayOnMoved) { + process.nextTick(() => { + body(); + }); + } + }); + + cluster.get("foo"); + }); + }); });