diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java index 2fcdd704196..065111c8519 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java @@ -177,6 +177,15 @@ public abstract class AbstractDataEvent implements DataEvent { protected AbstractDataEvent() { } + protected AbstractDataEvent(String type, URI source, T body) { + this.specVersion = SpecVersion.parse(SPEC_VERSION); + this.id = UUID.randomUUID().toString(); + this.source = source; + this.type = type; + this.time = ZonedDateTime.now().toOffsetDateTime(); + this.data = body; + } + protected AbstractDataEvent(String type, String source, T body, @@ -201,12 +210,7 @@ protected AbstractDataEvent(String type, String subject, String dataContentType, String dataSchema) { - this.specVersion = SpecVersion.parse(SPEC_VERSION); - this.id = UUID.randomUUID().toString(); - this.source = Optional.ofNullable(source).map(URI::create).orElse(null); - this.type = type; - this.time = ZonedDateTime.now().toOffsetDateTime(); - this.data = body; + this(type, Optional.ofNullable(source).map(URI::create).orElse(null), body); setKogitoProcessInstanceId(kogitoProcessInstanceId); setKogitoRootProcessInstanceId(kogitoRootProcessInstanceId); setKogitoProcessId(kogitoProcessId); diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java new file mode 100644 index 00000000000..ac865d5dc6a --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.process; + +import java.net.URI; +import java.util.Collection; + +public class MultipleProcessInstanceDataEvent extends ProcessInstanceDataEvent>> { + + public MultipleProcessInstanceDataEvent(URI source, Collection> body) { + super("MultipleProcessInstanceDataEvent", source, body); + } +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceDataEvent.java index 8069df7ee39..31131563dcc 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceDataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceDataEvent.java @@ -18,6 +18,8 @@ */ package org.kie.kogito.event.process; +import java.net.URI; + import org.kie.kogito.event.AbstractDataEvent; public class ProcessInstanceDataEvent extends AbstractDataEvent { @@ -29,6 +31,10 @@ public ProcessInstanceDataEvent(T body) { setData(body); } + protected ProcessInstanceDataEvent(String type, URI source, T body) { + super(type, source, body); + } + public ProcessInstanceDataEvent(String type, String source, T body, diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/MultipleUserTaskInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/MultipleUserTaskInstanceDataEvent.java new file mode 100644 index 00000000000..12620dd2cb9 --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/MultipleUserTaskInstanceDataEvent.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.usertask; + +import java.net.URI; +import java.util.Collection; + +public class MultipleUserTaskInstanceDataEvent extends UserTaskInstanceDataEvent>> { + + public MultipleUserTaskInstanceDataEvent(URI source, Collection> body) { + super("MultipleUserTaskInstanceDataEvent", source, body); + } +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/UserTaskInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/UserTaskInstanceDataEvent.java index 98fc6528094..c4b3e0af5c9 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/UserTaskInstanceDataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/UserTaskInstanceDataEvent.java @@ -18,6 +18,7 @@ */ package org.kie.kogito.event.usertask; +import java.net.URI; import java.util.Set; import org.kie.kogito.event.AbstractDataEvent; @@ -48,6 +49,10 @@ public UserTaskInstanceDataEvent(T body) { setData(body); } + protected UserTaskInstanceDataEvent(String type, URI source, T body) { + super(type, source, body); + } + public UserTaskInstanceDataEvent(String type, String source, T body, diff --git a/kogito-build/kogito-dependencies-bom/pom.xml b/kogito-build/kogito-dependencies-bom/pom.xml index e0a87af2f99..ff3b3003f30 100644 --- a/kogito-build/kogito-dependencies-bom/pom.xml +++ b/kogito-build/kogito-dependencies-bom/pom.xml @@ -52,7 +52,7 @@ 2.0.2 2.4.1 0.3.0 - 2.2.0 + 3.2.1 0.2.3 1.5.2 3.25.8 diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java index b3889fd088f..f8092c83575 100644 --- a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java +++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java @@ -86,6 +86,9 @@ public void init() { } protected Optional getConsumer(DataEvent event) { + if (event == null) { + return Optional.empty(); + } switch (event.getType()) { case "ProcessDefinitionEvent": return eventsRuntimeConfig.isProcessDefinitionEventsEnabled() ? Optional.of(processDefinitionConsumer) : Optional.empty(); diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java index 61dbd523711..ee1f284b3a8 100644 --- a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java +++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java @@ -18,6 +18,7 @@ */ package org.kie.kogito.events.process; +import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -26,6 +27,10 @@ import java.util.Map.Entry; import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; +import org.kie.kogito.event.process.ProcessInstanceDataEvent; +import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent; import io.quarkus.arc.properties.IfBuildProperty; @@ -40,18 +45,28 @@ public void publish(DataEvent event) { publish(Collections.singletonList(event)); } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void publish(Collection> events) { - Map>> eventsByChannel = new HashMap<>(); + Map eventsByChannel = new HashMap<>(); for (DataEvent event : events) { - if (event == null) { - continue; - } getConsumer(event).ifPresent(c -> eventsByChannel.computeIfAbsent(c, k -> new ArrayList<>()).add(event)); } - for (Entry>> item : eventsByChannel.entrySet()) { - publishToTopic(item.getKey(), item.getValue()); - } + eventsByChannel.entrySet().forEach(this::publishEvents); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void publishEvents(Map.Entry entry) { + DataEvent firstEvent = (DataEvent) entry.getValue().iterator().next(); + URI source = firstEvent.getSource(); + if (firstEvent instanceof UserTaskInstanceDataEvent) { + publishToTopic(entry.getKey(), new MultipleUserTaskInstanceDataEvent(source, (Collection>) entry.getValue())); + } else if (firstEvent instanceof ProcessInstanceDataEvent) { + publishToTopic(entry.getKey(), new MultipleProcessInstanceDataEvent(source, (Collection>) entry.getValue())); + } else { + for (DataEvent event : (Collection>) entry.getValue()) { + publishToTopic(entry.getKey(), event); + } + } + } } diff --git a/quarkus/addons/events/process/runtime/src/test/java/org/kie/kogito/events/process/GroupingMessagingEventPublisherTest.java b/quarkus/addons/events/process/runtime/src/test/java/org/kie/kogito/events/process/GroupingMessagingEventPublisherTest.java index dc77c171978..0d784d3a53b 100644 --- a/quarkus/addons/events/process/runtime/src/test/java/org/kie/kogito/events/process/GroupingMessagingEventPublisherTest.java +++ b/quarkus/addons/events/process/runtime/src/test/java/org/kie/kogito/events/process/GroupingMessagingEventPublisherTest.java @@ -27,6 +27,10 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider; import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; +import org.kie.kogito.event.process.ProcessInstanceDataEvent; +import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent; import org.kie.kogito.events.config.EventsRuntimeConfig; import org.kie.kogito.events.process.AbstractMessagingEventPublisher.AbstractMessageEmitter; import org.mockito.ArgumentCaptor; @@ -149,10 +153,10 @@ public void testReactiveMessagingEventPublisher_publish() throws Exception { @Test public void testPublishGroupingByChannel() { // Create mock events - DataEvent processInstanceEvent = mock(DataEvent.class); + DataEvent processInstanceEvent = mock(ProcessInstanceDataEvent.class); when(processInstanceEvent.getType()).thenReturn("ProcessInstanceStateDataEvent"); - DataEvent userTaskEvent = mock(DataEvent.class); + DataEvent userTaskEvent = mock(UserTaskInstanceDataEvent.class); when(userTaskEvent.getType()).thenReturn("UserTaskInstanceStateDataEvent"); // Mock getConsumer() to return different emitters based on event type @@ -169,17 +173,17 @@ public void testPublishGroupingByChannel() { groupingMessagingEventPublisher.publish(events); // Capture and verify that the correct emitter was used for each event - verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), anyCollection()); - verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), anyCollection()); + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class)); + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), any(MultipleUserTaskInstanceDataEvent.class)); } @Test public void testPublishMultipleEventsGroupedByChannel() { // Create multiple events of different types - DataEvent processInstanceEvent1 = mock(DataEvent.class); - DataEvent processInstanceEvent2 = mock(DataEvent.class); - DataEvent userTaskEvent1 = mock(DataEvent.class); - DataEvent userTaskEvent2 = mock(DataEvent.class); + DataEvent processInstanceEvent1 = mock(ProcessInstanceDataEvent.class); + DataEvent processInstanceEvent2 = mock(ProcessInstanceDataEvent.class); + DataEvent userTaskEvent1 = mock(UserTaskInstanceDataEvent.class); + DataEvent userTaskEvent2 = mock(UserTaskInstanceDataEvent.class); when(processInstanceEvent1.getType()).thenReturn("ProcessInstanceStateDataEvent"); when(processInstanceEvent2.getType()).thenReturn("ProcessInstanceStateDataEvent"); @@ -202,19 +206,21 @@ public void testPublishMultipleEventsGroupedByChannel() { groupingMessagingEventPublisher.publish(events); // Verify that two grouped publishToTopic calls are made: one for processInstanceConsumer, one for userTaskConsumer - verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), anyCollection()); - verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), anyCollection()); + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class)); + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), any(MultipleUserTaskInstanceDataEvent.class)); // Verify that the right number of events was grouped and passed to each emitter - ArgumentCaptor>> captor = ArgumentCaptor.forClass(Collection.class); + ArgumentCaptor captorPI = ArgumentCaptor.forClass(MultipleProcessInstanceDataEvent.class); - verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), captor.capture()); - Collection> groupedProcessInstanceEvents = captor.getValue(); - assertEquals(2, groupedProcessInstanceEvents.size()); // both processInstanceEvents are grouped + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), captorPI.capture()); + MultipleProcessInstanceDataEvent groupedProcessInstanceEvents = captorPI.getValue(); + assertEquals(2, groupedProcessInstanceEvents.getData().size()); // both processInstanceEvents are grouped - verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), captor.capture()); - Collection> groupedUserTaskEvents = captor.getValue(); - assertEquals(2, groupedUserTaskEvents.size()); // both userTaskEvents are grouped + ArgumentCaptor captorUT = ArgumentCaptor.forClass(MultipleUserTaskInstanceDataEvent.class); + + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), captorUT.capture()); + MultipleUserTaskInstanceDataEvent groupedUserTaskEvents = captorUT.getValue(); + assertEquals(2, groupedUserTaskEvents.getData().size()); // both userTaskEvents are grouped } @Test @@ -310,7 +316,7 @@ public void testEventsDisabledInConfig() { @Test public void testNullEventInCollection() { - DataEvent validEvent = mock(DataEvent.class); + DataEvent validEvent = mock(ProcessInstanceDataEvent.class); when(validEvent.getType()).thenReturn("ProcessInstanceStateDataEvent"); Collection> events = Arrays.asList(validEvent, null); // One valid event and one null event @@ -322,7 +328,7 @@ public void testNullEventInCollection() { groupingMessagingEventPublisher.publish(events); // Verify the valid event is processed - verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), anyCollection()); + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class)); } @Test @@ -361,7 +367,7 @@ public void testPublishToTopicWithDecorator() throws Exception { @Test public void testPublishWithMultipleEventTypesSomeWithoutConsumers() { - DataEvent processInstanceEvent = mock(DataEvent.class); + DataEvent processInstanceEvent = mock(ProcessInstanceDataEvent.class); when(processInstanceEvent.getType()).thenReturn("ProcessInstanceStateDataEvent"); DataEvent unsupportedEvent = mock(DataEvent.class); @@ -375,7 +381,7 @@ public void testPublishWithMultipleEventTypesSomeWithoutConsumers() { groupingMessagingEventPublisher.publish(events); // Ensure that only the supported event was published - verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), anyCollection()); + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class)); verify(groupingMessagingEventPublisher, never()).publishToTopic(any(), eq(Collections.singletonList(unsupportedEvent))); }