forked from mike-marcacci/node-redlock
-
Notifications
You must be signed in to change notification settings - Fork 0
/
redlock.js
364 lines (280 loc) · 10.1 KB
/
redlock.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
'use strict';
var util = require('util');
var crypto = require('crypto');
var Promise = require('bluebird');
var EventEmitter = require('events');
// Support the event library provided by node < 0.11.0: when the required module
// exposes an `EventEmitter` function as a property, use that property as the
// constructor instead of the module export itself.
if(typeof EventEmitter.EventEmitter === 'function')
EventEmitter = EventEmitter.EventEmitter;
// constants
// ---------
// Lua scripts sent to each redis server through `eval` with a single key. In every script
// KEYS[1] is the resource name and ARGV[1] is the lock value; ARGV[2] (where used) is the
// ttl in milliseconds.

// Sets the key to the lock value with the "NX" and "PX" options of the redis SET command
// (only set when the key does not already exist; expire after ARGV[2] milliseconds).
var lockScript = 'return redis.call("set", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])';
// Deletes the key only when it still holds this lock's value; otherwise returns 0.
var unlockScript = 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end';
// Resets the key's expiry to ARGV[2] milliseconds only when it still holds this lock's value;
// otherwise returns 0.
var extendScript = 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end';
// defaults
// --------
// Fallback values used by the Redlock constructor for any option that is not a number.
var defaults = {
// multiplied by the ttl to compute the clock-drift allowance subtracted from a lock's expiration
driftFactor: 0.01,
// number of retries allowed after the first attempt; -1 retries without limit
retryCount: 10,
// base delay between attempts, in milliseconds
retryDelay: 200,
// maximum random offset (positive or negative) added to the retry delay, in milliseconds
retryJitter: 100
};
// LockError
// ---------
// Error type used to report a failure while locking, unlocking or extending a resource.
//
// - `message`  : optional description; a generic message is used when it is falsy
// - `attempts` : optional number of attempts made before giving up (stored as given)
function LockError(message, attempts) {
	// pick the text up front so the assignments below read as plain field initialisation
	var text = message;
	if(!text)
		text = 'Failed to lock the resource.';

	Error.call(this);
	// record the stack, leaving this constructor's own frame out of it
	Error.captureStackTrace(this, LockError);

	this.name = 'LockError';
	this.message = text;
	this.attempts = attempts;
}

// LockError instances are also `instanceof Error`
util.inherits(LockError, Error);
// Lock
// ----
// Handle describing a lock that was acquired on a resource. It stores the owning redlock
// instance, the resource name, the lock value, the expiration timestamp and the number of
// attempts, and offers `unlock` / `extend` shortcuts that delegate to the owning instance.
function Lock(redlock, resource, value, expiration, attempts) {
	this.redlock = redlock;
	this.resource = resource;
	this.value = value;
	this.expiration = expiration;
	this.attempts = attempts;
}

// Release this lock through the redlock instance that created it.
Lock.prototype.unlock = function unlock(callback) {
	var owner = this.redlock;
	return owner.unlock(this, callback);
};

