Skip to content

Commit

Permalink
Updates to reflect changes made by eventuate-tram/eventuate-tram-core…
Browse files Browse the repository at this point in the history
  • Loading branch information
cer committed Aug 15, 2022
1 parent 10c7cd1 commit 0bfe1df
Show file tree
Hide file tree
Showing 21 changed files with 141 additions and 134 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.eventuate.tram.sagas.micronaut.participant;

import io.eventuate.tram.commands.common.CommandNameMapping;
import io.eventuate.tram.commands.consumer.CommandReplyProducer;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageProducer;
import io.eventuate.tram.sagas.common.SagaLockManager;
Expand All @@ -15,7 +16,9 @@ public class SagaParticipantFactory {
public SagaCommandDispatcherFactory sagaCommandDispatcherFactory(MessageConsumer messageConsumer,
MessageProducer messageProducer,
SagaLockManager sagaLockManager,
CommandNameMapping commandNameMapping) {
return new SagaCommandDispatcherFactory(messageConsumer, messageProducer, sagaLockManager, commandNameMapping);
CommandNameMapping commandNameMapping, CommandReplyProducer commandReplyProducer) {
return new SagaCommandDispatcherFactory(messageConsumer, messageProducer, sagaLockManager, commandNameMapping, commandReplyProducer);
}


}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.eventuate.tram.sagas.micronaut.testing;

import io.eventuate.tram.commands.common.CommandNameMapping;
import io.eventuate.tram.commands.consumer.CommandReplyProducer;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageProducer;
import io.eventuate.tram.sagas.testing.SagaParticipantChannels;
import io.eventuate.tram.sagas.testing.SagaParticipantStubManager;
import io.micronaut.context.annotation.Factory;
Expand All @@ -15,8 +15,9 @@ public class SagaParticipantStubManagerFactory {
@Singleton
public SagaParticipantStubManager sagaParticipantStubManager(SagaParticipantChannels sagaParticipantChannels,
MessageConsumer messageConsumer,
MessageProducer messageProducer,
CommandNameMapping commandNameMapping) {
return new SagaParticipantStubManager(sagaParticipantChannels, messageConsumer, messageProducer, commandNameMapping);
CommandNameMapping commandNameMapping, CommandReplyProducer commandReplyProducer) {
return new SagaParticipantStubManager(sagaParticipantChannels, messageConsumer, commandNameMapping, commandReplyProducer);
}


}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.eventuate.tram.sagas.participant;

import io.eventuate.tram.commands.common.Command;
import io.eventuate.tram.commands.consumer.CommandMessage;
import io.eventuate.tram.messaging.common.Message;

Expand All @@ -9,14 +10,14 @@
import java.util.function.Function;

public interface AbstractSagaCommandHandlersBuilder {
<C> SagaCommandHandlerBuilder<C> onMessageReturningMessages(Class<C> commandClass,
Function<CommandMessage<C>, List<Message>> handler);
<C extends Command> SagaCommandHandlerBuilder<C> onMessageReturningMessages(Class<C> commandClass,
Function<CommandMessage<C>, List<Message>> handler);

<C> SagaCommandHandlerBuilder<C> onMessageReturningOptionalMessage(Class<C> commandClass,
<C extends Command> SagaCommandHandlerBuilder<C> onMessageReturningOptionalMessage(Class<C> commandClass,
Function<CommandMessage<C>, Optional<Message>> handler);

<C> SagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass,
<C extends Command> SagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass,
Function<CommandMessage<C>, Message> handler);

<C> SagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass, Consumer<CommandMessage<C>> handler);
<C extends Command> SagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass, Consumer<CommandMessage<C>> handler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@

import io.eventuate.tram.commands.common.CommandMessageHeaders;
import io.eventuate.tram.commands.common.CommandNameMapping;
import io.eventuate.tram.commands.consumer.CommandDispatcher;
import io.eventuate.tram.commands.consumer.CommandHandler;
import io.eventuate.tram.commands.consumer.CommandHandlers;
import io.eventuate.tram.commands.consumer.CommandMessage;
import io.eventuate.tram.commands.consumer.PathVariables;
import io.eventuate.tram.commands.consumer.*;
import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageBuilder;
import io.eventuate.tram.messaging.producer.MessageProducer;
import io.eventuate.tram.sagas.common.*;

