Skip to content

Commit

Permalink
Merge pull request #5342 from mhsdesign/bugfix/publishing-ensure-cont…
Browse files Browse the repository at this point in the history
…entstream-not-closed

BUGFIX: Ensure users content stream is never left closed after publication
  • Loading branch information
kitsunet authored Nov 12, 2024
2 parents 7a5a7c4 + dfd4573 commit 80d8750
Show file tree
Hide file tree
Showing 14 changed files with 564 additions and 296 deletions.
3 changes: 2 additions & 1 deletion .composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
"../../bin/phpunit --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/UnitTests.xml Neos.ContentRepository.Core/Tests/Unit",
"../../bin/phpunit --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/UnitTests.xml Neos.ContentRepositoryRegistry/Tests/Unit"
],
"test:paratest-cli": "../../bin/paratest --debug -v --functional --processes 2 --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/FunctionalTests.xml",
"test:parallel": [
"FLOW_CONTEXT=Testing/Behat ../../bin/paratest --debug -v --functional --group parallel --processes 2 --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/FunctionalTests.xml Neos.ContentRepository.BehavioralTests/Tests/Functional/Feature/WorkspacePublication/WorkspaceWritingDuringPublication.php"
"for f in Neos.ContentRepository.BehavioralTests/Tests/Parallel/**/*Test.php; do composer test:paratest-cli $f; done"
],
"test:behat-cli": "../../bin/behat -f progress --strict --no-interaction",
"test:behavioral": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ final protected function awaitFile(string $filename): void
}
}

