Skip to content

Commit

Permalink
[Messenger] Passing to WorkerMessageRetriedEvent envelope with actu…
Browse files Browse the repository at this point in the history
…al stamps after sent

Signed-off-by: j.apsitis <[email protected]>
  • Loading branch information
daffoxdev committed Feb 26, 2024
1 parent 2114101 commit ab320a2
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
2 changes: 1 addition & 1 deletion EventListener/SendFailedMessageForRetryListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)
$retryEnvelope = $this->withLimitedHistory($envelope, new DelayStamp($delay), new RedeliveryStamp($retryCount));

// re-send the message for retry
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
$retryEnvelope = $this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);

if (null !== $this->eventDispatcher) {
$this->eventDispatcher->dispatch(new WorkerMessageRetriedEvent($retryEnvelope, $event->getReceiverName()));
Expand Down
46 changes: 46 additions & 0 deletions Tests/EventListener/SendFailedMessageForRetryListenerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@

use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Symfony\Component\DependencyInjection\Container;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

Expand Down Expand Up @@ -190,4 +193,47 @@ public function testEnvelopeKeepOnlyTheLast10Stamps()

$listener->onMessageFailed($event);
}

public function testRetriedEnvelopePassesToRetriedEvent()
{
$exception = new \Exception('no!');
$envelope = new Envelope(new \stdClass());

$sender = $this->createMock(SenderInterface::class);
$sender->expects($this->once())->method('send')->willReturnCallback(static function (Envelope $envelope) {
return $envelope->with(new TransportMessageIdStamp(123));
});

$eventDispatcher = $this->createMock(EventDispatcherInterface::class);
$eventDispatcher->expects($this->once())->method('dispatch')->willReturnCallback(
function (WorkerMessageRetriedEvent $retriedEvent) {
$envelope = $retriedEvent->getEnvelope();

$transportIdStamp = $envelope->last(TransportMessageIdStamp::class);
$this->assertNotNull($transportIdStamp);

return $retriedEvent;
});

$senderLocator = new Container();
$senderLocator->set('my_receiver', $sender);

$retryStrategy = $this->createMock(RetryStrategyInterface::class);
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);
$retryStrategy->expects($this->once())->method('getWaitingTime')->willReturn(1000);

$retryStrategyLocator = new Container();
$retryStrategyLocator->set('my_receiver', $retryStrategy);

$listener = new SendFailedMessageForRetryListener(
$senderLocator,
$retryStrategyLocator,
null,
$eventDispatcher
);

$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);

$listener->onMessageFailed($event);
}
}

0 comments on commit ab320a2

Please sign in to comment.