diff --git a/build/azure-pipelines-release-ci-cd.yaml b/build/azure-pipelines-release-ci-cd.yaml index 485361ce..cca2d489 100644 --- a/build/azure-pipelines-release-ci-cd.yaml +++ b/build/azure-pipelines-release-ci-cd.yaml @@ -71,4 +71,11 @@ jobs: displayName: 'Publish Artifact Source' inputs: PathtoPublish: '$(Agent.BuildDirectory)' - ArtifactName: Source \ No newline at end of file + ArtifactName: Source + + - task: GitHubRelease@0 + displayName: 'GitHub release (create)' + inputs: + gitHubConnection: 'Akkatecture-GitHub' + tagSource: manual + tag: $(Build.BuildNumber) \ No newline at end of file diff --git a/src/Akkatecture.Clustering/Akkatecture.Clustering.csproj b/src/Akkatecture.Clustering/Akkatecture.Clustering.csproj index dca19e01..d0b348fc 100644 --- a/src/Akkatecture.Clustering/Akkatecture.Clustering.csproj +++ b/src/Akkatecture.Clustering/Akkatecture.Clustering.csproj @@ -28,7 +28,11 @@ true $(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb - + + + NU5104 + + diff --git a/src/Akkatecture.Clustering/Dispatchers/ShardedAggregateSagaDispatcher.cs b/src/Akkatecture.Clustering/Dispatchers/ShardedAggregateSagaDispatcher.cs index 1e6202a2..c8340e3d 100644 --- a/src/Akkatecture.Clustering/Dispatchers/ShardedAggregateSagaDispatcher.cs +++ b/src/Akkatecture.Clustering/Dispatchers/ShardedAggregateSagaDispatcher.cs @@ -65,7 +65,7 @@ protected virtual bool Dispatch(IDomainEvent domainEvent) { AggregateSagaManager.Tell(domainEvent); - Logger.Debug($"{GetType().PrettyPrint()} just dispatched {domainEvent.GetType().PrettyPrint()} to {AggregateSagaManager}"); + Logger.Debug("{0} just dispatched {1} to {2}",GetType().PrettyPrint(),domainEvent.GetType().PrettyPrint(),AggregateSagaManager.Path.Name); return true; } diff --git a/src/Akkatecture.TestFixture/Aggregates/AggregateFixture.cs b/src/Akkatecture.TestFixture/Aggregates/AggregateFixture.cs index fa2ea68a..2ca6ecbd 100644 --- a/src/Akkatecture.TestFixture/Aggregates/AggregateFixture.cs +++ b/src/Akkatecture.TestFixture/Aggregates/AggregateFixture.cs @@ -43,7 +43,8 @@ public class AggregateFixture : IFixtureArranger For(TIdentity aggregateId) if(aggregateId == null) throw new ArgumentNullException(nameof(aggregateId)); - if(!AggregateTestProbe.IsNobody()) - throw new InvalidOperationException(nameof(AggregateTestProbe)); + if(!AggregateEventTestProbe.IsNobody()) + throw new InvalidOperationException(nameof(AggregateEventTestProbe)); AggregateId = aggregateId; - AggregateTestProbe = _testKit.CreateTestProbe("aggregate-probe"); + AggregateEventTestProbe = _testKit.CreateTestProbe("aggregate-event-test-probe"); + AggregateReplyTestProbe = _testKit.CreateTestProbe("aggregate-reply-test-probe"); AggregateProps = Props.Create(args: aggregateId); AggregateRef = ActorRefs.Nobody; UsesAggregateManager = false; @@ -76,11 +78,14 @@ public IFixtureArranger Using( { if(aggregateId == null) throw new ArgumentNullException(nameof(aggregateId)); - if(!AggregateTestProbe.IsNobody()) - throw new InvalidOperationException(nameof(AggregateTestProbe)); + if(!AggregateEventTestProbe.IsNobody()) + throw new InvalidOperationException(nameof(AggregateEventTestProbe)); + if(!AggregateReplyTestProbe.IsNobody()) + throw new InvalidOperationException(nameof(AggregateReplyTestProbe)); AggregateId = aggregateId; - AggregateTestProbe = _testKit.CreateTestProbe("aggregate-probe"); + AggregateEventTestProbe = _testKit.CreateTestProbe("aggregate-event-test-probe"); + AggregateReplyTestProbe = _testKit.CreateTestProbe("aggregate-reply-test-probe"); AggregateRef = _testKit.Sys.ActorOf(Props.Create(aggregateManagerFactory), "aggregate-manager"); UsesAggregateManager = false; AggregateProps = Props.Empty; @@ -90,7 +95,7 @@ public IFixtureArranger Using( public IFixtureExecutor GivenNothing() { - if (!UsesAggregateManager && AggregateRef == ActorRefs.Nobody) + if (!UsesAggregateManager && AggregateRef.IsNobody()) AggregateRef = _testKit.Sys.ActorOf(AggregateProps, AggregateId.Value); return this; @@ -115,7 +120,7 @@ public IFixtureExecutor Given(params ICommand Given(params ICommand When(params ICommand When(params ICommand AndWhen(params ICommand ThenExpect(Predicate aggregateEventPredicate = null) - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { - _testKit.Sys.EventStream.Subscribe(AggregateTestProbe, typeof(DomainEvent)); + _testKit.Sys.EventStream.Subscribe(AggregateEventTestProbe, typeof(IDomainEvent)); if(aggregateEventPredicate == null) - AggregateTestProbe.ExpectMsg>(); + AggregateEventTestProbe.ExpectMsg>(); else - AggregateTestProbe.ExpectMsg>(x => aggregateEventPredicate(x.AggregateEvent)); + AggregateEventTestProbe.ExpectMsg>(x => aggregateEventPredicate(x.AggregateEvent)); return this; } + + public IFixtureAsserter ThenExpectReply(Predicate aggregateReplyPredicate = null) + { + AggregateReplyTestProbe.ExpectMsg(aggregateReplyPredicate); + return this; + } - public IFixtureAsserter ThenExpectDomainEvent(Predicate> domainEventPredicate = null) - where TAggregateEvent : IAggregateEvent + public IFixtureAsserter ThenExpectDomainEvent(Predicate> domainEventPredicate = null) + where TAggregateEvent : class, IAggregateEvent { - _testKit.Sys.EventStream.Subscribe(AggregateTestProbe, typeof(DomainEvent)); + _testKit.Sys.EventStream.Subscribe(AggregateEventTestProbe, typeof(IDomainEvent)); if(domainEventPredicate == null) - AggregateTestProbe.ExpectMsg>(); + AggregateEventTestProbe.ExpectMsg>(); else - AggregateTestProbe.ExpectMsg>(domainEventPredicate); + AggregateEventTestProbe.ExpectMsg>(domainEventPredicate); return this; } @@ -190,14 +201,14 @@ private void InitializeEventJournal(TIdentity aggregateId, params IAggregateEven writes[i] = new AtomicWrite(new Persistent(committedEvent, i+1, aggregateId.Value, string.Empty, false, ActorRefs.NoSender, writerGuid)); } var journal = Persistence.Instance.Apply(_testKit.Sys).JournalFor(null); - journal.Tell(new WriteMessages(writes, AggregateTestProbe.Ref, 1)); + journal.Tell(new WriteMessages(writes, AggregateEventTestProbe.Ref, 1)); - AggregateTestProbe.ExpectMsg(); + AggregateEventTestProbe.ExpectMsg(); for (var i = 0; i < events.Length; i++) { var seq = i; - AggregateTestProbe.ExpectMsg(x => + AggregateEventTestProbe.ExpectMsg(x => x.Persistent.PersistenceId == aggregateId.ToString() && x.Persistent.Payload is CommittedEvent> && x.Persistent.SequenceNr == (long) seq+1); @@ -208,12 +219,12 @@ private void InitializeSnapshotStore(TIdentity aggregateId, where TAggregateSnapshot : IAggregateSnapshot { var snapshotStore = Persistence.Instance.Apply(_testKit.Sys).SnapshotStoreFor(null); - var committedSnapshot = new ComittedSnapshot(aggregateId, aggregateSnapshot, new SnapshotMetadata(), DateTimeOffset.UtcNow, sequenceNumber); + var committedSnapshot = new CommittedSnapshot(aggregateId, aggregateSnapshot, new SnapshotMetadata(), DateTimeOffset.UtcNow, sequenceNumber); var metadata = new AkkaSnapshotMetadata(aggregateId.ToString(), sequenceNumber); - snapshotStore.Tell(new SaveSnapshot(metadata, committedSnapshot), AggregateTestProbe.Ref); + snapshotStore.Tell(new SaveSnapshot(metadata, committedSnapshot), AggregateEventTestProbe.Ref); - AggregateTestProbe.ExpectMsg(x => + AggregateEventTestProbe.ExpectMsg(x => x.Metadata.SequenceNr == sequenceNumber && x.Metadata.PersistenceId == aggregateId.ToString()); diff --git a/src/Akkatecture.TestFixture/Aggregates/IFixtureAsserter.cs b/src/Akkatecture.TestFixture/Aggregates/IFixtureAsserter.cs index 1f1b2611..900ac1a2 100644 --- a/src/Akkatecture.TestFixture/Aggregates/IFixtureAsserter.cs +++ b/src/Akkatecture.TestFixture/Aggregates/IFixtureAsserter.cs @@ -35,9 +35,11 @@ public interface IFixtureAsserter { IFixtureAsserter AndWhen(params ICommand[] commands); IFixtureAsserter ThenExpect(Predicate aggregateEventPredicate = null) - where TAggregateEvent : IAggregateEvent; + where TAggregateEvent : class, IAggregateEvent; + + IFixtureAsserter ThenExpectReply(Predicate aggregateReply = null); - IFixtureAsserter ThenExpectDomainEvent(Predicate> domainEventPredicate = null) - where TAggregateEvent : IAggregateEvent; + IFixtureAsserter ThenExpectDomainEvent(Predicate> domainEventPredicate = null) + where TAggregateEvent : class, IAggregateEvent; } } \ No newline at end of file diff --git a/src/Akkatecture.TestFixture/Akkatecture.TestFixture.csproj b/src/Akkatecture.TestFixture/Akkatecture.TestFixture.csproj index 3ee140a8..37787c40 100644 --- a/src/Akkatecture.TestFixture/Akkatecture.TestFixture.csproj +++ b/src/Akkatecture.TestFixture/Akkatecture.TestFixture.csproj @@ -18,7 +18,7 @@ bdd style test fixtures for akkatecture - akka cqrs es eventsourcing tdd actors testing actor-model + akka cqrs es eventsourcing tdd bdd actors testing actor-model git https://github.com/Lutando/Akkatecture MIT diff --git a/src/Akkatecture/Aggregates/AggregateManager.cs b/src/Akkatecture/Aggregates/AggregateManager.cs index 2421b680..0cccfb59 100644 --- a/src/Akkatecture/Aggregates/AggregateManager.cs +++ b/src/Akkatecture/Aggregates/AggregateManager.cs @@ -60,7 +60,7 @@ protected AggregateManager() protected virtual bool Dispatch(TCommand command) { - Logger.Info($"{GetType().PrettyPrint()} received {command.GetType().PrettyPrint()}"); + Logger.Info("{0} received {1}", GetType().PrettyPrint(),command.GetType().PrettyPrint()); var aggregateRef = FindOrCreate(command.AggregateId); @@ -72,7 +72,7 @@ protected virtual bool Dispatch(TCommand command) protected virtual bool ReDispatch(TCommand command) { - Logger.Info($"{GetType().PrettyPrint()} as dead letter {command.GetType().PrettyPrint()}"); + Logger.Info("{0} as dead letter {1}",GetType().PrettyPrint(), command.GetType().PrettyPrint()); var aggregateRef = FindOrCreate(command.AggregateId); @@ -97,7 +97,7 @@ protected bool Handle(DeadLetter deadLetter) protected virtual bool Terminate(Terminated message) { - Logger.Warning($"{typeof(TAggregate).PrettyPrint()}: {message.ActorRef.Path} has terminated."); + Logger.Warning("Aggregate of Type={0}, and Id={1}; has terminated.",typeof(TAggregate).PrettyPrint(), message.ActorRef.Path.Name); Context.Unwatch(message.ActorRef); return true; } @@ -106,7 +106,7 @@ protected virtual IActorRef FindOrCreate(TIdentity aggregateId) { var aggregate = Context.Child(aggregateId); - if(Equals(aggregate, ActorRefs.Nobody)) + if(aggregate.IsNobody()) { aggregate = CreateAggregate(aggregateId); } @@ -129,7 +129,7 @@ protected override SupervisorStrategy SupervisorStrategy() localOnlyDecider: x => { - Logger.Warning($"[{GetType().PrettyPrint()}] Exception={x.ToString()} to be decided."); + Logger.Warning("{0} will supervise Exception={1} to be decided as {2}.",GetType().PrettyPrint(), x.ToString(), Directive.Restart); return Directive.Restart; }); } diff --git a/src/Akkatecture/Aggregates/AggregateRoot.cs b/src/Akkatecture/Aggregates/AggregateRoot.cs index d7052e64..05a38165 100644 --- a/src/Akkatecture/Aggregates/AggregateRoot.cs +++ b/src/Akkatecture/Aggregates/AggregateRoot.cs @@ -27,7 +27,9 @@ using System; using System.Collections.Generic; +using System.Globalization; using System.Linq; +using System.Reflection; using Akka.Actor; using Akka.Event; using Akka.Persistence; @@ -55,11 +57,10 @@ public abstract class AggregateRoot : Re private object PinnedReply { get; set; } protected ILoggingAdapter Logger { get; } - //TODO This should be private readonly. - protected readonly IEventDefinitionService EventDefinitionService; + private readonly IEventDefinitionService _eventDefinitionService; private readonly ISnapshotDefinitionService _snapshotDefinitionService; private ISnapshotStrategy SnapshotStrategy { get; set; } = SnapshotNeverStrategy.Instance; - protected TAggregateState State { get; } + public TAggregateState State { get; } public IAggregateName Name => AggregateName; public override string PersistenceId { get; } public TIdentity Id { get; } @@ -77,7 +78,7 @@ protected AggregateRoot(TIdentity id) if ((this as TAggregate) == null) { throw new InvalidOperationException( - $"Aggregate '{GetType().PrettyPrint()}' specifies '{typeof(TAggregate).PrettyPrint()}' as generic argument, it should be its own type"); + $"Aggregate {Name} specifies Type={typeof(TAggregate).PrettyPrint()} as generic argument, it should be its own type."); } if (State == null) @@ -86,15 +87,15 @@ protected AggregateRoot(TIdentity id) { State = (TAggregateState)Activator.CreateInstance(typeof(TAggregateState)); } - catch + catch(Exception exception) { - Logger.Error($"Unable to activate State for {GetType()}"); + Logger.Error(exception,"Unable to activate AggregateState of Type={0} for AggregateRoot of Name={1}",typeof(TAggregateState).PrettyPrint(), Name); } } PinnedCommand = null; - EventDefinitionService = new EventDefinitionService(Logger); + _eventDefinitionService = new EventDefinitionService(Logger); _snapshotDefinitionService = new SnapshotDefinitionService(Logger); Id = id; PersistenceId = id.Value; @@ -133,37 +134,86 @@ public IIdentity GetIdentity() } public virtual void Emit(TAggregateEvent aggregateEvent, IMetadata metadata = null) - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { var committedEvent = From(aggregateEvent, Version, metadata); Persist(committedEvent, ApplyCommittedEvent); } - public virtual void EmitAll(IEnumerable aggregateEvents, IMetadata metadata = null) - where TAggregateEvent : IAggregateEvent + + + public virtual void EmitAll(params IAggregateEvent[] aggregateEvents) { - long version = Version; - var comittedEvents = new List>(); + var version = Version; + + var committedEvents = new List(); foreach (var aggregateEvent in aggregateEvents) { - var committedEvent = From(aggregateEvent, version + 1, metadata); - comittedEvents.Add(committedEvent); + var committedEvent = FromObject(aggregateEvent, version + 1); + committedEvents.Add(committedEvent); version++; } - PersistAll(comittedEvents, ApplyCommittedEvent); + PersistAll(committedEvents, ApplyObjectCommittedEvent); } + + + private object FromObject(object aggregateEvent, long version, IMetadata metadata = null) + { + if (aggregateEvent is IAggregateEvent) + { + _eventDefinitionService.Load(aggregateEvent.GetType()); + var eventDefinition = _eventDefinitionService.GetDefinition(aggregateEvent.GetType()); + var aggregateSequenceNumber = version + 1; + var eventId = EventId.NewDeterministic( + GuidFactories.Deterministic.Namespaces.Events, + $"{Id.Value}-v{aggregateSequenceNumber}"); + var now = DateTimeOffset.UtcNow; + var eventMetadata = new Metadata + { + Timestamp = now, + AggregateSequenceNumber = aggregateSequenceNumber, + AggregateName = Name.Value, + AggregateId = Id.Value, + SourceId = PinnedCommand.SourceId, + EventId = eventId, + EventName = eventDefinition.Name, + EventVersion = eventDefinition.Version + }; + eventMetadata.Add(MetadataKeys.TimestampEpoch, now.ToUnixTime().ToString()); + if (metadata != null) + { + eventMetadata.AddRange(metadata); + } + var genericType = typeof(CommittedEvent<,,>) + .MakeGenericType(typeof(TAggregate), typeof(TIdentity),aggregateEvent.GetType()); + + var committedEvent = Activator.CreateInstance( + genericType, + Id, + aggregateEvent, + eventMetadata, + now, + aggregateSequenceNumber); + + return committedEvent; + } + + throw new InvalidOperationException("could not perform the required mapping for committed event."); + + } + public virtual CommittedEvent From(TAggregateEvent aggregateEvent, long version, IMetadata metadata = null) - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { if (aggregateEvent == null) { throw new ArgumentNullException(nameof(aggregateEvent)); } - EventDefinitionService.Load(aggregateEvent.GetType()); - var eventDefinition = EventDefinitionService.GetDefinition(aggregateEvent.GetType()); + _eventDefinitionService.Load(aggregateEvent.GetType()); + var eventDefinition = _eventDefinitionService.GetDefinition(aggregateEvent.GetType()); var aggregateSequenceNumber = version + 1; var eventId = EventId.NewDeterministic( GuidFactories.Deterministic.Namespaces.Events, @@ -191,17 +241,17 @@ public virtual CommittedEvent From CreateSnapshot() { - Logger.Info($"[{Name}] With Id={Id} Attempted to Create a Snapshot, override the CreateSnapshot() method to return the snapshot data model."); + Logger.Warning("Aggregate of Name={0}, and Id={1}; attempted to create a snapshot, override the {2}() method to get snapshotting to function.", Name, Id, nameof(CreateSnapshot)); return null; } protected void ApplyCommittedEvent(ICommittedEvent committedEvent) - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { var applyMethods = GetEventApplyMethods(committedEvent.AggregateEvent); applyMethods(committedEvent.AggregateEvent); - Logger.Info($"[{Name}] With Id={Id} Commited and Applied [{typeof(TAggregateEvent).PrettyPrint()}]"); + Logger.Info("Aggregate of Name={0}, and Id={1}; committed and applied an AggregateEvent of Type={2}", Name, Id, typeof(TAggregateEvent).PrettyPrint()); Version++; @@ -226,30 +276,49 @@ protected void ApplyCommittedEvent(ICommittedEvent>( + var committedSnapshot = + new CommittedSnapshot>( Id, aggregateSnapshot, snapshotMetadata, committedEvent.Timestamp, Version); - SaveSnapshot(commitedSnapshot); + SaveSnapshot(committedSnapshot); } } + } + + private void ApplyObjectCommittedEvent(object committedEvent) + { + try + { + var method = GetType() + .GetMethods(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic) + .Where(m => m.IsFamily || m.IsPublic) + .Single(m => m.Name.Equals("ApplyCommittedEvent")); + var genericMethod = method.MakeGenericMethod(committedEvent.GetType().GenericTypeArguments[2]); + + genericMethod.Invoke(this, new[] {committedEvent}); + } + catch (Exception exception) + { + Logger.Error(exception, "Aggregate of Name={0}, and Id={1}; tried to invoke Method={2} with object Type={3} .",Name, Id, nameof(ApplyCommittedEvent), committedEvent.GetType().PrettyPrint()); + } } protected virtual void Publish(TEvent aggregateEvent) { Context.System.EventStream.Publish(aggregateEvent); - Logger.Info($"[{Name}] With Id={Id} Published [{typeof(TEvent).PrettyPrint()}]"); + Logger.Info("Aggregate of Name={0}, and Id={1}; published DomainEvent of Type={2}.",Name, Id, typeof(TEvent).PrettyPrint()); } protected override bool AroundReceive(Receive receive, object message) { if (message is Command command) { - PinnedCommand = command; + if(IsNew || Id.Equals(command.AggregateId)) + PinnedCommand = command; } return base.AroundReceive(receive, message); @@ -262,6 +331,14 @@ protected virtual void Reply(object replyMessage) PinnedReply = replyMessage; } } + + protected virtual void ReplyFailure(object replyMessage) + { + if(!Sender.IsNobody()) + { + Context.Sender.Tell(replyMessage); + } + } protected virtual void ReplyIfAvailable() { @@ -274,18 +351,23 @@ protected virtual void ReplyIfAvailable() protected override void Unhandled(object message) { - Logger.Info($"Aggregate with Id '{Id?.Value} has received an unhandled message {message.GetType().PrettyPrint()}'"); + Logger.Warning("Aggregate of Name={0}, and Id={1}; has received an unhandled message of Type={2}.",Name, Id, message.GetType().PrettyPrint()); base.Unhandled(message); } + protected IEnumerable> Events(params IAggregateEvent[] events) + { + return events.ToList(); + } + protected Action GetEventApplyMethods(TAggregateEvent aggregateEvent) - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { var eventType = aggregateEvent.GetType(); Action applyMethod; if (!ApplyMethodsFromState.TryGetValue(eventType, out applyMethod)) - throw new NotImplementedException($"Aggregate State '{State.GetType().PrettyPrint()}' does not have an 'Apply' method that takes aggregate event type '{eventType.PrettyPrint()}' as argument"); + throw new NotImplementedException($"AggregateState of Type={State.GetType().PrettyPrint()} does not have an 'Apply' method that takes in an aggregate event of Type={eventType.PrettyPrint()} as an argument."); var aggregateApplyMethod = applyMethod.Bind(State); @@ -293,13 +375,13 @@ protected Action GetEventApplyMethods(TAggrega } protected Action GetSnapshotHydrateMethods(TAggregateSnapshot aggregateEvent) - where TAggregateSnapshot : IAggregateSnapshot + where TAggregateSnapshot : class, IAggregateSnapshot { var snapshotType = aggregateEvent.GetType(); Action hydrateMethod; if (!HydrateMethodsFromState.TryGetValue(snapshotType, out hydrateMethod)) - throw new NotImplementedException($"Aggregate State '{State.GetType().PrettyPrint()}' does not have a 'Hydrate' method that takes aggregate snapshot type '{snapshotType.PrettyPrint()}' as argument"); + throw new NotImplementedException($"AggregateState of Type={State.GetType().PrettyPrint()} does not have a 'Hydrate' method that takes in an aggregate snapshot of Type={snapshotType.PrettyPrint()} as an argument."); @@ -330,12 +412,12 @@ protected virtual bool Recover(ICommittedEvent>; + Logger.Debug("Aggregate of Name={0}, and Id={1}; has received a SnapshotOffer of Type={2}.", Name, Id, aggregateSnapshotOffer.Snapshot.GetType().PrettyPrint()); + var comittedSnapshot = aggregateSnapshotOffer.Snapshot as CommittedSnapshot>; HydrateSnapshot(comittedSnapshot.AggregateSnapshot, aggregateSnapshotOffer.Metadata.SequenceNr); } catch (Exception exception) { - Logger.Error($"Recovering with snapshot of type [{aggregateSnapshotOffer.Snapshot.GetType().PrettyPrint()}] caused an exception {exception.GetType().PrettyPrint()}"); + Logger.Error(exception,"Aggregate of Name={0}, Id={1}; recovering with snapshot of Type={2} caused an exception.", Name, Id, aggregateSnapshotOffer.Snapshot.GetType().PrettyPrint()); return false; } @@ -369,20 +451,20 @@ protected virtual void SetSnapshotStrategy(ISnapshotStrategy snapshotStrategy) } protected virtual bool SnapshotStatus(SaveSnapshotSuccess snapshotSuccess) { - Logger.Info($"Aggregate [{Name}] With Id={Id} Saved Snapshot at Version {snapshotSuccess.Metadata.SequenceNr}"); + Logger.Debug("Aggregate of Name={0}, and Id={1}; saved a snapshot at Version={2}.", Name, Id, snapshotSuccess.Metadata.SequenceNr); return true; } protected virtual bool SnapshotStatus(SaveSnapshotFailure snapshotFailure) { - Logger.Error($"Aggregate [{Name}] With Id={Id} Failed to save snapshot at version {snapshotFailure.Metadata.SequenceNr} because of {snapshotFailure.Cause}"); + Logger.Error(snapshotFailure.Cause,"Aggregate of Name={0}, and Id={1}; failed to save snapshot at Version={2}.", Name, Id, snapshotFailure.Metadata.SequenceNr); return true; } protected virtual bool Recover(RecoveryCompleted recoveryCompleted) { - Logger.Info($"Aggregate [{Name}] With Id={Id} has completed recovering from it's journal(s)"); + Logger.Debug("Aggregate of Name={0}, and Id={1}; has completed recovering from it's event journal at Version={2}.", Name, Id, Version); return true; } @@ -400,9 +482,9 @@ protected void Command(Predicate shouldHand var handler = (TCommandHandler) Activator.CreateInstance(typeof(TCommandHandler)); Command(x => handler.HandleCommand(this as TAggregate, Context, x),shouldHandle); } - catch + catch (Exception exception) { - Logger.Error($"Unable to Activate CommandHandler {typeof(TCommandHandler).PrettyPrint()} for {typeof(TAggregate).PrettyPrint()}"); + Logger.Error(exception,"Unable to activate CommandHandler of Type={0} for Aggregate of Type={1}.",typeof(TCommandHandler).PrettyPrint(), typeof(TAggregate).PrettyPrint()); } } diff --git a/src/Akkatecture/Aggregates/AggregateState.cs b/src/Akkatecture/Aggregates/AggregateState.cs index e61cc516..e3a33538 100644 --- a/src/Akkatecture/Aggregates/AggregateState.cs +++ b/src/Akkatecture/Aggregates/AggregateState.cs @@ -50,7 +50,7 @@ protected AggregateState() var me = this as TMessageApplier; if (me == null) - throw new InvalidOperationException($"Event applier of type '{GetType().PrettyPrint()}' has a wrong generic argument '{typeof(TMessageApplier).PrettyPrint()}'"); + throw new InvalidOperationException($"MessageApplier of Type={GetType().PrettyPrint()} has a wrong generic argument Type={typeof(TMessageApplier).PrettyPrint()}."); } diff --git a/src/Akkatecture/Aggregates/CommittedEvent.cs b/src/Akkatecture/Aggregates/CommittedEvent.cs index 47c3c462..d3020de1 100644 --- a/src/Akkatecture/Aggregates/CommittedEvent.cs +++ b/src/Akkatecture/Aggregates/CommittedEvent.cs @@ -27,13 +27,14 @@ using System; using Akkatecture.Core; +using Akkatecture.Extensions; namespace Akkatecture.Aggregates { public class CommittedEvent : ICommittedEvent where TAggregate : IAggregateRoot where TIdentity : IIdentity - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { public TIdentity AggregateIdentity { get; } public TAggregateEvent AggregateEvent { get; } @@ -70,5 +71,10 @@ public IAggregateEvent GetAggregateEvent() { return AggregateEvent; } + + public override string ToString() + { + return $"{typeof(TAggregate).PrettyPrint()} v{AggregateSequenceNumber}/{typeof(TAggregateEvent).PrettyPrint()}:{AggregateIdentity}"; + } } } \ No newline at end of file diff --git a/src/Akkatecture/Aggregates/DomainEvent.cs b/src/Akkatecture/Aggregates/DomainEvent.cs index 46513b56..dd7a7c37 100644 --- a/src/Akkatecture/Aggregates/DomainEvent.cs +++ b/src/Akkatecture/Aggregates/DomainEvent.cs @@ -34,7 +34,7 @@ namespace Akkatecture.Aggregates public class DomainEvent : IDomainEvent where TAggregate : IAggregateRoot where TIdentity : IIdentity - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { public Type AggregateType => typeof(TAggregate); public Type IdentityType => typeof(TIdentity); diff --git a/src/Akkatecture/Aggregates/ICommittedEvent.cs b/src/Akkatecture/Aggregates/ICommittedEvent.cs index a3b3ea70..cf44d88b 100644 --- a/src/Akkatecture/Aggregates/ICommittedEvent.cs +++ b/src/Akkatecture/Aggregates/ICommittedEvent.cs @@ -51,7 +51,7 @@ public interface ICommittedEvent : ICommittedEvent public interface ICommittedEvent : ICommittedEvent where TAggregate : IAggregateRoot where TIdentity : IIdentity - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { TAggregateEvent AggregateEvent { get; } } diff --git a/src/Akkatecture/Aggregates/IDomainEvent.cs b/src/Akkatecture/Aggregates/IDomainEvent.cs index 3f1601f2..3a04584d 100644 --- a/src/Akkatecture/Aggregates/IDomainEvent.cs +++ b/src/Akkatecture/Aggregates/IDomainEvent.cs @@ -53,7 +53,7 @@ public interface IDomainEvent : IDomainEvent public interface IDomainEvent : IDomainEvent where TAggregate : IAggregateRoot where TIdentity : IIdentity - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { TAggregateEvent AggregateEvent { get; } } diff --git a/src/Akkatecture/Aggregates/Snapshot/ComittedSnapshot.cs b/src/Akkatecture/Aggregates/Snapshot/CommittedSnapshot.cs similarity index 96% rename from src/Akkatecture/Aggregates/Snapshot/ComittedSnapshot.cs rename to src/Akkatecture/Aggregates/Snapshot/CommittedSnapshot.cs index f0a004e4..b02fedae 100644 --- a/src/Akkatecture/Aggregates/Snapshot/ComittedSnapshot.cs +++ b/src/Akkatecture/Aggregates/Snapshot/CommittedSnapshot.cs @@ -26,7 +26,7 @@ namespace Akkatecture.Aggregates.Snapshot { - public class ComittedSnapshot + public class CommittedSnapshot where TAggregate : IAggregateRoot where TIdentity : IIdentity where TAggregateSnapshot : IAggregateSnapshot @@ -37,7 +37,7 @@ public class ComittedSnapshot public long AggregateSequenceNumber { get; } public DateTimeOffset Timestamp { get; } - public ComittedSnapshot( + public CommittedSnapshot( TIdentity aggregateIdentity, TAggregateSnapshot aggregateSnapshot, SnapshotMetadata metadata, diff --git a/src/Akkatecture/Commands/CommandHandler.cs b/src/Akkatecture/Commands/CommandHandler.cs index dee0dbef..629f52b0 100644 --- a/src/Akkatecture/Commands/CommandHandler.cs +++ b/src/Akkatecture/Commands/CommandHandler.cs @@ -59,7 +59,7 @@ public override bool HandleCommand( { var logger = context.GetLogger(); Handle(aggregate, context, command); - logger.Info($"{command.GetType().PrettyPrint()} handled in {GetType().PrettyPrint()}"); + logger.Debug("Command of Type={0} handled in CommandHandler of Type={1}",command.GetType().PrettyPrint(), GetType().PrettyPrint()); return true; } diff --git a/src/Akkatecture/Events/AggregateEventUpcaster.cs b/src/Akkatecture/Events/AggregateEventUpcaster.cs index a99f4910..b2c36e52 100644 --- a/src/Akkatecture/Events/AggregateEventUpcaster.cs +++ b/src/Akkatecture/Events/AggregateEventUpcaster.cs @@ -68,7 +68,7 @@ private bool ShouldUpcast(object potentialUpcast) { var type = potentialUpcast.GetType(); - if (potentialUpcast is ICommittedEvent comittedEvent) + if (potentialUpcast is ICommittedEvent committedEvent) { var eventType = type.GenericTypeArguments[2]; diff --git a/src/Akkatecture/Events/DomainEventReadAdapter.cs b/src/Akkatecture/Events/DomainEventReadAdapter.cs index 50ad3a5b..3c320ba6 100644 --- a/src/Akkatecture/Events/DomainEventReadAdapter.cs +++ b/src/Akkatecture/Events/DomainEventReadAdapter.cs @@ -1,3 +1,26 @@ +// The MIT License (MIT) +// +// Copyright (c) 2018 - 2019 Lutando Ngqakaza +// https://github.com/Lutando/Akkatecture +// +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + using Akka.Persistence.Journal; namespace Akkatecture.Events diff --git a/src/Akkatecture/Sagas/AggregateSaga/AggregateSaga.cs b/src/Akkatecture/Sagas/AggregateSaga/AggregateSaga.cs index 0b2b092b..dba88d28 100644 --- a/src/Akkatecture/Sagas/AggregateSaga/AggregateSaga.cs +++ b/src/Akkatecture/Sagas/AggregateSaga/AggregateSaga.cs @@ -81,7 +81,7 @@ protected AggregateSaga() if ((this as TAggregateSaga) == null) { throw new InvalidOperationException( - $"AggregateSaga '{GetType().PrettyPrint()}' specifies '{typeof(TAggregateSaga).PrettyPrint()}' as generic argument, it should be its own type"); + $"AggregateSaga {Name} specifies Type={typeof(TAggregateSaga).PrettyPrint()} as generic argument, it should be its own type."); } if (State == null) @@ -90,9 +90,9 @@ protected AggregateSaga() { State = (TSagaState)Activator.CreateInstance(typeof(TSagaState)); } - catch + catch (Exception exception) { - Logger.Warning($"Unable to activate State for {GetType()}"); + Logger.Error(exception,"AggregateSaga of Name={1}; was unable to activate SagaState of Type={0}.", Name, typeof(TSagaState).PrettyPrint()); } } @@ -220,31 +220,95 @@ public void InitAsyncReceives() protected virtual void Emit(TAggregateEvent aggregateEvent, IMetadata metadata = null) - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { var committedEvent = From(aggregateEvent, Version, metadata); Persist(committedEvent, ApplyCommittedEvent); } - public virtual void EmitAll(IEnumerable aggregateEvents, IMetadata metadata = null) - where TAggregateEvent : IAggregateEvent + public virtual void EmitAll(params IAggregateEvent[] aggregateEvents) { - long version = Version; - var comittedEvents = new List>(); + var version = Version; + + var committedEvents = new List(); foreach (var aggregateEvent in aggregateEvents) { - var committedEvent = From(aggregateEvent, version + 1, metadata); - comittedEvents.Add(committedEvent); + var committedEvent = FromObject(aggregateEvent, version + 1); + committedEvents.Add(committedEvent); version++; } - PersistAll(comittedEvents, ApplyCommittedEvent); + PersistAll(committedEvents, ApplyObjectCommittedEvent); + } + + private object FromObject(object aggregateEvent, long version, IMetadata metadata = null) + { + if (aggregateEvent is IAggregateEvent) + { + _eventDefinitionService.Load(aggregateEvent.GetType()); + var eventDefinition = _eventDefinitionService.GetDefinition(aggregateEvent.GetType()); + var aggregateSequenceNumber = version + 1; + var eventId = EventId.NewDeterministic( + GuidFactories.Deterministic.Namespaces.Events, + $"{Id.Value}-v{aggregateSequenceNumber}"); + var now = DateTimeOffset.UtcNow; + var eventMetadata = new Metadata + { + Timestamp = now, + AggregateSequenceNumber = aggregateSequenceNumber, + AggregateName = Name.Value, + AggregateId = Id.Value, + EventId = eventId, + EventName = eventDefinition.Name, + EventVersion = eventDefinition.Version + }; + eventMetadata.Add(MetadataKeys.TimestampEpoch, now.ToUnixTime().ToString()); + if (metadata != null) + { + eventMetadata.AddRange(metadata); + } + var genericType = typeof(CommittedEvent<,,>) + .MakeGenericType(typeof(TAggregateSaga), typeof(TIdentity),aggregateEvent.GetType()); + + + var committedEvent = Activator.CreateInstance( + genericType, + Id, + aggregateEvent, + eventMetadata, + now, + aggregateSequenceNumber); + + return committedEvent; + } + + throw new InvalidOperationException("could not perform the required mapping for committed event."); + + } + + private void ApplyObjectCommittedEvent(object committedEvent) + { + try + { + var method = GetType() + .GetMethods(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic) + .Where(m => m.IsFamily || m.IsPublic) + .Single(m => m.Name.Equals("ApplyCommittedEvent")); + + var genericMethod = method.MakeGenericMethod(committedEvent.GetType().GenericTypeArguments[2]); + + genericMethod.Invoke(this, new[] {committedEvent}); + } + catch (Exception exception) + { + Logger.Error(exception, "Aggregate of Name={0}, and Id={1}; tried to invoke Method={2} with object Type={3} .",Name, Id, nameof(ApplyCommittedEvent), committedEvent.GetType().PrettyPrint()); + } } public virtual CommittedEvent From(TAggregateEvent aggregateEvent, long version, IMetadata metadata = null) - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { if (aggregateEvent == null) { @@ -279,17 +343,17 @@ public virtual CommittedEvent From CreateSnapshot() { - Logger.Info($"[{Name}] With Id={Id} Attempted to Create a Snapshot, override the CreateSnapshot() method to return the snapshot data model."); + Logger.Info("AggregateSaga of Name={0}, and Id={2}; attempted to create a snapshot, override the {2}() method to get snapshotting to function.", Name, Id, nameof(CreateSnapshot)); return null; } protected void ApplyCommittedEvent(ICommittedEvent committedEvent) - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { var applyMethods = GetEventApplyMethods(committedEvent.AggregateEvent); applyMethods(committedEvent.AggregateEvent); - Logger.Info($"[{Name}] With Id={Id} Commited and Applied [{typeof(TAggregateEvent).PrettyPrint()}]"); + Logger.Info("AggregateSaga of Name={0}, and Id={1}; committed and applied an AggregateEvent of Type={2}", Name, Id, typeof(TAggregateEvent).PrettyPrint()); Version++; @@ -313,14 +377,14 @@ protected void ApplyCommittedEvent(ICommittedEvent>( + var committedSnapshot = + new CommittedSnapshot>( Id, aggregateSnapshot, snapshotMetadata, committedEvent.Timestamp, Version); - SaveSnapshot(commitedSnapshot); + SaveSnapshot(committedSnapshot); } } @@ -330,7 +394,7 @@ protected void ApplyCommittedEvent(ICommittedEvent(TEvent aggregateEvent) { Context.System.EventStream.Publish(aggregateEvent); - Logger.Info($"[{Name}] With Id={Id} Published [{typeof(TEvent).PrettyPrint()}]"); + Logger.Info("Aggregate of Name={0}, and Id={1}; published DomainEvent of Type={2}.",Name, Id, typeof(TEvent).PrettyPrint()); } protected Action GetEventApplyMethods(TAggregateEvent aggregateEvent) @@ -340,11 +404,8 @@ protected Action GetEventApplyMethods(TAggrega Action applyMethod; if (!ApplyMethodsFromState.TryGetValue(eventType, out applyMethod)) - { - throw new NotImplementedException( - $"Aggregate State '{State.GetType().PrettyPrint()}' does not have an 'Apply' method that takes aggregate event type '{eventType.PrettyPrint()}' as argument"); - } - + throw new NotImplementedException($"SagaState of Type={State.GetType().PrettyPrint()} does not have an 'Apply' method that takes in an aggregate event of Type={eventType.PrettyPrint()} as an argument."); + var aggregateApplyMethod = applyMethod.Bind(State); return aggregateApplyMethod; @@ -359,16 +420,16 @@ protected virtual void ApplyEvent(IAggregateEvent agg Version++; } - protected virtual bool Recover(ICommittedEvent> domainEvent) + protected virtual bool Recover(ICommittedEvent> committedEvent) { try { - Logger.Debug($"Recovering with event of type [{domainEvent.GetType().PrettyPrint()}] "); - ApplyEvent(domainEvent.AggregateEvent); + Logger.Debug("AggregateSaga of Name={0}, Id={1}, and Version={2}, is recovering with CommittedEvent of Type={3}.", Name, Id, Version, committedEvent.GetType().PrettyPrint()); + ApplyEvent(committedEvent.AggregateEvent); } catch (Exception exception) { - Logger.Error($"Recovering with event of type [{domainEvent.GetType().PrettyPrint()}] caused an exception {exception.GetType().PrettyPrint()}"); + Logger.Error(exception,"Aggregate of Name={0}, Id={1}; while recovering with event of Type={2} caused an exception.", Name, Id, committedEvent.GetType().PrettyPrint()); return false; } @@ -377,15 +438,15 @@ protected virtual bool Recover(ICommittedEvent>; + Logger.Debug("AggregateSaga of Name={0}, and Id={1}; has received a SnapshotOffer of Type={2}.", Name, Id, aggregateSnapshotOffer.Snapshot.GetType().PrettyPrint()); + var comittedSnapshot = aggregateSnapshotOffer.Snapshot as CommittedSnapshot>; HydrateSnapshot(comittedSnapshot.AggregateSnapshot, aggregateSnapshotOffer.Metadata.SequenceNr); } catch (Exception exception) { - Logger.Error($"Recovering with snapshot of type [{aggregateSnapshotOffer.Snapshot.GetType().PrettyPrint()}] caused an exception {exception.GetType().PrettyPrint()}"); + Logger.Error(exception,"AggregateSaga of Name={0}, Id={1}; recovering with snapshot of Type={2} caused an exception.", Name, Id, aggregateSnapshotOffer.Snapshot.GetType().PrettyPrint()); return false; } @@ -409,10 +470,7 @@ protected Action GetSnapshotHydrateMethods hydrateMethod; if (!HydrateMethodsFromState.TryGetValue(snapshotType, out hydrateMethod)) - { - throw new NotImplementedException( - $"Aggregate State '{State.GetType().PrettyPrint()}' does not have an 'Hydrate' method that takes aggregate snapshot type '{snapshotType.PrettyPrint()}' as argument"); - } + throw new NotImplementedException($"SagaState of Type={State.GetType().PrettyPrint()} does not have a 'Hydrate' method that takes in an aggregate snapshot of Type={snapshotType.PrettyPrint()} as an argument."); var snapshotHydrateMethod = hydrateMethod.Bind(State); @@ -443,20 +501,20 @@ protected virtual void SetSnapshotStrategy(ISnapshotStrategy snapshotStrategy) } protected virtual bool SnapshotStatus(SaveSnapshotSuccess snapshotSuccess) { - Logger.Info($"Aggregate [{Name}] With Id={Id} Saved Snapshot at Version {snapshotSuccess.Metadata.SequenceNr}"); + Logger.Debug("Aggregate of Name={0}, and Id={1}; saved a snapshot at Version={2}.", Name, Id, snapshotSuccess.Metadata.SequenceNr); return true; } protected virtual bool SnapshotStatus(SaveSnapshotFailure snapshotFailure) { - Logger.Error($"Aggregate [{Name}] With Id={Id} Failed to save snapshot at version {snapshotFailure.Metadata.SequenceNr} because of {snapshotFailure.Cause}"); + Logger.Error(snapshotFailure.Cause,"Aggregate of Name={0}, and Id={1}; failed to save snapshot at Version={2}.", Name, Id, snapshotFailure.Metadata.SequenceNr); return true; } protected virtual bool Recover(RecoveryCompleted recoveryCompleted) { - Logger.Info($"Aggregate [{Name}] With Id={Id} has completed recovering from it's journal(s)"); + Logger.Debug("Aggregate of Name={0}, and Id={1}; has completed recovering from it's event journal at Version={2}.", Name, Id, Version); return true; } } diff --git a/src/Akkatecture/Sagas/AggregateSaga/AggregateSagaManager.cs b/src/Akkatecture/Sagas/AggregateSaga/AggregateSagaManager.cs index e04d93e6..16a967d2 100644 --- a/src/Akkatecture/Sagas/AggregateSaga/AggregateSagaManager.cs +++ b/src/Akkatecture/Sagas/AggregateSaga/AggregateSagaManager.cs @@ -91,7 +91,7 @@ protected virtual bool Handle(IDomainEvent domainEvent) protected virtual bool Terminate(Terminated message) { - Logger.Warning($"{GetType().PrettyPrint()}: {message.ActorRef.Path} has terminated."); + Logger.Warning("AggregateSaga of Type={0}, and Id={1}; has terminated.",typeof(TAggregateSaga).PrettyPrint(), message.ActorRef.Path.Name); Context.Unwatch(message.ActorRef); return true; } @@ -104,7 +104,7 @@ protected override SupervisorStrategy SupervisorStrategy() localOnlyDecider: x => { - Logger.Error($"[{GetType().PrettyPrint()}] Exception={x.ToString()} to be decided."); + Logger.Warning("{0} will supervise Exception={1} to be decided as {2}.",GetType().PrettyPrint(), x.ToString(),Directive.Restart); return Directive.Restart; }); } @@ -112,7 +112,7 @@ protected override SupervisorStrategy SupervisorStrategy() protected IActorRef FindOrSpawn(TIdentity sagaId) { var saga = Context.Child(sagaId); - if (Equals(saga, ActorRefs.Nobody)) + if (saga.IsNobody()) { return Spawn(sagaId); } diff --git a/src/Akkatecture/Sagas/ISagaHandles.cs b/src/Akkatecture/Sagas/ISagaHandles.cs index 73b1f650..77b0ffba 100644 --- a/src/Akkatecture/Sagas/ISagaHandles.cs +++ b/src/Akkatecture/Sagas/ISagaHandles.cs @@ -34,14 +34,14 @@ namespace Akkatecture.Sagas { public interface ISagaHandles : ISaga - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent where TAggregate : IAggregateRoot where TIdentity : IIdentity { bool Handle(IDomainEvent domainEvent); } public interface ISagaHandlesAsync : ISaga - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent where TAggregate : IAggregateRoot where TIdentity : IIdentity { diff --git a/src/Akkatecture/Sagas/ISagaIsStartedBy.cs b/src/Akkatecture/Sagas/ISagaIsStartedBy.cs index 2af30200..47a27c44 100644 --- a/src/Akkatecture/Sagas/ISagaIsStartedBy.cs +++ b/src/Akkatecture/Sagas/ISagaIsStartedBy.cs @@ -31,14 +31,14 @@ namespace Akkatecture.Sagas { public interface ISagaIsStartedByAsync : ISagaHandlesAsync - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent where TAggregate : IAggregateRoot where TIdentity : IIdentity { } public interface ISagaIsStartedBy : ISagaHandles - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent where TAggregate : IAggregateRoot where TIdentity : IIdentity { diff --git a/src/Akkatecture/Sagas/SagaState.cs b/src/Akkatecture/Sagas/SagaState.cs index f0d9a31e..4fd0310c 100644 --- a/src/Akkatecture/Sagas/SagaState.cs +++ b/src/Akkatecture/Sagas/SagaState.cs @@ -41,11 +41,9 @@ public abstract class SagaState : IMessageApp protected SagaState() { var me = this as TMessageApplier; + if (me == null) - { - throw new InvalidOperationException( - $"Event applier of type '{GetType().PrettyPrint()}' has a wrong generic argument '{typeof(TMessageApplier).PrettyPrint()}'"); - } + throw new InvalidOperationException($"MessageApplier of Type={GetType().PrettyPrint()} has a wrong generic argument Type={typeof(TMessageApplier).PrettyPrint()}."); } } } \ No newline at end of file diff --git a/src/Akkatecture/Subscribers/ISubscribeTo.cs b/src/Akkatecture/Subscribers/ISubscribeTo.cs index a285e69e..edf82eaa 100644 --- a/src/Akkatecture/Subscribers/ISubscribeTo.cs +++ b/src/Akkatecture/Subscribers/ISubscribeTo.cs @@ -32,19 +32,19 @@ namespace Akkatecture.Subscribers { - public interface ISubscribeToAsync + public interface ISubscribeToAsync where TAggregate : IAggregateRoot where TIdentity : IIdentity - where TEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { - Task HandleAsync(IDomainEvent domainEvent); + Task HandleAsync(IDomainEvent domainEvent); } - public interface ISubscribeTo + public interface ISubscribeTo where TAggregate : IAggregateRoot where TIdentity : IIdentity - where TEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { - bool Handle(IDomainEvent domainEvent); + bool Handle(IDomainEvent domainEvent); } } \ No newline at end of file diff --git a/test/Akkatecture.TestHelpers/Aggregates/Commands/CreateAndAddTwoTestsCommand.cs b/test/Akkatecture.TestHelpers/Aggregates/Commands/CreateAndAddTwoTestsCommand.cs new file mode 100644 index 00000000..c150b2f2 --- /dev/null +++ b/test/Akkatecture.TestHelpers/Aggregates/Commands/CreateAndAddTwoTestsCommand.cs @@ -0,0 +1,22 @@ +using Akkatecture.Commands; +using Akkatecture.TestHelpers.Aggregates.Entities; + +namespace Akkatecture.TestHelpers.Aggregates.Commands +{ + public class CreateAndAddTwoTestsCommand: Command + { + public Test FirstTest { get; } + public Test SecondTest { get; } + + public CreateAndAddTwoTestsCommand( + TestAggregateId aggregateId, + CommandId sourceId, + Test firstTest, + Test secondTest) + : base(aggregateId, sourceId) + { + FirstTest = firstTest; + SecondTest = secondTest; + } + } +} \ No newline at end of file diff --git a/test/Akkatecture.TestHelpers/Aggregates/TestAggregate.cs b/test/Akkatecture.TestHelpers/Aggregates/TestAggregate.cs index abc4f372..0b47ec11 100644 --- a/test/Akkatecture.TestHelpers/Aggregates/TestAggregate.cs +++ b/test/Akkatecture.TestHelpers/Aggregates/TestAggregate.cs @@ -22,7 +22,6 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System; -using System.Collections.Generic; using System.Linq; using Akka.Persistence; using Akkatecture.Aggregates; @@ -49,6 +48,7 @@ public TestAggregate(TestAggregateId aggregateId) TestErrors = 0; //Aggregate Commands Command(Execute); + Command(Execute); Command(Execute); Command(Execute); Command(Execute); @@ -78,7 +78,30 @@ private bool Execute(CreateTestCommand command) { TestErrors++; Throw(new TestedErrorEvent(TestErrors)); - Reply(TestExecutionResult.FailedWith(command.SourceId)); + ReplyFailure(TestExecutionResult.FailedWith(command.SourceId)); + } + + return true; + } + + + private bool Execute(CreateAndAddTwoTestsCommand command) + { + if (IsNew) + { + var createdEvent = new TestCreatedEvent(command.AggregateId); + var firstTestAddedEvent = new TestAddedEvent(command.FirstTest); + var secondTestAddedEvent = new TestAddedEvent(command.SecondTest); + var events = Events(createdEvent, firstTestAddedEvent, secondTestAddedEvent); + //EmitAll(events, new Metadata {{"some-key","some-value"}}); + EmitAll(createdEvent, firstTestAddedEvent, secondTestAddedEvent); + Reply(TestExecutionResult.SucceededWith(command.SourceId)); + } + else + { + TestErrors++; + Throw(new TestedErrorEvent(TestErrors)); + ReplyFailure(TestExecutionResult.FailedWith(command.SourceId)); } return true; @@ -97,7 +120,7 @@ private bool Execute(AddTestCommand command) { TestErrors++; Throw(new TestedErrorEvent(TestErrors)); - Reply(TestExecutionResult.FailedWith(command.SourceId)); + ReplyFailure(TestExecutionResult.FailedWith(command.SourceId)); } return true; } @@ -110,7 +133,7 @@ private bool Execute(AddFourTestsCommand command) .Range(0, 4) .Select(x => new TestAddedEvent(command.Test)); - EmitAll(events); + EmitAll(events.ToArray()); Reply(TestExecutionResult.SucceededWith(command.SourceId)); } @@ -118,7 +141,7 @@ private bool Execute(AddFourTestsCommand command) { TestErrors++; Throw(new TestedErrorEvent(TestErrors)); - Reply(TestExecutionResult.FailedWith(command.SourceId)); + ReplyFailure(TestExecutionResult.FailedWith(command.SourceId)); } return true; } @@ -138,7 +161,7 @@ private bool Execute(GiveTestCommand command) { TestErrors++; Throw(new TestedErrorEvent(TestErrors)); - Reply(TestExecutionResult.FailedWith(command.SourceId)); + ReplyFailure(TestExecutionResult.FailedWith(command.SourceId)); } return true; @@ -155,7 +178,7 @@ private bool Execute(ReceiveTestCommand command) { TestErrors++; Throw(new TestedErrorEvent(TestErrors)); - Reply(TestExecutionResult.FailedWith(command.SourceId)); + ReplyFailure(TestExecutionResult.FailedWith(command.SourceId)); } return true; @@ -186,7 +209,7 @@ private bool Execute(PoisonTestAggregateCommand command) { TestErrors++; Throw(new TestedErrorEvent(TestErrors)); - Reply(TestExecutionResult.FailedWith(command.SourceId)); + ReplyFailure(TestExecutionResult.FailedWith(command.SourceId)); } return true; @@ -222,15 +245,13 @@ protected override IAggregateSnapshot CreateSnap } private void Signal(TAggregateEvent aggregateEvent, IMetadata metadata = null) - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { if (aggregateEvent == null) { throw new ArgumentNullException(nameof(aggregateEvent)); } - EventDefinitionService.Load(aggregateEvent.GetType()); - var eventDefinition = EventDefinitionService.GetDefinition(aggregateEvent.GetType()); var aggregateSequenceNumber = Version; var eventId = EventId.NewDeterministic( GuidFactories.Deterministic.Namespaces.Events, @@ -242,9 +263,7 @@ private void Signal(TAggregateEvent aggregateEvent, IMetadata m AggregateSequenceNumber = aggregateSequenceNumber, AggregateName = Name.Value, AggregateId = Id.Value, - EventId = eventId, - EventName = eventDefinition.Name, - EventVersion = eventDefinition.Version + EventId = eventId }; eventMetadata.Add(MetadataKeys.TimestampEpoch, now.ToUnixTime().ToString()); @@ -253,15 +272,13 @@ private void Signal(TAggregateEvent aggregateEvent, IMetadata m eventMetadata.AddRange(metadata); } - Logger.Info($"[{Name}] With Id={Id} Commited [{typeof(TAggregateEvent).PrettyPrint()}]"); - var domainEvent = new DomainEvent(Id, aggregateEvent, eventMetadata, now, Version); Publish(domainEvent); } private void Throw(TAggregateEvent aggregateEvent, IMetadata metadata = null) - where TAggregateEvent : IAggregateEvent + where TAggregateEvent : class, IAggregateEvent { Signal(aggregateEvent, metadata); } diff --git a/test/Akkatecture.Tests/Akkatecture.Tests.csproj b/test/Akkatecture.Tests/Akkatecture.Tests.csproj index 81977200..21461413 100644 --- a/test/Akkatecture.Tests/Akkatecture.Tests.csproj +++ b/test/Akkatecture.Tests/Akkatecture.Tests.csproj @@ -33,7 +33,7 @@ - + diff --git a/test/Akkatecture.Tests/UnitTests/Aggregates/AggregateTests.cs b/test/Akkatecture.Tests/UnitTests/Aggregates/AggregateTests.cs index ed1aba36..b5b5b4da 100644 --- a/test/Akkatecture.Tests/UnitTests/Aggregates/AggregateTests.cs +++ b/test/Akkatecture.Tests/UnitTests/Aggregates/AggregateTests.cs @@ -21,6 +21,7 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using System; using System.ComponentModel; using System.Threading.Tasks; using Akka.Actor; @@ -54,7 +55,7 @@ public AggregateTests() public void InitialState_AfterAggregateCreation_TestCreatedEventEmitted() { var eventProbe = CreateTestProbe("event-probe"); - Sys.EventStream.Subscribe(eventProbe, typeof(DomainEvent)); + Sys.EventStream.Subscribe(eventProbe, typeof(IDomainEvent)); var aggregateManager = Sys.ActorOf(Props.Create(() => new TestAggregateManager()), "test-aggregatemanager"); var aggregateId = TestAggregateId.New; @@ -63,7 +64,7 @@ public void InitialState_AfterAggregateCreation_TestCreatedEventEmitted() aggregateManager.Tell(command); eventProbe - .ExpectMsg>( + .ExpectMsg>( x => x.AggregateEvent.TestAggregateId.Equals(aggregateId) && x.Metadata.ContainsKey("some-key")); } @@ -75,7 +76,7 @@ public async Task SendingCommand_ToAggregateRoot_ShouldReplyWithProperMessage() { var eventProbe = CreateTestProbe("event-probe"); var commandProbe = CreateTestProbe("command-probe"); - Sys.EventStream.Subscribe(eventProbe, typeof(DomainEvent)); + Sys.EventStream.Subscribe(eventProbe, typeof(IDomainEvent)); var aggregateManager = Sys.ActorOf(Props.Create(() => new TestAggregateManager()), "test-aggregatemanager"); var aggregateId = TestAggregateId.New; @@ -92,7 +93,7 @@ public async Task SendingCommand_ToAggregateRoot_ShouldReplyWithProperMessage() public void EventContainerMetadata_AfterAggregateCreation_TestCreatedEventEmitted() { var eventProbe = CreateTestProbe("event-probe"); - Sys.EventStream.Subscribe(eventProbe, typeof(DomainEvent)); + Sys.EventStream.Subscribe(eventProbe, typeof(IDomainEvent)); var aggregateManager = Sys.ActorOf(Props.Create(() => new TestAggregateManager()), "test-aggregatemanager"); var aggregateId = TestAggregateId.New; @@ -101,7 +102,7 @@ public void EventContainerMetadata_AfterAggregateCreation_TestCreatedEventEmitte aggregateManager.Tell(command); eventProbe - .ExpectMsg>( + .ExpectMsg>( x => x.AggregateIdentity.Equals(aggregateId) && x.IdentityType == typeof(TestAggregateId) && x.AggregateType == typeof(TestAggregate) @@ -118,7 +119,7 @@ public void EventContainerMetadata_AfterAggregateCreation_TestCreatedEventEmitte public void InitialState_AfterAggregateCreation_TestStateSignalled() { var eventProbe = CreateTestProbe("event-probe"); - Sys.EventStream.Subscribe(eventProbe, typeof(DomainEvent)); + Sys.EventStream.Subscribe(eventProbe, typeof(IDomainEvent)); var aggregateManager = Sys.ActorOf(Props.Create(() => new TestAggregateManager()), "test-aggregatemanager"); var aggregateId = TestAggregateId.New; @@ -129,7 +130,7 @@ public void InitialState_AfterAggregateCreation_TestStateSignalled() aggregateManager.Tell(nextCommand); eventProbe - .ExpectMsg>( + .ExpectMsg>( x => x.AggregateEvent.LastSequenceNr == 1 && x.AggregateEvent.Version == 1 && x.AggregateEvent.AggregateState.TestCollection.Count == 0); @@ -140,7 +141,7 @@ public void InitialState_AfterAggregateCreation_TestStateSignalled() public void TestCommand_AfterAggregateCreation_TestEventEmitted() { var eventProbe = CreateTestProbe("event-probe"); - Sys.EventStream.Subscribe(eventProbe, typeof(DomainEvent)); + Sys.EventStream.Subscribe(eventProbe, typeof(IDomainEvent)); var aggregateManager = Sys.ActorOf(Props.Create(() => new TestAggregateManager()), "test-aggregatemanager"); var aggregateId = TestAggregateId.New; @@ -154,7 +155,7 @@ public void TestCommand_AfterAggregateCreation_TestEventEmitted() aggregateManager.Tell(nextCommand); eventProbe - .ExpectMsg>( + .ExpectMsg>( x => x.AggregateEvent.Test.Equals(test)); } @@ -163,7 +164,7 @@ public void TestCommand_AfterAggregateCreation_TestEventEmitted() public void TestCommandTwice_AfterAggregateCreation_TestEventEmitted() { var eventProbe = CreateTestProbe("event-probe"); - Sys.EventStream.Subscribe(eventProbe, typeof(DomainEvent)); + Sys.EventStream.Subscribe(eventProbe, typeof(IDomainEvent)); var aggregateManager = Sys.ActorOf(Props.Create(() => new TestAggregateManager()), "test-aggregatemanager"); @@ -184,12 +185,12 @@ public void TestCommandTwice_AfterAggregateCreation_TestEventEmitted() eventProbe - .ExpectMsg>( + .ExpectMsg>( x => x.AggregateEvent.Test.Equals(test) && x.AggregateSequenceNumber == 2); eventProbe - .ExpectMsg>( + .ExpectMsg>( x => x.AggregateEvent.Test.Equals(test2) && x.AggregateSequenceNumber == 3); } @@ -200,7 +201,7 @@ public void TestEventSourcing_AfterManyTests_TestStateSignalled() { var eventProbe = CreateTestProbe("event-probe"); - Sys.EventStream.Subscribe(eventProbe, typeof(DomainEvent)); + Sys.EventStream.Subscribe(eventProbe, typeof(IDomainEvent)); var aggregateManager = Sys.ActorOf(Props.Create(() => new TestAggregateManager()), "test-aggregatemanager"); var aggregateId = TestAggregateId.New; var commandId = CommandId.New; @@ -223,21 +224,44 @@ public void TestEventSourcing_AfterManyTests_TestStateSignalled() aggregateManager.Tell(reviveCommand); - eventProbe - .ExpectMsg>( + .ExpectMsg>( x => x.AggregateEvent.LastSequenceNr == 6 && x.AggregateEvent.Version == 6 && x.AggregateEvent.AggregateState.TestCollection.Count == 5); } + [Fact] + [Category(Category)] + public void TestEventMultipleEmitSourcing_AfterManyMultiCreateCommand_EventsEmitted() + { + var eventProbe = CreateTestProbe("event-probe"); + Sys.EventStream.Subscribe(eventProbe, typeof(IDomainEvent)); + Sys.EventStream.Subscribe(eventProbe, typeof(IDomainEvent)); + var aggregateManager = Sys.ActorOf(Props.Create(() => new TestAggregateManager()), "test-aggregatemanager"); + + var aggregateId = TestAggregateId.New; + var commandId = CommandId.New; + var firstTest = new Test(TestId.New); + var secondTest = new Test(TestId.New); + var command = new CreateAndAddTwoTestsCommand(aggregateId, commandId, firstTest, secondTest); + + aggregateManager.Tell(command); + + eventProbe.ExpectMsg>(); + eventProbe.ExpectMsg>(); + eventProbe.ExpectMsg>(); + + } + [Fact] [Category(Category)] public void TestEventMultipleEmitSourcing_AfterManyMultiCommand_TestStateSignalled() { var eventProbe = CreateTestProbe("event-probe"); - Sys.EventStream.Subscribe(eventProbe, typeof(DomainEvent)); + Sys.EventStream.Subscribe(eventProbe, typeof(IDomainEvent)); + Sys.EventStream.Subscribe(eventProbe, typeof(IDomainEvent)); var aggregateManager = Sys.ActorOf(Props.Create(() => new TestAggregateManager()), "test-aggregatemanager"); var aggregateId = TestAggregateId.New; var commandId = CommandId.New; @@ -257,9 +281,17 @@ public void TestEventMultipleEmitSourcing_AfterManyMultiCommand_TestStateSignall var reviveCommand = new PublishTestStateCommand(aggregateId); aggregateManager.Tell(reviveCommand); + eventProbe + .ExpectMsg>(); + eventProbe + .ExpectMsg>(); + eventProbe + .ExpectMsg>(); + eventProbe + .ExpectMsg>(); eventProbe - .ExpectMsg>( + .ExpectMsg>( x => x.AggregateEvent.LastSequenceNr == 5 && x.AggregateEvent.Version == 5 && x.AggregateEvent.AggregateState.TestCollection.Count == 4); @@ -271,7 +303,7 @@ public void TestEventMultipleEmitSourcing_AfterManyMultiCommand_TestStateSignall public void TestSnapShotting_AfterManyTests_TestStateSignalled() { var eventProbe = CreateTestProbe("event-probe"); - Sys.EventStream.Subscribe(eventProbe, typeof(DomainEvent)); + Sys.EventStream.Subscribe(eventProbe, typeof(IDomainEvent)); var aggregateManager = Sys.ActorOf(Props.Create(() => new TestAggregateManager()), "test-aggregatemanager"); var aggregateId = TestAggregateId.New; var commandId = CommandId.New; @@ -289,7 +321,7 @@ public void TestSnapShotting_AfterManyTests_TestStateSignalled() } eventProbe - .ExpectMsg>( + .ExpectMsg>( x => x.AggregateEvent.LastSequenceNr == 11 && x.AggregateEvent.Version == 11 && x.AggregateEvent.AggregateState.TestCollection.Count == 10 diff --git a/test/Akkatecture.Tests/UnitTests/Aggregates/AggregateTestsWithFixtures.cs b/test/Akkatecture.Tests/UnitTests/Aggregates/AggregateTestsWithFixtures.cs index bd96b590..ebe6fb33 100644 --- a/test/Akkatecture.Tests/UnitTests/Aggregates/AggregateTestsWithFixtures.cs +++ b/test/Akkatecture.Tests/UnitTests/Aggregates/AggregateTestsWithFixtures.cs @@ -59,11 +59,14 @@ public void InitialEvent_AfterAggregateCreation_TestCreatedEventEmitted() var aggregateId = TestAggregateId.New; var commandId = CommandId.New; var testId = TestId.New; - - this.FixtureFor(aggregateId) + + this.FixtureFor(aggregateId) .Given(new TestCreatedEvent(aggregateId), new TestAddedEvent(new Test(TestId.New))) .When(new AddTestCommand(aggregateId, commandId, new Test(testId))) - .ThenExpect(x => x.Test.Id == testId); + .ThenExpect(x => x.Test.Id == testId) + .ThenExpectReply(x => x.SourceId.Value == commandId.Value && x.Result.IsSuccess); + + } [Fact] @@ -259,6 +262,23 @@ public void TestEventSourcing_AfterManyTests_TestStateSignalled() && x.AggregateEvent.AggregateState.TestCollection.Count == 5); } + [Fact] + [Category(Category)] + public void TestEventMultipleEmitSourcing_AfterManyMultiCreateCommand_EventsEmitted() + { + var aggregateId = TestAggregateId.New; + var commandId = CommandId.New; + var firstTest = new Test(TestId.New); + var secondTest = new Test(TestId.New); + + this.FixtureFor(aggregateId) + .GivenNothing() + .When(new CreateAndAddTwoTestsCommand(aggregateId, commandId, firstTest, secondTest)) + .ThenExpectDomainEvent() + .ThenExpect() + .ThenExpect(); + } + [Fact] [Category(Category)] public void TestEventMultipleEmitSourcing_AfterManyMultiCommand_TestStateSignalled() diff --git a/test/Akkatecture.Tests/UnitTests/Fixtures/AggregateFixtureTests.cs b/test/Akkatecture.Tests/UnitTests/Fixtures/AggregateFixtureTests.cs index 4f671ebf..c9251431 100644 --- a/test/Akkatecture.Tests/UnitTests/Fixtures/AggregateFixtureTests.cs +++ b/test/Akkatecture.Tests/UnitTests/Fixtures/AggregateFixtureTests.cs @@ -114,11 +114,11 @@ public void FixtureArrangerWithSnapshot_CanBeLoaded() snapshotStore.Tell(new LoadSnapshot(aggregateIdentity.Value, new SnapshotSelectionCriteria(long.MaxValue, DateTime.MaxValue), long.MaxValue), receiverProbe.Ref); receiverProbe.ExpectMsg(x => - x.Snapshot.Snapshot is ComittedSnapshot> && + x.Snapshot.Snapshot is CommittedSnapshot> && x.Snapshot.Metadata.SequenceNr == snapshotSequenceNumber && x.Snapshot.Metadata.PersistenceId == aggregateIdentity.Value && x.Snapshot.Snapshot - .As>>().AggregateSnapshot + .As>>().AggregateSnapshot .As().Tests.Count == snapshot.Tests.Count && x.ToSequenceNr == long.MaxValue); diff --git a/test/Akkatecture.Tests/UnitTests/Serialization/SerializationTests.cs b/test/Akkatecture.Tests/UnitTests/Serialization/SerializationTests.cs index 9034e3ee..5d84f2cc 100644 --- a/test/Akkatecture.Tests/UnitTests/Serialization/SerializationTests.cs +++ b/test/Akkatecture.Tests/UnitTests/Serialization/SerializationTests.cs @@ -130,7 +130,7 @@ public void CommittedSnapshot_AfterSerialization_IsValidAfterDeserialization() AggregateId = aggregateId.Value, }; var committedEvent = - new ComittedSnapshot( + new CommittedSnapshot( aggregateId, aggregateSnapshot, snapshotMetadata,