Skip to content

Commit

Permalink
#35 pipeline does not pass rv-object to final callback
Browse files Browse the repository at this point in the history
#36 should bump version and update changes.md
Reviewed by: Julien Gilli <[email protected]>
Approved by: David Pacheco <[email protected]>
  • Loading branch information
nickziv committed Sep 14, 2017
1 parent a88ef4c commit 548e798
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 23 deletions.
11 changes: 11 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,19 @@

None yet.

## v2.1.0

* #33 want filter, filterLimit, and filterSeries
* #35 pipeline does not pass rv-object to final callback


## v2.0.0

** WARNING

Do not use this version (v2.0.0), as it has broken pipeline and forEachPipeline
functions.

**Breaking Changes:**

* The `waterfall` function's terminating callback no longer receives a
Expand Down
88 changes: 66 additions & 22 deletions lib/vasync.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ function pipeline(args, callback)
'callback': callback,
'args': { impl: 'pipeline', uarg: args['arg'] },
'stop_when': 'error',
'res_type': 'values'
'res_type': 'rv'
};
return (waterfall_impl(opts));
}
Expand Down Expand Up @@ -629,6 +629,40 @@ function waterfall(funcs, callback)
return (waterfall_impl(opts));
}

/*
* This function is used to implement vasync-functions that need to execute a
* list of functions in a sequence, but differ in how they make use of the
* intermediate callbacks and finall callback, as well as under what conditions
* they stop executing the functions in the list. Examples of such functions
* are `pipeline`, `waterfall`, and `tryEach`. See the documentation for those
* functions to see how they operate.
*
* This function's behavior is influenced via the `opts` object that we pass
* in. This object has the following layout:
*
* {
* 'funcs': array of functions
* 'callback': the final callback
* 'args': {
* 'impl': 'pipeline' or 'tryEach' or 'waterfall'
* 'uarg': the arg passed to each func for 'pipeline'
* }
* 'stop_when': 'error' or 'success'
* 'res_type': 'values' or 'arrays' or 'rv'
* }
*
* In the object, 'res_type' is used to indicate what the type of the result
* values(s) is that we pass to the final callback. We secondarily use
* 'args.impl' to adjust this behavior in an implementation-specific way. For
* example, 'tryEach' only returns an array if it has more than 1 result passed
* to the final callback. Otherwise, it passes a solitary value to the final
* callback.
*
* In case it's not clear, 'rv' in the `res_type` member, is just the
* result-value that we also return. This is the convention in functions that
* originated in `vasync` (pipeline), but not in functions that originated in
* `async` (waterfall, tryEach).
*/
function waterfall_impl(opts)
{
mod_assert.ok(typeof (opts) === 'object');
Expand All @@ -641,8 +675,8 @@ function waterfall_impl(opts)
mod_assert.ok(arguments.length == 1,
'Function "waterfall_impl" must take only 1 arg');
mod_assert.ok(opts.res_type === 'values' ||
opts.res_type === 'array',
'"opts.res_type" must either be "value" or "array"');
opts.res_type === 'array' || opts.res_type == 'rv',
'"opts.res_type" must either be "values", "array", or "rv"');
mod_assert.ok(opts.stop_when === 'error' ||
opts.stop_when === 'success',
'"opts.stop_when" must either be "error" or "success"');
Expand Down Expand Up @@ -678,7 +712,13 @@ function waterfall_impl(opts)
}

