Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add cdc pattern example #369

Merged
merged 5 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/backoffice/backend/src/BackofficeBackendKernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public function getProjectDir(): string
protected function configureContainer(ContainerBuilder $container, LoaderInterface $loader): void
{
$container->addResource(new FileResource($this->getProjectDir() . '/config/bundles.php'));
$container->setParameter('container.dumper.inline_class_loader', true);
$container->setParameter('.container.dumper.inline_class_loader', true);
$confDir = $this->getProjectDir() . '/config';

$loader->load($confDir . '/services' . self::CONFIG_EXTS, 'glob');
Expand Down
2 changes: 1 addition & 1 deletion apps/backoffice/frontend/src/BackofficeFrontendKernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public function getProjectDir(): string
protected function configureContainer(ContainerBuilder $container, LoaderInterface $loader): void
{
$container->addResource(new FileResource($this->getProjectDir() . '/config/bundles.php'));
$container->setParameter('container.dumper.inline_class_loader', true);
$container->setParameter('.container.dumper.inline_class_loader', true);
$confDir = $this->getProjectDir() . '/config';

$loader->load($confDir . '/services' . self::CONFIG_EXTS, 'glob');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@
use CodelyTv\Shared\Infrastructure\Bus\Event\DomainEventSubscriberLocator;
use CodelyTv\Shared\Infrastructure\Bus\Event\MySql\MySqlDoctrineDomainEventsConsumer;
use CodelyTv\Shared\Infrastructure\Doctrine\DatabaseConnections;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

use function Lambdish\Phunctional\pipe;

#[AsCommand(name: 'codely:domain-events:mysql:consume', description: 'Consume domain events from MySql',)]
final class ConsumeMySqlDomainEventsCommand extends Command
{
protected static $defaultName = 'codelytv:domain-events:mysql:consume';

public function __construct(
private readonly MySqlDoctrineDomainEventsConsumer $consumer,
private readonly DatabaseConnections $connections,
Expand All @@ -29,9 +29,7 @@ public function __construct(

protected function configure(): void
{
$this
->setDescription('Consume domain events from MySql')
->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of events to process');
$this->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of events to process');
}

protected function execute(InputInterface $input, OutputInterface $output): int
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
<?php

declare(strict_types=1);

namespace CodelyTv\Apps\Mooc\Backend\Command\DomainEvents;

use CodelyTv\Mooc\Courses\Infrastructure\Cdc\DatabaseMutationToCourseCreatedDomainEvent;
use CodelyTv\Shared\Domain\Bus\Event\EventBus;
use CodelyTv\Shared\Infrastructure\Cdc\DatabaseMutationAction;
use CodelyTv\Shared\Infrastructure\Cdc\DatabaseMutationToDomainEvent;
use Doctrine\ORM\EntityManager;
use RuntimeException;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(
name: 'codely:domain-events:generate-from-mutations',
description: 'Publish domain events from mutations',
)]
final class PublishDomainEventsFromMutationsCommand extends Command
{
private array $transformers;

public function __construct(
private readonly EntityManager $entityManager,
private readonly EventBus $eventBus
) {
parent::__construct();

$this->transformers = [
'courses' => [
DatabaseMutationAction::INSERT->value => DatabaseMutationToCourseCreatedDomainEvent::class,
DatabaseMutationAction::UPDATE->value => null,
DatabaseMutationAction::DELETE->value => null,
],
];
}

protected function configure(): void
{
$this->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of mutations to process');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$totalMutations = (int) $input->getArgument('quantity');

$this->entityManager->wrapInTransaction(function (EntityManager $entityManager) use ($totalMutations) {
$mutations = $entityManager->getConnection()
->executeQuery("SELECT * FROM mutations ORDER BY id ASC LIMIT $totalMutations FOR UPDATE")
->fetchAllAssociative();

foreach ($mutations as $mutation) {
$transformer = $this->findTransformer($mutation['table_name'], $mutation['operation']);

if ($transformer === null) {
echo sprintf("Ignoring %s %s\n", $mutation['table_name'], $mutation['operation']);
continue;
}

$domainEvents = $transformer->transform($mutation);

$this->eventBus->publish(...$domainEvents);
}

$entityManager->getConnection()->executeStatement(
sprintf('DELETE FROM mutations WHERE id IN (%s)', implode(',', array_column($mutations, 'id')))
);
});

return 0;
}

private function findTransformer(string $tableName, string $operation): ?DatabaseMutationToDomainEvent
{
if (!array_key_exists($tableName, $this->transformers) && array_key_exists(
$operation,
$this->transformers[$tableName]
)) {
throw new RuntimeException("Transformer not found for table $tableName and operation $operation");
}

/** @var class-string<DatabaseMutationToDomainEvent>|null $class */
$class = $this->transformers[$tableName][$operation];

return $class ? new $class() : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
namespace CodelyTv\Apps\Mooc\Backend\Command\DomainEvents\RabbitMq;

use CodelyTv\Shared\Infrastructure\Bus\Event\RabbitMq\RabbitMqConfigurer;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Traversable;

#[AsCommand(
name: 'codely:domain-events:rabbitmq:configure',
description: 'Configure the RabbitMQ to allow publish & consume domain events',
)]
final class ConfigureRabbitMqCommand extends Command
{
protected static $defaultName = 'codelytv:domain-events:rabbitmq:configure';

public function __construct(
private readonly RabbitMqConfigurer $configurer,
private readonly string $exchangeName,
Expand All @@ -22,11 +25,6 @@ public function __construct(
parent::__construct();
}

protected function configure(): void
{
$this->setDescription('Configure the RabbitMQ to allow publish & consume domain events');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->configurer->configure($this->exchangeName, ...iterator_to_array($this->subscribers));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@
use CodelyTv\Shared\Infrastructure\Bus\Event\DomainEventSubscriberLocator;
use CodelyTv\Shared\Infrastructure\Bus\Event\RabbitMq\RabbitMqDomainEventsConsumer;
use CodelyTv\Shared\Infrastructure\Doctrine\DatabaseConnections;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

use function Lambdish\Phunctional\repeat;

#[AsCommand(
name: 'codely:domain-events:rabbitmq:consume',
description: 'Consume domain events from the RabbitMQ',
)]
final class ConsumeRabbitMqDomainEventsCommand extends Command
{
protected static $defaultName = 'codelytv:domain-events:rabbitmq:consume';

public function __construct(
private readonly RabbitMqDomainEventsConsumer $consumer,
private readonly DatabaseConnections $connections,
Expand All @@ -29,7 +32,6 @@ public function __construct(
protected function configure(): void
{
$this
->setDescription('Consume domain events from the RabbitMQ')
->addArgument('queue', InputArgument::REQUIRED, 'Queue name')
->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of events to process');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,23 @@
use CodelyTv\Shared\Domain\Bus\Event\DomainEventSubscriber;
use CodelyTv\Shared\Infrastructure\Bus\Event\DomainEventSubscriberLocator;
use CodelyTv\Shared\Infrastructure\Bus\Event\RabbitMq\RabbitMqQueueNameFormatter;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

use function Lambdish\Phunctional\each;

#[AsCommand(
name: 'codely:domain-events:rabbitmq:generate-supervisor-files',
description: 'Generate the supervisor configuration for every RabbitMQ subscriber',
)]
final class GenerateSupervisorRabbitMqConsumerFilesCommand extends Command
{
private const EVENTS_TO_PROCESS_AT_TIME = 200;
private const NUMBERS_OF_PROCESSES_PER_SUBSCRIBER = 1;
private const SUPERVISOR_PATH = __DIR__ . '/../../../../build/supervisor';
protected static $defaultName = 'codelytv:domain-events:rabbitmq:generate-supervisor-files';

public function __construct(private readonly DomainEventSubscriberLocator $locator)
{
Expand All @@ -28,9 +32,7 @@ public function __construct(private readonly DomainEventSubscriberLocator $locat

protected function configure(): void
{
$this
->setDescription('Generate the supervisor configuration for every RabbitMQ subscriber')
->addArgument('command-path', InputArgument::OPTIONAL, 'Path on this is gonna be deployed', '/var/www');
$this->addArgument('command-path', InputArgument::OPTIONAL, 'Path on this is gonna be deployed', '/var/www');
}

protected function execute(InputInterface $input, OutputInterface $output): int
Expand Down Expand Up @@ -68,7 +70,7 @@ private function template(): string
{
return <<<EOF
[program:codelytv_{queue_name}]
command = {path}/apps/mooc/backend/bin/console codelytv:domain-events:rabbitmq:consume --env=prod {queue_name} {events_to_process}
command = {path}/apps/mooc/backend/bin/console codely:domain-events:rabbitmq:consume --env=prod {queue_name} {events_to_process}
process_name = %(program_name)s_%(process_num)02d
numprocs = {processes}
startsecs = 1
Expand Down
2 changes: 1 addition & 1 deletion apps/mooc/backend/src/MoocBackendKernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public function getProjectDir(): string
protected function configureContainer(ContainerBuilder $container, LoaderInterface $loader): void
{
$container->addResource(new FileResource($this->getProjectDir() . '/config/bundles.php'));
$container->setParameter('container.dumper.inline_class_loader', true);
$container->setParameter('.container.dumper.inline_class_loader', true);
$confDir = $this->getProjectDir() . '/config';

$loader->load($confDir . '/services' . self::CONFIG_EXTS, 'glob');
Expand Down
32 changes: 14 additions & 18 deletions ecs.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,20 @@
use Symplify\EasyCodingStandard\Config\ECSConfig;

return function (ECSConfig $ecsConfig): void {
$ecsConfig->paths([
__DIR__ . '/apps',
__DIR__ . '/src',
__DIR__ . '/tests',
]);
$ecsConfig->paths([__DIR__ . '/apps', __DIR__ . '/src', __DIR__ . '/tests', ]);

$ecsConfig->sets([CodingStyle::DEFAULT]);
$ecsConfig->sets([CodingStyle::DEFAULT]);

$ecsConfig->skip([
FinalClassFixer::class => [
__DIR__ . '/apps/backoffice/backend/src/BackofficeBackendKernel.php',
__DIR__ . '/apps/backoffice/frontend/src/BackofficeFrontendKernel.php',
__DIR__ . '/apps/mooc/backend/src/MoocBackendKernel.php',
__DIR__ . '/src/Shared/Infrastructure/Bus/Event/InMemory/InMemorySymfonyEventBus.php',
],
__DIR__ . '/apps/backoffice/backend/var',
__DIR__ . '/apps/backoffice/frontend/var',
__DIR__ . '/apps/mooc/backend/var',
__DIR__ . '/apps/mooc/frontend/var',
]);
$ecsConfig->skip([
FinalClassFixer::class => [
__DIR__ . '/apps/backoffice/backend/src/BackofficeBackendKernel.php',
__DIR__ . '/apps/backoffice/frontend/src/BackofficeFrontendKernel.php',
__DIR__ . '/apps/mooc/backend/src/MoocBackendKernel.php',
__DIR__ . '/src/Shared/Infrastructure/Bus/Event/InMemory/InMemorySymfonyEventBus.php',
],
__DIR__ . '/apps/backoffice/backend/var',
__DIR__ . '/apps/backoffice/frontend/var',
__DIR__ . '/apps/mooc/backend/var',
__DIR__ . '/apps/mooc/frontend/var',
]);
};
Loading