Skip to content

Commit

Permalink
Add loop in IoConnection
Browse files Browse the repository at this point in the history
Delete of different variable
Note to myself : learn php
  • Loading branch information
Remi committed Dec 12, 2014
1 parent 6da3b04 commit fe96e6b
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 39 deletions.
17 changes: 12 additions & 5 deletions src/Ratchet/Server/IoConnection.php
Original file line number Diff line number Diff line change
@@ -1,29 +1,35 @@
<?php
namespace Ratchet\Server;

use Ratchet\ConnectionInterface;
use React\Socket\ConnectionInterface as ReactConn;
use React\EventLoop\LibEventLoop;

/**
* {@inheritdoc}
*/
class IoConnection implements ConnectionInterface {
class IoConnection implements ConnectionInterface
{
/**
* @var \React\Socket\ConnectionInterface
*/
protected $conn;

public $loop;

/**
* @param \React\Socket\ConnectionInterface $conn
*/
public function __construct(ReactConn $conn) {
public function __construct(ReactConn $conn, LibEventLoop $loop = null)
{
$this->conn = $conn;
$this->loop = $loop;
}

/**
* {@inheritdoc}
*/
public function send($data) {
public function send($data)
{
$this->conn->write($data);

return $this;
Expand All @@ -32,7 +38,8 @@ public function send($data) {
/**
* {@inheritdoc}
*/
public function close() {
public function close()
{
$this->conn->end();
}
}
47 changes: 28 additions & 19 deletions src/Ratchet/Server/IoServer.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?php
namespace Ratchet\Server;

use Ratchet\MessageComponentInterface;
use React\EventLoop\LoopInterface;
use React\Socket\ServerInterface;
Expand All @@ -10,7 +11,8 @@
* Creates an open-ended socket to listen on a port for incoming connections.
* Events are delegated through this to attached applications
*/
class IoServer {
class IoServer
{
/**
* @var \React\EventLoop\LoopInterface
*/
Expand All @@ -34,11 +36,12 @@ class IoServer {
public $socket;

/**
* @param \Ratchet\MessageComponentInterface $app The Ratchet application stack to host
* @param \React\Socket\ServerInterface $socket The React socket server to run the Ratchet application off of
* @param \React\EventLoop\LoopInterface|null $loop The React looper to run the Ratchet application off of
* @param \Ratchet\MessageComponentInterface $app The Ratchet application stack to host
* @param \React\Socket\ServerInterface $socket The React socket server to run the Ratchet application off of
* @param \React\EventLoop\LoopInterface|null $loop The React looper to run the Ratchet application off of
*/
public function __construct(MessageComponentInterface $app, ServerInterface $socket, LoopInterface $loop = null) {
public function __construct(MessageComponentInterface $app, ServerInterface $socket, LoopInterface $loop = null)
{
if (false === strpos(PHP_VERSION, "hiphop")) {
gc_enable();
}
Expand All @@ -47,7 +50,7 @@ public function __construct(MessageComponentInterface $app, ServerInterface $soc
ob_implicit_flush();

$this->loop = $loop;
$this->app = $app;
$this->app = $app;
$this->socket = $socket;

$socket->on('connection', array($this, 'handleConnect'));
Expand All @@ -60,12 +63,13 @@ public function __construct(MessageComponentInterface $app, ServerInterface $soc

/**
* @param \Ratchet\MessageComponentInterface $component The application that I/O will call when events are received
* @param int $port The port to server sockets on
* @param string $address The address to receive sockets on (0.0.0.0 means receive connections from any)
* @param int $port The port to server sockets on
* @param string $address The address to receive sockets on (0.0.0.0 means receive connections from any)
* @return IoServer
*/
public static function factory(MessageComponentInterface $component, $port = 80, $address = '0.0.0.0') {
$loop = LoopFactory::create();
public static function factory(MessageComponentInterface $component, $port = 80, $address = '0.0.0.0')
{
$loop = LoopFactory::create();
$socket = new Reactor($loop);
$socket->listen($port, $address);

Expand All @@ -76,7 +80,8 @@ public static function factory(MessageComponentInterface $component, $port = 80,
* Run the application by entering the event loop
* @throws \RuntimeException If a loop was not previously specified
*/
public function run() {
public function run()
{
if (null === $this->loop) {
throw new \RuntimeException("A React Loop was not provided during instantiation");
}
Expand All @@ -90,10 +95,11 @@ public function run() {
* Triggered when a new connection is received from React
* @param \React\Socket\ConnectionInterface $conn
*/
public function handleConnect($conn) {
$conn->decor = new IoConnection($conn);
public function handleConnect($conn)
{
$conn->decor = new IoConnection($conn, $this->loop);

$conn->decor->resourceId = (int)$conn->stream;
$conn->decor->resourceId = (int)$conn->stream;
$conn->decor->remoteAddress = $conn->getRemoteAddress();

$this->app->onOpen($conn->decor);
Expand All @@ -105,10 +111,11 @@ public function handleConnect($conn) {

/**
* Data has been received from React
* @param string $data
* @param string $data
* @param \React\Socket\ConnectionInterface $conn
*/
public function handleData($data, $conn) {
public function handleData($data, $conn)
{
try {
$this->app->onMessage($conn->decor, $data);
} catch (\Exception $e) {
Expand All @@ -120,7 +127,8 @@ public function handleData($data, $conn) {
* A connection has been closed by React
* @param \React\Socket\ConnectionInterface $conn
*/
public function handleEnd($conn) {
public function handleEnd($conn)
{
try {
$this->app->onClose($conn->decor);
} catch (\Exception $e) {
Expand All @@ -132,10 +140,11 @@ public function handleEnd($conn) {

/**
* An error has occurred, let the listening application know
* @param \Exception $e
* @param \Exception $e
* @param \React\Socket\ConnectionInterface $conn
*/
public function handleError(\Exception $e, $conn) {
public function handleError(\Exception $e, $conn)
{
$this->app->onError($conn->decor, $e);
}
}
34 changes: 19 additions & 15 deletions src/Ratchet/WebSocket/Version/RFC6455/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
namespace Ratchet\WebSocket\Version\RFC6455;

use Ratchet\AbstractConnectionDecorator;
use Ratchet\Wamp\Exception;
use Ratchet\WebSocket\Version\DataInterface;
use React\Promise\Deferred;
use React\EventLoop\LibEventLoop;
Expand Down Expand Up @@ -37,24 +38,28 @@ public function send($msg)
* @param null $uniqId A id for identify the ping request (call uniqId function if null)
* @return mixed A promise
*/
public function ping(LibEventLoop $loop, $timeout, $uniqId = null)
public function ping($timeout, $uniqId = null)
{
$connection = &$this;
if (is_null($this->loop))
throw new \UnexpectedValueException('No loop event in server');

if (is_null($uniqId))
$uniqId = uniqid('', true);
if (!isset($this->WebSocket->pingH))
$this->WebSocket->pingH = [];

$this->WebSocket->pingH[$uniqId] = new \StdClass;
$this->WebSocket->pingH[$uniqId]->deferredPong = new \React\Promise\Deferred();
$loop->addTimer(0.01, function () use (&$connection, $uniqId, $loop, $timeout) {
$connection->WebSocket->pingH[$uniqId]->pongTimer = $loop->addTimer($timeout, function () use (&$connection, $uniqId) {
$deferred = $connection->WebSocket->pingH[$uniqId];
unset($connection->WebSocket->pingH[$uniqId]);

$this->loop->addTimer(0.01, function () use ($uniqId, $timeout) {
$this->WebSocket->pingH[$uniqId]->pongTimer = $this->loop->addTimer($timeout, function () use ($uniqId) {
$deferred = $this->WebSocket->pingH[$uniqId];
unset($this->WebSocket->pingH[$uniqId]);
$deferred->deferredPong->reject('no response');
});
$connection->WebSocket->pingH[$uniqId]->lastPingTimestamp = (new \DateTime())->getTimestamp();
$this->WebSocket->pingH[$uniqId]->lastPingTimestamp = (new \DateTime())->getTimestamp();
$frame = new Frame($uniqId, true, Frame::OP_PING);
$connection->send($frame);
$this->send($frame);
});
return $this->WebSocket->pingH[$uniqId]->deferredPong->promise();
}
Expand All @@ -68,17 +73,16 @@ public function ping(LibEventLoop $loop, $timeout, $uniqId = null)
* @param int $interval Duration between two sending of ping request
* @param float $timeout Timeout before the connection is considered as dead
*/
public function keepAlive(LibEventLoop $loop, $interval = 5, $timeout = 0.5)
public function keepAlive($interval = 5, $timeout = 0.5)
{
$conn = $this;
$loop->addPeriodicTimer($interval, function ($timer) use ($loop, $conn, $timeout) {
$conn->ping($loop, $timeout, 'keepAlive')->then(
$this->loop->addPeriodicTimer($interval, function ($timer) use ($timeout) {
$this->ping($timeout, 'keepAlive')->then(
function ($timestamp) {
},
function () use ($conn, $timer) {
function () use ($timer) {
$timer->cancel();
unset($conn->WebSocket->pingH['keepAlive']);
$conn->close();
unset($this->WebSocket->pingH['keepAlive']);
$this->close();
});
});
}
Expand Down

2 comments on commit fe96e6b

@arunpoudel
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any timeline on this merge?

@cboden
Copy link
Member

@cboden cboden commented on fe96e6b Jun 18, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KeepAlive will be introduced in v0.4

Please sign in to comment.