Skip to content

Commit

Permalink
Fixes for extended Connect
Browse files Browse the repository at this point in the history
  • Loading branch information
bwoebi committed Feb 13, 2024
1 parent e0e2bc1 commit 6dd6e27
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
16 changes: 10 additions & 6 deletions src/Driver/Http2Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,12 @@ public function handleClient(
$this->readableStream = $readableStream;
$this->writableStream = $writableStream;

self::getTimeoutQueue()->insert($this->client, 0, fn () => $this->shutdown(
new ClientException($this->client, 'Shutting down connection due to inactivity'),
), $this->streamTimeout);
self::getTimeoutQueue()->insert($this->client, 0, function () {
if ($this->streams) {
return; // This can only happen with live upgraded connections
}
$this->shutdown(new ClientException($this->client, 'Shutting down connection due to inactivity'));
}, $this->streamTimeout);

$this->processClientInput();
}
Expand Down Expand Up @@ -332,7 +335,7 @@ private function send(int $id, Response $response, Request $request, Cancellatio
if ($request->getMethod() === "CONNECT") {
$status = $response->getStatus();
if ($status >= 200 && $status <= 299) {
$this->upgrade($request, $response);
$this->upgrade($request, $response, $id);
}
}
}
Expand Down Expand Up @@ -736,7 +739,7 @@ private function readPreface(): string
// Initial settings frame, delayed until after the preface is read for non-upgraded connections.
$this->writeFrame(
\pack(
"nNnNnNnN",
"nNnNnNnNnN",
Http2Parser::INITIAL_WINDOW_SIZE,
self::DEFAULT_WINDOW_SIZE,
Http2Parser::MAX_CONCURRENT_STREAMS,
Expand Down Expand Up @@ -1150,7 +1153,7 @@ function (int $bodySize) use ($streamId) {
/**
* Invokes the upgrade handler of the Response with the socket upgraded from the HTTP server.
*/
private function upgrade(Request $request, Response $response): void
private function upgrade(Request $request, Response $response, int $id): void
{
$upgradeHandler = $response->getUpgradeHandler();
if (!$upgradeHandler) {
Expand Down Expand Up @@ -1183,6 +1186,7 @@ private function upgrade(Request $request, Response $response): void

$response->removeTrailers();
$response->setBody($outputPipe->getSource());
self::getTimeoutQueue()->remove($client, $id);
}

public function handleData(int $streamId, string $data): void
Expand Down
8 changes: 5 additions & 3 deletions src/Driver/Http3Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ protected function write(Request $request, Response $response): void
/**
* Invokes the upgrade handler of the Response with the socket upgraded from the HTTP server.
*/
private function upgrade(QuicSocket $socket, Request $request, Response $response): void
private function upgrade(QuicSocket $stream, Request $request, Response $response): void
{
$upgradeHandler = $response->getUpgradeHandler();
if (!$upgradeHandler) {
Expand All @@ -210,7 +210,7 @@ private function upgrade(QuicSocket $socket, Request $request, Response $respons
$outputPipe = new Pipe(0);

$settings = $this->parsedSettings->getFuture()->await();
$datagramStream = empty($settings[Http3Settings::H3_DATAGRAM->value]) ? null : new Http3DatagramStream($this->parser->receiveDatagram(...), $this->writer->writeDatagram(...), $this->writer->maxDatagramSize(...), $socket);
$datagramStream = empty($settings[Http3Settings::H3_DATAGRAM->value]) ? null : new Http3DatagramStream($this->parser->receiveDatagram(...), $this->writer->writeDatagram(...), $this->writer->maxDatagramSize(...), $stream);

$upgraded = new UpgradedSocket($client, $inputStream, $outputPipe->getSink(), $datagramStream);

Expand All @@ -224,11 +224,13 @@ private function upgrade(QuicSocket $socket, Request $request, Response $respons
['exception' => $exception]
);

$socket->resetSending(Http3Error::H3_INTERNAL_ERROR->value);
$stream->resetSending(Http3Error::H3_INTERNAL_ERROR->value);
}

$response->removeTrailers();
$response->setBody($outputPipe->getSource());

self::getTimeoutQueue()->remove($this->client, $stream->getId());
}

public function getApplicationLayerProtocols(): array
Expand Down
4 changes: 3 additions & 1 deletion src/Driver/Internal/TimeoutQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ private function makeId(Client $client, int $streamId): string
public function update(Client $client, int $streamId, int $timeout): void
{
$cacheId = $this->makeId($client, $streamId);
\assert(isset($this->callbacks[$cacheId]));
if (!isset($this->callbacks[$cacheId])) {
return; // In case streams are upgraded
}

$this->timeoutCache->update($cacheId, $this->now + $timeout);
}
Expand Down

0 comments on commit 6dd6e27

Please sign in to comment.