Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring in line with Akka Persistence 1.1.1 #30

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="..\packages\xunit.core.2.0.0\build\portable-net45+win+wpa81+wp80+monotouch+monoandroid+Xamarin.iOS\xunit.core.props" Condition="Exists('..\packages\xunit.core.2.0.0\build\portable-net45+win+wpa81+wp80+monotouch+monoandroid+Xamarin.iOS\xunit.core.props')" />
<Import Project="..\packages\xunit.runner.visualstudio.2.0.0\build\net20\xunit.runner.visualstudio.props" Condition="Exists('..\packages\xunit.runner.visualstudio.2.0.0\build\net20\xunit.runner.visualstudio.props')" />
<Import Project="..\..\..\packages\xunit.core.2.0.0\build\portable-net45+win+wpa81+wp80+monotouch+monoandroid+Xamarin.iOS\xunit.core.props" Condition="Exists('..\..\..\packages\xunit.core.2.0.0\build\portable-net45+win+wpa81+wp80+monotouch+monoandroid+Xamarin.iOS\xunit.core.props')" />
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
Expand All @@ -15,7 +15,8 @@
<FileAlignment>512</FileAlignment>
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\..\..\</SolutionDir>
<RestorePackages>true</RestorePackages>
<NuGetPackageImportStamp>fa451739</NuGetPackageImportStamp>
<NuGetPackageImportStamp>
</NuGetPackageImportStamp>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
Expand All @@ -35,35 +36,37 @@
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="Akka, Version=1.0.6.16, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.1.0.6\lib\net45\Akka.dll</HintPath>
<Reference Include="Akka, Version=1.1.1.27, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.1.1.1\lib\net45\Akka.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Akka.Persistence, Version=1.0.6.17, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.Persistence.1.0.6.17-beta\lib\net45\Akka.Persistence.dll</HintPath>
<Reference Include="Akka.Persistence, Version=1.1.1.28, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.Persistence.1.1.1.28-beta\lib\net45\Akka.Persistence.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Akka.Persistence.TestKit, Version=1.0.6.17, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.Persistence.TestKit.1.0.6.17-beta\lib\net45\Akka.Persistence.TestKit.dll</HintPath>
<Reference Include="Akka.Persistence.TestKit, Version=1.1.1.28, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.Persistence.TestKit.1.1.1.28-beta\lib\net45\Akka.Persistence.TestKit.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Akka.TestKit, Version=1.0.6.16, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.TestKit.1.0.6\lib\net45\Akka.TestKit.dll</HintPath>
<Reference Include="Akka.TestKit, Version=1.1.1.27, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.TestKit.1.1.1\lib\net45\Akka.TestKit.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Akka.TestKit.Xunit2, Version=1.0.6.16, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.TestKit.Xunit2.1.0.6\lib\net45\Akka.TestKit.Xunit2.dll</HintPath>
<Reference Include="Akka.TestKit.Xunit2, Version=1.1.1.27, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.TestKit.Xunit2.1.1.1\lib\net45\Akka.TestKit.Xunit2.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Cassandra, Version=2.7.0.0, Culture=neutral, PublicKeyToken=10b231fbfc8c4b4d, processorArchitecture=MSIL">
<HintPath>..\packages\CassandraCSharpDriver.2.7.3\lib\net40\Cassandra.dll</HintPath>
<Reference Include="Cassandra, Version=3.0.0.0, Culture=neutral, PublicKeyToken=10b231fbfc8c4b4d, processorArchitecture=MSIL">
<HintPath>..\packages\CassandraCSharpDriver.3.0.8\lib\net40\Cassandra.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
<Reference Include="Google.ProtocolBuffers, Version=2.4.1.555, Culture=neutral, PublicKeyToken=55f7125234beb589, processorArchitecture=MSIL">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.555\lib\net40\Google.ProtocolBuffers.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Google.ProtocolBuffers.Serialization">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.Serialization.dll</HintPath>
<Reference Include="Google.ProtocolBuffers.Serialization, Version=2.4.1.555, Culture=neutral, PublicKeyToken=55f7125234beb589, processorArchitecture=MSIL">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.555\lib\net40\Google.ProtocolBuffers.Serialization.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="LZ4">
<HintPath>..\packages\lz4net.1.0.5.93\lib\net40-client\LZ4.dll</HintPath>
Expand All @@ -72,6 +75,10 @@
<HintPath>..\Packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Collections.Immutable, Version=1.1.36.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<HintPath>..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
Expand All @@ -81,11 +88,17 @@
<Reference Include="xunit.abstractions">
<HintPath>..\packages\xunit.abstractions.2.0.0\lib\net35\xunit.abstractions.dll</HintPath>
</Reference>
<Reference Include="xunit.assert">
<HintPath>..\packages\xunit.assert.2.0.0\lib\portable-net45+win+wpa81+wp80+monotouch+monoandroid+Xamarin.iOS\xunit.assert.dll</HintPath>
<Reference Include="xunit.assert, Version=2.1.0.3179, Culture=neutral, PublicKeyToken=8d05b1bb7a6fdb6c, processorArchitecture=MSIL">
<HintPath>..\packages\xunit.assert.2.1.0\lib\dotnet\xunit.assert.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="xunit.core, Version=2.1.0.3179, Culture=neutral, PublicKeyToken=8d05b1bb7a6fdb6c, processorArchitecture=MSIL">
<HintPath>..\packages\xunit.extensibility.core.2.1.0\lib\dotnet\xunit.core.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="xunit.core">
<HintPath>..\packages\xunit.extensibility.core.2.0.0\lib\portable-net45+win+wpa81+wp80+monotouch+monoandroid+Xamarin.iOS\xunit.core.dll</HintPath>
<Reference Include="xunit.execution.desktop, Version=2.1.0.3179, Culture=neutral, PublicKeyToken=8d05b1bb7a6fdb6c, processorArchitecture=MSIL">
<HintPath>..\packages\xunit.extensibility.execution.2.1.0\lib\net45\xunit.execution.desktop.dll</HintPath>
<Private>True</Private>
</Reference>
</ItemGroup>
<ItemGroup>
Expand Down Expand Up @@ -118,6 +131,7 @@
<PropertyGroup>
<ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
</PropertyGroup>
<Error Condition="!Exists('..\packages\xunit.runner.visualstudio.2.0.0\build\net20\xunit.runner.visualstudio.props')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\xunit.runner.visualstudio.2.0.0\build\net20\xunit.runner.visualstudio.props'))" />
</Target>
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
Expand Down
65 changes: 14 additions & 51 deletions src/Akka.Persistence.Cassandra.Tests/CassandraIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Akka.TestKit;
using Akka.Util.Internal;
using Xunit;
using Akka.Persistence;

