Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support fetch alive service config from gateway #128

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import com.webank.wedpr.common.utils.WeDPRException;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Note: Since the WeDPRCommonConfig module is a reusable, it shouldn't require the config to exist
// when initializing static configuration variables,
// which will cause some services that do not need to set these configuration to exception.
public class WeDPRCommonConfig {
private static final Logger logger = LoggerFactory.getLogger(WeDPRCommonConfig.class);
private static final Integer DEFAULT_READ_TRUNK_SIZE = 1024 * 1024;
private static final Integer DEFAULT_WRITE_TRUNK_SIZE = 1024 * 1024;
// the agency id
Expand Down Expand Up @@ -67,6 +70,7 @@ public class WeDPRCommonConfig {
WeDPRConfig.apply("wedpr.worker.api.method.submit", "submit");

private static String SHELL_CODE_CONNECTOR = " && ";
private static String SERVER_LISTEN_PORT;

@SneakyThrows
public static String getAgency() {
Expand All @@ -79,7 +83,7 @@ public static String getAgency() {
@SneakyThrows
public static String getAdminAgency() {
if (StringUtils.isBlank(ADMIN_AGENCY)) {
throw new WeDPRException("Invalid emtpy agency!");
throw new WeDPRException("Invalid emtpy admin agency!");
}
return ADMIN_AGENCY;
}
Expand Down Expand Up @@ -156,4 +160,13 @@ public static String getWedprWorkerApiPath() {
public static String getWedprWorkerSubmitTaskMethod() {
return WEDPR_WORKER_SUBMIT_TASK_METHOD;
}

public static String getServerListenPort() {
return SERVER_LISTEN_PORT;
}

public static void setServerListenPort(String serverListenPort) {
logger.info("setServerListenPort: {}", serverListenPort);
SERVER_LISTEN_PORT = serverListenPort;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,7 @@ public class Constant {
// the pir related fields
public static final String PIR_ID_FIELD_NAME = "pir_sys_id";
public static final String PIR_ID_HASH_FIELD_NAME = "pir_sys_id_hash";

//// the serviceType
public static final String PIR_SERVICE_TYPE = "PIR";
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.webank.wedpr.components.initializer;

import com.webank.wedpr.common.config.WeDPRCommonConfig;
import com.webank.wedpr.common.config.WeDPRConfig;
import com.webank.wedpr.common.utils.WeDPRException;
import java.io.InputStream;
Expand All @@ -33,6 +34,7 @@
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.core.env.Environment;

@SpringBootApplication
@ComponentScan(basePackages = {"com.webank"})
Expand All @@ -53,15 +55,20 @@ public void onApplicationEvent(ApplicationPreparedEvent event) {
if (applicationContext == null) {
applicationContext = event.getApplicationContext();
}
initWeDPRApplication(serviceName);
Environment environment = applicationContext.getEnvironment();
String serverListenPort = environment.getProperty("server.port");
logger.info(
"init WeDPRApplication application, listen port: {}",
serverListenPort);
initWeDPRApplication(serviceName, serverListenPort);
logger.info("init WeDPRApplication application success");
}
});
String[] springArgs = generateSpringArgs();
applicationContext = application.run(ArrayUtils.addAll(args, springArgs));
}

protected static void initWeDPRApplication(String serviceName) {
protected static void initWeDPRApplication(String serviceName, String serverListenPort) {
try {
serviceInfo = new ServiceInfo();
String appName =
Expand All @@ -82,7 +89,8 @@ protected static void initWeDPRApplication(String serviceName) {
}
// load config
loadConfig(serviceInfo.getConfigFile());

// Note: must load WeDPRCommonConfig after config file loaded
WeDPRCommonConfig.setServerListenPort(serverListenPort);
logger.info("initWeDPRApplication for {} success", appName);
} catch (Exception e) {
logger.error("initWeDPRApplication failed, error: ", e);
Expand Down
2 changes: 0 additions & 2 deletions wedpr-components/key-generator/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// Apply the java-library plugin to add support for Java Library
plugins {
id 'org.springframework.boot' version '2.3.12.RELEASE'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
}

Expand Down
1 change: 1 addition & 0 deletions wedpr-components/meta/loadbalancer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ plugins {
}
dependencies{
compile project(":wedpr-common-utils")
compile project(":wedpr-components-transport")
compile libraries.spring, libraries.spring_boot
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

package com.webank.wedpr.components.loadbalancer;

import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import java.util.List;

// TODO: add service-discovery implementation
public interface EntryPointFetcher {
List<EntryPointInfo> getAliveEntryPoints(String serviceName);
List<ServiceMeta.EntryPointMeta> getAliveEntryPoints(String serviceName);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.webank.wedpr.components.loadbalancer;

import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import java.util.List;

public interface LoadBalancer {
Expand All @@ -23,7 +24,7 @@ public static enum Policy {
HASH,
}

EntryPointInfo selectService(Policy policy, String serviceType);
ServiceMeta.EntryPointMeta selectService(Policy policy, String serviceType);

List<EntryPointInfo> selectAllEndPoint(String serviceType);
List<ServiceMeta.EntryPointMeta> selectAllEndPoint(String serviceType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@

package com.webank.wedpr.components.loadbalancer.config;

import com.webank.wedpr.common.config.WeDPRConfig;
import com.webank.wedpr.components.loadbalancer.LoadBalancer;
import com.webank.wedpr.components.loadbalancer.impl.EntryPointConfigLoader;
import com.webank.wedpr.components.loadbalancer.impl.EntryPointFetcherImpl;
import com.webank.wedpr.components.loadbalancer.impl.LoadBalancerImpl;
import com.webank.wedpr.sdk.jni.transport.WeDPRTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
Expand All @@ -26,10 +32,20 @@

@Configuration
public class LoadBalanceConfig {
private static final Logger logger = LoggerFactory.getLogger(LoadBalanceConfig.class);
@Autowired private WeDPRTransport weDPRTransport;

private static boolean DEBUG_MODE = WeDPRConfig.apply("wedpr.service.debugMode", false);

@Bean(name = "loadBalancer")
@Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
@ConditionalOnMissingBean
public LoadBalancer debugModeloadBalancer() {
return new LoadBalancerImpl(new EntryPointConfigLoader());
public LoadBalancer loadBalancer() {
if (DEBUG_MODE) {
logger.info("Create debug mode LoadBalancerImpl, load service config from config file");
return new LoadBalancerImpl(new EntryPointConfigLoader());
}
logger.info("Create LoadBalancerImpl, fetch the alive node from the gateway");
return new LoadBalancerImpl(new EntryPointFetcherImpl(weDPRTransport));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import com.webank.wedpr.common.utils.ObjectMapperFactory;
import com.webank.wedpr.common.utils.WeDPRException;
import com.webank.wedpr.components.loadbalancer.EntryPointFetcher;
import com.webank.wedpr.components.loadbalancer.EntryPointInfo;
import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -31,7 +32,7 @@

public class EntryPointConfigLoader implements EntryPointFetcher {
private static final Logger logger = LoggerFactory.getLogger(EntryPointConfigLoader.class);
private Map<String, List<EntryPointInfo>> alivedEntryPoints = new HashMap<>();
private Map<String, List<ServiceMeta.EntryPointMeta>> alivedEntryPoints = new HashMap<>();

@SneakyThrows(Exception.class)
public EntryPointConfigLoader() {
Expand All @@ -52,13 +53,22 @@ public EntryPointConfigLoader() {
entryPoint.getEntryPoints().size());
alivedEntryPoints.put(
entryPoint.getServiceName().toLowerCase(),
EntryPointInfo.toEntryPointInfo(
EntryPointConfigLoader.toEntryPointMetaList(
entryPoint.getServiceName(), entryPoint.getEntryPoints()));
}
}

public static List<ServiceMeta.EntryPointMeta> toEntryPointMetaList(
String serviceName, List<String> entryPointsList) {
List<ServiceMeta.EntryPointMeta> result = new ArrayList<>();
for (String entryPoint : entryPointsList) {
result.add(new ServiceMeta.EntryPointMeta(serviceName, entryPoint));
}
return result;
}

@Override
public List<EntryPointInfo> getAliveEntryPoints(String serviceName) {
public List<ServiceMeta.EntryPointMeta> getAliveEntryPoints(String serviceName) {
String lowerCaseServiceName = serviceName.toLowerCase();
if (alivedEntryPoints.containsKey(lowerCaseServiceName)) {
return alivedEntryPoints.get(lowerCaseServiceName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package com.webank.wedpr.components.loadbalancer.impl;

import com.webank.wedpr.components.loadbalancer.EntryPointFetcher;
import com.webank.wedpr.sdk.jni.transport.WeDPRTransport;
import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import java.util.List;

public class EntryPointFetcherImpl implements EntryPointFetcher {
private final WeDPRTransport weDPRTransport;

public EntryPointFetcherImpl(WeDPRTransport weDPRTransport) {
this.weDPRTransport = weDPRTransport;
}

@Override
public List<ServiceMeta.EntryPointMeta> getAliveEntryPoints(String serviceName) {
return this.weDPRTransport.getAliveEntryPoints(serviceName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
package com.webank.wedpr.components.loadbalancer.impl;

import com.webank.wedpr.components.loadbalancer.EntryPointFetcher;
import com.webank.wedpr.components.loadbalancer.EntryPointInfo;
import com.webank.wedpr.components.loadbalancer.LoadBalancer;
import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoadBalancerImpl implements LoadBalancer {
private static final Logger logger = LoggerFactory.getLogger(LoadBalancerImpl.class);

private final EntryPointFetcher entryPointFetcher;
private final AtomicInteger lastIdx = new AtomicInteger(0);
Expand All @@ -31,13 +34,13 @@ public LoadBalancerImpl(EntryPointFetcher entryPointFetcher) {
}

@Override
public List<EntryPointInfo> selectAllEndPoint(String serviceType) {
public List<ServiceMeta.EntryPointMeta> selectAllEndPoint(String serviceType) {
return entryPointFetcher.getAliveEntryPoints(serviceType);
}

@Override
public EntryPointInfo selectService(Policy policy, String serviceType) {
List<EntryPointInfo> entryPointInfoList =
public ServiceMeta.EntryPointMeta selectService(Policy policy, String serviceType) {
List<ServiceMeta.EntryPointMeta> entryPointInfoList =
entryPointFetcher.getAliveEntryPoints(serviceType);
if (entryPointInfoList == null || entryPointInfoList.isEmpty()) {
return null;
Expand All @@ -50,6 +53,7 @@ public EntryPointInfo selectService(Policy policy, String serviceType) {
int idx = serviceType.hashCode() % entryPointInfoList.size();
int selectedIdx = Math.max(idx, 0);
lastIdx.set(selectedIdx);
logger.info("selectService: {}", entryPointInfoList.get(selectedIdx).toString());
return entryPointInfoList.get(selectedIdx);
}
}
Loading
Loading