Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

additional fixes and test support module #99

Merged
merged 5 commits into from
Aug 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public void scheduleOnLoad(@Observes StartupEvent event) {
@Override
public String scheduleProcessJob(ProcessJobDescription description) {
LOGGER.debug("ScheduleProcessJob: {}", description);
if (scheduledJobs.containsKey(description.id())) {
LOGGER.debug("Already scheduled: {}", description);
return description.id();
}

ScheduledFuture<?> future = null;
ScheduledJob scheduledJob = null;
if (description.expirationTime().repeatInterval() != null) {
Expand Down Expand Up @@ -200,6 +205,10 @@ protected void loadAndSchedule(Path path) {
return;
}
ScheduledJob job = mapper.readValue(Files.readAllBytes(path), ScheduledJob.class);
if (scheduledJobs.containsKey(job.getId())) {
LOGGER.debug("Already scheduled: {}", job);
return;
}
ScheduledFuture<?> future = null;
if (job.getProcessInstanceId() != null) {
ProcessInstanceJobDescription description = ProcessInstanceJobDescription.of(job.getId(), job.getTriggerType(),
Expand All @@ -218,7 +227,7 @@ protected void loadAndSchedule(Path path) {
}

} else {
ProcessJobDescription description = ProcessJobDescription.of(build(job), null, job.getProcessId());
ProcessJobDescription description = ProcessJobDescription.of(build(job), job.getProcessId(), null);

if (job.getReapeatInterval() != null) {
future = scheduler.scheduleAtFixedRate(
Expand Down Expand Up @@ -282,7 +291,7 @@ protected ExpirationTime build(ScheduledJob job) {
}

protected long log(ZonedDateTime dt, long delay) {
LOGGER.debug("Timer scheduled for date {} will expire in {}", dt, delay);
LOGGER.info("Timer scheduled for date {} will expire in {}", dt, delay);
return delay;
}

Expand Down Expand Up @@ -343,6 +352,7 @@ public void run() {
LOGGER.debug("Job {} completed", id);
} finally {
if (description.expirationTime().next() != null) {
scheduledJobs.remove(id);
scheduleProcessInstanceJob(description);
} else if (removeAtExecution) {
scheduledJobs.remove(id);
Expand Down Expand Up @@ -400,6 +410,7 @@ public void run() {
LOGGER.debug("Job {} completed", id);
} finally {
if (description.expirationTime().next() != null) {
scheduledJobs.remove(id);
scheduleProcessJob(description);
} else if (removeAtExecution) {
scheduledJobs.remove(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ public File convert(Object input) {

MailMessage mailMessage = (MailMessage) input;

mailMessage.getMessageId();

Map<String, Attachment> attachments = mailMessage.getExchange().getProperty("CamelAttachmentObjects",
Map.class);
if (attachments != null && attachments.size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class HumanTaskHandlerConfig extends DefaultWorkItemHandlerConfig {

@Inject
public HumanTaskHandlerConfig(Mailer mailer, EmailAddressResolver emailAddressResolver, Engine engine,
@ConfigProperty(name = "quarkus.automatiko.serviceUrl") Optional<String> serviceUrl) {
@ConfigProperty(name = "quarkus.automatiko.service-url") Optional<String> serviceUrl) {
register("Human Task",
new HumanTaskWorkItemHandler(
new HumanTaskLifeCycleWithEmail(mailer, emailAddressResolver, engine, serviceUrl)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public ProcessJobDescription(ExpirationTime expirationTime, Integer priority, Pr
this.expirationTime = requireNonNull(expirationTime);
this.priority = requireNonNull(priority);
this.process = requireNonNull(process);
this.processId = requireNonNull(process.id());
this.processVersion = process.version();
}

Expand Down
12 changes: 12 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,18 @@
<version>${project.version}</version>
<classifier>sources</classifier>
</dependency>

<dependency>
<groupId>io.automatiko.quarkus</groupId>
<artifactId>automatiko-test-support</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.automatiko.quarkus</groupId>
<artifactId>automatiko-test-support</artifactId>
<version>${project.version}</version>
<classifier>sources</classifier>
</dependency>
</dependencies>

</dependencyManagement>
Expand Down
1 change: 1 addition & 0 deletions docs/modules/ROOT/nav.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
** xref:components/event-publishers.adoc[Event publishers]
** xref:components/user-tasks.adoc[User tasks]
** xref:components/async-execution.adoc[Async Execution]
* xref:configuration.adoc[Configuration reference]
* Services
** xref:services/archive.adoc[Archive]
** xref:services/email.adoc[Send Email]
Expand Down
33 changes: 33 additions & 0 deletions quarkus-extension/automatiko-test-support/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.automatiko.quarkus</groupId>
<artifactId>automatiko-quarkus-parent</artifactId>
<version>0.0.0-SNAPSHOT</version>
</parent>
<artifactId>automatiko-test-support</artifactId>
<name>Automatiko Engine :: Quarkus Extension :: Test Support</name>
<description>Test Support services and utilities for Quarkus Extension for Automatiko Engine</description>

<properties>
<java.module.name>io.automatiko.quarkus.test.support</java.module.name>
</properties>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
</dependency>

<dependency>
<groupId>io.automatiko.engine</groupId>
<artifactId>automatiko-engine-api</artifactId>
</dependency>
<dependency>
<groupId>io.automatiko.workflow</groupId>
<artifactId>automatiko-workflow-core</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.automatiko.quarkus.tests;

import java.util.HashSet;
import java.util.Set;

import io.automatiko.quarkus.tests.jobs.TestJobService;
import io.quarkus.test.junit.QuarkusTestProfile;

public class AutomatikoTestProfile implements QuarkusTestProfile {

@Override
public Set<Class<?>> getEnabledAlternatives() {
Set<Class<?>> alternatives = new HashSet<>();

alternatives.add(TestJobService.class);

return alternatives;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package io.automatiko.quarkus.tests.jobs;

import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Alternative;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.automatiko.engine.api.Application;
import io.automatiko.engine.api.Model;
import io.automatiko.engine.api.auth.IdentityProvider;
import io.automatiko.engine.api.auth.TrustedIdentityProvider;
import io.automatiko.engine.api.jobs.JobDescription;
import io.automatiko.engine.api.jobs.JobsService;
import io.automatiko.engine.api.jobs.ProcessInstanceJobDescription;
import io.automatiko.engine.api.jobs.ProcessJobDescription;
import io.automatiko.engine.api.uow.UnitOfWorkManager;
import io.automatiko.engine.api.workflow.Process;
import io.automatiko.engine.api.workflow.ProcessInstance;
import io.automatiko.engine.api.workflow.Processes;
import io.automatiko.engine.services.time.TimerInstance;
import io.automatiko.engine.services.uow.UnitOfWorkExecutor;
import io.automatiko.engine.workflow.Sig;

@Alternative
@ApplicationScoped
public class TestJobService implements JobsService {

private static final Logger LOGGER = LoggerFactory.getLogger(TestJobService.class);

private static final String TRIGGER = "timer";

private Map<String, JobDescription> jobs = new ConcurrentHashMap<>();

protected Map<String, Process<? extends Model>> mappedProcesses = new HashMap<>();

protected final UnitOfWorkManager unitOfWorkManager;

public TestJobService(Processes processes, Application application) {
processes.processIds().forEach(id -> mappedProcesses.put(id, processes.processById(id)));

this.unitOfWorkManager = application.unitOfWorkManager();
}

@Override
public String scheduleProcessJob(ProcessJobDescription description) {
LOGGER.debug("scheduling process job {}", description);
jobs.put(description.id(), description);
return description.id();
}

@Override
public String scheduleProcessInstanceJob(ProcessInstanceJobDescription description) {
LOGGER.debug("scheduling process instance job {}", description);
jobs.put(description.id(), description);
return description.id();
}

@Override
public boolean cancelJob(String id) {
LOGGER.debug("Removing job {}", id);
JobDescription removed = jobs.remove(id);
return removed != null;
}

@Override
public ZonedDateTime getScheduledTime(String id) {
JobDescription job = jobs.remove(id);

if (job != null) {
return job.expirationTime().get();
}
return null;
}

public Set<String> jobIds() {
return jobs.keySet();
}

@SuppressWarnings({ "unchecked", "rawtypes" })
public void triggerProcessJob(String jobId) {

ProcessJobDescription job = (ProcessJobDescription) jobs.remove(jobId);

if (job == null) {
throw new IllegalArgumentException("Job with id " + jobId + " not found");
}
int limit = job.expirationTime().repeatLimit();
try {
LOGGER.debug("Job {} started", job.id());

Process process = mappedProcesses.get(job.processId());
if (process == null) {
LOGGER.warn("No process found for process id {}", job.processId());
return;
}
IdentityProvider.set(new TrustedIdentityProvider("System<timer>"));
UnitOfWorkExecutor.executeInUnitOfWork(unitOfWorkManager, () -> {
ProcessInstance<?> pi = process.createInstance(process.createModel());
if (pi != null) {
pi.start(TRIGGER, null, null);
}

return null;
});
limit--;
if (limit == 0) {
jobs.remove(jobId);
}
LOGGER.debug("Job {} completed", job.id());
} finally {
if (job.expirationTime().next() != null) {
jobs.remove(jobId);
scheduleProcessJob(job);
} else {
jobs.remove(jobId);
}
}
}

public void triggerProcessInstanceJob(String jobId) {

LOGGER.debug("Job {} started", jobId);

ProcessInstanceJobDescription job = (ProcessInstanceJobDescription) jobs.remove(jobId);

if (job == null) {
throw new IllegalArgumentException("Job with id " + jobId + " not found");
}
try {
Process<?> process = mappedProcesses.get(job.processId());
if (process == null) {
LOGGER.warn("No process found for process id {}", job.processId());
return;
}
IdentityProvider.set(new TrustedIdentityProvider("System<timer>"));
UnitOfWorkExecutor.executeInUnitOfWork(unitOfWorkManager, () -> {
Optional<? extends ProcessInstance<?>> processInstanceFound = process.instances()
.findById(job.processInstanceId());
if (processInstanceFound.isPresent()) {
ProcessInstance<?> processInstance = processInstanceFound.get();
String[] ids = job.id().split("_");
processInstance
.send(Sig.of(job.triggerType(),
TimerInstance.with(Long.parseLong(ids[1]), job.id(), job.expirationTime().repeatLimit())));
if (job.expirationTime().repeatLimit() == 0) {

jobs.remove(jobId);
}
} else {
// since owning process instance does not exist cancel timers
jobs.remove(jobId);
}

return null;
});
LOGGER.debug("Job {} completed", job.id());
} finally {
if (job.expirationTime().next() != null) {
jobs.remove(jobId);
scheduleProcessInstanceJob(job);
} else {
jobs.remove(jobId);
}
}
}

}
1 change: 1 addition & 0 deletions quarkus-extension/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
<module>function-flow-deployment</module>
<module>operator-deployment</module>
<module>integration-test</module>
<module>automatiko-test-support</module>
</modules>

<build>
Expand Down