Skip to content

Commit

Permalink
fix: Fix autopipeline and downgrade p-map to support Node 6. [#1216]
Browse files Browse the repository at this point in the history
  • Loading branch information
ShogunPanda authored and AVVS committed Oct 31, 2020
1 parent 6feee28 commit 1bc8ca0
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 20 deletions.
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
language: node_js

node_js:
- "6"
- "8"
- "9"
- "10"
- "11"
- "12"
- "14"
- "15"

services:
- redis-server
Expand Down
2 changes: 1 addition & 1 deletion lib/autoPipelining.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export const notAllowedAutoPipelineCommands = [
"unpsubscribe",
];

function findAutoPipeline(client, ...args: Array<string>): string {
function findAutoPipeline(client, _commandName, ...args: Array<string>): string {
if (!client.isCluster) {
return "main";
}
Expand Down
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"denque": "^1.1.0",
"lodash.defaults": "^4.2.0",
"lodash.flatten": "^4.4.0",
"p-map": "^4.0.0",
"p-map": "^2.1.0",
"redis-commands": "1.6.0",
"redis-errors": "^1.2.0",
"redis-parser": "^3.0.0",
Expand Down
76 changes: 68 additions & 8 deletions test/functional/cluster/autopipelining.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { expect, use } from "chai";
import * as calculateKeySlot from 'cluster-key-slot';

import { default as Cluster } from "../../../lib/cluster";
import MockServer from "../../helpers/mock_server";

Expand Down Expand Up @@ -395,14 +397,29 @@ describe("autoPipelining for cluster", function () {
await new Promise((resolve) => cluster.once("connect", resolve));

const promise1 = cluster.set("foo1", "bar");
const promise2 = cluster.set("foo2", "bar");
const promise2 = cluster.set("foo5", "bar");
const promise3 = cluster.set("foo2", "bar");
const promise4 = cluster.set("foo6", "bar");

// Override slots to induce a failure
const key1Slot = calculateKeySlot('foo1');
const key2Slot = calculateKeySlot('foo2');
const key5Slot = calculateKeySlot('foo5');
cluster.slots[key1Slot] = cluster.slots[key2Slot];
cluster.slots[key2Slot] = cluster.slots[key5Slot];

await expect(promise1).to.eventually.be.rejectedWith(
"All keys in the pipeline should belong to the same slots allocation group"
);
await expect(promise2).to.eventually.be.rejectedWith(
"All keys in the pipeline should belong to the same slots allocation group"
);
await expect(promise3).to.eventually.be.rejectedWith(
"All keys in the pipeline should belong to the same slots allocation group"
);
await expect(promise4).to.eventually.be.rejectedWith(
"All keys in the pipeline should belong to the same slots allocation group"
);

cluster.disconnect();
});
Expand All @@ -411,7 +428,7 @@ describe("autoPipelining for cluster", function () {
const cluster = new Cluster(hosts, { enableAutoPipelining: true });

cluster.once("connect", () => {
let err1, err2;
let err1, err2, err3, err4;

function cb() {
expect(err1.message).to.eql(
Expand All @@ -420,6 +437,12 @@ describe("autoPipelining for cluster", function () {
expect(err2.message).to.eql(
"All keys in the pipeline should belong to the same slots allocation group"
);
expect(err3.message).to.eql(
"All keys in the pipeline should belong to the same slots allocation group"
);
expect(err4.message).to.eql(
"All keys in the pipeline should belong to the same slots allocation group"
);
expect(cluster.autoPipelineQueueSize).to.eql(0);

cluster.disconnect();
Expand All @@ -431,22 +454,49 @@ describe("autoPipelining for cluster", function () {
cluster.set("foo1", "bar1", (err) => {
err1 = err;

if (err1 && err2) {
if (err1 && err2 && err3 && err4) {
cb();
}
});

expect(cluster.autoPipelineQueueSize).to.eql(1);

cluster.set("foo2", (err) => {
cluster.set("foo2", "bar2", (err) => {
err2 = err;

if (err1 && err2) {
if (err1 && err2 && err3 && err4) {
cb();
}
});

expect(cluster.autoPipelineQueueSize).to.eql(2);

cluster.set("foo5", "bar5", (err) => {
err3 = err;

if (err1 && err2 && err3 && err4) {
cb();
}
});

expect(cluster.autoPipelineQueueSize).to.eql(3);

cluster.set("foo6", "bar6", (err) => {
err4 = err;

if (err1 && err2 && err3 && err4) {
cb();
}
});

expect(cluster.autoPipelineQueueSize).to.eql(4);

// Override slots to induce a failure
const key1Slot = calculateKeySlot('foo1');
const key2Slot = calculateKeySlot('foo2');
const key5Slot = calculateKeySlot('foo5');
cluster.slots[key1Slot] = cluster.slots[key2Slot];
cluster.slots[key2Slot] = cluster.slots[key5Slot];
});
});

Expand All @@ -457,13 +507,16 @@ describe("autoPipelining for cluster", function () {
process.removeAllListeners("uncaughtException");

cluster.once("connect", () => {
let err1;
let err1, err5;

process.once("uncaughtException", (err) => {
expect(err.message).to.eql("ERROR");
expect(err1.message).to.eql(
"All keys in the pipeline should belong to the same slots allocation group"
);
expect(err5.message).to.eql(
"All keys in the pipeline should belong to the same slots allocation group"
);

for (const listener of listeners) {
process.on("uncaughtException", listener);
Expand All @@ -476,14 +529,21 @@ describe("autoPipelining for cluster", function () {
cluster.set("foo1", "bar1", (err) => {
err1 = err;
});
cluster.set("foo5", "bar5", (err) => {
err5 = err;
});

expect(cluster.autoPipelineQueueSize).to.eql(1);
expect(cluster.autoPipelineQueueSize).to.eql(2);

cluster.set("foo2", (err) => {
throw new Error("ERROR");
});

expect(cluster.autoPipelineQueueSize).to.eql(2);
expect(cluster.autoPipelineQueueSize).to.eql(3);

const key1Slot = calculateKeySlot('foo1');
const key2Slot = calculateKeySlot('foo2');
cluster.slots[key1Slot] = cluster.slots[key2Slot];
});
});
});

0 comments on commit 1bc8ca0

Please sign in to comment.