Skip to content

Commit

Permalink
Client ACK (#34)
Browse files Browse the repository at this point in the history
* Apply fixes from StyleCI

* Update StompQueue.php

client ACK, so we don't use messages

* Apply fixes from StyleCI

* Update StompQueue.php

* ENV for WINDOW-SIZE

ENV : STOMP_CONSUMER_WIN_SIZE

* Apply fixes from StyleCI

* config

config with ENV variables

* Apply fixes from StyleCI

---------

Co-authored-by: StyleCI Bot <[email protected]>
  • Loading branch information
ngaspari and StyleCIBot authored May 14, 2024
1 parent 9b2c0f8 commit d7ec135
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 24 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ topic::queue1 <-- will read only from queue1 on the topic
topic::queue1;topic::queue2 <-- will read from queue1 and queue2 on the topic
```

Subscribing with client acknowledgement option (ENV variables):

STOMP_CONSUMER_WIN_SIZE=1024 // number of bytes that Broker will send to client before it expects ACK
STOMP_CONSUMER_ACK_MODE=client // mode: client (ACK needs to be sent) | auto (no ACK, and window-size has to be -1 in that case)


You can see all other available ``.env`` variables, their defaults and usage explanation within
the [config file](config/stomp.php).

Expand Down
51 changes: 32 additions & 19 deletions config/stomp.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,31 @@

return [

'driver' => 'stomp',
'read_queues' => env('STOMP_READ_QUEUES'),
'write_queues' => env('STOMP_WRITE_QUEUES'),
'protocol' => env('STOMP_PROTOCOL', 'tcp'),
'host' => env('STOMP_HOST', '127.0.0.1'),
'port' => env('STOMP_PORT', 61613),
'username' => env('STOMP_USERNAME', 'admin'),
'password' => env('STOMP_PASSWORD', 'admin'),
'driver' => 'stomp',
'read_queues' => env('STOMP_READ_QUEUES'),
'write_queues' => env('STOMP_WRITE_QUEUES'),
'protocol' => env('STOMP_PROTOCOL', 'tcp'),
'host' => env('STOMP_HOST', '127.0.0.1'),
'port' => env('STOMP_PORT', 61613),
'username' => env('STOMP_USERNAME', 'admin'),
'password' => env('STOMP_PASSWORD', 'admin'),

/**
* Set to "horizon" if you wish to use Laravel Horizon.
*/
'worker' => env('STOMP_WORKER', 'default'),
'worker' => env('STOMP_WORKER', 'default'),

/**
* Calculate tries and backoff automatically without the need to specify it
* in the queue work command.
*/
'auto_tries' => env('STOMP_AUTO_TRIES', true),
'auto_backoff' => env('STOMP_AUTO_BACKOFF', true),
'auto_tries' => env('STOMP_AUTO_TRIES', true),
'auto_backoff' => env('STOMP_AUTO_BACKOFF', true),

/** If all messages should fail on timeout. Set to false in order to revert to default (looking in event payload) */
'fail_on_timeout' => env('STOMP_FAIL_ON_TIMEOUT', true),
'fail_on_timeout' => env('STOMP_FAIL_ON_TIMEOUT', true),
/** Maximum time in seconds for job execution. This value must be less than send heartbeat in order to run correctly. */
'timeout' => env('STOMP_TIMEOUT', 10),
'timeout' => env('STOMP_TIMEOUT', 10),

/**
* Incremental multiplier for failed job redelivery.
Expand All @@ -48,32 +48,45 @@
* hash as queue name. In case of multiple services connecting in such
* a way, it becomes unclear which queue is from which service.
*/
'default_queue' => env('STOMP_DEFAULT_QUEUE'),
'default_queue' => env('STOMP_DEFAULT_QUEUE'),

/**
* Use Laravel logger for outputting logs.
*/
'enable_logs' => env('STOMP_LOGS', false) === true,
'enable_logs' => env('STOMP_LOGS', false) === true,

/**
* Should the read queues be prepended. Useful for i.e. Artemis where queue
* name is unique across whole broker instance. This will thus add some
* uniqueness to the queues.
*/
'prepend_queues' => true,
'prepend_queues' => true,

/**
* Heartbeat which will be requested from server at given millisecond period.
*/
'receive_heartbeat' => env('STOMP_RECEIVE_HEARTBEAT', 0),
'receive_heartbeat' => env('STOMP_RECEIVE_HEARTBEAT', 0),

/**
* Heartbeat which we will be sending to server at given millisecond period.
*/
'send_heartbeat' => env('STOMP_SEND_HEARTBEAT', 20000),
'send_heartbeat' => env('STOMP_SEND_HEARTBEAT', 20000),

/**
* Setting consumer-window-size to a value greater than 0 will allow it to receive messages until
* the cumulative bytes of those messages reaches the configured size.
* Once that happens the client will not receive any more messages until it sends the appropriate ACK or NACK
* frame for the messages it already has.
*/
'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 1048576),

/**
* Subscribe mode: auto, client.
*/
'consumer_ack_mode' => env('STOMP_CONSUMER_ACK_MODE', 'client'),

/**
* Array of supported versions.
*/
'version' => [Version::VERSION_1_2],
'version' => [Version::VERSION_1_2],
];
2 changes: 1 addition & 1 deletion src/Queue/Jobs/StompJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ protected function getSubscriptionName(): string
public function delete()
{
$this->log->info("$this->session [STOMP] Deleting a message from queue: " . print_r([
'queue' => $this->queue,
'queue' => $this->queue,
'message' => $this->frame,
], true));

Expand Down
40 changes: 36 additions & 4 deletions src/Queue/StompQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ class StompQueue extends Queue implements QueueInterface
{
public const AMQ_QUEUE_SEPARATOR = '::';
public const HEADERS_KEY = '_headers';

const CORRELATION = 'X-Correlation-ID';

const ACK_MODE_CLIENT = 'client';

/**
* Stomp instance from stomp-php repo.
*/
Expand All @@ -50,6 +53,9 @@ class StompQueue extends Queue implements QueueInterface
protected static int $circuitBreaker = 0;
protected string $session;

protected $_lastFrame = null;
protected $_ackMode = 'client';

public function __construct(ClientWrapper $stompClient)
{
$this->readQueues = $this->setReadQueues();
Expand All @@ -58,6 +64,8 @@ public function __construct(ClientWrapper $stompClient)
$this->log = app('stompLog');

$this->session = $this->client->getClient()->getSessionId();

$this->_ackMode = strtolower(Config::get('consumer_ack_mode') ?? 'client');
}

/**
Expand Down Expand Up @@ -208,9 +216,9 @@ protected function writeToMultipleQueues(array $writeQueues, Message $payload):
* @var $payload Message
*/
$this->log->info("$this->session [STOMP] Pushing stomp payload to queue: " . print_r([
'body' => $payload->getBody(),
'body' => $payload->getBody(),
'headers' => $payload->getHeaders(),
'queue' => $writeQueues,
'queue' => $writeQueues,
], true));

$allEventsSent = true;
Expand Down Expand Up @@ -337,6 +345,8 @@ protected function hasEvent($job): bool
*/
public function pop($queue = null)
{
$this->ackLastFrameIfNecessary();

$frame = $this->read($queue);

if (!($frame instanceof Frame)) {
Expand Down Expand Up @@ -472,6 +482,8 @@ public function disconnect()
}

try {
$this->ackLastFrameIfNecessary();

$this->log->info("$this->session [STOMP] Disconnecting...");
$this->client->getClient()->disconnect();
} catch (Exception $e) {
Expand All @@ -488,12 +500,32 @@ protected function subscribeToQueues(): void
continue;
}

$this->client->subscribe($queue, null, 'auto', [
$winSize = Config::get('consumer_window_size') ?? 512000;
if ($this->_ackMode != self::ACK_MODE_CLIENT) {
$winSize = -1;
}

$this->client->subscribe($queue, null, this->_ackMode, [
// New Artemis version can't work without this as it will consume only first message otherwise.
'consumer-window-size' => '-1',
//'consumer-window-size' => '-1',
// we can define this if we are using ack mode = client
'consumer-window-size' => (string) $winSize,
]);

$this->subscribedTo[] = $queue;
}
}

/**
* If ack mode = client, and we have last frame - send ACK.
*
* @return void
*/
protected function ackLastFrameIfNecessary()
{
if ($this->_ackMode == self::ACK_MODE_CLIENT && $this->_lastFrame) {
$this->client->ack($this->_lastFrame);
$this->_lastFrame = null;
}
}
}

0 comments on commit d7ec135

Please sign in to comment.