Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Respect attempts when reclaiming queue #13

Merged
merged 2 commits into from
Jul 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,20 +310,20 @@ Queue.prototype._reclaim = function(id) {
queue: other.get(this.keys.QUEUE) || []
};

// add their queue to ours, resetting run-time to immediate and attempt# to 0
// add their queue to ours, resetting run-time to immediate and copying the attempt#
each(function(el) {
our.queue.push({
item: el.item,
attemptNumber: 0,
attemptNumber: el.attemptNumber,
time: self._schedule.now()
});
}, their.queue);

// if the queue is abandoned, all the in-progress are failed. retry them immediately and reset the attempt#
// if the queue is abandoned, all the in-progress are failed. retry them immediately and increment the attempt#
each(function(el) {
our.queue.push({
item: el.item,
attemptNumber: 0,
attemptNumber: el.attemptNumber + 1,
time: self._schedule.now()
});
}, their.inProgress);
Expand Down
142 changes: 142 additions & 0 deletions test/lifecycle.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
'use strict';

var assert = require('proclaim');
var lolex = require('lolex');

var Queue = require('..');
var Schedule = require('../lib/schedule');

describe('Queue Lifecycle', function() {
it('should not reattempt to publish an expired item', function() {
var attempts = 8;
var options = { maxAttempts: attempts };

openTab(options, function(tab) {
tab.queue.addItem('test item');
tab.queue.start();

assert(tab.queue.wait(attempts) !== -1);
assert(tab.queue.wait(1) === -1);

assert(tab.forkAndConsume(1) === -1);
});
});

it('should respect attempts when reclaiming a queue', function() {
var options = { maxAttempts: 10 };

openTab(options, function(tab) {
tab.queue.addItem('test');
tab.queue.start();

assert(tab.forkAndConsume(3) !== -1);
assert(tab.forkAndConsume(4) !== -1);
assert(tab.forkAndConsume(3) !== -1);

// Now check that no attempts are made in a new tab
assert(tab.forkAndConsume(1) === -1);
});
});

it('should increment in-progress items attempts when reclaiming a queue', function() {
var options = { maxAttempts: 6 };

openTab(options, function(tab) {
tab.queue.addItem('test');
tab.queue.start();

assert(tab.forkAndConsume(3) !== -1);

// Make this attempt timeout by not calling the done callback
// This will put the item in the inProgress queue
tab.queue.ignore = true;
assert(tab.queue.wait(1) !== -1);
tab.queue.ignore = false;

// Check that we only consume the two remaining attempts
assert(tab.forkAndConsume(2) !== -1);
assert(tab.forkAndConsume(1) === -1);
});
});
});

// Utilities

function createTestQueue(tab, options) {
var waiting = 0;
var queue = new Queue('segment::multi_queue_test', options, function(item, done) {
waiting--;
queue.calls++;

if (!queue.ignore) {
done(new Error(item));
}
});

queue.calls = 0;
queue.wait = function(condition) {
waiting = condition;

return tab.wait(function() {
return waiting === 0;
});
};

queue.waitFirstAttempt = function() {
return queue.wait(1);
};

return queue;
}

var ONE_SECOND = 1000;
var ONE_MINUTE = 60 * ONE_SECOND;

// Mocks a browser tab with its own clock
function openTab(options, fn) {
var tab = {
clock: lolex.createClock(),

// Waits for a condition to be true by immediatly executing all timers
wait: function(condition) {
var clock = tab.clock;
var start = clock.now;
var timeout = 15 * ONE_MINUTE;

while (!condition()) {
if (clock.now - start > timeout || !clock.timers) {
return -1;
}

clock.next();
}

return clock.now - start;
},

// Simulates a event-loop freeze, like a browser would do with a background tab
freeze: function() {
tab.clock.reset();
tab.clock.setTimeout = tab.clock.clearTimeout = function noop() {};
},

// Opens a new tab and waits for $attempts to be made, returns -1 otherwise
forkAndConsume: function(attempts) {
// Freeze the current tab so the next tab can reclaim the queue
tab.freeze();

return openTab(options, function(newTab) {
tab.queue = newTab.queue;
tab = newTab;
tab.queue.start();

return tab.queue.wait(attempts);
});
}
};

tab.queue = createTestQueue(tab, options);
Schedule.setClock(tab.clock);

return fn(tab);
}