namespace Akka.Persistence.Cassandra.Tests
{
Expand Down Expand Up @@ -41,11 +42,11 @@ public CassandraIntegrationSpec()
public void Cassandra_journal_should_write_and_replay_messages()
{
// Start a persistence actor and write some messages to it
var actor1 = Sys.ActorOf(Props.Create<PersistentActorA>(_actorId));
var actor1 = Sys.ActorOf(Props.Create<PersistentActorA>(_actorId, TestActor));
WriteAndVerifyMessages(actor1, 1L, 16L);

// Now start a new instance (same persistence Id) and it should recover with those same messages
var actor2 = Sys.ActorOf(Props.Create<PersistentActorA>(_actorId));
var actor2 = Sys.ActorOf(Props.Create<PersistentActorA>(_actorId, TestActor));
for (long i = 1L; i <= 16L; i++)
{
string msg = string.Format("a-{0}", i);
Expand All @@ -66,15 +67,15 @@ public void Cassandra_journal_should_not_replay_deleted_messages(bool permanentD
TestProbe deleteProbe = CreateTestProbe();
Sys.EventStream.Subscribe(deleteProbe.Ref, typeof (DeleteMessagesTo));

var actor1 = Sys.ActorOf(Props.Create<PersistentActorA>(_actorId));
var actor1 = Sys.ActorOf(Props.Create<PersistentActorA>(_actorId, TestActor));
WriteAndVerifyMessages(actor1, 1L, 16L);

// Tell the actor to delete some messages and make sure it's finished
actor1.Tell(new DeleteToCommand(3L, permanentDelete));
deleteProbe.ExpectMsg<DeleteMessagesTo>();

// Start a second copy of the actor and verify it starts replaying from the correct spot
Sys.ActorOf(Props.Create<PersistentActorA>(_actorId));
Sys.ActorOf(Props.Create<PersistentActorA>(_actorId, TestActor));
for (long i = 4L; i <= 16L; i++)
{
string msg = string.Format("a-{0}", i);
Expand All @@ -86,7 +87,7 @@ public void Cassandra_journal_should_not_replay_deleted_messages(bool permanentD
deleteProbe.ExpectMsg<DeleteMessagesTo>();

// Start another copy and verify playback again
Sys.ActorOf(Props.Create<PersistentActorA>(_actorId));
Sys.ActorOf(Props.Create<PersistentActorA>(_actorId, TestActor));
for (long i = 8L; i <= 16L; i++)
{
string msg = string.Format("a-{0}", i);
Expand All @@ -98,7 +99,7 @@ public void Cassandra_journal_should_not_replay_deleted_messages(bool permanentD
public void Cassandra_journal_should_replay_message_incrementally()
{
// Write some messages to a Persistent Actor
var actor = Sys.ActorOf(Props.Create<PersistentActorA>(_actorId));
var actor = Sys.ActorOf(Props.Create<PersistentActorA>(_actorId, TestActor));
WriteAndVerifyMessages(actor, 1L, 6L);

TestProbe probe = CreateTestProbe();
Expand Down Expand Up @@ -140,34 +141,6 @@ public void Persistent_actor_should_recover_from_a_snapshot_with_follow_up_messa
ExpectHandled("b", 2, true);
}

[Fact]
public void Persistent_actor_should_recover_from_a_snapshot_with_follow_up_messages_and_an_upper_bound()
{
// Create an actor and trigger manual recovery so it will accept new messages
var actor1 = Sys.ActorOf(Props.Create<PersistentActorCWithManualRecovery>(_actorId, TestActor));
actor1.Tell(new Recover(SnapshotSelectionCriteria.None));

// Write a message, snapshot, then write some follow-up messages
actor1.Tell("a");
ExpectHandled("a", 1, false);
actor1.Tell("snap");
ExpectMsg("snapped-a-1");
WriteSameMessageAndVerify(actor1, "a", 2L, 7L);

// Create another copy of that actor and manually recover to an upper bound (i.e. past state) and verify
// we get the expected messages after the snapshot
var actor2 = Sys.ActorOf(Props.Create<PersistentActorCWithManualRecovery>(_actorId, TestActor));
actor2.Tell(new Recover(SnapshotSelectionCriteria.Latest, toSequenceNr: 3L));
ExpectMsg("offered-a-1");
ExpectHandled("a", 2, true);
ExpectHandled("a", 3, true);

// Should continue working after recovery to previous state, but highest sequence number should take into
// account other messages that were written but not replayed
actor2.Tell("d");
ExpectHandled("d", 8L, false);
}

[Fact]
public void Persistent_actor_should_recover_from_a_snapshot_without_follow_up_messages_inside_a_partition()
{
Expand Down Expand Up @@ -274,10 +247,12 @@ public HandledMessage(string message, long sequenceNumber, bool isRecovering)
public class PersistentActorA : PersistentActor
{
private readonly string _persistenceId;
private readonly IActorRef _testActor;

public PersistentActorA(string persistenceId)
public PersistentActorA(string persistenceId, IActorRef testActor)
{
_persistenceId = persistenceId;
_testActor = testActor;
}

public override string PersistenceId
Expand All @@ -290,7 +265,8 @@ protected override bool ReceiveRecover(object message)
if (message is string)
{
var payload = (string) message;
Handle(payload);
//Handle(payload);
_testActor.Tell(new HandledMessage(payload, LastSequenceNr, IsRecovering), Self);
return true;
}

Expand All @@ -302,7 +278,7 @@ protected override bool ReceiveCommand(object message)
if (message is DeleteToCommand)
{
var delete = (DeleteToCommand) message;
DeleteMessages(delete.SequenceNumber, delete.Permanent);
DeleteMessages(delete.SequenceNumber);
return true;
}

Expand Down Expand Up @@ -382,7 +358,7 @@ protected override bool ReceiveCommand(object message)
if (message is DeleteToCommand)
{
var delete = (DeleteToCommand) message;
DeleteMessages(delete.SequenceNumber, delete.Permanent);
DeleteMessages(delete.SequenceNumber);
return true;
}

Expand All @@ -396,19 +372,6 @@ private void Handle(string payload)
}
}

public class PersistentActorCWithManualRecovery : PersistentActorC
{
public PersistentActorCWithManualRecovery(string persistenceId, IActorRef probe)
: base(persistenceId, probe)
{
}

protected override void PreRestart(Exception reason, object message)
{
// Don't do automatic recovery
}
}

public class ViewA : PersistentView
{
private readonly string _viewId;
Expand Down
25 changes: 14 additions & 11 deletions src/Akka.Persistence.Cassandra.Tests/packages.config
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Akka" version="1.0.6" targetFramework="net45" />
<package id="Akka.Persistence" version="1.0.6.17-beta" targetFramework="net45" />
<package id="Akka.Persistence.TestKit" version="1.0.6.17-beta" targetFramework="net45" />
<package id="Akka.TestKit" version="1.0.6" targetFramework="net45" />
<package id="Akka.TestKit.Xunit2" version="1.0.6" targetFramework="net45" />
<package id="CassandraCSharpDriver" version="2.7.3" targetFramework="net45" />
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
<package id="Akka" version="1.1.1" targetFramework="net45" />
<package id="Akka.Persistence" version="1.1.1.28-beta" targetFramework="net45" />
<package id="Akka.Persistence.TestKit" version="1.1.1.28-beta" targetFramework="net45" />
<package id="Akka.TestKit" version="1.1.1" targetFramework="net45" />
<package id="Akka.TestKit.Xunit2" version="1.1.1" targetFramework="net45" />
<package id="CassandraCSharpDriver" version="3.0.8" targetFramework="net45" />
<package id="Google.ProtocolBuffers" version="2.4.1.555" targetFramework="net45" />
<package id="lz4net" version="1.0.5.93" targetFramework="net45" />
<package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" />
<package id="xunit" version="2.0.0" targetFramework="net45" />
<package id="System.Collections.Immutable" version="1.1.36" targetFramework="net45" />
<package id="xunit" version="2.1.0" targetFramework="net45" />
<package id="xunit.abstractions" version="2.0.0" targetFramework="net45" />
<package id="xunit.assert" version="2.0.0" targetFramework="net45" />
<package id="xunit.core" version="2.0.0" targetFramework="net45" />
<package id="xunit.extensibility.core" version="2.0.0" targetFramework="net45" />
<package id="xunit.assert" version="2.1.0" targetFramework="net45" />
<package id="xunit.core" version="2.1.0" targetFramework="net45" />
<package id="xunit.extensibility.core" version="2.1.0" targetFramework="net45" />
<package id="xunit.extensibility.execution" version="2.1.0" targetFramework="net45" />
<package id="xunit.runner.visualstudio" version="2.0.0" targetFramework="net45" />
</packages>
Loading