diff --git a/src/Ratchet/Server/IoConnection.php b/src/Ratchet/Server/IoConnection.php index 9f864bb9..e141dd84 100644 --- a/src/Ratchet/Server/IoConnection.php +++ b/src/Ratchet/Server/IoConnection.php @@ -1,29 +1,35 @@ conn = $conn; + $this->loop = $loop; } /** * {@inheritdoc} */ - public function send($data) { + public function send($data) + { $this->conn->write($data); return $this; @@ -32,7 +38,8 @@ public function send($data) { /** * {@inheritdoc} */ - public function close() { + public function close() + { $this->conn->end(); } } diff --git a/src/Ratchet/Server/IoServer.php b/src/Ratchet/Server/IoServer.php index 921c7b14..75f507b8 100644 --- a/src/Ratchet/Server/IoServer.php +++ b/src/Ratchet/Server/IoServer.php @@ -1,5 +1,6 @@ loop = $loop; - $this->app = $app; + $this->app = $app; $this->socket = $socket; $socket->on('connection', array($this, 'handleConnect')); @@ -60,12 +63,13 @@ public function __construct(MessageComponentInterface $app, ServerInterface $soc /** * @param \Ratchet\MessageComponentInterface $component The application that I/O will call when events are received - * @param int $port The port to server sockets on - * @param string $address The address to receive sockets on (0.0.0.0 means receive connections from any) + * @param int $port The port to server sockets on + * @param string $address The address to receive sockets on (0.0.0.0 means receive connections from any) * @return IoServer */ - public static function factory(MessageComponentInterface $component, $port = 80, $address = '0.0.0.0') { - $loop = LoopFactory::create(); + public static function factory(MessageComponentInterface $component, $port = 80, $address = '0.0.0.0') + { + $loop = LoopFactory::create(); $socket = new Reactor($loop); $socket->listen($port, $address); @@ -76,7 +80,8 @@ public static function factory(MessageComponentInterface $component, $port = 80, * Run the application by entering the event loop * @throws \RuntimeException If a loop was not previously specified */ - public function run() { + public function run() + { if (null === $this->loop) { throw new \RuntimeException("A React Loop was not provided during instantiation"); } @@ -90,10 +95,11 @@ public function run() { * Triggered when a new connection is received from React * @param \React\Socket\ConnectionInterface $conn */ - public function handleConnect($conn) { - $conn->decor = new IoConnection($conn); + public function handleConnect($conn) + { + $conn->decor = new IoConnection($conn, $this->loop); - $conn->decor->resourceId = (int)$conn->stream; + $conn->decor->resourceId = (int)$conn->stream; $conn->decor->remoteAddress = $conn->getRemoteAddress(); $this->app->onOpen($conn->decor); @@ -105,10 +111,11 @@ public function handleConnect($conn) { /** * Data has been received from React - * @param string $data + * @param string $data * @param \React\Socket\ConnectionInterface $conn */ - public function handleData($data, $conn) { + public function handleData($data, $conn) + { try { $this->app->onMessage($conn->decor, $data); } catch (\Exception $e) { @@ -120,7 +127,8 @@ public function handleData($data, $conn) { * A connection has been closed by React * @param \React\Socket\ConnectionInterface $conn */ - public function handleEnd($conn) { + public function handleEnd($conn) + { try { $this->app->onClose($conn->decor); } catch (\Exception $e) { @@ -132,10 +140,11 @@ public function handleEnd($conn) { /** * An error has occurred, let the listening application know - * @param \Exception $e + * @param \Exception $e * @param \React\Socket\ConnectionInterface $conn */ - public function handleError(\Exception $e, $conn) { + public function handleError(\Exception $e, $conn) + { $this->app->onError($conn->decor, $e); } } diff --git a/src/Ratchet/WebSocket/Version/RFC6455/Connection.php b/src/Ratchet/WebSocket/Version/RFC6455/Connection.php index b14d0fb7..28f21d2c 100644 --- a/src/Ratchet/WebSocket/Version/RFC6455/Connection.php +++ b/src/Ratchet/WebSocket/Version/RFC6455/Connection.php @@ -2,6 +2,7 @@ namespace Ratchet\WebSocket\Version\RFC6455; use Ratchet\AbstractConnectionDecorator; +use Ratchet\Wamp\Exception; use Ratchet\WebSocket\Version\DataInterface; use React\Promise\Deferred; use React\EventLoop\LibEventLoop; @@ -37,24 +38,28 @@ public function send($msg) * @param null $uniqId A id for identify the ping request (call uniqId function if null) * @return mixed A promise */ - public function ping(LibEventLoop $loop, $timeout, $uniqId = null) + public function ping($timeout, $uniqId = null) { - $connection = &$this; + if (is_null($this->loop)) + throw new \UnexpectedValueException('No loop event in server'); + if (is_null($uniqId)) $uniqId = uniqid('', true); if (!isset($this->WebSocket->pingH)) $this->WebSocket->pingH = []; + $this->WebSocket->pingH[$uniqId] = new \StdClass; $this->WebSocket->pingH[$uniqId]->deferredPong = new \React\Promise\Deferred(); - $loop->addTimer(0.01, function () use (&$connection, $uniqId, $loop, $timeout) { - $connection->WebSocket->pingH[$uniqId]->pongTimer = $loop->addTimer($timeout, function () use (&$connection, $uniqId) { - $deferred = $connection->WebSocket->pingH[$uniqId]; - unset($connection->WebSocket->pingH[$uniqId]); + + $this->loop->addTimer(0.01, function () use ($uniqId, $timeout) { + $this->WebSocket->pingH[$uniqId]->pongTimer = $this->loop->addTimer($timeout, function () use ($uniqId) { + $deferred = $this->WebSocket->pingH[$uniqId]; + unset($this->WebSocket->pingH[$uniqId]); $deferred->deferredPong->reject('no response'); }); - $connection->WebSocket->pingH[$uniqId]->lastPingTimestamp = (new \DateTime())->getTimestamp(); + $this->WebSocket->pingH[$uniqId]->lastPingTimestamp = (new \DateTime())->getTimestamp(); $frame = new Frame($uniqId, true, Frame::OP_PING); - $connection->send($frame); + $this->send($frame); }); return $this->WebSocket->pingH[$uniqId]->deferredPong->promise(); } @@ -68,17 +73,16 @@ public function ping(LibEventLoop $loop, $timeout, $uniqId = null) * @param int $interval Duration between two sending of ping request * @param float $timeout Timeout before the connection is considered as dead */ - public function keepAlive(LibEventLoop $loop, $interval = 5, $timeout = 0.5) + public function keepAlive($interval = 5, $timeout = 0.5) { - $conn = $this; - $loop->addPeriodicTimer($interval, function ($timer) use ($loop, $conn, $timeout) { - $conn->ping($loop, $timeout, 'keepAlive')->then( + $this->loop->addPeriodicTimer($interval, function ($timer) use ($timeout) { + $this->ping($timeout, 'keepAlive')->then( function ($timestamp) { }, - function () use ($conn, $timer) { + function () use ($timer) { $timer->cancel(); - unset($conn->WebSocket->pingH['keepAlive']); - $conn->close(); + unset($this->WebSocket->pingH['keepAlive']); + $this->close(); }); }); }