diff --git a/lib/autoPipelining.ts b/lib/autoPipelining.ts index 238fe1149..e2fbfdecd 100644 --- a/lib/autoPipelining.ts +++ b/lib/autoPipelining.ts @@ -1,4 +1,5 @@ import * as PromiseContainer from "./promiseContainer"; +import { flatten, isArguments } from "./utils/lodash"; import * as calculateSlot from "cluster-key-slot"; import asCallback from "standard-as-callback"; @@ -74,11 +75,35 @@ export function shouldUseAutoPipelining( ); } +/** + * @private + */ +export function getFirstValueInFlattenedArray( + args: (string | string[])[] +): string | undefined { + for (let i = 0; i < args.length; i++) { + const arg = args[i]; + if (typeof arg === "string") { + return arg; + } else if (Array.isArray(arg) || isArguments(arg)) { + if (arg.length === 0) { + continue; + } + return arg[0]; + } + const flattened = flatten([arg]); + if (flattened.length > 0) { + return flattened[0]; + } + } + return undefined; +} + export function executeWithAutoPipelining( client, functionName: string, commandName: string, - args: string[], + args: (string | string[])[], callback ) { const CustomPromise = PromiseContainer.get(); @@ -104,7 +129,16 @@ export function executeWithAutoPipelining( } // If we have slot information, we can improve routing by grouping slots served by the same subset of nodes - const slotKey = client.isCluster ? client.slots[calculateSlot(args[0])].join(",") : 'main'; + // Note that the first value in args may be a (possibly empty) array. + // ioredis will only flatten one level of the array, in the Command constructor. + const prefix = client.options.keyPrefix || ""; + const slotKey = client.isCluster + ? client.slots[ + calculateSlot( + `${client.options.keyPrefix}${getFirstValueInFlattenedArray(args)}` + ) + ].join(",") + : "main"; if (!client._autoPipelines.has(slotKey)) { const pipeline = client.pipeline(); diff --git a/lib/utils/lodash.ts b/lib/utils/lodash.ts index c903fdc59..ec1a26b91 100644 --- a/lib/utils/lodash.ts +++ b/lib/utils/lodash.ts @@ -1,6 +1,7 @@ import defaults = require("lodash.defaults"); import flatten = require("lodash.flatten"); +import isArguments = require("lodash.isarguments"); export function noop() {} -export { defaults, flatten }; +export { defaults, flatten, isArguments }; diff --git a/package-lock.json b/package-lock.json index 7b855eef0..86c806371 100644 --- a/package-lock.json +++ b/package-lock.json @@ -288,6 +288,15 @@ "@types/lodash": "*" } }, + "@types/lodash.isarguments": { + "version": "3.1.6", + "resolved": "http://artifactory.amz.mtmemgmt.com/artifactory/api/npm/npm-tmg-local/@types/lodash.isarguments/-/lodash.isarguments-3.1.6.tgz", + "integrity": "sha1-WgEtFkZs3Q6hg/U9/CZErcodi4g=", + "dev": true, + "requires": { + "@types/lodash": "*" + } + }, "@types/minimatch": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/@types/minimatch/-/minimatch-3.0.3.tgz", @@ -2671,6 +2680,11 @@ "integrity": "sha1-LRd/ZS+jHpObRDjVNBSZ36OCXpk=", "dev": true }, + "lodash.isarguments": { + "version": "3.1.0", + "resolved": "http://artifactory.amz.mtmemgmt.com/artifactory/api/npm/npm-tmg-local/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha1-L1c9hcaiQon/AGY7SRwdM4/zRYo=" + }, "lodash.map": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/lodash.map/-/lodash.map-4.6.0.tgz", diff --git a/package.json b/package.json index 283391896..fa844719f 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,7 @@ "denque": "^1.1.0", "lodash.defaults": "^4.2.0", "lodash.flatten": "^4.4.0", + "lodash.isarguments": "^3.1.0", "p-map": "^2.1.0", "redis-commands": "1.7.0", "redis-errors": "^1.2.0", @@ -53,6 +54,7 @@ "@types/debug": "^4.1.5", "@types/lodash.defaults": "^4.2.6", "@types/lodash.flatten": "^4.4.6", + "@types/lodash.isarguments": "^3.1.6", "@types/mocha": "^7.0.2", "@types/node": "^13.11.0", "@types/redis-errors": "1.2.0", diff --git a/test/functional/cluster/autopipelining.ts b/test/functional/cluster/autopipelining.ts index 02f933d64..82ac77add 100644 --- a/test/functional/cluster/autopipelining.ts +++ b/test/functional/cluster/autopipelining.ts @@ -35,6 +35,10 @@ describe("autoPipelining for cluster", function () { if (argv[0] === "get" && argv[1] === "foo6") { return "bar6"; } + + if (argv[0] === "get" && argv[1] === "baz:foo10") { + return "bar10"; + } }); new MockServer(30002, function (argv) { @@ -68,6 +72,10 @@ describe("autoPipelining for cluster", function () { return "bar5"; } + if (argv[0] === "get" && argv[1] === "baz:foo1") { + return "bar1"; + } + if (argv[0] === "evalsha") { return argv.slice(argv.length - 4); } @@ -177,6 +185,40 @@ describe("autoPipelining for cluster", function () { cluster.disconnect(); }); + it("should support building pipelines when a prefix is used", async () => { + const cluster = new Cluster(hosts, { + enableAutoPipelining: true, + keyPrefix: "baz:", + }); + await new Promise((resolve) => cluster.once("connect", resolve)); + + await cluster.set("foo1", "bar1"); + await cluster.set("foo10", "bar10"); + + expect( + await Promise.all([cluster.get("foo1"), cluster.get("foo10")]) + ).to.eql(["bar1", "bar10"]); + + cluster.disconnect(); + }); + + it("should support building pipelines when a prefix is used with arrays to flatten", async () => { + const cluster = new Cluster(hosts, { + enableAutoPipelining: true, + keyPrefix: "baz:", + }); + await new Promise((resolve) => cluster.once("connect", resolve)); + + await cluster.set(["foo1"], "bar1"); + await cluster.set(["foo10"], "bar10"); + + expect( + await Promise.all([cluster.get(["foo1"]), cluster.get(["foo10"])]) + ).to.eql(["bar1", "bar10"]); + + cluster.disconnect(); + }); + it("should support commands queued after a pipeline is already queued for execution", (done) => { const cluster = new Cluster(hosts, { enableAutoPipelining: true }); @@ -407,9 +449,9 @@ describe("autoPipelining for cluster", function () { const promise4 = cluster.set("foo6", "bar"); // Override slots to induce a failure - const key1Slot = calculateKeySlot('foo1'); - const key2Slot = calculateKeySlot('foo2'); - const key5Slot = calculateKeySlot('foo5'); + const key1Slot = calculateKeySlot("foo1"); + const key2Slot = calculateKeySlot("foo2"); + const key5Slot = calculateKeySlot("foo5"); changeSlot(cluster, key1Slot, key2Slot); changeSlot(cluster, key2Slot, key5Slot); @@ -498,9 +540,9 @@ describe("autoPipelining for cluster", function () { expect(cluster.autoPipelineQueueSize).to.eql(4); // Override slots to induce a failure - const key1Slot = calculateKeySlot('foo1'); - const key2Slot = calculateKeySlot('foo2'); - const key5Slot = calculateKeySlot('foo5'); + const key1Slot = calculateKeySlot("foo1"); + const key2Slot = calculateKeySlot("foo2"); + const key5Slot = calculateKeySlot("foo5"); changeSlot(cluster, key1Slot, key2Slot); changeSlot(cluster, key2Slot, key5Slot); }); @@ -547,8 +589,8 @@ describe("autoPipelining for cluster", function () { expect(cluster.autoPipelineQueueSize).to.eql(3); - const key1Slot = calculateKeySlot('foo1'); - const key2Slot = calculateKeySlot('foo2'); + const key1Slot = calculateKeySlot("foo1"); + const key2Slot = calculateKeySlot("foo2"); changeSlot(cluster, key1Slot, key2Slot); }); }); diff --git a/test/unit/autoPipelining.ts b/test/unit/autoPipelining.ts new file mode 100644 index 000000000..f8cc053b0 --- /dev/null +++ b/test/unit/autoPipelining.ts @@ -0,0 +1,38 @@ +import { expect } from "chai"; +import { getFirstValueInFlattenedArray } from "../../lib/autoPipelining"; +import { flatten } from "../../lib/utils/lodash"; + +describe("autoPipelining", function () { + const expectGetFirstValueIs = (values, expected) => { + expect(getFirstValueInFlattenedArray(values)).to.eql(expected); + // getFirstValueInFlattenedArray should behave the same way as flatten(args)[0] + // but be much more efficient. + expect(flatten(values)[0]).to.eql(expected); + }; + + it("should be able to efficiently get array args", function () { + expectGetFirstValueIs([], undefined); + expectGetFirstValueIs([null, "key"], null); + expectGetFirstValueIs(["key", "value"], "key"); + expectGetFirstValueIs([[], "key"], "key"); + expectGetFirstValueIs([["key"]], "key"); + // @ts-ignore + expectGetFirstValueIs([[["key"]]], ["key"]); + // @ts-ignore + expectGetFirstValueIs([0, 1, 2, 3, 4], 0); + // @ts-ignore + expectGetFirstValueIs([[true]], true); + // @ts-ignore + expectGetFirstValueIs([Buffer.from("test")], Buffer.from("test")); + // @ts-ignore + expectGetFirstValueIs([{}], {}); + // lodash.isArguments is true for this legacy js way to get argument lists + const createArguments = function () { + return arguments; + }; + // @ts-ignore + expectGetFirstValueIs([createArguments(), createArguments("key")], "key"); + // @ts-ignore + expectGetFirstValueIs([createArguments("")], ""); + }); +});