Skip to content

Commit

Permalink
Merge pull request #119 from Lutando/dev
Browse files Browse the repository at this point in the history
Fixed EmitAll bug for polymorphic types
  • Loading branch information
Lutando committed May 15, 2019
2 parents 996cf1d + c6dba50 commit ac37dcc
Show file tree
Hide file tree
Showing 30 changed files with 469 additions and 187 deletions.
9 changes: 8 additions & 1 deletion build/azure-pipelines-release-ci-cd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,11 @@ jobs:
displayName: 'Publish Artifact Source'
inputs:
PathtoPublish: '$(Agent.BuildDirectory)'
ArtifactName: Source
ArtifactName: Source

- task: GitHubRelease@0
displayName: 'GitHub release (create)'
inputs:
gitHubConnection: 'Akkatecture-GitHub'
tagSource: manual
tag: $(Build.BuildNumber)
6 changes: 5 additions & 1 deletion src/Akkatecture.Clustering/Akkatecture.Clustering.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>
</PropertyGroup>


<PropertyGroup>
<NoWarn>NU5104</NoWarn>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka.Cluster" Version="1.3.13" />
<PackageReference Include="Akka.Cluster.Sharding" Version="1.3.12-beta" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected virtual bool Dispatch(IDomainEvent domainEvent)
{
AggregateSagaManager.Tell(domainEvent);

Logger.Debug($"{GetType().PrettyPrint()} just dispatched {domainEvent.GetType().PrettyPrint()} to {AggregateSagaManager}");
Logger.Debug("{0} just dispatched {1} to {2}",GetType().PrettyPrint(),domainEvent.GetType().PrettyPrint(),AggregateSagaManager.Path.Name);
return true;
}

