diff --git a/examples/basic_operations.js b/examples/basic_operations.js index ec3876fbf..726812936 100644 --- a/examples/basic_operations.js +++ b/examples/basic_operations.js @@ -33,7 +33,7 @@ redis.sadd("set", [1, 3, 5, 7]); redis.spop("set"); // Promise resolves to "5" or another item in the set // Most responses are strings, or arrays of strings -redis.zadd("sortedSet", 1, "one", 2, "dos", 4, "quatro", 3, "three") +redis.zadd("sortedSet", 1, "one", 2, "dos", 4, "quatro", 3, "three"); redis.zrange("sortedSet", 0, 2, "WITHSCORES").then(res => console.log(res)); // Promise resolves to ["one", "1", "dos", "2", "three", "3"] as if the command was ` redis> ZRANGE sortedSet 0 2 WITHSCORES ` // Some responses have transformers to JS values diff --git a/lib/command.ts b/lib/command.ts index 90e36252e..66a7d96f4 100644 --- a/lib/command.ts +++ b/lib/command.ts @@ -154,6 +154,8 @@ export default class Command implements ICommand { private callback: CallbackFunction; private transformed: boolean = false; public isCustomCommand: boolean = false; + public inTransaction: boolean = false; + public pipelineIndex?: number; private slot?: number | null; private keys?: Array; diff --git a/lib/pipeline.ts b/lib/pipeline.ts index ca436959b..b92780495 100644 --- a/lib/pipeline.ts +++ b/lib/pipeline.ts @@ -65,15 +65,9 @@ Pipeline.prototype.fillResult = function(value, position) { if (this.isCluster) { let retriable = true; let commonError: { name: string; message: string }; - let inTransaction: boolean; for (let i = 0; i < this._result.length; ++i) { var error = this._result[i][0]; var command = this._queue[i]; - if (command.name === "multi") { - inTransaction = true; - } else if (command.name === "exec") { - inTransaction = false; - } if (error) { if ( command.name === "exec" && @@ -94,7 +88,7 @@ Pipeline.prototype.fillResult = function(value, position) { retriable = false; break; } - } else if (!inTransaction) { + } else if (!command.inTransaction) { var isReadOnly = exists(command.name) && hasFlag(command.name, "readonly"); if (!isReadOnly) { @@ -107,7 +101,7 @@ Pipeline.prototype.fillResult = function(value, position) { var _this = this; var errv = commonError.message.split(" "); var queue = this._queue; - inTransaction = false; + let inTransaction = false; this._queue = []; for (let i = 0; i < queue.length; ++i) { if ( @@ -122,11 +116,7 @@ Pipeline.prototype.fillResult = function(value, position) { } queue[i].initPromise(); this.sendCommand(queue[i]); - if (queue[i].name === "multi") { - inTransaction = true; - } else if (queue[i].name === "exec") { - inTransaction = false; - } + inTransaction = queue[i].inTransaction; } let matched = true; @@ -174,7 +164,12 @@ Pipeline.prototype.fillResult = function(value, position) { }; Pipeline.prototype.sendCommand = function(command) { + if (this._transactions > 0) { + command.inTransaction = true; + } + const position = this._queue.length; + command.pipelineIndex = position; command.promise .then(result => { diff --git a/lib/redis/event_handler.ts b/lib/redis/event_handler.ts index 15e158e7e..709c9f30d 100644 --- a/lib/redis/event_handler.ts +++ b/lib/redis/event_handler.ts @@ -1,7 +1,10 @@ "use strict"; +import Deque = require("denque"); +import { AbortError } from "redis-errors"; import Command from "../command"; import { MaxRetriesPerRequestError } from "../errors"; +import { ICommandItem, ICommand } from "../types"; import { Debug, noop, CONNECTION_CLOSED_ERROR_MSG } from "../utils"; import DataHandler from "../DataHandler"; @@ -77,6 +80,61 @@ export function connectHandler(self) { }; } +function abortError(command: ICommand) { + const err = new AbortError("Command aborted due to connection close"); + (err as any).command = { + name: command.name, + args: command.args + }; + return err; +} + +// If a contiguous set of pipeline commands starts from index zero then they +// can be safely reattempted. If however we have a chain of pipelined commands +// starting at index 1 or more it means we received a partial response before +// the connection close and those pipelined commands must be aborted. For +// example, if the queue looks like this: [2, 3, 4, 0, 1, 2] then after +// aborting and purging we'll have a queue that looks like this: [0, 1, 2] +function abortIncompletePipelines(commandQueue: Deque) { + let expectedIndex = 0; + for (let i = 0; i < commandQueue.length; ) { + const command = commandQueue.peekAt(i).command as Command; + const pipelineIndex = command.pipelineIndex; + if (pipelineIndex === undefined || pipelineIndex === 0) { + expectedIndex = 0; + } + if (pipelineIndex !== undefined && pipelineIndex !== expectedIndex++) { + commandQueue.remove(i, 1); + command.reject(abortError(command)); + continue; + } + i++; + } +} + +// If only a partial transaction result was received before connection close, +// we have to abort any transaction fragments that may have ended up in the +// offline queue +function abortTransactionFragments(commandQueue: Deque) { + for (let i = 0; i < commandQueue.length; ) { + const command = commandQueue.peekAt(i).command as Command; + if (command.name === "multi") { + break; + } + if (command.name === "exec") { + commandQueue.remove(i, 1); + command.reject(abortError(command)); + break; + } + if ((command as Command).inTransaction) { + commandQueue.remove(i, 1); + command.reject(abortError(command)); + } else { + i++; + } + } +} + export function closeHandler(self) { return function() { self.setStatus("close"); @@ -85,8 +143,12 @@ export function closeHandler(self) { self.prevCondition = self.condition; } if (self.commandQueue.length) { + abortIncompletePipelines(self.commandQueue); self.prevCommandQueue = self.commandQueue; } + if (self.offlineQueue.length) { + abortTransactionFragments(self.offlineQueue); + } if (self.manuallyClosing) { self.manuallyClosing = false; diff --git a/test/functional/elasticache.ts b/test/functional/elasticache.ts new file mode 100644 index 000000000..69e7863d5 --- /dev/null +++ b/test/functional/elasticache.ts @@ -0,0 +1,154 @@ +import * as sinon from "sinon"; +import Redis from "../../lib/redis"; +import { expect } from "chai"; +import MockServer from "../helpers/mock_server"; + +// AWS Elasticache closes the connection immediately when it encounters a READONLY error +function simulateElasticache(options: { + reconnectOnErrorValue: boolean | number; +}) { + let inTransaction = false; + const mockServer = new MockServer(30000, (argv, socket, flags) => { + switch (argv[0]) { + case "multi": + inTransaction = true; + return MockServer.raw("+OK\r\n"); + case "del": + flags.disconnect = true; + return new Error( + "READONLY You can't write against a read only replica." + ); + case "get": + return inTransaction ? MockServer.raw("+QUEUED\r\n") : argv[1]; + case "exec": + inTransaction = false; + return []; + } + }); + + return new Redis({ + port: 30000, + reconnectOnError(err: Error): boolean | number { + // bring the mock server back up + mockServer.connect(); + return options.reconnectOnErrorValue; + } + }); +} + +function expectReplyError(err) { + expect(err).to.exist; + expect(err.name).to.eql("ReplyError"); +} + +function expectAbortError(err) { + expect(err).to.exist; + expect(err.name).to.eql("AbortError"); + expect(err.message).to.eql("Command aborted due to connection close"); +} + +describe("elasticache", function() { + it("should abort a failed transaction when connection is lost", function(done) { + const redis = simulateElasticache({ reconnectOnErrorValue: true }); + + redis + .multi() + .del("foo") + .del("bar") + .exec(err => { + expectAbortError(err); + expect(err.command).to.eql({ + name: "exec", + args: [] + }); + expect(err.previousErrors).to.have.lengthOf(2); + expectReplyError(err.previousErrors[0]); + expect(err.previousErrors[0].command).to.eql({ + name: "del", + args: ["foo"] + }); + expectAbortError(err.previousErrors[1]); + expect(err.previousErrors[1].command).to.eql({ + name: "del", + args: ["bar"] + }); + + // ensure we've recovered into a healthy state + redis.get("foo", (err, res) => { + expect(res).to.eql("foo"); + done(); + }); + }); + }); + + it("should not resend failed transaction commands", function(done) { + const redis = simulateElasticache({ reconnectOnErrorValue: 2 }); + redis + .multi() + .del("foo") + .get("bar") + .exec(err => { + expectAbortError(err); + expect(err.command).to.eql({ + name: "exec", + args: [] + }); + expect(err.previousErrors).to.have.lengthOf(2); + expectAbortError(err.previousErrors[0]); + expect(err.previousErrors[0].command).to.eql({ + name: "del", + args: ["foo"] + }); + expectAbortError(err.previousErrors[1]); + expect(err.previousErrors[1].command).to.eql({ + name: "get", + args: ["bar"] + }); + + // ensure we've recovered into a healthy state + redis.get("foo", (err, res) => { + expect(res).to.eql("foo"); + done(); + }); + }); + }); + + it("should resend intact pipelines", function(done) { + const redis = simulateElasticache({ reconnectOnErrorValue: true }); + + let p1Result; + const p1 = redis + .pipeline() + .del("foo") + .get("bar") + .exec((err, result) => (p1Result = result)); + + const p2 = redis + .pipeline() + .get("baz") + .get("qux") + .exec((err, p2Result) => { + // First pipeline should have been aborted + expect(p1Result).to.have.lengthOf(2); + expect(p1Result[0]).to.have.lengthOf(1); + expect(p1Result[1]).to.have.lengthOf(1); + expectReplyError(p1Result[0][0]); + expect(p1Result[0][0].command).to.eql({ + name: "del", + args: ["foo"] + }); + expectAbortError(p1Result[1][0]); + expect(p1Result[1][0].command).to.eql({ + name: "get", + args: ["bar"] + }); + + // Second pipeline was intact and should have been retried successfully + expect(p2Result).to.have.lengthOf(2); + expect(p2Result[0]).to.eql([null, "baz"]); + expect(p2Result[1]).to.eql([null, "qux"]); + + done(); + }); + }); +}); diff --git a/test/functional/reconnect_on_error.ts b/test/functional/reconnect_on_error.ts index 803730f57..b8f1bd2cc 100644 --- a/test/functional/reconnect_on_error.ts +++ b/test/functional/reconnect_on_error.ts @@ -1,5 +1,6 @@ import Redis from "../../lib/redis"; import { expect } from "chai"; +import * as sinon from "sinon"; describe("reconnectOnError", function() { it("should pass the error as the first param", function(done) { @@ -109,4 +110,38 @@ describe("reconnectOnError", function() { done(); }); }); + + it("should work with pipelined multi", function(done) { + var redis = new Redis({ + reconnectOnError: function() { + // deleting foo allows sadd below to succeed on the second try + redis.del("foo"); + return 2; + } + }); + var delSpy = sinon.spy(redis, "del"); + + redis.set("foo", "bar"); + redis.set("i", 1); + redis + .pipeline() + .sadd("foo", "a") // trigger a WRONGTYPE error + .multi() + .get("foo") + .incr("i") + .exec() + .exec(function(err, res) { + expect(delSpy.calledOnce).to.eql(true); + expect(delSpy.firstCall.args[0]).to.eql("foo"); + expect(err).to.be.null; + expect(res).to.eql([ + [null, 1], + [null, "OK"], + [null, "QUEUED"], + [null, "QUEUED"], + [null, ["bar", 2]] + ]); + done(); + }); + }); }); diff --git a/test/helpers/mock_server.ts b/test/helpers/mock_server.ts index a9a6408a0..92e1e731e 100644 --- a/test/helpers/mock_server.ts +++ b/test/helpers/mock_server.ts @@ -32,7 +32,14 @@ export function getConnectionName(socket: Socket): string | undefined { return connectionNameMap.get(socket); } -export type MockServerHandler = (reply: any, socket: Socket) => any; +interface Flags { + disconnect?: boolean; +} +export type MockServerHandler = ( + reply: any, + socket: Socket, + flags: Flags +) => any; export default class MockServer extends EventEmitter { static REDIS_OK = "+OK"; @@ -84,7 +91,12 @@ export default class MockServer extends EventEmitter { this.write(c, this.slotTable); return; } - this.write(c, this.handler && this.handler(reply, c)); + let flags: Flags = {}; + let handlerResult = this.handler && this.handler(reply, c, flags); + this.write(c, handlerResult); + if (flags.disconnect) { + this.disconnect(); + } }, returnError: function() {} }); diff --git a/test/unit/pipeline.ts b/test/unit/pipeline.ts new file mode 100644 index 000000000..b3e2a63bf --- /dev/null +++ b/test/unit/pipeline.ts @@ -0,0 +1,69 @@ +import * as sinon from "sinon"; +import { expect } from "chai"; +import Pipeline from "../../lib/pipeline"; +import Commander from "../../lib/commander"; +import Redis from "../../lib/redis"; + +describe("Pipeline", function() { + beforeEach(() => { + sinon.stub(Redis.prototype, "connect").resolves(); + sinon.stub(Commander.prototype, "sendCommand").callsFake(command => { + return command; + }); + }); + + afterEach(() => { + sinon.restore(); + }); + + it("should properly mark commands as transactions", function() { + const redis = new Redis(); + const p = new Pipeline(redis); + let i = 0; + + function validate(name, inTransaction) { + const command = p._queue[i++]; + expect(command.name).to.eql(name); + expect(command.inTransaction).to.eql(inTransaction); + } + + p.get(); + p.multi(); + p.get(); + p.multi(); + p.exec(); + p.exec(); + p.get(); + + validate("get", false); + validate("multi", true); + validate("get", true); + validate("multi", true); + validate("exec", true); + validate("exec", false); + validate("get", false); + }); + + it("should properly set pipelineIndex on commands", function() { + const redis = new Redis(); + const p = new Pipeline(redis); + let i = 0; + + function validate(name) { + const command = p._queue[i]; + expect(command.name).to.eql(name); + expect(command.pipelineIndex).to.eql(i); + i++; + } + + p.get(); + p.set(); + p.del(); + p.ping(); + + validate("get"); + validate("set"); + validate("del"); + validate("ping"); + }); +});