diff --git a/baseline.xml b/baseline.xml index dba7c91b..ee6e9be1 100644 --- a/baseline.xml +++ b/baseline.xml @@ -86,6 +86,11 @@ convertToPHPValue($data['recorded_on'], $platform)]]> + + + messages]]> + + diff --git a/docs/pages/store.md b/docs/pages/store.md index af05b927..5e67471f 100644 --- a/docs/pages/store.md +++ b/docs/pages/store.md @@ -9,27 +9,11 @@ Each message contains an event and the associated headers. The store is optimized to efficiently store and load events for aggregates. -## Create DBAL connection - -The first thing we need for our store is a DBAL connection: - -```php -use Doctrine\DBAL\DriverManager; -use Doctrine\DBAL\Tools\DsnParser; - -$connection = DriverManager::getConnection( - (new DsnParser())->parse('pdo-pgsql://user:secret@localhost/app'), -); -``` -!!! note - - You can find out more about how to create a connection - [here](https://www.doctrine-project.org/projects/doctrine-dbal/en/latest/reference/configuration.html) - ## Configure Store -We currently offer two stores, both based on the [doctrine dbal](https://www.doctrine-project.org/projects/dbal.html) library. -The default store is the `DoctrineDbalStore` and the new experimental store is the `StreamDoctrineDbalStore`. +We offer different stores to store the messages. +Two stores based on [doctrine dbal](https://www.doctrine-project.org/projects/dbal.html) +and one in-memory store for testing purposes. ### DoctrineDbalStore @@ -38,16 +22,25 @@ You can create a store with the `DoctrineDbalStore` class. The store needs a dbal connection, an event serializer and has some optional parameters like options. ```php -use Doctrine\DBAL\Connection; +use Doctrine\DBAL\DriverManager; +use Doctrine\DBAL\Tools\DsnParser; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\DoctrineDbalStore; -/** @var Connection $connection */ +$connection = DriverManager::getConnection( + (new DsnParser())->parse('pdo-pgsql://user:secret@localhost/app'), +); + $store = new DoctrineDbalStore( $connection, DefaultEventSerializer::createFromPaths(['src/Event']), ); ``` +!!! note + + You can find out more about how to create a connection + [here](https://www.doctrine-project.org/projects/doctrine-dbal/en/latest/reference/configuration.html) + Following options are available in `DoctrineDbalStore`: | Option | Type | Default | Description | @@ -89,16 +82,25 @@ This store introduces two new methods `streams` and `remove`. The store needs a dbal connection, an event serializer and has some optional parameters like options. ```php -use Doctrine\DBAL\Connection; +use Doctrine\DBAL\DriverManager; +use Doctrine\DBAL\Tools\DsnParser; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; -/** @var Connection $connection */ +$connection = DriverManager::getConnection( + (new DsnParser())->parse('pdo-pgsql://user:secret@localhost/app'), +); + $store = new StreamDoctrineDbalStore( $connection, DefaultEventSerializer::createFromPaths(['src/Event']), ); ``` +!!! note + + You can find out more about how to create a connection + [here](https://www.doctrine-project.org/projects/doctrine-dbal/en/latest/reference/configuration.html) + Following options are available in `StreamDoctrineDbalStore`: | Option | Type | Default | Description | @@ -122,6 +124,19 @@ The table structure of the `StreamDoctrineDbalStore` looks like this: | archived | bool | If the event is archived | | custom_headers | json | Custom headers for the event | +### InMemoryStore + +We also offer an in-memory store for testing purposes. + +```php +use Patchlevel\EventSourcing\Store\InMemoryStore; + +$store = new InMemoryStore(); +``` +!!! tip + + You can pass messages to the constructor to initialize the store with some events. + ## Schema With the help of the `SchemaDirector`, the database structure can be created, updated and deleted. @@ -130,7 +145,7 @@ With the help of the `SchemaDirector`, the database structure can be created, up You can also use doctrine migration to create and keep your schema in sync. -### Schema Director +### Doctrine Schema Director The `SchemaDirector` is responsible for creating, updating and deleting the database schema. The `DoctrineSchemaDirector` is a concrete implementation of the `SchemaDirector` for doctrine dbal. @@ -159,18 +174,18 @@ $schemaDirector = new DoctrineSchemaDirector( You can create the table from scratch using the `create` method. ```php -use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; +use Patchlevel\EventSourcing\Schema\SchemaDirector; -/** @var DoctrineSchemaDirector $schemaDirector */ +/** @var SchemaDirector $schemaDirector */ $schemaDirector->create(); ``` Or can give you back which SQL statements would be necessary for this. Either for a dry run, or to define your own migrations. ```php -use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; +use Patchlevel\EventSourcing\Schema\DryRunSchemaDirector; -/** @var DoctrineSchemaDirector $schemaDirector */ +/** @var DryRunSchemaDirector $schemaDirector */ $sql = $schemaDirector->dryRunCreate(); ``` #### Update schema @@ -179,17 +194,17 @@ The update method compares the current state in the database and how the table s As a result, the diff is executed to bring the table to the desired state. ```php -use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; +use Patchlevel\EventSourcing\Schema\SchemaDirector; -/** @var DoctrineSchemaDirector $schemaDirector */ +/** @var SchemaDirector $schemaDirector */ $schemaDirector->update(); ``` Or can give you back which SQL statements would be necessary for this. ```php -use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; +use Patchlevel\EventSourcing\Schema\DryRunSchemaDirector; -/** @var DoctrineSchemaDirector $schemaDirector */ +/** @var DryRunSchemaDirector $schemaDirector */ $sql = $schemaDirector->dryRunUpdate(); ``` #### Drop schema @@ -197,17 +212,17 @@ $sql = $schemaDirector->dryRunUpdate(); You can also delete the table with the `drop` method. ```php -use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; +use Patchlevel\EventSourcing\Schema\SchemaDirector; -/** @var DoctrineSchemaDirector $schemaDirector */ +/** @var SchemaDirector $schemaDirector */ $schemaDirector->drop(); ``` Or can give you back which SQL statements would be necessary for this. ```php -use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; +use Patchlevel\EventSourcing\Schema\DryRunSchemaDirector; -/** @var DoctrineSchemaDirector $schemaDirector */ +/** @var DryRunSchemaDirector $schemaDirector */ $sql = $schemaDirector->dryRunDrop(); ``` ### Doctrine Migrations diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 26a4be10..ee93d9a3 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -50,6 +50,11 @@ parameters: count: 1 path: src/Store/DoctrineDbalStoreStream.php + - + message: "#^Property Patchlevel\\\\EventSourcing\\\\Store\\\\InMemoryStore\\:\\:\\$messages \\(array\\, Patchlevel\\\\EventSourcing\\\\Message\\\\Message\\>\\) does not accept array\\\\.$#" + count: 1 + path: src/Store/InMemoryStore.php + - message: "#^Parameter \\#2 \\$playhead of class Patchlevel\\\\EventSourcing\\\\Store\\\\StreamHeader constructor expects int\\<1, max\\>\\|null, int\\|null given\\.$#" count: 1 diff --git a/src/Store/Criteria/StreamCriterion.php b/src/Store/Criteria/StreamCriterion.php index 070c1c83..bbe5eecd 100644 --- a/src/Store/Criteria/StreamCriterion.php +++ b/src/Store/Criteria/StreamCriterion.php @@ -4,12 +4,19 @@ namespace Patchlevel\EventSourcing\Store\Criteria; +use Patchlevel\EventSourcing\Store\InvalidStreamName; + +use function preg_match; + /** @experimental */ final class StreamCriterion { public function __construct( public readonly string $streamName, ) { + if (!preg_match('/^[^*]*\*?$/', $this->streamName)) { + throw new InvalidStreamName($this->streamName); + } } public static function startWith(string $streamName): self diff --git a/src/Store/InMemoryStore.php b/src/Store/InMemoryStore.php new file mode 100644 index 00000000..f774e89f --- /dev/null +++ b/src/Store/InMemoryStore.php @@ -0,0 +1,230 @@ + $messages */ + public function __construct( + private array $messages = [], + ) { + } + + public function load( + Criteria|null $criteria = null, + int|null $limit = null, + int|null $offset = null, + bool $backwards = false, + ): ArrayStream { + $messages = $this->filter($criteria); + + if ($backwards) { + $messages = array_reverse($messages); + } + + if ($offset !== null) { + $messages = array_slice($messages, $offset); + } + + if ($limit !== null) { + $messages = array_slice($messages, 0, $limit); + } + + return new ArrayStream($messages); + } + + public function count(Criteria|null $criteria = null): int + { + return count($this->filter($criteria)); + } + + public function save(Message ...$messages): void + { + array_push($this->messages, ...$messages); + } + + /** + * @param Closure():ClosureReturn $function + * + * @template ClosureReturn + */ + public function transactional(Closure $function): void + { + $function(); + } + + /** @return list */ + public function streams(): array + { + return array_values( + array_unique( + array_filter( + array_map( + static function (Message $message): string|null { + try { + return $message->header(AggregateHeader::class)->streamName(); + } catch (HeaderNotFound) { + try { + return $message->header(StreamHeader::class)->streamName; + } catch (HeaderNotFound) { + return null; + } + } + }, + $this->messages, + ), + static fn (string|null $streamName): bool => $streamName !== null, + ), + ), + ); + } + + public function remove(string $streamName): void + { + $this->messages = array_values( + array_filter( + $this->messages, + static function (Message $message) use ($streamName): bool { + try { + return $message->header(AggregateHeader::class)->streamName() !== $streamName; + } catch (HeaderNotFound) { + try { + return $message->header(StreamHeader::class)->streamName !== $streamName; + } catch (HeaderNotFound) { + return true; + } + } + }, + ), + ); + } + + /** @return array */ + private function filter(Criteria|null $criteria): array + { + if (!$criteria) { + return $this->messages; + } + + return array_filter( + $this->messages, + static function (Message $message, int $index) use ($criteria): bool { + foreach ($criteria->all() as $criterion) { + switch ($criterion::class) { + case AggregateIdCriterion::class: + try { + if ($message->header(AggregateHeader::class)->aggregateId !== $criterion->aggregateId) { + return false; + } + } catch (HeaderNotFound) { + return false; + } + + break; + case AggregateNameCriterion::class: + try { + if ($message->header(AggregateHeader::class)->aggregateName !== $criterion->aggregateName) { + return false; + } + } catch (HeaderNotFound) { + return false; + } + + break; + case StreamCriterion::class: + if ($criterion->streamName === '*') { + break; + } + + try { + $messageStreamName = $message->header(AggregateHeader::class)->streamName(); + } catch (HeaderNotFound) { + try { + $messageStreamName = $message->header(StreamHeader::class)->streamName; + } catch (HeaderNotFound) { + return false; + } + } + + if (str_ends_with($criterion->streamName, '*')) { + if (!str_starts_with($messageStreamName, mb_substr($criterion->streamName, 0, -1))) { + return false; + } + + break; + } + + if ($messageStreamName !== $criterion->streamName) { + return false; + } + + break; + case FromPlayheadCriterion::class: + $playhead = null; + + try { + $playhead = $message->header(AggregateHeader::class)->playhead; + } catch (HeaderNotFound) { + try { + $playhead = $message->header(StreamHeader::class)->playhead; + } catch (HeaderNotFound) { + return false; + } + } + + if ($playhead < $criterion->fromPlayhead) { + return false; + } + + break; + case ArchivedCriterion::class: + if (!$message->hasHeader(ArchivedHeader::class) === $criterion->archived) { + return false; + } + + break; + case FromIndexCriterion::class: + if ($index < $criterion->fromIndex) { + return false; + } + + break; + default: + throw new UnsupportedCriterion($criterion::class); + } + } + + return true; + }, + ARRAY_FILTER_USE_BOTH, + ); + } +} diff --git a/src/Store/StreamDoctrineDbalStore.php b/src/Store/StreamDoctrineDbalStore.php index e9fa93b4..d9c1cd16 100644 --- a/src/Store/StreamDoctrineDbalStore.php +++ b/src/Store/StreamDoctrineDbalStore.php @@ -44,7 +44,6 @@ use function is_string; use function mb_substr; use function sprintf; -use function str_contains; use function str_ends_with; /** @experimental */ @@ -149,14 +148,8 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void } if (str_ends_with($criterion->streamName, '*')) { - $streamName = mb_substr($criterion->streamName, 0, -1); - - if (str_contains($streamName, '*')) { - throw new InvalidStreamName($criterion->streamName); - } - $builder->andWhere('stream LIKE :stream'); - $builder->setParameter('stream', $streamName . '%'); + $builder->setParameter('stream', mb_substr($criterion->streamName, 0, -1) . '%'); break; } diff --git a/tests/Unit/Store/InMemoryStoreTest.php b/tests/Unit/Store/InMemoryStoreTest.php new file mode 100644 index 00000000..626918ba --- /dev/null +++ b/tests/Unit/Store/InMemoryStoreTest.php @@ -0,0 +1,350 @@ +load(); + + self::assertCount(0, $stream); + } + + public function testLoadMessages(): void + { + $expected = [ + new Message(new ProfileVisited(ProfileId::fromString('1'))), + new Message(new ProfileVisited(ProfileId::fromString('2'))), + ]; + + $store = new InMemoryStore($expected); + + $stream = $store->load(); + + $messages = iterator_to_array($stream); + + self::assertSame($expected, $messages); + } + + public function testLoadByAggregateId(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new AggregateHeader('profile', '1', 1, new DateTimeImmutable())); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new AggregateHeader('profile', '2', 1, new DateTimeImmutable())); + $message3 = new Message(new ProfileVisited(ProfileId::fromString('3'))); + + $store = new InMemoryStore([$message1, $message2, $message3]); + + $stream = $store->load(new Criteria(new AggregateIdCriterion('2'))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message2], $messages); + } + + public function testLoadByAggregateName(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new AggregateHeader('foo', '1', 1, new DateTimeImmutable())); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new AggregateHeader('bar', '2', 1, new DateTimeImmutable())); + $message3 = new Message(new ProfileVisited(ProfileId::fromString('3'))); + + $store = new InMemoryStore([$message1, $message2, $message3]); + + $stream = $store->load(new Criteria(new AggregateNameCriterion('bar'))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message2], $messages); + } + + public function testLoadByStreamName(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new StreamHeader('foo')); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new StreamHeader('bar')); + $message3 = new Message(new ProfileVisited(ProfileId::fromString('3'))); + + $store = new InMemoryStore([$message1, $message2, $message3]); + + $stream = $store->load(new Criteria(new StreamCriterion('bar'))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message2], $messages); + } + + public function testLoadByStreamNameWithLike(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new StreamHeader('foo-3')); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new StreamHeader('bar-1')); + $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) + ->withHeader(new StreamHeader('bar-2')); + + $store = new InMemoryStore([$message1, $message2, $message3]); + + $stream = $store->load(new Criteria(new StreamCriterion('bar-*'))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message2, $message3], $messages); + } + + public function testLoadFromPlayhead(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new AggregateHeader('foo', '1', 1, new DateTimeImmutable())); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new AggregateHeader('foo', '1', 2, new DateTimeImmutable())); + $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) + ->withHeader(new StreamHeader('foo-1', 3, new DateTimeImmutable())); + $message4 = (new Message(new ProfileVisited(ProfileId::fromString('3')))); + + $store = new InMemoryStore([$message1, $message2, $message3, $message4]); + + $stream = $store->load(new Criteria(new FromPlayheadCriterion(2))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message2, $message3], $messages); + } + + public function testLoadFromIndex(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new AggregateHeader('foo', '1', 1, new DateTimeImmutable())); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new AggregateHeader('foo', '1', 2, new DateTimeImmutable())); + $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) + ->withHeader(new StreamHeader('foo-1', 3, new DateTimeImmutable())); + $message4 = (new Message(new ProfileVisited(ProfileId::fromString('3')))); + + $store = new InMemoryStore([$message1, $message2, $message3, $message4]); + + $stream = $store->load(new Criteria(new FromIndexCriterion(2))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message3, $message4], $messages); + } + + public function testLoadByStreamNameWithLikeAll(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new StreamHeader('foo-3')); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new StreamHeader('bar-1')); + $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) + ->withHeader(new StreamHeader('bar-2')); + + $store = new InMemoryStore([$message1, $message2, $message3]); + + $stream = $store->load(new Criteria(new StreamCriterion('*'))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message1, $message2, $message3], $messages); + } + + public function testLoadArchived(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new ArchivedHeader()); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))); + + $store = new InMemoryStore([$message1, $message2]); + + $stream = $store->load(new Criteria(new ArchivedCriterion(true))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message1], $messages); + } + + public function testLoadUnsupportedCriterion(): void + { + $store = new InMemoryStore([ + new Message(new ProfileVisited(ProfileId::fromString('1'))), + new Message(new ProfileVisited(ProfileId::fromString('2'))), + ]); + + $this->expectException(UnsupportedCriterion::class); + + $store->load(new Criteria(new stdClass())); + } + + public function testLoadLimit(): void + { + $message1 = new Message(new ProfileVisited(ProfileId::fromString('1'))); + $message2 = new Message(new ProfileVisited(ProfileId::fromString('2'))); + + $store = new InMemoryStore([$message1, $message2]); + + $stream = $store->load(null, 1); + + $messages = iterator_to_array($stream); + + self::assertSame([$message1], $messages); + } + + public function testLoadOffset(): void + { + $message1 = new Message(new ProfileVisited(ProfileId::fromString('1'))); + $message2 = new Message(new ProfileVisited(ProfileId::fromString('2'))); + + $store = new InMemoryStore([$message1, $message2]); + + $stream = $store->load(null, null, 1); + + $messages = iterator_to_array($stream); + + self::assertSame([$message2], $messages); + } + + public function testLoadBackwards(): void + { + $message1 = new Message(new ProfileVisited(ProfileId::fromString('1'))); + $message2 = new Message(new ProfileVisited(ProfileId::fromString('2'))); + + $store = new InMemoryStore([$message1, $message2]); + + $stream = $store->load(null, null, null, true); + + $messages = iterator_to_array($stream); + + self::assertSame([$message2, $message1], $messages); + } + + public function testCount(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new ArchivedHeader()); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))); + + $store = new InMemoryStore([$message1, $message2]); + + self::assertSame(1, $store->count(new Criteria(new ArchivedCriterion(true)))); + } + + public function testSaveEmpty(): void + { + $expected = [ + new Message(new ProfileVisited(ProfileId::fromString('1'))), + new Message(new ProfileVisited(ProfileId::fromString('2'))), + ]; + + $store = new InMemoryStore([]); + + $store->save(...$expected); + + $stream = $store->load(); + + $messages = iterator_to_array($stream); + + self::assertSame($expected, $messages); + } + + public function testSaveWithExistingMessages(): void + { + $startMessages = [ + new Message(new ProfileVisited(ProfileId::fromString('1'))), + new Message(new ProfileVisited(ProfileId::fromString('2'))), + ]; + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('3'))); + + $store = new InMemoryStore($startMessages); + + $store->save($message1); + + $stream = $store->load(); + + $messages = iterator_to_array($stream); + + self::assertSame([...$startMessages, $message1], $messages); + } + + public function testStreams(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new StreamHeader('foo')); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new StreamHeader('bar')); + $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) + ->withHeader(new StreamHeader('bar')); + $message4 = (new Message(new ProfileVisited(ProfileId::fromString('3')))); + + $store = new InMemoryStore([$message1, $message2, $message3, $message4]); + + self::assertSame(['foo', 'bar'], $store->streams()); + } + + public function testRemove(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new StreamHeader('foo')); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new StreamHeader('bar')); + $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) + ->withHeader(new StreamHeader('bar')); + $message4 = (new Message(new ProfileVisited(ProfileId::fromString('3')))); + + $store = new InMemoryStore([$message1, $message2, $message3, $message4]); + + $store->remove('bar'); + + $stream = $store->load(); + + $messages = iterator_to_array($stream); + + self::assertSame([$message1, $message4], $messages); + } + + public function testTransactional(): void + { + $called = false; + + $store = new InMemoryStore(); + $store->transactional( + static function () use (&$called): void { + $called = true; + }, + ); + + self::assertTrue($called); + } +}