Expand Down
65 changes: 38 additions & 27 deletions src/Akkatecture.TestFixture/Aggregates/AggregateFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public class AggregateFixture<TAggregate, TIdentity> : IFixtureArranger<TAggrega
private readonly TestKitBase _testKit;
public TIdentity AggregateId { get; private set; }
public IActorRef AggregateRef { get; private set; }
public TestProbe AggregateTestProbe { get; private set; }
public TestProbe AggregateEventTestProbe { get; private set; }
public TestProbe AggregateReplyTestProbe { get; private set; }
public Props AggregateProps { get; private set; }
public bool UsesAggregateManager { get; private set; }
public AggregateFixture(
Expand All @@ -58,11 +59,12 @@ public IFixtureArranger<TAggregate, TIdentity> For(TIdentity aggregateId)
if(aggregateId == null)
throw new ArgumentNullException(nameof(aggregateId));

if(!AggregateTestProbe.IsNobody())
throw new InvalidOperationException(nameof(AggregateTestProbe));
if(!AggregateEventTestProbe.IsNobody())
throw new InvalidOperationException(nameof(AggregateEventTestProbe));

AggregateId = aggregateId;
AggregateTestProbe = _testKit.CreateTestProbe("aggregate-probe");
AggregateEventTestProbe = _testKit.CreateTestProbe("aggregate-event-test-probe");
AggregateReplyTestProbe = _testKit.CreateTestProbe("aggregate-reply-test-probe");
AggregateProps = Props.Create<TAggregate>(args: aggregateId);
AggregateRef = ActorRefs.Nobody;
UsesAggregateManager = false;
Expand All @@ -76,11 +78,14 @@ public IFixtureArranger<TAggregate, TIdentity> Using<TAggregateManager>(
{
if(aggregateId == null)
throw new ArgumentNullException(nameof(aggregateId));
if(!AggregateTestProbe.IsNobody())
throw new InvalidOperationException(nameof(AggregateTestProbe));
if(!AggregateEventTestProbe.IsNobody())
throw new InvalidOperationException(nameof(AggregateEventTestProbe));
if(!AggregateReplyTestProbe.IsNobody())
throw new InvalidOperationException(nameof(AggregateReplyTestProbe));

AggregateId = aggregateId;
AggregateTestProbe = _testKit.CreateTestProbe("aggregate-probe");
AggregateEventTestProbe = _testKit.CreateTestProbe("aggregate-event-test-probe");
AggregateReplyTestProbe = _testKit.CreateTestProbe("aggregate-reply-test-probe");
AggregateRef = _testKit.Sys.ActorOf(Props.Create(aggregateManagerFactory), "aggregate-manager");
UsesAggregateManager = false;
AggregateProps = Props.Empty;
Expand All @@ -90,7 +95,7 @@ public IFixtureArranger<TAggregate, TIdentity> Using<TAggregateManager>(

public IFixtureExecutor<TAggregate, TIdentity> GivenNothing()
{
if (!UsesAggregateManager && AggregateRef == ActorRefs.Nobody)
if (!UsesAggregateManager && AggregateRef.IsNobody())
AggregateRef = _testKit.Sys.ActorOf(AggregateProps, AggregateId.Value);

return this;
Expand All @@ -115,15 +120,15 @@ public IFixtureExecutor<TAggregate, TIdentity> Given(params ICommand<TAggregate,
if(commands == null)
throw new ArgumentNullException(nameof(commands));

if (!UsesAggregateManager && AggregateRef == ActorRefs.Nobody)
if (!UsesAggregateManager && AggregateRef.IsNobody())
AggregateRef = _testKit.Sys.ActorOf(AggregateProps, AggregateId.Value);

foreach (var command in commands)
{
if(command == null)
throw new NullReferenceException(nameof(command));

AggregateRef.Tell(command);
AggregateRef.Tell(command, AggregateReplyTestProbe);
}

return this;
Expand All @@ -135,15 +140,15 @@ public IFixtureAsserter<TAggregate, TIdentity> When(params ICommand<TAggregate,
if(commands == null)
throw new ArgumentNullException(nameof(commands));

if(!UsesAggregateManager && AggregateRef == ActorRefs.Nobody)
if(!UsesAggregateManager && AggregateRef.IsNobody())
AggregateRef = _testKit.Sys.ActorOf(AggregateProps, AggregateId.Value);

foreach (var command in commands)
{
if(command == null)
throw new NullReferenceException(nameof(command));

AggregateRef.Tell(command);
AggregateRef.Tell(command, AggregateReplyTestProbe);
}

return this;
Expand All @@ -155,27 +160,33 @@ public IFixtureAsserter<TAggregate, TIdentity> AndWhen(params ICommand<TAggregat
}

public IFixtureAsserter<TAggregate, TIdentity> ThenExpect<TAggregateEvent>(Predicate<TAggregateEvent> aggregateEventPredicate = null)
where TAggregateEvent : IAggregateEvent<TAggregate, TIdentity>
where TAggregateEvent : class, IAggregateEvent<TAggregate, TIdentity>
{
_testKit.Sys.EventStream.Subscribe(AggregateTestProbe, typeof(DomainEvent<TAggregate, TIdentity, TAggregateEvent>));
_testKit.Sys.EventStream.Subscribe(AggregateEventTestProbe, typeof(IDomainEvent<TAggregate, TIdentity, TAggregateEvent>));

if(aggregateEventPredicate == null)
AggregateTestProbe.ExpectMsg<DomainEvent<TAggregate, TIdentity, TAggregateEvent>>();
AggregateEventTestProbe.ExpectMsg<DomainEvent<TAggregate, TIdentity, TAggregateEvent>>();
else
AggregateTestProbe.ExpectMsg<DomainEvent<TAggregate, TIdentity, TAggregateEvent>>(x => aggregateEventPredicate(x.AggregateEvent));
AggregateEventTestProbe.ExpectMsg<DomainEvent<TAggregate, TIdentity, TAggregateEvent>>(x => aggregateEventPredicate(x.AggregateEvent));

return this;
}

public IFixtureAsserter<TAggregate, TIdentity> ThenExpectReply<TReply>(Predicate<TReply> aggregateReplyPredicate = null)
{
AggregateReplyTestProbe.ExpectMsg<TReply>(aggregateReplyPredicate);
return this;
}

public IFixtureAsserter<TAggregate, TIdentity> ThenExpectDomainEvent<TAggregateEvent>(Predicate<DomainEvent<TAggregate, TIdentity, TAggregateEvent>> domainEventPredicate = null)
where TAggregateEvent : IAggregateEvent<TAggregate,TIdentity>
public IFixtureAsserter<TAggregate, TIdentity> ThenExpectDomainEvent<TAggregateEvent>(Predicate<IDomainEvent<TAggregate, TIdentity, TAggregateEvent>> domainEventPredicate = null)
where TAggregateEvent : class, IAggregateEvent<TAggregate,TIdentity>
{
_testKit.Sys.EventStream.Subscribe(AggregateTestProbe, typeof(DomainEvent<TAggregate, TIdentity, TAggregateEvent>));
_testKit.Sys.EventStream.Subscribe(AggregateEventTestProbe, typeof(IDomainEvent<TAggregate, TIdentity, TAggregateEvent>));

if(domainEventPredicate == null)
AggregateTestProbe.ExpectMsg<DomainEvent<TAggregate, TIdentity, TAggregateEvent>>();
AggregateEventTestProbe.ExpectMsg<DomainEvent<TAggregate, TIdentity, TAggregateEvent>>();
else
AggregateTestProbe.ExpectMsg<DomainEvent<TAggregate, TIdentity, TAggregateEvent>>(domainEventPredicate);
AggregateEventTestProbe.ExpectMsg<DomainEvent<TAggregate, TIdentity, TAggregateEvent>>(domainEventPredicate);

return this;
}
Expand All @@ -190,14 +201,14 @@ private void InitializeEventJournal(TIdentity aggregateId, params IAggregateEven
writes[i] = new AtomicWrite(new Persistent(committedEvent, i+1, aggregateId.Value, string.Empty, false, ActorRefs.NoSender, writerGuid));
}
var journal = Persistence.Instance.Apply(_testKit.Sys).JournalFor(null);
journal.Tell(new WriteMessages(writes, AggregateTestProbe.Ref, 1));
journal.Tell(new WriteMessages(writes, AggregateEventTestProbe.Ref, 1));

AggregateTestProbe.ExpectMsg<WriteMessagesSuccessful>();
AggregateEventTestProbe.ExpectMsg<WriteMessagesSuccessful>();

for (var i = 0; i < events.Length; i++)
{
var seq = i;
AggregateTestProbe.ExpectMsg<WriteMessageSuccess>(x =>
AggregateEventTestProbe.ExpectMsg<WriteMessageSuccess>(x =>
x.Persistent.PersistenceId == aggregateId.ToString() &&
x.Persistent.Payload is CommittedEvent<TAggregate, TIdentity, IAggregateEvent<TAggregate, TIdentity>> &&
x.Persistent.SequenceNr == (long) seq+1);
Expand All @@ -208,12 +219,12 @@ private void InitializeSnapshotStore<TAggregateSnapshot>(TIdentity aggregateId,
where TAggregateSnapshot : IAggregateSnapshot<TAggregate, TIdentity>
{
var snapshotStore = Persistence.Instance.Apply(_testKit.Sys).SnapshotStoreFor(null);
var committedSnapshot = new ComittedSnapshot<TAggregate, TIdentity, TAggregateSnapshot>(aggregateId, aggregateSnapshot, new SnapshotMetadata(), DateTimeOffset.UtcNow, sequenceNumber);
var committedSnapshot = new CommittedSnapshot<TAggregate, TIdentity, TAggregateSnapshot>(aggregateId, aggregateSnapshot, new SnapshotMetadata(), DateTimeOffset.UtcNow, sequenceNumber);

var metadata = new AkkaSnapshotMetadata(aggregateId.ToString(), sequenceNumber);
snapshotStore.Tell(new SaveSnapshot(metadata, committedSnapshot), AggregateTestProbe.Ref);
snapshotStore.Tell(new SaveSnapshot(metadata, committedSnapshot), AggregateEventTestProbe.Ref);

AggregateTestProbe.ExpectMsg<SaveSnapshotSuccess>(x =>
AggregateEventTestProbe.ExpectMsg<SaveSnapshotSuccess>(x =>
x.Metadata.SequenceNr == sequenceNumber &&
x.Metadata.PersistenceId == aggregateId.ToString());

Expand Down
8 changes: 5 additions & 3 deletions src/Akkatecture.TestFixture/Aggregates/IFixtureAsserter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ public interface IFixtureAsserter<TAggregate, TIdentity>
{
IFixtureAsserter<TAggregate, TIdentity> AndWhen(params ICommand<TAggregate, TIdentity>[] commands);
IFixtureAsserter<TAggregate, TIdentity> ThenExpect<TAggregateEvent>(Predicate<TAggregateEvent> aggregateEventPredicate = null)
where TAggregateEvent : IAggregateEvent<TAggregate, TIdentity>;
where TAggregateEvent : class, IAggregateEvent<TAggregate, TIdentity>;

IFixtureAsserter<TAggregate, TIdentity> ThenExpectReply<TReply>(Predicate<TReply> aggregateReply = null);

IFixtureAsserter<TAggregate, TIdentity> ThenExpectDomainEvent<TAggregateEvent>(Predicate<DomainEvent<TAggregate, TIdentity, TAggregateEvent>> domainEventPredicate = null)
where TAggregateEvent : IAggregateEvent<TAggregate, TIdentity>;
IFixtureAsserter<TAggregate, TIdentity> ThenExpectDomainEvent<TAggregateEvent>(Predicate<IDomainEvent<TAggregate, TIdentity, TAggregateEvent>> domainEventPredicate = null)
where TAggregateEvent : class, IAggregateEvent<TAggregate, TIdentity>;
}
}
2 changes: 1 addition & 1 deletion src/Akkatecture.TestFixture/Akkatecture.TestFixture.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<Description>
bdd style test fixtures for akkatecture
</Description>
<PackageTags>akka cqrs es eventsourcing tdd actors testing actor-model</PackageTags>
<PackageTags>akka cqrs es eventsourcing tdd bdd actors testing actor-model</PackageTags>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/Lutando/Akkatecture</RepositoryUrl>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
Expand Down
10 changes: 5 additions & 5 deletions src/Akkatecture/Aggregates/AggregateManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected AggregateManager()

protected virtual bool Dispatch(TCommand command)
{
Logger.Info($"{GetType().PrettyPrint()} received {command.GetType().PrettyPrint()}");
Logger.Info("{0} received {1}", GetType().PrettyPrint(),command.GetType().PrettyPrint());

var aggregateRef = FindOrCreate(command.AggregateId);

Expand All @@ -72,7 +72,7 @@ protected virtual bool Dispatch(TCommand command)

protected virtual bool ReDispatch(TCommand command)
{
Logger.Info($"{GetType().PrettyPrint()} as dead letter {command.GetType().PrettyPrint()}");
Logger.Info("{0} as dead letter {1}",GetType().PrettyPrint(), command.GetType().PrettyPrint());

var aggregateRef = FindOrCreate(command.AggregateId);

Expand All @@ -97,7 +97,7 @@ protected bool Handle(DeadLetter deadLetter)

protected virtual bool Terminate(Terminated message)
{
Logger.Warning($"{typeof(TAggregate).PrettyPrint()}: {message.ActorRef.Path} has terminated.");
Logger.Warning("Aggregate of Type={0}, and Id={1}; has terminated.",typeof(TAggregate).PrettyPrint(), message.ActorRef.Path.Name);
Context.Unwatch(message.ActorRef);
return true;
}
Expand All @@ -106,7 +106,7 @@ protected virtual IActorRef FindOrCreate(TIdentity aggregateId)
{
var aggregate = Context.Child(aggregateId);

if(Equals(aggregate, ActorRefs.Nobody))
if(aggregate.IsNobody())
{
aggregate = CreateAggregate(aggregateId);
}
Expand All @@ -129,7 +129,7 @@ protected override SupervisorStrategy SupervisorStrategy()
localOnlyDecider: x =>
{
Logger.Warning($"[{GetType().PrettyPrint()}] Exception={x.ToString()} to be decided.");
Logger.Warning("{0} will supervise Exception={1} to be decided as {2}.",GetType().PrettyPrint(), x.ToString(), Directive.Restart);
return Directive.Restart;
});
}
Expand Down
Loading

0 comments on commit ac37dcc

Please sign in to comment.