import java.util.List;
Expand All @@ -25,10 +20,9 @@ public class SagaCommandDispatcher extends CommandDispatcher {
public SagaCommandDispatcher(String commandDispatcherId,
CommandHandlers target,
MessageConsumer messageConsumer,
MessageProducer messageProducer,
SagaLockManager sagaLockManager,
CommandNameMapping commandNameMapping) {
super(commandDispatcherId, target, messageConsumer, messageProducer, commandNameMapping);
CommandNameMapping commandNameMapping, CommandReplyProducer commandReplyProducer) {
super(commandDispatcherId, target, messageConsumer, commandNameMapping, commandReplyProducer);
this.sagaLockManager = sagaLockManager;
}

Expand Down Expand Up @@ -61,7 +55,7 @@ private String getSagaType(Message message) {


@Override
protected List<Message> invoke(CommandHandler commandHandler, CommandMessage cm, Map<String, String> pathVars) {
protected List<Message> invoke(CommandHandler commandHandler, CommandMessage cm, Map<String, String> pathVars, CommandReplyToken commandReplyToken) {
Optional<String> lockedTarget = Optional.empty();
if (commandHandler instanceof SagaCommandHandler) {
SagaCommandHandler sch = (SagaCommandHandler) commandHandler;
Expand All @@ -77,7 +71,7 @@ protected List<Message> invoke(CommandHandler commandHandler, CommandMessage cm,
}
}

List<Message> messages = super.invoke(commandHandler, cm, pathVars);
List<Message> messages = super.invoke(commandHandler, cm, pathVars, commandReplyToken);

if (lockedTarget.isPresent())
return addLockedHeader(messages, lockedTarget.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.eventuate.tram.commands.common.CommandNameMapping;
import io.eventuate.tram.commands.consumer.CommandHandlers;
import io.eventuate.tram.commands.consumer.CommandReplyProducer;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageProducer;
import io.eventuate.tram.sagas.common.SagaLockManager;
Expand All @@ -12,18 +13,20 @@ public class SagaCommandDispatcherFactory {
private final MessageProducer messageProducer;
private final SagaLockManager sagaLockManager;
private final CommandNameMapping commandNameMapping;
private final CommandReplyProducer commandReplyProducer;

public SagaCommandDispatcherFactory(MessageConsumer messageConsumer,
MessageProducer messageProducer,
SagaLockManager sagaLockManager,
CommandNameMapping commandNameMapping) {
CommandNameMapping commandNameMapping, CommandReplyProducer commandReplyProducer) {
this.messageConsumer = messageConsumer;
this.messageProducer = messageProducer;
this.sagaLockManager = sagaLockManager;
this.commandNameMapping = commandNameMapping;
this.commandReplyProducer = commandReplyProducer;
}

public SagaCommandDispatcher make(String commandDispatcherId, CommandHandlers target) {
return new SagaCommandDispatcher(commandDispatcherId, target, messageConsumer, messageProducer, sagaLockManager, commandNameMapping);
return new SagaCommandDispatcher(commandDispatcherId, target, messageConsumer, sagaLockManager, commandNameMapping, commandReplyProducer);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.eventuate.tram.sagas.participant;

import io.eventuate.tram.commands.common.Command;
import io.eventuate.tram.commands.consumer.CommandHandler;
import io.eventuate.tram.commands.consumer.CommandHandlerArgs;
import io.eventuate.tram.commands.consumer.CommandMessage;
import io.eventuate.tram.commands.consumer.PathVariables;
import io.eventuate.tram.messaging.common.Message;
Expand All @@ -17,12 +19,8 @@ public class SagaCommandHandler extends CommandHandler {
private Optional<BiFunction<CommandMessage, PathVariables, LockTarget>> preLock = Optional.empty();
private Optional<PostLockFunction> postLock = Optional.empty();

public <C> SagaCommandHandler(String channel, String resource, Class<C> commandClass, BiFunction<CommandMessage<C>, PathVariables, List<Message>> handler) {
super(channel, Optional.of(resource), commandClass, handler);
}

public <C> SagaCommandHandler(String channel, Class<C> commandClass, Function<CommandMessage<C>, List<Message>> handler) {
super(channel, Optional.empty(), commandClass, (c, pv) -> handler.apply(c));
public <C extends Command> SagaCommandHandler(String channel, Class<C> commandClass, Function<CommandHandlerArgs<C>, List<Message>> handler) {
super(channel, Optional.empty(), commandClass, handler);
}

public void setPreLock(BiFunction<CommandMessage, PathVariables, LockTarget> preLock) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.eventuate.tram.sagas.participant;

import io.eventuate.tram.commands.common.Command;
import io.eventuate.tram.commands.consumer.CommandHandlers;
import io.eventuate.tram.commands.consumer.CommandMessage;
import io.eventuate.tram.commands.consumer.PathVariables;
Expand All @@ -12,7 +13,7 @@
import java.util.function.Consumer;
import java.util.function.Function;

public class SagaCommandHandlerBuilder<C> implements AbstractSagaCommandHandlersBuilder{
public class SagaCommandHandlerBuilder<C extends Command> implements AbstractSagaCommandHandlersBuilder {
private final SagaCommandHandlersBuilder parent;
private final SagaCommandHandler h;

Expand All @@ -23,22 +24,22 @@ public SagaCommandHandlerBuilder(SagaCommandHandlersBuilder parent, SagaCommandH
}

@Override
public <C> SagaCommandHandlerBuilder<C> onMessageReturningMessages(Class<C> commandClass, Function<CommandMessage<C>, List<Message>> handler) {
public <C extends Command> SagaCommandHandlerBuilder<C> onMessageReturningMessages(Class<C> commandClass, Function<CommandMessage<C>, List<Message>> handler) {
return parent.onMessageReturningMessages(commandClass, handler);
}

@Override
public <C> SagaCommandHandlerBuilder<C> onMessageReturningOptionalMessage(Class<C> commandClass, Function<CommandMessage<C>, Optional<Message>> handler) {
public <C extends Command> SagaCommandHandlerBuilder<C> onMessageReturningOptionalMessage(Class<C> commandClass, Function<CommandMessage<C>, Optional<Message>> handler) {
return parent.onMessageReturningOptionalMessage(commandClass, handler);
}

@Override
public <C> SagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass, Function<CommandMessage<C>, Message> handler) {
public <C extends Command> SagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass, Function<CommandMessage<C>, Message> handler) {
return parent.onMessage(commandClass, handler);
}

@Override
public <C> SagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass, Consumer<CommandMessage<C>> handler) {
public <C extends Command> SagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass, Consumer<CommandMessage<C>> handler) {
return parent.onMessage(commandClass, handler);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.eventuate.tram.sagas.participant;


import io.eventuate.tram.commands.common.Command;
import io.eventuate.tram.commands.consumer.CommandHandler;
import io.eventuate.tram.commands.consumer.CommandHandlers;
import io.eventuate.tram.commands.consumer.CommandMessage;
Expand All @@ -16,7 +17,7 @@
public class SagaCommandHandlersBuilder implements AbstractSagaCommandHandlersBuilder {
private String channel;

private List<CommandHandler> handlers = new ArrayList<>();
private final List<CommandHandler> handlers = new ArrayList<>();

public static SagaCommandHandlersBuilder fromChannel(String channel) {
return new SagaCommandHandlersBuilder().andFromChannel(channel);
Expand All @@ -28,39 +29,39 @@ private SagaCommandHandlersBuilder andFromChannel(String channel) {
}

@Override
public <C> SagaCommandHandlerBuilder<C> onMessageReturningMessages(Class<C> commandClass,
Function<CommandMessage<C>, List<Message>> handler) {
SagaCommandHandler h = new SagaCommandHandler(channel, commandClass, handler);
public <C extends Command> SagaCommandHandlerBuilder<C> onMessageReturningMessages(Class<C> commandClass,
Function<CommandMessage<C>, List<Message>> handler) {
SagaCommandHandler h = new SagaCommandHandler(channel, commandClass, args -> handler.apply(args.getCommandMessage()));
this.handlers.add(h);
return new SagaCommandHandlerBuilder<C>(this, h);
return new SagaCommandHandlerBuilder<>(this, h);
}

@Override
public <C> SagaCommandHandlerBuilder<C> onMessageReturningOptionalMessage(Class<C> commandClass,
public <C extends Command> SagaCommandHandlerBuilder<C> onMessageReturningOptionalMessage(Class<C> commandClass,
Function<CommandMessage<C>, Optional<Message>> handler) {
SagaCommandHandler h = new SagaCommandHandler(channel, commandClass, (c) -> handler.apply(c).map(Collections::singletonList).orElse(Collections.EMPTY_LIST));
SagaCommandHandler h = new SagaCommandHandler(channel, commandClass, args -> handler.apply(args.getCommandMessage()).map(Collections::singletonList).orElse(Collections.emptyList()));
this.handlers.add(h);
return new SagaCommandHandlerBuilder<C>(this, h);
return new SagaCommandHandlerBuilder<>(this, h);
}

@Override
public <C> SagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass,
public <C extends Command> SagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass,
Function<CommandMessage<C>, Message> handler) {
SagaCommandHandler h = new SagaCommandHandler(channel, commandClass,
(c) -> Collections.singletonList(handler.apply(c)));
args -> Collections.singletonList(handler.apply(args.getCommandMessage())));
this.handlers.add(h);
return new SagaCommandHandlerBuilder<C>(this, h);
return new SagaCommandHandlerBuilder<>(this, h);
}

@Override
public <C> SagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass, Consumer<CommandMessage<C>> handler) {
public <C extends Command> SagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass, Consumer<CommandMessage<C>> handler) {
SagaCommandHandler h = new SagaCommandHandler(channel, commandClass,
(c) -> {
handler.accept(c);
args -> {
handler.accept(args.getCommandMessage());
return Collections.emptyList();
});
this.handlers.add(h);
return new SagaCommandHandlerBuilder<C>(this, h);
return new SagaCommandHandlerBuilder<>(this, h);
}

public CommandHandlers build() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package io.eventuate.tram.sagas.reactive.participant;

import io.eventuate.tram.commands.common.Command;
import io.eventuate.tram.commands.consumer.CommandMessage;
import io.eventuate.tram.messaging.common.Message;
import org.reactivestreams.Publisher;

import java.util.function.Function;

public interface AbstractReactiveSagaCommandHandlersBuilder {
<C> ReactiveSagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass,
Function<CommandMessage<C>, Publisher<Message>> handler);
<C extends Command> ReactiveSagaCommandHandlerBuilder<C> onMessage(Class<C> commandClass,
Function<CommandMessage<C>, Publisher<Message>> handler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@
import io.eventuate.tram.commands.common.CommandMessageHeaders;
import io.eventuate.tram.commands.consumer.CommandHandlerParams;
import io.eventuate.tram.commands.consumer.CommandMessage;
import io.eventuate.tram.commands.consumer.CommandReplyToken;
import io.eventuate.tram.commands.consumer.PathVariables;
import io.eventuate.tram.consumer.common.reactive.ReactiveMessageConsumer;
import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.messaging.producer.MessageBuilder;
import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandDispatcher;
import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandHandler;
import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandHandlers;
import io.eventuate.tram.reactive.messaging.producer.common.ReactiveMessageProducer;
import io.eventuate.tram.reactive.commands.consumer.ReactiveCommandReplyProducer;
import io.eventuate.tram.sagas.common.*;
import io.eventuate.tram.sagas.participant.SagaReplyMessage;
import io.eventuate.tram.sagas.reactive.common.ReactiveSagaLockManager;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Optional;

public class ReactiveSagaCommandDispatcher extends ReactiveCommandDispatcher {
Expand All @@ -26,9 +28,9 @@ public class ReactiveSagaCommandDispatcher extends ReactiveCommandDispatcher {
public ReactiveSagaCommandDispatcher(String commandDispatcherId,
ReactiveCommandHandlers target,
ReactiveMessageConsumer messageConsumer,
ReactiveMessageProducer messageProducer,
ReactiveSagaLockManager sagaLockManager) {
super(commandDispatcherId, target, messageConsumer, messageProducer);
ReactiveSagaLockManager sagaLockManager,
ReactiveCommandReplyProducer commandReplyProducer) {
super(commandDispatcherId, target, messageConsumer, commandReplyProducer);

this.sagaLockManager = sagaLockManager;
}
Expand Down Expand Up @@ -62,7 +64,7 @@ private String getSagaType(Message message) {


@Override
protected Publisher<Message> invoke(ReactiveCommandHandler commandHandler, CommandMessage cm, CommandHandlerParams commandHandlerParams) {
protected Publisher<Message> invoke(ReactiveCommandHandler commandHandler, CommandMessage cm, CommandHandlerParams commandHandlerParams, CommandReplyToken commandReplyToken) {
Optional<String> lockedTarget = Optional.empty();

Flux<Message> result = Flux.empty();
Expand All @@ -86,7 +88,7 @@ protected Publisher<Message> invoke(ReactiveCommandHandler commandHandler, Comma
}
}

result = result.thenMany(super.invoke(commandHandler, cm, commandHandlerParams)).cache();
result = result.thenMany(super.invoke(commandHandler, cm, commandHandlerParams, commandReplyToken)).cache();
Flux<Message> finalizedResult = result;

if (lockedTarget.isPresent())
Expand Down
Loading

1 comment on commit 0bfe1df

@cer
Copy link
Collaborator Author

@cer cer commented on 0bfe1df Aug 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correction: reflects changes made by eventuate-tram/eventuate-tram-core#179

Please sign in to comment.