Skip to content

Commit

Permalink
added support for service task invocation of async services via Mutin…
Browse files Browse the repository at this point in the history
…y and completion stage return types
  • Loading branch information
mswiderski committed Sep 15, 2021
1 parent 25d593e commit a35de2e
Show file tree
Hide file tree
Showing 70 changed files with 3,029 additions and 1,508 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,11 @@ public void create(String id, ProcessInstance instance) {
if (Files.exists(processInstanceStorage)) {
throw new ProcessInstanceDuplicatedException(id);
}
storeProcessInstance(processInstanceStorage, instance);

cachedInstances.remove(resolvedId);
cachedInstances.remove(id);

storeProcessInstance(processInstanceStorage, instance);
} else if (isPending(instance)) {
if (cachedInstances.putIfAbsent(resolvedId, instance) != null) {
throw new ProcessInstanceDuplicatedException(id);
Expand All @@ -217,6 +219,7 @@ public void create(String id, ProcessInstance instance) {
@Override
public void update(String id, ProcessInstance instance) {
String resolvedId = resolveId(id, instance);
cachedInstances.remove(resolvedId);
if (isActive(instance)) {

Path processInstanceStorage = Paths.get(storage.toString(), resolvedId);
Expand All @@ -225,7 +228,6 @@ public void update(String id, ProcessInstance instance) {
storeProcessInstance(processInstanceStorage, instance);
}
}
cachedInstances.remove(resolvedId);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public T doGetInstanceInError(String processId, String processInstanceId) {

public T doGetWorkItemsInProcessInstance(String processId, String processInstanceId) {

return executeOnInstance(processId, processInstanceId, processInstance -> {
return executeOnInstance(processId, processInstanceId, "active", processInstance -> {
// use special security policy to bypass auth check as this is management
// operation
List<WorkItem> workItems = processInstance.workItems(new SecurityPolicy(null) {
Expand Down Expand Up @@ -128,7 +128,7 @@ public T doSkipInstanceInErrorByErrorId(String processId, String processInstance

public T doTriggerNodeInstanceId(String processId, String processInstanceId, String nodeId) {

return executeOnInstance(processId, processInstanceId, processInstance -> {
return executeOnInstance(processId, processInstanceId, "active", processInstance -> {
processInstance.triggerNode(nodeId);

if (processInstance.status() == ProcessInstance.STATE_ERROR) {
Expand All @@ -142,7 +142,7 @@ public T doTriggerNodeInstanceId(String processId, String processInstanceId, Str

public T doRetriggerNodeInstanceId(String processId, String processInstanceId, String nodeInstanceId) {

return executeOnInstance(processId, processInstanceId, processInstance -> {
return executeOnInstance(processId, processInstanceId, "active", processInstance -> {
processInstance.retriggerNodeInstance(nodeInstanceId);

if (processInstance.status() == ProcessInstance.STATE_ERROR) {
Expand All @@ -156,7 +156,7 @@ public T doRetriggerNodeInstanceId(String processId, String processInstanceId, S

public T doCancelNodeInstanceId(String processId, String processInstanceId, String nodeInstanceId) {

return executeOnInstance(processId, processInstanceId, processInstance -> {
return executeOnInstance(processId, processInstanceId, "active", processInstance -> {
processInstance.cancelNodeInstance(nodeInstanceId);

if (processInstance.status() == ProcessInstance.STATE_ERROR) {
Expand All @@ -168,9 +168,9 @@ public T doCancelNodeInstanceId(String processId, String processInstanceId, Stri
});
}

public T doCancelProcessInstanceId(String processId, String processInstanceId) {
public T doCancelProcessInstanceId(String processId, String processInstanceId, String status) {

return executeOnInstance(processId, processInstanceId, processInstance -> {
return executeOnInstance(processId, processInstanceId, status, processInstance -> {
processInstance.abort();

if (processInstance.status() == ProcessInstance.STATE_ERROR) {
Expand Down Expand Up @@ -214,7 +214,8 @@ private T executeOnInstanceInError(String processId, String processInstanceId,
});
}

private T executeOnInstance(String processId, String processInstanceId, Function<ProcessInstance<?>, T> supplier) {
private T executeOnInstance(String processId, String processInstanceId, String status,
Function<ProcessInstance<?>, T> supplier) {
if (processId == null || processInstanceId == null) {
return badRequestResponse(PROCESS_AND_INSTANCE_REQUIRED);
}
Expand All @@ -225,7 +226,7 @@ private T executeOnInstance(String processId, String processInstanceId, Function
}
return UnitOfWorkExecutor.executeInUnitOfWork(application.unitOfWorkManager(), () -> {
Optional<? extends ProcessInstance<?>> processInstanceFound = process.instances()
.findById(processInstanceId);
.findById(processInstanceId, mapStatus(status), ProcessInstanceReadMode.MUTABLE);
if (processInstanceFound.isPresent()) {
ProcessInstance<?> processInstance = processInstanceFound.get();

Expand All @@ -241,4 +242,46 @@ private T executeOnInstance(String processId, String processInstanceId, Function
protected abstract T badRequestResponse(String message);

protected abstract T notFoundResponse(String message);

protected int mapStatus(String status) {
int state = 1;
switch (status.toLowerCase()) {
case "active":
state = 1;
break;
case "completed":
state = 2;
break;
case "aborted":
state = 3;
break;
case "error":
state = 5;
break;
default:
break;
}
return state;
}

protected String reverseMapStatus(int status) {
String state = "active";
switch (status) {
case 1:
state = "active";
break;
case 2:
state = "completed";
break;
case 3:
state = "aborted";
break;
case 5:
state = "error";
break;
default:
break;
}
return state;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ T retriggerNodeInstanceId(String processId, String processInstanceId, String nod

T cancelNodeInstanceId(String processId, String processInstanceId, String nodeInstanceId, String user, List<String> groups);

T cancelProcessInstanceId(String processId, String processInstanceId, String user, List<String> groups);
T cancelProcessInstanceId(String processId, String processInstanceId, String status, String user, List<String> groups);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@

package io.automatiko.engine.addons.process.management;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -21,13 +19,11 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;

import org.eclipse.microprofile.openapi.annotations.ExternalDocumentation;
Expand All @@ -39,6 +35,8 @@
import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
import org.eclipse.microprofile.openapi.annotations.responses.APIResponses;
import org.eclipse.microprofile.openapi.annotations.tags.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.automatiko.engine.addons.process.management.export.ProcessInstanceExporter;
import io.automatiko.engine.addons.process.management.model.ErrorInfoDTO;
Expand Down Expand Up @@ -70,6 +68,8 @@
@Path("/management/processes")
public class ProcessInstanceManagementResource extends BaseProcessInstanceManagementResource<Response> {

private static final Logger LOGGER = LoggerFactory.getLogger(ProcessInstanceManagementResource.class);

private IdentitySupplier identitySupplier;
private ProcessInstanceExporter exporter;

Expand Down Expand Up @@ -403,8 +403,7 @@ public Response getInstanceImage(
if (process.version() != null) {
path = "/v" + process.version().replaceAll("\\.", "_") + "/" + path;
}
StreamingOutput entity = new ImageStreamingOutput(instance.get().image(path));
ResponseBuilder builder = Response.ok().entity(entity);
ResponseBuilder builder = Response.ok().entity(instance.get().image(path));
return builder.header("Content-Type", "image/svg+xml").build();
});
} finally {
Expand Down Expand Up @@ -581,24 +580,13 @@ public Response cancelNodeInstanceId(
public Response cancelProcessInstanceId(
@Parameter(description = "Unique identifier of the process", required = true) @PathParam("processId") String processId,
@Parameter(description = "Unique identifier of the instance", required = true) @PathParam("processInstanceId") String processInstanceId,
@Parameter(description = "Status of the process instance", required = false, schema = @Schema(enumeration = {
"active", "completed", "aborted",
"error" })) @QueryParam("status") @DefaultValue("active") final String status,
@Parameter(description = "User identifier as alternative autroization info", required = false, hidden = true) @QueryParam("user") final String user,
@Parameter(description = "Groups as alternative autroization info", required = false, hidden = true) @QueryParam("group") final List<String> groups) {
identitySupplier.buildIdentityProvider(user, groups);
return doCancelProcessInstanceId(processId, processInstanceId);
}

protected class ImageStreamingOutput implements StreamingOutput {

private String image;

public ImageStreamingOutput(String image) {
this.image = image;
}

@Override
public void write(OutputStream output) throws IOException, WebApplicationException {
output.write(image.getBytes(StandardCharsets.UTF_8));
}
return doCancelProcessInstanceId(processId, processInstanceId, status);
}

@APIResponses(value = {
Expand Down Expand Up @@ -637,7 +625,7 @@ public JsonExportedProcessInstance exportInstance(@Context UriInfo uriInfo,
});

if (abort) {
cancelProcessInstanceId(processId, instanceId, user, groups);
cancelProcessInstanceId(processId, instanceId, status, user, groups);
}

return exported;
Expand Down Expand Up @@ -722,71 +710,21 @@ public Response archiveInstance(@Context UriInfo uriInfo,
return pi.archive(new JsonArchiveBuilder());

});
StreamingOutput entity = new ZipStreamingOutput(archived);
ResponseBuilder builder = Response.ok().entity(entity);

if (abort) {
cancelProcessInstanceId(processId, instanceId, user, groups);
}

return builder.header("Content-Type", "application/zip").header("Content-Disposition",
"attachment; filename=" + archived.getId() + ".zip").build();
}

protected class ZipStreamingOutput implements StreamingOutput {

private ArchivedProcessInstance archived;

public ZipStreamingOutput(ArchivedProcessInstance archived) {
this.archived = archived;
}

@Override
public void write(OutputStream output) throws IOException, WebApplicationException {
try {
ByteArrayOutputStream output = new ByteArrayOutputStream();
archived.writeAsZip(output);
}
}
ResponseBuilder builder = Response.ok().entity(output.toByteArray());

protected int mapStatus(String status) {
int state = 1;
switch (status.toLowerCase()) {
case "active":
state = 1;
break;
case "completed":
state = 2;
break;
case "aborted":
state = 3;
break;
case "error":
state = 5;
break;
default:
break;
}
return state;
}
if (abort) {
cancelProcessInstanceId(processId, instanceId, "active", user, groups);
}

protected String reverseMapStatus(int status) {
String state = "active";
switch (status) {
case 1:
state = "active";
break;
case 2:
state = "completed";
break;
case 3:
state = "aborted";
break;
case 5:
state = "error";
break;
default:
break;
return builder.header("Content-Type", "application/zip").header("Content-Disposition",
"attachment; filename=" + archived.getId() + ".zip").build();
} catch (Exception e) {
LOGGER.error("Error generating process instance archive", e);
return Response.serverError().entity("Error generating process instance archive").build();
}
return state;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public ProcessDTO(String id, String version, String name, String description, St
this.id = id;
this.version = version == null ? "" : version;
this.name = name;
this.description = description;
this.description = description == null ? "" : description;
this.image = image;
this.activeInstances = activeInstances;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -72,8 +73,10 @@ void setUp() {
Map<String, Process<?>> processes = Mockito.mock(Map.class);
lenient().when(processes.get(anyString())).thenReturn(process);

when(process.instances()).thenReturn(instances);
lenient().when(process.instances()).thenReturn(instances);
lenient().when(instances.findById(anyString())).thenReturn(Optional.of(processInstance));
lenient().when(instances.findById(anyString(), anyInt(), any()))
.thenReturn(Optional.of(processInstance));
lenient().when(processInstance.errors()).thenReturn(Optional.of(errors));
lenient().when(processInstance.variables()).thenReturn(variables);
lenient().when(processInstance.id()).thenReturn(PROCESS_INSTANCE_ID);
Expand Down Expand Up @@ -139,7 +142,8 @@ public Object cancelNodeInstanceId(String processId, String processInstanceId, S
}

@Override
public Object cancelProcessInstanceId(String processId, String processInstanceId, String user, List groups) {
public Object cancelProcessInstanceId(String processId, String processInstanceId, String status, String user,
List groups) {
return null;
}

Expand Down Expand Up @@ -254,7 +258,7 @@ private ProcessErrors mockProcessInstanceStatusActiveOnError() {
@Test
void testDoCancelProcessInstanceId() {
mockProcessInstanceStatusActive().abort();
Object response = tested.doCancelProcessInstanceId(PROCESS_ID, PROCESS_INSTANCE_ID);
Object response = tested.doCancelProcessInstanceId(PROCESS_ID, PROCESS_INSTANCE_ID, "active");
verify(processInstance, times(0)).errors();
verify(processInstance, times(1)).abort();
assertResultOk(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ public void testCancelNodeInstanceId() {

@Test
public void testCancelProcessInstanceId() {
resource.cancelProcessInstanceId(PROCESS_ID, PROCESS_INSTANCE_ID, null, Collections.emptyList());
verify(resource).doCancelProcessInstanceId(PROCESS_ID, PROCESS_INSTANCE_ID);
resource.cancelProcessInstanceId(PROCESS_ID, PROCESS_INSTANCE_ID, "active", null, Collections.emptyList());
verify(resource).doCancelProcessInstanceId(PROCESS_ID, PROCESS_INSTANCE_ID, "active");
}

@Test
Expand Down
Loading

0 comments on commit a35de2e

Please sign in to comment.