diff --git a/temporal-spring-boot-autoconfigure/build.gradle b/temporal-spring-boot-autoconfigure/build.gradle index dc94277b4..af5d0afee 100644 --- a/temporal-spring-boot-autoconfigure/build.gradle +++ b/temporal-spring-boot-autoconfigure/build.gradle @@ -8,6 +8,7 @@ ext { dependencies { api(platform("org.springframework.boot:spring-boot-dependencies:$springBootVersion")) api(platform("io.opentelemetry:opentelemetry-bom:$otelVersion")) + implementation "io.nexusrpc:nexus-sdk:$nexusVersion" compileOnly project(':temporal-sdk') compileOnly project(':temporal-testing') diff --git a/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/NexusServiceImpl.java b/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/NexusServiceImpl.java new file mode 100644 index 000000000..b890681b1 --- /dev/null +++ b/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/NexusServiceImpl.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.spring.boot; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Enables the Nexus service bean to be discovered by the Workers auto-discovery. This annotation is + * not needed if only an explicit config is used. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface NexusServiceImpl { + /** + * @return names of Workers to register this nexus service bean with. Workers with these names + * must be present in the application config. Worker is named by its task queue if its name is + * not specified. + */ + String[] workers() default {}; + + /** + * @return Worker Task Queues to register this nexus service bean with. If Worker with the + * specified Task Queue is not present in the application config, it will be created with a + * default config. Can be specified as a property key, e.g.: ${propertyKey}. + */ + String[] taskQueues() default {}; +} diff --git a/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/properties/WorkerProperties.java b/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/properties/WorkerProperties.java index f387b6949..73fe94e71 100644 --- a/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/properties/WorkerProperties.java +++ b/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/properties/WorkerProperties.java @@ -30,6 +30,7 @@ public class WorkerProperties { private final @Nullable String name; private final @Nullable Collection> workflowClasses; private final @Nullable Collection activityBeans; + private final @Nullable Collection nexusServiceBeans; private final @Nullable CapacityConfigurationProperties capacity; private final @Nullable RateLimitsConfigurationProperties rateLimits; private final @Nullable BuildIdConfigurationProperties buildId; @@ -40,6 +41,7 @@ public WorkerProperties( @Nullable String name, @Nullable Collection> workflowClasses, @Nullable Collection activityBeans, + @Nullable Collection nexusServiceBeans, @Nullable CapacityConfigurationProperties capacity, @Nullable RateLimitsConfigurationProperties rateLimits, @Nullable BuildIdConfigurationProperties buildId) { @@ -47,6 +49,7 @@ public WorkerProperties( this.taskQueue = taskQueue; this.workflowClasses = workflowClasses; this.activityBeans = activityBeans; + this.nexusServiceBeans = nexusServiceBeans; this.capacity = capacity; this.rateLimits = rateLimits; this.buildId = buildId; @@ -87,12 +90,19 @@ public BuildIdConfigurationProperties getBuildId() { return buildId; } + @Nullable + public Collection getNexusServiceBeans() { + return nexusServiceBeans; + } + public static class CapacityConfigurationProperties { private final @Nullable Integer maxConcurrentWorkflowTaskExecutors; private final @Nullable Integer maxConcurrentActivityExecutors; private final @Nullable Integer maxConcurrentLocalActivityExecutors; + private final @Nullable Integer maxConcurrentNexusTaskExecutors; private final @Nullable Integer maxConcurrentWorkflowTaskPollers; private final @Nullable Integer maxConcurrentActivityTaskPollers; + private final @Nullable Integer maxConcurrentNexusTaskPollers; /** * @param maxConcurrentWorkflowTaskExecutors defines {@link @@ -101,23 +111,31 @@ public static class CapacityConfigurationProperties { * io.temporal.worker.WorkerOptions.Builder#setMaxConcurrentActivityExecutionSize(int)} * @param maxConcurrentLocalActivityExecutors defines {@link * io.temporal.worker.WorkerOptions.Builder#setMaxConcurrentLocalActivityExecutionSize(int)} + * @param maxConcurrentNexusTaskExecutors defines {@link + * io.temporal.worker.WorkerOptions.Builder#setMaxConcurrentNexusTaskPollers(int)} (int)} * @param maxConcurrentWorkflowTaskPollers defines {@link * io.temporal.worker.WorkerOptions.Builder#setMaxConcurrentWorkflowTaskPollers(int)} * @param maxConcurrentActivityTaskPollers defines {@link * io.temporal.worker.WorkerOptions.Builder#setMaxConcurrentActivityTaskPollers(int)} + * @param maxConcurrentNexusTaskPollers defines {@link + * io.temporal.worker.WorkerOptions.Builder#setMaxConcurrentNexusTaskPollers(int)} (int)} */ @ConstructorBinding public CapacityConfigurationProperties( @Nullable Integer maxConcurrentWorkflowTaskExecutors, @Nullable Integer maxConcurrentActivityExecutors, @Nullable Integer maxConcurrentLocalActivityExecutors, + @Nullable Integer maxConcurrentNexusTaskExecutors, @Nullable Integer maxConcurrentWorkflowTaskPollers, - @Nullable Integer maxConcurrentActivityTaskPollers) { + @Nullable Integer maxConcurrentActivityTaskPollers, + @Nullable Integer maxConcurrentNexusTaskPollers) { this.maxConcurrentWorkflowTaskExecutors = maxConcurrentWorkflowTaskExecutors; this.maxConcurrentActivityExecutors = maxConcurrentActivityExecutors; this.maxConcurrentLocalActivityExecutors = maxConcurrentLocalActivityExecutors; + this.maxConcurrentNexusTaskExecutors = maxConcurrentNexusTaskExecutors; this.maxConcurrentWorkflowTaskPollers = maxConcurrentWorkflowTaskPollers; this.maxConcurrentActivityTaskPollers = maxConcurrentActivityTaskPollers; + this.maxConcurrentNexusTaskPollers = maxConcurrentNexusTaskPollers; } @Nullable @@ -135,6 +153,11 @@ public Integer getMaxConcurrentLocalActivityExecutors() { return maxConcurrentLocalActivityExecutors; } + @Nullable + public Integer getMaxConcurrentNexusTasksExecutors() { + return maxConcurrentNexusTaskExecutors; + } + @Nullable public Integer getMaxConcurrentWorkflowTaskPollers() { return maxConcurrentWorkflowTaskPollers; @@ -144,6 +167,11 @@ public Integer getMaxConcurrentWorkflowTaskPollers() { public Integer getMaxConcurrentActivityTaskPollers() { return maxConcurrentActivityTaskPollers; } + + @Nullable + public Integer getMaxConcurrentNexusTaskPollers() { + return maxConcurrentNexusTaskPollers; + } } public static class RateLimitsConfigurationProperties { diff --git a/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/template/WorkerOptionsTemplate.java b/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/template/WorkerOptionsTemplate.java index 473de302e..56a37deb4 100644 --- a/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/template/WorkerOptionsTemplate.java +++ b/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/template/WorkerOptionsTemplate.java @@ -58,10 +58,14 @@ WorkerOptions createWorkerOptions() { .ifPresent(options::setMaxConcurrentActivityExecutionSize); Optional.ofNullable(threadsConfiguration.getMaxConcurrentLocalActivityExecutors()) .ifPresent(options::setMaxConcurrentLocalActivityExecutionSize); + Optional.ofNullable(threadsConfiguration.getMaxConcurrentNexusTasksExecutors()) + .ifPresent(options::setMaxConcurrentNexusExecutionSize); Optional.ofNullable(threadsConfiguration.getMaxConcurrentWorkflowTaskPollers()) .ifPresent(options::setMaxConcurrentWorkflowTaskPollers); Optional.ofNullable(threadsConfiguration.getMaxConcurrentActivityTaskPollers()) .ifPresent(options::setMaxConcurrentActivityTaskPollers); + Optional.ofNullable(threadsConfiguration.getMaxConcurrentNexusTaskPollers()) + .ifPresent(options::setMaxConcurrentNexusTaskPollers); } WorkerProperties.RateLimitsConfigurationProperties rateLimitConfiguration = diff --git a/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/template/WorkersTemplate.java b/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/template/WorkersTemplate.java index 753662298..210506bb8 100644 --- a/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/template/WorkersTemplate.java +++ b/temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/template/WorkersTemplate.java @@ -21,6 +21,8 @@ package io.temporal.spring.boot.autoconfigure.template; import com.google.common.base.Preconditions; +import io.nexusrpc.ServiceDefinition; +import io.nexusrpc.handler.ServiceImplInstance; import io.opentracing.Tracer; import io.temporal.client.WorkflowClient; import io.temporal.common.Experimental; @@ -28,6 +30,7 @@ import io.temporal.common.metadata.POJOWorkflowImplMetadata; import io.temporal.common.metadata.POJOWorkflowMethodMetadata; import io.temporal.spring.boot.ActivityImpl; +import io.temporal.spring.boot.NexusServiceImpl; import io.temporal.spring.boot.TemporalOptionsCustomizer; import io.temporal.spring.boot.WorkflowImpl; import io.temporal.spring.boot.autoconfigure.properties.NamespaceProperties; @@ -151,13 +154,17 @@ private Collection createWorkers(WorkerFactory workerFactory) { Collection> autoDiscoveredWorkflowImplementationClasses = autoDiscoverWorkflowImplementations(); Map autoDiscoveredActivityBeans = autoDiscoverActivityBeans(); + Map autoDiscoveredNexusServiceBeans = autoDiscoverNexusServiceBeans(); configureWorkflowImplementationsByTaskQueue( workerFactory, workers, autoDiscoveredWorkflowImplementationClasses); configureActivityBeansByTaskQueue(workerFactory, workers, autoDiscoveredActivityBeans); + configureNexusServiceBeansByTaskQueue( + workerFactory, workers, autoDiscoveredNexusServiceBeans); configureWorkflowImplementationsByWorkerName( workers, autoDiscoveredWorkflowImplementationClasses); configureActivityBeansByWorkerName(workers, autoDiscoveredActivityBeans); + configureNexusServiceBeansByWorkerName(workers, autoDiscoveredNexusServiceBeans); } return workers.getWorkers(); @@ -215,6 +222,35 @@ private void configureActivityBeansByTaskQueue( }); } + private void configureNexusServiceBeansByTaskQueue( + WorkerFactory workerFactory, + Workers workers, + Map autoDiscoveredNexusServiceBeans) { + autoDiscoveredNexusServiceBeans.forEach( + (beanName, bean) -> { + Class targetClass = AopUtils.getTargetClass(bean); + NexusServiceImpl annotation = + AnnotationUtils.findAnnotation(targetClass, NexusServiceImpl.class); + if (annotation != null) { + for (String taskQueue : annotation.taskQueues()) { + taskQueue = environment.resolvePlaceholders(taskQueue); + Worker worker = workerFactory.tryGetWorker(taskQueue); + if (worker == null) { + log.info( + "Creating a worker with default settings for a task queue '{}' " + + "caused by an auto-discovered nexus service class {}", + taskQueue, + targetClass); + worker = createNewWorker(taskQueue, null, workers); + } + + configureNexusServiceImplementationAutoDiscovery( + worker, bean, beanName, targetClass, null, workers); + } + } + }); + } + private void configureWorkflowImplementationsByWorkerName( Workers workers, Collection> autoDiscoveredWorkflowImplementationClasses) { for (Class clazz : autoDiscoveredWorkflowImplementationClasses) { @@ -259,6 +295,31 @@ private void configureActivityBeansByWorkerName( }); } + private void configureNexusServiceBeansByWorkerName( + Workers workers, Map autoDiscoveredNexusServiceBeans) { + autoDiscoveredNexusServiceBeans.forEach( + (beanName, bean) -> { + Class targetClass = AopUtils.getTargetClass(bean); + NexusServiceImpl annotation = + AnnotationUtils.findAnnotation(targetClass, NexusServiceImpl.class); + if (annotation != null) { + for (String workerName : annotation.workers()) { + Worker worker = workers.getByName(workerName); + if (worker == null) { + throw new BeanDefinitionValidationException( + "Worker with name " + + workerName + + " is not found in the config, but is referenced by auto-discovered nexus service bean " + + beanName); + } + + configureNexusServiceImplementationAutoDiscovery( + worker, bean, beanName, targetClass, workerName, workers); + } + } + }); + } + private Collection> autoDiscoverWorkflowImplementations() { ClassPathScanningCandidateComponentProvider scanner = new ClassPathScanningCandidateComponentProvider(false); @@ -282,6 +343,10 @@ private Map autoDiscoverActivityBeans() { return beanFactory.getBeansWithAnnotation(ActivityImpl.class); } + private Map autoDiscoverNexusServiceBeans() { + return beanFactory.getBeansWithAnnotation(NexusServiceImpl.class); + } + private void createWorkerFromAnExplicitConfig( WorkerFactory workerFactory, WorkerProperties workerProperties, Workers workers) { String taskQueue = workerProperties.getTaskQueue(); @@ -321,6 +386,25 @@ private void createWorkerFromAnExplicitConfig( worker, beanName, bean.getClass().getName(), activityImplMetadata); }); } + + Collection nexusServiceBeans = workerProperties.getNexusServiceBeans(); + if (nexusServiceBeans != null) { + nexusServiceBeans.forEach( + beanName -> { + Object bean = beanFactory.getBean(beanName); + log.info( + "Registering configured nexus service bean '{}' of a {} class on task queue '{}'", + beanName, + AopUtils.getTargetClass(bean), + taskQueue); + worker.registerNexusServiceImplementation(bean); + addRegisteredNexusServiceImpl( + worker, + beanName, + bean.getClass().getName(), + ServiceImplInstance.fromInstance(AopUtils.getTargetClass(bean)).getDefinition()); + }); + } } private void configureActivityImplementationAutoDiscovery( @@ -357,6 +441,42 @@ private void configureActivityImplementationAutoDiscovery( } } + private void configureNexusServiceImplementationAutoDiscovery( + Worker worker, + Object bean, + String beanName, + Class targetClass, + String byWorkerName, + Workers workers) { + try { + worker.registerNexusServiceImplementation(bean); + addRegisteredNexusServiceImpl( + worker, + beanName, + bean.getClass().getName(), + ServiceImplInstance.fromInstance(bean).getDefinition()); + if (log.isInfoEnabled()) { + log.info( + "Registering auto-discovered nexus service bean '{}' of class {} on a worker {} with a task queue '{}'", + beanName, + targetClass, + byWorkerName != null ? "'" + byWorkerName + "' " : "", + worker.getTaskQueue()); + } + } catch (TypeAlreadyRegisteredException registeredEx) { + if (log.isInfoEnabled()) { + log.info( + "Skipping auto-discovered nexus service bean '{}' of class {} on a worker {} with a task queue '{}'" + + " as nexus service type '{}' is already registered on the worker", + beanName, + targetClass, + byWorkerName != null ? "'" + byWorkerName + "' " : "", + worker.getTaskQueue(), + registeredEx.getRegisteredTypeName()); + } + } + } + private void configureWorkflowImplementationAutoDiscovery( Worker worker, Class clazz, String byWorkerName, Workers workers) { try { @@ -475,15 +595,44 @@ private void addRegisteredActivityImpl( } } + private void addRegisteredNexusServiceImpl( + Worker worker, String beanName, String beanClass, ServiceDefinition serviceDefinition) { + if (!registeredInfo.containsKey(worker.getTaskQueue())) { + registeredInfo.put( + worker.getTaskQueue(), + new RegisteredInfo() + .addNexusServiceInfo( + new RegisteredNexusServiceInfo() + .addBeanName(beanName) + .addClassName(beanClass) + .addDefinition(serviceDefinition))); + } else { + registeredInfo + .get(worker.getTaskQueue()) + .getRegisteredNexusServiceInfos() + .add( + new RegisteredNexusServiceInfo() + .addBeanName(beanName) + .addClassName(beanClass) + .addDefinition(serviceDefinition)); + } + } + public static class RegisteredInfo { private final List registeredActivityInfo = new ArrayList<>(); private final List registeredWorkflowInfo = new ArrayList<>(); + private final List registeredNexusServiceInfos = new ArrayList<>(); public RegisteredInfo addActivityInfo(RegisteredActivityInfo activityInfo) { registeredActivityInfo.add(activityInfo); return this; } + public RegisteredInfo addNexusServiceInfo(RegisteredNexusServiceInfo nexusServiceInfo) { + registeredNexusServiceInfos.add(nexusServiceInfo); + return this; + } + public RegisteredInfo addWorkflowInfo(RegisteredWorkflowInfo workflowInfo) { registeredWorkflowInfo.add(workflowInfo); return this; @@ -496,6 +645,10 @@ public List getRegisteredActivityInfo() { public List getRegisteredWorkflowInfo() { return registeredWorkflowInfo; } + + public List getRegisteredNexusServiceInfos() { + return registeredNexusServiceInfos; + } } @Experimental @@ -532,6 +685,40 @@ public POJOActivityImplMetadata getMetadata() { } } + @Experimental + public static class RegisteredNexusServiceInfo { + private String beanName; + private String className; + private ServiceDefinition definition; + + public RegisteredNexusServiceInfo addClassName(String className) { + this.className = className; + return this; + } + + public RegisteredNexusServiceInfo addBeanName(String beanName) { + this.beanName = beanName; + return this; + } + + public RegisteredNexusServiceInfo addDefinition(ServiceDefinition definition) { + this.definition = definition; + return this; + } + + public String getClassName() { + return className; + } + + public String getBeanName() { + return beanName; + } + + public ServiceDefinition getDefinition() { + return definition; + } + } + @Experimental public static class RegisteredWorkflowInfo { private String className; diff --git a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/AutoDiscoveryByTaskQueueResolverTest.java b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/AutoDiscoveryByTaskQueueResolverTest.java index 484c75f4e..c0cbb42d4 100644 --- a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/AutoDiscoveryByTaskQueueResolverTest.java +++ b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/AutoDiscoveryByTaskQueueResolverTest.java @@ -20,14 +20,13 @@ package io.temporal.spring.boot.autoconfigure; +import io.temporal.api.nexus.v1.Endpoint; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.spring.boot.autoconfigure.bytaskqueue.TestWorkflow; +import io.temporal.testing.TestWorkflowEnvironment; import io.temporal.worker.WorkerFactory; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.ConfigurableApplicationContext; @@ -43,11 +42,23 @@ public class AutoDiscoveryByTaskQueueResolverTest { @Autowired WorkflowClient workflowClient; + @Autowired TestWorkflowEnvironment testWorkflowEnvironment; + @Autowired WorkerFactory workerFactory; + Endpoint endpoint; + @BeforeEach void setUp() { applicationContext.start(); + endpoint = + testWorkflowEnvironment.createNexusEndpoint( + "AutoDiscoveryByTaskQueueEndpoint", "PropertyResolverTest"); + } + + @AfterEach + void tearDown() { + testWorkflowEnvironment.deleteNexusEndpoint(endpoint); } @Test @@ -57,7 +68,7 @@ public void testAutoDiscovery() { workflowClient.newWorkflowStub( TestWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue("PropertyResolverTest").build()); - testWorkflow.execute("input"); + testWorkflow.execute("nexus"); } @ComponentScan( diff --git a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/AutoDiscoveryByTaskQueueTest.java b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/AutoDiscoveryByTaskQueueTest.java index 757129166..24edf2c5a 100644 --- a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/AutoDiscoveryByTaskQueueTest.java +++ b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/AutoDiscoveryByTaskQueueTest.java @@ -20,14 +20,12 @@ package io.temporal.spring.boot.autoconfigure; +import io.temporal.api.nexus.v1.Endpoint; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.spring.boot.autoconfigure.bytaskqueue.TestWorkflow; import io.temporal.testing.TestWorkflowEnvironment; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.ConfigurableApplicationContext; @@ -44,10 +42,18 @@ public class AutoDiscoveryByTaskQueueTest { @Autowired TestWorkflowEnvironment testWorkflowEnvironment; @Autowired WorkflowClient workflowClient; + Endpoint endpoint; @BeforeEach void setUp() { applicationContext.start(); + endpoint = + testWorkflowEnvironment.createNexusEndpoint("AutoDiscoveryByTaskQueueEndpoint", "UnitTest"); + } + + @AfterEach + void tearDown() { + testWorkflowEnvironment.deleteNexusEndpoint(endpoint); } @Test @@ -56,7 +62,7 @@ public void testAutoDiscovery() { TestWorkflow testWorkflow = workflowClient.newWorkflowStub( TestWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue("UnitTest").build()); - testWorkflow.execute("input"); + testWorkflow.execute("nexus"); } @ComponentScan( diff --git a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/AutoDiscoveryByWorkerNameTest.java b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/AutoDiscoveryByWorkerNameTest.java index b6930b18c..98b1a4de5 100644 --- a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/AutoDiscoveryByWorkerNameTest.java +++ b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/AutoDiscoveryByWorkerNameTest.java @@ -20,14 +20,12 @@ package io.temporal.spring.boot.autoconfigure; +import io.temporal.api.nexus.v1.Endpoint; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.spring.boot.autoconfigure.bytaskqueue.TestWorkflow; import io.temporal.testing.TestWorkflowEnvironment; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.ConfigurableApplicationContext; @@ -44,10 +42,19 @@ public class AutoDiscoveryByWorkerNameTest { @Autowired TestWorkflowEnvironment testWorkflowEnvironment; @Autowired WorkflowClient workflowClient; + Endpoint endpoint; @BeforeEach void setUp() { applicationContext.start(); + endpoint = + testWorkflowEnvironment.createNexusEndpoint( + "AutoDiscoveryByWorkerNameTestEndpoint", "UnitTest"); + } + + @AfterEach + void tearDown() { + testWorkflowEnvironment.deleteNexusEndpoint(endpoint); } @Test @@ -56,7 +63,7 @@ public void testAutoDiscovery() { TestWorkflow testWorkflow = workflowClient.newWorkflowStub( TestWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue("UnitTest").build()); - testWorkflow.execute("input"); + testWorkflow.execute("nexus"); } @ComponentScan( diff --git a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/OptionalWorkerOptionsTest.java b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/OptionalWorkerOptionsTest.java index 9884742f3..85f40d638 100644 --- a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/OptionalWorkerOptionsTest.java +++ b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/OptionalWorkerOptionsTest.java @@ -113,6 +113,10 @@ public TemporalOptionsCustomizer workerCustomizer() { 1, options.getMaxConcurrentLocalActivityExecutionSize(), "Values from the Spring Config should be respected"); + assertEquals( + 1, + options.getMaxConcurrentNexusExecutionSize(), + "Values from the Spring Config should be respected"); assertEquals( 1, @@ -122,6 +126,10 @@ public TemporalOptionsCustomizer workerCustomizer() { 1, options.getMaxConcurrentActivityTaskPollers(), "Values from the Spring Config should be respected"); + assertEquals( + 1, + options.getMaxConcurrentNexusTaskPollers(), + "Values from the Spring Config should be respected"); assertEquals( 1.0, diff --git a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/RegisteredInfoTest.java b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/RegisteredInfoTest.java index e1a666ff5..d26e55db5 100644 --- a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/RegisteredInfoTest.java +++ b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/RegisteredInfoTest.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.*; +import io.nexusrpc.ServiceDefinition; import io.temporal.common.metadata.POJOActivityImplMetadata; import io.temporal.common.metadata.POJOWorkflowImplMetadata; import io.temporal.spring.boot.autoconfigure.template.WorkersTemplate; @@ -95,6 +96,19 @@ public void testRegisteredInfo() { assertEquals( "execute", metadata.getActivityMethods().get(0).getMethod().getName()); }); + + info.getRegisteredNexusServiceInfos() + .forEach( + (nexusServiceInfo) -> { + assertEquals( + "io.temporal.spring.boot.autoconfigure.bytaskqueue.TestNexusServiceImpl", + nexusServiceInfo.getClassName()); + assertEquals("TestNexusServiceImpl", nexusServiceInfo.getBeanName()); + ServiceDefinition def = nexusServiceInfo.getDefinition(); + assertEquals("TestNexusService", def.getName()); + assertEquals(1, def.getOperations().size()); + assertEquals("operation", def.getOperations().get("operation").getName()); + }); }); } diff --git a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/bytaskqueue/TestNexusService.java b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/bytaskqueue/TestNexusService.java new file mode 100644 index 000000000..ddda00320 --- /dev/null +++ b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/bytaskqueue/TestNexusService.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.spring.boot.autoconfigure.bytaskqueue; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; + +@Service +public interface TestNexusService { + @Operation + String operation(String input); +} diff --git a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/bytaskqueue/TestNexusServiceImpl.java b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/bytaskqueue/TestNexusServiceImpl.java new file mode 100644 index 000000000..16c0ff642 --- /dev/null +++ b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/bytaskqueue/TestNexusServiceImpl.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.spring.boot.autoconfigure.bytaskqueue; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.spring.boot.NexusServiceImpl; +import org.springframework.stereotype.Component; + +@Component("TestNexusServiceImpl") +@NexusServiceImpl(taskQueues = "${default-queue.name:UnitTest}") +@ServiceImpl(service = TestNexusService.class) +public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + // Implemented inline + return OperationHandler.sync((ctx, details, name) -> "Hello, " + name + "!"); + } +} diff --git a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/bytaskqueue/TestWorkflowImpl.java b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/bytaskqueue/TestWorkflowImpl.java index 83f4c898b..c7a39a212 100644 --- a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/bytaskqueue/TestWorkflowImpl.java +++ b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/bytaskqueue/TestWorkflowImpl.java @@ -22,6 +22,9 @@ import io.temporal.activity.ActivityOptions; import io.temporal.spring.boot.WorkflowImpl; +import io.temporal.spring.boot.autoconfigure.byworkername.TestNexusService; +import io.temporal.workflow.NexusOperationOptions; +import io.temporal.workflow.NexusServiceOptions; import io.temporal.workflow.Workflow; import java.time.Duration; @@ -29,6 +32,18 @@ public class TestWorkflowImpl implements TestWorkflow { @Override public String execute(String input) { + if (input.equals("nexus")) { + Workflow.newNexusServiceStub( + TestNexusService.class, + NexusServiceOptions.newBuilder() + .setEndpoint("AutoDiscoveryByTaskQueueEndpoint") + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()) + .build()) + .operation(input); + } return Workflow.newActivityStub( TestActivity.class, ActivityOptions.newBuilder() diff --git a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/byworkername/TestNexusService.java b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/byworkername/TestNexusService.java new file mode 100644 index 000000000..c2cba39be --- /dev/null +++ b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/byworkername/TestNexusService.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.spring.boot.autoconfigure.byworkername; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; + +@Service +public interface TestNexusService { + @Operation + String operation(String input); +} diff --git a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/byworkername/TestNexusServiceImpl.java b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/byworkername/TestNexusServiceImpl.java new file mode 100644 index 000000000..c78e0646a --- /dev/null +++ b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/byworkername/TestNexusServiceImpl.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.spring.boot.autoconfigure.byworkername; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.spring.boot.NexusServiceImpl; +import io.temporal.spring.boot.autoconfigure.bytaskqueue.TestNexusService; +import org.springframework.stereotype.Component; + +@Component("TestNexusServiceImpl") +@NexusServiceImpl(workers = "mainWorker") +@ServiceImpl(service = TestNexusService.class) +public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + // Implemented inline + return OperationHandler.sync((ctx, details, name) -> "Hello, " + name + "!"); + } +} diff --git a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/byworkername/TestWorkflowImpl.java b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/byworkername/TestWorkflowImpl.java index 2dce1ff4e..eae95af3f 100644 --- a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/byworkername/TestWorkflowImpl.java +++ b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/byworkername/TestWorkflowImpl.java @@ -22,6 +22,8 @@ import io.temporal.activity.ActivityOptions; import io.temporal.spring.boot.WorkflowImpl; +import io.temporal.workflow.NexusOperationOptions; +import io.temporal.workflow.NexusServiceOptions; import io.temporal.workflow.Workflow; import java.time.Duration; @@ -29,6 +31,18 @@ public class TestWorkflowImpl implements TestWorkflow { @Override public String execute(String input) { + if (input.equals("nexus")) { + Workflow.newNexusServiceStub( + TestNexusService.class, + NexusServiceOptions.newBuilder() + .setEndpoint("AutoDiscoveryByWorkerNameTestEndpoint") + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()) + .build()) + .operation(input); + } return Workflow.newActivityStub( TestActivity.class, ActivityOptions.newBuilder() diff --git a/temporal-spring-boot-autoconfigure/src/test/resources/application.yml b/temporal-spring-boot-autoconfigure/src/test/resources/application.yml index d8a2e45a5..7f39c14d3 100644 --- a/temporal-spring-boot-autoconfigure/src/test/resources/application.yml +++ b/temporal-spring-boot-autoconfigure/src/test/resources/application.yml @@ -85,10 +85,12 @@ spring: - task-queue: UnitTest capacity: max-concurrent-workflow-task-executors: 1 + max-concurrent-nexus-task-executors: 1 max-concurrent-activity-executors: 1 max-concurrent-local-activity-executors: 1 max-concurrent-workflow-task-pollers: 1 max-concurrent-activity-task-pollers: 1 + max-concurrent-nexus-task-pollers: 1 rate-limits: max-worker-activities-per-second: 1.0 max-task-queue-activities-per-second: 1.0