Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DuplexResourceStream and deprecate Stream #85

Merged
merged 3 commits into from
Mar 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ descriptor based implementation with an in-memory write buffer.
* [DuplexStreamInterface](#duplexstreaminterface)
* [ReadableResourceStream](#readableresourcestream)
* [WritableResourceStream](#writableresourcestream)
* [DuplexResourceStream](#duplexresourcestream)
* [Usage](#usage)
* [Install](#install)
* [Tests](#tests)
Expand Down Expand Up @@ -724,6 +725,9 @@ Otherwise, it will throw an `InvalidArgumentException`:
$stream = new ReadableResourceStream(false, $loop);
```

See also the [`DuplexResourceStream`](#readableresourcestream) for read-and-write
stream resources otherwise.

Internally, this class tries to enable non-blocking mode on the stream resource
which may not be supported for all stream resources.
Most notably, this is not supported by pipes on Windows (STDIN etc.).
Expand Down Expand Up @@ -787,6 +791,9 @@ Otherwise, it will throw an `InvalidArgumentException`:
$stream = new WritableResourceStream(false, $loop);
```

See also the [`DuplexResourceStream`](#readableresourcestream) for read-and-write
stream resources otherwise.

Internally, this class tries to enable non-blocking mode on the stream resource
which may not be supported for all stream resources.
Most notably, this is not supported by pipes on Windows (STDOUT, STDERR etc.).
Expand Down Expand Up @@ -823,6 +830,94 @@ $stream->softLimit = 8192;

See also [`write()`](#write) for more details.

### DuplexResourceStream

The `DuplexResourceStream` is a concrete implementation of the
[`DuplexStreamInterface`](#duplexstreaminterface) for PHP's stream resources.

This can be used to represent a read-and-write resource like a file stream opened
in read and write mode mode or a stream such as a TCP/IP connection:

```php
$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn, $loop);
$stream->write('hello!');
$stream->end();
```

See also [`DuplexStreamInterface`](#duplexstreaminterface) for more details.

The first parameter given to the constructor MUST be a valid stream resource
that is opened for reading *and* writing.
Otherwise, it will throw an `InvalidArgumentException`:

```php
// throws InvalidArgumentException
$stream = new DuplexResourceStream(false, $loop);
```

See also the [`ReadableResourceStream`](#readableresourcestream) for read-only
and the [`WritableResourceStream`](#writableresourcestream) for write-only
stream resources otherwise.

Internally, this class tries to enable non-blocking mode on the stream resource
which may not be supported for all stream resources.
Most notably, this is not supported by pipes on Windows (STDOUT, STDERR etc.).
If this fails, it will throw a `RuntimeException`:

```php
// throws RuntimeException on Windows
$stream = new DuplexResourceStream(STDOUT, $loop);
```

Once the constructor is called with a valid stream resource, this class will
take care of the underlying stream resource.
You SHOULD only use its public API and SHOULD NOT interfere with the underlying
stream resource manually.
Should you need to access the underlying stream resource, you can use the public
`$stream` property like this:

```php
var_dump(stream_get_meta_data($stream->stream));
```

The `$bufferSize` property controls the maximum buffer size in bytes to read
at once from the stream.
This value SHOULD NOT be changed unless you know what you're doing.
This can be a positive number which means that up to X bytes will be read
at once from the underlying stream resource. Note that the actual number
of bytes read may be lower if the stream resource has less than X bytes
currently available.
This can be `null` which means "read everything available" from the
underlying stream resource.
This should read until the stream resource is not readable anymore
(i.e. underlying buffer drained), note that this does not neccessarily
mean it reached EOF.

```php
$stream->bufferSize = 8192;
```

Any `write()` calls to this class will not be performaned instantly, but will
be performaned asynchronously, once the EventLoop reports the stream resource is
ready to accept data.
For this, it uses an in-memory buffer string to collect all outstanding writes.
This buffer has a soft-limit applied which defines how much data it is willing
to accept before the caller SHOULD stop sending further data.
It currently defaults to 64 KiB and can be controlled through the public
`$softLimit` property like this:

```php
$buffer = $stream->getBuffer();
$buffer->softLimit = 8192;
```

See also [`write()`](#write) for more details.

> BC note: This class was previously called `Stream`.
The `Stream` class still exists for BC reasons and will be removed in future
versions of this package.

## Usage
```php
$loop = React\EventLoop\Factory::create();
Expand Down
3 changes: 1 addition & 2 deletions examples/benchmark-throughput.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
$loop = new React\EventLoop\StreamSelectLoop();

// setup information stream
$info = new React\Stream\Stream(STDERR, $loop);
$info->pause();
$info = new React\Stream\WritableResourceStream(STDERR, $loop);
if (extension_loaded('xdebug')) {
$info->write('NOTICE: The "xdebug" extension is loaded, this has a major impact on performance.' . PHP_EOL);
}
Expand Down
224 changes: 224 additions & 0 deletions src/DuplexResourceStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
<?php

namespace React\Stream;

use Evenement\EventEmitter;
use React\EventLoop\LoopInterface;
use InvalidArgumentException;

class DuplexResourceStream extends EventEmitter implements DuplexStreamInterface
{
/**
* Controls the maximum buffer size in bytes to read at once from the stream.
*
* This can be a positive number which means that up to X bytes will be read
* at once from the underlying stream resource. Note that the actual number
* of bytes read may be lower if the stream resource has less than X bytes
* currently available.
*
* This can be `null` which means read everything available from the
* underlying stream resource.
* This should read until the stream resource is not readable anymore
* (i.e. underlying buffer drained), note that this does not neccessarily
* mean it reached EOF.
*
* @var int|null
*/
public $bufferSize = 65536;

public $stream;
protected $readable = true;
protected $writable = true;
protected $closing = false;
protected $loop;
protected $buffer;

public function __construct($stream, LoopInterface $loop, WritableStreamInterface $buffer = null)
{
if (!is_resource($stream) || get_resource_type($stream) !== "stream") {
throw new InvalidArgumentException('First parameter must be a valid stream resource');
}

// ensure resource is opened for reading and wrting (fopen mode must contain "+")
$meta = stream_get_meta_data($stream);
if (isset($meta['mode']) && $meta['mode'] !== '' && strpos($meta['mode'], '+') === false) {
throw new InvalidArgumentException('Given stream resource is not opened in read and write mode');
}

// this class relies on non-blocking I/O in order to not interrupt the event loop
// e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
if (stream_set_blocking($stream, 0) !== true) {
throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
}

// Use unbuffered read operations on the underlying stream resource.
// Reading chunks from the stream may otherwise leave unread bytes in
// PHP's stream buffers which some event loop implementations do not
// trigger events on (edge triggered).
// This does not affect the default event loop implementation (level
// triggered), so we can ignore platforms not supporting this (HHVM).
// Pipe streams (such as STDIN) do not seem to require this and legacy
// PHP < 5.4 causes SEGFAULTs on unbuffered pipe streams, so skip this.
if (function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
stream_set_read_buffer($stream, 0);
}

if ($buffer === null) {
$buffer = new WritableResourceStream($stream, $loop);
}

$this->stream = $stream;
$this->loop = $loop;
$this->buffer = $buffer;

$that = $this;

$this->buffer->on('error', function ($error) use ($that) {
$that->emit('error', array($error));
});

$this->buffer->on('close', array($this, 'close'));

$this->buffer->on('drain', function () use ($that) {
$that->emit('drain');
});

$this->resume();
}

public function isReadable()
{
return $this->readable;
}

public function isWritable()
{
return $this->writable;
}

public function pause()
{
$this->loop->removeReadStream($this->stream);
}

public function resume()
{
if ($this->readable) {
$this->loop->addReadStream($this->stream, array($this, 'handleData'));
}
}

public function write($data)
{
if (!$this->writable) {
return false;
}

return $this->buffer->write($data);
}

public function close()
{
if (!$this->writable && !$this->closing) {
return;
}

$this->closing = false;

$this->readable = false;
$this->writable = false;

$this->emit('close');
$this->loop->removeStream($this->stream);
$this->buffer->close();
$this->removeAllListeners();

$this->handleClose();
}

public function end($data = null)
{
if (!$this->writable) {
return;
}

$this->closing = true;

$this->readable = false;
$this->writable = false;

$this->buffer->end($data);
}

public function pipe(WritableStreamInterface $dest, array $options = array())
{
return Util::pipe($this, $dest, $options);
}

public function handleData($stream)
{
$error = null;
set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
$error = new \ErrorException(
$errstr,
0,
$errno,
$errfile,
$errline
);
});

$data = stream_get_contents($stream, $this->bufferSize === null ? -1 : $this->bufferSize);

restore_error_handler();

if ($error !== null) {
$this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
$this->close();
return;
}

if ($data !== '') {
$this->emit('data', array($data));
} else{
// no data read => we reached the end and close the stream
$this->emit('end');
$this->close();
}
}

public function handleClose()
{
if (is_resource($this->stream)) {
fclose($this->stream);
}
}

/**
* @return WritableStreamInterface
*/
public function getBuffer()
{
return $this->buffer;
}

/**
* Returns whether this is a pipe resource in a legacy environment
*
* @param resource $resource
* @return bool
*
* @codeCoverageIgnore
*/
private function isLegacyPipe($resource)
{
if (PHP_VERSION_ID < 50400) {
$meta = stream_get_meta_data($resource);

if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
return true;
}
}
return false;
}
}
Loading