From 1bc8ca0d05ab830a04502acd1cfc2796aca256ec Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Fri, 30 Oct 2020 10:56:56 +0100 Subject: [PATCH] fix: Fix autopipeline and downgrade p-map to support Node 6. [#1216] --- .travis.yml | 5 +- lib/autoPipelining.ts | 2 +- package-lock.json | 16 ++--- package.json | 2 +- test/functional/cluster/autopipelining.ts | 76 ++++++++++++++++++++--- 5 files changed, 81 insertions(+), 20 deletions(-) diff --git a/.travis.yml b/.travis.yml index 8f77df1c..916636c0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,11 +1,12 @@ language: node_js node_js: + - "6" - "8" - - "9" - "10" - - "11" - "12" + - "14" + - "15" services: - redis-server diff --git a/lib/autoPipelining.ts b/lib/autoPipelining.ts index 4d721f1d..7855de7c 100644 --- a/lib/autoPipelining.ts +++ b/lib/autoPipelining.ts @@ -17,7 +17,7 @@ export const notAllowedAutoPipelineCommands = [ "unpsubscribe", ]; -function findAutoPipeline(client, ...args: Array): string { +function findAutoPipeline(client, _commandName, ...args: Array): string { if (!client.isCluster) { return "main"; } diff --git a/package-lock.json b/package-lock.json index bf371acf..cc397fcf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -477,6 +477,7 @@ "version": "3.0.1", "resolved": "https://registry.npmjs.org/aggregate-error/-/aggregate-error-3.0.1.tgz", "integrity": "sha512-quoaXsZ9/BLNae5yiNoUz+Nhkwz83GhWwtYFglcjEQB2NDHCIpApbqXxIFnm4Pq/Nvhrsq5sYJFyohrrxnTGAA==", + "dev": true, "requires": { "clean-stack": "^2.0.0", "indent-string": "^4.0.0" @@ -868,7 +869,8 @@ "clean-stack": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/clean-stack/-/clean-stack-2.2.0.tgz", - "integrity": "sha512-4diC9HaTE+KRAMWhDhrGOECgWZxoevMc5TlkObMqNSsVU62PYzXZ/SMTjzyGAFF1YusgxGcSWTEXBhp0CPwQ1A==" + "integrity": "sha512-4diC9HaTE+KRAMWhDhrGOECgWZxoevMc5TlkObMqNSsVU62PYzXZ/SMTjzyGAFF1YusgxGcSWTEXBhp0CPwQ1A==", + "dev": true }, "cli-cursor": { "version": "3.1.0", @@ -2471,7 +2473,8 @@ "indent-string": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/indent-string/-/indent-string-4.0.0.tgz", - "integrity": "sha512-EdDDZu4A2OyIK7Lr/2zG+w5jmbuk1DVBnEwREQvBzspBJkCEbRa8GxU1lghYcaGJCnRWibjDXlq779X1/y5xwg==" + "integrity": "sha512-EdDDZu4A2OyIK7Lr/2zG+w5jmbuk1DVBnEwREQvBzspBJkCEbRa8GxU1lghYcaGJCnRWibjDXlq779X1/y5xwg==", + "dev": true }, "inflight": { "version": "1.0.6", @@ -3385,12 +3388,9 @@ } }, "p-map": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/p-map/-/p-map-4.0.0.tgz", - "integrity": "sha512-/bjOqmgETBYB5BoEeGVea8dmvHb2m9GLy1E9W43yeyfP6QQCZGFNa+XRceJEuDB6zqr+gKpIAmlLebMpykw/MQ==", - "requires": { - "aggregate-error": "^3.0.0" - } + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/p-map/-/p-map-2.1.0.tgz", + "integrity": "sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw==" }, "p-reduce": { "version": "2.1.0", diff --git a/package.json b/package.json index 92b47985..881e9c52 100644 --- a/package.json +++ b/package.json @@ -38,7 +38,7 @@ "denque": "^1.1.0", "lodash.defaults": "^4.2.0", "lodash.flatten": "^4.4.0", - "p-map": "^4.0.0", + "p-map": "^2.1.0", "redis-commands": "1.6.0", "redis-errors": "^1.2.0", "redis-parser": "^3.0.0", diff --git a/test/functional/cluster/autopipelining.ts b/test/functional/cluster/autopipelining.ts index cfb65add..91be1a69 100644 --- a/test/functional/cluster/autopipelining.ts +++ b/test/functional/cluster/autopipelining.ts @@ -1,4 +1,6 @@ import { expect, use } from "chai"; +import * as calculateKeySlot from 'cluster-key-slot'; + import { default as Cluster } from "../../../lib/cluster"; import MockServer from "../../helpers/mock_server"; @@ -395,7 +397,16 @@ describe("autoPipelining for cluster", function () { await new Promise((resolve) => cluster.once("connect", resolve)); const promise1 = cluster.set("foo1", "bar"); - const promise2 = cluster.set("foo2", "bar"); + const promise2 = cluster.set("foo5", "bar"); + const promise3 = cluster.set("foo2", "bar"); + const promise4 = cluster.set("foo6", "bar"); + + // Override slots to induce a failure + const key1Slot = calculateKeySlot('foo1'); + const key2Slot = calculateKeySlot('foo2'); + const key5Slot = calculateKeySlot('foo5'); + cluster.slots[key1Slot] = cluster.slots[key2Slot]; + cluster.slots[key2Slot] = cluster.slots[key5Slot]; await expect(promise1).to.eventually.be.rejectedWith( "All keys in the pipeline should belong to the same slots allocation group" @@ -403,6 +414,12 @@ describe("autoPipelining for cluster", function () { await expect(promise2).to.eventually.be.rejectedWith( "All keys in the pipeline should belong to the same slots allocation group" ); + await expect(promise3).to.eventually.be.rejectedWith( + "All keys in the pipeline should belong to the same slots allocation group" + ); + await expect(promise4).to.eventually.be.rejectedWith( + "All keys in the pipeline should belong to the same slots allocation group" + ); cluster.disconnect(); }); @@ -411,7 +428,7 @@ describe("autoPipelining for cluster", function () { const cluster = new Cluster(hosts, { enableAutoPipelining: true }); cluster.once("connect", () => { - let err1, err2; + let err1, err2, err3, err4; function cb() { expect(err1.message).to.eql( @@ -420,6 +437,12 @@ describe("autoPipelining for cluster", function () { expect(err2.message).to.eql( "All keys in the pipeline should belong to the same slots allocation group" ); + expect(err3.message).to.eql( + "All keys in the pipeline should belong to the same slots allocation group" + ); + expect(err4.message).to.eql( + "All keys in the pipeline should belong to the same slots allocation group" + ); expect(cluster.autoPipelineQueueSize).to.eql(0); cluster.disconnect(); @@ -431,22 +454,49 @@ describe("autoPipelining for cluster", function () { cluster.set("foo1", "bar1", (err) => { err1 = err; - if (err1 && err2) { + if (err1 && err2 && err3 && err4) { cb(); } }); expect(cluster.autoPipelineQueueSize).to.eql(1); - cluster.set("foo2", (err) => { + cluster.set("foo2", "bar2", (err) => { err2 = err; - if (err1 && err2) { + if (err1 && err2 && err3 && err4) { cb(); } }); expect(cluster.autoPipelineQueueSize).to.eql(2); + + cluster.set("foo5", "bar5", (err) => { + err3 = err; + + if (err1 && err2 && err3 && err4) { + cb(); + } + }); + + expect(cluster.autoPipelineQueueSize).to.eql(3); + + cluster.set("foo6", "bar6", (err) => { + err4 = err; + + if (err1 && err2 && err3 && err4) { + cb(); + } + }); + + expect(cluster.autoPipelineQueueSize).to.eql(4); + + // Override slots to induce a failure + const key1Slot = calculateKeySlot('foo1'); + const key2Slot = calculateKeySlot('foo2'); + const key5Slot = calculateKeySlot('foo5'); + cluster.slots[key1Slot] = cluster.slots[key2Slot]; + cluster.slots[key2Slot] = cluster.slots[key5Slot]; }); }); @@ -457,13 +507,16 @@ describe("autoPipelining for cluster", function () { process.removeAllListeners("uncaughtException"); cluster.once("connect", () => { - let err1; + let err1, err5; process.once("uncaughtException", (err) => { expect(err.message).to.eql("ERROR"); expect(err1.message).to.eql( "All keys in the pipeline should belong to the same slots allocation group" ); + expect(err5.message).to.eql( + "All keys in the pipeline should belong to the same slots allocation group" + ); for (const listener of listeners) { process.on("uncaughtException", listener); @@ -476,14 +529,21 @@ describe("autoPipelining for cluster", function () { cluster.set("foo1", "bar1", (err) => { err1 = err; }); + cluster.set("foo5", "bar5", (err) => { + err5 = err; + }); - expect(cluster.autoPipelineQueueSize).to.eql(1); + expect(cluster.autoPipelineQueueSize).to.eql(2); cluster.set("foo2", (err) => { throw new Error("ERROR"); }); - expect(cluster.autoPipelineQueueSize).to.eql(2); + expect(cluster.autoPipelineQueueSize).to.eql(3); + + const key1Slot = calculateKeySlot('foo1'); + const key2Slot = calculateKeySlot('foo2'); + cluster.slots[key1Slot] = cluster.slots[key2Slot]; }); }); });