Skip to content

Commit

Permalink
Add ping and keepAlive function in connection instance
Browse files Browse the repository at this point in the history
Add action to pong response
Add Promise require in composer.json
  • Loading branch information
Remi committed Dec 12, 2014
1 parent 7bb7b06 commit 6da3b04
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 67 deletions.
62 changes: 34 additions & 28 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,32 +1,38 @@
{
"name": "cboden/ratchet"
, "type": "library"
, "description": "PHP WebSocket library"
, "keywords": ["WebSockets", "Server", "Ratchet", "Sockets"]
, "homepage": "http://socketo.me"
, "license": "MIT"
, "authors": [
{
"name": "Chris Boden"
, "email": "[email protected]"
, "role": "Developer"
}
]
, "support": {
"forum": "https://groups.google.com/forum/#!forum/ratchet-php"
, "issues": "https://github.com/ratchetphp/Ratchet/issues"
, "irc": "irc://irc.freenode.org/reactphp"
"name": "cboden/ratchet",
"type": "library",
"description": "PHP WebSocket library",
"keywords": [
"WebSockets",
"Server",
"Ratchet",
"Sockets"
],
"homepage": "http://socketo.me",
"license": "MIT",
"authors": [
{
"name": "Chris Boden",
"email": "[email protected]",
"role": "Developer"
}
, "autoload": {
"psr-0": {
"Ratchet": "src"
}
}
, "require": {
"php": ">=5.3.9"
, "react/socket": "0.3.*|0.4.*"
, "guzzle/http": "~3.6"
, "symfony/http-foundation": "~2.2"
, "symfony/routing": "~2.2"
],
"support": {
"forum": "https://groups.google.com/forum/#!forum/ratchet-php",
"issues": "https://github.com/ratchetphp/Ratchet/issues",
"irc": "irc://irc.freenode.org/reactphp"
},
"autoload": {
"psr-0": {
"Ratchet": "src"
}
},
"require": {
"php": ">=5.3.9",
"react/socket": "0.3.*|0.4.*",
"react/promise": ">=1.0,<2.0",
"guzzle/http": "~3.6",
"symfony/http-foundation": "~2.2",
"symfony/routing": "~2.2"
}
}
83 changes: 52 additions & 31 deletions src/Ratchet/WebSocket/Version/RFC6455.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?php
namespace Ratchet\WebSocket\Version;

use Ratchet\ConnectionInterface;
use Ratchet\MessageInterface;
use Ratchet\WebSocket\Version\RFC6455\HandshakeVerifier;
Expand All @@ -16,7 +17,8 @@
* @link http://tools.ietf.org/html/rfc6455
* @todo Unicode: return mb_convert_encoding(pack("N",$u), mb_internal_encoding(), 'UCS-4BE');
*/
class RFC6455 implements VersionInterface {
class RFC6455 implements VersionInterface
{
const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

/**
Expand All @@ -35,7 +37,8 @@ class RFC6455 implements VersionInterface {
*/
protected $validator;

public function __construct(ValidatorInterface $validator = null) {
public function __construct(ValidatorInterface $validator = null)
{
$this->_verifier = new HandshakeVerifier;
$this->setCloseCodes();

Expand All @@ -49,7 +52,8 @@ public function __construct(ValidatorInterface $validator = null) {
/**
* {@inheritdoc}
*/
public function isProtocol(RequestInterface $request) {
public function isProtocol(RequestInterface $request)
{
$version = (int)(string)$request->getHeader('Sec-WebSocket-Version');

return ($this->getVersionNumber() === $version);
Expand All @@ -58,31 +62,34 @@ public function isProtocol(RequestInterface $request) {
/**
* {@inheritdoc}
*/
public function getVersionNumber() {
public function getVersionNumber()
{
return 13;
}

/**
* {@inheritdoc}
*/
public function handshake(RequestInterface $request) {
public function handshake(RequestInterface $request)
{
if (true !== $this->_verifier->verifyAll($request)) {
return new Response(400);
}

return new Response(101, array(
'Upgrade' => 'websocket'
, 'Connection' => 'Upgrade'
, 'Sec-WebSocket-Accept' => $this->sign((string)$request->getHeader('Sec-WebSocket-Key'))
'Upgrade' => 'websocket'
, 'Connection' => 'Upgrade'
, 'Sec-WebSocket-Accept' => $this->sign((string)$request->getHeader('Sec-WebSocket-Key'))
));
}

/**
* @param \Ratchet\ConnectionInterface $conn
* @param \Ratchet\MessageInterface $coalescedCallback
* @param \Ratchet\MessageInterface $coalescedCallback
* @return \Ratchet\WebSocket\Version\RFC6455\Connection
*/
public function upgradeConnection(ConnectionInterface $conn, MessageInterface $coalescedCallback) {
public function upgradeConnection(ConnectionInterface $conn, MessageInterface $coalescedCallback)
{
$upgraded = new Connection($conn);

if (!isset($upgraded->WebSocket)) {
Expand All @@ -96,9 +103,10 @@ public function upgradeConnection(ConnectionInterface $conn, MessageInterface $c

/**
* @param \Ratchet\WebSocket\Version\RFC6455\Connection $from
* @param string $data
* @param string $data
*/
public function onMessage(ConnectionInterface $from, $data) {
public function onMessage(ConnectionInterface $from, $data)
{
$overflow = '';

if (!isset($from->WebSocket->message)) {
Expand Down Expand Up @@ -155,15 +163,23 @@ public function onMessage(ConnectionInterface $from, $data) {
}

return $from->close($frame);
break;
break;
case $frame::OP_PING:
$from->send($this->newFrame($frame->getPayload(), true, $frame::OP_PONG));
break;
break;
case $frame::OP_PONG:
break;
$uniqId = $frame->getPayload();
if ($from->WebSocket->pingH[$uniqId]->deferredPong !== null) {
$from->WebSocket->pingH[$uniqId]->pongTimer->cancel();
$pongResponseDiff = (new \DateTime())->getTimestamp() - $from->WebSocket->pingH[$uniqId]->lastPingTimestamp;
$deferred = $from->WebSocket->pingH[$uniqId]->deferredPong;
unset($from->WebSocket->pingH[$uniqId]);
$deferred->resolve($pongResponseDiff);
}
break;
default:
return $from->close($frame::CLOSE_PROTOCOL);
break;
break;
}

$overflow = $from->WebSocket->frame->extractOverflow();
Expand Down Expand Up @@ -210,17 +226,19 @@ public function onMessage(ConnectionInterface $from, $data) {
/**
* @return RFC6455\Message
*/
public function newMessage() {
public function newMessage()
{
return new Message;
}

/**
* @param string|null $payload
* @param bool|null $final
* @param int|null $opcode
* @param bool|null $final
* @param int|null $opcode
* @return RFC6455\Frame
*/
public function newFrame($payload = null, $final = null, $opcode = null) {
public function newFrame($payload = null, $final = null, $opcode = null)
{
return new Frame($payload, $final, $opcode);
}

Expand All @@ -230,7 +248,8 @@ public function newFrame($payload = null, $final = null, $opcode = null) {
* @return string
* @internal
*/
public function sign($key) {
public function sign($key)
{
return base64_encode(sha1($key . static::GUID, true));
}

Expand All @@ -239,7 +258,8 @@ public function sign($key) {
* @param int|string
* @return bool
*/
public function isValidCloseCode($val) {
public function isValidCloseCode($val)
{
if (array_key_exists($val, $this->closeCodes)) {
return true;
}
Expand All @@ -254,18 +274,19 @@ public function isValidCloseCode($val) {
/**
* Creates a private lookup of valid, private close codes
*/
protected function setCloseCodes() {
$this->closeCodes[Frame::CLOSE_NORMAL] = true;
$this->closeCodes[Frame::CLOSE_GOING_AWAY] = true;
$this->closeCodes[Frame::CLOSE_PROTOCOL] = true;
$this->closeCodes[Frame::CLOSE_BAD_DATA] = true;
protected function setCloseCodes()
{
$this->closeCodes[Frame::CLOSE_NORMAL] = true;
$this->closeCodes[Frame::CLOSE_GOING_AWAY] = true;
$this->closeCodes[Frame::CLOSE_PROTOCOL] = true;
$this->closeCodes[Frame::CLOSE_BAD_DATA] = true;
//$this->closeCodes[Frame::CLOSE_NO_STATUS] = true;
//$this->closeCodes[Frame::CLOSE_ABNORMAL] = true;
$this->closeCodes[Frame::CLOSE_BAD_PAYLOAD] = true;
$this->closeCodes[Frame::CLOSE_POLICY] = true;
$this->closeCodes[Frame::CLOSE_TOO_BIG] = true;
$this->closeCodes[Frame::CLOSE_MAND_EXT] = true;
$this->closeCodes[Frame::CLOSE_SRV_ERR] = true;
$this->closeCodes[Frame::CLOSE_POLICY] = true;
$this->closeCodes[Frame::CLOSE_TOO_BIG] = true;
$this->closeCodes[Frame::CLOSE_MAND_EXT] = true;
$this->closeCodes[Frame::CLOSE_SRV_ERR] = true;
//$this->closeCodes[Frame::CLOSE_TLS] = true;
}
}
68 changes: 65 additions & 3 deletions src/Ratchet/WebSocket/Version/RFC6455/Connection.php
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
<?php
namespace Ratchet\WebSocket\Version\RFC6455;

use Ratchet\AbstractConnectionDecorator;
use Ratchet\WebSocket\Version\DataInterface;
use React\Promise\Deferred;
use React\EventLoop\LibEventLoop;

/**
* {@inheritdoc}
* @property \StdClass $WebSocket
*/
class Connection extends AbstractConnectionDecorator {
class Connection extends AbstractConnectionDecorator
{
/**
* {@inheritdoc}
*/
public function send($msg) {
public function send($msg)
{
if (!$this->WebSocket->closing) {
if (!($msg instanceof DataInterface)) {
$msg = new Frame($msg);
Expand All @@ -23,10 +28,67 @@ public function send($msg) {
return $this;
}


/**
* Send a ping frame to the connection and return a promise
*
* @param LibEventLoop $loop The loop for timeout managment
* @param $timeout Timeout before the connection is considered as dead
* @param null $uniqId A id for identify the ping request (call uniqId function if null)
* @return mixed A promise
*/
public function ping(LibEventLoop $loop, $timeout, $uniqId = null)
{
$connection = &$this;
if (is_null($uniqId))
$uniqId = uniqid('', true);
if (!isset($this->WebSocket->pingH))
$this->WebSocket->pingH = [];
$this->WebSocket->pingH[$uniqId] = new \StdClass;
$this->WebSocket->pingH[$uniqId]->deferredPong = new \React\Promise\Deferred();
$loop->addTimer(0.01, function () use (&$connection, $uniqId, $loop, $timeout) {
$connection->WebSocket->pingH[$uniqId]->pongTimer = $loop->addTimer($timeout, function () use (&$connection, $uniqId) {
$deferred = $connection->WebSocket->pingH[$uniqId];
unset($connection->WebSocket->pingH[$uniqId]);
$deferred->deferredPong->reject('no response');
});
$connection->WebSocket->pingH[$uniqId]->lastPingTimestamp = (new \DateTime())->getTimestamp();
$frame = new Frame($uniqId, true, Frame::OP_PING);
$connection->send($frame);
});
return $this->WebSocket->pingH[$uniqId]->deferredPong->promise();
}

/**
*
* Check if the connection is up by a periodic send of ping request to the connection.
* Call close if the ping fail.
*
* @param LibEventLoop $loop
* @param int $interval Duration between two sending of ping request
* @param float $timeout Timeout before the connection is considered as dead
*/
public function keepAlive(LibEventLoop $loop, $interval = 5, $timeout = 0.5)
{
$conn = $this;
$loop->addPeriodicTimer($interval, function ($timer) use ($loop, $conn, $timeout) {
$conn->ping($loop, $timeout, 'keepAlive')->then(
function ($timestamp) {
},
function () use ($conn, $timer) {
$timer->cancel();
unset($conn->WebSocket->pingH['keepAlive']);
$conn->close();
});
});
}


/**
* {@inheritdoc}
*/
public function close($code = 1000) {
public function close($code = 1000)
{
if ($this->WebSocket->closing) {
return;
}
Expand Down
8 changes: 3 additions & 5 deletions tests/unit/Session/SessionComponentTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,10 @@ public function testConnectionValueFromPdo() {

$pdo = new \PDO("sqlite::memory:");
$pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);
$pdo->exec(vsprintf("CREATE TABLE %s (%s TEXT NOT NULL PRIMARY KEY, %s BLOB NOT NULL, %s INTEGER NOT NULL, %s INTEGER)", $dbOptions));
$pdo->exec(vsprintf("CREATE TABLE %s (%s VARCHAR(255) PRIMARY KEY, %s TEXT, %s INTEGER)", $dbOptions));
$pdo->prepare(vsprintf("INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)", $dbOptions))->execute(array($sessionId, base64_encode('_sf2_attributes|a:2:{s:5:"hello";s:5:"world";s:4:"last";i:1332872102;}_sf2_flashes|a:0:{}'), time()));

$pdoHandler = new PdoSessionHandler($pdo, $dbOptions);
$pdoHandler->write($sessionId, '_sf2_attributes|a:2:{s:5:"hello";s:5:"world";s:4:"last";i:1332872102;}_sf2_flashes|a:0:{}');

$component = new SessionProvider($this->getMock('Ratchet\\MessageComponentInterface'), $pdoHandler, array('auto_start' => 1));
$component = new SessionProvider($this->getMock('Ratchet\\MessageComponentInterface'), new PdoSessionHandler($pdo, $dbOptions), array('auto_start' => 1));
$connection = $this->getMock('Ratchet\\ConnectionInterface');

$headers = $this->getMock('Guzzle\\Http\\Message\\Request', array('getCookie'), array('POST', '/', array()));
Expand Down

1 comment on commit 6da3b04

@half2me
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I have to call these ping functions manually, or is there some parameter I can turn on, and have this auto-scheduled in the loop?

Please sign in to comment.