// Extend this lock by `ttl` through the redlock instance that created it.
Lock.prototype.extend = function extend(ttl, callback) {
	var owner = this.redlock;
	return owner.extend(this, ttl, callback);
};
// Attach a reference to Lock, which allows the application to use instanceof
// to ensure type. `Redlock` is a function declaration found further down in this
// file, so it is hoisted and can already be assigned to here.
Redlock.Lock = Lock;
// Redlock
// -------
// A redlock object is instantiated with an array of at least one redis client and an optional
// `options` object. Properties of the Redlock object should NOT be changed after it is first
// used, as doing so could have unintended consequences for live locks.
//
// Options (all optional):
// - `driftFactor`, `retryCount`, `retryDelay`, `retryJitter`: numbers; any value that is not
//   a number falls back to the matching entry in `defaults`
// - `lockScript`, `unlockScript`, `extendScript`: functions that receive the built-in Lua
//   script and return the script that should be used instead
//
// Throws an Error when `clients` is not an array containing at least one client.
function Redlock(clients, options) {
	// validate first, so that a missing or non-array `clients` produces the intended error
	// instead of a TypeError from reading `.length` (or being silently accepted)
	if(!Array.isArray(clients) || clients.length === 0)
		throw new Error('Redlock must be instantiated with at least one redis server.');

	// run the parent constructor so the emitter state is initialised on this instance
	EventEmitter.call(this);

	// set default options
	options = options || {};
	this.driftFactor = typeof options.driftFactor === 'number' ? options.driftFactor : defaults.driftFactor;
	this.retryCount = typeof options.retryCount === 'number' ? options.retryCount : defaults.retryCount;
	this.retryDelay = typeof options.retryDelay === 'number' ? options.retryDelay : defaults.retryDelay;
	this.retryJitter = typeof options.retryJitter === 'number' ? options.retryJitter : defaults.retryJitter;

	// allow each built-in script to be wrapped or replaced by a user-supplied function
	this.lockScript = typeof options.lockScript === 'function' ? options.lockScript(lockScript) : lockScript;
	this.unlockScript = typeof options.unlockScript === 'function' ? options.unlockScript(unlockScript) : unlockScript;
	this.extendScript = typeof options.extendScript === 'function' ? options.extendScript(extendScript) : extendScript;

	// the redis clients that take part in every lock, unlock and extend operation
	this.servers = clients;
}
// Inherit all the EventEmitter methods, like `on`, and `off`
util.inherits(Redlock, EventEmitter);
// Attach a reference to LockError per issue #7, which allows the application to use instanceof
// to distinguish between error types.
Redlock.LockError = LockError;
// quit
// ----
// Calls `.quit()` on every client in `this.servers` and returns a promise for the collected
// results. When a node-style `callback` is supplied it is attached via `nodeify`.
Redlock.prototype.quit = function quit(callback) {
	var clients = this.servers;

	// close a single client connection
	function quitClient(client) {
		return client.quit();
	}

	var allClosed = Promise.map(clients, quitClient);

	// optionally run callback
	return allClosed.nodeify(callback);
};
// lock (alias: acquire)
// ---------------------
// Locks a resource using the redlock algorithm by delegating to `_lock` without an
// existing lock value.
//
// ```js
// redlock.lock(
//   'some-resource',       // the resource to lock
//   2000,                  // ttl in ms
//   function(err, lock) {  // callback function (optional)
//     ...
//   }
// )
// ```
Redlock.prototype.lock = function lock(resource, ttl, callback) {
	// a null value tells `_lock` to create a new lock rather than extend an existing one
	var existingValue = null;
	return this._lock(resource, existingValue, ttl, callback);
};