final protected function awaitSharedLock($resource, int $maximumCycles = 2000): void
final protected function awaitFileRemoval(string $filename): void
{
$waiting = 0;
while (!flock($resource, LOCK_SH)) {
usleep(10000);
while (!is_file($filename)) {
usleep(1000);
$waiting++;
if ($waiting > $maximumCycles) {
throw new \Exception('timeout while waiting on shared lock');
clearstatcache(true, $filename);
if ($waiting > 60000) {
throw new \Exception('timeout while waiting on file ' . $filename);
}
}
}
Expand All @@ -82,6 +83,11 @@ final protected function setUpContentRepository(

final protected function log(string $message): void
{
file_put_contents(self::LOGGING_PATH, substr($this::class, strrpos($this::class, '\\') + 1) . ': ' . getmypid() . ': ' . $message . PHP_EOL, FILE_APPEND);
file_put_contents(self::LOGGING_PATH, self::shortClassName($this::class) . ': ' . getmypid() . ': ' . $message . PHP_EOL, FILE_APPEND);
}

final protected static function shortClassName(string $className): string
{
return substr($className, strrpos($className, '\\') + 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
<?php

/*
* This file is part of the Neos.ContentRepository.BehavioralTests package.
*
* (c) Contributors of the Neos Project - www.neos.io
*
* This package is Open Source Software. For the full copyright and license
* information, please view the LICENSE file which was distributed with this
* source code.
*/

declare(strict_types=1);

namespace Neos\ContentRepository\BehavioralTests\Tests\Parallel\WorkspacePublicationDuringWriting;

use Neos\ContentRepository\BehavioralTests\Tests\Parallel\AbstractParallelTestCase;
use Neos\ContentRepository\BehavioralTests\TestSuite\Behavior\GherkinPyStringNodeBasedNodeTypeManagerFactory;
use Neos\ContentRepository\BehavioralTests\TestSuite\Behavior\GherkinTableNodeBasedContentDimensionSourceFactory;
use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\Dimension\ContentDimension;
use Neos\ContentRepository\Core\Dimension\ContentDimensionId;
use Neos\ContentRepository\Core\Dimension\ContentDimensionSourceInterface;
use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePoint;
use Neos\ContentRepository\Core\DimensionSpace\OriginDimensionSpacePoint;
use Neos\ContentRepository\Core\Feature\NodeCreation\Command\CreateNodeAggregateWithNode;
use Neos\ContentRepository\Core\Feature\NodeModification\Command\SetNodeProperties;
use Neos\ContentRepository\Core\Feature\NodeModification\Dto\PropertyValuesToWrite;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Command\CreateRootNodeAggregateWithNode;
use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Command\CreateRootWorkspace;
use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Command\CreateWorkspace;
use Neos\ContentRepository\Core\Feature\WorkspacePublication\Command\PublishWorkspace;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
use Neos\ContentRepository\Core\Projection\ContentGraph\VisibilityConstraints;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;
use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamIsClosed;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\Flow\ObjectManagement\ObjectManagerInterface;
use PHPUnit\Framework\Assert;

class WorkspacePublicationDuringWritingTest extends AbstractParallelTestCase
{
private const SETUP_LOCK_PATH = __DIR__ . '/setup-lock';
private const WRITING_IS_RUNNING_FLAG_PATH = __DIR__ . '/write-is-running-flag';

private ContentRepository $contentRepository;

protected ObjectManagerInterface $objectManager;

public function setUp(): void
{
parent::setUp();
$this->log('------ process started ------');
// todo refrain from Gherkin naming here and make fakes easier to use: https://github.com/neos/neos-development-collection/pull/5346
GherkinTableNodeBasedContentDimensionSourceFactory::$contentDimensionsToUse = new class implements ContentDimensionSourceInterface
{
public function getDimension(ContentDimensionId $dimensionId): ?ContentDimension
{
return null;
}
public function getContentDimensionsOrderedByPriority(): array
{
return [];
}
};
// todo refrain from Gherkin naming here and make fakes easier to use: https://github.com/neos/neos-development-collection/pull/5346
GherkinPyStringNodeBasedNodeTypeManagerFactory::$nodeTypesToUse = new NodeTypeManager(
fn (): array => [
'Neos.ContentRepository:Root' => [],
'Neos.ContentRepository.Testing:Document' => [
'properties' => [
'title' => [
'type' => 'string'
]
]
]
]
);

$setupLockResource = fopen(self::SETUP_LOCK_PATH, 'w+');

$exclusiveNonBlockingLockResult = flock($setupLockResource, LOCK_EX | LOCK_NB);
if ($exclusiveNonBlockingLockResult === false) {
$this->log('waiting for setup');
if (!flock($setupLockResource, LOCK_SH)) {
throw new \RuntimeException('failed to acquire blocking shared lock');
}
$this->contentRepository = $this->contentRepositoryRegistry
->get(ContentRepositoryId::fromString('test_parallel'));
$this->log('wait for setup finished');
return;
}

$this->log('setup started');
$contentRepository = $this->setUpContentRepository(ContentRepositoryId::fromString('test_parallel'));

$origin = OriginDimensionSpacePoint::createWithoutDimensions();
$contentRepository->handle(CreateRootWorkspace::create(
WorkspaceName::forLive(),
ContentStreamId::fromString('live-cs-id')
));
$contentRepository->handle(CreateRootNodeAggregateWithNode::create(
WorkspaceName::forLive(),
NodeAggregateId::fromString('lady-eleonode-rootford'),
NodeTypeName::fromString(NodeTypeName::ROOT_NODE_TYPE_NAME)
));
$contentRepository->handle(CreateNodeAggregateWithNode::create(
WorkspaceName::forLive(),
NodeAggregateId::fromString('nody-mc-nodeface'),
NodeTypeName::fromString('Neos.ContentRepository.Testing:Document'),
$origin,
NodeAggregateId::fromString('lady-eleonode-rootford'),
initialPropertyValues: PropertyValuesToWrite::fromArray([
'title' => 'title-original'
])
));
$contentRepository->handle(CreateWorkspace::create(
WorkspaceName::fromString('user-test'),
WorkspaceName::forLive(),
ContentStreamId::fromString('user-cs-id')
));
for ($i = 0; $i <= 5000; $i++) {
$contentRepository->handle(CreateNodeAggregateWithNode::create(
WorkspaceName::fromString('user-test'),
NodeAggregateId::fromString('nody-mc-nodeface-' . $i),
NodeTypeName::fromString('Neos.ContentRepository.Testing:Document'),
$origin,
NodeAggregateId::fromString('lady-eleonode-rootford'),
initialPropertyValues: PropertyValuesToWrite::fromArray([
'title' => 'title'
])
));
}
$this->contentRepository = $contentRepository;

if (!flock($setupLockResource, LOCK_UN)) {
throw new \RuntimeException('failed to release setup lock');
}

$this->log('setup finished');
}

/**
* @test
* @group parallel
*/
public function whileANodesArWrittenOnLive(): void
{
$this->log('writing started');

touch(self::WRITING_IS_RUNNING_FLAG_PATH);

try {
for ($i = 0; $i <= 50; $i++) {
$this->contentRepository->handle(
SetNodeProperties::create(
WorkspaceName::forLive(),
NodeAggregateId::fromString('nody-mc-nodeface'),
OriginDimensionSpacePoint::createWithoutDimensions(),
PropertyValuesToWrite::fromArray([
'title' => 'changed-title-' . $i
])
)
);
}
} finally {
unlink(self::WRITING_IS_RUNNING_FLAG_PATH);
}

$this->log('writing finished');
Assert::assertTrue(true, 'No exception was thrown ;)');
}

/**
* @test
* @group parallel
*/
public function thenConcurrentPublishLeadsToException(): void
{
if (!is_file(self::WRITING_IS_RUNNING_FLAG_PATH)) {
$this->log('waiting to publish');

$this->awaitFile(self::WRITING_IS_RUNNING_FLAG_PATH);
// If write is the process that does the (slowish) setup, and then waits for the rebase to start,
// We give the CR some time to close the content stream
// TODO find another way than to randomly wait!!!
// The problem is, if we dont sleep it happens often that the modification works only then the rebase is startet _really_
// Doing the modification several times in hope that the second one fails will likely just stop the rebase thread as it cannot close
usleep(10000);
}

$this->log('publish started');


/*
// NOTE, can also be tested with PartialPublish, or PartialPublish leading to a full publish, but this test only allows one at time :)
$nodesForAFullPublish = 5000;
$nodesForAPartialPublish = $nodesForAFullPublish - 1;
$nodeIdToPublish = [];
for ($i = 0; $i <= $nodesForAPartialPublish; $i++) {
$nodeIdToPublish[] = new NodeIdToPublishOrDiscard(
NodeAggregateId::fromString('nody-mc-nodeface-' . $i), // see nodes created above
DimensionSpacePoint::createWithoutDimensions()
);
}
$this->contentRepository->handle(PublishIndividualNodesFromWorkspace::create(
WorkspaceName::fromString('user-test'),
NodeIdsToPublishOrDiscard::create(...$nodeIdToPublish)
));
*/

$actualException = null;
try {
$this->contentRepository->handle(PublishWorkspace::create(
WorkspaceName::fromString('user-test')
));
} catch (\Exception $thrownException) {
$actualException = $thrownException;
$this->log(sprintf('Got exception %s: %s', self::shortClassName($actualException::class), $actualException->getMessage()));
}

$this->log('publish finished');

if ($actualException === null) {
Assert::fail(sprintf('No exception was thrown'));
}

Assert::assertInstanceOf(ConcurrencyException::class, $actualException);

$this->awaitFileRemoval(self::WRITING_IS_RUNNING_FLAG_PATH);

// writing to user works!!!
try {
$this->contentRepository->handle(
SetNodeProperties::create(
WorkspaceName::fromString('user-test'),
NodeAggregateId::fromString('nody-mc-nodeface'),
OriginDimensionSpacePoint::createWithoutDimensions(),
PropertyValuesToWrite::fromArray([
'title' => 'written-after-failed-publish'
])
)
);
} catch (ContentStreamIsClosed $exception) {
Assert::fail(sprintf('Workspace that failed to be publish cannot be written: %s', $exception->getMessage()));
}

$node = $this->contentRepository->getContentGraph(WorkspaceName::fromString('user-test'))
->getSubgraph(DimensionSpacePoint::createWithoutDimensions(), VisibilityConstraints::withoutRestrictions())
->findNodeById(NodeAggregateId::fromString('nody-mc-nodeface'));

Assert::assertSame('written-after-failed-publish', $node?->getProperty('title'));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
use Neos\Flow\ObjectManagement\ObjectManagerInterface;
use PHPUnit\Framework\Assert;

class WorkspaceWritingDuringRebase extends AbstractParallelTestCase
class WorkspaceWritingDuringRebaseTest extends AbstractParallelTestCase

{
private const SETUP_LOCK_PATH = __DIR__ . '/setup-lock';
Expand Down Expand Up @@ -88,7 +88,9 @@ public function getContentDimensionsOrderedByPriority(): array
$exclusiveNonBlockingLockResult = flock($setupLockResource, LOCK_EX | LOCK_NB);
if ($exclusiveNonBlockingLockResult === false) {
$this->log('waiting for setup');
$this->awaitSharedLock($setupLockResource);
if (!flock($setupLockResource, LOCK_SH)) {
throw new \RuntimeException('failed to acquire blocking shared lock');
}
$this->contentRepository = $this->contentRepositoryRegistry
->get(ContentRepositoryId::fromString('test_parallel'));
$this->log('wait for setup finished');
Expand Down Expand Up @@ -158,7 +160,7 @@ public function whileAWorkspaceIsBeingRebased(): void
try {
$this->contentRepository->handle(
RebaseWorkspace::create($workspaceName)
->withRebasedContentStreamId(ContentStreamId::create())
->withRebasedContentStreamId(ContentStreamId::fromString('user-cs-rebased'))
->withErrorHandlingStrategy(RebaseErrorHandlingStrategy::STRATEGY_FORCE));
} finally {
unlink(self::REBASE_IS_RUNNING_FLAG_PATH);
Expand Down Expand Up @@ -188,6 +190,11 @@ public function thenConcurrentCommandsLeadToAnException(): void

$this->log('write started');

$workspaceDuringRebase = $this->contentRepository->getContentGraph(WorkspaceName::fromString('user-test'));
Assert::assertSame('user-cs-id', $workspaceDuringRebase->getContentStreamId()->value,
'The parallel tests expects the workspace to still point to the original cs.'
);

$origin = OriginDimensionSpacePoint::createWithoutDimensions();
$actualException = null;
try {
Expand All @@ -201,6 +208,7 @@ public function thenConcurrentCommandsLeadToAnException(): void
));
} catch (\Exception $thrownException) {
$actualException = $thrownException;
$this->log(sprintf('Got exception %s: %s', self::shortClassName($actualException::class), $actualException->getMessage()));
}

$this->log('write finished');
Expand All @@ -215,7 +223,7 @@ public function thenConcurrentCommandsLeadToAnException(): void

Assert::assertThat($actualException, self::logicalOr(
self::isInstanceOf(ContentStreamIsClosed::class),
self::isInstanceOf(ConcurrencyException::class),
self::isInstanceOf(ConcurrencyException::class), // todo is only thrown theoretical? but not during tests here ...
));

Assert::assertSame('title-original', $node?->getProperty('title'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public function __construct(
}

/**
* The handler only calculate which events they want to have published,
* but do not do the publishing themselves
*
* @return EventsToPublish|\Generator<int, EventsToPublish>
*/
public function handle(CommandInterface|RebasableToOtherWorkspaceInterface $command): EventsToPublish|\Generator
Expand Down
Loading

0 comments on commit 80d8750

Please sign in to comment.