Skip to content

Commit

Permalink
[Fix apache#3721] Optimize event grouping
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Oct 18, 2024
1 parent 4d5b927 commit a7a8a63
Show file tree
Hide file tree
Showing 29 changed files with 1,209 additions and 39 deletions.
8 changes: 4 additions & 4 deletions api/kogito-events-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@
</dependency>

<!-- CloudEvents -->
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>org.junit.jupiter</groupId>
Expand All @@ -72,6 +68,10 @@
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-jackson-utils</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.event.process;

import org.kie.kogito.event.DataEvent;

public interface CloudEventVisitor {
void visit(DataEvent<?> cloudEvent);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.event.process;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.Date;

import org.kie.kogito.jackson.utils.ObjectMapperFactory;

import com.fasterxml.jackson.databind.JsonNode;

public class KogitoEventBodySerializationHelper {

private KogitoEventBodySerializationHelper() {
}

public static String readUTF(DataInput in) throws IOException {
byte value = in.readByte();
return value > 0 ? in.readUTF() : null;
}

public static void writeUTF(DataOutput out, String string) throws IOException {
if (string == null) {
out.writeByte(0);
} else {
out.writeByte(1);
out.writeUTF(string);
}

}

public static Date readDate(DataInput in) throws IOException {
byte value = in.readByte();
return value > 0 ? new Date(in.readLong()) : null;
}

public static void writeDate(DataOutput out, Date date) throws IOException {
if (date == null) {
out.writeByte(0);
} else {
out.writeByte(1);
out.writeLong(date.getTime());
}
}

public static void writeInt(DataOutput out, Integer integer) throws IOException {
out.writeInt(integer == null ? Integer.MIN_VALUE : integer.intValue());
}

public static Integer readInt(DataInput in) throws IOException {
int integer = in.readInt();
return integer == Integer.MIN_VALUE ? null : Integer.valueOf(integer);
}

public static void writeUTFCollection(DataOutput out, Collection<String> collection) throws IOException {
if (collection == null) {
out.writeShort(Short.MIN_VALUE);
} else {
out.writeShort(collection.size());
for (String item : collection) {
writeUTF(out, item);
}
}
}

public static <T extends Collection<String>> T readUTFCollection(DataInput in, T holder) throws IOException {
int size = in.readShort();
if (size == Short.MIN_VALUE) {
return null;
}
while (size-- > 0) {
holder.add(readUTF(in));
}
return holder;
}

public static void writeObject(DataOutput out, Object obj) throws IOException {
// TODO support java types directly (without using jackson)
if (obj == null) {
out.writeByte(0);
} else {
Class<?> type = obj.getClass();
if (JsonNode.class.isAssignableFrom(type)) {
out.writeByte(1);
} else {
out.writeByte(2);
}
byte[] bytes = ObjectMapperFactory.get().writeValueAsBytes(obj);
out.writeInt(bytes.length);
out.write(bytes);
}
}

public static Object readObject(DataInput in) throws IOException {
// TODO support java types directly (without using jackson)
byte helperByte = in.readByte();
if (helperByte == 0) {
return null;
} else {
Class<?> type;
if (helperByte == 1) {
type = JsonNode.class;
} else {
type = Object.class;
}
byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
return ObjectMapperFactory.get().readValue(bytes, type);
}
}

public static Date toDate(OffsetDateTime time) {
return time == null ? null : Date.from(time.toInstant());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.event.process;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public interface KogitoMarshallEventSupport {

void writeEvent(DataOutput out) throws IOException;

void readEvent(DataInput in) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@

package org.kie.kogito.event.process;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;

public class ProcessInstanceErrorEventBody {
import org.kie.kogito.event.DataEvent;

import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;

public class ProcessInstanceErrorEventBody implements KogitoMarshallEventSupport, CloudEventVisitor {

// common fields for events
private Date eventDate;
Expand Down Expand Up @@ -138,4 +145,26 @@ public ProcessInstanceErrorEventBody build() {
return instance;
}
}

@Override
public void readEvent(DataInput in) throws IOException {
nodeDefinitionId = in.readUTF();
nodeInstanceId = in.readUTF();
errorMessage = in.readUTF();
}

@Override
public void writeEvent(DataOutput out) throws IOException {
out.writeUTF(nodeDefinitionId);
out.writeUTF(nodeInstanceId);
out.writeUTF(errorMessage);
}

@Override
public void visit(DataEvent<?> dataEvent) {
this.processId = dataEvent.getKogitoProcessId();
this.processInstanceId = dataEvent.getKogitoProcessInstanceId();
this.processVersion = dataEvent.getKogitoProcessInstanceVersion();
this.eventDate = toDate(dataEvent.getTime());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@
*/
package org.kie.kogito.event.process;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ProcessInstanceNodeEventBody {
import org.kie.kogito.event.DataEvent;

import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;

public class ProcessInstanceNodeEventBody implements KogitoMarshallEventSupport, CloudEventVisitor {

public static final int EVENT_TYPE_ENTER = 1;

Expand Down Expand Up @@ -71,7 +78,39 @@ public class ProcessInstanceNodeEventBody {

private Map<String, Object> data;

private ProcessInstanceNodeEventBody() {
@Override
public void writeEvent(DataOutput out) throws IOException {
writeUTF(out, connectionNodeDefinitionId);
out.writeUTF(nodeDefinitionId);
writeUTF(out, nodeName);
out.writeUTF(nodeType);
out.writeUTF(nodeInstanceId);
writeUTF(out, workItemId);
writeDate(out, slaDueDate);
writeObject(out, data);
}

@Override
public void readEvent(DataInput in) throws IOException {
connectionNodeDefinitionId = readUTF(in);
nodeDefinitionId = in.readUTF();
nodeName = readUTF(in);
nodeType = in.readUTF();
nodeInstanceId = in.readUTF();
workItemId = readUTF(in);
slaDueDate = readDate(in);
data = (Map<String, Object>) readObject(in);
}

@Override
public void visit(DataEvent<?> dataEvent) {
this.processId = dataEvent.getKogitoProcessId();
this.processInstanceId = dataEvent.getKogitoProcessInstanceId();
this.processVersion = dataEvent.getKogitoProcessInstanceVersion();
this.eventDate = toDate(dataEvent.getTime());
}

public ProcessInstanceNodeEventBody() {
this.data = new HashMap<>();
}

Expand Down Expand Up @@ -246,5 +285,4 @@ public ProcessInstanceNodeEventBody build() {
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,20 @@
*/
package org.kie.kogito.event.process;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;

public class ProcessInstanceSLAEventBody {
import org.kie.kogito.event.DataEvent;

import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readDate;
import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readUTF;
import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.toDate;
import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeDate;
import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeUTF;

public class ProcessInstanceSLAEventBody implements KogitoMarshallEventSupport, CloudEventVisitor {

// common fields for events
private Date eventDate;
Expand All @@ -47,6 +58,33 @@ public class ProcessInstanceSLAEventBody {

private Date slaDueDate;

@Override
public void writeEvent(DataOutput out) throws IOException {
out.writeUTF(nodeDefinitionId);
writeUTF(out, nodeName);
out.writeUTF(nodeType);
out.writeUTF(nodeInstanceId);
writeDate(out, slaDueDate);

}

@Override
public void readEvent(DataInput in) throws IOException {
nodeDefinitionId = in.readUTF();
nodeName = readUTF(in);
nodeType = in.readUTF();
nodeInstanceId = in.readUTF();
slaDueDate = readDate(in);
}

@Override
public void visit(DataEvent<?> dataEvent) {
this.processId = dataEvent.getKogitoProcessId();
this.processInstanceId = dataEvent.getKogitoProcessInstanceId();
this.processVersion = dataEvent.getKogitoProcessInstanceVersion();
this.eventDate = toDate(dataEvent.getTime());
}

public Date getSlaDueDate() {
return slaDueDate;
}
Expand Down
Loading

0 comments on commit a7a8a63

Please sign in to comment.