diff --git a/README.md b/README.md index 8dcf730..7e71744 100644 --- a/README.md +++ b/README.md @@ -262,6 +262,19 @@ $body->read(); // throws BadMethodCallException $body->getContents(); // throws BadMethodCallException ``` +Besides streaming the response body, you can also stream the request body. +This can be useful if you want to send big POST requests (uploading files etc.) +or process many outgoing streams at once. +Instead of passing the body as a string, you can simply pass an instance +implementing React's [`ReadableStreamInterface`](https://github.com/reactphp/stream#readablestreaminterface) +to the [HTTP methods](#methods) like this: + +```php +$browser->post($url, array(), $stream)->then(function (ResponseInterface $response) { + echo 'Successfully sent.'; +}); +``` + #### submit() The `submit($url, array $fields, $headers = array(), $method = 'POST')` method can be used to submit an array of field values similar to submitting a form (`application/x-www-form-urlencoded`). diff --git a/examples/stream-stdin.php b/examples/stream-stdin.php new file mode 100644 index 0000000..4aaaa44 --- /dev/null +++ b/examples/stream-stdin.php @@ -0,0 +1,24 @@ +post($url, array(), $in)->then(function (ResponseInterface $response) { + echo 'Received' . PHP_EOL . Psr7\str($response); +}, 'printf'); + +$loop->run(); diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 871f4a6..66ecd92 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -17,6 +17,7 @@ use React\Dns\Resolver\Resolver; use React\Promise; use Clue\React\Buzz\Message\MessageFactory; +use React\Stream\ReadableStreamInterface; class Sender { @@ -115,11 +116,15 @@ public function send(RequestInterface $request, MessageFactory $messageFactory) return Promise\reject(new \InvalidArgumentException('Sending request requires absolute URI with scheme and host')); } - $body = (string)$request->getBody(); + $body = $request->getBody(); - // automatically assign a Content-Length header if the body is not empty - if ($body !== '' && $request->hasHeader('Content-Length') !== null) { - $request = $request->withHeader('Content-Length', strlen($body)); + // automatically assign a Content-Length header if the body size is known + if ($body->getSize() !== null && $body->getSize() !== 0 && $request->hasHeader('Content-Length') !== null) { + $request = $request->withHeader('Content-Length', $body->getSize()); + } + + if ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) { + $request = $request->withHeader('Transfer-Encoding', 'chunked'); } $headers = array(); @@ -146,7 +151,29 @@ public function send(RequestInterface $request, MessageFactory $messageFactory) )); }); - $requestStream->end($body); + if ($body instanceof ReadableStreamInterface) { + if ($body->isReadable()) { + if ($request->hasHeader('Content-Length')) { + // length is known => just write to request + $body->pipe($requestStream); + } else { + // length unknown => apply chunked transfer-encoding + // this should be moved somewhere else obviously + $body->on('data', function ($data) use ($requestStream) { + $requestStream->write(dechex(strlen($data)) . "\r\n" . $data . "\r\n"); + }); + $body->on('end', function() use ($requestStream) { + $requestStream->end("0\r\n\r\n"); + }); + } + } else { + // stream is not readable => end request without body + $requestStream->end(); + } + } else { + // body is fully buffered => write as one chunk + $requestStream->end((string)$body); + } return $deferred->promise(); } diff --git a/src/Message/MessageFactory.php b/src/Message/MessageFactory.php index f533a20..5e734ff 100644 --- a/src/Message/MessageFactory.php +++ b/src/Message/MessageFactory.php @@ -17,15 +17,15 @@ class MessageFactory /** * Creates a new instance of RequestInterface for the given request parameters * - * @param string $method - * @param string|UriInterface $uri - * @param array $headers - * @param string $content + * @param string $method + * @param string|UriInterface $uri + * @param array $headers + * @param string|ReadableStreamInterface $content * @return RequestInterface */ public function request($method, $uri, $headers = array(), $content = '') { - return new Request($method, $uri, $headers, $content); + return new Request($method, $uri, $headers, $this->body($content)); } /** diff --git a/tests/FunctionalBrowserTest.php b/tests/FunctionalBrowserTest.php index 49549ac..a0511da 100644 --- a/tests/FunctionalBrowserTest.php +++ b/tests/FunctionalBrowserTest.php @@ -9,6 +9,7 @@ use React\SocketClient\DnsConnector; use Clue\React\Buzz\Message\ResponseException; use Clue\React\Block; +use React\Stream\ReadableStream; class FunctionalBrowserTest extends TestCase { @@ -130,4 +131,53 @@ public function testErrorStatusCodeRejectsWithResponseException() $this->assertEquals(404, $e->getResponse()->getStatusCode()); } } + + public function testPostString() + { + $response = Block\await($this->browser->post($this->base . 'post', array(), 'hello world'), $this->loop); + $data = json_decode((string)$response->getBody(), true); + + $this->assertEquals('hello world', $data['data']); + } + + public function testPostStreamChunked() + { + $stream = new ReadableStream(); + + $this->loop->addTimer(0.001, function () use ($stream) { + $stream->emit('data', array('hello world')); + $stream->close(); + }); + + $response = Block\await($this->browser->post($this->base . 'post', array(), $stream), $this->loop); + $data = json_decode((string)$response->getBody(), true); + + $this->assertEquals('hello world', $data['data']); + } + + public function testPostStreamKnownLength() + { + $stream = new ReadableStream(); + + $this->loop->addTimer(0.001, function () use ($stream) { + $stream->emit('data', array('hello world')); + $stream->close(); + }); + + $response = Block\await($this->browser->post($this->base . 'post', array('Content-Length' => 11), $stream), $this->loop); + $data = json_decode((string)$response->getBody(), true); + + $this->assertEquals('hello world', $data['data']); + } + + public function testPostStreamClosed() + { + $stream = new ReadableStream(); + $stream->close(); + + $response = Block\await($this->browser->post($this->base . 'post', array(), $stream), $this->loop); + $data = json_decode((string)$response->getBody(), true); + + $this->assertEquals('', $data['data']); + } }