Skip to content

Commit

Permalink
Support sending streaming request
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Apr 1, 2016
1 parent d3fc099 commit 7798ad1
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 10 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,19 @@ $body->read(); // throws BadMethodCallException
$body->getContents(); // throws BadMethodCallException
```

Besides streaming the response body, you can also stream the request body.
This can be useful if you want to send big POST requests (uploading files etc.)
or process many outgoing streams at once.
Instead of passing the body as a string, you can simply pass an instance
implementing React's [`ReadableStreamInterface`](https://github.com/reactphp/stream#readablestreaminterface)
to the [HTTP methods](#methods) like this:

```php
$browser->post($url, array(), $stream)->then(function (ResponseInterface $response) {
echo 'Successfully sent.';
});
```

#### submit()

The `submit($url, array $fields, $headers = array(), $method = 'POST')` method can be used to submit an array of field values similar to submitting a form (`application/x-www-form-urlencoded`).
Expand Down
24 changes: 24 additions & 0 deletions examples/stream-stdin.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

use Clue\React\Buzz\Browser;
use React\Stream\ReadableStreamInterface;
use Psr\Http\Message\ResponseInterface;
use React\Stream\Stream;
use RingCentral\Psr7;

$url = isset($argv[1]) ? $argv[1] : 'https://httpbin.org/post';

require __DIR__ . '/../vendor/autoload.php';

$loop = React\EventLoop\Factory::create();
$client = new Browser($loop);

$in = new Stream(STDIN, $loop);

echo 'Sending STDIN as POST to ' . $url . '' . PHP_EOL;

$client->post($url, array(), $in)->then(function (ResponseInterface $response) {
echo 'Received' . PHP_EOL . Psr7\str($response);
}, 'printf');

$loop->run();
37 changes: 32 additions & 5 deletions src/Io/Sender.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use React\Dns\Resolver\Resolver;
use React\Promise;
use Clue\React\Buzz\Message\MessageFactory;
use React\Stream\ReadableStreamInterface;

class Sender
{
Expand Down Expand Up @@ -115,11 +116,15 @@ public function send(RequestInterface $request, MessageFactory $messageFactory)
return Promise\reject(new \InvalidArgumentException('Sending request requires absolute URI with scheme and host'));
}

$body = (string)$request->getBody();
$body = $request->getBody();

// automatically assign a Content-Length header if the body is not empty
if ($body !== '' && $request->hasHeader('Content-Length') !== null) {
$request = $request->withHeader('Content-Length', strlen($body));
// automatically assign a Content-Length header if the body size is known
if ($body->getSize() !== null && $body->getSize() !== 0 && $request->hasHeader('Content-Length') !== null) {
$request = $request->withHeader('Content-Length', $body->getSize());
}

if ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) {
$request = $request->withHeader('Transfer-Encoding', 'chunked');
}

$headers = array();
Expand All @@ -146,7 +151,29 @@ public function send(RequestInterface $request, MessageFactory $messageFactory)
));
});

$requestStream->end($body);
if ($body instanceof ReadableStreamInterface) {
if ($body->isReadable()) {
if ($request->hasHeader('Content-Length')) {
// length is known => just write to request
$body->pipe($requestStream);
} else {
// length unknown => apply chunked transfer-encoding
// this should be moved somewhere else obviously
$body->on('data', function ($data) use ($requestStream) {
$requestStream->write(dechex(strlen($data)) . "\r\n" . $data . "\r\n");
});
$body->on('end', function() use ($requestStream) {
$requestStream->end("0\r\n\r\n");
});
}
} else {
// stream is not readable => end request without body
$requestStream->end();
}
} else {
// body is fully buffered => write as one chunk
$requestStream->end((string)$body);
}

return $deferred->promise();
}
Expand Down
10 changes: 5 additions & 5 deletions src/Message/MessageFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ class MessageFactory
/**
* Creates a new instance of RequestInterface for the given request parameters
*
* @param string $method
* @param string|UriInterface $uri
* @param array $headers
* @param string $content
* @param string $method
* @param string|UriInterface $uri
* @param array $headers
* @param string|ReadableStreamInterface $content
* @return RequestInterface
*/
public function request($method, $uri, $headers = array(), $content = '')
{
return new Request($method, $uri, $headers, $content);
return new Request($method, $uri, $headers, $this->body($content));
}

/**
Expand Down
50 changes: 50 additions & 0 deletions tests/FunctionalBrowserTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use React\SocketClient\DnsConnector;
use Clue\React\Buzz\Message\ResponseException;
use Clue\React\Block;
use React\Stream\ReadableStream;

class FunctionalBrowserTest extends TestCase
{
Expand Down Expand Up @@ -130,4 +131,53 @@ public function testErrorStatusCodeRejectsWithResponseException()
$this->assertEquals(404, $e->getResponse()->getStatusCode());
}
}

public function testPostString()
{
$response = Block\await($this->browser->post($this->base . 'post', array(), 'hello world'), $this->loop);
$data = json_decode((string)$response->getBody(), true);

$this->assertEquals('hello world', $data['data']);
}

public function testPostStreamChunked()
{
$stream = new ReadableStream();

$this->loop->addTimer(0.001, function () use ($stream) {
$stream->emit('data', array('hello world'));
$stream->close();
});

$response = Block\await($this->browser->post($this->base . 'post', array(), $stream), $this->loop);
$data = json_decode((string)$response->getBody(), true);

$this->assertEquals('hello world', $data['data']);
}

public function testPostStreamKnownLength()
{
$stream = new ReadableStream();

$this->loop->addTimer(0.001, function () use ($stream) {
$stream->emit('data', array('hello world'));
$stream->close();
});

$response = Block\await($this->browser->post($this->base . 'post', array('Content-Length' => 11), $stream), $this->loop);
$data = json_decode((string)$response->getBody(), true);

$this->assertEquals('hello world', $data['data']);
}

public function testPostStreamClosed()
{
$stream = new ReadableStream();
$stream->close();

$response = Block\await($this->browser->post($this->base . 'post', array(), $stream), $this->loop);
$data = json_decode((string)$response->getBody(), true);

$this->assertEquals('', $data['data']);
}
}

0 comments on commit 7798ad1

Please sign in to comment.