Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for auto discovery of Nexus Services #2239

Merged
merged 4 commits into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions temporal-spring-boot-autoconfigure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ ext {
dependencies {
api(platform("org.springframework.boot:spring-boot-dependencies:$springBootVersion"))
api(platform("io.opentelemetry:opentelemetry-bom:$otelVersion"))
implementation "io.nexusrpc:nexus-sdk:$nexusVersion"

compileOnly project(':temporal-sdk')
compileOnly project(':temporal-testing')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.spring.boot;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Enables the Nexus service bean to be discovered by the Workers auto-discovery. This annotation is
* not needed if only an explicit config is used.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface NexusServiceImpl {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't sure on the name here. This matches what we did for activity where we use ActivityImpl

/**
* @return names of Workers to register this nexus service bean with. Workers with these names
* must be present in the application config. Worker is named by its task queue if its name is
* not specified.
*/
String[] workers() default {};

/**
* @return Worker Task Queues to register this nexus service bean with. If Worker with the
* specified Task Queue is not present in the application config, it will be created with a
* default config. Can be specified as a property key, e.g.: ${propertyKey}.
*/
String[] taskQueues() default {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class WorkerProperties {
private final @Nullable String name;
private final @Nullable Collection<Class<?>> workflowClasses;
private final @Nullable Collection<String> activityBeans;
private final @Nullable Collection<String> nexusServiceBeans;
private final @Nullable CapacityConfigurationProperties capacity;
private final @Nullable RateLimitsConfigurationProperties rateLimits;
private final @Nullable BuildIdConfigurationProperties buildId;
Expand All @@ -40,13 +41,15 @@ public WorkerProperties(
@Nullable String name,
@Nullable Collection<Class<?>> workflowClasses,
@Nullable Collection<String> activityBeans,
@Nullable Collection<String> nexusServiceBeans,
@Nullable CapacityConfigurationProperties capacity,
@Nullable RateLimitsConfigurationProperties rateLimits,
@Nullable BuildIdConfigurationProperties buildId) {
this.name = name;
this.taskQueue = taskQueue;
this.workflowClasses = workflowClasses;
this.activityBeans = activityBeans;
this.nexusServiceBeans = nexusServiceBeans;
this.capacity = capacity;
this.rateLimits = rateLimits;
this.buildId = buildId;
Expand Down Expand Up @@ -87,12 +90,19 @@ public BuildIdConfigurationProperties getBuildId() {
return buildId;
}

@Nullable
public Collection<String> getNexusServiceBeans() {
return nexusServiceBeans;
}

public static class CapacityConfigurationProperties {
private final @Nullable Integer maxConcurrentWorkflowTaskExecutors;
private final @Nullable Integer maxConcurrentActivityExecutors;
private final @Nullable Integer maxConcurrentLocalActivityExecutors;
private final @Nullable Integer maxConcurrentNexusTaskExecutors;
private final @Nullable Integer maxConcurrentWorkflowTaskPollers;
private final @Nullable Integer maxConcurrentActivityTaskPollers;
private final @Nullable Integer maxConcurrentNexusTaskPollers;

/**
* @param maxConcurrentWorkflowTaskExecutors defines {@link
Expand All @@ -101,23 +111,31 @@ public static class CapacityConfigurationProperties {
* io.temporal.worker.WorkerOptions.Builder#setMaxConcurrentActivityExecutionSize(int)}
* @param maxConcurrentLocalActivityExecutors defines {@link
* io.temporal.worker.WorkerOptions.Builder#setMaxConcurrentLocalActivityExecutionSize(int)}
* @param maxConcurrentNexusTaskExecutors defines {@link
* io.temporal.worker.WorkerOptions.Builder#setMaxConcurrentNexusTaskPollers(int)} (int)}
* @param maxConcurrentWorkflowTaskPollers defines {@link
* io.temporal.worker.WorkerOptions.Builder#setMaxConcurrentWorkflowTaskPollers(int)}
* @param maxConcurrentActivityTaskPollers defines {@link
* io.temporal.worker.WorkerOptions.Builder#setMaxConcurrentActivityTaskPollers(int)}
* @param maxConcurrentNexusTaskPollers defines {@link
* io.temporal.worker.WorkerOptions.Builder#setMaxConcurrentNexusTaskPollers(int)} (int)}
*/
@ConstructorBinding
public CapacityConfigurationProperties(
@Nullable Integer maxConcurrentWorkflowTaskExecutors,
@Nullable Integer maxConcurrentActivityExecutors,
@Nullable Integer maxConcurrentLocalActivityExecutors,
@Nullable Integer maxConcurrentNexusTaskExecutors,
@Nullable Integer maxConcurrentWorkflowTaskPollers,
@Nullable Integer maxConcurrentActivityTaskPollers) {
@Nullable Integer maxConcurrentActivityTaskPollers,
@Nullable Integer maxConcurrentNexusTaskPollers) {
this.maxConcurrentWorkflowTaskExecutors = maxConcurrentWorkflowTaskExecutors;
this.maxConcurrentActivityExecutors = maxConcurrentActivityExecutors;
this.maxConcurrentLocalActivityExecutors = maxConcurrentLocalActivityExecutors;
this.maxConcurrentNexusTaskExecutors = maxConcurrentNexusTaskExecutors;
this.maxConcurrentWorkflowTaskPollers = maxConcurrentWorkflowTaskPollers;
this.maxConcurrentActivityTaskPollers = maxConcurrentActivityTaskPollers;
this.maxConcurrentNexusTaskPollers = maxConcurrentNexusTaskPollers;
}

@Nullable
Expand All @@ -135,6 +153,11 @@ public Integer getMaxConcurrentLocalActivityExecutors() {
return maxConcurrentLocalActivityExecutors;
}

@Nullable
public Integer getMaxConcurrentNexusTasksExecutors() {
return maxConcurrentNexusTaskExecutors;
}

@Nullable
public Integer getMaxConcurrentWorkflowTaskPollers() {
return maxConcurrentWorkflowTaskPollers;
Expand All @@ -144,6 +167,11 @@ public Integer getMaxConcurrentWorkflowTaskPollers() {
public Integer getMaxConcurrentActivityTaskPollers() {
return maxConcurrentActivityTaskPollers;
}

@Nullable
public Integer getMaxConcurrentNexusTaskPollers() {
return maxConcurrentNexusTaskPollers;
}
}

public static class RateLimitsConfigurationProperties {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,14 @@ WorkerOptions createWorkerOptions() {
.ifPresent(options::setMaxConcurrentActivityExecutionSize);
Optional.ofNullable(threadsConfiguration.getMaxConcurrentLocalActivityExecutors())
.ifPresent(options::setMaxConcurrentLocalActivityExecutionSize);
Optional.ofNullable(threadsConfiguration.getMaxConcurrentNexusTasksExecutors())
.ifPresent(options::setMaxConcurrentNexusExecutionSize);
Optional.ofNullable(threadsConfiguration.getMaxConcurrentWorkflowTaskPollers())
.ifPresent(options::setMaxConcurrentWorkflowTaskPollers);
Optional.ofNullable(threadsConfiguration.getMaxConcurrentActivityTaskPollers())
.ifPresent(options::setMaxConcurrentActivityTaskPollers);
Optional.ofNullable(threadsConfiguration.getMaxConcurrentNexusTaskPollers())
.ifPresent(options::setMaxConcurrentNexusTaskPollers);
}

WorkerProperties.RateLimitsConfigurationProperties rateLimitConfiguration =
Expand Down
Loading
Loading