This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Pipeline Errors - Forwarding errors down stream pipelines #8306

Closed

Conversation

chrisdickinson

Note: This is by way of proposal -- I'm opening this now in its current form to get more comments. In particular, I have the following questions:

Would it be better to distinguish the error re-throwing behavior from the unpiping behavior? This would entail
splitting up handleError into handleError (to prevent re-throwing the error) and preventDefault (to prevent unpiping) -- or alternatively, should the presence of a pipelineError handler be enough to prevent the re-throwing of errors?

cc @phated @contra @tjfontaine @hayes @dominictarr @substack

pipeline errors

Writing correct streaming programs is difficult. A single stream missing an
error handler in a pipeline has the ability to crash your entire program, often
with a cryptic error message. Even when an error listener is added, the stream
system has already unpiped by the time client code is called. This makes it
difficult to generically gather pipeline information on error events.
Finally, error events are not necessarily fatal to the pipeline, but the user
has no way to communicate this intent back to the stream subsystem -- any error
emitted will destroy whatever pipeline it is emitted in, at present.
To reiterate: error handling in pipelines requires users to remember to add
error listeners to every constituent stream, does not allow users to prevent
undesired default behavior, and in the event of an error, does not give users
enough information to debug the situation.

The goals and constraints of this proposal are as follows:

  • Make correct error handling easier to implement for end users.
  • The solution must be polyfillable for older versions of node.
  • The solution must work in browser (for browserify).
  • Work with extant stream solutions (through, through2).
  • Provide public methods into internal state for being able to traverse
    up and down pipelines.
  • Implement the solution in terms of public methods, insofar as possible.

To that end, this proposal adds six methods (in three pairs) to the Stream class.

  • stream.next() and stream.prev(): Get an array of the set
    of immediate destination or source streams.
  • stream.nextAll() and stream.prevAll(): Get an array of the
    set of all destination or source streams, with no duplicates or cycles.
  • stream.addPipelineErrorHandler(fn) and stream.removePipelineErrorHandler(fn):
    Add a pipeline error handler to a stream.

User-facing API

var A = fs.createReadStream('input');
var B = new stream.PassThrough();
var C = fs.createWriteStream('output');

A.name = 'A';
B.name = 'B';
C.name = 'C';

C.addPipelineErrorHandler(function(ev) {
  console.log(ev.error.message, ev.stream.name);
  ev.handleError();
});

A.pipe(B).pipe(C);
A.emit('error', new Error('hello!')); // logs "hello!".

In the above program, someone has maliciously injected an error event into our
stream pipeline. The error originates on A but is handled at C. The error
handler indicates that the user wishes to handle this error manually and
prevent the unpiping behavior by calling ev.handleError(). The original error
and the originating stream are available at ev.error and ev.stream, respectively.

The error event may only be marked "handled" on the same event loop turn as it was
emitted. Multiple pipeline error handlers may intercept an error -- it propagates downstream,
hitting handlers "A" then "B" then "C". Once an error event is "handled", it stops propagating
entirely. For complex pipelines -- where there may be cycles or multiple branches -- it propagates
down the "oldest" pipelines first, stopping when it hits duplicates. A few examples:

A - B - A     error at A, propagates to A, then B.
*

A - B - A     error at B, propagates to B, then A.
    *

A - B - C     error at A, propagates to A, B, then C.
*

A - B - C     error at B, propagates to B, then C.
    *

       C - D  error at A, propagates to A, B, C, D, X, then Y.
      /       NB: handlers on *either* branch have the opportunity
A - B         to "handle" the error and thus stop propagation.
*     \
       X - Y

A user may remove a handler by calling stream.removePipelineErrorHandler(fn) with the handler
function they originally provided.
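The ordering rules above can be modeled in a few lines of plain JavaScript. This is an illustrative sketch, not the patch itself: "streams" are stand-in objects whose _nextStreams arrays record pipe order, and propagationOrder is a hypothetical helper name.

```javascript
// Hypothetical model of the propagation order: the originating stream first,
// then downstream streams, oldest pipes first, each visited at most once.
function propagationOrder(origin) {
  var seen = new Set([origin]);
  var order = [origin];
  var stack = origin._nextStreams.slice().reverse();
  while (stack.length) {
    var s = stack.pop();
    if (seen.has(s)) continue;        // duplicates and cycles are skipped
    seen.add(s);
    order.push(s);
    for (var i = s._nextStreams.length - 1; i >= 0; i--) {
      stack.push(s._nextStreams[i]);
    }
  }
  return order;
}

function node(name) { return { name: name, _nextStreams: [] }; }

// The branching example: A - B, then B - C - D and B - X - Y.
var A = node('A'), B = node('B'), C = node('C');
var D = node('D'), X = node('X'), Y = node('Y');
A._nextStreams = [B];
B._nextStreams = [C, X];   // C was piped first, so its branch is "older"
C._nextStreams = [D];
X._nextStreams = [Y];

console.log(propagationOrder(A).map(function (s) { return s.name; }));
// [ 'A', 'B', 'C', 'D', 'X', 'Y' ]
```

The same walk terminates on the A - B - A cycle, visiting each stream once.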

Implementation

The sequence of events during an error is as follows:

  1. A.emit("error")
  2. The .pipe "error" handler (or _defaultErrorHandler).
  3. Internal "_preError" event.
  4. pipelineErrorHandlers for A, and all downstream streams (gathered with nextAll) until error is handled or streams exhausted.
  5. if the error is handled, skip to step 8.
  6. unpipe().
  7. re-emit error if no other error handlers exist.
  8. The rest of the "error" handlers on A.

In order to implement this sequence of events, the following modifications and
additions were made:

Added _nextStreams and _prevStreams attributes to base Stream class

The stream constructor grew two new arrays -- _nextStreams and
_prevStreams. These are arrays tracking the next and previous streams, and
are managed internally by the .pipe method (both Readable#pipe and
Stream#pipe may affect these arrays). Since "correct" subclassing isn't
guaranteed (see crypto's LazyTransform), the attributes are lazily added
by *#pipe if they are not present. These attributes are there to support
the {next,prev}{All,}() methods. _nextStreams is a slight duplication
of information in the Readable case, but it is only ever represented by an
Array, while _readableState.pipes may be null, a single Stream, or an array
of Streams.
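As a sketch of that pattern (trackPipe is a hypothetical stand-in for the bookkeeping .pipe would do; only the attribute names come from the proposal):

```javascript
// Lazily attach the tracking arrays on first pipe, so even streams that
// never ran the Stream constructor (e.g. crypto's LazyTransform) get them.
function trackPipe(src, dest) {
  if (!src._nextStreams) src._nextStreams = [];
  if (!dest._prevStreams) dest._prevStreams = [];
  src._nextStreams.push(dest);
  dest._prevStreams.push(src);
}

var src = {};    // stand-in for an "incorrectly subclassed" stream
var dest = {};
trackPipe(src, dest);
console.log(src._nextStreams[0] === dest, dest._prevStreams[0] === src); // true true
```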

Added {next,prev}{All,}() methods

To give users better tools to debug errors within streams, the next(),
prev(), nextAll(), and prevAll() methods were added to publicly expose
what was previously private or implicit state. nextAll and prevAll are implemented
in terms of iterateStream(stream, attributeName), which does a (non-recursive) depth-first
traversal of the _nextStreams (or _prevStreams) property. One potentially
controversial bit is the use of a WeakSet to ensure no duplicates are added to the
output -- this could be replaced with an array of "seen" streams in environments that
don't support WeakSet.

Modified onerror in {Readable,Stream}#pipe

This is the primary change:

function onerror(er) {
  var handled = {handled: false};
  this.emit('_preError', er, handled);

  if (handled.handled) {
    return;
  }

  // ... the existing unpipe / re-emit behavior continues here ...
}

This has been added for old streams as well as new streams. The goal was to
keep all of the pipeline error event handling out of the pipe functions.
Earlier attempts, where the pipelineError event was emitted as a "normal"
EE-style event and the handling logic was embedded in .pipe, became complicated
very quickly. There's bookkeeping that needs to be done with regards to default
error handlers, deciding which error handler needs to propagate the
PipelineErrorEvent, etc., and I wanted to keep that as separate from .pipe as
possible. In order to do this, I added an internal event -- "_preError" -- that
emits the error and an object that can be modified by listeners of _preError to
indicate whether the error is handled or not.

One benefit of this approach is that the feature can be added piecemeal -- adding the navigational components can be one piece, adding _preError
can be another, and we can iterate on what the public API for _preError should be (if the current approach is deemed unsatisfactory).

Added _errorHandlers attribute to base Stream class

This is pretty straightforward. Like _{next,prev}Streams, it's an attribute that's
lazily added if not present; otherwise it contains a list of {handler, uninstall}
objects. {add,remove}PipelineErrorHandler manipulates this attribute.
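A sketch of that bookkeeping (the {handler, uninstall} shape comes from the prose; the function bodies are assumptions):

```javascript
function addPipelineErrorHandler(stream, fn, uninstall) {
  if (!stream._errorHandlers) stream._errorHandlers = [];   // lazily added
  stream._errorHandlers.push({handler: fn, uninstall: uninstall || function () {}});
}

function removePipelineErrorHandler(stream, fn) {
  var list = stream._errorHandlers || [];
  for (var i = 0; i < list.length; i++) {
    if (list[i].handler === fn) {
      list[i].uninstall();      // tear down any installed listeners
      list.splice(i, 1);
      return true;
    }
  }
  return false;
}

var s = {};
function handler() {}
addPipelineErrorHandler(s, handler);
console.log(s._errorHandlers.length);                // 1
console.log(removePipelineErrorHandler(s, handler)); // true
console.log(s._errorHandlers.length);                // 0
```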

Added addPipelineErrorEvent

This is the real meat of the change. It installs listeners onto the current
stream as well as all streams returned by prevAll(). Installation adds
listeners for unpipe, pipe, _preError, and, if there are no error
handlers present, error as well.

The error event handler is called _defaultPipelineErrorHandler. Because streams2+
do not add an error event handler to Readable streams on .pipe, this ensures consistent
pipeline error behavior -- namely, that errors emitted on Readables are forwarded down the
pipeline.

Because upstream may change over time (after the initial
addPipelineErrorHandler call), a listener is added for pipe and
unpipe. Once piped to, a pipeline-error-handling stream will remove the
_defaultPipelineErrorHandler, and install the 3 to 4 listeners on the new
source and all of its ancestors. If unpiped, to keep things simple, the handler
simply uninstalls and reinstalls itself, re-propagating the error handlers out
appropriately.

Uninstallation involves removing all installed listeners, and splicing out the
errorHandler from the list of error handlers if the stream is the source
stream. Since the streams installed are tracked by the installedOnStreams
array (deduplicated with the aid of a WeakSet), uninstallation is as simple as
calling uninstall on each of the streams.

@hayes

hayes commented Sep 2, 2014

re: distinguish the error re-throwing behavior from the unpiping behavior

There are two things that splitting these out would enable: keeping the pipeline in place while still emitting the error, and handling/preventing the error from being emitted while still destroying the pipeline.

The first seems like a dangerous pattern, and it would probably be better to call any handling code in the pipelineHandler function rather than letting the error event propagate to an 'error' listener that may or may not expect the pipeline to be destroyed. For the second, I would prefer to manually trigger the unpipe after handling the error; that would allow you to send any final data through.

One thing that may be useful is a stopPropagation type method that prevents the error from propagating further, my interpretation was that all error handlers for the entire pipeline would be collected immediately and guaranteed to be executed.

Stream.prototype.next = function() {
  return this._nextStreams.slice();
};

should these handle the case where the stream was incorrectly subclassed or has not been piped to/from?

Author

Good catch!

@dominictarr

I commented on this when you posted it as a gist, it looks like what I said would still apply.
I'm not sure if my comment was replied to, because I can't find the gist. Do you have a link?

if (installedIdx === null) return;

stream._errorHandlers.splice(installedIdx, 1);
installedIdx = null;

you should remove the stream from the membership set here. True polyfills for WeakSet are not possible in ES5, but partial polyfills that store a reference to the WeakSet/Map are possible, allowing items to still be GC'd but not the set itself; if you remove all items from the set, it can then be GC'd.

Author

Good catch.

I'm using WeakSet here to avoid the O(N) membership test (& because it's available/reliable here) -- in environments that don't support it (browserify, readable-stream) I'll implement using [].some / [].indexOf instead of WeakSet.
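That fallback might look like this: one membership interface, backed by WeakSet when available, or a plain "seen" array with its O(N) indexOf test otherwise (makeSeenSet is an illustrative name, not part of the patch).

```javascript
// Membership test for deduplicating visited streams, with a graceful
// fallback for environments without WeakSet (browserify, older node).
function makeSeenSet() {
  if (typeof WeakSet === 'function') {
    var ws = new WeakSet();
    return {
      has: function (s) { return ws.has(s); },
      add: function (s) { ws.add(s); }
    };
  }
  var arr = [];   // O(N) membership test, but works everywhere
  return {
    has: function (s) { return arr.indexOf(s) !== -1; },
    add: function (s) { if (arr.indexOf(s) === -1) arr.push(s); }
  };
}

var seen = makeSeenSet();
var a = {};
seen.add(a);
console.log(seen.has(a), seen.has({})); // true false
```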


ah cool, wasn't really sure how implementations for browserify and the polyfill for older versions of node would work.

@chrisdickinson
Author

Some use cases to consider:

  1. Introspecting all errors that happen to a pipeline & sending them to some other logging mechanism, without otherwise affecting the stream machinery.
  2. Preventing undesired unpipe-ing / allowing multiple "error" events -- as desired by gulp (as I understand it.)
  3. Being able to abort the process before the streams are unpiped, so that they're preserved in-situ in core dumps.
  4. Being able to add a single handler to handle all error events in a pipeline, instead of having to add a listener for each stream in the pipeline.

@dominictarr Here's a link to the gist! @contra and @hayes weighed in with some use cases for being able to "handle" errors without destroying the pipeline.

For my part, I think that handleError, at present, needs to come with a big red warning flag in the docs -- "you are responsible for cleaning up after this error." In @contra's linter error case, the correct "cleanup" is to prevent the error from unpiping the streams.

@hayes

One thing that may be useful is a stopPropagation type method that prevents the error from propagating further, my interpretation was that all error handlers for the entire pipeline would be collected immediately and guaranteed to be executed.

Currently the error will stop propagating to other pipelineError handlers after it is marked "handled" (so, in DOM-ish terms, it's a combination of "preventDefault" and "stopImmediatePropagation"). I went this route since at any point, only one handler should ever be able to "handle" the error -- the handler that calls handleError assumes responsibility for the error at that point. Put another way, handleError() is like the catch in a try { } catch(err) { } clause.
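In pseudo-implementation terms (illustrative only, not the patch), the dispatch loop combines those two DOM behaviors: calling handleError() both claims the error and stops every remaining handler.

```javascript
function dispatch(handlers, error) {
  var handled = false;
  var ev = {error: error, handleError: function () { handled = true; }};
  for (var i = 0; i < handlers.length; i++) {
    handlers[i](ev);
    if (handled) return true;   // like reaching a catch block: nothing else runs
  }
  return false;                 // unhandled: unpipe + re-emit would follow
}

var calls = [];
var result = dispatch([
  function (ev) { calls.push('logger'); },                    // observes only
  function (ev) { calls.push('catcher'); ev.handleError(); }, // claims the error
  function (ev) { calls.push('never'); }                      // skipped
], new Error('boom'));
console.log(result, calls); // true [ 'logger', 'catcher' ]
```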

@phated

phated commented Sep 2, 2014

Some issues opened on gulp (to point to specific use cases) are gulpjs/gulp#75, gulpjs/gulp#91, gulpjs/gulp#354, gulpjs/gulp#184 among others (just searched for unpipe and gulp-plumber).

These are real-world examples where people are seeing the issues that @chrisdickinson is trying to solve.

The current solution the gulp team has to tell people is "use gulp-plumber", which is a really gross module, but gets the job done. Another module called StreamSpy is also trying to solve this problem. I believe @dominictarr even merged a PR at dominictarr/map-stream#7 trying to help with this (continueOnError), but it never worked in the gulp ecosystem (as far as I know).

I am all for having a try/catch analogy for dealing with streams.

@jmcbee

jmcbee commented Oct 11, 2014

So what happens now?

@bjouhier

This whole issue arises from the complexity of an event-based API.

If you use a continuation callback style API instead (a simple read(callback) to read from a stream) you'll be able to do proper error handling.

Gotcha: pipe will have a continuation callback parameter in such a design. Two advantages: 1) you can easily chain an action to be executed after the pipe has completed, and 2) this is where you will trap the error. See https://github.com/Sage/ez-streams for details.
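A sketch of the style being described (the shapes here are assumptions for illustration; ez-streams' real API differs in detail): read(callback) pulls one item, null marks end-of-stream, and the pipe continuation is the single place where completion or the first error shows up.

```javascript
// A reader in continuation-callback style: cb(err, item), null at end.
function arrayReader(items) {
  var i = 0;
  return {
    read: function (cb) {
      cb(null, i < items.length ? items[i++] : null);
    }
  };
}

// pipe drains the reader; its continuation fires once, with the first
// error or with null on success -- the try/catch-like chokepoint.
function pipe(reader, writeFn, cb) {
  reader.read(function loop(err, item) {
    if (err) return cb(err);
    if (item === null) return cb(null);
    writeFn(item);
    reader.read(loop);
  });
}

var out = [];
pipe(arrayReader([1, 2, 3]), function (x) { out.push(x); }, function (err) {
  console.log(err, out); // err is null, out is [1, 2, 3]
});
```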

Stop adding APIs, fix the root cause!

@phated

phated commented Oct 12, 2014

@bjouhier there is no way that node is breaking those APIs to the extent you are suggesting.

Like @fbm-static, I'd also like to know how this proceeds, as no other node core member has commented on this thread.

@bjouhier

@phated

Why not introduce a new, different, API?

First, it's easy. No need to worry about backward compatibility: you deploy it side by side. You just need calls to convert in both directions between the two APIs (see https://github.com/Sage/ez-streams#interoperabily-with-native-nodejs-streams).

Advantages of the continuation callback API:

  • It is simple.
  • Streams are very easy to implement: look at the ez-streams devices modules.
  • Everything is done with existing concepts. For example, there is no need to introduce new events nor special mechanisms for termination or error handling. The callback(err, result) signature has all we need.
  • Object mode is supported without any additional API. Actually streams are content agnostic.
  • The continuation points where you may chain actions and handle errors are clearly identified: the reducers.
  • The design is loosely coupled. You don't need to subclass special ReadableStream/WritableStream classes. You can turn any read or write function into a stream.
  • Semantics are raised to the application level: application developers want to manipulate data (transform it, map it, filter it, ...), not deal with low-level I/O details.
  • Backpressure is a non issue. You don't need to code any mechanism for it explicitly. You reason about buffering instead.
  • Performance should even be better because there is a lot less code and callbacks are less costly than event emitters.

I'm probably being very loud with this message. I don't have any special need for this API to go into core because I have it in userland. But I (we) have been using it for quite some time and it makes a huge difference in usability, productivity and robustness. This is why I'm being so loud. I think that a fresh simple API would put a lot more power into the hands of node developers.

@jmcbee

jmcbee commented Oct 14, 2014

userland?

@dominictarr

Sorry @chrisdickinson, I missed your previous post and just noticed it now. Okay, now I have read @hayes and @contra's responses in the gist also. My thinking is that you are misusing the 'error' event.

If the error doesn't break the stream, it's not really an error, not from the point of view of the stream. From the point of view of the stream, it's data. It's like there weren't enough chicken meals for all the passengers, and some people had to have fish. It's not like the wings fell off the plane; that would most certainly be a fatal error. So what if gulp just emitted a non-fatal-error event that did not cause streams to be unpiped? It could be converted into a data event and passed along the stream via the normal mechanism. If you add a parallel channel for errors, then you have two streams: one made of normal matter, and a weird dark-matter stream that rarely interacts with the normal world.

@bjouhier you may indeed be right, but it would just be way too much work (and cause way too much work) to change core this dramatically. I also have an alt-stream design, pull-stream, that propagates errors and gives you simpler back pressure; it's what I generally use when I want object streams now.

@chrisdickinson I think a better way to solve this problem is to use something like https://www.npmjs.org/package/pump instead of a backwards-compatible monkey patch. If gulp just used a different event name for non-fatal errors, then I think this would solve all the problems.

@bjouhier

@dominictarr Thanks Dominic for the answer. I understand that this is a big change. But node is only 5 years old and we have many more years ahead of us (at least I hope so). Do you imagine people still programming with the low level stream API 5 years from now? I don't. I see them moving to higher level APIs like pull-stream or ez-streams. This is not difficult but it introduces mappers to convert in both directions (we both have them). This means more code and a less efficient result. Would be better if core had the simpler API.

The simpler API could be provided as wrappers around the existing implementation in a first step and then the implementation could be reworked to expose the simpler API directly, with a compatibility wrapper around it.

IMO any new API should have the qualities that I listed above: simple, content-agnostic, explicit continuation points, loosely coupled, application level semantics, monadic, ...

I'm setting this PR discussion off-track, so I'll stop here. If core is not willing to consider this I'll just continue with our userland solution. But if core is interested, I'd be happy to participate, and we should probably continue the discussion somewhere else.

@dominictarr

@bjouhier a much better approach would be to expose lower-level APIs so that you could implement modules such as http and fs on their own. Adding things to core is generally a one-way operation. Also, change is happening much slower these days; we're still waiting for 0.12.

@bjouhier

@dominictarr This is interesting. The low level fs APIs are more or less there (fs.open, fs.read, fs.write) and it would be very easy to reimplement ez file streams around them (instead of wrapping fs.createRead/WriteStream). http is another story (especially because of https).

The problem is interoperability. If you want to mix modules based on different alt-stream libraries, you'll have to convert (twice if you need to go through standard node streams in the middle).

@phated

phated commented Oct 16, 2014

@bjouhier stop derailing the thread please.

@bjouhier

@phated I was just answering. @dominictarr we should take this in private if you want to continue the discussion.

@dominictarr

@bjouhier there is no way we can escape the conversion thing, but fortunately, converting between stream implementations is pretty easy, since they are all the same idea at the end of the day.
Making backwards-incompatible changes to node streams will never happen now; it would just affect too many people. node.js is all grown up now, like it or not.

@jasnell
Member

jasnell commented Aug 13, 2015

@chrisdickinson ... any reason to keep this open still?

@chrisdickinson
Author

Nope! Feel free to close this.

On Aug 13, 2015, at 1:42 PM, James M Snell [email protected] wrote:

@chrisdickinson ... any reason to keep this open still?



@jasnell jasnell closed this Aug 14, 2015
@yocontra

Just for reference - what was the resolution here?

@stevemao

What @dominictarr said is the resolution?

If the error doesn't break the stream, it's not really an error, not from the point of view of the stream.
From the point of view of the stream, it's data. It's like there weren't enough chicken meals for all the passengers, and some people had to have fish. It's not like the wings fell off the plane; that would most certainly be a fatal error. So what if gulp just emitted a non-fatal-error event that did not cause streams to be unpiped? It could be converted into a data event and passed along the stream via the normal mechanism. If you add a parallel channel for errors, then you have two streams: one made of normal matter, and a weird dark-matter stream that rarely interacts with the normal world.
