From d50aeb54a1091209216e8df545de1ddd88bc56e0 Mon Sep 17 00:00:00 2001 From: Javier Date: Sat, 31 Aug 2024 11:23:38 +0200 Subject: [PATCH] [Fix apache/incubator-kie-issues#1457] Allow grouping of events --- .../BlockingMessagingEventConsumer.java | 2 + ...roupingReactiveMessagingEventConsumer.java | 110 ++++++++++++++++++ .../ReactiveMessagingEventConsumer.java | 1 + 3 files changed, 113 insertions(+) create mode 100644 data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/GroupingReactiveMessagingEventConsumer.java diff --git a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/BlockingMessagingEventConsumer.java b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/BlockingMessagingEventConsumer.java index a646d3f68c..f71b4d247f 100644 --- a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/BlockingMessagingEventConsumer.java +++ b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/BlockingMessagingEventConsumer.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import io.quarkus.arc.properties.IfBuildProperty; +import io.quarkus.arc.properties.UnlessBuildProperty; import io.smallrye.reactive.messaging.annotations.Blocking; import jakarta.enterprise.context.ApplicationScoped; @@ -44,6 +45,7 @@ @ApplicationScoped @IfBuildProperty(name = "kogito.data-index.blocking", stringValue = "true") +@UnlessBuildProperty(name = "kogito.events.grouping", stringValue = "true") public class BlockingMessagingEventConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(BlockingMessagingEventConsumer.class); diff --git a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/GroupingReactiveMessagingEventConsumer.java b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/GroupingReactiveMessagingEventConsumer.java new file mode 100644 index 0000000000..79e685cd76 --- /dev/null +++ b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/GroupingReactiveMessagingEventConsumer.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.index.service.messaging; + +import java.util.Collection; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.process.ProcessDefinitionDataEvent; +import org.kie.kogito.event.process.ProcessInstanceDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent; +import org.kie.kogito.index.event.KogitoJobCloudEvent; +import org.kie.kogito.index.service.IndexingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.quarkus.arc.properties.IfBuildProperty; +import io.smallrye.mutiny.Uni; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Event; +import jakarta.inject.Inject; + +@ApplicationScoped +@IfBuildProperty(name = "kogito.events.grouping", stringValue = "true") +public class GroupingReactiveMessagingEventConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(GroupingReactiveMessagingEventConsumer.class); + + public static final String KOGITO_PROCESSINSTANCES_EVENTS = "kogito-processinstances-events"; + public static final String KOGITO_PROCESS_DEFINITIONS_EVENTS = "kogito-processdefinitions-events"; + public static final String KOGITO_USERTASKINSTANCES_EVENTS = "kogito-usertaskinstances-events"; + public static final String KOGITO_JOBS_EVENTS = "kogito-jobs-events"; + + @Inject + IndexingService indexingService; + + @Inject + Event> eventPublisher; + + @Incoming(KOGITO_PROCESSINSTANCES_EVENTS) + public Uni onProcessInstanceEvent(Collection> events) { + LOGGER.debug("Process instance consumer received ProcessInstanceDataEvents: \n{}", events); + for (ProcessInstanceDataEvent event : events) { + try { + indexingService.indexProcessInstanceEvent(event); + eventPublisher.fire(event); + } catch (Exception ex) { + LOGGER.error("Error processing process instance event: {}", event, ex); + } + + } + return Uni.createFrom().voidItem(); + } + + @Incoming(KOGITO_USERTASKINSTANCES_EVENTS) + public Uni onUserTaskInstanceEvent(Collection> events) { + LOGGER.debug("Task instance received UserTaskInstanceDataEvent \n{}", events); + for (UserTaskInstanceDataEvent event : events) { + try { + indexingService.indexUserTaskInstanceEvent(event); + eventPublisher.fire(event); + } catch (Exception ex) { + LOGGER.error("Error processing user task instance event: {}", event, ex); + } + + } + return Uni.createFrom().voidItem(); + } + + @Incoming(KOGITO_JOBS_EVENTS) + public Uni onJobEvent(KogitoJobCloudEvent event) { + LOGGER.debug("Job received KogitoJobCloudEvent \n{}", event); + return Uni.createFrom().item(event) + .onItem().invoke(e -> indexingService.indexJob(e.getData())) + .onFailure().invoke(t -> LOGGER.error("Error processing job KogitoJobCloudEvent: {}", t.getMessage(), t)) + .onItem().ignore().andContinueWithNull(); + } + + @Incoming(KOGITO_PROCESS_DEFINITIONS_EVENTS) + public Uni onProcessDefinitionDataEvent(Collection events) { + LOGGER.debug("Process Definition received ProcessDefinitionDataEvent \n{}", events); + for (ProcessDefinitionDataEvent event : events) { + try { + indexingService.indexProcessDefinition(event); + eventPublisher.fire(event); + } catch (Exception ex) { + LOGGER.error("Error processing process definition event: {}", event, ex); + } + + } + return Uni.createFrom().voidItem(); + } +} diff --git a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/ReactiveMessagingEventConsumer.java b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/ReactiveMessagingEventConsumer.java index 0b75e0f3b4..8a02adc357 100644 --- a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/ReactiveMessagingEventConsumer.java +++ b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/ReactiveMessagingEventConsumer.java @@ -37,6 +37,7 @@ @ApplicationScoped @UnlessBuildProperty(name = "kogito.data-index.blocking", stringValue = "true", enableIfMissing = true) +@UnlessBuildProperty(name = "kogito.events.grouping", stringValue = "true", enableIfMissing = true) public class ReactiveMessagingEventConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveMessagingEventConsumer.class);