Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUGFIX: Ensure users content stream is never left closed after publication #5342

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
90f2b8f
BUGFIX: Ensure all events are published BEFORE catchup
mhsdesign Nov 4, 2024
6fc43ab
Merge remote-tracking branch 'origin/9.0' into bugfix/publishing-ensu…
mhsdesign Nov 9, 2024
4352b2e
BUGFIX: Fix reopen content stream if base workspace was written to du…
mhsdesign Nov 9, 2024
18072f6
TASK: Fix parallel tests by ensuring only one is run at time
mhsdesign Nov 9, 2024
9a6127b
TASK: Simplify code and remove reopen cs logic into `publishWorkspace`
mhsdesign Nov 9, 2024
cb34618
TASK: Only fetch content stream once for constraint checks
mhsdesign Nov 9, 2024
0273e32
TASK: Adjust .composer json
mhsdesign Nov 9, 2024
77778f9
TASK: Do not send `$commitResult` to generator but calculate expected…
mhsdesign Nov 9, 2024
7b922bf
TASK: Inline now simplified `YieldedEventsToPublish` virtual type again
mhsdesign Nov 9, 2024
d27f83f
TASK: Wrap rebaseable command extraction into `finally` block to ensu…
mhsdesign Nov 9, 2024
de7895e
TASK: Close content stream a bit later instead of having to reopen in…
mhsdesign Nov 9, 2024
d290047
TASK: Add proper docs to `EventPersister`
mhsdesign Nov 9, 2024
8e48e7e
TASK: Adjust naming of `removeContentStreamWithoutConstraintChecks`
mhsdesign Nov 9, 2024
59fa2e3
TASK: Improve assertions of WorkspacePublicationDuringWritingTest
mhsdesign Nov 10, 2024
48e09cb
TASK: Assert that in WorkspaceWritingDuringRebaseTest that the worksp…
mhsdesign Nov 10, 2024
e12c641
TASK: Naming things and suggestion from code review :)
mhsdesign Nov 10, 2024
b857367
Merge remote-tracking branch 'origin/9.0' into bugfix/publishing-ensu…
mhsdesign Nov 12, 2024
f883e65
TASK: Fix tests after bastis command test overhaul
mhsdesign Nov 12, 2024
dfd4573
TASK: Recorrect naming of method again (my code editor seems to misbe…
mhsdesign Nov 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* Implementation Detail of {@see ContentRepository::handle}, which does the command dispatching to the different
* {@see CommandHandlerInterface} implementation.
*
* @phpstan-import-type YieldedEventsToPublish from CommandHandlerInterface
* @internal
*/
final readonly class CommandBus
Expand All @@ -29,7 +30,10 @@ public function __construct(
}

/**
* @return EventsToPublish|\Generator<int, EventsToPublish>
* The handler only calculate which events they want to have published,
* but do not do the publishing themselves
*
* @return EventsToPublish|YieldedEventsToPublish
mhsdesign marked this conversation as resolved.
Show resolved Hide resolved
*/
public function handle(CommandInterface $command): EventsToPublish|\Generator
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
namespace Neos\ContentRepository\Core\CommandHandler;

use Neos\ContentRepository\Core\EventStore\EventsToPublish;
use Neos\EventStore\Model\EventStore\CommitResult;

/**
* Common interface for all Content Repository command handlers
*
* The {@see CommandHandlingDependencies} are available during handling to do soft-constraint checks
*
* @phpstan-type YieldedEventsToPublish \Generator<int, EventsToPublish, CommitResult|null, void>
* @internal no public API, because commands are no extension points of the CR
*/
interface CommandHandlerInterface
Expand All @@ -23,7 +25,7 @@ public function canHandle(CommandInterface $command): bool;
* For the case of the workspace command handler that need to publish to many streams and "close" the content-stream directly,
* it's allowed to yield the events to interact with the control flow of event publishing.
*
* @return EventsToPublish|\Generator<int, EventsToPublish>
* @return EventsToPublish|YieldedEventsToPublish
mhsdesign marked this conversation as resolved.
Show resolved Hide resolved
*/
public function handle(CommandInterface $command, CommandHandlingDependencies $commandHandlingDependencies): EventsToPublish|\Generator;
}
40 changes: 28 additions & 12 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,35 @@ public function __construct(
*/
public function handle(CommandInterface $command): void
{
// the commands only calculate which events they want to have published, but do not do the
// publishing themselves
$eventsToPublishOrGenerator = $this->commandBus->handle($command);

if ($eventsToPublishOrGenerator instanceof EventsToPublish) {
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($eventsToPublishOrGenerator);
$this->eventPersister->publishEvents($this, $eventsToPublish);
} else {
foreach ($eventsToPublishOrGenerator as $eventsToPublish) {
assert($eventsToPublish instanceof EventsToPublish); // just for the ide
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($eventsToPublish);
$this->eventPersister->publishEvents($this, $eventsToPublish);
$toPublish = $this->commandBus->handle($command);

if ($toPublish instanceof EventsToPublish) {
// simple case
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($toPublish);
if ($eventsToPublish->events->isEmpty()) {
return;
}
mhsdesign marked this conversation as resolved.
Show resolved Hide resolved
$this->eventPersister->publishWithoutCatchup($eventsToPublish);
$this->catchupProjections();
return;
}

// control-flow aware command handling via generator
try {
$yieldedEventsToPublish = $toPublish->current();
kitsunet marked this conversation as resolved.
Show resolved Hide resolved
while ($yieldedEventsToPublish !== null) {
if ($yieldedEventsToPublish->events->isEmpty()) {
$yieldedEventsToPublish = $toPublish->send(null);
continue;
}
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($yieldedEventsToPublish);
$commitResult = $this->eventPersister->publishWithoutCatchup($eventsToPublish);
$yieldedEventsToPublish = $toPublish->send($commitResult);
}
} finally {
// We always NEED to catchup even if there was an unexpected ConcurrencyException to make sure previous commits are handled.
// Technically it would be acceptable for the catchup to fail here (due to hook errors) because all the events are already persisted.
$this->catchupProjections();
}
}

Expand Down
14 changes: 11 additions & 3 deletions Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Events;
use Neos\EventStore\Model\EventStore\CommitResult;

/**
* Internal service to persist {@see EventInterface} with the proper normalization, and triggering the
Expand All @@ -31,15 +32,22 @@ public function publishEvents(ContentRepository $contentRepository, EventsToPubl
if ($eventsToPublish->events->isEmpty()) {
return;
}
$this->publishWithoutCatchup($eventsToPublish);
$contentRepository->catchUpProjections();
}

/**
* @throws ConcurrencyException in case the expectedVersion does not match
*/
public function publishWithoutCatchup(EventsToPublish $eventsToPublish): CommitResult
bwaidelich marked this conversation as resolved.
Show resolved Hide resolved
{
$normalizedEvents = Events::fromArray(
$eventsToPublish->events->map($this->eventNormalizer->normalize(...))
);
$this->eventStore->commit(
return $this->eventStore->commit(
$eventsToPublish->streamName,
$normalizedEvents,
$eventsToPublish->expectedVersion
);

$contentRepository->catchUpProjections();
}
}
113 changes: 14 additions & 99 deletions Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,55 +9,25 @@
use Neos\ContentRepository\Core\EventStore\EventsToPublish;
use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasClosed;
use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasReopened;
use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated;
use Neos\ContentRepository\Core\Feature\ContentStreamForking\Event\ContentStreamWasForked;
use Neos\ContentRepository\Core\Feature\ContentStreamRemoval\Event\ContentStreamWasRemoved;
use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamAlreadyExists;
use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamDoesNotExistYet;
use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamIsClosed;
use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamIsNotClosed;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\EventStore\Model\Event\Version;
use Neos\EventStore\Model\EventStream\ExpectedVersion;

trait ContentStreamHandling
{
/**
* @param ContentStreamId $contentStreamId The id of the content stream to create
* @throws ContentStreamAlreadyExists
* @phpstan-pure this method is pure, to persist the events they must be handled outside
*/
private function createContentStream(
ContentStreamId $contentStreamId,
CommandHandlingDependencies $commandHandlingDependencies,
): EventsToPublish {
$this->requireContentStreamToNotExistYet($contentStreamId, $commandHandlingDependencies);
$streamName = ContentStreamEventStreamName::fromContentStreamId($contentStreamId)
->getEventStreamName();

return new EventsToPublish(
$streamName,
Events::with(
new ContentStreamWasCreated(
$contentStreamId,
)
),
ExpectedVersion::NO_STREAM()
);
}

/**
* @param ContentStreamId $contentStreamId The id of the content stream to close
* @param CommandHandlingDependencies $commandHandlingDependencies
* @return EventsToPublish
* @phpstan-pure this method is pure, to persist the events they must be handled outside
*/
private function closeContentStream(
ContentStreamId $contentStreamId,
CommandHandlingDependencies $commandHandlingDependencies,
Version $contentStreamVersion,
): EventsToPublish {
$this->requireContentStreamToExist($contentStreamId, $commandHandlingDependencies);
$expectedVersion = $this->getExpectedVersionOfContentStream($contentStreamId, $commandHandlingDependencies);
$this->requireContentStreamToNotBeClosed($contentStreamId, $commandHandlingDependencies);
$streamName = ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName();

return new EventsToPublish(
Expand All @@ -67,29 +37,26 @@ private function closeContentStream(
$contentStreamId,
),
),
$expectedVersion
ExpectedVersion::fromVersion($contentStreamVersion)
);
}

/**
* @param ContentStreamId $contentStreamId The id of the content stream to reopen
* @phpstan-pure this method is pure, to persist the events they must be handled outside
*/
private function reopenContentStream(
private function reopenContentStreamWithoutConstraints(
mhsdesign marked this conversation as resolved.
Show resolved Hide resolved
ContentStreamId $contentStreamId,
CommandHandlingDependencies $commandHandlingDependencies,
): EventsToPublish {
$this->requireContentStreamToExist($contentStreamId, $commandHandlingDependencies);
$this->requireContentStreamToBeClosed($contentStreamId, $commandHandlingDependencies);
$streamName = ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName();

return new EventsToPublish(
$streamName,
ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(),
Events::with(
new ContentStreamWasReopened(
$contentStreamId
),
),
// We operate here without constraints on purpose to ensure this can be commited.
//Constraints have been checked beforehand and its expected that the content stream is closed.
ExpectedVersion::ANY()
);
}
Expand All @@ -104,19 +71,10 @@ private function reopenContentStream(
private function forkContentStream(
ContentStreamId $newContentStreamId,
ContentStreamId $sourceContentStreamId,
CommandHandlingDependencies $commandHandlingDependencies
Version $sourceContentStreamVersion
): EventsToPublish {
$this->requireContentStreamToExist($sourceContentStreamId, $commandHandlingDependencies);
$this->requireContentStreamToNotBeClosed($sourceContentStreamId, $commandHandlingDependencies);
$this->requireContentStreamToNotExistYet($newContentStreamId, $commandHandlingDependencies);

$sourceContentStreamVersion = $commandHandlingDependencies->getContentStreamVersion($sourceContentStreamId);

$streamName = ContentStreamEventStreamName::fromContentStreamId($newContentStreamId)
->getEventStreamName();

return new EventsToPublish(
$streamName,
ContentStreamEventStreamName::fromContentStreamId($newContentStreamId)->getEventStreamName(),
Events::with(
new ContentStreamWasForked(
$newContentStreamId,
Expand All @@ -133,25 +91,19 @@ private function forkContentStream(
* @param ContentStreamId $contentStreamId The id of the content stream to remove
* @phpstan-pure this method is pure, to persist the events they must be handled outside
*/
private function removeContentStream(
private function removeContentStreamWithoutConstraints(
mhsdesign marked this conversation as resolved.
Show resolved Hide resolved
ContentStreamId $contentStreamId,
CommandHandlingDependencies $commandHandlingDependencies
): EventsToPublish {
$this->requireContentStreamToExist($contentStreamId, $commandHandlingDependencies);
$expectedVersion = $this->getExpectedVersionOfContentStream($contentStreamId, $commandHandlingDependencies);

$streamName = ContentStreamEventStreamName::fromContentStreamId(
$contentStreamId
)->getEventStreamName();

return new EventsToPublish(
$streamName,
ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(),
Events::with(
new ContentStreamWasRemoved(
$contentStreamId,
),
),
$expectedVersion
// We operate here without constraints on purpose to ensure this can be commited.
// Constraints have been checked beforehand and its expected that the content stream is closed.
ExpectedVersion::ANY()
);
}

Expand All @@ -172,23 +124,6 @@ private function requireContentStreamToNotExistYet(
}
}

/**
* @param ContentStreamId $contentStreamId
* @param CommandHandlingDependencies $commandHandlingDependencies
* @throws ContentStreamDoesNotExistYet
*/
private function requireContentStreamToExist(
ContentStreamId $contentStreamId,
CommandHandlingDependencies $commandHandlingDependencies
): void {
if (!$commandHandlingDependencies->contentStreamExists($contentStreamId)) {
throw new ContentStreamDoesNotExistYet(
'Content stream "' . $contentStreamId->value . '" does not exist yet.',
1521386692
);
}
}

private function requireContentStreamToNotBeClosed(
ContentStreamId $contentStreamId,
CommandHandlingDependencies $commandHandlingDependencies
Expand All @@ -200,24 +135,4 @@ private function requireContentStreamToNotBeClosed(
);
}
}

private function requireContentStreamToBeClosed(
ContentStreamId $contentStreamId,
CommandHandlingDependencies $commandHandlingDependencies
): void {
if (!$commandHandlingDependencies->isContentStreamClosed($contentStreamId)) {
throw new ContentStreamIsNotClosed(
'Content stream "' . $contentStreamId->value . '" is not closed.',
1710405911
);
}
}

private function getExpectedVersionOfContentStream(
ContentStreamId $contentStreamId,
CommandHandlingDependencies $commandHandlingDependencies
): ExpectedVersion {
$version = $commandHandlingDependencies->getContentStreamVersion($contentStreamId);
return ExpectedVersion::fromVersion($version);
}
}
Loading
Loading