diff --git a/README.md b/README.md index 6fc6b3f..2175049 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ for [ReactPHP](https://reactphp.org/). * [unwrapReadable()](#unwrapreadable) * [unwrapWritable()](#unwrapwritable) * [Install](#install) +* [Tests](#tests) * [License](#license) ## Usage @@ -37,11 +38,8 @@ Alternatively, you can also refer to them with their fully-qualified name: ### buffer() -The `buffer(ReadableStreamInterface $stream, int $maxLength = null)` function can be used to create -a `Promise` which resolves with the stream data buffer. With an optional maximum length argument -which defaults to no limit. In case the maximum length is reached before the end the promise will -be rejected with a `\OverflowException`. - +The `buffer(ReadableStreamInterface $stream, ?int $maxLength = null): PromiseInterface` function can be used to +create a `Promise` which resolves with the stream data buffer. ```php $stream = accessSomeJsonStream(); @@ -57,7 +55,11 @@ The promise will resolve with an empty string if the stream is already closed. The promise will reject if the stream emits an error. -The promise will reject if it is canceled. +The promise will reject if it is cancelled. + +The optional `$maxLength` argument defaults to no limit. In case the maximum +length is given and the stream emits more data before the end, the promise +will be rejected with an `\OverflowException`. ```php $stream = accessSomeToLargeStream(); @@ -67,14 +69,14 @@ Stream\buffer($stream, 1024)->then(function ($contents) { }, function ($error) { // Reaching here when the stream buffer goes above the max size, // in this example that is 1024 bytes, - // or when the stream emits an error. + // or when the stream emits an error. }); ``` ### first() -The `first(ReadableStreamInterface|WritableStreamInterface $stream, $event = 'data')` -function can be used to create a `Promise` which resolves once the given event triggers for the first time. +The `first(ReadableStreamInterface|WritableStreamInterface $stream, string $event = 'data'): PromiseInterface` function can be used to +create a `Promise` which resolves once the given event triggers for the first time. ```php $stream = accessSomeJsonStream(); @@ -97,12 +99,12 @@ The promise will reject once the stream closes – unless you're waiting for the The promise will reject if the stream is already closed. -The promise will reject if it is canceled. +The promise will reject if it is cancelled. ### all() -The `all(ReadableStreamInterface|WritableStreamInterface $stream, $event = 'data')` -function can be used to create a `Promise` which resolves with an array of all the event data. +The `all(ReadableStreamInterface|WritableStreamInterface $stream, string $event = 'data'): PromiseInterface` function can be used to +create a `Promise` which resolves with an array of all the event data. ```php $stream = accessSomeJsonStream(); @@ -123,12 +125,12 @@ The promise will resolve with an empty array if the stream is already closed. The promise will reject if the stream emits an error. -The promise will reject if it is canceled. +The promise will reject if it is cancelled. ### unwrapReadable() -The `unwrapReadable(PromiseInterface $promise)` function can be used to unwrap -a `Promise` which resolves with a `ReadableStreamInterface`. +The `unwrapReadable(PromiseInterface $promise): ReadableStreamInterface` function can be used to +unwrap a `Promise` which resolves with a `ReadableStreamInterface`. This function returns a readable stream instance (implementing `ReadableStreamInterface`) right away which acts as a proxy for the future promise resolution. @@ -142,11 +144,11 @@ $promise = startDownloadStream($uri); $stream = Stream\unwrapReadable($promise); $stream->on('data', function ($data) { - echo $data; + echo $data; }); $stream->on('end', function () { - echo 'DONE'; + echo 'DONE'; }); ``` @@ -185,8 +187,8 @@ $loop->addTimer(2.0, function () use ($stream) { ### unwrapWritable() -The `unwrapWritable(PromiseInterface $promise)` function can be used to unwrap -a `Promise` which resolves with a `WritableStreamInterface`. +The `unwrapWritable(PromiseInterface $promise): WritableStreamInterface` function can be used to +unwrap a `Promise` which resolves with a `WritableStreamInterface`. This function returns a writable stream instance (implementing `WritableStreamInterface`) right away which acts as a proxy for the future promise resolution. @@ -204,7 +206,7 @@ $stream->write('hello'); $stream->end('world'); $stream->on('close', function () { - echo 'DONE'; + echo 'DONE'; }); ``` @@ -246,7 +248,7 @@ $loop->addTimer(2.0, function () use ($stream) { The recommended way to install this library is [through Composer](https://getcomposer.org). [New to Composer?](https://getcomposer.org/doc/00-intro.md) -This project follows [SemVer](http://semver.org/). +This project follows [SemVer](https://semver.org/). This will install the latest supported version: ```bash @@ -260,6 +262,21 @@ extensions and supports running on legacy PHP 5.3 through current PHP 7+ and HHVM. It's *highly recommended to use PHP 7+* for this project. +## Tests + +To run the test suite, you first need to clone this repo and then install all +dependencies [through Composer](https://getcomposer.org): + +```bash +$ composer install +``` + +To run the test suite, go to the project root and run: + +```bash +$ php vendor/bin/phpunit +``` + ## License MIT, see [LICENSE file](LICENSE). diff --git a/src/functions.php b/src/functions.php index 57853c8..6a4c9a1 100644 --- a/src/functions.php +++ b/src/functions.php @@ -9,11 +9,43 @@ use React\Stream\WritableStreamInterface; /** - * Creates a `Promise` which resolves with the stream data buffer + * Creates a `Promise` which resolves with the stream data buffer. * - * @param ReadableStreamInterface $stream - * @param int|null $maxLength Maximum number of bytes to buffer or null for unlimited. - * @return Promise\CancellablePromiseInterface Promise + * ```php + * $stream = accessSomeJsonStream(); + * + * Stream\buffer($stream)->then(function ($contents) { + * var_dump(json_decode($contents)); + * }); + * ``` + * + * The promise will resolve with all data chunks concatenated once the stream closes. + * + * The promise will resolve with an empty string if the stream is already closed. + * + * The promise will reject if the stream emits an error. + * + * The promise will reject if it is cancelled. + * + * The optional `$maxLength` argument defaults to no limit. In case the maximum + * length is given and the stream emits more data before the end, the promise + * will be rejected with an `\OverflowException`. + * + * ```php + * $stream = accessSomeToLargeStream(); + * + * Stream\buffer($stream, 1024)->then(function ($contents) { + * var_dump(json_decode($contents)); + * }, function ($error) { + * // Reaching here when the stream buffer goes above the max size, + * // in this example that is 1024 bytes, + * // or when the stream emits an error. + * }); + * ``` + * + * @param ReadableStreamInterface $stream + * @param ?int $maxLength Maximum number of bytes to buffer or null for unlimited. + * @return PromiseInterface */ function buffer(ReadableStreamInterface $stream, $maxLength = null) { @@ -56,11 +88,34 @@ function buffer(ReadableStreamInterface $stream, $maxLength = null) } /** - * Creates a `Promise` which resolves with the first event data + * Creates a `Promise` which resolves once the given event triggers for the first time. + * + * ```php + * $stream = accessSomeJsonStream(); + * + * Stream\first($stream)->then(function ($chunk) { + * echo 'The first chunk arrived: ' . $chunk; + * }); + * ``` + * + * The promise will resolve with whatever the first event emitted or `null` if the + * event does not pass any data. + * If you do not pass a custom event name, then it will wait for the first "data" + * event and resolve with a string containing the first data chunk. + * + * The promise will reject if the stream emits an error – unless you're waiting for + * the "error" event, in which case it will resolve. + * + * The promise will reject once the stream closes – unless you're waiting for the + * "close" event, in which case it will resolve. + * + * The promise will reject if the stream is already closed. + * + * The promise will reject if it is cancelled. * * @param ReadableStreamInterface|WritableStreamInterface $stream * @param string $event - * @return Promise\CancellablePromiseInterface Promise + * @return PromiseInterface */ function first(EventEmitterInterface $stream, $event = 'data') { @@ -102,11 +157,32 @@ function first(EventEmitterInterface $stream, $event = 'data') } /** - * Creates a `Promise` which resolves with an array of all the event data + * Creates a `Promise` which resolves with an array of all the event data. + * + * ```php + * $stream = accessSomeJsonStream(); + * + * Stream\all($stream)->then(function ($chunks) { + * echo 'The stream consists of ' . count($chunks) . ' chunk(s)'; + * }); + * ``` + * + * The promise will resolve with an array of whatever all events emitted or `null` if the + * events do not pass any data. + * If you do not pass a custom event name, then it will wait for all the "data" + * events and resolve with an array containing all the data chunks. + * + * The promise will resolve with an array once the stream closes. + * + * The promise will resolve with an empty array if the stream is already closed. + * + * The promise will reject if the stream emits an error. + * + * The promise will reject if it is cancelled. * * @param ReadableStreamInterface|WritableStreamInterface $stream * @param string $event - * @return Promise\CancellablePromiseInterface Promise + * @return PromiseInterface */ function all(EventEmitterInterface $stream, $event = 'data') { @@ -152,9 +228,62 @@ function all(EventEmitterInterface $stream, $event = 'data') } /** - * unwrap a `Promise` which resolves with a `ReadableStreamInterface`. + * Unwraps a `Promise` which resolves with a `ReadableStreamInterface`. + * + * This function returns a readable stream instance (implementing `ReadableStreamInterface`) + * right away which acts as a proxy for the future promise resolution. + * Once the given Promise resolves with a `ReadableStreamInterface`, its data will + * be piped to the output stream. + * + * ```php + * //$promise = someFunctionWhichResolvesWithAStream(); + * $promise = startDownloadStream($uri); + * + * $stream = Stream\unwrapReadable($promise); * - * @param PromiseInterface $promise Promise + * $stream->on('data', function ($data) { + * echo $data; + * }); + * + * $stream->on('end', function () { + * echo 'DONE'; + * }); + * ``` + * + * If the given promise is either rejected or fulfilled with anything but an + * instance of `ReadableStreamInterface`, then the output stream will emit + * an `error` event and close: + * + * ```php + * $promise = startDownloadStream($invalidUri); + * + * $stream = Stream\unwrapReadable($promise); + * + * $stream->on('error', function (Exception $error) { + * echo 'Error: ' . $error->getMessage(); + * }); + * ``` + * + * The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected + * at the time of invoking this function. + * If the given promise is already settled and does not resolve with an + * instance of `ReadableStreamInterface`, then you will not be able to receive + * the `error` event. + * + * You can `close()` the resulting stream at any time, which will either try to + * `cancel()` the pending promise or try to `close()` the underlying stream. + * + * ```php + * $promise = startDownloadStream($uri); + * + * $stream = Stream\unwrapReadable($promise); + * + * $loop->addTimer(2.0, function () use ($stream) { + * $stream->close(); + * }); + * ``` + * + * @param PromiseInterface $promise * @return ReadableStreamInterface */ function unwrapReadable(PromiseInterface $promise) @@ -163,9 +292,62 @@ function unwrapReadable(PromiseInterface $promise) } /** - * unwrap a `Promise` which resolves with a `WritableStreamInterface`. + * Unwraps a `Promise` which resolves with a `WritableStreamInterface`. + * + * This function returns a writable stream instance (implementing `WritableStreamInterface`) + * right away which acts as a proxy for the future promise resolution. + * Any writes to this instance will be buffered in memory for when the promise resolves. + * Once the given Promise resolves with a `WritableStreamInterface`, any data you + * have written to the proxy will be forwarded transparently to the inner stream. + * + * ```php + * //$promise = someFunctionWhichResolvesWithAStream(); + * $promise = startUploadStream($uri); + * + * $stream = Stream\unwrapWritable($promise); + * + * $stream->write('hello'); + * $stream->end('world'); + * + * $stream->on('close', function () { + * echo 'DONE'; + * }); + * ``` + * + * If the given promise is either rejected or fulfilled with anything but an + * instance of `WritableStreamInterface`, then the output stream will emit + * an `error` event and close: + * + * ```php + * $promise = startUploadStream($invalidUri); + * + * $stream = Stream\unwrapWritable($promise); + * + * $stream->on('error', function (Exception $error) { + * echo 'Error: ' . $error->getMessage(); + * }); + * ``` + * + * The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected + * at the time of invoking this function. + * If the given promise is already settled and does not resolve with an + * instance of `WritableStreamInterface`, then you will not be able to receive + * the `error` event. + * + * You can `close()` the resulting stream at any time, which will either try to + * `cancel()` the pending promise or try to `close()` the underlying stream. + * + * ```php + * $promise = startUploadStream($uri); + * + * $stream = Stream\unwrapWritable($promise); + * + * $loop->addTimer(2.0, function () use ($stream) { + * $stream->close(); + * }); + * ``` * - * @param PromiseInterface $promise Promise + * @param PromiseInterface $promise * @return WritableStreamInterface */ function unwrapWritable(PromiseInterface $promise)