-
-
Notifications
You must be signed in to change notification settings - Fork 86
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feature #29007 [Messenger] Add a Doctrine transport (vincenttouzet)
This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Add a Doctrine transport | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | | License | MIT | Doc PR | symfony/symfony-docs#10616 | DoctrineBundle PR | doctrine/DoctrineBundle#868 As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component. Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database). # How it works The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868 ## Configuration To configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`) ```yml # config/packages/messenger.yaml framework: messenger: transports: my_transport: "doctrine://default?queue=important" ``` ## Table schema Dispatched messages are stored into a database table with the following schema: | Column | Type | Options | Description | |--------------|----------|--------------------------|-------------------------------------------------------------------| | id | bigint | AUTO_INCREMENT, NOT NULL | Primary key | | body | text | NOT NULL | Body of the message | | headers | text | NOT NULL | Headers of the message | | queue | varchar(32) | NOT NULL | Headers of the message | | created_at | datetime | NOT NULL | When the message was inserted onto the table. (automatically set) | | available_at | datetime | NOT NULL | When the message is available to be handled | | delivered_at | datetime | NULL | When the message was delivered to a worker | ## Message dispatching When dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish` ## Message consuming The message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle. ### Getting the next message * Start a transaction * Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query) * Update the message in database to update the delivered_at columns * Commit the transaction ### Handling the message The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table. If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`. ## Message requeueing It may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds) # TODO - [x] Add tests - [x] Create DOC PR - [x] PR on doctrine-bundle for transport factory - [x] Add a `available_at` column - [x] Add a `queue` column - [x] Implement the retry functionnality : See #30557 - [x] Rebase after #29476 Commits ------- 88d008c828 [Messenger] Add a Doctrine transport
- Loading branch information
Showing
14 changed files
with
1,147 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,214 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <[email protected]> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine; | ||
|
||
use Doctrine\DBAL\DBALException; | ||
use Doctrine\DBAL\Driver\Statement; | ||
use Doctrine\DBAL\Platforms\AbstractPlatform; | ||
use Doctrine\DBAL\Query\QueryBuilder; | ||
use PHPUnit\Framework\TestCase; | ||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; | ||
use Symfony\Component\Messenger\Transport\Doctrine\Connection; | ||
|
||
class ConnectionTest extends TestCase | ||
{ | ||
public function testGetAMessageWillChangeItsStatus() | ||
{ | ||
$queryBuilder = $this->getQueryBuilderMock(); | ||
$driverConnection = $this->getDBALConnectionMock(); | ||
$stmt = $this->getStatementMock([ | ||
'id' => 1, | ||
'body' => '{"message":"Hi"}', | ||
'headers' => \json_encode(['type' => DummyMessage::class]), | ||
]); | ||
|
||
$driverConnection | ||
->method('createQueryBuilder') | ||
->willReturn($queryBuilder); | ||
$queryBuilder | ||
->method('getSQL') | ||
->willReturn(''); | ||
$driverConnection | ||
->method('prepare') | ||
->willReturn($stmt); | ||
|
||
$connection = new Connection([], $driverConnection); | ||
$doctrineEnvelope = $connection->get(); | ||
$this->assertEquals(1, $doctrineEnvelope['id']); | ||
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']); | ||
$this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelope['headers']); | ||
} | ||
|
||
public function testGetWithNoPendingMessageWillReturnNull() | ||
{ | ||
$queryBuilder = $this->getQueryBuilderMock(); | ||
$driverConnection = $this->getDBALConnectionMock(); | ||
$stmt = $this->getStatementMock(false); | ||
|
||
$driverConnection->expects($this->once()) | ||
->method('createQueryBuilder') | ||
->willReturn($queryBuilder); | ||
$driverConnection->method('prepare') | ||
->willReturn($stmt); | ||
$driverConnection->expects($this->never()) | ||
->method('update'); | ||
|
||
$connection = new Connection([], $driverConnection); | ||
$doctrineEnvelope = $connection->get(); | ||
$this->assertNull($doctrineEnvelope); | ||
} | ||
|
||
/** | ||
* @expectedException \Symfony\Component\Messenger\Exception\TransportException | ||
*/ | ||
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() | ||
{ | ||
$driverConnection = $this->getDBALConnectionMock(); | ||
$driverConnection->method('delete')->willThrowException(new DBALException()); | ||
|
||
$connection = new Connection([], $driverConnection); | ||
$connection->ack('dummy_id'); | ||
} | ||
|
||
/** | ||
* @expectedException \Symfony\Component\Messenger\Exception\TransportException | ||
*/ | ||
public function testItThrowsATransportExceptionIfItCannotRejectMessage() | ||
{ | ||
$driverConnection = $this->getDBALConnectionMock(); | ||
$driverConnection->method('delete')->willThrowException(new DBALException()); | ||
|
||
$connection = new Connection([], $driverConnection); | ||
$connection->reject('dummy_id'); | ||
} | ||
|
||
private function getDBALConnectionMock() | ||
{ | ||
$driverConnection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class) | ||
->disableOriginalConstructor() | ||
->getMock(); | ||
$platform = $this->getMockBuilder(AbstractPlatform::class) | ||
->getMock(); | ||
$platform->method('getWriteLockSQL')->willReturn('FOR UPDATE'); | ||
$driverConnection->method('getDatabasePlatform')->willReturn($platform); | ||
|
||
return $driverConnection; | ||
} | ||
|
||
private function getQueryBuilderMock() | ||
{ | ||
$queryBuilder = $this->getMockBuilder(QueryBuilder::class) | ||
->disableOriginalConstructor() | ||
->getMock(); | ||
|
||
$queryBuilder->method('select')->willReturn($queryBuilder); | ||
$queryBuilder->method('update')->willReturn($queryBuilder); | ||
$queryBuilder->method('from')->willReturn($queryBuilder); | ||
$queryBuilder->method('set')->willReturn($queryBuilder); | ||
$queryBuilder->method('where')->willReturn($queryBuilder); | ||
$queryBuilder->method('andWhere')->willReturn($queryBuilder); | ||
$queryBuilder->method('orderBy')->willReturn($queryBuilder); | ||
$queryBuilder->method('setMaxResults')->willReturn($queryBuilder); | ||
$queryBuilder->method('setParameter')->willReturn($queryBuilder); | ||
|
||
return $queryBuilder; | ||
} | ||
|
||
private function getStatementMock($expectedResult) | ||
{ | ||
$stmt = $this->getMockBuilder(Statement::class) | ||
->disableOriginalConstructor() | ||
->getMock(); | ||
$stmt->expects($this->once()) | ||
->method('fetch') | ||
->willReturn($expectedResult); | ||
|
||
return $stmt; | ||
} | ||
|
||
/** | ||
* @dataProvider buildConfigurationProvider | ||
*/ | ||
public function testBuildConfiguration($dsn, $options, $expectedManager, $expectedTableName, $expectedRedeliverTimeout, $expectedQueue) | ||
{ | ||
$config = Connection::buildConfiguration($dsn, $options); | ||
$this->assertEquals($expectedManager, $config['connection']); | ||
$this->assertEquals($expectedTableName, $config['table_name']); | ||
$this->assertEquals($expectedRedeliverTimeout, $config['redeliver_timeout']); | ||
$this->assertEquals($expectedQueue, $config['queue_name']); | ||
} | ||
|
||
public function buildConfigurationProvider() | ||
{ | ||
return [ | ||
[ | ||
'dsn' => 'doctrine://default', | ||
'options' => [], | ||
'expectedManager' => 'default', | ||
'expectedTableName' => 'messenger_messages', | ||
'expectedRedeliverTimeout' => 3600, | ||
'expectedQueue' => 'default', | ||
], | ||
// test options from options array | ||
[ | ||
'dsn' => 'doctrine://default', | ||
'options' => [ | ||
'table_name' => 'name_from_options', | ||
'redeliver_timeout' => 1800, | ||
'queue_name' => 'important', | ||
], | ||
'expectedManager' => 'default', | ||
'expectedTableName' => 'name_from_options', | ||
'expectedRedeliverTimeout' => 1800, | ||
'expectedQueue' => 'important', | ||
], | ||
// tests options from dsn | ||
[ | ||
'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal', | ||
'options' => [], | ||
'expectedManager' => 'default', | ||
'expectedTableName' => 'name_from_dsn', | ||
'expectedRedeliverTimeout' => 1200, | ||
'expectedQueue' => 'normal', | ||
], | ||
// test options from options array wins over options from dsn | ||
[ | ||
'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal', | ||
'options' => [ | ||
'table_name' => 'name_from_options', | ||
'redeliver_timeout' => 1800, | ||
'queue_name' => 'important', | ||
], | ||
'expectedManager' => 'default', | ||
'expectedTableName' => 'name_from_options', | ||
'expectedRedeliverTimeout' => 1800, | ||
'expectedQueue' => 'important', | ||
], | ||
]; | ||
} | ||
|
||
/** | ||
* @expectedException \Symfony\Component\Messenger\Exception\TransportException | ||
*/ | ||
public function testItThrowsAnExceptionIfAnExtraOptionsInDefined() | ||
{ | ||
Connection::buildConfiguration('doctrine://default', ['new_option' => 'woops']); | ||
} | ||
|
||
/** | ||
* @expectedException \Symfony\Component\Messenger\Exception\TransportException | ||
*/ | ||
public function testItThrowsAnExceptionIfAnExtraOptionsInDefinedInDSN() | ||
{ | ||
Connection::buildConfiguration('doctrine://default?new_option=woops'); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <[email protected]> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine; | ||
|
||
use Doctrine\DBAL\DriverManager; | ||
use PHPUnit\Framework\TestCase; | ||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; | ||
use Symfony\Component\Messenger\Transport\Doctrine\Connection; | ||
|
||
/** | ||
* @requires pdo_mysql | ||
*/ | ||
class DoctrineIntegrationTest extends TestCase | ||
{ | ||
private $driverConnection; | ||
private $connection; | ||
|
||
protected function setUp() | ||
{ | ||
parent::setUp(); | ||
|
||
if (!getenv('MESSENGER_DOCTRINE_DSN')) { | ||
$this->markTestSkipped('The "MESSENGER_DOCTRINE_DSN" environment variable is required.'); | ||
} | ||
$dsn = getenv('MESSENGER_DOCTRINE_DSN'); | ||
$this->driverConnection = DriverManager::getConnection(['url' => $dsn]); | ||
$this->connection = new Connection([], $this->driverConnection); | ||
// call send to auto-setup the table | ||
$this->connection->setup(); | ||
// ensure the table is clean for tests | ||
$this->driverConnection->exec('DELETE FROM messenger_messages'); | ||
} | ||
|
||
public function testConnectionSendAndGet() | ||
{ | ||
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); | ||
$encoded = $this->connection->get(); | ||
$this->assertEquals('{"message": "Hi"}', $encoded['body']); | ||
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); | ||
} | ||
|
||
public function testSendWithDelay() | ||
{ | ||
$this->connection->send('{"message": "Hi i am delayed"}', ['type' => DummyMessage::class], 600000); | ||
|
||
$available_at = $this->driverConnection->createQueryBuilder() | ||
->select('m.available_at') | ||
->from('messenger_messages', 'm') | ||
->where('m.body = :body') | ||
->setParameter(':body', '{"message": "Hi i am delayed"}') | ||
->execute() | ||
->fetchColumn(); | ||
|
||
$available_at = new \DateTime($available_at); | ||
|
||
$now = \DateTime::createFromFormat('U.u', microtime(true)); | ||
$now->modify('+60 seconds'); | ||
$this->assertGreaterThan($now, $available_at); | ||
} | ||
|
||
public function testItRetrieveTheFirstAvailableMessage() | ||
{ | ||
// insert messages | ||
// one currently handled | ||
$this->driverConnection->insert('messenger_messages', [ | ||
'body' => '{"message": "Hi handled"}', | ||
'headers' => json_encode(['type' => DummyMessage::class]), | ||
'queue_name' => 'default', | ||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), | ||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), | ||
'delivered_at' => Connection::formatDateTime(\DateTime::createFromFormat('U.u', microtime(true))), | ||
]); | ||
// one available later | ||
$this->driverConnection->insert('messenger_messages', [ | ||
'body' => '{"message": "Hi delayed"}', | ||
'headers' => json_encode(['type' => DummyMessage::class]), | ||
'queue_name' => 'default', | ||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), | ||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 13:00:00')), | ||
]); | ||
// one available | ||
$this->driverConnection->insert('messenger_messages', [ | ||
'body' => '{"message": "Hi available"}', | ||
'headers' => json_encode(['type' => DummyMessage::class]), | ||
'queue_name' => 'default', | ||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), | ||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')), | ||
]); | ||
|
||
$encoded = $this->connection->get(); | ||
$this->assertEquals('{"message": "Hi available"}', $encoded['body']); | ||
} | ||
|
||
public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout() | ||
{ | ||
$twoHoursAgo = new \DateTime('now'); | ||
$twoHoursAgo->modify('-2 hours'); | ||
$this->driverConnection->insert('messenger_messages', [ | ||
'body' => '{"message": "Hi requeued"}', | ||
'headers' => json_encode(['type' => DummyMessage::class]), | ||
'queue_name' => 'default', | ||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), | ||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), | ||
'delivered_at' => Connection::formatDateTime($twoHoursAgo), | ||
]); | ||
$this->driverConnection->insert('messenger_messages', [ | ||
'body' => '{"message": "Hi available"}', | ||
'headers' => json_encode(['type' => DummyMessage::class]), | ||
'queue_name' => 'default', | ||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), | ||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')), | ||
]); | ||
|
||
$next = $this->connection->get(); | ||
$this->assertEquals('{"message": "Hi requeued"}', $next['body']); | ||
$this->connection->reject($next['id']); | ||
} | ||
} |
Oops, something went wrong.