next = function (idx, err) {
var res_key, args, entry, nextentry;
/*
* Note that nfunc_args contains the args we will pass to the
* next func in the func-list the user gave us. Except for
* 'tryEach', which passes cb's. However, it will pass
* 'nfunc_args' to its final callback -- see below.
*/
var res_key, nfunc_args, entry, nextentry;

if (err === undefined)
err = null;
Expand All @@ -695,23 +735,25 @@ function waterfall_impl(opts)
entry = rv['operations'][rv['ndone']++];
if (opts.args.impl === 'tryEach' ||
opts.args.impl === 'waterfall') {
args = Array.prototype.slice.call(arguments, 2);
nfunc_args = Array.prototype.slice.call(arguments, 2);
res_key = 'results';
entry['results'] = nfunc_args;
} else if (opts.args.impl === 'pipeline') {
args = [ opts.args.uarg ];
nfunc_args = [ opts.args.uarg ];
res_key = 'result';
entry['result'] = arguments[2];
}

mod_assert.equal(entry['status'], 'pending',
'status should be pending');
entry['status'] = err ? 'fail' : 'ok';
entry['err'] = err;
entry[res_key] = args;

if (err)
if (err) {
rv['nerrors']++;
else
rv['successes'].push(args);
} else {
rv['successes'].push(entry[res_key]);
}

if ((opts.stop_when === 'error' && err) ||
(opts.stop_when === 'success' &&
Expand All @@ -720,18 +762,20 @@ function waterfall_impl(opts)
if (callback) {
if (opts.res_type === 'values' ||
(opts.res_type === 'array' &&
args.length <= 1)) {
args.unshift(err);
callback.apply(null, args);
nfunc_args.length <= 1)) {
nfunc_args.unshift(err);
callback.apply(null, nfunc_args);
} else if (opts.res_type === 'array') {
callback(err, args);
callback(err, nfunc_args);
} else if (opts.res_type === 'rv') {
callback(err, rv);
}
}
} else {
nextentry = rv['operations'][rv['ndone']];
nextentry['status'] = 'pending';
current++;
args.push(next.bind(null, current));
nfunc_args.push(next.bind(null, current));
setImmediate(function () {
var nfunc = nextentry['func'];
/*
Expand All @@ -741,22 +785,22 @@ function waterfall_impl(opts)
* seem like calling `nfunc.apply` is
* sufficient for both cases (after all we
* pushed `next.bind(null, current)` to the
* `args` array), before we call
* `nfunc_args` array), before we call
* `setImmediate()`. However, this is not the
* case, because the interface exposed by
* tryEach is different from the others. The
* others pass argument(s) from task to task.
* tryEach passes nothing but a callback
* (`next.bind` below). However, the callback
* itself _can_ be called with one or more
* results, which we collect into `args` using
* the aformentioned `opts.args.impl` branch
* above, and which we pass to the callback via
* the `opts.res_type` branch above (where
* res_type is set to 'array').
* results, which we collect into `nfunc_args`
* using the aformentioned `opts.args.impl`
* branch above, and which we pass to the
* callback via the `opts.res_type` branch
* above (where res_type is set to 'array').
*/
if (opts.args.impl !== 'tryEach') {
nfunc.apply(null, args);
nfunc.apply(null, nfunc_args);
} else {
nfunc(next.bind(null, current));
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "vasync",
"version": "2.0.0",
"version": "2.1.0",
"description": "utilities for observable asynchronous control flow",
"main": "./lib/vasync.js",
"repository": {
Expand Down
132 changes: 132 additions & 0 deletions tests/pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Tests the "pipeline" primitive.
*/

var mod_tap = require('tap');
var mod_vasync = require('..');
var st;


mod_tap.test('empty pipeline', function (test) {
var count = 0;
st = mod_vasync.pipeline({'funcs': [], 'arg': null},
function (err, result) {

test.ok(err === null);
test.ok(result.ndone === 0);
test.ok(result.nerrors === 0);
test.ok(result.operations.length === 0);
test.ok(result.successes.length === 0);
test.equal(count, 1);
test.end();
});
count++;
test.ok(st.ndone === 0);
test.ok(st.nerrors === 0);
test.ok(st.operations.length === 0);
test.ok(st.successes.length === 0);
});

mod_tap.test('normal 4-stage pipeline', function (test) {
var count = 0;
st = mod_vasync.pipeline({'funcs': [
function func1(_, cb) {
test.equal(st.successes[0], undefined,
'func1: successes');
test.ok(count === 0, 'func1: count === 0');
test.ok(st.ndone === 0);
count++;
setImmediate(cb, null, count);
},
function func2(_, cb) {
test.equal(st.successes[0], 1, 'func2: successes');
test.ok(count == 1, 'func2: count == 1');
test.ok(st.ndone === 1);
test.ok(st.operations[0].status == 'ok');
test.ok(st.operations[1].status == 'pending');
test.ok(st.operations[2].status == 'waiting');
count++;
setImmediate(cb, null, count);
},
function (_, cb) {
test.equal(st.successes[0], 1, 'func3: successes');
test.equal(st.successes[1], 2, 'func3: successes');
test.ok(count == 2, 'func3: count == 2');
test.ok(st.ndone === 2);
count++;
setImmediate(cb, null, count);
},
function func4(_, cb) {
test.equal(st.successes[0], 1, 'func4: successes');
test.equal(st.successes[1], 2, 'func4: successes');
test.equal(st.successes[2], 3, 'func4: successes');
test.ok(count == 3, 'func4: count == 3');
test.ok(st.ndone === 3);
count++;
setImmediate(cb, null, count);
}
]}, function (err, result) {
test.ok(count == 4, 'final: count == 4');
test.ok(err === null, 'no error');
test.ok(result === st);
test.equal(result, st, 'final-cb: st == result');
test.equal(st.successes[0], 1, 'final-cb: successes');
test.equal(st.successes[1], 2, 'final-cb: successes');
test.equal(st.successes[2], 3, 'final-cb: successes');
test.equal(st.successes[3], 4, 'final-cb: successes');
test.ok(st.ndone === 4);
test.ok(st.nerrors === 0);
test.ok(st.operations.length === 4);
test.ok(st.successes.length === 4);
test.ok(st.operations[0].status == 'ok');
test.ok(st.operations[1].status == 'ok');
test.ok(st.operations[2].status == 'ok');
test.ok(st.operations[3].status == 'ok');
test.end();
});
test.ok(st.ndone === 0);
test.ok(st.nerrors === 0);
test.ok(st.operations.length === 4);
test.ok(st.operations[0].funcname == 'func1', 'func1 name');
test.ok(st.operations[0].status == 'pending');
test.ok(st.operations[1].funcname == 'func2', 'func2 name');
test.ok(st.operations[1].status == 'waiting');
test.ok(st.operations[2].funcname == '(anon)', 'anon name');
test.ok(st.operations[2].status == 'waiting');
test.ok(st.operations[3].funcname == 'func4', 'func4 name');
test.ok(st.operations[3].status == 'waiting');
test.ok(st.successes.length === 0);
});

mod_tap.test('bailing out early', function (test) {
var count = 0;
st = mod_vasync.pipeline({'funcs': [
function func1(_, cb) {
test.ok(count === 0, 'func1: count === 0');
count++;
setImmediate(cb, null, count);
},
function func2(_, cb) {
test.ok(count == 1, 'func2: count == 1');
count++;
setImmediate(cb, new Error('boom!'));
},
function func3(_, cb) {
test.ok(count == 2, 'func3: count == 2');
count++;
setImmediate(cb, null, count);
}
]}, function (err, result) {
test.ok(count == 2, 'final: count == 3');
test.equal(err.message, 'boom!');
test.ok(result === st);
test.equal(result, st, 'final-cb: st == result');
test.ok(st.ndone == 2);
test.ok(st.nerrors == 1);
test.ok(st.operations[0].status == 'ok');
test.ok(st.operations[1].status == 'fail');
test.ok(st.operations[2].status == 'waiting');
test.ok(st.successes.length == 1);
test.end();
});
});

0 comments on commit 548e798

Please sign in to comment.