From 0bfe1df0090e87e2fbc6aa3fbef1ca125bb595ca Mon Sep 17 00:00:00 2001 From: Chris Richardson Date: Mon, 15 Aug 2022 21:13:17 +0900 Subject: [PATCH] Updates to reflect changes made by https://github.com/eventuate-tram/eventuate-tram-core/issues/174 --- .../participant/SagaParticipantFactory.java | 7 ++- .../SagaParticipantStubManagerFactory.java | 9 +-- .../AbstractSagaCommandHandlersBuilder.java | 11 ++-- .../participant/SagaCommandDispatcher.java | 16 ++---- .../SagaCommandDispatcherFactory.java | 7 ++- .../sagas/participant/SagaCommandHandler.java | 10 ++-- .../SagaCommandHandlerBuilder.java | 11 ++-- .../SagaCommandHandlersBuilder.java | 31 +++++----- ...actReactiveSagaCommandHandlersBuilder.java | 5 +- .../ReactiveSagaCommandDispatcher.java | 14 +++-- .../ReactiveSagaCommandDispatcherFactory.java | 10 ++-- .../ReactiveSagaCommandHandler.java | 9 +-- .../ReactiveSagaCommandHandlerBuilder.java | 5 +- .../ReactiveSagaCommandHandlersBuilder.java | 9 +-- .../ReactiveSagaCommandDispatcherTest.java | 56 +++++++++---------- .../SagaParticipantConfiguration.java | 10 +++- .../ReactiveSagaParticipantConfiguration.java | 10 ++-- ...gaParticipantStubManagerConfiguration.java | 10 ++-- .../testing/SagaParticipantStubManager.java | 14 ++--- .../SagaParticipantStubCommandHandler.java | 11 ++-- ...ndledMessageTrackingCommandDispatcher.java | 10 ++-- 21 files changed, 141 insertions(+), 134 deletions(-) diff --git a/eventuate-tram-sagas-micronaut-participant/src/main/java/io/eventuate/tram/sagas/micronaut/participant/SagaParticipantFactory.java b/eventuate-tram-sagas-micronaut-participant/src/main/java/io/eventuate/tram/sagas/micronaut/participant/SagaParticipantFactory.java index 9d0c8b3..08eeaac 100644 --- a/eventuate-tram-sagas-micronaut-participant/src/main/java/io/eventuate/tram/sagas/micronaut/participant/SagaParticipantFactory.java +++ b/eventuate-tram-sagas-micronaut-participant/src/main/java/io/eventuate/tram/sagas/micronaut/participant/SagaParticipantFactory.java @@ -1,6 +1,7 @@ package io.eventuate.tram.sagas.micronaut.participant; import io.eventuate.tram.commands.common.CommandNameMapping; +import io.eventuate.tram.commands.consumer.CommandReplyProducer; import io.eventuate.tram.messaging.consumer.MessageConsumer; import io.eventuate.tram.messaging.producer.MessageProducer; import io.eventuate.tram.sagas.common.SagaLockManager; @@ -15,7 +16,9 @@ public class SagaParticipantFactory { public SagaCommandDispatcherFactory sagaCommandDispatcherFactory(MessageConsumer messageConsumer, MessageProducer messageProducer, SagaLockManager sagaLockManager, - CommandNameMapping commandNameMapping) { - return new SagaCommandDispatcherFactory(messageConsumer, messageProducer, sagaLockManager, commandNameMapping); + CommandNameMapping commandNameMapping, CommandReplyProducer commandReplyProducer) { + return new SagaCommandDispatcherFactory(messageConsumer, messageProducer, sagaLockManager, commandNameMapping, commandReplyProducer); } + + } diff --git a/eventuate-tram-sagas-micronaut-testing-support/src/main/java/io/eventuate/tram/sagas/micronaut/testing/SagaParticipantStubManagerFactory.java b/eventuate-tram-sagas-micronaut-testing-support/src/main/java/io/eventuate/tram/sagas/micronaut/testing/SagaParticipantStubManagerFactory.java index 2415e30..72c37e3 100644 --- a/eventuate-tram-sagas-micronaut-testing-support/src/main/java/io/eventuate/tram/sagas/micronaut/testing/SagaParticipantStubManagerFactory.java +++ b/eventuate-tram-sagas-micronaut-testing-support/src/main/java/io/eventuate/tram/sagas/micronaut/testing/SagaParticipantStubManagerFactory.java @@ -1,8 +1,8 @@ package io.eventuate.tram.sagas.micronaut.testing; import io.eventuate.tram.commands.common.CommandNameMapping; +import io.eventuate.tram.commands.consumer.CommandReplyProducer; import io.eventuate.tram.messaging.consumer.MessageConsumer; -import io.eventuate.tram.messaging.producer.MessageProducer; import io.eventuate.tram.sagas.testing.SagaParticipantChannels; import io.eventuate.tram.sagas.testing.SagaParticipantStubManager; import io.micronaut.context.annotation.Factory; @@ -15,8 +15,9 @@ public class SagaParticipantStubManagerFactory { @Singleton public SagaParticipantStubManager sagaParticipantStubManager(SagaParticipantChannels sagaParticipantChannels, MessageConsumer messageConsumer, - MessageProducer messageProducer, - CommandNameMapping commandNameMapping) { - return new SagaParticipantStubManager(sagaParticipantChannels, messageConsumer, messageProducer, commandNameMapping); + CommandNameMapping commandNameMapping, CommandReplyProducer commandReplyProducer) { + return new SagaParticipantStubManager(sagaParticipantChannels, messageConsumer, commandNameMapping, commandReplyProducer); } + + } diff --git a/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/AbstractSagaCommandHandlersBuilder.java b/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/AbstractSagaCommandHandlersBuilder.java index db3b66d..ab55e89 100644 --- a/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/AbstractSagaCommandHandlersBuilder.java +++ b/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/AbstractSagaCommandHandlersBuilder.java @@ -1,5 +1,6 @@ package io.eventuate.tram.sagas.participant; +import io.eventuate.tram.commands.common.Command; import io.eventuate.tram.commands.consumer.CommandMessage; import io.eventuate.tram.messaging.common.Message; @@ -9,14 +10,14 @@ import java.util.function.Function; public interface AbstractSagaCommandHandlersBuilder { - SagaCommandHandlerBuilder onMessageReturningMessages(Class commandClass, - Function, List> handler); + SagaCommandHandlerBuilder onMessageReturningMessages(Class commandClass, + Function, List> handler); - SagaCommandHandlerBuilder onMessageReturningOptionalMessage(Class commandClass, + SagaCommandHandlerBuilder onMessageReturningOptionalMessage(Class commandClass, Function, Optional> handler); - SagaCommandHandlerBuilder onMessage(Class commandClass, + SagaCommandHandlerBuilder onMessage(Class commandClass, Function, Message> handler); - SagaCommandHandlerBuilder onMessage(Class commandClass, Consumer> handler); + SagaCommandHandlerBuilder onMessage(Class commandClass, Consumer> handler); } diff --git a/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandDispatcher.java b/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandDispatcher.java index ce87884..9b47e16 100644 --- a/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandDispatcher.java +++ b/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandDispatcher.java @@ -2,15 +2,10 @@ import io.eventuate.tram.commands.common.CommandMessageHeaders; import io.eventuate.tram.commands.common.CommandNameMapping; -import io.eventuate.tram.commands.consumer.CommandDispatcher; -import io.eventuate.tram.commands.consumer.CommandHandler; -import io.eventuate.tram.commands.consumer.CommandHandlers; -import io.eventuate.tram.commands.consumer.CommandMessage; -import io.eventuate.tram.commands.consumer.PathVariables; +import io.eventuate.tram.commands.consumer.*; import io.eventuate.tram.messaging.common.Message; import io.eventuate.tram.messaging.consumer.MessageConsumer; import io.eventuate.tram.messaging.producer.MessageBuilder; -import io.eventuate.tram.messaging.producer.MessageProducer; import io.eventuate.tram.sagas.common.*; import java.util.List; @@ -25,10 +20,9 @@ public class SagaCommandDispatcher extends CommandDispatcher { public SagaCommandDispatcher(String commandDispatcherId, CommandHandlers target, MessageConsumer messageConsumer, - MessageProducer messageProducer, SagaLockManager sagaLockManager, - CommandNameMapping commandNameMapping) { - super(commandDispatcherId, target, messageConsumer, messageProducer, commandNameMapping); + CommandNameMapping commandNameMapping, CommandReplyProducer commandReplyProducer) { + super(commandDispatcherId, target, messageConsumer, commandNameMapping, commandReplyProducer); this.sagaLockManager = sagaLockManager; } @@ -61,7 +55,7 @@ private String getSagaType(Message message) { @Override - protected List invoke(CommandHandler commandHandler, CommandMessage cm, Map pathVars) { + protected List invoke(CommandHandler commandHandler, CommandMessage cm, Map pathVars, CommandReplyToken commandReplyToken) { Optional lockedTarget = Optional.empty(); if (commandHandler instanceof SagaCommandHandler) { SagaCommandHandler sch = (SagaCommandHandler) commandHandler; @@ -77,7 +71,7 @@ protected List invoke(CommandHandler commandHandler, CommandMessage cm, } } - List messages = super.invoke(commandHandler, cm, pathVars); + List messages = super.invoke(commandHandler, cm, pathVars, commandReplyToken); if (lockedTarget.isPresent()) return addLockedHeader(messages, lockedTarget.get()); diff --git a/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandDispatcherFactory.java b/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandDispatcherFactory.java index 65d9984..4c2ed71 100644 --- a/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandDispatcherFactory.java +++ b/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandDispatcherFactory.java @@ -2,6 +2,7 @@ import io.eventuate.tram.commands.common.CommandNameMapping; import io.eventuate.tram.commands.consumer.CommandHandlers; +import io.eventuate.tram.commands.consumer.CommandReplyProducer; import io.eventuate.tram.messaging.consumer.MessageConsumer; import io.eventuate.tram.messaging.producer.MessageProducer; import io.eventuate.tram.sagas.common.SagaLockManager; @@ -12,18 +13,20 @@ public class SagaCommandDispatcherFactory { private final MessageProducer messageProducer; private final SagaLockManager sagaLockManager; private final CommandNameMapping commandNameMapping; + private final CommandReplyProducer commandReplyProducer; public SagaCommandDispatcherFactory(MessageConsumer messageConsumer, MessageProducer messageProducer, SagaLockManager sagaLockManager, - CommandNameMapping commandNameMapping) { + CommandNameMapping commandNameMapping, CommandReplyProducer commandReplyProducer) { this.messageConsumer = messageConsumer; this.messageProducer = messageProducer; this.sagaLockManager = sagaLockManager; this.commandNameMapping = commandNameMapping; + this.commandReplyProducer = commandReplyProducer; } public SagaCommandDispatcher make(String commandDispatcherId, CommandHandlers target) { - return new SagaCommandDispatcher(commandDispatcherId, target, messageConsumer, messageProducer, sagaLockManager, commandNameMapping); + return new SagaCommandDispatcher(commandDispatcherId, target, messageConsumer, sagaLockManager, commandNameMapping, commandReplyProducer); } } diff --git a/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandHandler.java b/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandHandler.java index 5e43bde..5dd90cf 100644 --- a/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandHandler.java +++ b/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandHandler.java @@ -1,6 +1,8 @@ package io.eventuate.tram.sagas.participant; +import io.eventuate.tram.commands.common.Command; import io.eventuate.tram.commands.consumer.CommandHandler; +import io.eventuate.tram.commands.consumer.CommandHandlerArgs; import io.eventuate.tram.commands.consumer.CommandMessage; import io.eventuate.tram.commands.consumer.PathVariables; import io.eventuate.tram.messaging.common.Message; @@ -17,12 +19,8 @@ public class SagaCommandHandler extends CommandHandler { private Optional> preLock = Optional.empty(); private Optional postLock = Optional.empty(); - public SagaCommandHandler(String channel, String resource, Class commandClass, BiFunction, PathVariables, List> handler) { - super(channel, Optional.of(resource), commandClass, handler); - } - - public SagaCommandHandler(String channel, Class commandClass, Function, List> handler) { - super(channel, Optional.empty(), commandClass, (c, pv) -> handler.apply(c)); + public SagaCommandHandler(String channel, Class commandClass, Function, List> handler) { + super(channel, Optional.empty(), commandClass, handler); } public void setPreLock(BiFunction preLock) { diff --git a/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandHandlerBuilder.java b/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandHandlerBuilder.java index 03bc1c6..8759ff6 100644 --- a/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandHandlerBuilder.java +++ b/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandHandlerBuilder.java @@ -1,5 +1,6 @@ package io.eventuate.tram.sagas.participant; +import io.eventuate.tram.commands.common.Command; import io.eventuate.tram.commands.consumer.CommandHandlers; import io.eventuate.tram.commands.consumer.CommandMessage; import io.eventuate.tram.commands.consumer.PathVariables; @@ -12,7 +13,7 @@ import java.util.function.Consumer; import java.util.function.Function; -public class SagaCommandHandlerBuilder implements AbstractSagaCommandHandlersBuilder{ +public class SagaCommandHandlerBuilder implements AbstractSagaCommandHandlersBuilder { private final SagaCommandHandlersBuilder parent; private final SagaCommandHandler h; @@ -23,22 +24,22 @@ public SagaCommandHandlerBuilder(SagaCommandHandlersBuilder parent, SagaCommandH } @Override - public SagaCommandHandlerBuilder onMessageReturningMessages(Class commandClass, Function, List> handler) { + public SagaCommandHandlerBuilder onMessageReturningMessages(Class commandClass, Function, List> handler) { return parent.onMessageReturningMessages(commandClass, handler); } @Override - public SagaCommandHandlerBuilder onMessageReturningOptionalMessage(Class commandClass, Function, Optional> handler) { + public SagaCommandHandlerBuilder onMessageReturningOptionalMessage(Class commandClass, Function, Optional> handler) { return parent.onMessageReturningOptionalMessage(commandClass, handler); } @Override - public SagaCommandHandlerBuilder onMessage(Class commandClass, Function, Message> handler) { + public SagaCommandHandlerBuilder onMessage(Class commandClass, Function, Message> handler) { return parent.onMessage(commandClass, handler); } @Override - public SagaCommandHandlerBuilder onMessage(Class commandClass, Consumer> handler) { + public SagaCommandHandlerBuilder onMessage(Class commandClass, Consumer> handler) { return parent.onMessage(commandClass, handler); } diff --git a/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandHandlersBuilder.java b/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandHandlersBuilder.java index 7000ed4..639e612 100644 --- a/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandHandlersBuilder.java +++ b/eventuate-tram-sagas-participant/src/main/java/io/eventuate/tram/sagas/participant/SagaCommandHandlersBuilder.java @@ -1,6 +1,7 @@ package io.eventuate.tram.sagas.participant; +import io.eventuate.tram.commands.common.Command; import io.eventuate.tram.commands.consumer.CommandHandler; import io.eventuate.tram.commands.consumer.CommandHandlers; import io.eventuate.tram.commands.consumer.CommandMessage; @@ -16,7 +17,7 @@ public class SagaCommandHandlersBuilder implements AbstractSagaCommandHandlersBuilder { private String channel; - private List handlers = new ArrayList<>(); + private final List handlers = new ArrayList<>(); public static SagaCommandHandlersBuilder fromChannel(String channel) { return new SagaCommandHandlersBuilder().andFromChannel(channel); @@ -28,39 +29,39 @@ private SagaCommandHandlersBuilder andFromChannel(String channel) { } @Override - public SagaCommandHandlerBuilder onMessageReturningMessages(Class commandClass, - Function, List> handler) { - SagaCommandHandler h = new SagaCommandHandler(channel, commandClass, handler); + public SagaCommandHandlerBuilder onMessageReturningMessages(Class commandClass, + Function, List> handler) { + SagaCommandHandler h = new SagaCommandHandler(channel, commandClass, args -> handler.apply(args.getCommandMessage())); this.handlers.add(h); - return new SagaCommandHandlerBuilder(this, h); + return new SagaCommandHandlerBuilder<>(this, h); } @Override - public SagaCommandHandlerBuilder onMessageReturningOptionalMessage(Class commandClass, + public SagaCommandHandlerBuilder onMessageReturningOptionalMessage(Class commandClass, Function, Optional> handler) { - SagaCommandHandler h = new SagaCommandHandler(channel, commandClass, (c) -> handler.apply(c).map(Collections::singletonList).orElse(Collections.EMPTY_LIST)); + SagaCommandHandler h = new SagaCommandHandler(channel, commandClass, args -> handler.apply(args.getCommandMessage()).map(Collections::singletonList).orElse(Collections.emptyList())); this.handlers.add(h); - return new SagaCommandHandlerBuilder(this, h); + return new SagaCommandHandlerBuilder<>(this, h); } @Override - public SagaCommandHandlerBuilder onMessage(Class commandClass, + public SagaCommandHandlerBuilder onMessage(Class commandClass, Function, Message> handler) { SagaCommandHandler h = new SagaCommandHandler(channel, commandClass, - (c) -> Collections.singletonList(handler.apply(c))); + args -> Collections.singletonList(handler.apply(args.getCommandMessage()))); this.handlers.add(h); - return new SagaCommandHandlerBuilder(this, h); + return new SagaCommandHandlerBuilder<>(this, h); } @Override - public SagaCommandHandlerBuilder onMessage(Class commandClass, Consumer> handler) { + public SagaCommandHandlerBuilder onMessage(Class commandClass, Consumer> handler) { SagaCommandHandler h = new SagaCommandHandler(channel, commandClass, - (c) -> { - handler.accept(c); + args -> { + handler.accept(args.getCommandMessage()); return Collections.emptyList(); }); this.handlers.add(h); - return new SagaCommandHandlerBuilder(this, h); + return new SagaCommandHandlerBuilder<>(this, h); } public CommandHandlers build() { diff --git a/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/AbstractReactiveSagaCommandHandlersBuilder.java b/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/AbstractReactiveSagaCommandHandlersBuilder.java index d8ad925..a089487 100644 --- a/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/AbstractReactiveSagaCommandHandlersBuilder.java +++ b/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/AbstractReactiveSagaCommandHandlersBuilder.java @@ -1,5 +1,6 @@ package io.eventuate.tram.sagas.reactive.participant; +import io.eventuate.tram.commands.common.Command; import io.eventuate.tram.commands.consumer.CommandMessage; import io.eventuate.tram.messaging.common.Message; import org.reactivestreams.Publisher; @@ -7,6 +8,6 @@ import java.util.function.Function; public interface AbstractReactiveSagaCommandHandlersBuilder { - ReactiveSagaCommandHandlerBuilder onMessage(Class commandClass, - Function, Publisher> handler); + ReactiveSagaCommandHandlerBuilder onMessage(Class commandClass, + Function, Publisher> handler); } diff --git a/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandDispatcher.java b/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandDispatcher.java index 8c2ac75..25b59ea 100644 --- a/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandDispatcher.java +++ b/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandDispatcher.java @@ -3,6 +3,7 @@ import io.eventuate.tram.commands.common.CommandMessageHeaders; import io.eventuate.tram.commands.consumer.CommandHandlerParams; import io.eventuate.tram.commands.consumer.CommandMessage; +import io.eventuate.tram.commands.consumer.CommandReplyToken; import io.eventuate.tram.commands.consumer.PathVariables; import io.eventuate.tram.consumer.common.reactive.ReactiveMessageConsumer; import io.eventuate.tram.messaging.common.Message; @@ -10,13 +11,14 @@ import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandDispatcher; import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandHandler; import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandHandlers; -import io.eventuate.tram.reactive.messaging.producer.common.ReactiveMessageProducer; +import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandReplyProducer; import io.eventuate.tram.sagas.common.*; import io.eventuate.tram.sagas.participant.SagaReplyMessage; import io.eventuate.tram.sagas.reactive.common.ReactiveSagaLockManager; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; + import java.util.Optional; public class ReactiveSagaCommandDispatcher extends ReactiveCommandDispatcher { @@ -26,9 +28,9 @@ public class ReactiveSagaCommandDispatcher extends ReactiveCommandDispatcher { public ReactiveSagaCommandDispatcher(String commandDispatcherId, ReactiveCommandHandlers target, ReactiveMessageConsumer messageConsumer, - ReactiveMessageProducer messageProducer, - ReactiveSagaLockManager sagaLockManager) { - super(commandDispatcherId, target, messageConsumer, messageProducer); + ReactiveSagaLockManager sagaLockManager, + ReactiveCommandReplyProducer commandReplyProducer) { + super(commandDispatcherId, target, messageConsumer, commandReplyProducer); this.sagaLockManager = sagaLockManager; } @@ -62,7 +64,7 @@ private String getSagaType(Message message) { @Override - protected Publisher invoke(ReactiveCommandHandler commandHandler, CommandMessage cm, CommandHandlerParams commandHandlerParams) { + protected Publisher invoke(ReactiveCommandHandler commandHandler, CommandMessage cm, CommandHandlerParams commandHandlerParams, CommandReplyToken commandReplyToken) { Optional lockedTarget = Optional.empty(); Flux result = Flux.empty(); @@ -86,7 +88,7 @@ protected Publisher invoke(ReactiveCommandHandler commandHandler, Comma } } - result = result.thenMany(super.invoke(commandHandler, cm, commandHandlerParams)).cache(); + result = result.thenMany(super.invoke(commandHandler, cm, commandHandlerParams, commandReplyToken)).cache(); Flux finalizedResult = result; if (lockedTarget.isPresent()) diff --git a/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandDispatcherFactory.java b/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandDispatcherFactory.java index 289c45e..6c9c8e6 100644 --- a/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandDispatcherFactory.java +++ b/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandDispatcherFactory.java @@ -2,22 +2,22 @@ import io.eventuate.tram.consumer.common.reactive.ReactiveMessageConsumer; import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandHandlers; -import io.eventuate.tram.reactive.messaging.producer.common.ReactiveMessageProducer; +import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandReplyProducer; import io.eventuate.tram.sagas.reactive.common.ReactiveSagaLockManager; public class ReactiveSagaCommandDispatcherFactory { private final ReactiveMessageConsumer messageConsumer; - private final ReactiveMessageProducer messageProducer; private final ReactiveSagaLockManager sagaLockManager; + private final ReactiveCommandReplyProducer commandReplyProducer; - public ReactiveSagaCommandDispatcherFactory(ReactiveMessageConsumer messageConsumer, ReactiveMessageProducer messageProducer, ReactiveSagaLockManager sagaLockManager) { + public ReactiveSagaCommandDispatcherFactory(ReactiveMessageConsumer messageConsumer, ReactiveSagaLockManager sagaLockManager, ReactiveCommandReplyProducer commandReplyProducer) { this.messageConsumer = messageConsumer; - this.messageProducer = messageProducer; this.sagaLockManager = sagaLockManager; + this.commandReplyProducer = commandReplyProducer; } public ReactiveSagaCommandDispatcher make(String commandDispatcherId, ReactiveCommandHandlers target) { - return new ReactiveSagaCommandDispatcher(commandDispatcherId, target, messageConsumer, messageProducer, sagaLockManager); + return new ReactiveSagaCommandDispatcher(commandDispatcherId, target, messageConsumer, sagaLockManager, commandReplyProducer); } } diff --git a/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandHandler.java b/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandHandler.java index 3e6d01d..68fa105 100644 --- a/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandHandler.java +++ b/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandHandler.java @@ -1,5 +1,7 @@ package io.eventuate.tram.sagas.reactive.participant; +import io.eventuate.tram.commands.common.Command; +import io.eventuate.tram.commands.consumer.CommandHandlerArgs; import io.eventuate.tram.commands.consumer.CommandMessage; import io.eventuate.tram.commands.consumer.PathVariables; import io.eventuate.tram.messaging.common.Message; @@ -8,7 +10,6 @@ import io.eventuate.tram.sagas.participant.PostLockFunction; import org.reactivestreams.Publisher; -import java.util.List; import java.util.Optional; import java.util.function.BiFunction; import java.util.function.Function; @@ -19,12 +20,12 @@ public class ReactiveSagaCommandHandler extends ReactiveCommandHandler { private Optional> preLock = Optional.empty(); private Optional postLock = Optional.empty(); - public ReactiveSagaCommandHandler(String channel, String resource, Class commandClass, BiFunction, PathVariables, Publisher> handler) { + public ReactiveSagaCommandHandler(String channel, String resource, Class commandClass, Function, Publisher> handler) { super(channel, Optional.of(resource), commandClass, handler); } - public ReactiveSagaCommandHandler(String channel, Class commandClass, Function, Publisher> handler) { - super(channel, Optional.empty(), commandClass, (c, pv) -> handler.apply(c)); + public ReactiveSagaCommandHandler(String channel, Class commandClass, Function, Publisher> handler) { + super(channel, Optional.empty(), commandClass, handler); } public void setPreLock(BiFunction preLock) { diff --git a/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandHandlerBuilder.java b/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandHandlerBuilder.java index 79a40ad..22050a5 100644 --- a/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandHandlerBuilder.java +++ b/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandHandlerBuilder.java @@ -1,5 +1,6 @@ package io.eventuate.tram.sagas.reactive.participant; +import io.eventuate.tram.commands.common.Command; import io.eventuate.tram.commands.consumer.CommandMessage; import io.eventuate.tram.commands.consumer.PathVariables; import io.eventuate.tram.messaging.common.Message; @@ -11,7 +12,7 @@ import java.util.function.BiFunction; import java.util.function.Function; -public class ReactiveSagaCommandHandlerBuilder implements AbstractReactiveSagaCommandHandlersBuilder { +public class ReactiveSagaCommandHandlerBuilder implements AbstractReactiveSagaCommandHandlersBuilder { private final ReactiveSagaCommandHandlersBuilder parent; private final ReactiveSagaCommandHandler h; @@ -22,7 +23,7 @@ public ReactiveSagaCommandHandlerBuilder(ReactiveSagaCommandHandlersBuilder pare } @Override - public ReactiveSagaCommandHandlerBuilder onMessage(Class commandClass, Function, Publisher> handler) { + public ReactiveSagaCommandHandlerBuilder onMessage(Class commandClass, Function, Publisher> handler) { return parent.onMessage(commandClass, handler); } diff --git a/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandHandlersBuilder.java b/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandHandlersBuilder.java index 16216e0..8eb25f8 100644 --- a/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandHandlersBuilder.java +++ b/eventuate-tram-sagas-reactive-participant/src/main/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandHandlersBuilder.java @@ -1,6 +1,7 @@ package io.eventuate.tram.sagas.reactive.participant; +import io.eventuate.tram.commands.common.Command; import io.eventuate.tram.commands.consumer.CommandMessage; import io.eventuate.tram.messaging.common.Message; import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandHandler; @@ -14,7 +15,7 @@ public class ReactiveSagaCommandHandlersBuilder implements AbstractReactiveSagaCommandHandlersBuilder { private String channel; - private List handlers = new ArrayList<>(); + private final List handlers = new ArrayList<>(); public static ReactiveSagaCommandHandlersBuilder fromChannel(String channel) { return new ReactiveSagaCommandHandlersBuilder().andFromChannel(channel); @@ -26,9 +27,9 @@ private ReactiveSagaCommandHandlersBuilder andFromChannel(String channel) { } @Override - public ReactiveSagaCommandHandlerBuilder onMessage(Class commandClass, - Function, Publisher> handler) { - ReactiveSagaCommandHandler h = new ReactiveSagaCommandHandler(channel, commandClass, handler); + public ReactiveSagaCommandHandlerBuilder onMessage(Class commandClass, + Function, Publisher> handler) { + ReactiveSagaCommandHandler h = new ReactiveSagaCommandHandler(channel, commandClass, args -> handler.apply(args.getCommandMessage())); this.handlers.add(h); return new ReactiveSagaCommandHandlerBuilder(this, h); diff --git a/eventuate-tram-sagas-reactive-participant/src/test/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandDispatcherTest.java b/eventuate-tram-sagas-reactive-participant/src/test/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandDispatcherTest.java index d17fe90..1d931c2 100644 --- a/eventuate-tram-sagas-reactive-participant/src/test/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandDispatcherTest.java +++ b/eventuate-tram-sagas-reactive-participant/src/test/java/io/eventuate/tram/sagas/reactive/participant/ReactiveSagaCommandDispatcherTest.java @@ -8,9 +8,14 @@ import io.eventuate.tram.messaging.producer.MessageBuilder; import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandHandler; import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandHandlers; +import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandReplyProducer; import io.eventuate.tram.reactive.messaging.producer.common.ReactiveMessageProducer; import io.eventuate.tram.sagas.reactive.common.ReactiveSagaLockManager; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -19,20 +24,35 @@ import java.util.function.Function; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static reactor.core.publisher.Mono.from; +@RunWith(MockitoJUnitRunner.class) public class ReactiveSagaCommandDispatcherTest { + + @Mock + private Function messageHandlerConsumer; + @Mock + private ReactiveMessageProducer producer; + @Mock + private ReactiveSagaLockManager sagaLockManager; + @Mock + private ReactiveMessageConsumer messageConsumer; + + @Before + public void setUp() { + when(messageHandlerConsumer.apply(any())).thenReturn(CommandHandlerReplyBuilder.withSuccess()); + when(producer.send(any(), any())).thenReturn(Mono.just(CommandHandlerReplyBuilder.withSuccess())); + } + @Test public void testHandlerIsInvokedOnlyOnce() { - Function messageHandlerConsumer = createMessageHandlerConsumer(); ReactiveSagaCommandDispatcher reactiveSagaCommandDispatcher = - new ReactiveSagaCommandDispatcher("commandDispatcherId1", createReactiveCommandHandlers(messageHandlerConsumer), mockConsumer(), mockProducer(), mockLockManager()); + new ReactiveSagaCommandDispatcher("commandDispatcherId1", createReactiveCommandHandlers(messageHandlerConsumer), messageConsumer, sagaLockManager, + new ReactiveCommandReplyProducer(producer)); - from(reactiveSagaCommandDispatcher.messageHandler(createCommandMessage())).block(); + Mono.from(reactiveSagaCommandDispatcher.messageHandler(createCommandMessage())).block(); verify(messageHandlerConsumer).apply(any()); } @@ -48,35 +68,11 @@ private Message createCommandMessage() { private ReactiveCommandHandlers createReactiveCommandHandlers(Function messageHandlerConsumer) { ReactiveCommandHandler reactiveCommandHandler = - new ReactiveCommandHandler("channel1", Optional.empty(), TestCommand.class, (message, pathVariables) -> + new ReactiveCommandHandler("channel1", Optional.empty(), TestCommand.class, (args) -> Flux.just("TEST ITERATION VALUE").map(messageHandlerConsumer)); return new ReactiveCommandHandlers(Collections.singletonList(reactiveCommandHandler)); } - private Function createMessageHandlerConsumer() { - Function messageHandlerConsumer = mock(Function.class); - - when(messageHandlerConsumer.apply(any())).thenReturn(CommandHandlerReplyBuilder.withSuccess()); - - return messageHandlerConsumer; - } - - private ReactiveMessageProducer mockProducer() { - ReactiveMessageProducer producer = mock(ReactiveMessageProducer.class); - - when(producer.send(any(), any())).thenReturn(Mono.just(CommandHandlerReplyBuilder.withSuccess())); - - return producer; - } - - private ReactiveSagaLockManager mockLockManager() { - return mock(ReactiveSagaLockManager.class); - } - - private ReactiveMessageConsumer mockConsumer() { - return mock(ReactiveMessageConsumer.class); - } - public static class TestCommand implements Command {} } diff --git a/eventuate-tram-sagas-spring-participant/src/main/java/io/eventuate/tram/sagas/spring/participant/SagaParticipantConfiguration.java b/eventuate-tram-sagas-spring-participant/src/main/java/io/eventuate/tram/sagas/spring/participant/SagaParticipantConfiguration.java index 197e324..a8d4e7f 100644 --- a/eventuate-tram-sagas-spring-participant/src/main/java/io/eventuate/tram/sagas/spring/participant/SagaParticipantConfiguration.java +++ b/eventuate-tram-sagas-spring-participant/src/main/java/io/eventuate/tram/sagas/spring/participant/SagaParticipantConfiguration.java @@ -1,11 +1,12 @@ package io.eventuate.tram.sagas.spring.participant; import io.eventuate.tram.commands.common.CommandNameMapping; +import io.eventuate.tram.commands.consumer.CommandReplyProducer; import io.eventuate.tram.messaging.consumer.MessageConsumer; import io.eventuate.tram.messaging.producer.MessageProducer; import io.eventuate.tram.sagas.common.SagaLockManager; -import io.eventuate.tram.sagas.spring.common.EventuateTramSagaCommonConfiguration; import io.eventuate.tram.sagas.participant.SagaCommandDispatcherFactory; +import io.eventuate.tram.sagas.spring.common.EventuateTramSagaCommonConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -17,7 +18,10 @@ public class SagaParticipantConfiguration { public SagaCommandDispatcherFactory sagaCommandDispatcherFactory(MessageConsumer messageConsumer, MessageProducer messageProducer, SagaLockManager sagaLockManager, - CommandNameMapping commandNameMapping) { - return new SagaCommandDispatcherFactory(messageConsumer, messageProducer, sagaLockManager, commandNameMapping); + CommandNameMapping commandNameMapping, + CommandReplyProducer commandReplyProducer) { + return new SagaCommandDispatcherFactory(messageConsumer, messageProducer, sagaLockManager, commandNameMapping, commandReplyProducer); } + + } diff --git a/eventuate-tram-sagas-spring-reactive-participant/src/main/java/io/eventuate/tram/sagas/spring/reactive/participant/ReactiveSagaParticipantConfiguration.java b/eventuate-tram-sagas-spring-reactive-participant/src/main/java/io/eventuate/tram/sagas/spring/reactive/participant/ReactiveSagaParticipantConfiguration.java index 96e0866..960f338 100644 --- a/eventuate-tram-sagas-spring-reactive-participant/src/main/java/io/eventuate/tram/sagas/spring/reactive/participant/ReactiveSagaParticipantConfiguration.java +++ b/eventuate-tram-sagas-spring-reactive-participant/src/main/java/io/eventuate/tram/sagas/spring/reactive/participant/ReactiveSagaParticipantConfiguration.java @@ -1,7 +1,7 @@ package io.eventuate.tram.sagas.spring.reactive.participant; import io.eventuate.tram.consumer.common.reactive.ReactiveMessageConsumer; -import io.eventuate.tram.reactive.messaging.producer.common.ReactiveMessageProducer; +import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandReplyProducer; import io.eventuate.tram.sagas.reactive.common.ReactiveSagaLockManager; import io.eventuate.tram.sagas.reactive.participant.ReactiveSagaCommandDispatcherFactory; import io.eventuate.tram.sagas.spring.reactive.common.EventuateReactiveTramSagaCommonConfiguration; @@ -14,8 +14,10 @@ public class ReactiveSagaParticipantConfiguration { @Bean public ReactiveSagaCommandDispatcherFactory sagaCommandDispatcherFactory(ReactiveMessageConsumer messageConsumer, - ReactiveMessageProducer messageProducer, - ReactiveSagaLockManager sagaLockManager) { - return new ReactiveSagaCommandDispatcherFactory(messageConsumer, messageProducer, sagaLockManager); + ReactiveSagaLockManager sagaLockManager, + ReactiveCommandReplyProducer commandReplyProducer) { + return new ReactiveSagaCommandDispatcherFactory(messageConsumer, sagaLockManager, commandReplyProducer); } + + } diff --git a/eventuate-tram-sagas-spring-testing-support/src/main/java/io/eventuate/tram/sagas/spring/testing/SagaParticipantStubManagerConfiguration.java b/eventuate-tram-sagas-spring-testing-support/src/main/java/io/eventuate/tram/sagas/spring/testing/SagaParticipantStubManagerConfiguration.java index 8a07d41..3d229f4 100644 --- a/eventuate-tram-sagas-spring-testing-support/src/main/java/io/eventuate/tram/sagas/spring/testing/SagaParticipantStubManagerConfiguration.java +++ b/eventuate-tram-sagas-spring-testing-support/src/main/java/io/eventuate/tram/sagas/spring/testing/SagaParticipantStubManagerConfiguration.java @@ -1,12 +1,12 @@ package io.eventuate.tram.sagas.spring.testing; import io.eventuate.tram.commands.common.CommandNameMapping; +import io.eventuate.tram.commands.consumer.CommandReplyProducer; +import io.eventuate.tram.messaging.consumer.MessageConsumer; import io.eventuate.tram.sagas.testing.SagaParticipantChannels; import io.eventuate.tram.sagas.testing.SagaParticipantStubManager; import io.eventuate.tram.spring.commands.producer.TramCommandProducerConfiguration; import io.eventuate.tram.spring.events.publisher.TramEventsPublisherConfiguration; -import io.eventuate.tram.messaging.consumer.MessageConsumer; -import io.eventuate.tram.messaging.producer.MessageProducer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -18,9 +18,9 @@ public class SagaParticipantStubManagerConfiguration { @Bean public SagaParticipantStubManager sagaParticipantStubManager(SagaParticipantChannels sagaParticipantChannels, MessageConsumer messageConsumer, - MessageProducer messageProducer, - CommandNameMapping commandNameMapping) { - return new SagaParticipantStubManager(sagaParticipantChannels, messageConsumer, messageProducer, commandNameMapping); + CommandNameMapping commandNameMapping, + CommandReplyProducer commandReplyProducer) { + return new SagaParticipantStubManager(sagaParticipantChannels, messageConsumer, commandNameMapping, commandReplyProducer); } } diff --git a/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/SagaParticipantStubManager.java b/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/SagaParticipantStubManager.java index 72442c3..1a1ecf7 100644 --- a/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/SagaParticipantStubManager.java +++ b/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/SagaParticipantStubManager.java @@ -4,11 +4,11 @@ import io.eventuate.tram.commands.common.Command; import io.eventuate.tram.commands.common.CommandNameMapping; import io.eventuate.tram.commands.consumer.CommandMessage; +import io.eventuate.tram.commands.consumer.CommandReplyProducer; import io.eventuate.tram.messaging.common.Message; import io.eventuate.tram.messaging.consumer.MessageConsumer; -import io.eventuate.tram.messaging.producer.MessageProducer; -import io.eventuate.tram.sagas.testing.commandhandling.SagaParticipantStubCommandHandler; import io.eventuate.tram.sagas.testing.commandhandling.ReconfigurableCommandHandlers; +import io.eventuate.tram.sagas.testing.commandhandling.SagaParticipantStubCommandHandler; import io.eventuate.tram.sagas.testing.commandhandling.UnhandledMessageTrackingCommandDispatcher; import javax.annotation.PostConstruct; @@ -25,22 +25,20 @@ */ public class SagaParticipantStubManager { private final ReconfigurableCommandHandlers commandHandlers; - private Set commandChannels; + private final Set commandChannels; private final UnhandledMessageTrackingCommandDispatcher commandDispatcher; private String currentCommandChannel; public SagaParticipantStubManager(SagaParticipantChannels sagaParticipantChannels, MessageConsumer messageConsumer, - MessageProducer messageProducer, - CommandNameMapping commandNameMapping) { + CommandNameMapping commandNameMapping, CommandReplyProducer commandReplyProducer) { this.commandChannels = sagaParticipantChannels.getChannels(); this.commandHandlers = new ReconfigurableCommandHandlers(this.commandChannels); this.commandDispatcher = new UnhandledMessageTrackingCommandDispatcher("SagaParticipantStubManager-command-dispatcher-" + System.currentTimeMillis(), commandHandlers, messageConsumer, - messageProducer, - commandNameMapping); + commandNameMapping, commandReplyProducer); /// TODO handle scenario where a command is recieved for which there is not a handler. } @@ -88,7 +86,7 @@ public void verifyCommandReceived(String channel, Class c } - public class SagaParticipantStubManagerHelper { + public class SagaParticipantStubManagerHelper { private Class expectedCommandClass; private final Predicate expectedCommand; private SagaParticipantStubManager sagaParticipantStubManager; diff --git a/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/commandhandling/SagaParticipantStubCommandHandler.java b/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/commandhandling/SagaParticipantStubCommandHandler.java index 31a5c53..4fd03b4 100644 --- a/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/commandhandling/SagaParticipantStubCommandHandler.java +++ b/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/commandhandling/SagaParticipantStubCommandHandler.java @@ -2,21 +2,19 @@ import io.eventuate.tram.commands.common.Command; import io.eventuate.tram.commands.consumer.CommandHandler; +import io.eventuate.tram.commands.consumer.CommandHandlerArgs; import io.eventuate.tram.commands.consumer.CommandMessage; -import io.eventuate.tram.commands.consumer.PathVariables; import io.eventuate.tram.messaging.common.Message; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.function.Function; import java.util.function.Predicate; import static java.util.Collections.singletonList; -import static org.junit.Assert.fail; -public class SagaParticipantStubCommandHandler extends CommandHandler { +public class SagaParticipantStubCommandHandler extends CommandHandler { private String commandChannel; private final Predicate expectedCommand; @@ -24,7 +22,7 @@ public class SagaParticipantStubCommandHandler extends CommandHandler { private final List> receivedCommands = new ArrayList<>(); public SagaParticipantStubCommandHandler(String commandChannel, Class expectedCommandClass, Predicate expectedCommand, Function, Message> replyBuilder) { - super(commandChannel, Optional.empty(), expectedCommandClass, (cm, pv) -> { + super(commandChannel, Optional.empty(), expectedCommandClass, args -> { // Dummy - override invoke method return null; }); @@ -34,7 +32,8 @@ public SagaParticipantStubCommandHandler(String commandChannel, Class expecte } @Override - public List invokeMethod(CommandMessage cm, Map pathVars) { + public List invokeMethod(CommandHandlerArgs args) { + CommandMessage cm = args.getCommandMessage(); receivedCommands.add(cm); return singletonList(replyBuilder.apply(cm)); } diff --git a/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/commandhandling/UnhandledMessageTrackingCommandDispatcher.java b/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/commandhandling/UnhandledMessageTrackingCommandDispatcher.java index ceddf1a..3ff4a27 100644 --- a/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/commandhandling/UnhandledMessageTrackingCommandDispatcher.java +++ b/eventuate-tram-sagas-testing-support/src/main/java/io/eventuate/tram/sagas/testing/commandhandling/UnhandledMessageTrackingCommandDispatcher.java @@ -1,12 +1,13 @@ package io.eventuate.tram.sagas.testing.commandhandling; +import io.eventuate.tram.commands.common.Command; import io.eventuate.tram.commands.common.CommandNameMapping; import io.eventuate.tram.commands.consumer.CommandDispatcher; import io.eventuate.tram.commands.consumer.CommandHandler; import io.eventuate.tram.commands.consumer.CommandHandlers; +import io.eventuate.tram.commands.consumer.CommandReplyProducer; import io.eventuate.tram.messaging.common.Message; import io.eventuate.tram.messaging.consumer.MessageConsumer; -import io.eventuate.tram.messaging.producer.MessageProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,9 +27,8 @@ public class UnhandledMessageTrackingCommandDispatcher extends CommandDispatcher public UnhandledMessageTrackingCommandDispatcher(String commandDispatcherId, CommandHandlers commandHandlers, MessageConsumer messageConsumer, - MessageProducer messageProducer, - CommandNameMapping commandNameMapping) { - super(commandDispatcherId, commandHandlers, messageConsumer, messageProducer, commandNameMapping); + CommandNameMapping commandNameMapping, CommandReplyProducer commandReplyProducer) { + super(commandDispatcherId, commandHandlers, messageConsumer, commandNameMapping, commandReplyProducer); this.commandHandlers = commandHandlers; } @@ -49,7 +49,7 @@ public void reset() { unhandledMessages.clear(); } - public void noteNewCommandHandler(SagaParticipantStubCommandHandler commandHandler) { + public void noteNewCommandHandler(SagaParticipantStubCommandHandler commandHandler) { List handled = unhandledMessages.stream().filter(commandHandler::handles).collect(toList()); if (!handled.isEmpty()) logger.info("Processing unhandled messages {}", handled);