Skip to content

Commit

Permalink
Merge pull request reactphp#74 from clue-labs/uri
Browse files Browse the repository at this point in the history
Use `connect($uri)` instead of `create($host, $port)`
  • Loading branch information
clue authored Jan 9, 2017
2 parents a322101 + 035b373 commit 1bf5685
Show file tree
Hide file tree
Showing 15 changed files with 238 additions and 101 deletions.
30 changes: 15 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ swap this implementation against any other implementation of this interface.

The interface only offers a single method:

#### create()
#### connect()

The `create(string $host, int $port): PromiseInterface<Stream, Exception>` method
The `connect(string $uri): PromiseInterface<Stream, Exception>` method
can be used to establish a streaming connection.
It returns a [Promise](https://github.com/reactphp/promise) which either
fulfills with a [Stream](https://github.com/reactphp/stream) or
rejects with an `Exception`:

```php
$connector->create('google.com', 443)->then(
$connector->connect('google.com:443')->then(
function (Stream $stream) {
// connection successfully established
},
Expand All @@ -69,7 +69,7 @@ reject its value with an `Exception`. It SHOULD clean up any underlying
resources and references as applicable:

```php
$promise = $connector->create($host, $port);
$promise = $connector->connect($uri);

$promise->cancel();
```
Expand All @@ -83,7 +83,7 @@ TCP/IP connections to any IP-port-combination:
```php
$tcpConnector = new React\SocketClient\TcpConnector($loop);

$tcpConnector->create('127.0.0.1', 80)->then(function (React\Stream\Stream $stream) {
$tcpConnector->connect('127.0.0.1:80')->then(function (React\Stream\Stream $stream) {
$stream->write('...');
$stream->end();
});
Expand All @@ -96,7 +96,7 @@ See also the [first example](examples).
Pending connection attempts can be cancelled by cancelling its pending promise like so:

```php
$promise = $tcpConnector->create($host, $port);
$promise = $tcpConnector->connect('127.0.0.1:80');

$promise->cancel();
```
Expand Down Expand Up @@ -132,11 +132,11 @@ Make sure to set up your DNS resolver and underlying TCP connector like this:

```php
$dnsResolverFactory = new React\Dns\Resolver\Factory();
$dns = $dnsResolverFactory->createCached('8.8.8.8', $loop);
$dns = $dnsResolverFactory->connectCached('8.8.8.8', $loop);

$dnsConnector = new React\SocketClient\DnsConnector($tcpConnector, $dns);

$dnsConnector->create('www.google.com', 80)->then(function (React\Stream\Stream $stream) {
$dnsConnector->connect('www.google.com:80')->then(function (React\Stream\Stream $stream) {
$stream->write('...');
$stream->end();
});
Expand All @@ -149,7 +149,7 @@ See also the [first example](examples).
Pending connection attempts can be cancelled by cancelling its pending promise like so:

```php
$promise = $dnsConnector->create($host, $port);
$promise = $dnsConnector->connect('www.google.com:80');

$promise->cancel();
```
Expand All @@ -164,7 +164,7 @@ set up like this:
```php
$connector = new React\SocketClient\Connector($loop, $dns);

$connector->create('www.google.com', 80)->then($callback);
$connector->connect('www.google.com:80')->then($callback);
```

### Async SSL/TLS connections
Expand All @@ -180,7 +180,7 @@ stream.
```php
$secureConnector = new React\SocketClient\SecureConnector($dnsConnector, $loop);

$secureConnector->create('www.google.com', 443)->then(function (React\Stream\Stream $stream) {
$secureConnector->connect('www.google.com:443')->then(function (React\Stream\Stream $stream) {
$stream->write("GET / HTTP/1.0\r\nHost: www.google.com\r\n\r\n");
...
});
Expand All @@ -193,7 +193,7 @@ See also the [second example](examples).
Pending connection attempts can be cancelled by cancelling its pending promise like so:

```php
$promise = $secureConnector->create($host, $port);
$promise = $secureConnector->connect('www.google.com:443');

$promise->cancel();
```
Expand Down Expand Up @@ -233,7 +233,7 @@ underlying connection attempt if it takes too long.
```php
$timeoutConnector = new React\SocketClient\TimeoutConnector($connector, 3.0, $loop);

$timeoutConnector->create('google.com', 80)->then(function (React\Stream\Stream $stream) {
$timeoutConnector->connect('google.com:80')->then(function (React\Stream\Stream $stream) {
// connection succeeded within 3.0 seconds
});
```
Expand All @@ -243,7 +243,7 @@ See also any of the [examples](examples).
Pending connection attempts can be cancelled by cancelling its pending promise like so:

```php
$promise = $timeoutConnector->create($host, $port);
$promise = $timeoutConnector->connect('google.com:80');

$promise->cancel();
```
Expand All @@ -260,7 +260,7 @@ Unix domain socket (UDS) paths like this:
```php
$connector = new React\SocketClient\UnixConnector($loop);

$connector->create('/tmp/demo.sock')->then(function (React\Stream\Stream $stream) {
$connector->connect('/tmp/demo.sock')->then(function (React\Stream\Stream $stream) {
$stream->write("HELLO\n");
});

Expand Down
4 changes: 2 additions & 2 deletions src/Connector.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ public function __construct(LoopInterface $loop, Resolver $resolver)
$this->connector = new DnsConnector(new TcpConnector($loop), $resolver);
}

public function create($host, $port)
public function connect($uri)
{
return $this->connector->create($host, $port);
return $this->connector->connect($uri);
}
}
7 changes: 3 additions & 4 deletions src/ConnectorInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* This is usually done via dependency injection, so it's fairly simple to actually
* swap this implementation against any other implementation of this interface.
*
* The interface only offers a single `create()` method.
* The interface only offers a single `connect()` method.
*/
interface ConnectorInterface
{
Expand All @@ -30,9 +30,8 @@ interface ConnectorInterface
* reject its value with an Exception. It SHOULD clean up any underlying
* resources and references as applicable.
*
* @param string $host
* @param int $port
* @param string $uri
* @return React\Promise\PromiseInterface resolves with a Stream on success or rejects with an Exception on error
*/
public function create($host, $port);
public function connect($uri);
}
56 changes: 51 additions & 5 deletions src/DnsConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,60 @@ public function __construct(ConnectorInterface $connector, Resolver $resolver)
$this->resolver = $resolver;
}

public function create($host, $port)
public function connect($uri)
{
if (strpos($uri, '://') === false) {
$parts = parse_url('tcp://' . $uri);
unset($parts['scheme']);
} else {
$parts = parse_url($uri);
}

if (!$parts || !isset($parts['host'])) {
return Promise\reject(new \InvalidArgumentException('Given URI "' . $uri . '" is invalid'));
}

$that = $this;
$host = trim($parts['host'], '[]');

return $this
->resolveHostname($host)
->then(function ($ip) use ($that, $port) {
return $that->connect($ip, $port);
->then(function ($ip) use ($that, $parts) {
$uri = '';

// prepend original scheme if known
if (isset($parts['scheme'])) {
$uri .= $parts['scheme'] . '://';
}

if (strpos($ip, ':') !== false) {
// enclose IPv6 addresses in square brackets before appending port
$uri .= '[' . $ip . ']';
} else {
$uri .= $ip;
}

// append original port if known
if (isset($parts['port'])) {
$uri .= ':' . $parts['port'];
}

// append orignal path if known
if (isset($parts['path'])) {
$uri .= $parts['path'];
}

// append original query if known
if (isset($parts['query'])) {
$uri .= '?' . $parts['query'];
}

// append original fragment if known
if (isset($parts['fragment'])) {
$uri .= '#' . $parts['fragment'];
}

return $that->connectTcp($uri);
});
}

Expand Down Expand Up @@ -55,9 +101,9 @@ function ($_, $reject) use ($promise) {
}

/** @internal */
public function connect($ip, $port)
public function connectTcp($uri)
{
$promise = $this->connector->create($ip, $port);
$promise = $this->connector->connect($uri);

return new Promise\Promise(
function ($resolve, $reject) use ($promise) {
Expand Down
20 changes: 16 additions & 4 deletions src/SecureConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,24 @@ public function __construct(ConnectorInterface $connector, LoopInterface $loop,
$this->context = $context;
}

public function create($host, $port)
public function connect($uri)
{
if (!function_exists('stream_socket_enable_crypto')) {
return Promise\reject(new \BadMethodCallException('Encryption not supported on your platform (HHVM < 3.8?)'));
}

if (strpos($uri, '://') === false) {
$uri = 'tls://' . $uri;
}

$parts = parse_url($uri);
if (!$parts || !isset($parts['host']) || $parts['scheme'] !== 'tls') {
return Promise\reject(new \InvalidArgumentException('Given URI "' . $uri . '" is invalid'));
}

$uri = str_replace('tls://', '', $uri);
$host = trim($parts['host'], '[]');

$context = $this->context + array(
'SNI_enabled' => true,
'peer_name' => $host
Expand All @@ -40,7 +52,7 @@ public function create($host, $port)
}

$encryption = $this->streamEncryption;
return $this->connect($host, $port)->then(function (Stream $stream) use ($context, $encryption) {
return $this->connectTcp($uri)->then(function (Stream $stream) use ($context, $encryption) {
// (unencrypted) TCP/IP connection succeeded

// set required SSL/TLS context options
Expand All @@ -57,9 +69,9 @@ public function create($host, $port)
});
}

private function connect($host, $port)
private function connectTcp($uri)
{
$promise = $this->connector->create($host, $port);
$promise = $this->connector->connect($uri);

return new Promise\Promise(
function ($resolve, $reject) use ($promise) {
Expand Down
29 changes: 14 additions & 15 deletions src/TcpConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,24 @@ public function __construct(LoopInterface $loop, array $context = array())
$this->context = $context;
}

public function create($ip, $port)
public function connect($uri)
{
if (false === filter_var($ip, FILTER_VALIDATE_IP)) {
return Promise\reject(new \InvalidArgumentException('Given parameter "' . $ip . '" is not a valid IP'));
if (strpos($uri, '://') === false) {
$uri = 'tcp://' . $uri;
}

$parts = parse_url($uri);
if (!$parts || !isset($parts['scheme'], $parts['host'], $parts['port']) || $parts['scheme'] !== 'tcp') {
return Promise\reject(new \InvalidArgumentException('Given URI "' . $uri . '" is invalid'));
}

$url = $this->getSocketUrl($ip, $port);
$ip = trim($parts['host'], '[]');
if (false === filter_var($ip, FILTER_VALIDATE_IP)) {
return Promise\reject(new \InvalidArgumentException('Given URI "' . $ip . '" does not contain a valid host IP'));
}

$socket = @stream_socket_client(
$url,
$uri,
$errno,
$errstr,
0,
Expand All @@ -38,7 +46,7 @@ public function create($ip, $port)

if (false === $socket) {
return Promise\reject(new \RuntimeException(
sprintf("Connection to %s:%d failed: %s", $ip, $port, $errstr),
sprintf("Connection to %s failed: %s", $uri, $errstr),
$errno
));
}
Expand Down Expand Up @@ -90,13 +98,4 @@ public function handleConnectedSocket($socket)
{
return new Stream($socket, $this->loop);
}

private function getSocketUrl($ip, $port)
{
if (strpos($ip, ':') !== false) {
// enclose IPv6 addresses in square brackets before appending port
$ip = '[' . $ip . ']';
}
return sprintf('tcp://%s:%s', $ip, $port);
}
}
4 changes: 2 additions & 2 deletions src/TimeoutConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ public function __construct(ConnectorInterface $connector, $timeout, LoopInterfa
$this->loop = $loop;
}

public function create($host, $port)
public function connect($uri)
{
$promise = $this->connector->create($host, $port);
$promise = $this->connector->connect($uri);

return Timer\timeout(new Promise(
function ($resolve, $reject) use ($promise) {
Expand Down
10 changes: 8 additions & 2 deletions src/UnixConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ public function __construct(LoopInterface $loop)
$this->loop = $loop;
}

public function create($path, $unusedPort = 0)
public function connect($path)
{
$resource = @stream_socket_client('unix://' . $path, $errno, $errstr, 1.0);
if (strpos($path, '://') === false) {
$path = 'unix://' . $path;
} elseif (substr($path, 0, 7) !== 'unix://') {
return Promise\reject(new \InvalidArgumentException('Given URI "' . $path . '" is invalid'));
}

$resource = @stream_socket_client($path, $errno, $errstr, 1.0);

if (!$resource) {
return Promise\reject(new RuntimeException('Unable to connect to unix domain socket "' . $path . '": ' . $errstr, $errno));
Expand Down
Loading

0 comments on commit 1bf5685

Please sign in to comment.