diff --git a/README.md b/README.md index 3e33430..6fc6b3f 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ Alternatively, you can also refer to them with their fully-qualified name: ```php \React\Promise\Stream\buffer(…); -``` +``` ### buffer() @@ -190,8 +190,9 @@ a `Promise` which resolves with a `WritableStreamInterface`. This function returns a writable stream instance (implementing `WritableStreamInterface`) right away which acts as a proxy for the future promise resolution. +Any writes to this instance will be buffered in memory for when the promise resolves. Once the given Promise resolves with a `WritableStreamInterface`, any data you -wrote to the proxy will be piped to the inner stream. +have written to the proxy will be forwarded transparently to the inner stream. ```php //$promise = someFunctionWhichResolvesWithAStream(); diff --git a/src/UnwrapWritableStream.php b/src/UnwrapWritableStream.php index d833f4c..3221305 100644 --- a/src/UnwrapWritableStream.php +++ b/src/UnwrapWritableStream.php @@ -16,7 +16,7 @@ class UnwrapWritableStream extends EventEmitter implements WritableStreamInterfa { private $promise; private $stream; - private $buffer = ''; + private $buffer = array(); private $closed = false; private $ending = false; @@ -69,10 +69,15 @@ function (WritableStreamInterface $stream) use ($out, &$store, &$buffer, &$endin $stream->on('close', array($out, 'close')); $out->on('close', array($stream, 'close')); - if ($buffer !== '') { + if ($buffer) { // flush buffer to stream and check if its buffer is not exceeded - $drained = $stream->write($buffer) !== false; - $buffer = ''; + $drained = true; + foreach ($buffer as $chunk) { + if (!$stream->write($chunk)) { + $drained = false; + } + } + $buffer = array(); if ($drained) { // signal drain event, because the output stream previous signalled a full buffer @@ -109,7 +114,7 @@ public function write($data) } // append to buffer and signal the buffer is full - $this->buffer .= $data; + $this->buffer[] = $data; return false; } @@ -128,7 +133,7 @@ public function end($data = null) // append to buffer if ($data !== null) { - $this->buffer .= $data; + $this->buffer[] = $data; } } @@ -143,7 +148,7 @@ public function close() return; } - $this->buffer = ''; + $this->buffer = array(); $this->ending = true; $this->closed = true; diff --git a/tests/UnwrapWritableTest.php b/tests/UnwrapWritableTest.php index d70d85f..ec120e7 100644 --- a/tests/UnwrapWritableTest.php +++ b/tests/UnwrapWritableTest.php @@ -5,6 +5,7 @@ use Clue\React\Block; use React\EventLoop\Factory; use React\Promise; +use React\Promise\Deferred; use React\Promise\Stream; use React\Promise\Timer; use React\Stream\ThroughStream; @@ -149,11 +150,30 @@ public function testForwardsDataImmediatelyIfPromiseIsAlreadyResolved() $stream->write('hello'); } - public function testForwardsDataInOneGoOncePromiseResolves() + public function testForwardsOriginalDataOncePromiseResolves() { + $data = new \stdClass(); + $input = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); $input->expects($this->once())->method('isWritable')->willReturn(true); - $input->expects($this->once())->method('write')->with('helloworld'); + $input->expects($this->once())->method('write')->with($data); + $input->expects($this->never())->method('end'); + + $promise = Timer\resolve(0.001, $this->loop)->then(function () use ($input) { + return $input; + }); + $stream = Stream\unwrapWritable($promise); + + $stream->write($data); + + $this->loop->run(); + } + + public function testForwardsDataInOriginalChunksOncePromiseResolves() + { + $input = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $input->expects($this->once())->method('isWritable')->willReturn(true); + $input->expects($this->exactly(2))->method('write')->withConsecutive(array('hello'), array('world')); $input->expects($this->never())->method('end'); $promise = Timer\resolve(0.001, $this->loop)->then(function () use ($input) { @@ -185,7 +205,7 @@ public function testForwardsDataAndEndOncePromiseResolves() { $input = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); $input->expects($this->once())->method('isWritable')->willReturn(true); - $input->expects($this->once())->method('write')->with('helloworld!'); + $input->expects($this->exactly(3))->method('write')->withConsecutive(array('hello'), array('world'), array('!')); $input->expects($this->once())->method('end'); $promise = Timer\resolve(0.001, $this->loop)->then(function () use ($input) { @@ -247,6 +267,20 @@ public function testEmitsErrorAndClosesWhenInputEmitsError() $this->assertFalse($stream->isWritable()); } + public function testDoesNotEmitDrainWhenStreamBufferExceededAfterForwardingData() + { + $input = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $input->expects($this->once())->method('isWritable')->willReturn(true); + $input->expects($this->once())->method('write')->with('hello')->willReturn(false); + + $deferred = new Deferred(); + $stream = Stream\unwrapWritable($deferred->promise()); + $stream->write('hello'); + + $stream->on('drain', $this->expectCallableNever()); + $deferred->resolve($input); + } + public function testEmitsDrainWhenInputEmitsDrain() { $input = new ThroughStream();