Skip to content

Commit

Permalink
Make use of Guzzle Pool to improve efficiency (#401)
Browse files Browse the repository at this point in the history
* Make use of Guzzle Pool to improve efficiency

* Fix documentation: add concurrency parameter where needed

* fix checks: add a comma for php-cs-fixer

* Add tests for flushPooled

* refactor: rename concurrency parameter to requestConcurrency

* readme: mention pooling for scaling
  • Loading branch information
Gugu7264 authored Jun 18, 2024
1 parent 2245c8d commit b33c70d
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 9 deletions.
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ $notifications = [
'payload' => '{"message":"Hello World!"}',
], [
// current PushSubscription format (browsers might change this in the future)
'subscription' => Subscription::create([
'subscription' => Subscription::create([
"endpoint" => "https://example.com/other/endpoint/of/another/vendor/abcdef...",
"keys" => [
'p256dh' => '(stringOf88Chars)',
Expand Down Expand Up @@ -253,18 +253,18 @@ foreach ($webPush->flush() as $report) {
echo "[v] Message sent successfully for subscription {$endpoint}.";
} else {
echo "[x] Message failed to sent for subscription {$endpoint}: {$report->getReason()}";

// also available (to get more info)

/** @var \Psr\Http\Message\RequestInterface $requestToPushService */
$requestToPushService = $report->getRequest();

/** @var \Psr\Http\Message\ResponseInterface $responseOfPushService */
$responseOfPushService = $report->getResponse();

/** @var string $failReason */
$failReason = $report->getReason();

/** @var bool $isTheEndpointWrongOrExpired */
$isTheEndpointWrongOrExpired = $report->isSubscriptionExpired();
}
Expand Down Expand Up @@ -364,6 +364,7 @@ Here are some ideas:
1. Make sure MultiCurl is available on your server
2. Find the right balance for your needs between security and performance (see above)
3. Find the right batch size (set it in `defaultOptions` or as parameter to `flush()`)
4. Use `flushPooled()` instead of `flush()`. The former uses concurrent requests, accelerating the process and often doubling the speed of the requests.

### How to solve "SSL certificate problem: unable to get local issuer certificate"?

Expand Down
61 changes: 58 additions & 3 deletions src/WebPush.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace Minishlink\WebPush;

use GuzzleHttp\Client;
use GuzzleHttp\Pool;
use GuzzleHttp\Exception\RequestException;
use GuzzleHttp\Psr7\Request;
use ParagonIE\ConstantTime\Base64UrlSafe;
Expand All @@ -30,7 +31,7 @@ class WebPush
protected ?array $notifications = null;

/**
* @var array Default options: TTL, urgency, topic, batchSize
* @var array Default options: TTL, urgency, topic, batchSize, requestConcurrency
*/
protected array $defaultOptions;

Expand All @@ -53,7 +54,7 @@ class WebPush
* WebPush constructor.
*
* @param array $auth Some servers need authentication
* @param array $defaultOptions TTL, urgency, topic, batchSize
* @param array $defaultOptions TTL, urgency, topic, batchSize, requestConcurrency
* @param int|null $timeout Timeout of POST request
*
* @throws \ErrorException
Expand Down Expand Up @@ -175,6 +176,58 @@ public function flush(?int $batchSize = null): \Generator
}
}

/**
* Flush notifications. Triggers concurrent requests.
*
* @param callable(MessageSentReport): void $callback Callback for each notification
* @param null|int $batchSize Defaults the value defined in defaultOptions during instantiation (which defaults to 1000).
* @param null|int $requestConcurrency Defaults the value defined in defaultOptions during instantiation (which defaults to 100).
*/
public function flushPooled($callback, ?int $batchSize = null, ?int $requestConcurrency = null): void
{
if (empty($this->notifications)) {
return;
}

if (null === $batchSize) {
$batchSize = $this->defaultOptions['batchSize'];
}

if (null === $requestConcurrency) {
$requestConcurrency = $this->defaultOptions['requestConcurrency'];
}

$batches = array_chunk($this->notifications, $batchSize);
$this->notifications = [];

foreach ($batches as $batch) {
$batch = $this->prepare($batch);
$pool = new Pool($this->client, $batch, [
'requestConcurrency' => $requestConcurrency,
'fulfilled' => function (ResponseInterface $response, int $index) use ($callback, $batch) {
/** @var \Psr\Http\Message\RequestInterface $request **/
$request = $batch[$index];
$callback(new MessageSentReport($request, $response));
},
'rejected' => function (RequestException $reason) use ($callback) {
if (method_exists($reason, 'getResponse')) {
$response = $reason->getResponse();
} else {
$response = null;
}
$callback(new MessageSentReport($reason->getRequest(), $response, false, $reason->getMessage()));
},
]);

$promise = $pool->promise();
$promise->wait();
}

if ($this->reuseVAPIDHeaders) {
$this->vapidHeaders = [];
}
}

/**
* @throws \ErrorException|\Random\RandomException
*/
Expand Down Expand Up @@ -315,14 +368,16 @@ public function getDefaultOptions(): array
}

/**
* @param array $defaultOptions Keys 'TTL' (Time To Live, defaults 4 weeks), 'urgency', 'topic', 'batchSize'
* @param array $defaultOptions Keys 'TTL' (Time To Live, defaults 4 weeks), 'urgency', 'topic', 'batchSize', 'requestConcurrency'
*/
public function setDefaultOptions(array $defaultOptions): WebPush
{
$this->defaultOptions['TTL'] = $defaultOptions['TTL'] ?? 2419200;
$this->defaultOptions['urgency'] = $defaultOptions['urgency'] ?? null;
$this->defaultOptions['topic'] = $defaultOptions['topic'] ?? null;
$this->defaultOptions['batchSize'] = $defaultOptions['batchSize'] ?? 1000;
$this->defaultOptions['requestConcurrency'] = $defaultOptions['requestConcurrency'] ?? 100;


return $this;
}
Expand Down
40 changes: 40 additions & 0 deletions tests/WebPushTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,46 @@ public function testFlush(): void
}
}

/**
* @throws \ErrorException
* @throws \JsonException
*/
public function testFlushPooled(): void
{
$subscription = new Subscription(self::$endpoints['standard']);

$report = $this->webPush->sendOneNotification($subscription);
$this->assertFalse($report->isSuccess()); // it doesn't have VAPID

// queue has been reset
$this->assertEmpty(iterator_to_array($this->webPush->flush()));

$report = $this->webPush->sendOneNotification($subscription);
$this->assertFalse($report->isSuccess()); // it doesn't have VAPID

$nonExistentSubscription = Subscription::create([
'endpoint' => 'https://fcm.googleapis.com/fcm/send/fCd2-8nXJhU:APA91bGi2uaqFXGft4qdolwyRUcUPCL1XV_jWy1tpCRqnu4sk7ojUpC5gnq1PTncbCdMq9RCVQIIFIU9BjzScvjrDqpsI7J-K_3xYW8xo1xSNCfge1RvJ6Xs8RGL_Sw7JtbCyG1_EVgWDc22on1r_jozD8vsFbB0Fg',
'publicKey' => 'BME-1ZSAv2AyGjENQTzrXDj6vSnhAIdKso4n3NDY0lsd1DUgEzBw7ARMKjrYAm7JmJBPsilV5CWNH0mVPyJEt0Q',
'authToken' => 'hUIGbmiypj9_EQea8AnCKA',
'contentEncoding' => 'aes128gcm',
]);

// test multiple requests
$this->webPush->queueNotification($nonExistentSubscription, json_encode(['test' => 1], JSON_THROW_ON_ERROR));
$this->webPush->queueNotification($nonExistentSubscription, json_encode(['test' => 2], JSON_THROW_ON_ERROR));
$this->webPush->queueNotification($nonExistentSubscription, json_encode(['test' => 3], JSON_THROW_ON_ERROR));

$callback = function ($report) {
$this->assertFalse($report->isSuccess());
$this->assertTrue($report->isSubscriptionExpired());
$this->assertEquals(410, $report->getResponse()->getStatusCode());
$this->assertNotEmpty($report->getReason());
$this->assertNotFalse(filter_var($report->getEndpoint(), FILTER_VALIDATE_URL));
};

$this->webPush->flushPooled($callback);
}

public function testFlushEmpty(): void
{
$this->assertEmpty(iterator_to_array($this->webPush->flush(300)));
Expand Down

0 comments on commit b33c70d

Please sign in to comment.