This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

feat: close read and write streams #122

Closed
wants to merge 17 commits

Conversation

filoozom

@filoozom filoozom commented Apr 19, 2021

Based on #121; fixes an issue when the sink was already written to.

This also prohibits the sink function from being called a second time, as it would break either way.

There's still the issue that the sink does not abort unless something is written to it.

Adding tests.

Related to #120

Closed #121
Closes #115

Needs:

@filoozom
Author

@vasco-santos I think this best reflects my issue with abortable-iterator, in particular the following test: filoozom/js-libp2p-interfaces@f18f8c2

@vasco-santos
Member

Thanks for the PR @filoozom, I think I could figure out the outstanding problems.

I have looked through it, run it, and checked both abortable-iterator and your interfaces tests. abortable-iterator races the source against the abort signal, and that should really be enough here.

The interface test looks almost good and we found a new problem 👌🏼 I think we need to have two test cases here.

Going through the added test case:

await stream.sink([randomBuffer()])
await stream.closeWrite()

Once the sink promise finishes here, the sink has ended, which explains why onSinkEnd is called before closeWrite; the abort then does nothing, since the abortable iterator has already finished correctly.

With the above in mind, we need three tests:

  • can close a stream for writing before writing
    • already have in the interface
  • can close a stream for writing after writing (this will already be closed in reality!)
    • we have this in your interfaces branch, but the mplex logic in this PR needs to be updated. In this case sinkEnded will be true and we should check it before doing the abort. Accordingly, closeWrite needs to check sinkInProgress (better naming?) and sinkEnded, and simply return if sinkEnded is true
  • can close a stream for writing during writing
    • this new test in the interface will need an async generator as source where it can yield the first value and then get blocked on a defer promise. Then, we do the closeWrite.

Does this make sense?
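For what it's worth, the third scenario can be sketched without any libp2p machinery at all. Everything below (mockStream, its sink, closeWrite) is an assumption standing in for the real mplex stream; the important part is the source shape: yield one value, then block on a promise that never resolves.

```javascript
// Mock stream: the sink races each value against an abort promise,
// roughly what abortable-iterator does for the real stream.
function mockStream () {
  let abort
  const aborted = new Promise((resolve, reject) => {
    abort = () => reject(Object.assign(new Error('aborted'), { code: 'ABORT_ERR' }))
  })

  return {
    received: [],
    async sink (source) {
      const it = source[Symbol.asyncIterator]()
      try {
        while (true) {
          const next = await Promise.race([it.next(), aborted])
          if (next.done) break
          this.received.push(next.value)
        }
      } catch (err) {
        if (err.code !== 'ABORT_ERR') throw err
      }
    },
    closeWrite () {
      abort()
    }
  }
}

async function run () {
  const stream = mockStream()

  // Source yields a first value, then blocks on a never-resolving promise
  async function * source () {
    yield 1
    await new Promise(() => {})
  }

  const sinkDone = stream.sink(source())
  await new Promise(resolve => setTimeout(resolve, 10)) // let the first value through
  stream.closeWrite() // close for writing *during* writing
  await sinkDone
  return stream.received // the first value made it, then the sink ended
}
```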

@filoozom
Author

@vasco-santos It does make sense! You're right, I didn't think about the added test enough, it doesn't do anything.

The abortable-iterator library does race, but when it catches the abort it still waits for the source to finish (or one more data point?), and that never happens: https://github.com/beejeez/abortable-iterator/blob/master/index.js#L60-L62.

Unless I'm missing something or it's related to my streamWriter function (https://github.com/filoozom/js-libp2p-mplex-pr-115/blob/main/streams.js#L8-L12), which might well be the case? I'd appreciate any insight on that one.

@vasco-santos
Member

I still need to debug that part. But from a quick look I understand your point; perhaps it needs to check if the error is an AbortError and not wait for the resolve.

However, it is strange: as per this test, it should be aborting and finishing, since the async iterator will never end on its own?

If we can create a test scenario with an abort that hangs in abortable-iterator, we can create a PR there too. Perhaps a test where nothing is ever returned in the async generator.

@filoozom
Author

filoozom commented Apr 19, 2021

The new test works on my machine (which I did not expect at first) but does not on Travis. Strange.

Almost, I'm not sure if what I wrote is correct.

@vasco-santos
Member

Cool, we are almost there!

The interface test generator awaits on a function, but as it does not have a loop the underlying for await implementation should be able to understand it ends. If you use the already existent function for an infinite generator in https://github.com/filoozom/js-libp2p-interfaces/blob/close-read-write/src/stream-muxer/tests/close-test.js#L23 it works for me.

By the way, in the interfaces PR, can you get rid of the pause function as well? libp2p-interfaces already has the delay module, which is basically what pause does. Fixing that should make things work and CI green here 🎉 You can also submit the interfaces PR and I will review both.

@filoozom
Author

I'm not sure I understand what you mean by this:

but as it does not have a loop the underlying for await implementation should be able to understand it ends

Right now, it doesn't seem to be ending, right? At least I'm assuming that's why the test is timing out.

I wrote a new function because I wanted to mimic the scenario in which closeStream would generally be used. That is, I'm sure I'm not going to write anything else, so I close it. Having an infinite iterator does not seem to make much sense in such a scenario, although I'm sure this has a use case.

Actually, this is basically the issue I was seeing in #120. Here it works because we're writing to the source, and then it seems to be able to abort. But when I have a source that does not have any more data sent through it, like the oneBufferAndWait function I wrote, it does not.

Regarding the pause function, I suppose you mean this one: filoozom/js-libp2p-interfaces@4fb97e9#diff-3148fbaa4eedcaf9c0eb3bfad3b4257b84c1a60a34874494ca72f6c3b74bd85cR204? I wasn't sure if it was needed, but my reasoning was basically that I wanted to be sure that the first message (filoozom/js-libp2p-interfaces@4fb97e9#diff-3148fbaa4eedcaf9c0eb3bfad3b4257b84c1a60a34874494ca72f6c3b74bd85cR34) went through before closing the write stream. I also had the issue in #120, where messages were not sent in the right order if I didn't wait for a bit between piping into stream.sink and calling stream.closeWrite, i.e. I had CLOSE_RECEIVER before MESSAGE_INITIATOR.

@filoozom
Author

Actually I just tested this code in the code base I was having issues with and it works, so I guess I'll just make the changes so we can get this moved along!

src/stream.js Outdated
log('%s stream %s reset', type, name)
} else {
log('%s stream %s error', type, name, err)
send({ id, type: Types.RESET })
Member

This might result in send attempts when the sink has ended on the other party's side (their read is closed). This will result in an ERR_STREAM_DOESNT_EXIST error being thrown.
Better be safe: can you create a _send function that checks that sinkEnded is false before doing the actual send? This will guarantee that ongoing writes are stopped if we receive a message that the other party closed their read stream
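A minimal sketch of that guard; send and sinkEnded are the names from this thread, while the wrapper itself (createGuardedSend) is hypothetical:

```javascript
// Wrap `send` so that nothing is written once the sink has ended,
// e.g. after the other party closed their read side. `state.sinkEnded`
// would be flipped by onSinkEnd in the real stream.
function createGuardedSend (send, state) {
  return function _send (msg) {
    if (state.sinkEnded) return // drop the write instead of throwing
    send(msg)
  }
}
```

With this in place, the send({ id, type: Types.RESET }) above would go through _send and become a no-op once sinkEnded is true.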

Author

Something doesn't seem to like not receiving the Types.RESET message. I tried 5c17ed6 for only the line you commented on, and it results in the same OOM error as on CI.

Member

Interesting, I have a couple of meetings now. I can recheck later. Otherwise, we can revisit if we should throw the error. I need to properly test this with libp2p first

src/stream.js Outdated
@@ -75,41 +112,49 @@ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsg
onSinkEnd(err)
Member

The problem seems to be here, both in reset and abort. They call onSinkEnd and do the abort. This will cause the sink's catch to be triggered, and a second onSinkEnd happens in the end.

Removing onSinkEnd in abort and reset will not work, because if the sink did not start, onEnd will not be triggered. So we will likely need to improve the logic in the catch function of sink to not do the following on abort/reset:

stream.source.end(err)
return onSinkEnd(err)

Without the previous change this was not problematic somehow. I think it is related to something now happening in a different event loop tick.

What do you think to be the best approach here?
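For reference, the once-guard pattern that keeps a second call harmless could look like this (a sketch with assumed names, not the PR's actual code):

```javascript
// onSinkEnd may be reached twice: once directly from abort()/reset(),
// and once more from the sink's catch block. Guarding on sinkEnded
// makes the second call a no-op.
function makeOnSinkEnd (onEnd) {
  let sinkEnded = false
  return function onSinkEnd (err) {
    if (sinkEnded) return false // already ended: ignore the second call
    sinkEnded = true
    onEnd(err) // notify the muxer exactly once
    return true
  }
}
```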

Author

I'm not entirely sure I follow. Both onSourceEnd and onSinkEnd are guarded from running twice, so neither should do anything if run a second time, right?

Author

Ah I see, I guess it's because in abort, onEnd is called before the Types.RESET message is sent. And with the new _send it can't be sent anymore, and apparently this makes everything hang. Maybe a bug in some other software; I guess there should be a timeout somewhere.

I'll see tomorrow if I can clean this up a bit, but at least it works now.

src/stream.js Outdated
return sinkClosedDefer.promise
}

return stream.sink([])
Author

I think I'm starting to wrap my head around these streams, and I have one question regarding the current code in relation to the spec: does it make sense to create a new stream right before closing it here? Does the spec require this, or could we simply replace it with onSinkEnd() and prohibit the sink function from being called if sinkEnded?

I don't see anything indicating that there needs to be a stream in both directions in https://github.com/libp2p/specs/tree/master/mplex#opening-a-new-stream.

Member

The advantage of doing so is that we inform the other party that we will not write to them anymore and only expect to read, and their side will also be closed for reading.

@filoozom
Author

filoozom commented Apr 20, 2021

Alright, I rewrote a bit of stuff to make it work and add a few features:

  • All close functions, as well as reset and abort now return a Promise
  • sinkClose and sourceClose dates are added to the timeline
  • No new stream is opened when closeWrite is called and the sink hadn't been opened yet
  • Opening a new sink after closeWrite was called throws an error
  • Closing is generalized in closeWrite, closeRead and closeAll functions, and maintaining them should be easier than the special cases used previously
  • onSourceEnd and onSinkEnd were mostly duplicated, so they're now grouped in a single function

Let me know what you like and what I should revert!

This would also require a few more changes to https://github.com/filoozom/js-libp2p-interfaces/blob/master/src/stream-muxer/types.d.ts
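A rough sketch of how a Promise-returning closeWrite could be wired up with a deferred promise resolved from onSinkEnd; the names and shape here are assumptions, not the final API:

```javascript
// Deferred-style close: callers of closeWrite() only continue once the
// sink has really ended (onSinkEnd ran), instead of fire-and-forget.
function makeClosableSink () {
  let sinkEnded = false
  let resolveClosed
  const closed = new Promise(resolve => { resolveClosed = resolve })

  return {
    onSinkEnd () {
      if (sinkEnded) return
      sinkEnded = true
      resolveClosed() // wake up anyone awaiting closeWrite()
    },
    closeWrite () {
      if (sinkEnded) return Promise.resolve() // already closed: no-op
      // The real implementation would also abort an ongoing sink here
      return closed
    }
  }
}
```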

@vasco-santos
Member

vasco-santos commented Apr 21, 2021

Thanks for moving this forward @filoozom

The general code arrangements look great, but I am concerned about the Promise return values, especially for abort/reset (we should try to match the common patterns, like Node streams). Can we keep them as before?

I briefly tried this (this branch + interfaces branch) with libp2p tests and I got some problems, the close function was not resolving. One of the tests failing was in connection close: https://github.com/libp2p/js-libp2p/blob/v0.31.0-rc.4/test/dialing/direct.spec.js#L382 , blocked on streams being closed. Could not understand the underlying reason yet.

Not opening the stream to send the message to the other party saying that their read can be closed might be problematic here. We can revisit if we really need this, but I would do this as a separate PR to evaluate interop with other implementations and guarantee we have no streams leaked

@filoozom
Author

filoozom commented Apr 21, 2021

Ah I see. I get it, but those functions at least trigger events (at least destroy and end), so you can know when a stream was actually closed, which libp2p-mplex doesn't provide right now. That's why I thought those Promises were quite useful and an additional feature.

The event here would be the onEnd function right? Do you see a need to have specific functions for the others? Especially the abort would be important to be consistent everywhere

You're right, onEnd should be enough. I'm not entirely sure how this fits into libp2p yet (does onEnd end up resolving await connection.close()?) but I guess it should work!

@vasco-santos
Member

We use it for tracking the stream in the connection https://github.com/libp2p/js-libp2p/blob/v0.31.0-rc.5/src/upgrader.js#L248

@filoozom
Author

We use it for tracking the stream in the connection https://github.com/libp2p/js-libp2p/blob/v0.31.0-rc.5/src/upgrader.js#L248

As far as I can tell, this doesn't trigger an "outside" event. Basically, in:

const { stream } = await node.dialProtocol(peer, protocol);
stream.reset()

how can I tell if the connection was successfully reset or aborted? Does it even matter? I'm not sure 😛

@filoozom
Author

filoozom commented Apr 21, 2021

Should I remove the sinkClose and sourceClose from the timeline here or add them in MuxedTimeline? Or maybe move them to a separate PR?

@vasco-santos
Member

vasco-santos commented Apr 21, 2021

how can I tell if the connection was successfully reset or aborted? Does it even matter? I'm not sure 😛

My main concern is actually potential errors. Perhaps it would be better to make the Stream an EventEmitter and emit events like node streams, including an error? This means wrapping the closeAll in a try/catch.

Should I remove the sinkClose and sourceClose from the timeline here or add them in MuxedTimeline? Or maybe move them to a separate PR?

Perhaps remove it for now. Unless there is a specific need for it, which I don't think we have.

I created libp2p/js-libp2p#923 but we have some new problems with the latest changes 🤔

@filoozom
Author

how can I tell if the connection was successfully reset or aborted? Does it even matter? I'm not sure 😛

My main concern is actually potential errors. Perhaps it will be better to make the Stream an EventEmitter and emit the events like node streams, including an error? This means wrap the closeAll with a try catch.

Should we do that in this PR or in a new one?

Should I remove the sinkClose and sourceClose from the timeline here or add them in MuxedTimeline? Or maybe move them to a separate PR?

Perhaps remove it for now. Unless there is a specific need for it, which I don't think we have.

Ok

I created libp2p/js-libp2p#923 but we have some new problems with the latest changes 🤔

Yes we do 😜: libp2p/js-libp2p-interfaces#92 (comment)
Re-running the CI should hopefully work.

@vasco-santos
Member

vasco-santos commented Apr 21, 2021

Should we do that in this PR or in a new one?

We can do a new PR for that

Re-running the CI should hopefully work.

It seems that the close is trying to write to the stream that existed before?

@vasco-santos
Member

I actually took it from the previous PR, so I haven't really looked at it. Do you know why this line exists? How did it work previously?

Previously we were delegating the responsibility to the stream creators, but this was usually resulting in leaked streams. It came from: libp2p/js-libp2p-interfaces#67

I have no idea what is going on yet, I am multitasking with other things that I need to get done and I did not have the opportunity to debug yet

@filoozom
Author

filoozom commented Apr 21, 2021

Sorry for deleting my previous comment, I feel like I'm going a bit crazy here.

So indeed, removing the await from this line makes tests work: https://github.com/libp2p/js-libp2p-interfaces/pull/92/files#diff-97f156c9af132d53ea601d3f0145caa507b415c11647f35e5b09df28f68307baR225

Alternatively, I can also comment this line: https://github.com/alanshaw/abortable-iterator/blob/master/index.js#L60, which is exactly what I was seeing at the beginning but kinda fixed itself.

So the result is that the writeCloseController is called, which I can see by adding:

writeCloseController.signal.addEventListener('abort', () => console.log('write close aborted'))

just before the loop, but the for await loop never breaks because of that return() function...

Either I'm crazy, or this issue had been there forever and is why abort and reset called onSinkEnd directly before instead of through the sink abort.


I guess I'll have to do a bit more debugging 😬

@filoozom
Author

filoozom commented Apr 21, 2021

So, when protocol selection happens, the sink from mplex is actually replaced by this: https://github.com/jacobheun/it-handshake/blob/master/src/index.js#L26-L36

So I guess the whole thing is waiting on https://github.com/jacobheun/it-handshake/blob/master/src/index.js#L18 (which I'm assuming is blocking the return() function) and can't be closed before calling it-handshake's overridden sink function.

This is completely unknown territory for me so I might be way off base, but I think it kind of makes sense. Basically from inside mplex we would need to call it-handshake's sink([]) in order to be able to abort? Or I guess resolve from the above line somehow?

EDIT: Can't quite reproduce with it-handshake sadly.

@vasco-santos
Member

Thanks for the analysis, I will try to understand the root reason for this tomorrow/Friday and get back to you. I am also not understanding what is happening; yesterday when I tested with libp2p I got it to work after changing this manually in libp2p's node_modules. But I think I only changed one of the _send calls

I will need to get the next release of libp2p out of the door. I thought we could land this with it, but we can do it afterwards :)

@filoozom
Author

Hi, here's a reproduction with it-handshake:

const abortable = require("abortable-iterator");
const AbortController = require("abort-controller");

const pDefer = require("p-defer");
const handshake = require("it-handshake");

const abortController = new AbortController();

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const defer = pDefer();
const stream = {
  sink: async (source) => {
    source = abortable(source, abortController.signal);

    try {
      for await (let data of source) {
        console.log("Read:", data);
      }
    } catch (err) {
      if (err.code !== "ABORT_ERR") {
        throw err;
      }
    }

    console.log("Sink closed");
    defer.resolve();
  },
  close: () => {
    abortController.abort();
    return defer.promise;
  },
};

const stop = pDefer();
let closed = false;

// Simulate long running process
// (without it the program quits without resolving stream.close)
(async () => {
  await Promise.race([stop.promise, sleep(10000)]);
  console.log("Successful close:", closed);
  process.exit(0);
})();

(async () => {
  const shake = handshake(stream);
  const { sink } = shake.stream;

  // Same result with any or both of these commented
  // It only hangs on another line in it-handshake
  shake.write([]);
  shake.rest();

  // Doesn't work with (blocks the return function)
  /*
  sink(async function* () {
    let counter = 0;
    while (true) {
      yield counter++;
      await sleep(1000);
    }
  });
  */

  // Only works with finished sink
  //sink([]);

  await sleep(1000);
  console.log("Waiting for the stream to close");

  await stream.close();
  console.log("Stream closed");

  closed = true;
  stop.resolve();
})();

The abort hangs on either of:

Here's a simpler example using only abortable-iterator:

const abortable = require("abortable-iterator");
const AbortController = require("abort-controller");
const pDefer = require("p-defer");

const abortController = new AbortController();
const stop = pDefer();
let stopped = false;

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function* neverResolves() {
  await new Promise(() => {});
}

(async () => {
  await Promise.race([stop.promise, sleep(5000)]);
  console.log("Successful close:", stopped);
  process.exit(0);
})();

(async () => {
  setTimeout(() => abortController.abort(), 1000);
  for await (const data of abortable(neverResolves(), abortController.signal)) {
    console.log(data);
  }
  stopped = true;
  stop.resolve();
})();

At this point I'd say that it's a design problem in it-handshake, because an async generator that is blocked awaiting a Promise can never resolve its return().

Funnily enough, replacing neverResolves in the code above with:

async function* neverResolves() {
  yield* (async function* () {
    while (true) {
      yield 1;
      await sleep(1000);
    }
  })();
  await new Promise(() => {});
}

actually makes it so that it works and doesn't hang on the Promise... Maybe a bug in NodeJS?

@vasco-santos
Member

I found this thread with some information on potential problems that we are hitting: tc39/proposal-async-iteration#126

I tried earlier to go back to a previous version (trigger CI commit) and tried it with libp2p, and I am also getting problems. But I am pretty sure I tested it before doing the review about _send, and it worked by locally removing the throw in the send function. But now I am getting a lot of other issues somehow.

Sorry, I could not put my hands back in this stuff properly yet.

@filoozom
Author

filoozom commented Apr 23, 2021

Sorry, I could not put my hands back in this stuff properly yet.

No worries, nothing urgent. It's quite a bit of a rabbit hole too!

I found this thread with some information on potential problems that we are hitting: tc39/proposal-async-iteration#126

I think this would explain why the following works:

async function* neverResolves() {
  yield* (async function* () {
    while (true) {
      yield 1;
      await sleep(1000);
    }
  })();
  await new Promise(() => {});
}
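That thread matches what a minimal runnable check (no external libraries, names are just for the demo) shows: return() on an async generator suspended at an await is queued behind the pending next() and never settles, while one suspended at a yield returns immediately.

```javascript
async function * blockedOnAwait () {
  await new Promise(() => {}) // suspended at an await that never settles
}

async function * blockedOnYield () {
  yield 1 // suspended at the yield once the consumer takes the value
}

async function run () {
  const timeout = (ms, value) => new Promise(resolve => setTimeout(() => resolve(value), ms))

  const g1 = blockedOnAwait()
  g1.next() // start the generator; it is now stuck inside the await
  const r1 = await Promise.race([g1.return().then(() => 'returned'), timeout(100, 'hung')])

  const g2 = blockedOnYield()
  await g2.next() // consume the first value; generator parked at yield
  const r2 = await Promise.race([g2.return().then(() => 'returned'), timeout(100, 'hung')])

  return [r1, r2] // → ['hung', 'returned']
}
```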

Also, I wanted to submit a PR to it-handshake that used it-pair and it-pipe instead of the promise, but actually this simple example hangs forever too:

const Writer = require("it-pushable");
const writer = Writer();

async function* neverResolves() {
  yield* writer;
}

@BigLep

BigLep commented May 3, 2021

Per 2021-05-03 triage session, the ball is in @vasco-santos' court.

@vasco-santos
Member

vasco-santos commented May 5, 2021

I spent some time debugging this today, but could not land on a compromise solution. Thanks for all the work and test scenarios @filoozom

This code here looks good and does what we expect, but the side effects are worrying when we actually have async generators in the picture (like it-handshake in identify). @jacobheun can we have your help figuring out what we should do here?

Here is a short summary of the iterations we had:

  • Current libp2p-mplex and libp2p-interfaces do not guarantee streams are closed on a connection.close(), nor enable us to do the closeRead and closeWrite part. Stream close was only closing the read side of the stream, which is not correct (it might result in leaked streams), and further writes would mean other messages were sent (against the spec)
  • The first attempt to add closeRead and closeWrite worked fine if the closeWrite was the first thing to happen (i.e. no previous writes on the stream). However, it also did not provide real guarantees that the stream was closed, as we were not waiting for the end signal. Finally, running it with the libp2p tests showed that it needed better synchronisation for ongoing writes, otherwise we would hit "the stream is not in the muxer registry, it may have already been closed"
  • Aiming to fix all the above issues, @filoozom made a few iterations on this PR, where we seem to have a reliable solution for closing/aborting read/write/both at any time (before, during and after data transfer), per Close read write js-libp2p-interfaces#92. Everything seems fine in the tests and interface, but sadly we have problems with async generator usage. An ongoing async generator will be blocked forever and its end promise will not be resolved, which in the libp2p context will make libp2p hang while closing a connection when (for instance) the identify stream is running

@filoozom created some simple test examples without libp2p that are worth taking a look at:

libp2p integration PR that also shows these errors: libp2p/js-libp2p#923

All this essentially comes down to a limitation of async generators

@jacobheun
Contributor

I'll cut out some time either Friday or some time next week to take a look.

@lidel
Member

lidel commented May 31, 2021

(note from triage) blocked until we decide how to handle aborts in browser/node
Closing this PR as the solution is most likely elsewhere (properly cascading the close event), but thank you for opening this to highlight the problem @filoozom

Let's continue in #120
