Skip to content

Commit

Permalink
Add concurrency argument to async.auto
Browse files Browse the repository at this point in the history
  • Loading branch information
vergenzt committed Oct 25, 2015
1 parent 56bdb50 commit f801c68
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 3 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,7 @@ cargo.push({name: 'baz'}, function (err) {
---------------------------------------
<a name="auto" />
### auto(tasks, [callback])
### auto(tasks, [callback], [concurrency])
Determines the best order for running the functions in `tasks`, based on their requirements. Each function can optionally depend on other functions being completed first, and each function is run as soon as its requirements are satisfied.
Expand Down Expand Up @@ -1307,6 +1307,8 @@ __Arguments__
pass an error to their callback. Results are always returned; however, if
an error occurs, no further `tasks` will be performed, and the results
object will only contain partial results.
* `concurrency` - An `integer` for determining the maximum number of tasks that
can be run in parallel. By default, as many as possible.
__Example__
Expand Down
11 changes: 9 additions & 2 deletions lib/async.js
Original file line number Diff line number Diff line change
Expand Up @@ -509,15 +509,19 @@
}
};

async.auto = function (tasks, callback) {
async.auto = function (tasks, callback, concurrency) {
callback = _once(callback || noop);
var keys = _keys(tasks);
var remainingTasks = keys.length;
if (!remainingTasks) {
return callback(null);
}
if (!concurrency) {
concurrency = remainingTasks;
}

var results = {};
var runningTasks = 0;

var listeners = [];
function addListener(fn) {
Expand All @@ -543,6 +547,7 @@
_arrayEach(keys, function (k) {
var task = _isArray(tasks[k]) ? tasks[k]: [tasks[k]];
var taskCallback = _restParam(function(err, args) {
runningTasks--;
if (args.length <= 1) {
args = args[0];
}
Expand Down Expand Up @@ -572,18 +577,20 @@
}
}
function ready() {
return _reduce(requires, function (a, x) {
return runningTasks < concurrency && _reduce(requires, function (a, x) {
return (a && results.hasOwnProperty(x));
}, true) && !results.hasOwnProperty(k);
}
if (ready()) {
runningTasks++;
task[task.length - 1](taskCallback, results);
}
else {
addListener(listener);
}
function listener() {
if (ready()) {
runningTasks++;
removeListener(listener);
task[task.length - 1](taskCallback, results);
}
Expand Down
29 changes: 29 additions & 0 deletions test/test-async.js
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,35 @@ exports['auto'] = function(test){
});
};

exports['auto concurrency'] = function (test) {
var concurrency = 2;
var runningTasks = [];
var makeCallback = function(taskName) {
return function(callback) {
runningTasks.push(taskName);
setTimeout(function(){
// Each task returns the array of running tasks as results.
var result = runningTasks.slice(0);
runningTasks.splice(runningTasks.indexOf(taskName), 1);
callback(null, result);
});
};
};
async.auto({
task1: ['task2', makeCallback('task1')],
task2: makeCallback('task2'),
task3: ['task2', makeCallback('task3')],
task4: ['task1', 'task2', makeCallback('task4')],
task5: ['task2', makeCallback('task5')],
task6: ['task2', makeCallback('task6')]
}, function(err, results){
Object.keys(results).forEach(function(taskName) {
test.ok(results[taskName].length <= concurrency);
});
test.done();
}, concurrency);
};

exports['auto petrify'] = function (test) {
var callOrder = [];
async.auto({
Expand Down

0 comments on commit f801c68

Please sign in to comment.