Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection: keep-alive #22

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
1 change: 0 additions & 1 deletion src/main/php/peer/http/Authorizations.class.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
<?php namespace peer\http;

use lang\Object;
use lang\XPClass;
use lang\reflect\TargetInvocationException;
use util\Secret;
Expand Down
102 changes: 102 additions & 0 deletions src/main/php/peer/http/Channel.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
<?php namespace peer\http;

use peer\SocketException;

/**
* Channel manages I/O between client and server, implementing keep-alive
*
* @see https://en.wikipedia.org/wiki/HTTP_persistent_connection
* @see https://tools.ietf.org/html/rfc7230#section-6.3
* @test xp://peer.http.unittest.ChannelTest
*/
class Channel implements \io\streams\InputStream {
private $socket;
private $reuseable= null;

/** @param peer.Socket */
public function __construct($socket) {
$this->socket= $socket;
}

/** @return peer.Socket */
public function socket() { return $this->socket; }

/**
* Rebinds to a new socket, closing the existing one if necessary
*
* @param peer.Socket
* @return void
*/
public function bind($socket) {
if ($this->socket->isConnected()) {
$this->socket->close();
}
$this->socket= $socket;
}

/**
* Disconnect (if necessary)
*
* @return void
*/
public function disconnect() {
$this->socket->isConnected() && $this->socket->close();
}

/**
* Sends a request and returns the response
*
* @param peer.http.HttpRequest $request
* @param float $connectTimeout
* @param float $readTimeout
* @return peer.http.HttpResponse
*/
public function send($request, $connectTimeout= 2.0, $readTimeout= 60.0) {

// Previous call didn't finish reading all data, connection is not reusable
if (false === $this->reuseable) {
$this->socket->close();
}

do {
if ($this->socket->isConnected()) {
$reused= true;
} else {
$reused= false;
$this->socket->setTimeout($readTimeout);
$this->socket->connect($connectTimeout);
}

$this->socket->write($request->getRequestString());
$this->reuseable= false;
$input= new HttpInputStream($this, function() { $this->reuseable= true; });

// If we reused the connection and we receive EOF, disconnect & retry
$line= $input->readLine();
if ($this->socket->eof()) {
if (!$reused) throw new SocketException('Received EOF (timeout: '.$readTimeout.' seconds)');
$this->socket->close();
continue;
}

// Success
$input->pushBack($line."\r\n");
return new HttpResponse($input, true);
} while (true);
}

/** @return int */
public function available() {
return $this->socket->eof() ? 0 : 1;
}

/** @return string */
public function read($limit= 8192) {
return $this->socket->readBinary($limit);
}

/** @return void */
public function close() {
// NOOP
}
}
8 changes: 7 additions & 1 deletion src/main/php/peer/http/HttpConnection.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use peer\URL;
use util\log\Traceable;
use lang\Closeable;

/**
* HTTP connection
Expand All @@ -26,7 +27,7 @@
* @see rfc://2616
* @test xp://net.xp_framework.unittest.peer.HttpTest
*/
class HttpConnection implements Traceable {
class HttpConnection implements Traceable, Closeable {
protected
$url = null,
$transport = null,
Expand Down Expand Up @@ -260,4 +261,9 @@ public function trace($arg= null, $headers= []) {
public function setTrace($cat) {
$this->transport->setTrace($cat);
}

/** @return void */
public function close() {
$this->transport->close();
}
}
114 changes: 62 additions & 52 deletions src/main/php/peer/http/HttpInputStream.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,74 +8,84 @@
* @test xp://peer.http.unittest.HttpInputStreamTest
*/
class HttpInputStream implements InputStream {
protected
$response = null,
$buffer = '',
$available = 0;

private $stream;
private $buffer= '';

/**
* Constructor
*
* @param peer.http.HttpResponse $response
* @param io.streams.InputStream
* @param callable $consumed Optional callback when stream is completely consumed
*/
public function __construct(HttpResponse $response) {
$this->response= $response;
public function __construct(InputStream $stream, $consumed= null) {
$this->stream= $stream;
$this->consumed= $consumed;
}

/**
* Buffer a chunk if necessary
* Put given bytes back into read buffer
*
* @return int available
* @param string $bytes
* @return void
*/
protected function buffer() {
if (($l= strlen($this->buffer)) > 0) return $l;
if (false === ($chunk= $this->response->readData(8192, true))) {
$this->available= -1;
return 0;
public function pushBack($bytes) {
$this->buffer= $bytes.$this->buffer;
}

/** @param callable $consumed */
public function callback($consumed) {
$this->consumed= $consumed;
}

/** @return void */
public function consumed() {
if ($f= $this->consumed) $f();
}

/** @return bool */
public function available() {
if ('' === $this->buffer) {
return $this->stream->available();
} else {
$this->buffer.= $chunk;
$this->available= strlen($this->buffer);
return $this->available;
return strlen($this->buffer);
}
}

/**
* Read a string
*
* @param int $limit default 8192
* @return string
*/
/** @return string */
public function read($limit= 8192) {
if (-1 === $this->available) return null; // At end
$this->buffer();
$b= substr($this->buffer, 0, $limit);
$this->buffer= substr($this->buffer, $limit);
return $b;
if (null === $this->buffer) {
return null; // EOF
} else if ('' === $this->buffer) {
$chunk= $this->stream->read($limit);
return '' == $chunk ? null : $chunk;
} else {
$return= substr($this->buffer, 0, $limit);
$this->buffer= (string)substr($this->buffer, $limit);
return $return;
}
}

/**
* Returns the number of bytes that can be read from this stream
* without blocking.
*
* @return int
*/
public function available() {
return (-1 === $this->available) ? 0 : $this->buffer();
}
/** @return string */
public function readLine() {
if (null === $this->buffer) return null; // EOF

/**
* Close this buffer
*/
public function close() {
$this->response->closeStream();
while (false === ($p= strpos($this->buffer, "\r\n"))) {
$chunk= $this->stream->read();
if ('' == $chunk) {
$return= $this->buffer;
$this->buffer= null;
return $return;
}
$this->buffer.= $chunk;
}

$return= substr($this->buffer, 0, $p);
$this->buffer= substr($this->buffer, $p + 2);
return $return;
}

/**
* Creates a string representation of this Http
*
* @return string
*/
public function toString() {
return nameof($this).'<'.$this->response->toString().'>';
/** @return voud */
public function close() {
$this->stream->close();
}
}
}
Loading