workflow java dsl: correlation as expression, timers value expression… #464

Merged: 2 commits, Aug 6, 2024
[File 1: MongodbJobService.java]
@@ -20,11 +20,10 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
-import jakarta.annotation.Priority;
-import jakarta.enterprise.context.ApplicationScoped;
-import jakarta.enterprise.event.Observes;
-import jakarta.inject.Inject;
-import jakarta.interceptor.Interceptor;
+import org.bson.Document;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.mongodb.MongoWriteConcernException;
 import com.mongodb.MongoWriteException;
@@ -36,11 +35,6 @@
 import com.mongodb.client.model.Projections;
 import com.mongodb.client.result.UpdateResult;
 
-import org.bson.Document;
-import org.eclipse.microprofile.config.inject.ConfigProperty;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import io.automatiko.engine.api.Application;
 import io.automatiko.engine.api.Model;
 import io.automatiko.engine.api.audit.AuditEntry;
@@ -65,6 +59,12 @@
 import io.automatiko.engine.workflow.base.core.timer.NoOpExpirationTime;
 import io.quarkus.runtime.ShutdownEvent;
 import io.quarkus.runtime.StartupEvent;
+import jakarta.annotation.Priority;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.event.Observes;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+import jakarta.interceptor.Interceptor;
 
 @ApplicationScoped
 public class MongodbJobService implements JobsService {
@@ -103,88 +103,103 @@ public class MongodbJobService implements JobsService {
     private Optional<Integer> threads;
 
     @Inject
-    public MongodbJobService(MongoClient mongoClient,
+    public MongodbJobService(Instance<MongoClient> mongoClient,
             Processes processes, Application application, Auditor auditor,
+            @ConfigProperty(name = "quarkus.automatiko.persistence.disabled") Optional<Boolean> persistenceDisabled,
             @ConfigProperty(name = MongodbJobsConfig.DATABASE_KEY) Optional<String> database,
             @ConfigProperty(name = MongodbJobsConfig.INTERVAL_KEY) Optional<Long> interval,
             @ConfigProperty(name = MongodbJobsConfig.THREADS_KEY) Optional<Integer> threads) {
-        this.mongoClient = mongoClient;
-        this.database = database;
-        this.interval = interval;
-        this.threads = threads;
-
-        processes.processIds().forEach(id -> mappedProcesses.put(id, processes.processById(id)));
-
-        this.unitOfWorkManager = application.unitOfWorkManager();
-        this.auditor = auditor;
-
-        this.scheduler = new ScheduledThreadPoolExecutor(this.threads.orElse(1),
-                r -> new Thread(r, "automatiko-jobs-executor"));
-        this.loadScheduler = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "automatiko-jobs-loader"));
+        if (!persistenceDisabled.orElse(false)) {
+            this.mongoClient = mongoClient.get();
+            this.database = database;
+            this.interval = interval;
+            this.threads = threads;
+
+            processes.processIds().forEach(id -> mappedProcesses.put(id, processes.processById(id)));
+
+            this.unitOfWorkManager = application.unitOfWorkManager();
+            this.auditor = auditor;
+
+            this.scheduler = new ScheduledThreadPoolExecutor(this.threads.orElse(1),
+                    r -> new Thread(r, "automatiko-jobs-executor"));
+            this.loadScheduler = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "automatiko-jobs-loader"));
+        } else {
+            this.mongoClient = null;
+            this.unitOfWorkManager = null;
+            this.auditor = null;
+            this.scheduler = null;
+            this.loadScheduler = null;
+        }
     }

     public void start(@Observes @Priority(Interceptor.Priority.LIBRARY_AFTER) StartupEvent event) {
 
-        collection().createIndex(Indexes.ascending(INSTANCE_ID_FIELD));
-        collection().createIndex(Indexes.descending(FIRE_AT_FIELD));
-
-        loadScheduler.scheduleAtFixedRate(() -> {
-            try {
-                long next = LocalDateTime.now().plus(Duration.ofMinutes(interval.orElse(10L)))
-                        .atZone(ZoneId.systemDefault()).toInstant()
-                        .toEpochMilli();
-
-                FindIterable<Document> jobs = collection().find(lt(FIRE_AT_FIELD, next));
-
-                for (Document job : jobs) {
-
-                    if (job.getString(OWNER_INSTANCE_ID_FIELD) == null) {
-                        ProcessJobDescription description = ProcessJobDescription.of(build(job.getString(EXPRESSION_FIELD)),
-                                null,
-                                job.getString(OWNER_DEF_ID_FIELD));
-
-                        scheduledJobs.computeIfAbsent(job.getString(INSTANCE_ID_FIELD), k -> {
-                            return log(job.getString(INSTANCE_ID_FIELD),
-                                    scheduler.schedule(new StartProcessOnExpiredTimer(job.getString(INSTANCE_ID_FIELD),
-                                            job.getString(OWNER_DEF_ID_FIELD), -1, description),
-                                            Duration.between(LocalDateTime.now(),
-                                                    ZonedDateTime.ofInstant(
-                                                            Instant.ofEpochMilli(job.getLong(FIRE_AT_FIELD)),
-                                                            ZoneId.systemDefault()))
-                                                    .toMillis(),
-                                            TimeUnit.MILLISECONDS));
-                        });
-                    } else {
-                        ProcessInstanceJobDescription description = ProcessInstanceJobDescription.of(
-                                job.getString(INSTANCE_ID_FIELD),
-                                job.getString(TRIGGER_TYPE_FIELD),
-                                build(job.getString(EXPRESSION_FIELD)), job.getString(OWNER_INSTANCE_ID_FIELD),
-                                job.getString(OWNER_DEF_ID_FIELD), null);
-
-                        scheduledJobs.computeIfAbsent(job.getString(INSTANCE_ID_FIELD), k -> {
-                            return log(job.getString(INSTANCE_ID_FIELD), scheduler.schedule(
-                                    new SignalProcessInstanceOnExpiredTimer(job.getString(INSTANCE_ID_FIELD),
-                                            job.getString(TRIGGER_TYPE_FIELD),
-                                            job.getString(OWNER_DEF_ID_FIELD),
-                                            job.getString(OWNER_INSTANCE_ID_FIELD),
-                                            job.getInteger(FIRE_LIMIT_FIELD), description),
-                                    Duration.between(LocalDateTime.now(), ZonedDateTime.ofInstant(
-                                            Instant.ofEpochMilli(job.getLong(FIRE_AT_FIELD)),
-                                            ZoneId.systemDefault())).toMillis(),
-                                    TimeUnit.MILLISECONDS));
-                        });
-                    }
-                }
-            } catch (Exception e) {
-                LOGGER.error("Error while loading jobs from cassandra", e);
-            }
-        }, 1, interval.orElse(10L) * 60, TimeUnit.SECONDS);
+        if (this.mongoClient != null) {
+            collection().createIndex(Indexes.ascending(INSTANCE_ID_FIELD));
+            collection().createIndex(Indexes.descending(FIRE_AT_FIELD));
+
+            loadScheduler.scheduleAtFixedRate(() -> {
+                try {
+                    long next = LocalDateTime.now().plus(Duration.ofMinutes(interval.orElse(10L)))
+                            .atZone(ZoneId.systemDefault()).toInstant()
+                            .toEpochMilli();
+
+                    FindIterable<Document> jobs = collection().find(lt(FIRE_AT_FIELD, next));
+
+                    for (Document job : jobs) {
+
+                        if (job.getString(OWNER_INSTANCE_ID_FIELD) == null) {
+                            ProcessJobDescription description = ProcessJobDescription.of(build(job.getString(EXPRESSION_FIELD)),
+                                    null,
+                                    job.getString(OWNER_DEF_ID_FIELD));
+
+                            scheduledJobs.computeIfAbsent(job.getString(INSTANCE_ID_FIELD), k -> {
+                                return log(job.getString(INSTANCE_ID_FIELD),
+                                        scheduler.schedule(new StartProcessOnExpiredTimer(job.getString(INSTANCE_ID_FIELD),
+                                                job.getString(OWNER_DEF_ID_FIELD), -1, description),
+                                                Duration.between(LocalDateTime.now(),
+                                                        ZonedDateTime.ofInstant(
+                                                                Instant.ofEpochMilli(job.getLong(FIRE_AT_FIELD)),
+                                                                ZoneId.systemDefault()))
+                                                        .toMillis(),
+                                                TimeUnit.MILLISECONDS));
+                            });
+                        } else {
+                            ProcessInstanceJobDescription description = ProcessInstanceJobDescription.of(
+                                    job.getString(INSTANCE_ID_FIELD),
+                                    job.getString(TRIGGER_TYPE_FIELD),
+                                    build(job.getString(EXPRESSION_FIELD)), job.getString(OWNER_INSTANCE_ID_FIELD),
+                                    job.getString(OWNER_DEF_ID_FIELD), null);
+
+                            scheduledJobs.computeIfAbsent(job.getString(INSTANCE_ID_FIELD), k -> {
+                                return log(job.getString(INSTANCE_ID_FIELD), scheduler.schedule(
+                                        new SignalProcessInstanceOnExpiredTimer(job.getString(INSTANCE_ID_FIELD),
+                                                job.getString(TRIGGER_TYPE_FIELD),
+                                                job.getString(OWNER_DEF_ID_FIELD),
+                                                job.getString(OWNER_INSTANCE_ID_FIELD),
+                                                job.getInteger(FIRE_LIMIT_FIELD), description),
+                                        Duration.between(LocalDateTime.now(), ZonedDateTime.ofInstant(
+                                                Instant.ofEpochMilli(job.getLong(FIRE_AT_FIELD)),
+                                                ZoneId.systemDefault())).toMillis(),
+                                        TimeUnit.MILLISECONDS));
+                            });
+                        }
+                    }
+                } catch (Exception e) {
+                    LOGGER.error("Error while loading jobs from cassandra", e);
+                }
+            }, 1, interval.orElse(10L) * 60, TimeUnit.SECONDS);
+        }
     }

     public void shutdown(@Observes ShutdownEvent event) {
-        this.loadScheduler.shutdownNow();
+        if (loadScheduler != null) {
+            this.loadScheduler.shutdownNow();
+        }
 
-        this.scheduler.shutdown();
+        if (scheduler != null) {
+            this.scheduler.shutdown();
+        }
     }

@Override
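Design note on the constructor change above: injecting `Instance<MongoClient>` instead of `MongoClient` makes the dependency lazy. CDI resolves `Instance<T>` only when `.get()` is called, so this bean can be created even when persistence is disabled and no `MongoClient` bean is produced at all; the lookup is reached only on the enabled path. A minimal sketch of the pattern, with a placeholder `SomeClient` type standing in for `MongoClient` (not code from this PR):

```java
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

// placeholder type standing in for MongoClient
interface SomeClient {
}

@ApplicationScoped
public class LazyClientHolder {

    private final SomeClient client; // stays null when no bean is available

    @Inject
    public LazyClientHolder(Instance<SomeClient> clientInstance) {
        // Instance<T> defers resolution: injection succeeds even when no SomeClient
        // bean exists in the container; the actual lookup happens only at get()
        this.client = clientInstance.isResolvable() ? clientInstance.get() : null;
    }
}
```

One left-over worth flagging: the catch block still logs "Error while loading jobs from cassandra" even though this is the MongoDB implementation; the string looks like a copy-paste from the Cassandra variant.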
[File 2: PersistenceConfig.java]

@@ -4,6 +4,13 @@

 public class PersistenceConfig {
 
+    /**
+     * Determines if persistence is disabled
+     */
+    public Optional<Boolean> disabled() {
+        return Optional.empty();
+    }
+
     /**
      * Determines the type of persistence to be used
      */
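The new `disabled()` default pairs with the `quarkus.automatiko.persistence.disabled` property that `MongodbJobService` reads above. For completeness, a hedged sketch of checking the same flag with plain MicroProfile Config, outside CDI injection (not part of this PR):

```java
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;

public class PersistenceFlag {

    public static boolean persistenceDisabled() {
        Config config = ConfigProvider.getConfig();
        // an absent property means "not disabled", matching persistenceDisabled.orElse(false) above
        return config.getOptionalValue("quarkus.automatiko.persistence.disabled", Boolean.class)
                .orElse(false);
    }
}
```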
[File 3: CodegenUtils.java]

@@ -51,6 +51,7 @@

 import io.automatiko.engine.codegen.di.DependencyInjectionAnnotator;
 import io.automatiko.engine.workflow.compiler.canonical.TriggerMetaData;
+import io.automatiko.engine.workflow.process.executable.core.Metadata;
 
 public class CodegenUtils {
 
@@ -150,7 +151,17 @@ public static boolean isEventSourceField(FieldDeclaration fd) {

     public static MethodDeclaration extractOptionalInjection(String type, String fieldName, String defaultMethod,
             DependencyInjectionAnnotator annotator) {
+        return extractOptionalInjection(type, fieldName, defaultMethod, annotator, null);
+    }
+
+    public static MethodDeclaration extractOptionalInjection(String type, String fieldName, String defaultMethod,
+            DependencyInjectionAnnotator annotator, IfStmt disabledStatement) {
         BlockStmt body = new BlockStmt();
 
+        if (disabledStatement != null) {
+            body.addStatement(disabledStatement);
+        }
+
         MethodDeclaration extractMethod = new MethodDeclaration().addModifier(Modifier.Keyword.PROTECTED)
                 .setName("extract_" + fieldName).setType(type).setBody(body);
         Expression condition = annotator.optionalInstanceExists(fieldName);
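The new `disabledStatement` parameter is prepended to the generated method body, so callers can short-circuit the lookup before any bean resolution happens. Roughly the shape of the generated `extract_<field>` method when the guard is passed in; the field and helper names below are illustrative, not the generator's actual output:

```java
// hypothetical rendering for a field named "jobsService"
protected JobsService extract_jobsService() {
    if (isPersistenceDisabled())
        return null;
    // followed by the usual optional-injection tail: use the CDI instance
    // when it resolves, otherwise fall back to the default supplier method
    return jobsService.isResolvable() ? jobsService.get() : defaultJobsService();
}
```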
@@ -219,6 +230,14 @@ public static String matchConnectorByName(String connector) {
     }
 
     public static String triggerSanitizedName(TriggerMetaData trigger, String version) {
+
+        // check if node information is available in the context and use it to extract the channel name if it exists
+        String channel = (String) trigger.getContext(Metadata.TRIGGER_CHANNEL);
+        // channel is explicitly used to name the channel for the incoming/outgoing annotation
+        if (channel != null) {
+            return channel;
+        }
+
         return trigger.getName().replaceAll("/", "-").replaceAll("\\+", "x").replaceAll("#", "any") + version(version);
     }
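When the trigger metadata carries `TRIGGER_CHANNEL`, that value now wins outright and is used verbatim as the channel name. Otherwise the fallback sanitization applies; a quick illustration with a made-up trigger name:

```java
// made-up trigger name, just to show the character substitutions
String name = "orders/new+item#priority";
String sanitized = name.replaceAll("/", "-").replaceAll("\\+", "x").replaceAll("#", "any");
// sanitized == "orders-newxitemanypriority"
```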

[File 4: REST resource class generator]

@@ -230,6 +230,14 @@ public CompilationUnit generateCompilationUnit() {
         ClassOrInterfaceDeclaration template = clazz.findFirst(ClassOrInterfaceDeclaration.class).orElseThrow(
                 () -> new NoSuchElementException("Compilation unit doesn't contain a class or interface declaration!"));
 
+        String category = (String) process.getMetaData().getOrDefault("category", process.getName());
+        String categoryDescription = (String) process.getMetaData().getOrDefault("categoryDescription",
+                process.getMetaData().getOrDefault("Documentation", processName).toString());
+
+        template.addAnnotation(new NormalAnnotationExpr(new Name("org.eclipse.microprofile.openapi.annotations.tags.Tag"),
+                NodeList.nodeList(new MemberValuePair("name", new StringLiteralExpr(category)),
+                        new MemberValuePair("description", new StringLiteralExpr(categoryDescription)))));
+
         template.setName(resourceClazzName);
         AtomicInteger index = new AtomicInteger(0);
         AtomicInteger uindex = new AtomicInteger(0);
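The net effect is that every generated resource class carries an OpenAPI tag built from process metadata, with the process name and documentation as fallbacks. Roughly what the generated class ends up looking like; the name and description values here are invented for illustration:

```java
// illustrative output only; actual values come from the process metadata
@org.eclipse.microprofile.openapi.annotations.tags.Tag(name = "orders",
        description = "Handles order processing")
public class OrdersResource {
    // generated REST endpoints ...
}
```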
[File 5: generator class equals()]

@@ -578,6 +578,11 @@ public boolean equals(Object obj) {
                 return false;
         } else if (!trigger.getName().equals(other.trigger.getName()))
             return false;
+        if (resourceClazzName == null) {
+            if (other.resourceClazzName != null)
+                return false;
+        } else if (!resourceClazzName.equals(other.resourceClazzName))
+            return false;
         return true;
     }
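Whenever a field joins equals(), the equals/hashCode contract says hashCode() should fold it in as well, otherwise two "equal" generators can land in different hash buckets. If this class follows the usual IDE-generated template, the matching line would look like the sketch below (an assumption: the hashCode() body is not shown in this diff):

```java
// assumed companion change in hashCode(), mirroring the usual IDE template
result = prime * result + ((resourceClazzName == null) ? 0 : resourceClazzName.hashCode());
```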

[File 6: generator members() using extractOptionalInjection]

@@ -24,6 +24,7 @@
 import com.github.javaparser.ast.expr.ObjectCreationExpr;
 import com.github.javaparser.ast.expr.ThisExpr;
 import com.github.javaparser.ast.stmt.BlockStmt;
+import com.github.javaparser.ast.stmt.IfStmt;
 import com.github.javaparser.ast.stmt.ReturnStmt;
 import com.github.javaparser.ast.type.ClassOrInterfaceType;
 
@@ -172,12 +173,15 @@ public List<BodyDeclaration<?>> members() {
             members.add(extractOptionalInjection(UnitOfWorkManager.class.getCanonicalName(), VAR_UNIT_OF_WORK_MANAGER,
                     VAR_DEFAULT_UNIT_OF_WORK_MANAGER, annotator));
             members.add(extractOptionalInjection(JobsService.class.getCanonicalName(), VAR_JOBS_SERVICE,
-                    VAR_DEFAULT_JOBS_SEVICE, annotator));
+                    VAR_DEFAULT_JOBS_SEVICE, annotator,
+                    new IfStmt(new MethodCallExpr(null, "isPersistenceDisabled"), new ReturnStmt(new NullLiteralExpr()),
+                            null)));
             members.add(extractOptionalInjection(VariableInitializer.class.getCanonicalName(), VAR_VARIABLE_INITIALIZER,
                     VAR_DEFAULT_VARIABLE_INITIALIZER, annotator));
             members.add(extractOptionalInjection("io.automatiko.engine.api.workflow.ProcessInstancesFactory",
-                    VAR_INSTANCE_FACTORY, VAR_DEFAULT_INSTANCE_FACTORY, annotator));
-
+                    VAR_INSTANCE_FACTORY, VAR_DEFAULT_INSTANCE_FACTORY, annotator,
+                    new IfStmt(new MethodCallExpr(null, "isPersistenceDisabled"), new ReturnStmt(new NullLiteralExpr()),
+                            null)));
             members.add(generateExtractEventListenerConfigMethod());
             members.add(generateMergeEventListenerConfigMethod());
         } else {
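For readers not fluent in JavaParser's AST: the `IfStmt(condition, thenStmt, elseStmt)` built here renders to a two-line guard at the top of the generated extractor. A small standalone check, using the same JavaParser calls as the diff:

```java
import com.github.javaparser.ast.expr.MethodCallExpr;
import com.github.javaparser.ast.expr.NullLiteralExpr;
import com.github.javaparser.ast.stmt.IfStmt;
import com.github.javaparser.ast.stmt.ReturnStmt;

public class GuardPreview {
    public static void main(String[] args) {
        IfStmt guard = new IfStmt(new MethodCallExpr(null, "isPersistenceDisabled"),
                new ReturnStmt(new NullLiteralExpr()), null);
        // prints the guard, formatted as: if (isPersistenceDisabled()) / return null;
        System.out.println(guard);
    }
}
```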
[File 7: message consumer template (cloud events)]

@@ -74,21 +74,26 @@ public CompletionStage<Void> consume(Message<String> msg) {
                 cloudEventsExtensions(msg, event);
             }
 
-            correlation = correlationEvent(event, msg);
             accepted = acceptedEvent(event, msg);
+            if (!accepted) {
+                metrics.messageRejected(CONNECTOR, MESSAGE, ((io.automatiko.engine.workflow.AbstractProcess<?>)process).process());
+                LOGGER.debug("Message has been rejected by filter expression");
+                return msg.ack();
+            }
+            correlation = correlationEvent(event, msg);
         } else {
             eventData = convert(msg, $DataType$.class);
             model = new $Type$();
 
-            correlation = correlationPayload(eventData, msg);
-
             accepted = acceptedPayload(eventData, msg);
+            if (!accepted) {
+                metrics.messageRejected(CONNECTOR, MESSAGE, ((io.automatiko.engine.workflow.AbstractProcess<?>)process).process());
+                LOGGER.debug("Message has been rejected by filter expression");
+                return msg.ack();
+            }
+            correlation = correlationPayload(eventData, msg);
         }
-        if (!accepted) {
-            metrics.messageRejected(CONNECTOR, MESSAGE, ((io.automatiko.engine.workflow.AbstractProcess<?>)process).process());
-            LOGGER.debug("Message has been rejected by filter expression");
-            return msg.ack();
-        }
 
         io.automatiko.engine.services.uow.UnitOfWorkExecutor.executeInUnitOfWork(application.unitOfWorkManager(), () -> {
 
             if (correlation != null) {
[File 8: message consumer template (typed payload)]

@@ -56,9 +56,7 @@ public CompletionStage<Void> consume(Message<$DataType$> msg) {


         eventData = convert(msg, $DataType$.class);
-        model = new $Type$();
-
-        correlation = correlationPayload(eventData, msg);
+        model = new $Type$();
 
         boolean accepted = acceptedPayload(eventData, msg);
 
@@ -67,6 +65,9 @@
Expand All @@ -67,6 +65,9 @@ public CompletionStage<Void> consume(Message<$DataType$> msg) {
             LOGGER.debug("Message has been rejected by filter expression");
             return msg.ack();
         }
+
+        correlation = correlationPayload(eventData, msg);
+
         io.automatiko.engine.services.uow.UnitOfWorkExecutor.executeInUnitOfWork(application.unitOfWorkManager(), () -> {
 
             if (correlation != null) {
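Both consumer templates make the same move: the correlation expression is now evaluated only after the accept filter passes, so rejected messages never pay for correlation evaluation (which may be expensive or assume fields a rejected message lacks). The resulting control flow in skeleton form, using the template's own names with `$Type$`/`$DataType$` placeholders and the metrics call elided:

```java
// skeleton of the guard-first ordering both templates now share
boolean accepted = acceptedPayload(eventData, msg);
if (!accepted) {
    LOGGER.debug("Message has been rejected by filter expression");
    return msg.ack();                                  // acknowledge and drop: no correlation work
}
correlation = correlationPayload(eventData, msg);      // evaluated only for accepted messages
```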