From 92a67e6ccfba9cd03463205c74dd25fdb77214c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Fri, 24 Feb 2017 17:55:32 +0100 Subject: [PATCH] Consistent end event semantics (EOF) --- README.md | 12 ++++++++---- src/BufferedSink.php | 1 + src/ReadableStream.php | 1 - src/Stream.php | 9 ++++----- src/ThroughStream.php | 4 +++- src/WritableStream.php | 1 - tests/ReadableStreamTest.php | 10 ++++++++++ tests/StreamTest.php | 16 +++++++++++++++- tests/ThroughStreamTest.php | 10 ++++++++++ tests/WritableStreamTest.php | 10 ++++++++++ 10 files changed, 61 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 40e0241..b3d0bce 100644 --- a/README.md +++ b/README.md @@ -20,15 +20,19 @@ This component depends on `événement`, which is an implementation of the * `data`: Emitted whenever data was read from the source with a single mixed argument for incoming data. -* `end`: Emitted when the source has reached the `eof`. +* `end`: Emitted when the source has successfully reached the end + of the stream (EOF). + This event will only be emitted if the *end* was reached successfully, not + if the stream was interrupted due to an error or explicitly closed. + Also note that not all streams know the concept of a "successful end". * `error`: Emitted when an error occurs with a single `Exception` argument for error instance. -* `close`: Emitted when the connection is closed. +* `close`: Emitted when the stream is closed. ### Methods * `isReadable()`: Check if the stream is still in a state allowing it to be - read from. It becomes unreadable when the connection ends, closes or an + read from. It becomes unreadable when the stream ends, closes or an error occurs. * `pause()`: Remove the data source file descriptor from the event loop. This allows you to throttle incoming data. @@ -46,7 +50,7 @@ This component depends on `événement`, which is an implementation of the to accept more data. * `error`: Emitted whenever an error occurs with a single `Exception` argument for error instance. -* `close`: Emitted whenever the connection is closed. +* `close`: Emitted whenever the stream is closed. * `pipe`: Emitted whenever a readable stream is `pipe()`d into this stream with a single `ReadableStreamInterface` argument for source stream. diff --git a/src/BufferedSink.php b/src/BufferedSink.php index a6b35c0..730c808 100644 --- a/src/BufferedSink.php +++ b/src/BufferedSink.php @@ -21,6 +21,7 @@ public function __construct() public function handlePipeEvent($source) { Util::forwardEvents($source, $this, array('error')); + $source->on('close', array($this, 'close')); } public function handleErrorEvent($e) diff --git a/src/ReadableStream.php b/src/ReadableStream.php index ca154b2..bdf6c33 100644 --- a/src/ReadableStream.php +++ b/src/ReadableStream.php @@ -35,7 +35,6 @@ public function close() } $this->closed = true; - $this->emit('end'); $this->emit('close'); $this->removeAllListeners(); } diff --git a/src/Stream.php b/src/Stream.php index ae03fcc..34386ae 100644 --- a/src/Stream.php +++ b/src/Stream.php @@ -116,7 +116,6 @@ public function close() $this->readable = false; $this->writable = false; - $this->emit('end'); $this->emit('close'); $this->loop->removeStream($this->stream); $this->buffer->close(); @@ -171,10 +170,10 @@ public function handleData($stream) if ($data !== '') { $this->emit('data', array($data)); - } - - if (!is_resource($stream) || feof($stream)) { - $this->end(); + } else{ + // no data read => we reached the end and close the stream + $this->emit('end'); + $this->close(); } } diff --git a/src/ThroughStream.php b/src/ThroughStream.php index 862bde5..29789df 100644 --- a/src/ThroughStream.php +++ b/src/ThroughStream.php @@ -28,6 +28,8 @@ public function end($data = null) $this->readable->emit('data', array($this->filter($data))); } - $this->writable->end($data); + $this->readable->emit('end'); + + $this->writable->end(); } } diff --git a/src/WritableStream.php b/src/WritableStream.php index d610f1f..612450a 100644 --- a/src/WritableStream.php +++ b/src/WritableStream.php @@ -33,7 +33,6 @@ public function close() } $this->closed = true; - $this->emit('end'); $this->emit('close'); $this->removeAllListeners(); } diff --git a/tests/ReadableStreamTest.php b/tests/ReadableStreamTest.php index 01466d9..12a77b7 100644 --- a/tests/ReadableStreamTest.php +++ b/tests/ReadableStreamTest.php @@ -36,6 +36,16 @@ public function closeShouldClose() $this->assertFalse($readable->isReadable()); } + /** @test */ + public function closeShouldEmitCloseEvent() + { + $readable = new ReadableStream(); + $readable->on('close', $this->expectCallableOnce()); + $readable->on('end', $this->expectCallableNever()); + + $readable->close(); + } + /** @test */ public function doubleCloseShouldWork() { diff --git a/tests/StreamTest.php b/tests/StreamTest.php index 6d9d5ea..801e064 100644 --- a/tests/StreamTest.php +++ b/tests/StreamTest.php @@ -43,6 +43,20 @@ public function testConstructorAcceptsBuffer() $this->assertSame($buffer, $conn->getBuffer()); } + public function testCloseShouldEmitCloseEvent() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + + $conn = new Stream($stream, $loop); + $conn->on('close', $this->expectCallableOnce()); + $conn->on('end', $this->expectCallableNever()); + + $conn->close(); + + $this->assertFalse($conn->isReadable()); + } + /** * @covers React\Stream\Stream::__construct * @covers React\Stream\Stream::handleData @@ -114,7 +128,7 @@ public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfi $conn->handleData($stream); - $this->assertFalse($conn->isReadable()); + $this->assertTrue($conn->isReadable()); $this->assertEquals(100000, strlen($capturedData)); } diff --git a/tests/ThroughStreamTest.php b/tests/ThroughStreamTest.php index acc964c..d6a6ca3 100644 --- a/tests/ThroughStreamTest.php +++ b/tests/ThroughStreamTest.php @@ -30,6 +30,16 @@ public function pipingStuffIntoItShouldWork() $readable->emit('data', array('foo')); } + /** @test */ + public function endShouldEmitEndAndClose() + { + $through = new ThroughStream(); + $through->on('data', $this->expectCallableNever()); + $through->on('end', $this->expectCallableOnce()); + $through->on('close', $this->expectCallableOnce()); + $through->end(); + } + /** @test */ public function endShouldCloseTheStream() { diff --git a/tests/WritableStreamTest.php b/tests/WritableStreamTest.php index f48f096..0eeb12d 100644 --- a/tests/WritableStreamTest.php +++ b/tests/WritableStreamTest.php @@ -59,6 +59,16 @@ public function closeShouldClose() $this->assertFalse($through->isWritable()); } + /** @test */ + public function closeShouldEmitCloseEvent() + { + $through = new WritableStream(); + $through->on('close', $this->expectCallableOnce()); + $through->on('end', $this->expectCallableNever()); + + $through->close(); + } + /** @test */ public function doubleCloseShouldWork() {