Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stomp hotfix 240517 #37

Merged
merged 3 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ topic::queue1;topic::queue2 <-- will read from queue1 and queue2 on the topic

Subscribing with client acknowledgement option (ENV variables):

STOMP_CONSUMER_WIN_SIZE=1024 // number of bytes that Broker will send to client before it expects ACK
```
STOMP_CONSUMER_WIN_SIZE=819200 // number of bytes that Broker will send to client before it expects ACK
STOMP_CONSUMER_ACK_MODE=client // mode: client (ACK needs to be sent) | auto (no ACK, and window-size has to be -1 in that case)

```

You can see all other available ``.env`` variables, their defaults and usage explanation within
the [config file](config/stomp.php).
Expand Down
2 changes: 1 addition & 1 deletion config/stomp.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
* Once that happens the client will not receive any more messages until it sends the appropriate ACK or NACK
* frame for the messages it already has.
*/
'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 1048576),
'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 819200),

/**
* Subscribe mode: auto, client.
Expand Down
34 changes: 28 additions & 6 deletions src/Queue/Jobs/StompJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Asseco\Stomp\Queue\Stomp\Config;
use Asseco\Stomp\Queue\StompQueue;
use Illuminate\Broadcasting\BroadcastEvent;
use Illuminate\Container\Container;
use Illuminate\Contracts\Queue\Job as JobContract;
use Illuminate\Queue\Jobs\Job;
Expand Down Expand Up @@ -112,13 +113,36 @@ public function fire()

protected function isNativeLaravelJob(): bool
{
return array_key_exists('job', $this->payload);
$job = Arr::get($this->payload, 'job');

return $job && str_contains($job, 'CallQueuedHandler@call');
}

protected function laravelJobClassExists(): bool
{
$eventClassName = Arr::get($this->payload, 'displayName');
if ($eventClassName) {
return class_exists($eventClassName);
} else {
$command = Arr::get($this->payload, 'data.command');
$command = $command ?? unserialize($command);
/** @var BroadcastEvent $command */
if ($command & $command->event && class_exists(get_class($command->event))) {
return true;
}
}

return false;
}

protected function fireLaravelJob(): void
{
[$class, $method] = JobName::parse($this->payload['job']);
($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data']);
if ($this->laravelJobClassExists()) {
[$class, $method] = JobName::parse($this->payload['job']);
($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data']);
} else {
$this->log->error("$this->session [STOMP] Laravel job class does not exist!");
}
}

protected function fireExternalJob(): void
Expand Down Expand Up @@ -241,8 +265,6 @@ protected function failed($e)

protected function ackIfNecessary()
{
if (Config::get('consumer_ack_mode') == StompQueue::ACK_MODE_CLIENT && $this->frame) {
$this->stompQueue->client->ack($this->frame);
}
$this->stompQueue->ackLastFrameIfNecessary();
}
}
7 changes: 5 additions & 2 deletions src/Queue/StompQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,13 @@ public function pop($queue = null)

if (!$queueFromFrame) {
$this->log->error("$this->session [STOMP] Wrong frame received. Expected MESSAGE, got: " . print_r($frame, true));
$this->_lastFrame = null;

return null;
}

$this->_lastFrame = $frame;

return new StompJob($this->container, $this, $frame, $queueFromFrame);
}

Expand Down Expand Up @@ -500,7 +503,7 @@ protected function subscribeToQueues(): void
continue;
}

$winSize = Config::get('consumer_window_size') ?? 512000;
$winSize = Config::get('consumer_window_size') ?? 819200;
if ($this->_ackMode != self::ACK_MODE_CLIENT) {
$winSize = -1;
}
Expand All @@ -521,7 +524,7 @@ protected function subscribeToQueues(): void
*
* @return void
*/
protected function ackLastFrameIfNecessary()
public function ackLastFrameIfNecessary()
{
if ($this->_ackMode == self::ACK_MODE_CLIENT && $this->_lastFrame) {
$this->client->ack($this->_lastFrame);
Expand Down
Loading