Skip to content

Commit

Permalink
Merge pull request #70 from clue-labs/end-event
Browse files Browse the repository at this point in the history
Consistent end event semantics (EOF)
  • Loading branch information
clue authored Mar 5, 2017
2 parents 7629e45 + 92a67e6 commit 4901524
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 13 deletions.
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ This component depends on `événement`, which is an implementation of the

* `data`: Emitted whenever data was read from the source
with a single mixed argument for incoming data.
* `end`: Emitted when the source has reached the `eof`.
* `end`: Emitted when the source has successfully reached the end
of the stream (EOF).
This event will only be emitted if the *end* was reached successfully, not
if the stream was interrupted due to an error or explicitly closed.
Also note that not all streams know the concept of a "successful end".
* `error`: Emitted when an error occurs
with a single `Exception` argument for error instance.
* `close`: Emitted when the connection is closed.
* `close`: Emitted when the stream is closed.

### Methods

* `isReadable()`: Check if the stream is still in a state allowing it to be
read from. It becomes unreadable when the connection ends, closes or an
read from. It becomes unreadable when the stream ends, closes or an
error occurs.
* `pause()`: Remove the data source file descriptor from the event loop. This
allows you to throttle incoming data.
Expand All @@ -46,7 +50,7 @@ This component depends on `événement`, which is an implementation of the
to accept more data.
* `error`: Emitted whenever an error occurs
with a single `Exception` argument for error instance.
* `close`: Emitted whenever the connection is closed.
* `close`: Emitted whenever the stream is closed.
* `pipe`: Emitted whenever a readable stream is `pipe()`d into this stream
with a single `ReadableStreamInterface` argument for source stream.

Expand Down
1 change: 1 addition & 0 deletions src/BufferedSink.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public function __construct()
public function handlePipeEvent($source)
{
Util::forwardEvents($source, $this, array('error'));
$source->on('close', array($this, 'close'));
}

public function handleErrorEvent($e)
Expand Down
1 change: 0 additions & 1 deletion src/ReadableStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public function close()
}

$this->closed = true;
$this->emit('end');
$this->emit('close');
$this->removeAllListeners();
}
Expand Down
9 changes: 4 additions & 5 deletions src/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ public function close()
$this->readable = false;
$this->writable = false;

$this->emit('end');
$this->emit('close');
$this->loop->removeStream($this->stream);
$this->buffer->close();
Expand Down Expand Up @@ -171,10 +170,10 @@ public function handleData($stream)

if ($data !== '') {
$this->emit('data', array($data));
}

if (!is_resource($stream) || feof($stream)) {
$this->end();
} else{
// no data read => we reached the end and close the stream
$this->emit('end');
$this->close();
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/ThroughStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public function end($data = null)
$this->readable->emit('data', array($this->filter($data)));
}

$this->writable->end($data);
$this->readable->emit('end');

$this->writable->end();
}
}
1 change: 0 additions & 1 deletion src/WritableStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public function close()
}

$this->closed = true;
$this->emit('end');
$this->emit('close');
$this->removeAllListeners();
}
Expand Down
10 changes: 10 additions & 0 deletions tests/ReadableStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ public function closeShouldClose()
$this->assertFalse($readable->isReadable());
}

/** @test */
public function closeShouldEmitCloseEvent()
{
$readable = new ReadableStream();
$readable->on('close', $this->expectCallableOnce());
$readable->on('end', $this->expectCallableNever());

$readable->close();
}

/** @test */
public function doubleCloseShouldWork()
{
Expand Down
16 changes: 15 additions & 1 deletion tests/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ public function testConstructorAcceptsBuffer()
$this->assertSame($buffer, $conn->getBuffer());
}

public function testCloseShouldEmitCloseEvent()
{
$stream = fopen('php://temp', 'r+');
$loop = $this->createLoopMock();

$conn = new Stream($stream, $loop);
$conn->on('close', $this->expectCallableOnce());
$conn->on('end', $this->expectCallableNever());

$conn->close();

$this->assertFalse($conn->isReadable());
}

/**
* @covers React\Stream\Stream::__construct
* @covers React\Stream\Stream::handleData
Expand Down Expand Up @@ -114,7 +128,7 @@ public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfi

$conn->handleData($stream);

$this->assertFalse($conn->isReadable());
$this->assertTrue($conn->isReadable());
$this->assertEquals(100000, strlen($capturedData));
}

Expand Down
10 changes: 10 additions & 0 deletions tests/ThroughStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ public function pipingStuffIntoItShouldWork()
$readable->emit('data', array('foo'));
}

/** @test */
public function endShouldEmitEndAndClose()
{
$through = new ThroughStream();
$through->on('data', $this->expectCallableNever());
$through->on('end', $this->expectCallableOnce());
$through->on('close', $this->expectCallableOnce());
$through->end();
}

/** @test */
public function endShouldCloseTheStream()
{
Expand Down
10 changes: 10 additions & 0 deletions tests/WritableStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ public function closeShouldClose()
$this->assertFalse($through->isWritable());
}

/** @test */
public function closeShouldEmitCloseEvent()
{
$through = new WritableStream();
$through->on('close', $this->expectCallableOnce());
$through->on('end', $this->expectCallableNever());

$through->close();
}

/** @test */
public function doubleCloseShouldWork()
{
Expand Down

0 comments on commit 4901524

Please sign in to comment.