From d3ee6fca84b725e8d83868d58f8fc1b064dd791b Mon Sep 17 00:00:00 2001 From: Maciej Swiderski Date: Wed, 25 Aug 2021 08:23:13 +0200 Subject: [PATCH 1/5] removed not needed message id retrieval --- .../services/receiveemail/EmailAttachmentInputConverter.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/addons/services/automatiko-receive-email-addon/src/main/java/io/automatiko/engine/addons/services/receiveemail/EmailAttachmentInputConverter.java b/addons/services/automatiko-receive-email-addon/src/main/java/io/automatiko/engine/addons/services/receiveemail/EmailAttachmentInputConverter.java index 0938a0ff4..df2adc03e 100644 --- a/addons/services/automatiko-receive-email-addon/src/main/java/io/automatiko/engine/addons/services/receiveemail/EmailAttachmentInputConverter.java +++ b/addons/services/automatiko-receive-email-addon/src/main/java/io/automatiko/engine/addons/services/receiveemail/EmailAttachmentInputConverter.java @@ -23,8 +23,6 @@ public File convert(Object input) { MailMessage mailMessage = (MailMessage) input; - mailMessage.getMessageId(); - Map attachments = mailMessage.getExchange().getProperty("CamelAttachmentObjects", Map.class); if (attachments != null && attachments.size() > 0) { From 093eafe12c1417f72495e0c8d258ba2c7060a7a7 Mon Sep 17 00:00:00 2001 From: Maciej Swiderski Date: Wed, 25 Aug 2021 08:23:43 +0200 Subject: [PATCH 2/5] fixed wrong config property name to get service url --- .../engine/addons/usertasks/email/HumanTaskHandlerConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/addons/user-tasks/automatiko-user-tasks-email-addon/src/main/java/io/automatiko/engine/addons/usertasks/email/HumanTaskHandlerConfig.java b/addons/user-tasks/automatiko-user-tasks-email-addon/src/main/java/io/automatiko/engine/addons/usertasks/email/HumanTaskHandlerConfig.java index 63da769fc..dbc350959 100644 --- a/addons/user-tasks/automatiko-user-tasks-email-addon/src/main/java/io/automatiko/engine/addons/usertasks/email/HumanTaskHandlerConfig.java +++ b/addons/user-tasks/automatiko-user-tasks-email-addon/src/main/java/io/automatiko/engine/addons/usertasks/email/HumanTaskHandlerConfig.java @@ -17,7 +17,7 @@ public class HumanTaskHandlerConfig extends DefaultWorkItemHandlerConfig { @Inject public HumanTaskHandlerConfig(Mailer mailer, EmailAddressResolver emailAddressResolver, Engine engine, - @ConfigProperty(name = "quarkus.automatiko.serviceUrl") Optional serviceUrl) { + @ConfigProperty(name = "quarkus.automatiko.service-url") Optional serviceUrl) { register("Human Task", new HumanTaskWorkItemHandler( new HumanTaskLifeCycleWithEmail(mailer, emailAddressResolver, engine, serviceUrl))); From 830464f349125d6d69e20d882f69c5180df5e794 Mon Sep 17 00:00:00 2001 From: Maciej Swiderski Date: Wed, 25 Aug 2021 08:24:10 +0200 Subject: [PATCH 3/5] avoid duplicated timer registration when using file based job service --- .../filesystem/job/FileSystemBasedJobService.java | 15 +++++++++++++-- .../engine/api/jobs/ProcessJobDescription.java | 1 + 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/addons/persistence/automatiko-filesystem-persistence-addon/src/main/java/io/automatiko/engine/addons/persistence/filesystem/job/FileSystemBasedJobService.java b/addons/persistence/automatiko-filesystem-persistence-addon/src/main/java/io/automatiko/engine/addons/persistence/filesystem/job/FileSystemBasedJobService.java index 9170da772..52a0ec9b6 100644 --- a/addons/persistence/automatiko-filesystem-persistence-addon/src/main/java/io/automatiko/engine/addons/persistence/filesystem/job/FileSystemBasedJobService.java +++ b/addons/persistence/automatiko-filesystem-persistence-addon/src/main/java/io/automatiko/engine/addons/persistence/filesystem/job/FileSystemBasedJobService.java @@ -91,6 +91,11 @@ public void scheduleOnLoad(@Observes StartupEvent event) { @Override public String scheduleProcessJob(ProcessJobDescription description) { LOGGER.debug("ScheduleProcessJob: {}", description); + if (scheduledJobs.containsKey(description.id())) { + LOGGER.debug("Already scheduled: {}", description); + return description.id(); + } + ScheduledFuture future = null; ScheduledJob scheduledJob = null; if (description.expirationTime().repeatInterval() != null) { @@ -200,6 +205,10 @@ protected void loadAndSchedule(Path path) { return; } ScheduledJob job = mapper.readValue(Files.readAllBytes(path), ScheduledJob.class); + if (scheduledJobs.containsKey(job.getId())) { + LOGGER.debug("Already scheduled: {}", job); + return; + } ScheduledFuture future = null; if (job.getProcessInstanceId() != null) { ProcessInstanceJobDescription description = ProcessInstanceJobDescription.of(job.getId(), job.getTriggerType(), @@ -218,7 +227,7 @@ protected void loadAndSchedule(Path path) { } } else { - ProcessJobDescription description = ProcessJobDescription.of(build(job), null, job.getProcessId()); + ProcessJobDescription description = ProcessJobDescription.of(build(job), job.getProcessId(), null); if (job.getReapeatInterval() != null) { future = scheduler.scheduleAtFixedRate( @@ -282,7 +291,7 @@ protected ExpirationTime build(ScheduledJob job) { } protected long log(ZonedDateTime dt, long delay) { - LOGGER.debug("Timer scheduled for date {} will expire in {}", dt, delay); + LOGGER.info("Timer scheduled for date {} will expire in {}", dt, delay); return delay; } @@ -343,6 +352,7 @@ public void run() { LOGGER.debug("Job {} completed", id); } finally { if (description.expirationTime().next() != null) { + scheduledJobs.remove(id); scheduleProcessInstanceJob(description); } else if (removeAtExecution) { scheduledJobs.remove(id); @@ -400,6 +410,7 @@ public void run() { LOGGER.debug("Job {} completed", id); } finally { if (description.expirationTime().next() != null) { + scheduledJobs.remove(id); scheduleProcessJob(description); } else if (removeAtExecution) { scheduledJobs.remove(id); diff --git a/api/automatiko-engine-api/src/main/java/io/automatiko/engine/api/jobs/ProcessJobDescription.java b/api/automatiko-engine-api/src/main/java/io/automatiko/engine/api/jobs/ProcessJobDescription.java index b6dfdcadf..8363c26f5 100644 --- a/api/automatiko-engine-api/src/main/java/io/automatiko/engine/api/jobs/ProcessJobDescription.java +++ b/api/automatiko-engine-api/src/main/java/io/automatiko/engine/api/jobs/ProcessJobDescription.java @@ -38,6 +38,7 @@ public ProcessJobDescription(ExpirationTime expirationTime, Integer priority, Pr this.expirationTime = requireNonNull(expirationTime); this.priority = requireNonNull(priority); this.process = requireNonNull(process); + this.processId = requireNonNull(process.id()); this.processVersion = process.version(); } From 3c50d6e87fc16876da80a0140c0873c811ef8be4 Mon Sep 17 00:00:00 2001 From: Maciej Swiderski Date: Wed, 25 Aug 2021 08:24:53 +0200 Subject: [PATCH 4/5] restored configuration reference in docs --- docs/modules/ROOT/nav.adoc | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc index 942361193..0f37a9f5d 100644 --- a/docs/modules/ROOT/nav.adoc +++ b/docs/modules/ROOT/nav.adoc @@ -13,6 +13,7 @@ ** xref:components/event-publishers.adoc[Event publishers] ** xref:components/user-tasks.adoc[User tasks] ** xref:components/async-execution.adoc[Async Execution] +* xref:configuration.adoc[Configuration reference] * Services ** xref:services/archive.adoc[Archive] ** xref:services/email.adoc[Send Email] From d33a4e216677689be1bc062e3178b6dcb7b1081d Mon Sep 17 00:00:00 2001 From: Maciej Swiderski Date: Wed, 25 Aug 2021 08:25:43 +0200 Subject: [PATCH 5/5] added test support module to help developers build tests for services, TestJobService as the first item that is enabled with dedicated QuarkusTestProfile --- bom/pom.xml | 12 ++ .../automatiko-test-support/pom.xml | 33 ++++ .../quarkus/tests/AutomatikoTestProfile.java | 20 ++ .../quarkus/tests/jobs/TestJobService.java | 175 ++++++++++++++++++ .../src/main/resources/META-INF/beans.xml | 0 quarkus-extension/pom.xml | 1 + 6 files changed, 241 insertions(+) create mode 100644 quarkus-extension/automatiko-test-support/pom.xml create mode 100644 quarkus-extension/automatiko-test-support/src/main/java/io/automatiko/quarkus/tests/AutomatikoTestProfile.java create mode 100644 quarkus-extension/automatiko-test-support/src/main/java/io/automatiko/quarkus/tests/jobs/TestJobService.java create mode 100644 quarkus-extension/automatiko-test-support/src/main/resources/META-INF/beans.xml diff --git a/bom/pom.xml b/bom/pom.xml index 774317349..d70278de7 100755 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -454,6 +454,18 @@ ${project.version} sources + + + io.automatiko.quarkus + automatiko-test-support + ${project.version} + + + io.automatiko.quarkus + automatiko-test-support + ${project.version} + sources + diff --git a/quarkus-extension/automatiko-test-support/pom.xml b/quarkus-extension/automatiko-test-support/pom.xml new file mode 100644 index 000000000..7492de7f6 --- /dev/null +++ b/quarkus-extension/automatiko-test-support/pom.xml @@ -0,0 +1,33 @@ + + 4.0.0 + + io.automatiko.quarkus + automatiko-quarkus-parent + 0.0.0-SNAPSHOT + + automatiko-test-support + Automatiko Engine :: Quarkus Extension :: Test Support + Test Support services and utilities for Quarkus Extension for Automatiko Engine + + + io.automatiko.quarkus.test.support + + + + + io.quarkus + quarkus-junit5 + + + + io.automatiko.engine + automatiko-engine-api + + + io.automatiko.workflow + automatiko-workflow-core + + + \ No newline at end of file diff --git a/quarkus-extension/automatiko-test-support/src/main/java/io/automatiko/quarkus/tests/AutomatikoTestProfile.java b/quarkus-extension/automatiko-test-support/src/main/java/io/automatiko/quarkus/tests/AutomatikoTestProfile.java new file mode 100644 index 000000000..33bf2aea0 --- /dev/null +++ b/quarkus-extension/automatiko-test-support/src/main/java/io/automatiko/quarkus/tests/AutomatikoTestProfile.java @@ -0,0 +1,20 @@ +package io.automatiko.quarkus.tests; + +import java.util.HashSet; +import java.util.Set; + +import io.automatiko.quarkus.tests.jobs.TestJobService; +import io.quarkus.test.junit.QuarkusTestProfile; + +public class AutomatikoTestProfile implements QuarkusTestProfile { + + @Override + public Set> getEnabledAlternatives() { + Set> alternatives = new HashSet<>(); + + alternatives.add(TestJobService.class); + + return alternatives; + } + +} diff --git a/quarkus-extension/automatiko-test-support/src/main/java/io/automatiko/quarkus/tests/jobs/TestJobService.java b/quarkus-extension/automatiko-test-support/src/main/java/io/automatiko/quarkus/tests/jobs/TestJobService.java new file mode 100644 index 000000000..bc917ab6a --- /dev/null +++ b/quarkus-extension/automatiko-test-support/src/main/java/io/automatiko/quarkus/tests/jobs/TestJobService.java @@ -0,0 +1,175 @@ +package io.automatiko.quarkus.tests.jobs; + +import java.time.ZonedDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.inject.Alternative; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.automatiko.engine.api.Application; +import io.automatiko.engine.api.Model; +import io.automatiko.engine.api.auth.IdentityProvider; +import io.automatiko.engine.api.auth.TrustedIdentityProvider; +import io.automatiko.engine.api.jobs.JobDescription; +import io.automatiko.engine.api.jobs.JobsService; +import io.automatiko.engine.api.jobs.ProcessInstanceJobDescription; +import io.automatiko.engine.api.jobs.ProcessJobDescription; +import io.automatiko.engine.api.uow.UnitOfWorkManager; +import io.automatiko.engine.api.workflow.Process; +import io.automatiko.engine.api.workflow.ProcessInstance; +import io.automatiko.engine.api.workflow.Processes; +import io.automatiko.engine.services.time.TimerInstance; +import io.automatiko.engine.services.uow.UnitOfWorkExecutor; +import io.automatiko.engine.workflow.Sig; + +@Alternative +@ApplicationScoped +public class TestJobService implements JobsService { + + private static final Logger LOGGER = LoggerFactory.getLogger(TestJobService.class); + + private static final String TRIGGER = "timer"; + + private Map jobs = new ConcurrentHashMap<>(); + + protected Map> mappedProcesses = new HashMap<>(); + + protected final UnitOfWorkManager unitOfWorkManager; + + public TestJobService(Processes processes, Application application) { + processes.processIds().forEach(id -> mappedProcesses.put(id, processes.processById(id))); + + this.unitOfWorkManager = application.unitOfWorkManager(); + } + + @Override + public String scheduleProcessJob(ProcessJobDescription description) { + LOGGER.debug("scheduling process job {}", description); + jobs.put(description.id(), description); + return description.id(); + } + + @Override + public String scheduleProcessInstanceJob(ProcessInstanceJobDescription description) { + LOGGER.debug("scheduling process instance job {}", description); + jobs.put(description.id(), description); + return description.id(); + } + + @Override + public boolean cancelJob(String id) { + LOGGER.debug("Removing job {}", id); + JobDescription removed = jobs.remove(id); + return removed != null; + } + + @Override + public ZonedDateTime getScheduledTime(String id) { + JobDescription job = jobs.remove(id); + + if (job != null) { + return job.expirationTime().get(); + } + return null; + } + + public Set jobIds() { + return jobs.keySet(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void triggerProcessJob(String jobId) { + + ProcessJobDescription job = (ProcessJobDescription) jobs.remove(jobId); + + if (job == null) { + throw new IllegalArgumentException("Job with id " + jobId + " not found"); + } + int limit = job.expirationTime().repeatLimit(); + try { + LOGGER.debug("Job {} started", job.id()); + + Process process = mappedProcesses.get(job.processId()); + if (process == null) { + LOGGER.warn("No process found for process id {}", job.processId()); + return; + } + IdentityProvider.set(new TrustedIdentityProvider("System")); + UnitOfWorkExecutor.executeInUnitOfWork(unitOfWorkManager, () -> { + ProcessInstance pi = process.createInstance(process.createModel()); + if (pi != null) { + pi.start(TRIGGER, null, null); + } + + return null; + }); + limit--; + if (limit == 0) { + jobs.remove(jobId); + } + LOGGER.debug("Job {} completed", job.id()); + } finally { + if (job.expirationTime().next() != null) { + jobs.remove(jobId); + scheduleProcessJob(job); + } else { + jobs.remove(jobId); + } + } + } + + public void triggerProcessInstanceJob(String jobId) { + + LOGGER.debug("Job {} started", jobId); + + ProcessInstanceJobDescription job = (ProcessInstanceJobDescription) jobs.remove(jobId); + + if (job == null) { + throw new IllegalArgumentException("Job with id " + jobId + " not found"); + } + try { + Process process = mappedProcesses.get(job.processId()); + if (process == null) { + LOGGER.warn("No process found for process id {}", job.processId()); + return; + } + IdentityProvider.set(new TrustedIdentityProvider("System")); + UnitOfWorkExecutor.executeInUnitOfWork(unitOfWorkManager, () -> { + Optional> processInstanceFound = process.instances() + .findById(job.processInstanceId()); + if (processInstanceFound.isPresent()) { + ProcessInstance processInstance = processInstanceFound.get(); + String[] ids = job.id().split("_"); + processInstance + .send(Sig.of(job.triggerType(), + TimerInstance.with(Long.parseLong(ids[1]), job.id(), job.expirationTime().repeatLimit()))); + if (job.expirationTime().repeatLimit() == 0) { + + jobs.remove(jobId); + } + } else { + // since owning process instance does not exist cancel timers + jobs.remove(jobId); + } + + return null; + }); + LOGGER.debug("Job {} completed", job.id()); + } finally { + if (job.expirationTime().next() != null) { + jobs.remove(jobId); + scheduleProcessInstanceJob(job); + } else { + jobs.remove(jobId); + } + } + } + +} diff --git a/quarkus-extension/automatiko-test-support/src/main/resources/META-INF/beans.xml b/quarkus-extension/automatiko-test-support/src/main/resources/META-INF/beans.xml new file mode 100644 index 000000000..e69de29bb diff --git a/quarkus-extension/pom.xml b/quarkus-extension/pom.xml index 4d4653a1c..f458d7c07 100644 --- a/quarkus-extension/pom.xml +++ b/quarkus-extension/pom.xml @@ -50,6 +50,7 @@ function-flow-deployment operator-deployment integration-test + automatiko-test-support