Skip to content

Commit

Permalink
Support cancellation of pending connection attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Oct 11, 2018
1 parent c192e21 commit e38c8c1
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 14 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ will resolve with a [`ConnectionInterface`](#connectioninterface)
instance on success or will reject with an `Exception` if the URL is
invalid or the connection or authentication fails.

The returned Promise is implemented in such a way that it can be
cancelled when it is still pending. Cancelling a pending promise will
reject its value with an Exception and will cancel the underlying TCP/IP
connection attempt and/or MySQL authentication.

```php
$promise = $factory->createConnection($url);

$loop->addTimer(3.0, function () use ($promise) {
$promise->cancel();
});
```

The `$url` parameter must contain the database host, optional
authentication, port and database to connect to:

Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"evenement/evenement": "^3.0 || ^2.1 || ^1.1",
"react/event-loop": "^1.0 || ^0.5 || ^0.4",
"react/promise": "^2.7",
"react/socket": "^1.0 || ^0.8"
"react/socket": "^1.1"
},
"require-dev": {
"clue/block-react": "^1.2",
Expand Down
53 changes: 40 additions & 13 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use React\MySQL\Io\Connection;
use React\MySQL\Io\Executor;
use React\MySQL\Io\Parser;
use React\Promise\Promise;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use React\Socket\Connector;
use React\Socket\ConnectorInterface;
Expand Down Expand Up @@ -81,6 +81,19 @@ public function __construct(LoopInterface $loop, ConnectorInterface $connector =
* instance on success or will reject with an `Exception` if the URL is
* invalid or the connection or authentication fails.
*
* The returned Promise is implemented in such a way that it can be
* cancelled when it is still pending. Cancelling a pending promise will
* reject its value with an Exception and will cancel the underlying TCP/IP
* connection attempt and/or MySQL authentication.
*
* ```php
* $promise = $factory->createConnection($url);
*
* $loop->addTimer(3.0, function () use ($promise) {
* $promise->cancel();
* });
* ```
*
* The `$url` parameter must contain the database host, optional
* authentication, port and database to connect to:
*
Expand Down Expand Up @@ -113,8 +126,22 @@ public function createConnection($uri)
return \React\Promise\reject(new \InvalidArgumentException('Invalid connect uri given'));
}

$uri = $parts['host'] . ':' . (isset($parts['port']) ? $parts['port'] : 3306);
return $this->connector->connect($uri)->then(function (ConnectionInterface $stream) use ($parts) {
$connecting = $this->connector->connect(
$parts['host'] . ':' . (isset($parts['port']) ? $parts['port'] : 3306)
);

$deferred = new Deferred(function ($_, $reject) use ($connecting) {
// connection cancelled, start with rejecting attempt, then clean up
$reject(new \RuntimeException('Connection to database server cancelled'));

// either close successful connection or cancel pending connection attempt
$connecting->then(function (ConnectionInterface $connection) {
$connection->close();
});
$connecting->cancel();
});

$connecting->then(function (ConnectionInterface $stream) use ($parts, $deferred) {
$executor = new Executor();
$parser = new Parser($stream, $executor);

Expand All @@ -126,17 +153,17 @@ public function createConnection($uri)
));
$parser->start();

return new Promise(function ($resolve, $reject) use ($command, $connection, $stream) {
$command->on('success', function () use ($resolve, $connection) {
$resolve($connection);
});
$command->on('error', function ($error) use ($reject, $stream) {
$reject($error);
$stream->close();
});
$command->on('success', function () use ($deferred, $connection) {
$deferred->resolve($connection);
});
$command->on('error', function ($error) use ($deferred, $stream) {
$deferred->reject($error);
$stream->close();
});
}, function ($error) {
throw new \RuntimeException('Unable to connect to database server', 0, $error);
}, function ($error) use ($deferred) {
$deferred->reject(new \RuntimeException('Unable to connect to database server', 0, $error));
});

return $deferred->promise();
}
}
59 changes: 59 additions & 0 deletions tests/FactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use React\MySQL\ConnectionInterface;
use React\MySQL\Factory;
use React\Socket\Server;
use React\Promise\Promise;

class FactoryTest extends BaseTestCase
{
Expand Down Expand Up @@ -230,4 +231,62 @@ public function testConnectWithValidAuthCanCloseAndAbortPing()

$loop->run();
}

public function testCancelConnectWillCancelPendingConnection()
{
$pending = new Promise(function () { }, $this->expectCallableOnce());
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
$connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock();
$connector->expects($this->once())->method('connect')->willReturn($pending);

$factory = new Factory($loop, $connector);
$promise = $factory->createConnection('127.0.0.1');

$promise->cancel();

$promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')));
$promise->then(null, $this->expectCallableOnceWith($this->callback(function ($e) {
return ($e->getMessage() === 'Connection to database server cancelled');
})));
}

public function testCancelConnectWillCancelPendingConnectionWithRuntimeException()
{
$pending = new Promise(function () { }, function () {
throw new \UnexpectedValueException('ignored');
});
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
$connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock();
$connector->expects($this->once())->method('connect')->willReturn($pending);

$factory = new Factory($loop, $connector);
$promise = $factory->createConnection('127.0.0.1');

$promise->cancel();

$promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')));
$promise->then(null, $this->expectCallableOnceWith($this->callback(function ($e) {
return ($e->getMessage() === 'Connection to database server cancelled');
})));
}

public function testCancelConnectDuringAuthenticationWillCloseConnection()
{
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$connection->expects($this->once())->method('close');

$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
$connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock();
$connector->expects($this->once())->method('connect')->willReturn(\React\Promise\resolve($connection));

$factory = new Factory($loop, $connector);
$promise = $factory->createConnection('127.0.0.1');

$promise->cancel();

$promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')));
$promise->then(null, $this->expectCallableOnceWith($this->callback(function ($e) {
return ($e->getMessage() === 'Connection to database server cancelled');
})));
}
}

0 comments on commit e38c8c1

Please sign in to comment.