Skip to content

Commit

Permalink
moved the storage models to sagas as well
Browse files Browse the repository at this point in the history
  • Loading branch information
Unknown committed Jan 15, 2019
1 parent b846565 commit 1a19936
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

namespace Akkatecture.Walkthrough.Domain.Sagas.MoneyTransfer
{
public class MoneyTransferSagaState : SagaState<MoneyTransferSaga,MoneyTransferSagaId,IEventApplier<MoneyTransferSaga, MoneyTransferSagaId>>,
public class MoneyTransferSagaState : SagaState<MoneyTransferSaga,MoneyTransferSagaId, IMessageApplier<MoneyTransferSaga, MoneyTransferSagaId>>,
IApply<MoneyTransferStartedEvent>,
IApply<MoneyTransferCompletedEvent>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

namespace Akkatecture.Examples.Api.Domain.Sagas
{
public class ResourceCreationSagaState : SagaState<ResourceCreationSaga, ResourceCreationSagaId, IEventApplier<ResourceCreationSaga, ResourceCreationSagaId>>,
public class ResourceCreationSagaState : SagaState<ResourceCreationSaga, ResourceCreationSagaId, IMessageApplier<ResourceCreationSaga, ResourceCreationSagaId>>,
IApply<ResourceCreationStartedEvent>,
IApply<ResourceCreationProgressEvent>,
IApply<ResourceCreationEndedEvent>
Expand Down
144 changes: 131 additions & 13 deletions src/Akkatecture/Sagas/AggregateSaga/AggregateSaga.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
using Akka.Event;
using Akka.Persistence;
using Akkatecture.Aggregates;
using Akkatecture.Aggregates.Snapshot;
using Akkatecture.Aggregates.Snapshot.Strategies;
using Akkatecture.Core;
using Akkatecture.Events;
using Akkatecture.Extensions;
Expand All @@ -44,15 +46,22 @@ namespace Akkatecture.Sagas.AggregateSaga
public abstract class AggregateSaga<TAggregateSaga, TIdentity, TSagaState> : ReceivePersistentActor, IAggregateSaga<TIdentity>
where TAggregateSaga : AggregateSaga<TAggregateSaga, TIdentity, TSagaState>
where TIdentity : SagaId<TIdentity>
where TSagaState : SagaState<TAggregateSaga,TIdentity, IEventApplier<TAggregateSaga, TIdentity>>
where TSagaState : SagaState<TAggregateSaga,TIdentity, IMessageApplier<TAggregateSaga, TIdentity>>
{
private static readonly IReadOnlyDictionary<Type, Action<TSagaState, IAggregateEvent>> ApplyMethodsFromState;
private static readonly IReadOnlyDictionary<Type, Action<TSagaState, IAggregateSnapshot>> HydrateMethodsFromState;
private static readonly IAggregateName AggregateName = typeof(TAggregateSaga).GetSagaName();
private CircularBuffer<ISourceId> _previousSourceIds = new CircularBuffer<ISourceId>(10);
private readonly List<IEventApplier<TAggregateSaga, TIdentity>> _eventAppliers = new List<IEventApplier<TAggregateSaga, TIdentity>>();
private readonly List<ISnapshotHydrater<TAggregateSaga, TIdentity>> _snapshotHydraters = new List<ISnapshotHydrater<TAggregateSaga, TIdentity>>();
private readonly Dictionary<Type, Action<object>> _eventHandlers = new Dictionary<Type, Action<object>>();
private readonly Dictionary<Type, Action<object>> _snapshotHandlers = new Dictionary<Type, Action<object>>();
public override string PersistenceId { get; } = Context.Self.Path.Name;
public AggregateSagaSettings Settings { get; }
protected ILoggingAdapter Logger { get; }
protected IEventDefinitionService _eventDefinitionService;
protected ISnapshotDefinitionService _snapshotDefinitionService;
protected ISnapshotStrategy SnapshotStrategy { get; set; } = SnapshotNeverStrategy.Instance;
public TSagaState State { get; protected set; }
public TIdentity Id { get; }
public int? SnapshotVersion { get; private set; }
Expand All @@ -64,6 +73,9 @@ static AggregateSaga()
{
ApplyMethodsFromState = typeof(TSagaState)
.GetAggregateStateEventApplyMethods<TAggregateSaga, TIdentity, TSagaState>();

HydrateMethodsFromState = typeof(TSagaState)
.GetAggregateSnapshotHydrateMethods<TAggregateSaga, TIdentity, TSagaState>();
}

protected AggregateSaga()
Expand Down Expand Up @@ -109,17 +121,17 @@ protected AggregateSaga()

if (Settings.UseDefaultEventRecover)
{
Recover<DomainEvent<TAggregateSaga, TIdentity, IAggregateEvent<TAggregateSaga, TIdentity>>>(Recover);
Recover<IAggregateEvent<TAggregateSaga, TIdentity>>(Recover);
Recover<ICommittedEvent<TAggregateSaga, TIdentity, IAggregateEvent<TAggregateSaga, TIdentity>>>(Recover);
Recover<RecoveryCompleted>(Recover);
}


if (Settings.UseDefaultSnapshotRecover)
Recover<SnapshotOffer>(Recover);


_eventDefinitionService = new EventDefinitionService(Logger);
_snapshotDefinitionService = new SnapshotDefinitionService(Logger);

}

Expand Down Expand Up @@ -253,6 +265,15 @@ protected virtual void Emit<TAggregateEvent>(TAggregateEvent aggregateEvent, IMe
eventMetadata.AddRange(metadata);
}


var committedEvent = new CommittedEvent<TAggregateSaga, TIdentity, TAggregateEvent>(Id, aggregateEvent, eventMetadata, now, Version);
Persist(committedEvent, ApplyCommittedEvents);

Logger.Info($"[{Name}] With Id={Id} Commited [{typeof(TAggregateEvent).PrettyPrint()}]");

Version++;


var aggregateApplyMethod = GetEventApplyMethods(aggregateEvent);

Persist(aggregateEvent, aggregateApplyMethod);
Expand All @@ -264,6 +285,47 @@ protected virtual void Emit<TAggregateEvent>(TAggregateEvent aggregateEvent, IMe
var domainEvent = new DomainEvent<TAggregateSaga, TIdentity, TAggregateEvent>(Id, aggregateEvent, eventMetadata, now, Version);

Publish(domainEvent);

if (SnapshotStrategy.ShouldCreateSnapshot(this))
{
var aggregateSnapshot = CreateSnapshot();
if (aggregateSnapshot != null)
{
var t = aggregateSnapshot.GetType();
_snapshotDefinitionService.Load(aggregateSnapshot.GetType());
var snapshotDefinition = _snapshotDefinitionService.GetDefinition(aggregateSnapshot.GetType());
var snapshotMetadata = new Aggregates.Snapshot.SnapshotMetadata
{
AggregateId = Id.Value,
AggregateName = Name.Value,
AggregateSequenceNumber = Version,
SnapshotName = snapshotDefinition.Name,
SnapshotVersion = snapshotDefinition.Version
};

var commitedSnapshot =
new ComittedSnapshot<TAggregateSaga, TIdentity, IAggregateSnapshot<TAggregateSaga, TIdentity>>(
Id,
aggregateSnapshot,
snapshotMetadata,
now, Version);
SaveSnapshot(commitedSnapshot);
}
}
}

protected virtual IAggregateSnapshot<TAggregateSaga, TIdentity> CreateSnapshot()
{
Logger.Info($"[{Name}] With Id={Id} Attempted to Create a Snapshot, override the CreateSnapshot() method to return the snapshot data model.");
return null;
}

protected void ApplyCommittedEvents<TAggregateEvent>(ICommittedEvent<TAggregateSaga, TIdentity, TAggregateEvent> committedEvent)
where TAggregateEvent : IAggregateEvent<TAggregateSaga, TIdentity>
{
var applyMethods = GetEventApplyMethods(committedEvent.AggregateEvent);
applyMethods(committedEvent.AggregateEvent);

}

protected virtual void Publish<TEvent>(TEvent aggregateEvent)
Expand All @@ -288,10 +350,7 @@ protected Action<IAggregateEvent> GetEventApplyMethods<TAggregateEvent>(TAggrega

return aggregateApplyMethod;
}

private readonly List<IEventApplier<TAggregateSaga, TIdentity>> _eventAppliers = new List<IEventApplier<TAggregateSaga, TIdentity>>();

private readonly Dictionary<Type, Action<object>> _eventHandlers = new Dictionary<Type, Action<object>>();
protected void Register<TAggregateEvent>(Action<TAggregateEvent> handler)
where TAggregateEvent : IAggregateEvent<TAggregateSaga, TIdentity>
{
Expand Down Expand Up @@ -377,7 +436,7 @@ protected virtual bool Recover(IAggregateEvent<TAggregateSaga, TIdentity> aggreg
return true;
}

protected virtual bool Recover(IDomainEvent<TAggregateSaga, TIdentity, IAggregateEvent<TAggregateSaga, TIdentity>> domainEvent)
protected virtual bool Recover(ICommittedEvent<TAggregateSaga, TIdentity, IAggregateEvent<TAggregateSaga, TIdentity>> domainEvent)
{
try
{
Expand All @@ -395,9 +454,11 @@ protected virtual bool Recover(IDomainEvent<TAggregateSaga, TIdentity, IAggregat

protected virtual bool Recover(SnapshotOffer aggregateSnapshotOffer)
{
Logger.Info($"Aggregate [{Name}] With Id={Id} has received a SnapshotOffer of type {aggregateSnapshotOffer.Snapshot.GetType().PrettyPrint()}");
try
{
State = aggregateSnapshotOffer.Snapshot as TSagaState;
var comittedSnapshot = aggregateSnapshotOffer.Snapshot as ComittedSnapshot<TAggregateSaga, TIdentity, IAggregateSnapshot<TAggregateSaga, TIdentity>>;
HydrateSnapshot(comittedSnapshot.AggregateSnapshot, aggregateSnapshotOffer.Metadata.SequenceNr);
}
catch (Exception exception)
{
Expand All @@ -409,11 +470,42 @@ protected virtual bool Recover(SnapshotOffer aggregateSnapshotOffer)
return true;
}

protected virtual bool Recover(RecoveryCompleted recoveryCompleted)
protected virtual void HydrateSnapshot(IAggregateSnapshot<TAggregateSaga, TIdentity> aggregateSnapshot, long version)
{

return true;
var snapshotType = aggregateSnapshot.GetType();
if (_snapshotHandlers.ContainsKey(snapshotType))
{
_snapshotHandlers[snapshotType](aggregateSnapshot);
}
else if (_snapshotHydraters.Any(ea => ea.Hydrate((TAggregateSaga)this, aggregateSnapshot)))
{
// Already done
}

var snapshotHydrater = GetSnapshotHydrateMethods(aggregateSnapshot);

snapshotHydrater(aggregateSnapshot);

Version = version;
}

protected Action<IAggregateSnapshot> GetSnapshotHydrateMethods<TAggregateSnapshot>(TAggregateSnapshot aggregateEvent)
where TAggregateSnapshot : IAggregateSnapshot<TAggregateSaga, TIdentity>
{
var snapshotType = aggregateEvent.GetType();

Action<TSagaState, IAggregateSnapshot> hydrateMethod;
if (!HydrateMethodsFromState.TryGetValue(snapshotType, out hydrateMethod))
{
throw new NotImplementedException(
$"Aggregate State '{State.GetType().PrettyPrint()}' does have an 'Apply' method that takes aggregate event '{snapshotType.PrettyPrint()}' as argument");
}

var snapshotHydrateMethod = hydrateMethod.Bind(State);

return snapshotHydrateMethod;
}

protected void SetSourceIdHistory(int count)
{
_previousSourceIds = new CircularBuffer<ISourceId>(count);
Expand All @@ -428,5 +520,31 @@ public IIdentity GetIdentity()
{
return Id;
}

protected virtual void SetSnapshotStrategy(ISnapshotStrategy snapshotStrategy)
{
if (snapshotStrategy != null)
{
SnapshotStrategy = snapshotStrategy;
}
}
protected virtual bool SnapshotStatus(SaveSnapshotSuccess snapshotSuccess)
{
Logger.Info($"Aggregate [{Name}] With Id={Id} Saved Snapshot at Version {snapshotSuccess.Metadata.SequenceNr}");
return true;
}

protected virtual bool SnapshotStatus(SaveSnapshotFailure snapshotFailure)
{
Logger.Error($"Aggregate [{Name}] With Id={Id} Failed to save snapshot at version {snapshotFailure.Metadata.SequenceNr} because of {snapshotFailure.Cause}");
return true;
}


protected virtual bool Recover(RecoveryCompleted recoveryCompleted)
{
Logger.Info($"Aggregate [{Name}] With Id={Id} has completed recovering from it's journal(s)");
return true;
}
}
}
36 changes: 27 additions & 9 deletions src/Akkatecture/Sagas/SagaState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,52 +28,70 @@
using System;
using System.Collections.Generic;
using Akkatecture.Aggregates;
using Akkatecture.Aggregates.Snapshot;
using Akkatecture.Extensions;

namespace Akkatecture.Sagas
{
public abstract class SagaState<TSaga, TIdentity, TEventApplier> : ISagaState<TIdentity>, IEventApplier<TSaga, TIdentity>
where TEventApplier : class, IEventApplier<TSaga, TIdentity>
public abstract class SagaState<TSaga, TIdentity, TMessageApplier> : ISagaState<TIdentity>, IMessageApplier<TSaga, TIdentity>
where TMessageApplier : class, IMessageApplier<TSaga, TIdentity>
where TSaga : IAggregateRoot<TIdentity>
where TIdentity : ISagaId
{
private static readonly IReadOnlyDictionary<Type, Action<TEventApplier, IAggregateEvent>> ApplyMethods;
private static readonly IReadOnlyDictionary<Type, Action<TMessageApplier, IAggregateEvent>> ApplyMethods;
private static readonly IReadOnlyDictionary<Type, Action<TMessageApplier, IAggregateSnapshot>> HydrateMethods;
public SagaStatus Status { get; private set; }
public Dictionary<SagaStatus, DateTimeOffset> SagaTimes { get; }

static SagaState()
{
ApplyMethods = typeof(TEventApplier).GetAggregateEventApplyMethods<TSaga, TIdentity, TEventApplier>();
ApplyMethods = typeof(TMessageApplier).GetAggregateEventApplyMethods<TSaga, TIdentity, TMessageApplier>();
HydrateMethods = typeof(TMessageApplier).GetAggregateSnapshotHydrateMethods<TSaga, TIdentity, TMessageApplier>();
}


protected SagaState()
{
SagaTimes = new Dictionary<SagaStatus, DateTimeOffset>();
Status = SagaStatus.NotStarted;
StopWatch();

var me = this as TEventApplier;
var me = this as TMessageApplier;
if (me == null)
{
throw new InvalidOperationException(
$"Event applier of type '{GetType().PrettyPrint()}' has a wrong generic argument '{typeof(TEventApplier).PrettyPrint()}'");
$"Event applier of type '{GetType().PrettyPrint()}' has a wrong generic argument '{typeof(TMessageApplier).PrettyPrint()}'");
}
}

public bool Hydrate(
TSaga aggregate,
IAggregateSnapshot<TSaga, TIdentity> aggregateSnapshot)
{
var aggregateSnapshotType = aggregateSnapshot.GetType();
Action<TMessageApplier, IAggregateSnapshot> hydrater;

if (!HydrateMethods.TryGetValue(aggregateSnapshotType, out hydrater))
{
return false;
}

hydrater((TMessageApplier)(object)this, aggregateSnapshot);
return true;
}

public bool Apply(
TSaga aggregate,
IAggregateEvent<TSaga, TIdentity> aggregateEvent)
{
var aggregateEventType = aggregateEvent.GetType();
Action<TEventApplier, IAggregateEvent> applier;
Action<TMessageApplier, IAggregateEvent> applier;

if (!ApplyMethods.TryGetValue(aggregateEventType, out applier))
{
return false;
}

applier((TEventApplier)(object)this, aggregateEvent);
applier((TMessageApplier)(object)this, aggregateEvent);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

namespace Akkatecture.TestHelpers.Aggregates.Sagas
{
public class TestSagaState : SagaState<TestSaga, TestSagaId, IEventApplier<TestSaga, TestSagaId>>,
public class TestSagaState : SagaState<TestSaga, TestSagaId, IMessageApplier<TestSaga, TestSagaId>>,
IApply<TestSagaStartedEvent>,
IApply<TestSagaTransactionCompletedEvent>,
IApply<TestSagaCompletedEvent>
Expand Down

0 comments on commit 1a19936

Please sign in to comment.