Skip to content

Commit

Permalink
fix: always terminate active Node Streams (#43071)
Browse files Browse the repository at this point in the history
`.destroy()` is an important method in the lifecycle of a Node.js
Readable stream. It is typically called to reclaim the resources
(e.g., close file descriptor). The only situations where calling
it manually isn't necessary are when the following events are
emitted first:

- `end`: natural end of a stream
- `error`: stream terminated due to a failure

Prior to this commit the ended state was incorrectly tracked together
with a pending internal error. It led to situations where the request
could get aborted during a read and then get marked as ended (having
pending error).

With this change we disentangle pending "error" and "destroyed" cases to
always properly terminate an active Node.js Readable stream.

Co-authored-by: trop[bot] <37223003+trop[bot]@users.noreply.github.com>
Co-authored-by: Fedor Indutny <[email protected]>
  • Loading branch information
trop[bot] and indutny-signal authored Jul 27, 2024
1 parent b6e19c5 commit 6322c32
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 9 deletions.
24 changes: 16 additions & 8 deletions shell/browser/net/node_stream_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ NodeStreamLoader::~NodeStreamLoader() {
}

// Destroy the stream if not already ended
if (!ended_) {
if (!destroyed_) {
node::MakeCallback(isolate_, emitter_.Get(isolate_), "destroy", 0, nullptr,
{0, 0});
}
Expand All @@ -63,13 +63,21 @@ void NodeStreamLoader::Start(network::mojom::URLResponseHeadPtr head) {
std::nullopt);

auto weak = weak_factory_.GetWeakPtr();
On("end",
base::BindRepeating(&NodeStreamLoader::NotifyComplete, weak, net::OK));
On("error", base::BindRepeating(&NodeStreamLoader::NotifyComplete, weak,
net::ERR_FAILED));
On("end", base::BindRepeating(&NodeStreamLoader::NotifyEnd, weak));
On("error", base::BindRepeating(&NodeStreamLoader::NotifyError, weak));
On("readable", base::BindRepeating(&NodeStreamLoader::NotifyReadable, weak));
}

void NodeStreamLoader::NotifyEnd() {
destroyed_ = true;
NotifyComplete(net::OK);
}

void NodeStreamLoader::NotifyError() {
destroyed_ = true;
NotifyComplete(net::ERR_FAILED);
}

void NodeStreamLoader::NotifyReadable() {
if (!readable_)
ReadMore();
Expand All @@ -81,7 +89,7 @@ void NodeStreamLoader::NotifyReadable() {
void NodeStreamLoader::NotifyComplete(int result) {
// Wait until write finishes or fails.
if (is_reading_ || is_writing_) {
ended_ = true;
pending_result_ = true;
result_ = result;
return;
}
Expand Down Expand Up @@ -121,7 +129,7 @@ void NodeStreamLoader::ReadMore() {
}

readable_ = false;
if (ended_) {
if (pending_result_) {
NotifyComplete(result_);
}
return;
Expand All @@ -146,7 +154,7 @@ void NodeStreamLoader::ReadMore() {
void NodeStreamLoader::DidWrite(MojoResult result) {
is_writing_ = false;
// We were told to end streaming.
if (ended_) {
if (pending_result_) {
NotifyComplete(result_);
return;
}
Expand Down
8 changes: 7 additions & 1 deletion shell/browser/net/node_stream_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class NodeStreamLoader : public network::mojom::URLLoader {
using EventCallback = base::RepeatingCallback<void()>;

void Start(network::mojom::URLResponseHeadPtr head);
void NotifyEnd();
void NotifyError();
void NotifyReadable();
void NotifyComplete(int result);
void ReadMore();
Expand Down Expand Up @@ -86,9 +88,13 @@ class NodeStreamLoader : public network::mojom::URLLoader {

// When NotifyComplete is called while writing, we will save the result and
// quit with it after the write is done.
bool ended_ = false;
bool pending_result_ = false;
int result_ = net::OK;

// Set to `true` when we get either `end` or `error` event on the stream.
// If `false` - we call `stream.destroy()` to finalize the stream.
bool destroyed_ = false;

// When the stream emits the readable event, we only want to start reading
// data if the stream was not readable before, so we store the state in a
// flag.
Expand Down
24 changes: 24 additions & 0 deletions spec/api-protocol-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,30 @@ describe('protocol module', () => {
expect(body).to.equal(text);
});

it('calls destroy on aborted body stream', async () => {
const abortController = new AbortController();

class TestStream extends stream.Readable {
_read () {
this.push('infinite data');

// Abort the request that reads from this stream.
abortController.abort();
}
};
const body = new TestStream();
protocol.handle('test-scheme', () => {
return new Response(stream.Readable.toWeb(body) as ReadableStream<ArrayBufferView>);
});
defer(() => { protocol.unhandle('test-scheme'); });

const res = net.fetch('test-scheme://foo', {
signal: abortController.signal
});
await expect(res).to.be.rejectedWith('This operation was aborted');
await expect(once(body, 'end')).to.be.rejectedWith('The operation was aborted');
});

it('accepts urls with no hostname in non-standard schemes', async () => {
protocol.handle('test-scheme', (req) => new Response(req.url));
defer(() => { protocol.unhandle('test-scheme'); });
Expand Down

0 comments on commit 6322c32

Please sign in to comment.