// `acquire` is the very same function under a second name
Redlock.prototype.acquire = Redlock.prototype.lock;
// disposer
// --------
// Locks a resource using the redlock algorithm and returns a bluebird disposer which
// unlocks it again. Errors raised while unlocking are passed to `errorHandler`; when no
// handler is given they are ignored.
//
// ```js
// using(
//   redlock.disposer(
//     'some-resource',     // the resource to lock
//     2000                 // ttl in ms
//   ),
//   function(lock) {
//     ...
//   }
// );
// ```
Redlock.prototype.disposer = function disposer(resource, ttl, errorHandler) {
	// fall back to a handler that does nothing
	var onUnlockError = errorHandler || function(err) {};

	// cleanup step run by the disposer: release the lock, routing failures to the handler
	function release(lock) {
		return lock.unlock().catch(onUnlockError);
	}

	var pendingLock = this._lock(resource, null, ttl);
	return pendingLock.disposer(release);
};
// unlock (alias: release)
// -----------------------
// Releases the provided lock on every server. The returned promise is rejected with a
// LockError when fewer than a quorum of servers confirm the release; no attempt is made to
// restore the lock on servers that did release it. Retrying an unlock or ignoring the error
// is safe, because the lock expires on its own after its timeout.
Redlock.prototype.release =
Redlock.prototype.unlock = function unlock(lock, callback) {
	var self = this;

	// immediately invalidate the lock
	lock.expiration = 0;

	var released = new Promise(function(resolve, reject) {
		var servers = self.servers;

		// replies still outstanding
		var pending = servers.length;

		// confirmations needed for consensus
		var quorum = Math.floor(servers.length / 2) + 1;

		// confirmations received so far
		var votes = 0;

		// handles the reply of one server; settles the promise once every server has answered
		function onReply(err, response) {
			if(err) self.emit('clientError', err);

			// some clients deliver the integer reply as a string
			if(typeof response === 'string')
				response = parseInt(response);

			// 1: this call released the lock
			// 0: the lock was already gone, because
			//    - it may have been re-acquired by another process
			//    - it may have already been manually released
			//    - it may have expired
			if(response === 0 || response === 1)
				votes++;

			pending--;
			if(pending > 0) return;

			// FAILURE: the lock could not be released on a quorum of servers
			if(votes < quorum)
				return reject(new LockError('Unable to fully release the lock on resource "' + lock.resource + '".'));

			// SUCCESS: there is consensus and the lock is released
			return resolve();
		}

		// ask every server to release the lock
		servers.forEach(function(server){
			server.eval(self.unlockScript, 1, lock.resource, lock.value, onReply);
		});
	});

	// optionally run callback
	return released.nodeify(callback);
};
// extend
// ------
// Extends a still-valid lock by the provided `ttl`. The original lock object is updated in
// place with the new value and expiration and is the resolution value of the returned
// promise. A lock whose expiration lies in the past is rejected with a LockError without
// contacting any server.
Redlock.prototype.extend = function extend(lock, ttl, callback) {
	// the lock has expired
	if(lock.expiration < Date.now()) {
		var expired = new LockError('Cannot extend lock on resource "' + lock.resource + '" because the lock has already expired.', 0);
		return Promise.reject(expired).nodeify(callback);
	}

	// copy the outcome of the extension onto the caller's lock object and hand that back
	function applyExtension(extension) {
		lock.value = extension.value;
		lock.expiration = extension.expiration;
		return lock;
	}

	// extend the lock by re-running the algorithm with the existing lock value
	var extended = this._lock(lock.resource, lock.value, ttl).then(applyExtension);

	// optionally run callback
	return extended.nodeify(callback);
};
// _lock
// -----
// Runs the redlock algorithm for a resource, either creating a new lock (`value` is null)
// or extending an existing one (`value` is the value of the original lock). Resolves with a
// Lock once a quorum of servers agreed before the computed expiration; otherwise the
// partial lock is released and the attempt is retried according to `retryCount`,
// `retryDelay` and `retryJitter`, finally rejecting with a LockError.
//
// ###Creating New Locks:
//
// ```js
// redlock._lock(
//   'some-resource',       // the resource to lock
//   null,                  // no original lock value
//   2000,                  // ttl in ms
//   function(err, lock) {  // callback function (optional)
//     ...
//   }
// )
// ```
//
// ###Extending Existing Locks:
//
// ```js
// redlock._lock(
//   'some-resource',       // the resource to lock
//   'dkkk18g4gy39dx6r',    // the value of the original lock
//   2000,                  // ttl in ms
//   function(err, lock) {  // callback function (optional)
//     ...
//   }
// )
// ```
Redlock.prototype._lock = function _lock(resource, value, ttl, callback) {
	var self = this;

	function run(resolve, reject) {
		// the number of attempts started so far
		var attempts = 0;

		// name of the instance property holding the script to evaluate; it is looked up
		// on every request rather than captured once
		var scriptKey;
		if(value === null) {
			// create a new lock under a freshly generated value
			value = self._random();
			scriptKey = 'lockScript';
		}
		else {
			// extend an existing lock, keeping its value
			scriptKey = 'extendScript';
		}

		// send the lock / extend request to one server
		function send(server, done) {
			return server.eval(self[scriptKey], 1, resource, value, ttl, done);
		}

		// runs after a failed attempt has been cleaned up: schedule another attempt or give up
		function retryOrFail() {
			var mayRetry = self.retryCount === -1 || attempts <= self.retryCount;

			// FAILED
			if(!mayRetry)
				return reject(new LockError('Exceeded ' + self.retryCount + ' attempts to lock the resource "' + resource + '".', attempts));

			// RETRY after the configured delay, shifted by a random offset within +/- retryJitter
			var jitter = Math.floor((Math.random() * 2 - 1) * self.retryJitter);
			return setTimeout(attempt, Math.max(0, self.retryDelay + jitter));
		}

		function attempt() {
			attempts++;

			// the time when this attempt started
			var start = Date.now();

			// the number of servers which have agreed to this lock
			var votes = 0;

			// the number of votes needed for consensus
			var quorum = Math.floor(self.servers.length / 2) + 1;

			// the number of redis replies still outstanding
			var pending = self.servers.length;

			function onReply(err, response) {
				if(err) self.emit('clientError', err);
				if(response) votes++;

				pending--;
				if(pending > 0) return;

				// Add 2 milliseconds to the drift to account for Redis expires precision,
				// which is 1 ms, plus the configured allowable drift factor
				var drift = Math.round(self.driftFactor * ttl) + 2;
				var lock = new Lock(self, resource, value, start + ttl - drift, attempts);

				// SUCCESS: there is consensus and the lock is not expired
				if(votes >= quorum && lock.expiration > Date.now())
					return resolve(lock);

				// remove this lock from servers that voted for it, then retry or fail
				return lock.unlock(retryOrFail);
			}

			return self.servers.forEach(function(server){
				return send(server, onReply);
			});
		}

		return attempt();
	}

	// optionally run callback
	return new Promise(run).nodeify(callback);
};
// _random
// -------
// Generates the value stored under a new lock: 16 random bytes from `crypto.randomBytes`,
// encoded as a 32-character hexadecimal string.
Redlock.prototype._random = function _random(){
	var bytes = crypto.randomBytes(16);
	return bytes.toString('hex');
};
module.exports = Redlock;