From 6a5607868dceccb419daeb3a8a7cd4282b3daaf2 Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Thu, 7 Nov 2024 16:37:38 +0800 Subject: [PATCH 1/2] support fetch alive service config from gateway --- .../meta/loadbalancer/build.gradle | 1 + .../loadbalancer/EntryPointFetcher.java | 4 +- .../loadbalancer/EntryPointInfo.java | 55 ------------------- .../components/loadbalancer/LoadBalancer.java | 5 +- .../config/LoadBalanceConfig.java | 20 ++++++- .../impl/EntryPointConfigLoader.java | 18 ++++-- .../impl/EntryPointFetcherImpl.java | 34 ++++++++++++ .../loadbalancer/impl/LoadBalancerImpl.java | 8 +-- .../scheduler/dag/worker/ModelWorker.java | 4 +- .../scheduler/dag/worker/MpcWorker.java | 4 +- .../scheduler/dag/worker/PsiWorker.java | 4 +- .../impl/dag/DagSchedulerExecutor.java | 6 +- .../executor/impl/ml/MLExecutorClient.java | 4 +- .../hook/PirServicePublishCallback.java | 4 +- wedpr-site/conf/wedpr.properties | 3 + 15 files changed, 92 insertions(+), 82 deletions(-) delete mode 100644 wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/EntryPointInfo.java create mode 100644 wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/EntryPointFetcherImpl.java diff --git a/wedpr-components/meta/loadbalancer/build.gradle b/wedpr-components/meta/loadbalancer/build.gradle index c7150cc1..53eb798b 100644 --- a/wedpr-components/meta/loadbalancer/build.gradle +++ b/wedpr-components/meta/loadbalancer/build.gradle @@ -5,5 +5,6 @@ plugins { } dependencies{ compile project(":wedpr-common-utils") + compile project(":wedpr-components-transport") compile libraries.spring, libraries.spring_boot } diff --git a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/EntryPointFetcher.java b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/EntryPointFetcher.java index ee16e314..3464155d 100644 --- a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/EntryPointFetcher.java +++ b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/EntryPointFetcher.java @@ -15,9 +15,9 @@ package com.webank.wedpr.components.loadbalancer; +import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta; import java.util.List; -// TODO: add service-discovery implementation public interface EntryPointFetcher { - List getAliveEntryPoints(String serviceName); + List getAliveEntryPoints(String serviceName); } diff --git a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/EntryPointInfo.java b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/EntryPointInfo.java deleted file mode 100644 index 797cf5a9..00000000 --- a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/EntryPointInfo.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2017-2025 [webank-wedpr] - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * - */ - -package com.webank.wedpr.components.loadbalancer; - -import com.webank.wedpr.common.utils.Common; -import com.webank.wedpr.common.utils.Constant; -import java.util.ArrayList; -import java.util.List; -import lombok.Data; -import lombok.NoArgsConstructor; -import org.apache.commons.lang3.StringUtils; - -@Data -@NoArgsConstructor -public class EntryPointInfo { - private String serviceName; - private String entryPoint; - - public EntryPointInfo(String serviceName, String entryPoint) { - this.serviceName = serviceName; - this.entryPoint = entryPoint; - } - - public String getUrl(String uriPath) { - if (StringUtils.isBlank(uriPath)) { - return Common.getUrl(entryPoint); - } - if (uriPath.startsWith(Constant.URI_SPLITER)) { - return Common.getUrl(entryPoint + uriPath); - } - return Common.getUrl(entryPoint + Constant.URI_SPLITER + uriPath); - } - - public static List toEntryPointInfo( - String serviceName, List entryPointsList) { - List result = new ArrayList<>(); - for (String entryPoint : entryPointsList) { - result.add(new EntryPointInfo(serviceName, entryPoint)); - } - return result; - } -} diff --git a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/LoadBalancer.java b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/LoadBalancer.java index cd13c6e8..34553afc 100644 --- a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/LoadBalancer.java +++ b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/LoadBalancer.java @@ -15,6 +15,7 @@ package com.webank.wedpr.components.loadbalancer; +import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta; import java.util.List; public interface LoadBalancer { @@ -23,7 +24,7 @@ public static enum Policy { HASH, } - EntryPointInfo selectService(Policy policy, String serviceType); + ServiceMeta.EntryPointMeta selectService(Policy policy, String serviceType); - List selectAllEndPoint(String serviceType); + List selectAllEndPoint(String serviceType); } diff --git a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/config/LoadBalanceConfig.java b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/config/LoadBalanceConfig.java index 65603270..67ceea2a 100644 --- a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/config/LoadBalanceConfig.java +++ b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/config/LoadBalanceConfig.java @@ -15,9 +15,15 @@ package com.webank.wedpr.components.loadbalancer.config; +import com.webank.wedpr.common.config.WeDPRConfig; import com.webank.wedpr.components.loadbalancer.LoadBalancer; import com.webank.wedpr.components.loadbalancer.impl.EntryPointConfigLoader; +import com.webank.wedpr.components.loadbalancer.impl.EntryPointFetcherImpl; import com.webank.wedpr.components.loadbalancer.impl.LoadBalancerImpl; +import com.webank.wedpr.sdk.jni.transport.WeDPRTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; @@ -26,10 +32,20 @@ @Configuration public class LoadBalanceConfig { + private static final Logger logger = LoggerFactory.getLogger(LoadBalanceConfig.class); + @Autowired private WeDPRTransport weDPRTransport; + + private static boolean DEBUG_MODE = WeDPRConfig.apply("wedpr.service.debugMode", false); + @Bean(name = "loadBalancer") @Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON) @ConditionalOnMissingBean - public LoadBalancer debugModeloadBalancer() { - return new LoadBalancerImpl(new EntryPointConfigLoader()); + public LoadBalancer loadBalancer() { + if (DEBUG_MODE) { + logger.info("Create debug mode LoadBalancerImpl, load service config from config file"); + return new LoadBalancerImpl(new EntryPointConfigLoader()); + } + logger.info("Create LoadBalancerImpl, fetch the alive node from the gateway"); + return new LoadBalancerImpl(new EntryPointFetcherImpl(weDPRTransport)); } } diff --git a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/EntryPointConfigLoader.java b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/EntryPointConfigLoader.java index 04e8f44a..b4378d99 100644 --- a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/EntryPointConfigLoader.java +++ b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/EntryPointConfigLoader.java @@ -20,7 +20,8 @@ import com.webank.wedpr.common.utils.ObjectMapperFactory; import com.webank.wedpr.common.utils.WeDPRException; import com.webank.wedpr.components.loadbalancer.EntryPointFetcher; -import com.webank.wedpr.components.loadbalancer.EntryPointInfo; +import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,7 +32,7 @@ public class EntryPointConfigLoader implements EntryPointFetcher { private static final Logger logger = LoggerFactory.getLogger(EntryPointConfigLoader.class); - private Map> alivedEntryPoints = new HashMap<>(); + private Map> alivedEntryPoints = new HashMap<>(); @SneakyThrows(Exception.class) public EntryPointConfigLoader() { @@ -52,13 +53,22 @@ public EntryPointConfigLoader() { entryPoint.getEntryPoints().size()); alivedEntryPoints.put( entryPoint.getServiceName().toLowerCase(), - EntryPointInfo.toEntryPointInfo( + EntryPointConfigLoader.toEntryPointMetaList( entryPoint.getServiceName(), entryPoint.getEntryPoints())); } } + public static List toEntryPointMetaList( + String serviceName, List entryPointsList) { + List result = new ArrayList<>(); + for (String entryPoint : entryPointsList) { + result.add(new ServiceMeta.EntryPointMeta(serviceName, entryPoint)); + } + return result; + } + @Override - public List getAliveEntryPoints(String serviceName) { + public List getAliveEntryPoints(String serviceName) { String lowerCaseServiceName = serviceName.toLowerCase(); if (alivedEntryPoints.containsKey(lowerCaseServiceName)) { return alivedEntryPoints.get(lowerCaseServiceName); diff --git a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/EntryPointFetcherImpl.java b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/EntryPointFetcherImpl.java new file mode 100644 index 00000000..7dba5e00 --- /dev/null +++ b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/EntryPointFetcherImpl.java @@ -0,0 +1,34 @@ +/* + * Copyright 2017-2025 [webank-wedpr] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package com.webank.wedpr.components.loadbalancer.impl; + +import com.webank.wedpr.components.loadbalancer.EntryPointFetcher; +import com.webank.wedpr.sdk.jni.transport.WeDPRTransport; +import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta; +import java.util.List; + +public class EntryPointFetcherImpl implements EntryPointFetcher { + private final WeDPRTransport weDPRTransport; + + public EntryPointFetcherImpl(WeDPRTransport weDPRTransport) { + this.weDPRTransport = weDPRTransport; + } + + @Override + public List getAliveEntryPoints(String serviceName) { + return this.weDPRTransport.getAliveEntryPoints(serviceName); + } +} diff --git a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/LoadBalancerImpl.java b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/LoadBalancerImpl.java index 871fba85..74788870 100644 --- a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/LoadBalancerImpl.java +++ b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/LoadBalancerImpl.java @@ -16,8 +16,8 @@ package com.webank.wedpr.components.loadbalancer.impl; import com.webank.wedpr.components.loadbalancer.EntryPointFetcher; -import com.webank.wedpr.components.loadbalancer.EntryPointInfo; import com.webank.wedpr.components.loadbalancer.LoadBalancer; +import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -31,13 +31,13 @@ public LoadBalancerImpl(EntryPointFetcher entryPointFetcher) { } @Override - public List selectAllEndPoint(String serviceType) { + public List selectAllEndPoint(String serviceType) { return entryPointFetcher.getAliveEntryPoints(serviceType); } @Override - public EntryPointInfo selectService(Policy policy, String serviceType) { - List entryPointInfoList = + public ServiceMeta.EntryPointMeta selectService(Policy policy, String serviceType) { + List entryPointInfoList = entryPointFetcher.getAliveEntryPoints(serviceType); if (entryPointInfoList == null || entryPointInfoList.isEmpty()) { return null; diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/ModelWorker.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/ModelWorker.java index 344346d3..b97848af 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/ModelWorker.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/ModelWorker.java @@ -1,12 +1,12 @@ package com.webank.wedpr.components.scheduler.dag.worker; import com.webank.wedpr.common.utils.WeDPRException; -import com.webank.wedpr.components.loadbalancer.EntryPointInfo; import com.webank.wedpr.components.loadbalancer.LoadBalancer; import com.webank.wedpr.components.scheduler.client.ModelClient; import com.webank.wedpr.components.scheduler.dag.entity.JobWorker; import com.webank.wedpr.components.scheduler.dag.utils.ServiceName; import com.webank.wedpr.components.scheduler.mapper.JobWorkerMapper; +import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +30,7 @@ public WorkerStatus engineRun() throws Exception { String workerId = getWorkerId(); String args = getArgs(); - EntryPointInfo entryPoint = + ServiceMeta.EntryPointMeta entryPoint = getLoadBalancer() .selectService( LoadBalancer.Policy.ROUND_ROBIN, ServiceName.MODEL.getValue()); diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/MpcWorker.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/MpcWorker.java index 81fd7aee..6d21ce28 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/MpcWorker.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/MpcWorker.java @@ -1,13 +1,13 @@ package com.webank.wedpr.components.scheduler.dag.worker; import com.webank.wedpr.common.utils.WeDPRException; -import com.webank.wedpr.components.loadbalancer.EntryPointInfo; import com.webank.wedpr.components.loadbalancer.LoadBalancer; import com.webank.wedpr.components.scheduler.client.MpcClient; import com.webank.wedpr.components.scheduler.dag.entity.JobWorker; import com.webank.wedpr.components.scheduler.dag.utils.ServiceName; import com.webank.wedpr.components.scheduler.executor.impl.mpc.MPCExecutorConfig; import com.webank.wedpr.components.scheduler.mapper.JobWorkerMapper; +import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +31,7 @@ public WorkerStatus engineRun() throws Exception { String workerId = getWorkerId(); String workerArgs = getArgs(); - EntryPointInfo entryPoint = + ServiceMeta.EntryPointMeta entryPoint = getLoadBalancer() .selectService(LoadBalancer.Policy.ROUND_ROBIN, ServiceName.MPC.getValue()); if (entryPoint == null) { diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/PsiWorker.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/PsiWorker.java index 48b55038..bb2a930f 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/PsiWorker.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/PsiWorker.java @@ -1,12 +1,12 @@ package com.webank.wedpr.components.scheduler.dag.worker; import com.webank.wedpr.common.utils.WeDPRException; -import com.webank.wedpr.components.loadbalancer.EntryPointInfo; import com.webank.wedpr.components.loadbalancer.LoadBalancer; import com.webank.wedpr.components.scheduler.client.PsiClient; import com.webank.wedpr.components.scheduler.dag.entity.JobWorker; import com.webank.wedpr.components.scheduler.dag.utils.ServiceName; import com.webank.wedpr.components.scheduler.mapper.JobWorkerMapper; +import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +30,7 @@ public WorkerStatus engineRun() throws Exception { String workerId = getWorkerId(); String workerArgs = getArgs(); - EntryPointInfo entryPoint = + ServiceMeta.EntryPointMeta entryPoint = getLoadBalancer() .selectService(LoadBalancer.Policy.ROUND_ROBIN, ServiceName.PSI.getValue()); if (entryPoint == null) { diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/dag/DagSchedulerExecutor.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/dag/DagSchedulerExecutor.java index 57e370e5..33461224 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/dag/DagSchedulerExecutor.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/dag/DagSchedulerExecutor.java @@ -5,7 +5,6 @@ import com.webank.wedpr.common.utils.ThreadPoolService; import com.webank.wedpr.common.utils.WeDPRException; import com.webank.wedpr.components.http.client.HttpClientImpl; -import com.webank.wedpr.components.loadbalancer.EntryPointInfo; import com.webank.wedpr.components.loadbalancer.LoadBalancer; import com.webank.wedpr.components.project.JobChecker; import com.webank.wedpr.components.project.dao.JobDO; @@ -27,6 +26,7 @@ import com.webank.wedpr.components.scheduler.workflow.WorkFlowOrchestrator; import com.webank.wedpr.components.scheduler.workflow.builder.JobWorkFlowBuilderManager; import com.webank.wedpr.components.storage.api.FileStorageInterface; +import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,14 +136,14 @@ public void kill(JobDO jobDO) throws Exception { // Note: since the job may exist in any node, establish kill command to all nodes public void killModelJob(JobDO jobDO) throws Exception { logger.info("killModelJob: {}", jobDO.getId()); - List aliveEntryPoint = + List aliveEntryPoint = loadBalancer.selectAllEndPoint(ServiceName.MODEL.getValue()); if (aliveEntryPoint == null || aliveEntryPoint.isEmpty()) { return; } boolean failed = false; String reason = ""; - for (EntryPointInfo entryPointInfo : aliveEntryPoint) { + for (ServiceMeta.EntryPointMeta entryPointInfo : aliveEntryPoint) { try { logger.info("kill job: {}, entrypoint: {}", jobDO.toString(), entryPointInfo); HttpClientImpl httpClient = diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutorClient.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutorClient.java index 24877679..0bd2c57d 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutorClient.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutorClient.java @@ -18,17 +18,17 @@ import com.webank.wedpr.common.utils.Constant; import com.webank.wedpr.common.utils.WeDPRException; import com.webank.wedpr.components.http.client.HttpClientImpl; -import com.webank.wedpr.components.loadbalancer.EntryPointInfo; import com.webank.wedpr.components.loadbalancer.LoadBalancer; import com.webank.wedpr.components.scheduler.dag.utils.ServiceName; import com.webank.wedpr.components.scheduler.executor.impl.ml.model.ModelJobResult; import com.webank.wedpr.components.scheduler.executor.impl.ml.request.GetTaskResultRequest; +import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta; public class MLExecutorClient { public static Object getJobResult(LoadBalancer loadBalancer, GetTaskResultRequest request) throws Exception { - EntryPointInfo entryPoint = + ServiceMeta.EntryPointMeta entryPoint = loadBalancer.selectService( LoadBalancer.Policy.ROUND_ROBIN, ServiceName.MODEL.getValue()); if (entryPoint == null) { diff --git a/wedpr-components/service-publish/src/main/java/com/webank/wedpr/components/publish/hook/PirServicePublishCallback.java b/wedpr-components/service-publish/src/main/java/com/webank/wedpr/components/publish/hook/PirServicePublishCallback.java index 637cffb6..97249de3 100644 --- a/wedpr-components/service-publish/src/main/java/com/webank/wedpr/components/publish/hook/PirServicePublishCallback.java +++ b/wedpr-components/service-publish/src/main/java/com/webank/wedpr/components/publish/hook/PirServicePublishCallback.java @@ -23,10 +23,10 @@ import com.webank.wedpr.components.db.mapper.service.publish.dao.ServiceAuthMapper; import com.webank.wedpr.components.hook.ServiceHook; import com.webank.wedpr.components.http.client.HttpClientImpl; -import com.webank.wedpr.components.loadbalancer.EntryPointInfo; import com.webank.wedpr.components.loadbalancer.LoadBalancer; import com.webank.wedpr.components.publish.config.ServicePublisherConfig; import com.webank.wedpr.components.publish.entity.request.PublishCreateRequest; +import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta; import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; @@ -72,7 +72,7 @@ public void batchInsertServiceAuthInfo( @Override public void onPublish(Object serviceInfo) throws Exception { PublishCreateRequest publishedServiceInfo = (PublishCreateRequest) serviceInfo; - EntryPointInfo selectedEntryPoint = + ServiceMeta.EntryPointMeta selectedEntryPoint = loadBalancer.selectService( LoadBalancer.Policy.ROUND_ROBIN, publishedServiceInfo.getServiceType()); if (selectedEntryPoint == null) { diff --git a/wedpr-site/conf/wedpr.properties b/wedpr-site/conf/wedpr.properties index fd22882a..a645ec1f 100644 --- a/wedpr-site/conf/wedpr.properties +++ b/wedpr-site/conf/wedpr.properties @@ -90,9 +90,12 @@ wedpr.transport.listen_ip=0.0.0.0 wedpr.transport.listen_port=6001 + ### the admin configuration #wedpr.admin_agency= ### ### the service configuration +wedpr.service.debugMode=false +# only used when debugMode is true #wedpr.service.entrypoints=[{"serviceName": "pir", "entryPoints": ["127.0.0.1:8101"]}] From 0478f213584b19b4f3164c9502125a052e4946f4 Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Thu, 7 Nov 2024 17:27:44 +0800 Subject: [PATCH 2/2] pir register service inforamtion --- .../wedpr/common/config/WeDPRCommonConfig.java | 15 ++++++++++++++- .../com/webank/wedpr/common/utils/Constant.java | 3 +++ .../components/initializer/WeDPRApplication.java | 14 +++++++++++--- wedpr-components/key-generator/build.gradle | 2 -- .../loadbalancer/impl/LoadBalancerImpl.java | 4 ++++ .../components/report/job/ReportQuartzJob.java | 11 +++++------ .../components/publish/sync/PublishQuartzJob.java | 4 ++-- .../plugin/pir/service/impl/PirServiceImpl.java | 15 +++++++++++---- 8 files changed, 50 insertions(+), 18 deletions(-) diff --git a/wedpr-common/utils/src/main/java/com/webank/wedpr/common/config/WeDPRCommonConfig.java b/wedpr-common/utils/src/main/java/com/webank/wedpr/common/config/WeDPRCommonConfig.java index 61a91d8c..c2589c23 100644 --- a/wedpr-common/utils/src/main/java/com/webank/wedpr/common/config/WeDPRCommonConfig.java +++ b/wedpr-common/utils/src/main/java/com/webank/wedpr/common/config/WeDPRCommonConfig.java @@ -19,11 +19,14 @@ import com.webank.wedpr.common.utils.WeDPRException; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; // Note: Since the WeDPRCommonConfig module is a reusable, it shouldn't require the config to exist // when initializing static configuration variables, // which will cause some services that do not need to set these configuration to exception. public class WeDPRCommonConfig { + private static final Logger logger = LoggerFactory.getLogger(WeDPRCommonConfig.class); private static final Integer DEFAULT_READ_TRUNK_SIZE = 1024 * 1024; private static final Integer DEFAULT_WRITE_TRUNK_SIZE = 1024 * 1024; // the agency id @@ -67,6 +70,7 @@ public class WeDPRCommonConfig { WeDPRConfig.apply("wedpr.worker.api.method.submit", "submit"); private static String SHELL_CODE_CONNECTOR = " && "; + private static String SERVER_LISTEN_PORT; @SneakyThrows public static String getAgency() { @@ -79,7 +83,7 @@ public static String getAgency() { @SneakyThrows public static String getAdminAgency() { if (StringUtils.isBlank(ADMIN_AGENCY)) { - throw new WeDPRException("Invalid emtpy agency!"); + throw new WeDPRException("Invalid emtpy admin agency!"); } return ADMIN_AGENCY; } @@ -156,4 +160,13 @@ public static String getWedprWorkerApiPath() { public static String getWedprWorkerSubmitTaskMethod() { return WEDPR_WORKER_SUBMIT_TASK_METHOD; } + + public static String getServerListenPort() { + return SERVER_LISTEN_PORT; + } + + public static void setServerListenPort(String serverListenPort) { + logger.info("setServerListenPort: {}", serverListenPort); + SERVER_LISTEN_PORT = serverListenPort; + } } diff --git a/wedpr-common/utils/src/main/java/com/webank/wedpr/common/utils/Constant.java b/wedpr-common/utils/src/main/java/com/webank/wedpr/common/utils/Constant.java index 13e04f1c..c41e65f2 100644 --- a/wedpr-common/utils/src/main/java/com/webank/wedpr/common/utils/Constant.java +++ b/wedpr-common/utils/src/main/java/com/webank/wedpr/common/utils/Constant.java @@ -82,4 +82,7 @@ public class Constant { // the pir related fields public static final String PIR_ID_FIELD_NAME = "pir_sys_id"; public static final String PIR_ID_HASH_FIELD_NAME = "pir_sys_id_hash"; + + //// the serviceType + public static final String PIR_SERVICE_TYPE = "PIR"; } diff --git a/wedpr-components/initializer/src/main/java/com/webank/wedpr/components/initializer/WeDPRApplication.java b/wedpr-components/initializer/src/main/java/com/webank/wedpr/components/initializer/WeDPRApplication.java index 711f6509..6946d666 100644 --- a/wedpr-components/initializer/src/main/java/com/webank/wedpr/components/initializer/WeDPRApplication.java +++ b/wedpr-components/initializer/src/main/java/com/webank/wedpr/components/initializer/WeDPRApplication.java @@ -15,6 +15,7 @@ package com.webank.wedpr.components.initializer; +import com.webank.wedpr.common.config.WeDPRCommonConfig; import com.webank.wedpr.common.config.WeDPRConfig; import com.webank.wedpr.common.utils.WeDPRException; import java.io.InputStream; @@ -33,6 +34,7 @@ import org.springframework.context.ApplicationListener; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.ComponentScan; +import org.springframework.core.env.Environment; @SpringBootApplication @ComponentScan(basePackages = {"com.webank"}) @@ -53,7 +55,12 @@ public void onApplicationEvent(ApplicationPreparedEvent event) { if (applicationContext == null) { applicationContext = event.getApplicationContext(); } - initWeDPRApplication(serviceName); + Environment environment = applicationContext.getEnvironment(); + String serverListenPort = environment.getProperty("server.port"); + logger.info( + "init WeDPRApplication application, listen port: {}", + serverListenPort); + initWeDPRApplication(serviceName, serverListenPort); logger.info("init WeDPRApplication application success"); } }); @@ -61,7 +68,7 @@ public void onApplicationEvent(ApplicationPreparedEvent event) { applicationContext = application.run(ArrayUtils.addAll(args, springArgs)); } - protected static void initWeDPRApplication(String serviceName) { + protected static void initWeDPRApplication(String serviceName, String serverListenPort) { try { serviceInfo = new ServiceInfo(); String appName = @@ -82,7 +89,8 @@ protected static void initWeDPRApplication(String serviceName) { } // load config loadConfig(serviceInfo.getConfigFile()); - + // Note: must load WeDPRCommonConfig after config file loaded + WeDPRCommonConfig.setServerListenPort(serverListenPort); logger.info("initWeDPRApplication for {} success", appName); } catch (Exception e) { logger.error("initWeDPRApplication failed, error: ", e); diff --git a/wedpr-components/key-generator/build.gradle b/wedpr-components/key-generator/build.gradle index 90f4b43b..8c053508 100644 --- a/wedpr-components/key-generator/build.gradle +++ b/wedpr-components/key-generator/build.gradle @@ -1,7 +1,5 @@ // Apply the java-library plugin to add support for Java Library plugins { - id 'org.springframework.boot' version '2.3.12.RELEASE' - id 'io.spring.dependency-management' version '1.0.11.RELEASE' id 'java' } diff --git a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/LoadBalancerImpl.java b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/LoadBalancerImpl.java index 74788870..94a098bc 100644 --- a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/LoadBalancerImpl.java +++ b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/LoadBalancerImpl.java @@ -20,8 +20,11 @@ import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LoadBalancerImpl implements LoadBalancer { + private static final Logger logger = LoggerFactory.getLogger(LoadBalancerImpl.class); private final EntryPointFetcher entryPointFetcher; private final AtomicInteger lastIdx = new AtomicInteger(0); @@ -50,6 +53,7 @@ public ServiceMeta.EntryPointMeta selectService(Policy policy, String serviceTyp int idx = serviceType.hashCode() % entryPointInfoList.size(); int selectedIdx = Math.max(idx, 0); lastIdx.set(selectedIdx); + logger.info("selectService: {}", entryPointInfoList.get(selectedIdx).toString()); return entryPointInfoList.get(selectedIdx); } } diff --git a/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/job/ReportQuartzJob.java b/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/job/ReportQuartzJob.java index 5b26c337..85c827db 100644 --- a/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/job/ReportQuartzJob.java +++ b/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/job/ReportQuartzJob.java @@ -46,13 +46,12 @@ public void execute(JobExecutionContext context) { @Transactional(rollbackFor = Exception.class) private void doReport() { - log.info("do report..."); + log.debug("do report..."); try { String adminAgency = WeDPRCommonConfig.getAdminAgency(); reportProjectInfo(adminAgency); reportJobInfo(adminAgency); reportJobDatasteRelationInfo(adminAgency); - // reportSysConfig(adminAgency); } catch (Exception e) { log.warn("report error", e); } @@ -62,10 +61,10 @@ private void reportSysConfig(String agency) throws JsonProcessingException { SysConfigReportMessageHandler sysConfigReportMessageHandler = new SysConfigReportMessageHandler(sysConfigMapper); List sysConfigDOList = sysConfigMapper.queryAllConfig(); - log.info("report sysConfigDOList:{}", sysConfigDOList); if (sysConfigDOList.isEmpty()) { return; } + log.info("report sysConfigDOList:{}", sysConfigDOList); byte[] payload = ObjectMapperFactory.getObjectMapper().writeValueAsBytes(sysConfigDOList); weDPRTransport.asyncSendMessageByComponent( TransportTopicEnum.SYS_CONFIG_REPORT.name(), @@ -85,10 +84,10 @@ private void reportJobDatasteRelationInfo(String agency) throws JsonProcessingEx jobDatasetDO.setReportStatus(ReportStatusEnum.NO_REPORT.getReportStatus()); jobDatasetDO.setLimitItems(Constant.DEFAULT_REPORT_PAGE_SIZE); List jobDatasetDOList = projectMapper.queryJobDatasetInfo(jobDatasetDO); - log.info("report jobDatasetDOList:{}", jobDatasetDOList); if (jobDatasetDOList.isEmpty()) { return; } + log.info("report jobDatasetDOList:{}", jobDatasetDOList); byte[] payload = ObjectMapperFactory.getObjectMapper().writeValueAsBytes(jobDatasetDOList); weDPRTransport.asyncSendMessageByComponent( TransportTopicEnum.JOB_DATASET_REPORT.name(), @@ -109,10 +108,10 @@ private void reportJobInfo(String agency) throws JsonProcessingException { jobDO.setReportStatus(ReportStatusEnum.NO_REPORT.getReportStatus()); jobDO.setLimitItems(Constant.DEFAULT_REPORT_PAGE_SIZE); List jobDOList = projectMapper.queryJobs(false, jobDO, null); - log.info("report jobDOList:{}", jobDOList); if (jobDOList.isEmpty()) { return; } + log.info("report jobDOList:{}", jobDOList); byte[] payload = ObjectMapperFactory.getObjectMapper().writeValueAsBytes(jobDOList); weDPRTransport.asyncSendMessageByComponent( TransportTopicEnum.JOB_REPORT.name(), @@ -133,10 +132,10 @@ private void reportProjectInfo(String agency) throws JsonProcessingException { projectDO.setReportStatus(ReportStatusEnum.NO_REPORT.getReportStatus()); projectDO.setLimitItems(Constant.DEFAULT_REPORT_PAGE_SIZE); List projectDOList = projectMapper.queryProject(false, projectDO); - log.info("report projectDOList:{}", projectDOList); if (projectDOList.isEmpty()) { return; } + log.info("report projectDOList:{}", projectDOList); byte[] payload = ObjectMapperFactory.getObjectMapper().writeValueAsBytes(projectDOList); weDPRTransport.asyncSendMessageByComponent( TransportTopicEnum.PROJECT_REPORT.name(), diff --git a/wedpr-components/service-publish/src/main/java/com/webank/wedpr/components/publish/sync/PublishQuartzJob.java b/wedpr-components/service-publish/src/main/java/com/webank/wedpr/components/publish/sync/PublishQuartzJob.java index 3e4aef96..4b4b2bb5 100644 --- a/wedpr-components/service-publish/src/main/java/com/webank/wedpr/components/publish/sync/PublishQuartzJob.java +++ b/wedpr-components/service-publish/src/main/java/com/webank/wedpr/components/publish/sync/PublishQuartzJob.java @@ -54,7 +54,7 @@ public ServiceInfoQueryRequest(Integer syncStatus, ServiceStatus serviceStatus) @Override public void execute(JobExecutionContext context) { - log.info("PublishQuartzJob run"); + log.debug("PublishQuartzJob run"); try { syncPublishSuccessServiceInfo(); } catch (Throwable e) { @@ -64,7 +64,7 @@ public void execute(JobExecutionContext context) { // Note: only sync the publishing success service private void syncPublishSuccessServiceInfo() { - log.info("syncPublishSuccessServiceInfo..."); + log.debug("syncPublishSuccessServiceInfo..."); ServiceInfoQueryRequest request = new ServiceInfoQueryRequest(0, ServiceStatus.PublishSuccess); request.setPageNum(1); diff --git a/wedpr-components/task-plugin/pir/src/main/java/com/webank/wedpr/components/task/plugin/pir/service/impl/PirServiceImpl.java b/wedpr-components/task-plugin/pir/src/main/java/com/webank/wedpr/components/task/plugin/pir/service/impl/PirServiceImpl.java index ca5ad356..ba628176 100644 --- a/wedpr-components/task-plugin/pir/src/main/java/com/webank/wedpr/components/task/plugin/pir/service/impl/PirServiceImpl.java +++ b/wedpr-components/task-plugin/pir/src/main/java/com/webank/wedpr/components/task/plugin/pir/service/impl/PirServiceImpl.java @@ -16,10 +16,7 @@ package com.webank.wedpr.components.task.plugin.pir.service.impl; import com.webank.wedpr.common.config.WeDPRCommonConfig; -import com.webank.wedpr.common.utils.Constant; -import com.webank.wedpr.common.utils.ThreadPoolService; -import com.webank.wedpr.common.utils.WeDPRException; -import com.webank.wedpr.common.utils.WeDPRResponse; +import com.webank.wedpr.common.utils.*; import com.webank.wedpr.components.api.credential.core.CredentialVerifier; import com.webank.wedpr.components.api.credential.core.impl.CredentialVerifierImpl; import com.webank.wedpr.components.db.mapper.dataset.mapper.DatasetMapper; @@ -49,6 +46,7 @@ import com.webank.wedpr.components.task.plugin.pir.service.PirService; import com.webank.wedpr.components.task.plugin.pir.transport.PirTopicSubscriber; import com.webank.wedpr.components.task.plugin.pir.transport.impl.PirTopicSubscriberImpl; +import com.webank.wedpr.sdk.jni.transport.TransportConfig; import com.webank.wedpr.sdk.jni.transport.WeDPRTransport; import java.util.List; import javax.annotation.PostConstruct; @@ -105,6 +103,15 @@ public void init() throws Exception { this.pirTopicSubscriber = new PirTopicSubscriberImpl( weDPRTransport, new CredentialVerifierImpl(null), pirServiceHook); + // get the access entrypoint + TransportConfig transportConfig = weDPRTransport.getTransportConfig(); + String accessEntryPoint = + Common.getUrl( + transportConfig.getSelfEndPoint().getHostIP() + + ":" + + WeDPRCommonConfig.getServerListenPort()); + this.weDPRTransport.registerService(Constant.PIR_SERVICE_TYPE, accessEntryPoint); + logger.info("PirServiceImpl, register service, accessEntryPoint: {}", accessEntryPoint); registerPublishedServices(); this.serviceAuthVerifier = new ServiceAuthVerifierImpl(serviceAuthMapper); }