-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
MySqlDoctrineEventBus.php
49 lines (40 loc) · 1.5 KB
/
MySqlDoctrineEventBus.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<?php
declare(strict_types=1);
namespace CodelyTv\Shared\Infrastructure\Bus\Event\MySql;
use CodelyTv\Shared\Domain\Bus\Event\DomainEvent;
use CodelyTv\Shared\Domain\Bus\Event\EventBus;
use CodelyTv\Shared\Domain\Utils;
use Doctrine\DBAL\Connection;
use Doctrine\ORM\EntityManager;
use function Lambdish\Phunctional\each;
final class MySqlDoctrineEventBus implements EventBus
{
private const DATABASE_TIMESTAMP_FORMAT = 'Y-m-d H:i:s';
private readonly Connection $connection;
public function __construct(EntityManager $entityManager)
{
$this->connection = $entityManager->getConnection();
}
public function publish(DomainEvent ...$events): void
{
each($this->publisher(), $events);
}
private function publisher(): callable
{
return function (DomainEvent $domainEvent): void {
$id = $this->connection->quote($domainEvent->eventId());
$aggregateId = $this->connection->quote($domainEvent->aggregateId());
$name = $this->connection->quote($domainEvent::eventName());
$body = $this->connection->quote(Utils::jsonEncode($domainEvent->toPrimitives()));
$occurredOn = $this->connection->quote(
Utils::stringToDate($domainEvent->occurredOn())->format(self::DATABASE_TIMESTAMP_FORMAT)
);
$this->connection->executeStatement(
<<<SQL
INSERT INTO domain_events (id, aggregate_id, name, body, occurred_on)
VALUES ($id, $aggregateId, $name, $body, $occurredOn);
SQL
);
};
}
}