diff --git a/build.gradle b/build.gradle index b30bfed4..36ae3def 100644 --- a/build.gradle +++ b/build.gradle @@ -20,7 +20,7 @@ ext { versions = [ awaitility : '4.2.0', commonsLang : '3.12.0', - conductor : '3.9.21-orkes', + conductor : '3.9.24-orkes', jackson : '2.11.4!!', junit : '5.9.0', slf4j : '1.7.36', @@ -47,7 +47,9 @@ dependencies { api ("io.orkes.conductor:conductor-common:${versions.conductor}") api ("io.orkes.conductor:conductor-grpc:${versions.conductor}") - api ("io.orkes.conductor:conductor-java-sdk:${versions.conductor}") + api ("io.orkes.conductor:conductor-java-sdk:${versions.conductor}") { + exclude group: 'com.netflix.conductor' + } implementation 'javax.annotation:javax.annotation-api:1.3.2' implementation "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:${versions.jackson}" @@ -98,7 +100,6 @@ dependencies { implementation "org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}!!" implementation "org.apache.logging.log4j:log4j-jul:${versions.log4j}!!" implementation "org.apache.logging.log4j:log4j-web:${versions.log4j}!!" - //implementation "org.apache.logging.log4j:log4j-to-slf4j:${versions.log4j}!!" //spring implementation "org.springframework:spring-context:5.3.24" diff --git a/src/main/java/io/orkes/conductor/client/WorkflowClient.java b/src/main/java/io/orkes/conductor/client/WorkflowClient.java index 9b443c2b..0a8655ad 100644 --- a/src/main/java/io/orkes/conductor/client/WorkflowClient.java +++ b/src/main/java/io/orkes/conductor/client/WorkflowClient.java @@ -27,6 +27,7 @@ import io.orkes.conductor.client.http.ApiException; import io.orkes.conductor.client.model.JumpWorkflowExecutionRequest; +import io.orkes.conductor.client.model.WorkflowStateUpdate; import io.orkes.conductor.client.model.WorkflowStatus; import io.orkes.conductor.common.model.WorkflowRun; @@ -96,4 +97,18 @@ public abstract Map> getWorkflowsByNamesAndCorrelationIds public abstract void jumpToTask(String workflowId, JumpWorkflowExecutionRequest jumpWorkflowExecutionRequest); public abstract void upgradeRunningWorkflow(String workflowId, UpgradeWorkflowRequest body); + + /** + * + * Update a runningw workflow by updating its variables or one of the scheduled task identified by task reference name + * @param workflowId Id of the workflow to be updated + * @param waitUntilTaskRefNames List of task reference names to wait for. The api call will wait for ANY of these tasks to be availble in workflow. + * @param waitForSeconds Maximum time to wait for. If the workflow does not complete or reach one of the tasks listed in waitUntilTaskRefNames by this time, + * the call will return with the current status of the workflow + * @param updateRequest Payload for updating state of workflow. + * + * @return + */ + public abstract WorkflowRun updateWorkflow(String workflowId, List waitUntilTaskRefNames, Integer waitForSeconds, + WorkflowStateUpdate updateRequest); } diff --git a/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java b/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java index a78fae5a..59d530d4 100644 --- a/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java +++ b/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java @@ -35,6 +35,7 @@ import io.orkes.conductor.client.http.api.WorkflowResourceApi; import io.orkes.conductor.client.model.CorrelationIdsSearchRequest; import io.orkes.conductor.client.model.JumpWorkflowExecutionRequest; +import io.orkes.conductor.client.model.WorkflowStateUpdate; import io.orkes.conductor.client.model.WorkflowStatus; import io.orkes.conductor.common.model.WorkflowRun; @@ -352,6 +353,15 @@ public void upgradeRunningWorkflow(String workflowId, UpgradeWorkflowRequest upg httpClient.upgradeRunningWorkflow(upgradeWorkflowRequest, workflowId); } + @Override + public WorkflowRun updateWorkflow(String workflowId, List waitUntilTaskRefNames, Integer waitForSeconds, WorkflowStateUpdate updateRequest) { + String joinedReferenceNames = ""; + if (waitUntilTaskRefNames != null) { + joinedReferenceNames = String.join(",", waitUntilTaskRefNames); + } + return httpClient.updateWorkflowState(updateRequest, UUID.randomUUID().toString(), workflowId, joinedReferenceNames, waitForSeconds); + } + @Override public void close() { shutdown(); diff --git a/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java b/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java index 3b6f110f..c761592f 100644 --- a/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java +++ b/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java @@ -29,6 +29,7 @@ import io.orkes.conductor.common.model.WorkflowRun; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.reflect.TypeToken; import com.squareup.okhttp.Call; public class WorkflowResourceApi { @@ -4012,4 +4013,106 @@ public com.squareup.okhttp.Response intercept(com.squareup.okhttp.Interceptor.Ch return apiClient.buildCall(localVarPath, "POST", localVarQueryParams, localVarCollectionQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarAuthNames, progressRequestListener); } + + + /** + * Update workflow and task status + * Updates the workflow variables, tasks and triggers evaluation. + * @param updateRequest (required) + * @param requestId (required) + * @param workflowId (required) + * @param waitUntilTaskRef (optional) + * @param waitForSeconds (optional, default to 10) + * @return WorkflowRun + * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body + */ + public WorkflowRun updateWorkflowState(WorkflowStateUpdate updateRequest, String requestId, String workflowId, String waitUntilTaskRef, Integer waitForSeconds) throws ApiException { + ApiResponse resp = updateWorkflowAndTaskStateWithHttpInfo(updateRequest, requestId, workflowId, waitUntilTaskRef, waitForSeconds); + return resp.getData(); + } + + /** + * Update workflow and task status + * Updates the workflow variables, tasks and triggers evaluation. + * @param body (required) + * @param requestId (required) + * @param workflowId (required) + * @param waitUntilTaskRef (optional) + * @param waitForSeconds (optional, default to 10) + * @return ApiResponse<WorkflowRun> + * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body + */ + public ApiResponse updateWorkflowAndTaskStateWithHttpInfo(WorkflowStateUpdate body, String requestId, String workflowId, String waitUntilTaskRef, Integer waitForSeconds) throws ApiException { + com.squareup.okhttp.Call call = updateWorkflowAndTaskStateValidateBeforeCall(body, requestId, workflowId, waitUntilTaskRef, waitForSeconds, null, null); + Type localVarReturnType = new TypeToken(){}.getType(); + return apiClient.execute(call, localVarReturnType); + } + + + public com.squareup.okhttp.Call updateWorkflowAndTaskStateCall(WorkflowStateUpdate body, String requestId, String workflowId, String waitUntilTaskRef, Integer waitForSeconds, final ProgressResponseBody.ProgressListener progressListener, final ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException { + Object localVarPostBody = body; + + // create path and map variables + String localVarPath = "/workflow/{workflowId}/state" + .replaceAll("\\{" + "workflowId" + "\\}", apiClient.escapeString(workflowId.toString())); + + List localVarQueryParams = new ArrayList(); + List localVarCollectionQueryParams = new ArrayList(); + if (requestId != null) + localVarQueryParams.addAll(apiClient.parameterToPair("requestId", requestId)); + if (waitUntilTaskRef != null) + localVarQueryParams.addAll(apiClient.parameterToPair("waitUntilTaskRef", waitUntilTaskRef)); + if (waitForSeconds != null) + localVarQueryParams.addAll(apiClient.parameterToPair("waitForSeconds", waitForSeconds)); + + Map localVarHeaderParams = new HashMap(); + + Map localVarFormParams = new HashMap(); + + final String[] localVarAccepts = { + "*/*" + }; + final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); + if (localVarAccept != null) localVarHeaderParams.put("Accept", localVarAccept); + + final String[] localVarContentTypes = { + "application/json" + }; + final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); + localVarHeaderParams.put("Content-Type", localVarContentType); + + if(progressListener != null) { + apiClient.getHttpClient().networkInterceptors().add(new com.squareup.okhttp.Interceptor() { + @Override + public com.squareup.okhttp.Response intercept(com.squareup.okhttp.Interceptor.Chain chain) throws IOException { + com.squareup.okhttp.Response originalResponse = chain.proceed(chain.request()); + return originalResponse.newBuilder() + .body(new ProgressResponseBody(originalResponse.body(), progressListener)) + .build(); + } + }); + } + + String[] localVarAuthNames = new String[] { "api_key" }; + return apiClient.buildCall(localVarPath, "POST", localVarQueryParams, localVarCollectionQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarAuthNames, progressRequestListener); + } + + @SuppressWarnings("rawtypes") + private com.squareup.okhttp.Call updateWorkflowAndTaskStateValidateBeforeCall(WorkflowStateUpdate body, String requestId, String workflowId, String waitUntilTaskRef, Integer waitForSeconds, final ProgressResponseBody.ProgressListener progressListener, final ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException { + // verify the required parameter 'body' is set + if (body == null) { + throw new ApiException("Missing the required parameter 'body' when calling updateWorkflowAndTaskState(Async)"); + } + // verify the required parameter 'requestId' is set + if (requestId == null) { + throw new ApiException("Missing the required parameter 'requestId' when calling updateWorkflowAndTaskState(Async)"); + } + // verify the required parameter 'workflowId' is set + if (workflowId == null) { + throw new ApiException("Missing the required parameter 'workflowId' when calling updateWorkflowAndTaskState(Async)"); + } + + com.squareup.okhttp.Call call = updateWorkflowAndTaskStateCall(body, requestId, workflowId, waitUntilTaskRef, waitForSeconds, progressListener, progressRequestListener); + return call; + } } diff --git a/src/main/java/io/orkes/conductor/client/model/WorkflowStateUpdate.java b/src/main/java/io/orkes/conductor/client/model/WorkflowStateUpdate.java new file mode 100644 index 00000000..f374a35f --- /dev/null +++ b/src/main/java/io/orkes/conductor/client/model/WorkflowStateUpdate.java @@ -0,0 +1,26 @@ +/* + * Copyright 2024 Orkes, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.client.model; + +import java.util.Map; + +import com.netflix.conductor.common.metadata.tasks.TaskResult; + +import lombok.Data; + +@Data +public class WorkflowStateUpdate { + private String taskReferenceName; + private Map variables; + private TaskResult taskResult; +} diff --git a/src/test/java/io/orkes/conductor/client/api/EventClientTests.java b/src/test/java/io/orkes/conductor/client/api/EventClientTests.java index 897e9138..fb061c21 100644 --- a/src/test/java/io/orkes/conductor/client/api/EventClientTests.java +++ b/src/test/java/io/orkes/conductor/client/api/EventClientTests.java @@ -13,10 +13,7 @@ package io.orkes.conductor.client.api; import java.util.List; -import java.util.Map; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.jupiter.api.Test; import com.netflix.conductor.common.metadata.events.EventHandler; @@ -25,17 +22,10 @@ import io.orkes.conductor.client.EventClient; import io.orkes.conductor.client.http.ApiException; -import io.orkes.conductor.client.model.event.QueueConfiguration; -import io.orkes.conductor.client.model.event.QueueWorkerConfiguration; -import io.orkes.conductor.client.model.event.kafka.KafkaConfiguration; -import io.orkes.conductor.client.model.event.kafka.KafkaConsumer; -import io.orkes.conductor.client.model.event.kafka.KafkaProducer; import io.orkes.conductor.client.util.Commons; -import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertIterableEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; public class EventClientTests extends ClientTest { private static final String EVENT_NAME = "test_sdk_java_event_name"; @@ -73,39 +63,9 @@ void testEventHandler() { assertIterableEquals(List.of(), eventClient.getEventHandlers(EVENT, false)); } - @Test - void testKafkaQueueConfiguration() throws Exception { - QueueConfiguration queueConfiguration = getQueueConfiguration(); - eventClient.deleteQueueConfig(queueConfiguration); - assertThrows( - ApiException.class, - () -> { - eventClient.getQueueConfig(queueConfiguration); - }); - eventClient.putQueueConfig(queueConfiguration); - Map configurationResponse = eventClient.getQueueConfig(queueConfiguration); - assertTrue(configurationResponse.containsKey("consumer")); - assertTrue(configurationResponse.containsKey("producer")); - eventClient.deleteQueueConfig(queueConfiguration); - } - QueueConfiguration getQueueConfiguration() throws Exception { - return new KafkaConfiguration(KAFKA_QUEUE_TOPIC_NAME) - .withConsumer(getKafkaConsumer()) - .withProducer(getKafkaProducer()); - } - QueueWorkerConfiguration getKafkaConsumer() throws Exception { - return new KafkaConsumer(KAFKA_BOOTSTRAP_SERVERS_CONFIG) - // 1 second, instead of default 2 seconds - .withConfiguration(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000"); - } - QueueWorkerConfiguration getKafkaProducer() throws Exception { - return new KafkaProducer(KAFKA_BOOTSTRAP_SERVERS_CONFIG) - // send messages in chunks of 1024 bytes, instead of default every new data - .withConfiguration(ProducerConfig.BATCH_SIZE_CONFIG, "1024"); - } EventHandler getEventHandler() { EventHandler eventHandler = new EventHandler(); diff --git a/src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java b/src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java new file mode 100644 index 00000000..1505179f --- /dev/null +++ b/src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java @@ -0,0 +1,99 @@ +/* + * Copyright 2024 Orkes, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.client.api; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.TaskResult; +import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; +import com.netflix.conductor.common.run.Workflow; + +import io.orkes.conductor.client.WorkflowClient; +import io.orkes.conductor.client.model.WorkflowStateUpdate; +import io.orkes.conductor.common.model.WorkflowRun; + +import lombok.SneakyThrows; + +import static org.junit.Assert.assertEquals; + +public class WorkflowStateUpdateTests extends ClientTest { + + private static WorkflowClient workflowClient; + + @BeforeAll + public static void init() { + workflowClient = orkesClients.getWorkflowClient(); + } + + @SneakyThrows + public String startWorkflow() { + StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest(); + startWorkflowRequest.setName("sync_task_variable_updates"); + startWorkflowRequest.setVersion(1); + var run = workflowClient.executeWorkflow(startWorkflowRequest, "wait_task_ref"); + return run.get(10, TimeUnit.SECONDS) + .getWorkflowId(); + } + + @Test + public void test() { + String workflowId = startWorkflow(); + System.out.println(workflowId); + + TaskResult taskResult = new TaskResult(); + taskResult.setOutputData(Map.of("a", "b")); + + WorkflowStateUpdate request = new WorkflowStateUpdate(); + request.setTaskReferenceName("wait_task_ref"); + request.setTaskResult(taskResult); + + request.setVariables(Map.of("case", "case1")); + + WorkflowRun workflowRun = workflowClient.updateWorkflow(workflowId, List.of("wait_task_ref_1", "wait_task_ref_2"), 10, request); + + System.out.println(workflowRun); + System.out.println(workflowRun.getStatus()); + System.out.println(workflowRun.getTasks() + .stream() + .map(task -> task.getReferenceTaskName() + ":" + task.getStatus()) + .collect(Collectors.toList())); + + request = new WorkflowStateUpdate(); + request.setTaskReferenceName("wait_task_ref_2"); + request.setTaskResult(taskResult); + workflowRun = workflowClient.updateWorkflow(workflowId, List.of(), 10, request); + + assertEquals(Workflow.WorkflowStatus.COMPLETED, workflowRun.getStatus()); + Set allTaskStatus = workflowRun.getTasks() + .stream() + .map(t -> t.getStatus()) + .collect(Collectors.toSet()); + assertEquals(1, allTaskStatus.size()); + assertEquals(Task.Status.COMPLETED, allTaskStatus.iterator().next()); + + System.out.println(workflowRun.getStatus()); + System.out.println(workflowRun.getTasks() + .stream() + .map(task -> task.getReferenceTaskName() + ":" + task.getStatus()) + .collect(Collectors.toList())); + + } +}