Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improved handling of ProcessInstanceManager to avoid shared process i… #201

Merged
merged 1 commit into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

public class LightSignalManager implements SignalManager {

private final EventListenerResolver instanceResolver;
private EventListenerResolver instanceResolver;
private SignalManagerHub signalManagerHub;
private ConcurrentHashMap<String, Set<EventListener>> listeners = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -85,4 +85,8 @@ protected String version(ProcessInstance pi) {
}
return "";
}

public void setInstanceResolver(EventListenerResolver instanceResolver) {
this.instanceResolver = instanceResolver;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;

import io.automatiko.engine.api.Model;
import io.automatiko.engine.api.auth.AccessPolicy;
Expand All @@ -31,6 +32,8 @@
import io.automatiko.engine.api.workflow.ProcessInstancesFactory;
import io.automatiko.engine.api.workflow.Signal;
import io.automatiko.engine.api.workflow.workitem.WorkItemExecutionError;
import io.automatiko.engine.services.signal.EventListenerResolver;
import io.automatiko.engine.services.signal.LightSignalManager;
import io.automatiko.engine.workflow.auth.AccessPolicyFactory;
import io.automatiko.engine.workflow.auth.AllowAllAccessPolicy;
import io.automatiko.engine.workflow.base.core.timer.CronExpirationTime;
Expand Down Expand Up @@ -144,6 +147,9 @@ public <S> void send(Signal<S> signal) {
}

public Process<T> configure() {
if (this.services.getSignalManager() instanceof LightSignalManager) {
((LightSignalManager) this.services.getSignalManager()).setInstanceResolver(new ProcessEventListenerResolver());
}
this.accessPolicy = AccessPolicyFactory.newPolicy((String) process().getMetaData().get("accessPolicy"));
registerListeners();
if (isProcessFactorySet()) {
Expand Down Expand Up @@ -323,4 +329,15 @@ public String[] getEventTypes() {
return new String[0];
}
}

private class ProcessEventListenerResolver implements EventListenerResolver {

@Override
public Optional<EventListener> find(String id) {
Optional.ofNullable(instances().findById(id)
.map(pi -> ((AbstractProcessInstance<?>) pi).internalGetProcessInstance()).orElse(null));
return null;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,71 +14,63 @@
import io.automatiko.engine.api.workflow.signal.SignalManagerHub;
import io.automatiko.engine.services.signal.LightSignalManager;
import io.automatiko.engine.workflow.base.core.event.ProcessEventSupport;
import io.automatiko.engine.workflow.base.instance.impl.DefaultProcessInstanceManager;

public class AbstractProcessRuntimeServiceProvider implements ProcessRuntimeServiceProvider {

private final JobsService jobsService;
private final ProcessInstanceManager processInstanceManager;
private final SignalManager signalManager;
private final WorkItemManager workItemManager;
private final ProcessEventSupport eventSupport;
private final UnitOfWorkManager unitOfWorkManager;
private final VariableInitializer variableInitializer;
private final JobsService jobsService;
private final SignalManager signalManager;
private final WorkItemManager workItemManager;
private final ProcessEventSupport eventSupport;
private final UnitOfWorkManager unitOfWorkManager;
private final VariableInitializer variableInitializer;

public AbstractProcessRuntimeServiceProvider(JobsService jobsService, WorkItemHandlerConfig workItemHandlerProvider,
ProcessEventListenerConfig processEventListenerProvider, SignalManagerHub compositeSignalManager,
UnitOfWorkManager unitOfWorkManager, VariableInitializer variableInitializer) {
this.unitOfWorkManager = unitOfWorkManager;
this.variableInitializer = variableInitializer;
processInstanceManager = new DefaultProcessInstanceManager();
signalManager = new LightSignalManager(id -> Optional.ofNullable(processInstanceManager.getProcessInstance(id)),
compositeSignalManager);
this.eventSupport = new ProcessEventSupport(this.unitOfWorkManager);
this.jobsService = jobsService;
this.workItemManager = new LightWorkItemManager(null, processInstanceManager, signalManager, eventSupport);
public AbstractProcessRuntimeServiceProvider(JobsService jobsService, WorkItemHandlerConfig workItemHandlerProvider,
ProcessEventListenerConfig processEventListenerProvider, SignalManagerHub compositeSignalManager,
UnitOfWorkManager unitOfWorkManager, VariableInitializer variableInitializer) {
this.unitOfWorkManager = unitOfWorkManager;
this.variableInitializer = variableInitializer;
signalManager = new LightSignalManager(id -> Optional.empty(),
compositeSignalManager);
this.eventSupport = new ProcessEventSupport(this.unitOfWorkManager);
this.jobsService = jobsService;
this.workItemManager = new LightWorkItemManager(null, signalManager, eventSupport);

for (String workItem : workItemHandlerProvider.names()) {
workItemManager.registerWorkItemHandler(workItem, workItemHandlerProvider.forName(workItem));
}
for (String workItem : workItemHandlerProvider.names()) {
workItemManager.registerWorkItemHandler(workItem, workItemHandlerProvider.forName(workItem));
}

for (ProcessEventListener listener : processEventListenerProvider.listeners()) {
this.eventSupport.addEventListener(listener);
}
}
for (ProcessEventListener listener : processEventListenerProvider.listeners()) {
this.eventSupport.addEventListener(listener);
}
}

@Override
public JobsService getJobsService() {
return jobsService;
}
@Override
public JobsService getJobsService() {
return jobsService;
}

@Override
public ProcessInstanceManager getProcessInstanceManager() {
return processInstanceManager;
}
@Override
public SignalManager getSignalManager() {
return signalManager;
}

@Override
public SignalManager getSignalManager() {
return signalManager;
}
@Override
public WorkItemManager getWorkItemManager() {
return workItemManager;
}

@Override
public WorkItemManager getWorkItemManager() {
return workItemManager;
}
@Override
public ProcessEventSupport getEventSupport() {
return eventSupport;
}

@Override
public ProcessEventSupport getEventSupport() {
return eventSupport;
}
@Override
public UnitOfWorkManager getUnitOfWorkManager() {
return unitOfWorkManager;
}

@Override
public UnitOfWorkManager getUnitOfWorkManager() {
return unitOfWorkManager;
}

@Override
public VariableInitializer getVariableInitializer() {
return variableInitializer;
}
@Override
public VariableInitializer getVariableInitializer() {
return variableInitializer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.automatiko.engine.workflow.base.core.timer.TimeUtils;
import io.automatiko.engine.workflow.base.core.timer.Timer;
import io.automatiko.engine.workflow.base.instance.context.variable.VariableScopeInstance;
import io.automatiko.engine.workflow.base.instance.impl.DefaultProcessInstanceManager;
import io.automatiko.engine.workflow.process.core.node.EventTrigger;
import io.automatiko.engine.workflow.process.core.node.StartNode;
import io.automatiko.engine.workflow.process.core.node.Trigger;
Expand Down Expand Up @@ -70,7 +71,7 @@ public LightProcessRuntime(ProcessRuntimeContext runtimeContext, ProcessRuntimeS
this.unitOfWorkManager = services.getUnitOfWorkManager();
this.runtimeContext = runtimeContext;
this.variableInitializer = services.getVariableInitializer();
this.processInstanceManager = services.getProcessInstanceManager();
this.processInstanceManager = new DefaultProcessInstanceManager();
this.signalManager = services.getSignalManager();
this.jobService = services.getJobsService() == null ? new InMemoryJobService(this, this.unitOfWorkManager)
: services.getJobsService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,15 @@ public class LightWorkItemManager extends DefaultWorkItemManager {
private Map<String, WorkItem> workItems = new ConcurrentHashMap<>();
private Map<String, WorkItemHandler> workItemHandlers = new HashMap<>();

private final ProcessInstanceManager processInstanceManager;
private final SignalManager signalManager;
private final ProcessEventSupport eventSupport;

private Complete completePhase = new Complete();
private Abort abortPhase = new Abort();

public LightWorkItemManager(InternalProcessRuntime runtime, ProcessInstanceManager processInstanceManager,
public LightWorkItemManager(InternalProcessRuntime runtime,
SignalManager signalManager, ProcessEventSupport eventSupport) {
super(runtime);
this.processInstanceManager = processInstanceManager;
this.signalManager = signalManager;
this.eventSupport = eventSupport;
}
Expand All @@ -61,9 +59,6 @@ public void internalExecuteWorkItem(WorkItem workItem) {
WorkItemHandler handler = this.workItemHandlers.get(workItem.getName());
if (handler != null) {
ProcessInstance processInstance = workItem.getProcessInstance();
if (processInstance == null) {
processInstance = processInstanceManager.getProcessInstance(workItem.getProcessInstanceId());
}
Transition<?> transition = new TransitionToActive();
eventSupport.fireBeforeWorkItemTransition(processInstance, workItem, transition, null);

Expand Down Expand Up @@ -92,9 +87,6 @@ public void internalAbortWorkItem(String id) {
if (handler != null) {

ProcessInstance processInstance = workItem.getProcessInstance();
if (processInstance == null) {
processInstance = processInstanceManager.getProcessInstance(workItem.getProcessInstanceId());
}
Transition<?> transition = new TransitionToAbort(Collections.emptyList());
eventSupport.fireBeforeWorkItemTransition(processInstance, workItem, transition, null);

Expand Down Expand Up @@ -149,9 +141,6 @@ public void completeWorkItem(String id, Map<String, Object> results, Policy<?>..

public void internalCompleteWorkItem(WorkItem workItem) {
ProcessInstance processInstance = workItem.getProcessInstance();
if (processInstance == null) {
processInstance = processInstanceManager.getProcessInstance(workItem.getProcessInstanceId());
}
((WorkItemImpl) workItem).setState(COMPLETED);
workItem.setCompleteDate(new Date());

Expand All @@ -165,9 +154,6 @@ public void internalCompleteWorkItem(WorkItem workItem) {

public void internalAbortWorkItem(WorkItem workItem) {
ProcessInstance processInstance = workItem.getProcessInstance();
if (processInstance == null) {
processInstance = processInstanceManager.getProcessInstance(workItem.getProcessInstanceId());
}

((WorkItemImpl) workItem).setState(ABORTED);

Expand All @@ -189,9 +175,6 @@ public void transitionWorkItem(String id, Transition<?> transition) {
WorkItemHandler handler = this.workItemHandlers.get(workItem.getName());
if (handler != null) {
ProcessInstance processInstance = workItem.getProcessInstance();
if (processInstance == null) {
processInstance = processInstanceManager.getProcessInstance(workItem.getProcessInstanceId());
}
eventSupport.fireBeforeWorkItemTransition(processInstance, workItem, transition, null);

try {
Expand Down Expand Up @@ -222,8 +205,7 @@ public void abortWorkItem(String id, Policy<?>... policies) {
throw new NotAuthorizedException(
"Work item can be aborted as it does not fulfil policies (e.g. security)");
}
ProcessInstance processInstance = processInstanceManager
.getProcessInstance(workItem.getProcessInstanceId());
ProcessInstance processInstance = workItem.getProcessInstance();
Transition<?> transition = new TransitionToAbort(Arrays.asList(policies));
eventSupport.fireBeforeWorkItemTransition(processInstance, workItem, transition, null);
workItem.setState(ABORTED);
Expand Down Expand Up @@ -283,9 +265,6 @@ public void failWorkItem(String id, Throwable error) {
// work item may have been aborted
if (workItem != null) {
ProcessInstance processInstance = workItem.getProcessInstance();
if (processInstance == null) {
processInstance = processInstanceManager.getProcessInstance(workItem.getProcessInstanceId());
}
workItem.setState(FAILED);
// process instance may have finished already
if (processInstance != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,15 @@

public interface ProcessRuntimeServiceProvider {

JobsService getJobsService();
JobsService getJobsService();

ProcessInstanceManager getProcessInstanceManager();
SignalManager getSignalManager();

SignalManager getSignalManager();
WorkItemManager getWorkItemManager();

WorkItemManager getWorkItemManager();
ProcessEventSupport getEventSupport();

ProcessEventSupport getEventSupport();
UnitOfWorkManager getUnitOfWorkManager();

UnitOfWorkManager getUnitOfWorkManager();

VariableInitializer getVariableInitializer();
VariableInitializer getVariableInitializer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,8 @@ public static AutomatikoMessages.WorkItem writeWorkItem(MarshallerWriteContext c
return _workItem.build();
}

public static WorkItem readWorkItem(MarshallerReaderContext context, AutomatikoMessages.WorkItem _workItem)
public static WorkItem readWorkItem(WorkflowProcessInstance processInstance, MarshallerReaderContext context,
AutomatikoMessages.WorkItem _workItem)
throws IOException {
WorkItemImpl workItem = new WorkItemImpl();
workItem.setId(_workItem.getId());
Expand All @@ -525,6 +526,7 @@ public static WorkItem readWorkItem(MarshallerReaderContext context, AutomatikoM
workItem.setPhaseId(_workItem.getPhaseId());
workItem.setPhaseStatus(_workItem.getPhaseStatus());
workItem.setStartDate(new Date(_workItem.getStartDate()));
workItem.setProcessInstance(processInstance);
if (_workItem.getCompleteDate() > 0) {
workItem.setCompleteDate(new Date(_workItem.getCompleteDate()));
}
Expand Down Expand Up @@ -593,7 +595,8 @@ public static AutomatikoMessages.HumanTaskWorkItem writeHumanTaskWorkItem(Marsha
return _workItem.build();
}

public static HumanTaskWorkItem readHumanTaskWorkItem(MarshallerReaderContext context,
public static HumanTaskWorkItem readHumanTaskWorkItem(WorkflowProcessInstance processInstance,
MarshallerReaderContext context,
AutomatikoMessages.HumanTaskWorkItem _workItem) throws IOException {
HumanTaskWorkItemImpl workItem = new HumanTaskWorkItemImpl();
workItem.setId(_workItem.getId());
Expand All @@ -606,6 +609,7 @@ public static HumanTaskWorkItem readHumanTaskWorkItem(MarshallerReaderContext co
workItem.setPhaseId(_workItem.getPhaseId());
workItem.setPhaseStatus(_workItem.getPhaseStatus());
workItem.setStartDate(new Date(_workItem.getStartDate()));
workItem.setProcessInstance(processInstance);
if (_workItem.getCompleteDate() > 0) {
workItem.setCompleteDate(new Date(_workItem.getCompleteDate()));
}
Expand Down Expand Up @@ -955,7 +959,7 @@ protected NodeInstanceImpl readNodeInstanceContent(AutomatikoMessages.ProcessIns
nodeInstance = new HumanTaskNodeInstance();
((HumanTaskNodeInstance) nodeInstance).internalSetWorkItemId(_content.getHumanTask().getWorkItemId());
((HumanTaskNodeInstance) nodeInstance).internalSetWorkItem(
(WorkItemImpl) readHumanTaskWorkItem(context, _content.getHumanTask().getWorkitem()));
(WorkItemImpl) readHumanTaskWorkItem(processInstance, context, _content.getHumanTask().getWorkitem()));
if (_content.getHumanTask().getTimerInstanceIdCount() > 0) {
List<String> timerInstances = new ArrayList<>();
for (String _timerId : _content.getHumanTask().getTimerInstanceIdList()) {
Expand All @@ -970,7 +974,8 @@ protected NodeInstanceImpl readNodeInstanceContent(AutomatikoMessages.ProcessIns
nodeInstance = new WorkItemNodeInstance();
((WorkItemNodeInstance) nodeInstance).internalSetWorkItemId(_content.getWorkItem().getWorkItemId());
((WorkItemNodeInstance) nodeInstance)
.internalSetWorkItem((WorkItemImpl) readWorkItem(context, _content.getWorkItem().getWorkitem()));
.internalSetWorkItem(
(WorkItemImpl) readWorkItem(processInstance, context, _content.getWorkItem().getWorkitem()));
if (_content.getWorkItem().getTimerInstanceIdCount() > 0) {
List<String> timerInstances = new ArrayList<>();
for (String _timerId : _content.getWorkItem().getTimerInstanceIdList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ public void internalSetWorkItemId(String workItemId) {

public void internalSetWorkItem(WorkItem workItem) {
this.workItem = workItem;
this.workItem.setProcessInstance(getProcessInstance());
if (getProcessInstance() != null) {
this.workItem.setProcessInstance(getProcessInstance());
}
this.workItem.setNodeInstance(this);
}

Expand All @@ -125,6 +127,7 @@ public boolean isInversionOfControl() {
}

public void internalRegisterWorkItem() {
workItem.setProcessInstance(getProcessInstance());
((DefaultWorkItemManager) getProcessInstance().getProcessRuntime().getWorkItemManager())
.internalAddWorkItem(workItem);

Expand Down