diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java new file mode 100644 index 00000000000..b3889fd088f --- /dev/null +++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.events.process; + +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.OnOverflow; +import org.eclipse.microprofile.reactive.messaging.OnOverflow.Strategy; +import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider; +import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.EventPublisher; +import org.kie.kogito.events.config.EventsRuntimeConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.smallrye.reactive.messaging.MutinyEmitter; +import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; + +public abstract class AbstractMessagingEventPublisher implements EventPublisher { + + private static final Logger logger = LoggerFactory.getLogger(AbstractMessagingEventPublisher.class); + + @Inject + ObjectMapper json; + + @Inject + @Channel(PROCESS_INSTANCES_TOPIC_NAME) + @OnOverflow(Strategy.UNBOUNDED_BUFFER) + MutinyEmitter processInstancesEventsEmitter; + private AbstractMessageEmitter processInstanceConsumer; + + @Inject + @Channel(PROCESS_DEFINITIONS_TOPIC_NAME) + MutinyEmitter processDefinitionEventsEmitter; + private AbstractMessageEmitter processDefinitionConsumer; + + @Inject + @Channel(USER_TASK_INSTANCES_TOPIC_NAME) + MutinyEmitter userTasksEventsEmitter; + private AbstractMessageEmitter userTaskConsumer; + @Inject + EventsRuntimeConfig eventsRuntimeConfig; + + @Inject + Instance decoratorProviderInstance; + + private MessageDecoratorProvider decoratorProvider; + + @PostConstruct + public void init() { + decoratorProvider = decoratorProviderInstance.isResolvable() ? decoratorProviderInstance.get() : null; + processDefinitionConsumer = eventsRuntimeConfig.isProcessInstancesPropagateError() ? new BlockingMessageEmitter(processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME) + : new ReactiveMessageEmitter(processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME); + processInstanceConsumer = eventsRuntimeConfig.isProcessDefinitionPropagateError() ? new BlockingMessageEmitter(processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME) + : new ReactiveMessageEmitter(processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME); + userTaskConsumer = eventsRuntimeConfig.isUserTasksPropagateError() ? new BlockingMessageEmitter(userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME) + : new ReactiveMessageEmitter(userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME); + } + + protected Optional getConsumer(DataEvent event) { + switch (event.getType()) { + case "ProcessDefinitionEvent": + return eventsRuntimeConfig.isProcessDefinitionEventsEnabled() ? Optional.of(processDefinitionConsumer) : Optional.empty(); + + case "ProcessInstanceErrorDataEvent": + case "ProcessInstanceNodeDataEvent": + case "ProcessInstanceSLADataEvent": + case "ProcessInstanceStateDataEvent": + case "ProcessInstanceVariableDataEvent": + return eventsRuntimeConfig.isProcessInstancesEventsEnabled() ? Optional.of(processInstanceConsumer) : Optional.empty(); + + case "UserTaskInstanceAssignmentDataEvent": + case "UserTaskInstanceAttachmentDataEvent": + case "UserTaskInstanceCommentDataEvent": + case "UserTaskInstanceDeadlineDataEvent": + case "UserTaskInstanceStateDataEvent": + case "UserTaskInstanceVariableDataEvent": + return eventsRuntimeConfig.isUserTasksEventsEnabled() ? Optional.of(userTaskConsumer) : Optional.empty(); + + default: + return Optional.empty(); + } + } + + @Override + public void publish(Collection> events) { + for (DataEvent event : events) { + publish(event); + } + } + + protected void publishToTopic(AbstractMessageEmitter emitter, Object event) { + logger.debug("About to publish event {} to topic {}", event, emitter.topic); + Message message = null; + try { + String eventString = json.writeValueAsString(event); + logger.debug("Event payload '{}'", eventString); + message = decorateMessage(ContextAwareMessage.of(eventString)); + } catch (Exception e) { + logger.error("Error while creating event to topic {} for event {}", emitter.topic, event); + } + if (message != null) { + emitter.accept(message); + } + } + + protected Message decorateMessage(Message message) { + return decoratorProvider != null ? decoratorProvider.decorate(message) : message; + } + + protected static abstract class AbstractMessageEmitter implements Consumer> { + + protected final String topic; + protected final MutinyEmitter emitter; + + protected AbstractMessageEmitter(MutinyEmitter emitter, String topic) { + this.emitter = emitter; + this.topic = topic; + } + } + + private static class BlockingMessageEmitter extends AbstractMessageEmitter { + protected BlockingMessageEmitter(MutinyEmitter emitter, String topic) { + super(emitter, topic); + } + + @Override + public void accept(Message message) { + emitter.sendMessageAndAwait(message); + logger.debug("Successfully published message {}", message.getPayload()); + } + } + + private static class ReactiveMessageEmitter extends AbstractMessageEmitter { + protected ReactiveMessageEmitter(MutinyEmitter emitter, String topic) { + super(emitter, topic); + } + + @Override + public void accept(Message message) { + emitter.sendMessageAndForget(message + .withAck(() -> onAck(message)) + .withNack(reason -> onNack(reason, message))); + } + + private CompletionStage onAck(Message message) { + logger.debug("Successfully published message {}", message.getPayload()); + return CompletableFuture.completedFuture(null); + } + + private CompletionStage onNack(Throwable reason, Message message) { + logger.error("Error while publishing message {}", message, reason); + return CompletableFuture.completedFuture(null); + } + + } +} diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java new file mode 100644 index 00000000000..32939f4d482 --- /dev/null +++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.events.process; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.kie.kogito.event.DataEvent; + +import io.quarkus.arc.properties.IfBuildProperty; + +import jakarta.inject.Singleton; + +@Singleton +@IfBuildProperty(name = "kogito.events.grouping", stringValue = "true") +public class GroupingMessagingEventPublisher extends AbstractMessagingEventPublisher { + + @Override + public void publish(DataEvent event) { + publish(Collections.singletonList(event)); + } + + @Override + public void publish(Collection> events) { + Map>> eventsByChannel = new HashMap<>(); + for (DataEvent event : events) { + getConsumer(event).ifPresent(c -> eventsByChannel.computeIfAbsent(c, k -> new ArrayList<>()).add(event)); + } + for (Entry>> item : eventsByChannel.entrySet()) { + publishToTopic(item.getKey(), item.getValue()); + } + } + +} diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java index c232a1eb471..c6aa4424f0c 100644 --- a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java +++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java @@ -19,102 +19,20 @@ package org.kie.kogito.events.process; import java.util.Collection; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.function.BiConsumer; -import org.eclipse.microprofile.reactive.messaging.Channel; -import org.eclipse.microprofile.reactive.messaging.Message; -import org.eclipse.microprofile.reactive.messaging.OnOverflow; -import org.eclipse.microprofile.reactive.messaging.OnOverflow.Strategy; -import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider; import org.kie.kogito.event.DataEvent; -import org.kie.kogito.event.EventPublisher; -import org.kie.kogito.events.config.EventsRuntimeConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; +import io.quarkus.arc.properties.UnlessBuildProperty; -import io.smallrye.reactive.messaging.MutinyEmitter; -import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; - -import jakarta.annotation.PostConstruct; -import jakarta.enterprise.inject.Instance; -import jakarta.inject.Inject; import jakarta.inject.Singleton; @Singleton -public class ReactiveMessagingEventPublisher implements EventPublisher { - - private static final Logger logger = LoggerFactory.getLogger(ReactiveMessagingEventPublisher.class); - - @Inject - ObjectMapper json; - - @Inject - @Channel(PROCESS_INSTANCES_TOPIC_NAME) - @OnOverflow(Strategy.UNBOUNDED_BUFFER) - MutinyEmitter processInstancesEventsEmitter; - private BiConsumer, Message> processInstanceConsumer; - - @Inject - @Channel(PROCESS_DEFINITIONS_TOPIC_NAME) - MutinyEmitter processDefinitionEventsEmitter; - private BiConsumer, Message> processDefinitionConsumer; - - @Inject - @Channel(USER_TASK_INSTANCES_TOPIC_NAME) - MutinyEmitter userTasksEventsEmitter; - private BiConsumer, Message> userTaskConsumer; - @Inject - EventsRuntimeConfig eventsRuntimeConfig; - - @Inject - Instance decoratorProviderInstance; - - private MessageDecoratorProvider decoratorProvider; - - @PostConstruct - public void init() { - decoratorProvider = decoratorProviderInstance.isResolvable() ? decoratorProviderInstance.get() : null; - processInstanceConsumer = eventsRuntimeConfig.isProcessInstancesPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter(); - processDefinitionConsumer = eventsRuntimeConfig.isProcessDefinitionPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter(); - userTaskConsumer = eventsRuntimeConfig.isUserTasksPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter(); - } +@UnlessBuildProperty(name = "kogito.events.grouping", stringValue = "true", enableIfMissing = true) +public class ReactiveMessagingEventPublisher extends AbstractMessagingEventPublisher { @Override public void publish(DataEvent event) { - - switch (event.getType()) { - case "ProcessDefinitionEvent": - if (eventsRuntimeConfig.isProcessDefinitionEventsEnabled()) { - publishToTopic(processDefinitionConsumer, event, processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME); - } - break; - case "ProcessInstanceErrorDataEvent": - case "ProcessInstanceNodeDataEvent": - case "ProcessInstanceSLADataEvent": - case "ProcessInstanceStateDataEvent": - case "ProcessInstanceVariableDataEvent": - if (eventsRuntimeConfig.isProcessInstancesEventsEnabled()) { - publishToTopic(processInstanceConsumer, event, processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME); - } - break; - - case "UserTaskInstanceAssignmentDataEvent": - case "UserTaskInstanceAttachmentDataEvent": - case "UserTaskInstanceCommentDataEvent": - case "UserTaskInstanceDeadlineDataEvent": - case "UserTaskInstanceStateDataEvent": - case "UserTaskInstanceVariableDataEvent": - if (eventsRuntimeConfig.isUserTasksEventsEnabled()) { - publishToTopic(userTaskConsumer, event, userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME); - } - break; - default: - logger.debug("Unknown type of event '{}', ignoring for this publisher", event.getType()); - } + getConsumer(event).ifPresent(emitter -> publishToTopic(emitter, event)); } @Override @@ -124,49 +42,4 @@ public void publish(Collection> events) { } } - protected void publishToTopic(BiConsumer, Message> consumer, DataEvent event, MutinyEmitter emitter, String topic) { - logger.debug("About to publish event {} to topic {}", event, topic); - Message message = null; - try { - String eventString = json.writeValueAsString(event); - logger.debug("Event payload '{}'", eventString); - message = decorateMessage(ContextAwareMessage.of(eventString)); - } catch (Exception e) { - logger.error("Error while creating event to topic {} for event {}", topic, event); - } - if (message != null) { - consumer.accept(emitter, message); - } - } - - protected CompletionStage onAck(Message message) { - logger.debug("Successfully published message {}", message.getPayload()); - return CompletableFuture.completedFuture(null); - } - - protected CompletionStage onNack(Throwable reason, Message message) { - logger.error("Error while publishing message {}", message, reason); - return CompletableFuture.completedFuture(null); - } - - protected Message decorateMessage(Message message) { - return decoratorProvider != null ? decoratorProvider.decorate(message) : message; - } - - private class BlockingMessageEmitter implements BiConsumer, Message> { - @Override - public void accept(MutinyEmitter emitter, Message message) { - emitter.sendMessageAndAwait(message); - logger.debug("Successfully published message {}", message.getPayload()); - } - } - - private class ReactiveMessageEmitter implements BiConsumer, Message> { - @Override - public void accept(MutinyEmitter emitter, Message message) { - emitter.sendMessageAndForget(message - .withAck(() -> onAck(message)) - .withNack(reason -> onNack(reason, message))); - } - } }