Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rest easy reactive move, async service invocation #115

Merged
merged 2 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,11 @@ public void create(String id, ProcessInstance instance) {
if (Files.exists(processInstanceStorage)) {
throw new ProcessInstanceDuplicatedException(id);
}
storeProcessInstance(processInstanceStorage, instance);

cachedInstances.remove(resolvedId);
cachedInstances.remove(id);

storeProcessInstance(processInstanceStorage, instance);
} else if (isPending(instance)) {
if (cachedInstances.putIfAbsent(resolvedId, instance) != null) {
throw new ProcessInstanceDuplicatedException(id);
Expand All @@ -217,6 +219,7 @@ public void create(String id, ProcessInstance instance) {
@Override
public void update(String id, ProcessInstance instance) {
String resolvedId = resolveId(id, instance);
cachedInstances.remove(resolvedId);
if (isActive(instance)) {

Path processInstanceStorage = Paths.get(storage.toString(), resolvedId);
Expand All @@ -225,7 +228,6 @@ public void update(String id, ProcessInstance instance) {
storeProcessInstance(processInstanceStorage, instance);
}
}
cachedInstances.remove(resolvedId);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public T doGetInstanceInError(String processId, String processInstanceId) {

public T doGetWorkItemsInProcessInstance(String processId, String processInstanceId) {

return executeOnInstance(processId, processInstanceId, processInstance -> {
return executeOnInstance(processId, processInstanceId, "active", processInstance -> {
// use special security policy to bypass auth check as this is management
// operation
List<WorkItem> workItems = processInstance.workItems(new SecurityPolicy(null) {
Expand Down Expand Up @@ -128,7 +128,7 @@ public T doSkipInstanceInErrorByErrorId(String processId, String processInstance

public T doTriggerNodeInstanceId(String processId, String processInstanceId, String nodeId) {

return executeOnInstance(processId, processInstanceId, processInstance -> {
return executeOnInstance(processId, processInstanceId, "active", processInstance -> {
processInstance.triggerNode(nodeId);

if (processInstance.status() == ProcessInstance.STATE_ERROR) {
Expand All @@ -142,7 +142,7 @@ public T doTriggerNodeInstanceId(String processId, String processInstanceId, Str

public T doRetriggerNodeInstanceId(String processId, String processInstanceId, String nodeInstanceId) {

return executeOnInstance(processId, processInstanceId, processInstance -> {
return executeOnInstance(processId, processInstanceId, "active", processInstance -> {
processInstance.retriggerNodeInstance(nodeInstanceId);

if (processInstance.status() == ProcessInstance.STATE_ERROR) {
Expand All @@ -156,7 +156,7 @@ public T doRetriggerNodeInstanceId(String processId, String processInstanceId, S

public T doCancelNodeInstanceId(String processId, String processInstanceId, String nodeInstanceId) {

return executeOnInstance(processId, processInstanceId, processInstance -> {
return executeOnInstance(processId, processInstanceId, "active", processInstance -> {
processInstance.cancelNodeInstance(nodeInstanceId);

if (processInstance.status() == ProcessInstance.STATE_ERROR) {
Expand All @@ -168,9 +168,9 @@ public T doCancelNodeInstanceId(String processId, String processInstanceId, Stri
});
}

public T doCancelProcessInstanceId(String processId, String processInstanceId) {
public T doCancelProcessInstanceId(String processId, String processInstanceId, String status) {

return executeOnInstance(processId, processInstanceId, processInstance -> {
return executeOnInstance(processId, processInstanceId, status, processInstance -> {
processInstance.abort();

if (processInstance.status() == ProcessInstance.STATE_ERROR) {
Expand Down Expand Up @@ -214,7 +214,8 @@ private T executeOnInstanceInError(String processId, String processInstanceId,
});
}

private T executeOnInstance(String processId, String processInstanceId, Function<ProcessInstance<?>, T> supplier) {
private T executeOnInstance(String processId, String processInstanceId, String status,
Function<ProcessInstance<?>, T> supplier) {
if (processId == null || processInstanceId == null) {
return badRequestResponse(PROCESS_AND_INSTANCE_REQUIRED);
}
Expand All @@ -225,7 +226,7 @@ private T executeOnInstance(String processId, String processInstanceId, Function
}
return UnitOfWorkExecutor.executeInUnitOfWork(application.unitOfWorkManager(), () -> {
Optional<? extends ProcessInstance<?>> processInstanceFound = process.instances()
.findById(processInstanceId);
.findById(processInstanceId, mapStatus(status), ProcessInstanceReadMode.MUTABLE);
if (processInstanceFound.isPresent()) {
ProcessInstance<?> processInstance = processInstanceFound.get();

Expand All @@ -241,4 +242,46 @@ private T executeOnInstance(String processId, String processInstanceId, Function
protected abstract T badRequestResponse(String message);

protected abstract T notFoundResponse(String message);

protected int mapStatus(String status) {
int state = 1;
switch (status.toLowerCase()) {
case "active":
state = 1;
break;
case "completed":
state = 2;
break;
case "aborted":
state = 3;
break;
case "error":
state = 5;
break;
default:
break;
}
return state;
}

protected String reverseMapStatus(int status) {
String state = "active";
switch (status) {
case 1:
state = "active";
break;
case 2:
state = "completed";
break;
case 3:
state = "aborted";
break;
case 5:
state = "error";
break;
default:
break;
}
return state;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ T retriggerNodeInstanceId(String processId, String processInstanceId, String nod

T cancelNodeInstanceId(String processId, String processInstanceId, String nodeInstanceId, String user, List<String> groups);

T cancelProcessInstanceId(String processId, String processInstanceId, String user, List<String> groups);
T cancelProcessInstanceId(String processId, String processInstanceId, String status, String user, List<String> groups);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@

package io.automatiko.engine.addons.process.management;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -21,13 +19,11 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;

import org.eclipse.microprofile.openapi.annotations.ExternalDocumentation;
Expand All @@ -39,6 +35,8 @@
import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
import org.eclipse.microprofile.openapi.annotations.responses.APIResponses;
import org.eclipse.microprofile.openapi.annotations.tags.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.automatiko.engine.addons.process.management.export.ProcessInstanceExporter;
import io.automatiko.engine.addons.process.management.model.ErrorInfoDTO;
Expand Down Expand Up @@ -70,6 +68,8 @@
@Path("/management/processes")
public class ProcessInstanceManagementResource extends BaseProcessInstanceManagementResource<Response> {

private static final Logger LOGGER = LoggerFactory.getLogger(ProcessInstanceManagementResource.class);

private IdentitySupplier identitySupplier;
private ProcessInstanceExporter exporter;

Expand Down Expand Up @@ -403,8 +403,7 @@ public Response getInstanceImage(
if (process.version() != null) {
path = "/v" + process.version().replaceAll("\\.", "_") + "/" + path;
}
StreamingOutput entity = new ImageStreamingOutput(instance.get().image(path));
ResponseBuilder builder = Response.ok().entity(entity);
ResponseBuilder builder = Response.ok().entity(instance.get().image(path));
return builder.header("Content-Type", "image/svg+xml").build();
});
} finally {
Expand Down Expand Up @@ -581,24 +580,13 @@ public Response cancelNodeInstanceId(
public Response cancelProcessInstanceId(
@Parameter(description = "Unique identifier of the process", required = true) @PathParam("processId") String processId,
@Parameter(description = "Unique identifier of the instance", required = true) @PathParam("processInstanceId") String processInstanceId,
@Parameter(description = "Status of the process instance", required = false, schema = @Schema(enumeration = {
"active", "completed", "aborted",
"error" })) @QueryParam("status") @DefaultValue("active") final String status,
@Parameter(description = "User identifier as alternative autroization info", required = false, hidden = true) @QueryParam("user") final String user,
@Parameter(description = "Groups as alternative autroization info", required = false, hidden = true) @QueryParam("group") final List<String> groups) {
identitySupplier.buildIdentityProvider(user, groups);
return doCancelProcessInstanceId(processId, processInstanceId);
}

protected class ImageStreamingOutput implements StreamingOutput {

private String image;

public ImageStreamingOutput(String image) {
this.image = image;
}

@Override
public void write(OutputStream output) throws IOException, WebApplicationException {
output.write(image.getBytes(StandardCharsets.UTF_8));
}
return doCancelProcessInstanceId(processId, processInstanceId, status);
}

@APIResponses(value = {
Expand Down Expand Up @@ -637,7 +625,7 @@ public JsonExportedProcessInstance exportInstance(@Context UriInfo uriInfo,
});

if (abort) {
cancelProcessInstanceId(processId, instanceId, user, groups);
cancelProcessInstanceId(processId, instanceId, status, user, groups);
}

return exported;
Expand Down Expand Up @@ -722,71 +710,21 @@ public Response archiveInstance(@Context UriInfo uriInfo,
return pi.archive(new JsonArchiveBuilder());

});
StreamingOutput entity = new ZipStreamingOutput(archived);
ResponseBuilder builder = Response.ok().entity(entity);

if (abort) {
cancelProcessInstanceId(processId, instanceId, user, groups);
}

return builder.header("Content-Type", "application/zip").header("Content-Disposition",
"attachment; filename=" + archived.getId() + ".zip").build();
}

protected class ZipStreamingOutput implements StreamingOutput {

private ArchivedProcessInstance archived;

public ZipStreamingOutput(ArchivedProcessInstance archived) {
this.archived = archived;
}

@Override
public void write(OutputStream output) throws IOException, WebApplicationException {
try {
ByteArrayOutputStream output = new ByteArrayOutputStream();
archived.writeAsZip(output);
}
}
ResponseBuilder builder = Response.ok().entity(output.toByteArray());

protected int mapStatus(String status) {
int state = 1;
switch (status.toLowerCase()) {
case "active":
state = 1;
break;
case "completed":
state = 2;
break;
case "aborted":
state = 3;
break;
case "error":
state = 5;
break;
default:
break;
}
return state;
}
if (abort) {
cancelProcessInstanceId(processId, instanceId, "active", user, groups);
}

protected String reverseMapStatus(int status) {
String state = "active";
switch (status) {
case 1:
state = "active";
break;
case 2:
state = "completed";
break;
case 3:
state = "aborted";
break;
case 5:
state = "error";
break;
default:
break;
return builder.header("Content-Type", "application/zip").header("Content-Disposition",
"attachment; filename=" + archived.getId() + ".zip").build();
} catch (Exception e) {
LOGGER.error("Error generating process instance archive", e);
return Response.serverError().entity("Error generating process instance archive").build();
}
return state;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public ProcessDTO(String id, String version, String name, String description, St
this.id = id;
this.version = version == null ? "" : version;
this.name = name;
this.description = description;
this.description = description == null ? "" : description;
this.image = image;
this.activeInstances = activeInstances;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -72,8 +73,10 @@ void setUp() {
Map<String, Process<?>> processes = Mockito.mock(Map.class);
lenient().when(processes.get(anyString())).thenReturn(process);

when(process.instances()).thenReturn(instances);
lenient().when(process.instances()).thenReturn(instances);
lenient().when(instances.findById(anyString())).thenReturn(Optional.of(processInstance));
lenient().when(instances.findById(anyString(), anyInt(), any()))
.thenReturn(Optional.of(processInstance));
lenient().when(processInstance.errors()).thenReturn(Optional.of(errors));
lenient().when(processInstance.variables()).thenReturn(variables);
lenient().when(processInstance.id()).thenReturn(PROCESS_INSTANCE_ID);
Expand Down Expand Up @@ -139,7 +142,8 @@ public Object cancelNodeInstanceId(String processId, String processInstanceId, S
}

@Override
public Object cancelProcessInstanceId(String processId, String processInstanceId, String user, List groups) {
public Object cancelProcessInstanceId(String processId, String processInstanceId, String status, String user,
List groups) {
return null;
}

Expand Down Expand Up @@ -254,7 +258,7 @@ private ProcessErrors mockProcessInstanceStatusActiveOnError() {
@Test
void testDoCancelProcessInstanceId() {
mockProcessInstanceStatusActive().abort();
Object response = tested.doCancelProcessInstanceId(PROCESS_ID, PROCESS_INSTANCE_ID);
Object response = tested.doCancelProcessInstanceId(PROCESS_ID, PROCESS_INSTANCE_ID, "active");
verify(processInstance, times(0)).errors();
verify(processInstance, times(1)).abort();
assertResultOk(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ public void testCancelNodeInstanceId() {

@Test
public void testCancelProcessInstanceId() {
resource.cancelProcessInstanceId(PROCESS_ID, PROCESS_INSTANCE_ID, null, Collections.emptyList());
verify(resource).doCancelProcessInstanceId(PROCESS_ID, PROCESS_INSTANCE_ID);
resource.cancelProcessInstanceId(PROCESS_ID, PROCESS_INSTANCE_ID, "active", null, Collections.emptyList());
verify(resource).doCancelProcessInstanceId(PROCESS_ID, PROCESS_INSTANCE_ID, "active");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-qute</artifactId>
<artifactId>quarkus-resteasy-reactive-qute</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.openapi</groupId>
Expand Down
Loading