Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Examine Persistence Actor Dispatcher timings #7081

Draft
wants to merge 46 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
adab09d
attempting to fix racy Akka.Persistence.TestKit.Tests
Aaronontheweb Jan 19, 2024
e1e3c02
Merge branch 'dev' into fix-AkkaPersistenceTestKitTests
Aaronontheweb Jan 19, 2024
185d5ad
added debug log
Aaronontheweb Jan 19, 2024
272b962
looking into some suspicious `await` calls inside the `AsyncWriteJour…
Aaronontheweb Jan 19, 2024
fa5bd37
Merge branch 'dev' into fix-AkkaPersistenceTestKitTests
Aaronontheweb Jan 23, 2024
7b8b538
minor code clean up
Aaronontheweb Jan 23, 2024
ef8fd68
`MemoryJournal` cleanup
Aaronontheweb Jan 23, 2024
caf93d7
added more robust external error logging
Aaronontheweb Jan 23, 2024
30897ff
disable sync thread dispatcher
Aaronontheweb Jan 23, 2024
fbb0980
Revert "disable sync thread dispatcher"
Aaronontheweb Jan 23, 2024
0178cd3
added debugging capabilities to `TestJournal`
Aaronontheweb Jan 23, 2024
7206cca
added debug logging to `CounterActor` specs
Aaronontheweb Jan 23, 2024
daf2c8c
added debug logging to `Bug4762FixSpec`
Aaronontheweb Jan 23, 2024
26108a7
added debug logging to `TestSnapshotStore`
Aaronontheweb Jan 23, 2024
852375f
attempting to fix some continuation mess inside the snapshot store
Aaronontheweb Jan 24, 2024
0de3cc9
fixed final cases
Aaronontheweb Jan 24, 2024
ef22f50
formatting
Aaronontheweb Jan 24, 2024
a33110a
fixed snapshot saving errors
Aaronontheweb Jan 24, 2024
b8a7245
enable DEBUG logging
Aaronontheweb Jan 24, 2024
4af443a
more debug logging
Aaronontheweb Jan 24, 2024
190e972
Merge branch 'dev' into fix-AkkaPersistenceTestKitTests
Aaronontheweb Jan 24, 2024
663a88d
more debug logging
Aaronontheweb Jan 25, 2024
8192cc8
Merge branch 'dev' into fix-AkkaPersistenceTestKitTests
Arkatufus Jan 31, 2024
a221b7e
Clean-up and modernize ReplayFilter
Arkatufus Jan 31, 2024
f963565
Fix configuration, make sure that default values are sensible
Arkatufus Jan 31, 2024
7788e72
Fix journal interceptor, make sure that Ask operation are short enoug…
Arkatufus Jan 31, 2024
aeb68c8
Turn on relpay filter debug mode
Arkatufus Jan 31, 2024
c777bc6
Merge branch 'fix-AkkaPersistenceTestKitTests' of github.com:Aaronont…
Arkatufus Jan 31, 2024
afc24df
Bump timeout, testing if recovery is delayed or permanently stuck
Arkatufus Jan 31, 2024
8f84ff5
Fix AsyncWriteJournal replay handler not closing over self
Arkatufus Feb 1, 2024
68e2192
Fix SnapshotStore snapshot load not closing over self
Arkatufus Feb 1, 2024
a7da6a6
Merge branch 'dev' into fix-AkkaPersistenceTestKitTests
Arkatufus Feb 1, 2024
5157e05
#7068 - Examining CircuitBreaker timings
Arkatufus Feb 1, 2024
40bcb35
Measure RecoveryPermitter
Arkatufus Feb 1, 2024
94de668
Investigate recovery timing
Arkatufus Feb 1, 2024
dbc4165
Try to pinpoint cycle stealing/starvation problem
Arkatufus Feb 5, 2024
260d1fa
Merge branch 'dev' into fix-AkkaPersistenceTestKitTest-Greg
Arkatufus Feb 5, 2024
3da6e15
fix wrong dispatcher name
Arkatufus Feb 5, 2024
2860d80
Merge branch 'fix-AkkaPersistenceTestKitTest-Greg' of github.com:Arka…
Arkatufus Feb 5, 2024
23992bb
fix AsyncQueue deadlock
Arkatufus Feb 5, 2024
0ac0957
Revert "fix AsyncQueue deadlock"
Arkatufus Feb 5, 2024
f65914d
Disable AsyncQueue
Arkatufus Feb 5, 2024
a464bb3
Revert dispatcher changes
Arkatufus Feb 5, 2024
97d7dd6
Revert dispatcher changes revert
Arkatufus Feb 6, 2024
77a3c3d
Revert AsyncQueue changes
Arkatufus Feb 6, 2024
91a1d52
Merge branch 'dev' into fix-AkkaPersistenceTestKitTest-Greg
Aaronontheweb Mar 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,15 @@ public void LocalSnapshotStore_can_snapshot_actors_with_PersistenceId_containing
ExpectMsg<SaveSnapshotSuccess>();

SnapshotStore.Tell(new LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, long.MaxValue), TestActor);
ExpectMsg<LoadSnapshotResult>(res =>
res.Snapshot.Snapshot.Equals("sample data")
&& res.Snapshot.Metadata.PersistenceId == pid
&& res.Snapshot.Metadata.SequenceNr == 1);
ExpectMsg<LoadSnapshotResult>(IsMessage);
bool IsMessage(LoadSnapshotResult res)
{
var result = res.Snapshot.Snapshot.Equals("sample data")
&& res.Snapshot.Metadata.PersistenceId == pid
&& res.Snapshot.Metadata.SequenceNr == 1;

return result;
}
}
}
}
Expand Down
28 changes: 24 additions & 4 deletions src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using Akka.Configuration;
using Akka.Event;
using Xunit.Abstractions;

Expand Down Expand Up @@ -72,23 +73,42 @@ protected override void OnRecover(object message)
return;
}
}

protected override void PostStop()
{
_log.Info("Shutting down");
}

protected override void PreStart()
{
_log.Info("Starting up");
}
}

public class CounterActorTests : PersistenceTestKit
{
public CounterActorTests(ITestOutputHelper output) : base(output:output){}
// create a Config that enables debug mode on the TestJournal
private static readonly Config Config =
ConfigurationFactory.ParseString("""
akka.persistence.journal.test.debug = on
akka.persistence.snapshot-store.test.debug = on
""");

public CounterActorTests(ITestOutputHelper output) : base(Config, output:output){}

[Fact]
public Task CounterActor_internal_state_will_be_lost_if_underlying_persistence_store_is_not_available()
{
return WithJournalWrite(write => write.Fail(), async () =>
{
var counterProps = Props.Create(() => new CounterActor("test"));
var counterProps = Props.Create(() => new CounterActor("test"))
.WithDispatcher("akka.actor.internal-dispatcher");
var actor = ActorOf(counterProps, "counter");

Watch(actor);
Sys.Log.Info("Messaging actor");
await WatchAsync(actor);
actor.Tell("inc", TestActor);
await ExpectMsgAsync<Terminated>(TimeSpan.FromSeconds(3));
await ExpectTerminatedAsync(actor);

// need to restart actor
actor = ActorOf(counterProps, "counter1");
Expand Down
44 changes: 30 additions & 14 deletions src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -22,16 +24,26 @@ namespace Akka.Persistence.TestKit.Tests
/// </summary>
public class Bug4762FixSpec : PersistenceTestKit
{
public Bug4762FixSpec(ITestOutputHelper outputHelper) : base(output: outputHelper)
// create a Config that enables debug mode on the TestJournal
private static readonly Config Config =
ConfigurationFactory.ParseString("""
akka.loglevel = DEBUG
akka.persistence.journal.test.debug = on
akka.persistence.journal.test.replay-filter.debug = on
akka.persistence.snapshot-store.test.debug = on
""");

public Bug4762FixSpec(ITestOutputHelper outputHelper) : base(Config, output: outputHelper)
{

}

private class WriteMessage
{ }
{
}

private class TestEvent
{ }
{
}

private class TestActor2 : UntypedPersistentActor
{
Expand All @@ -48,24 +60,27 @@ public TestActor2(IActorRef probe)
protected override void OnCommand(object message)
{
_log.Info("Received command {0}", message);

switch (message)
{
case WriteMessage _:
var event1 = new TestEvent();
var event2 = new TestEvent();
var events = new List<TestEvent> { event1, event2 };
PersistAll(events, _ =>
{
_probe.Tell(Done.Instance);
});
PersistAll(events, _ => { _probe.Tell(Done.Instance); });
break;

default:
return;
}
}

protected override void PreStart()
{
_log.Info("Starting up and beginning recovery");
base.PreStart();
}

protected override void OnRecover(object message)
{
_log.Info("Received recover {0}", message);
Expand All @@ -79,17 +94,18 @@ public Task TestJournal_PersistAll_should_only_count_each_event_exceptions_once(
var probe = CreateTestProbe();
return WithJournalWrite(write => write.Pass(), async () =>
{
var actor = ActorOf(() => new TestActor2(probe));
Watch(actor);
var actor = Sys.ActorOf(
Props.Create(() => new TestActor2(probe))
.WithDispatcher("akka.actor.internal-dispatcher"), "test-actor");

var command = new WriteMessage();
actor.Tell(command, actor);

await probe.ExpectMsgAsync<RecoveryCompleted>();
await probe.ExpectMsgAsync<RecoveryCompleted>(10.Seconds());
await probe.ExpectMsgAsync<Done>();
await probe.ExpectMsgAsync<Done>();
await probe.ExpectNoMsgAsync(3000);
});
}
}
}
}
38 changes: 32 additions & 6 deletions src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using Akka.Actor.Setup;
using Akka.Configuration;
using Akka.Event;

namespace Akka.Persistence.TestKit
{
Expand Down Expand Up @@ -111,6 +112,11 @@ public async Task WithJournalRecovery(Func<JournalRecoveryBehavior, Task> behavi
await behaviorSelector(Journal.OnRecovery);
await execution();
}
catch (Exception ex)
{
Log.Error(ex, "Error during execution of WithJournalRecovery");
throw;
}
finally
{
await Journal.OnRecovery.Pass();
Expand All @@ -136,6 +142,11 @@ public async Task WithJournalWrite(Func<JournalWriteBehavior, Task> behaviorSele
await behaviorSelector(Journal.OnWrite);
await execution();
}
catch (Exception ex)
{
Log.Error(ex, "Error during execution of WithJournalWrite");
throw;
}
finally
{
await Journal.OnWrite.Pass();
Expand All @@ -157,7 +168,7 @@ public Task WithJournalRecovery(Func<JournalRecoveryBehavior, Task> behaviorSele
if (execution == null) throw new ArgumentNullException(nameof(execution));

execution();
return Task.FromResult(new object());
return Task.CompletedTask;
});

/// <summary>
Expand All @@ -175,7 +186,7 @@ public Task WithJournalWrite(Func<JournalWriteBehavior, Task> behaviorSelector,
if (execution == null) throw new ArgumentNullException(nameof(execution));

execution();
return Task.FromResult(new object());
return Task.CompletedTask;
});

/// <summary>
Expand All @@ -197,6 +208,11 @@ public async Task WithSnapshotSave(Func<SnapshotStoreSaveBehavior, Task> behavio
await behaviorSelector(Snapshots.OnSave);
await execution();
}
catch (Exception ex)
{
Log.Error(ex, "Error during execution of WithSnapshotSave");
throw;
}
finally
{
await Snapshots.OnSave.Pass();
Expand All @@ -222,6 +238,11 @@ public async Task WithSnapshotLoad(Func<SnapshotStoreLoadBehavior, Task> behavio
await behaviorSelector(Snapshots.OnLoad);
await execution();
}
catch (Exception ex)
{
Log.Error(ex, "Error during execution of WithSnapshotLoad");
throw;
}
finally
{
await Snapshots.OnLoad.Pass();
Expand All @@ -247,6 +268,11 @@ public async Task WithSnapshotDelete(Func<SnapshotStoreDeleteBehavior, Task> beh
await behaviorSelector(Snapshots.OnDelete);
await execution();
}
catch (Exception ex)
{
Log.Error(ex, "Error during execution of WithSnapshotDelete");
throw;
}
finally
{
await Snapshots.OnDelete.Pass();
Expand All @@ -268,7 +294,7 @@ public Task WithSnapshotSave(Func<SnapshotStoreSaveBehavior, Task> behaviorSelec
if (execution == null) throw new ArgumentNullException(nameof(execution));

execution();
return Task.FromResult(true);
return Task.CompletedTask;
});

/// <summary>
Expand All @@ -286,7 +312,7 @@ public Task WithSnapshotLoad(Func<SnapshotStoreLoadBehavior, Task> behaviorSelec
if (execution == null) throw new ArgumentNullException(nameof(execution));

execution();
return Task.FromResult(true);
return Task.CompletedTask;
});

/// <summary>
Expand All @@ -304,7 +330,7 @@ public Task WithSnapshotDelete(Func<SnapshotStoreDeleteBehavior, Task> behaviorS
if (execution == null) throw new ArgumentNullException(nameof(execution));

execution();
return Task.FromResult(true);
return Task.CompletedTask;
});

/// <summary>
Expand Down Expand Up @@ -348,7 +374,7 @@ private static Config GetConfig(Config customConfig)
{
var defaultConfig = ConfigurationFactory.FromResource<TestJournal>("Akka.Persistence.TestKit.config.conf");
if (customConfig == Config.Empty) return defaultConfig;
else return defaultConfig.SafeWithFallback(customConfig);
else return customConfig.WithFallback(defaultConfig);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal class Noop : IJournalInterceptor
{
public static readonly IJournalInterceptor Instance = new Noop();

public Task InterceptAsync(IPersistentRepresentation message) => Task.FromResult(true);
public Task InterceptAsync(IPersistentRepresentation message) => Task.CompletedTask;
}

internal class Failure : IJournalInterceptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal JournalRecoveryBehaviorSetter(IActorRef journal)
public Task SetInterceptorAsync(IJournalInterceptor interceptor)
=> _journal.Ask<TestJournal.Ack>(
new TestJournal.UseRecoveryInterceptor(interceptor),
TimeSpan.FromSeconds(3)
TimeSpan.FromSeconds(0.5)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal JournalWriteBehaviorSetter(IActorRef journal)
public Task SetInterceptorAsync(IJournalInterceptor interceptor)
=> _journal.Ask<TestJournal.Ack>(
new TestJournal.UseWriteInterceptor(interceptor),
TimeSpan.FromSeconds(3)
TimeSpan.FromSeconds(0.5)
);
}
}
Loading
Loading