Skip to content

Commit

Permalink
support fetch alive service config from gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Nov 7, 2024
1 parent 35777ad commit 6a56078
Show file tree
Hide file tree
Showing 15 changed files with 92 additions and 82 deletions.
1 change: 1 addition & 0 deletions wedpr-components/meta/loadbalancer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ plugins {
}
dependencies{
compile project(":wedpr-common-utils")
compile project(":wedpr-components-transport")
compile libraries.spring, libraries.spring_boot
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

package com.webank.wedpr.components.loadbalancer;

import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import java.util.List;

// TODO: add service-discovery implementation
public interface EntryPointFetcher {
List<EntryPointInfo> getAliveEntryPoints(String serviceName);
List<ServiceMeta.EntryPointMeta> getAliveEntryPoints(String serviceName);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.webank.wedpr.components.loadbalancer;

import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import java.util.List;

public interface LoadBalancer {
Expand All @@ -23,7 +24,7 @@ public static enum Policy {
HASH,
}

EntryPointInfo selectService(Policy policy, String serviceType);
ServiceMeta.EntryPointMeta selectService(Policy policy, String serviceType);

List<EntryPointInfo> selectAllEndPoint(String serviceType);
List<ServiceMeta.EntryPointMeta> selectAllEndPoint(String serviceType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@

package com.webank.wedpr.components.loadbalancer.config;

import com.webank.wedpr.common.config.WeDPRConfig;
import com.webank.wedpr.components.loadbalancer.LoadBalancer;
import com.webank.wedpr.components.loadbalancer.impl.EntryPointConfigLoader;
import com.webank.wedpr.components.loadbalancer.impl.EntryPointFetcherImpl;
import com.webank.wedpr.components.loadbalancer.impl.LoadBalancerImpl;
import com.webank.wedpr.sdk.jni.transport.WeDPRTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
Expand All @@ -26,10 +32,20 @@

@Configuration
public class LoadBalanceConfig {
private static final Logger logger = LoggerFactory.getLogger(LoadBalanceConfig.class);
@Autowired private WeDPRTransport weDPRTransport;

private static boolean DEBUG_MODE = WeDPRConfig.apply("wedpr.service.debugMode", false);

@Bean(name = "loadBalancer")
@Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
@ConditionalOnMissingBean
public LoadBalancer debugModeloadBalancer() {
return new LoadBalancerImpl(new EntryPointConfigLoader());
public LoadBalancer loadBalancer() {
if (DEBUG_MODE) {
logger.info("Create debug mode LoadBalancerImpl, load service config from config file");
return new LoadBalancerImpl(new EntryPointConfigLoader());
}
logger.info("Create LoadBalancerImpl, fetch the alive node from the gateway");
return new LoadBalancerImpl(new EntryPointFetcherImpl(weDPRTransport));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import com.webank.wedpr.common.utils.ObjectMapperFactory;
import com.webank.wedpr.common.utils.WeDPRException;
import com.webank.wedpr.components.loadbalancer.EntryPointFetcher;
import com.webank.wedpr.components.loadbalancer.EntryPointInfo;
import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -31,7 +32,7 @@

public class EntryPointConfigLoader implements EntryPointFetcher {
private static final Logger logger = LoggerFactory.getLogger(EntryPointConfigLoader.class);
private Map<String, List<EntryPointInfo>> alivedEntryPoints = new HashMap<>();
private Map<String, List<ServiceMeta.EntryPointMeta>> alivedEntryPoints = new HashMap<>();

@SneakyThrows(Exception.class)
public EntryPointConfigLoader() {
Expand All @@ -52,13 +53,22 @@ public EntryPointConfigLoader() {
entryPoint.getEntryPoints().size());
alivedEntryPoints.put(
entryPoint.getServiceName().toLowerCase(),
EntryPointInfo.toEntryPointInfo(
EntryPointConfigLoader.toEntryPointMetaList(
entryPoint.getServiceName(), entryPoint.getEntryPoints()));
}
}

public static List<ServiceMeta.EntryPointMeta> toEntryPointMetaList(
String serviceName, List<String> entryPointsList) {
List<ServiceMeta.EntryPointMeta> result = new ArrayList<>();
for (String entryPoint : entryPointsList) {
result.add(new ServiceMeta.EntryPointMeta(serviceName, entryPoint));
}
return result;
}

@Override
public List<EntryPointInfo> getAliveEntryPoints(String serviceName) {
public List<ServiceMeta.EntryPointMeta> getAliveEntryPoints(String serviceName) {
String lowerCaseServiceName = serviceName.toLowerCase();
if (alivedEntryPoints.containsKey(lowerCaseServiceName)) {
return alivedEntryPoints.get(lowerCaseServiceName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package com.webank.wedpr.components.loadbalancer.impl;

import com.webank.wedpr.components.loadbalancer.EntryPointFetcher;
import com.webank.wedpr.sdk.jni.transport.WeDPRTransport;
import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import java.util.List;

public class EntryPointFetcherImpl implements EntryPointFetcher {
private final WeDPRTransport weDPRTransport;

public EntryPointFetcherImpl(WeDPRTransport weDPRTransport) {
this.weDPRTransport = weDPRTransport;
}

@Override
public List<ServiceMeta.EntryPointMeta> getAliveEntryPoints(String serviceName) {
return this.weDPRTransport.getAliveEntryPoints(serviceName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package com.webank.wedpr.components.loadbalancer.impl;

import com.webank.wedpr.components.loadbalancer.EntryPointFetcher;
import com.webank.wedpr.components.loadbalancer.EntryPointInfo;
import com.webank.wedpr.components.loadbalancer.LoadBalancer;
import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -31,13 +31,13 @@ public LoadBalancerImpl(EntryPointFetcher entryPointFetcher) {
}

@Override
public List<EntryPointInfo> selectAllEndPoint(String serviceType) {
public List<ServiceMeta.EntryPointMeta> selectAllEndPoint(String serviceType) {
return entryPointFetcher.getAliveEntryPoints(serviceType);
}

@Override
public EntryPointInfo selectService(Policy policy, String serviceType) {
List<EntryPointInfo> entryPointInfoList =
public ServiceMeta.EntryPointMeta selectService(Policy policy, String serviceType) {
List<ServiceMeta.EntryPointMeta> entryPointInfoList =
entryPointFetcher.getAliveEntryPoints(serviceType);
if (entryPointInfoList == null || entryPointInfoList.isEmpty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.webank.wedpr.components.scheduler.dag.worker;

import com.webank.wedpr.common.utils.WeDPRException;
import com.webank.wedpr.components.loadbalancer.EntryPointInfo;
import com.webank.wedpr.components.loadbalancer.LoadBalancer;
import com.webank.wedpr.components.scheduler.client.ModelClient;
import com.webank.wedpr.components.scheduler.dag.entity.JobWorker;
import com.webank.wedpr.components.scheduler.dag.utils.ServiceName;
import com.webank.wedpr.components.scheduler.mapper.JobWorkerMapper;
import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,7 +30,7 @@ public WorkerStatus engineRun() throws Exception {
String workerId = getWorkerId();
String args = getArgs();

EntryPointInfo entryPoint =
ServiceMeta.EntryPointMeta entryPoint =
getLoadBalancer()
.selectService(
LoadBalancer.Policy.ROUND_ROBIN, ServiceName.MODEL.getValue());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.webank.wedpr.components.scheduler.dag.worker;

import com.webank.wedpr.common.utils.WeDPRException;
import com.webank.wedpr.components.loadbalancer.EntryPointInfo;
import com.webank.wedpr.components.loadbalancer.LoadBalancer;
import com.webank.wedpr.components.scheduler.client.MpcClient;
import com.webank.wedpr.components.scheduler.dag.entity.JobWorker;
import com.webank.wedpr.components.scheduler.dag.utils.ServiceName;
import com.webank.wedpr.components.scheduler.executor.impl.mpc.MPCExecutorConfig;
import com.webank.wedpr.components.scheduler.mapper.JobWorkerMapper;
import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,7 +31,7 @@ public WorkerStatus engineRun() throws Exception {
String workerId = getWorkerId();
String workerArgs = getArgs();

EntryPointInfo entryPoint =
ServiceMeta.EntryPointMeta entryPoint =
getLoadBalancer()
.selectService(LoadBalancer.Policy.ROUND_ROBIN, ServiceName.MPC.getValue());
if (entryPoint == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.webank.wedpr.components.scheduler.dag.worker;

import com.webank.wedpr.common.utils.WeDPRException;
import com.webank.wedpr.components.loadbalancer.EntryPointInfo;
import com.webank.wedpr.components.loadbalancer.LoadBalancer;
import com.webank.wedpr.components.scheduler.client.PsiClient;
import com.webank.wedpr.components.scheduler.dag.entity.JobWorker;
import com.webank.wedpr.components.scheduler.dag.utils.ServiceName;
import com.webank.wedpr.components.scheduler.mapper.JobWorkerMapper;
import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,7 +30,7 @@ public WorkerStatus engineRun() throws Exception {
String workerId = getWorkerId();
String workerArgs = getArgs();

EntryPointInfo entryPoint =
ServiceMeta.EntryPointMeta entryPoint =
getLoadBalancer()
.selectService(LoadBalancer.Policy.ROUND_ROBIN, ServiceName.PSI.getValue());
if (entryPoint == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.webank.wedpr.common.utils.ThreadPoolService;
import com.webank.wedpr.common.utils.WeDPRException;
import com.webank.wedpr.components.http.client.HttpClientImpl;
import com.webank.wedpr.components.loadbalancer.EntryPointInfo;
import com.webank.wedpr.components.loadbalancer.LoadBalancer;
import com.webank.wedpr.components.project.JobChecker;
import com.webank.wedpr.components.project.dao.JobDO;
Expand All @@ -27,6 +26,7 @@
import com.webank.wedpr.components.scheduler.workflow.WorkFlowOrchestrator;
import com.webank.wedpr.components.scheduler.workflow.builder.JobWorkFlowBuilderManager;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -136,14 +136,14 @@ public void kill(JobDO jobDO) throws Exception {
// Note: since the job may exist in any node, establish kill command to all nodes
public void killModelJob(JobDO jobDO) throws Exception {
logger.info("killModelJob: {}", jobDO.getId());
List<EntryPointInfo> aliveEntryPoint =
List<ServiceMeta.EntryPointMeta> aliveEntryPoint =
loadBalancer.selectAllEndPoint(ServiceName.MODEL.getValue());
if (aliveEntryPoint == null || aliveEntryPoint.isEmpty()) {
return;
}
boolean failed = false;
String reason = "";
for (EntryPointInfo entryPointInfo : aliveEntryPoint) {
for (ServiceMeta.EntryPointMeta entryPointInfo : aliveEntryPoint) {
try {
logger.info("kill job: {}, entrypoint: {}", jobDO.toString(), entryPointInfo);
HttpClientImpl httpClient =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
import com.webank.wedpr.common.utils.Constant;
import com.webank.wedpr.common.utils.WeDPRException;
import com.webank.wedpr.components.http.client.HttpClientImpl;
import com.webank.wedpr.components.loadbalancer.EntryPointInfo;
import com.webank.wedpr.components.loadbalancer.LoadBalancer;
import com.webank.wedpr.components.scheduler.dag.utils.ServiceName;
import com.webank.wedpr.components.scheduler.executor.impl.ml.model.ModelJobResult;
import com.webank.wedpr.components.scheduler.executor.impl.ml.request.GetTaskResultRequest;
import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;

public class MLExecutorClient {
public static Object getJobResult(LoadBalancer loadBalancer, GetTaskResultRequest request)
throws Exception {

EntryPointInfo entryPoint =
ServiceMeta.EntryPointMeta entryPoint =
loadBalancer.selectService(
LoadBalancer.Policy.ROUND_ROBIN, ServiceName.MODEL.getValue());
if (entryPoint == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import com.webank.wedpr.components.db.mapper.service.publish.dao.ServiceAuthMapper;
import com.webank.wedpr.components.hook.ServiceHook;
import com.webank.wedpr.components.http.client.HttpClientImpl;
import com.webank.wedpr.components.loadbalancer.EntryPointInfo;
import com.webank.wedpr.components.loadbalancer.LoadBalancer;
import com.webank.wedpr.components.publish.config.ServicePublisherConfig;
import com.webank.wedpr.components.publish.entity.request.PublishCreateRequest;
import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
Expand Down Expand Up @@ -72,7 +72,7 @@ public void batchInsertServiceAuthInfo(
@Override
public void onPublish(Object serviceInfo) throws Exception {
PublishCreateRequest publishedServiceInfo = (PublishCreateRequest) serviceInfo;
EntryPointInfo selectedEntryPoint =
ServiceMeta.EntryPointMeta selectedEntryPoint =
loadBalancer.selectService(
LoadBalancer.Policy.ROUND_ROBIN, publishedServiceInfo.getServiceType());
if (selectedEntryPoint == null) {
Expand Down
3 changes: 3 additions & 0 deletions wedpr-site/conf/wedpr.properties
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,12 @@ wedpr.transport.listen_ip=0.0.0.0
wedpr.transport.listen_port=6001



### the admin configuration
#wedpr.admin_agency=
###

### the service configuration
wedpr.service.debugMode=false
# only used when debugMode is true
#wedpr.service.entrypoints=[{"serviceName": "pir", "entryPoints": ["127.0.0.1:8101"]}]

0 comments on commit 6a56078

Please sign in to comment.